diffusers-源码解析-四十七-

龙哥盟 / 2024-11-09 / 原文

diffusers 源码解析(四十七)

.\diffusers\pipelines\stable_diffusion\pipeline_stable_diffusion_latent_upscale.py

# 版权声明,说明该文件的版权所有者及其保留的权利
# Copyright 2024 The HuggingFace Team. All rights reserved.
#
# 根据 Apache 许可证第 2.0 版的规定使用此文件
# Licensed under the Apache License, Version 2.0 (the "License");
# 您不得在未遵守许可证的情况下使用此文件
# you may not use this file except in compliance with the License.
# 可在以下网址获取许可证副本
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律要求或书面协议另有约定,软件按“原样”提供
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# 无任何形式的保证或条件,无论明示或暗示
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# 请查看许可证以获取特定语言的权限和限制
# See the License for the specific language governing permissions and
# limitations under the License.

import warnings  # 导入警告模块,用于发出警告信息
from typing import Callable, List, Optional, Union  # 导入类型提示工具

import numpy as np  # 导入 NumPy 库以支持数组操作
import PIL.Image  # 导入 PIL 库用于图像处理
import torch  # 导入 PyTorch 库用于深度学习操作
import torch.nn.functional as F  # 导入 PyTorch 的功能性神经网络模块
from transformers import CLIPTextModel, CLIPTokenizer  # 从 transformers 库导入 CLIP 模型和分词器

from ...image_processor import PipelineImageInput, VaeImageProcessor  # 从本地模块导入图像处理相关类
from ...loaders import FromSingleFileMixin  # 从本地模块导入单文件加载器
from ...models import AutoencoderKL, UNet2DConditionModel  # 从本地模块导入模型
from ...schedulers import EulerDiscreteScheduler  # 从本地模块导入调度器
from ...utils import deprecate, logging  # 从本地模块导入工具函数和日志记录
from ...utils.torch_utils import randn_tensor  # 从本地模块导入随机张量生成函数
from ..pipeline_utils import DiffusionPipeline, ImagePipelineOutput, StableDiffusionMixin  # 从管道工具导入相关类


logger = logging.get_logger(__name__)  # 创建一个日志记录器,用于记录信息,禁用 pylint 的无效名称警告


# 预处理函数,用于处理输入图像
# Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion_upscale.preprocess
def preprocess(image):
    # 发出关于该方法已弃用的警告,提示将来版本会移除
    warnings.warn(
        "The preprocess method is deprecated and will be removed in a future version. Please"
        " use VaeImageProcessor.preprocess instead",
        FutureWarning,
    )
    # 如果输入是张量,直接返回
    if isinstance(image, torch.Tensor):
        return image
    # 如果输入是 PIL 图像,将其包装成列表
    elif isinstance(image, PIL.Image.Image):
        image = [image]

    # 检查列表中的第一个元素是否为 PIL 图像
    if isinstance(image[0], PIL.Image.Image):
        # 获取图像的宽度和高度
        w, h = image[0].size
        # 将宽度和高度调整为 64 的整数倍
        w, h = (x - x % 64 for x in (w, h))  # resize to integer multiple of 64

        # 将图像调整为新的大小并转换为数组格式
        image = [np.array(i.resize((w, h)))[None, :] for i in image]
        # 沿第一个轴连接图像数组
        image = np.concatenate(image, axis=0)
        # 归一化图像数据并转换数据类型
        image = np.array(image).astype(np.float32) / 255.0
        # 调整数组维度顺序
        image = image.transpose(0, 3, 1, 2)
        # 将图像数据从 [0, 1] 范围缩放到 [-1, 1] 范围
        image = 2.0 * image - 1.0
        # 将 NumPy 数组转换为 PyTorch 张量
        image = torch.from_numpy(image)
    # 如果列表中的第一个元素是张量,沿着第 0 维度连接它们
    elif isinstance(image[0], torch.Tensor):
        image = torch.cat(image, dim=0)
    # 返回处理后的图像
    return image


class StableDiffusionLatentUpscalePipeline(DiffusionPipeline, StableDiffusionMixin, FromSingleFileMixin):
    r"""
    用于将 Stable Diffusion 输出图像分辨率按 2 倍放大的管道。

    该模型继承自 [`DiffusionPipeline`]. 有关所有管道通用方法(下载、保存、在特定设备上运行等)的文档,请查看超类文档
    The pipeline also inherits the following loading methods:
        - [`~loaders.FromSingleFileMixin.from_single_file`] for loading `.ckpt` files
    # 定义参数说明
    Args:
        vae ([`AutoencoderKL`]):
            # 用于编码和解码图像的变分自编码器(VAE)模型,处理图像和潜在表示之间的转换
        text_encoder ([`~transformers.CLIPTextModel`]):
            # 冻结的文本编码器,使用 [clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14) 模型
        tokenizer ([`~transformers.CLIPTokenizer`]):
            # 一个 `CLIPTokenizer` 用于对文本进行分词
        unet ([`UNet2DConditionModel`]):
            # 用于对编码后的图像潜在表示进行去噪的 `UNet2DConditionModel`
        scheduler ([`SchedulerMixin`]):
            # 一个 [`EulerDiscreteScheduler`],与 `unet` 配合使用以去噪编码后的图像潜在表示
    """

    # 定义模型的计算顺序
    model_cpu_offload_seq = "text_encoder->unet->vae"

    def __init__(
        # 初始化方法,接受多个模型作为参数
        self,
        vae: AutoencoderKL,
        text_encoder: CLIPTextModel,
        tokenizer: CLIPTokenizer,
        unet: UNet2DConditionModel,
        scheduler: EulerDiscreteScheduler,
    ):
        # 调用父类的初始化方法
        super().__init__()

        # 注册模块,将传入的模型进行存储
        self.register_modules(
            vae=vae,
            text_encoder=text_encoder,
            tokenizer=tokenizer,
            unet=unet,
            scheduler=scheduler,
        )
        # 计算 VAE 的缩放因子,基于配置中的块输出通道数量
        self.vae_scale_factor = 2 ** (len(self.vae.config.block_out_channels) - 1)
        # 创建图像处理器,用于处理解码后的图像
        self.image_processor = VaeImageProcessor(vae_scale_factor=self.vae_scale_factor, resample="bicubic")

    # 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline 中复制的方法,用于解码潜在表示
    def decode_latents(self, latents):
        # 设置弃用信息,提示用户该方法将在1.0.0版本中移除
        deprecation_message = "The decode_latents method is deprecated and will be removed in 1.0.0. Please use VaeImageProcessor.postprocess(...) instead"
        deprecate("decode_latents", "1.0.0", deprecation_message, standard_warn=False)

        # 根据 VAE 的缩放因子调整潜在表示的值
        latents = 1 / self.vae.config.scaling_factor * latents
        # 解码潜在表示,返回图像
        image = self.vae.decode(latents, return_dict=False)[0]
        # 将解码后的图像归一化到[0, 1]范围
        image = (image / 2 + 0.5).clamp(0, 1)
        # 转换图像数据为 float32 格式,以确保与 bfloat16 兼容,并避免显著开销
        image = image.cpu().permute(0, 2, 3, 1).float().numpy()
        # 返回处理后的图像
        return image
    # 检查输入参数的有效性
    def check_inputs(self, prompt, image, callback_steps):
        # 如果 `prompt` 不是字符串或列表类型,则引发错误
        if not isinstance(prompt, str) and not isinstance(prompt, list):
            raise ValueError(f"`prompt` has to be of type `str` or `list` but is {type(prompt)}")

        # 如果 `image` 不是张量、PIL图像或列表类型,则引发错误
        if (
            not isinstance(image, torch.Tensor)
            and not isinstance(image, PIL.Image.Image)
            and not isinstance(image, list)
        ):
            raise ValueError(
                f"`image` has to be of type `torch.Tensor`, `PIL.Image.Image` or `list` but is {type(image)}"
            )

        # 验证如果 `image` 是列表或张量,则 `prompt` 和 `image` 的批量大小应相同
        if isinstance(image, (list, torch.Tensor)):
            # 如果 `prompt` 是字符串,批量大小为1
            if isinstance(prompt, str):
                batch_size = 1
            else:
                # 否则,批量大小为 `prompt` 的长度
                batch_size = len(prompt)
            # 如果 `image` 是列表,获取其批量大小
            if isinstance(image, list):
                image_batch_size = len(image)
            else:
                # 否则,获取 `image` 的第一维度大小(批量大小)
                image_batch_size = image.shape[0] if image.ndim == 4 else 1
            # 如果 `prompt` 和 `image` 的批量大小不匹配,则引发错误
            if batch_size != image_batch_size:
                raise ValueError(
                    f"`prompt` has batch size {batch_size} and `image` has batch size {image_batch_size}."
                    " Please make sure that passed `prompt` matches the batch size of `image`."
                )

        # 检查 `callback_steps` 是否为正整数
        if (callback_steps is None) or (
            callback_steps is not None and (not isinstance(callback_steps, int) or callback_steps <= 0)
        ):
            raise ValueError(
                f"`callback_steps` has to be a positive integer but is {callback_steps} of type"
                f" {type(callback_steps)}."
            )

    # 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion_upscale 中复制的方法
    def prepare_latents(self, batch_size, num_channels_latents, height, width, dtype, device, generator, latents=None):
        # 定义潜在变量的形状
        shape = (batch_size, num_channels_latents, height, width)
        # 如果没有提供 `latents`,则生成随机的潜在变量
        if latents is None:
            latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
        else:
            # 如果提供的 `latents` 形状不匹配,则引发错误
            if latents.shape != shape:
                raise ValueError(f"Unexpected latents shape, got {latents.shape}, expected {shape}")
            # 将 `latents` 移动到指定设备
            latents = latents.to(device)

        # 按调度器所需的标准差缩放初始噪声
        latents = latents * self.scheduler.init_noise_sigma
        # 返回处理后的潜在变量
        return latents

    # 在计算图中不追踪梯度
    @torch.no_grad()
    # 定义一个可调用的类方法,接收多个参数用于生成图像
        def __call__(
            self,
            # 用户输入的提示文本,可以是字符串或字符串列表
            prompt: Union[str, List[str]],
            # 输入的图像,默认为 None
            image: PipelineImageInput = None,
            # 推理步骤的数量,默认为 75
            num_inference_steps: int = 75,
            # 指导缩放因子,默认为 9.0
            guidance_scale: float = 9.0,
            # 负提示文本,可以是字符串或字符串列表,默认为 None
            negative_prompt: Optional[Union[str, List[str]]] = None,
            # 随机数生成器,可以是单个或多个 torch.Generator,默认为 None
            generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
            # 潜在变量,默认为 None
            latents: Optional[torch.Tensor] = None,
            # 输出类型,默认为 "pil" (Python Imaging Library)
            output_type: Optional[str] = "pil",
            # 是否返回字典格式的结果,默认为 True
            return_dict: bool = True,
            # 回调函数,用于在推理过程中处理某些操作,默认为 None
            callback: Optional[Callable[[int, int, torch.Tensor], None]] = None,
            # 回调函数调用的步数间隔,默认为 1
            callback_steps: int = 1,

.\diffusers\pipelines\stable_diffusion\pipeline_stable_diffusion_upscale.py

# 版权声明,表明该代码的版权所有者及相关权利
# Copyright 2024 The HuggingFace Team. All rights reserved.
#
# 根据 Apache 许可证第 2.0 版("许可证")授权;
# 除非遵循该许可证,否则您不得使用此文件。
# 您可以在以下网址获取许可证副本:
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律或书面同意,软件
# 在许可证下分发是以“原样”基础提供的,
# 不附带任何明示或暗示的担保或条件。
# 请参阅许可证以了解有关权限和
# 限制的具体信息。

# 导入 inspect 模块,用于获取对象的信息
import inspect
# 导入 warnings 模块,用于发出警告
import warnings
# 从 typing 模块导入类型注解支持
from typing import Any, Callable, Dict, List, Optional, Union

# 导入 numpy 库,用于数值计算
import numpy as np
# 导入 PIL 库中的 Image 类,用于图像处理
import PIL.Image
# 导入 PyTorch 库
import torch
# 从 transformers 库导入 CLIP 相关类
from transformers import CLIPImageProcessor, CLIPTextModel, CLIPTokenizer

# 从相对路径导入其他处理器和加载器
from ...image_processor import PipelineImageInput, VaeImageProcessor
from ...loaders import FromSingleFileMixin, StableDiffusionLoraLoaderMixin, TextualInversionLoaderMixin
from ...models import AutoencoderKL, UNet2DConditionModel
# 从注意力处理器模块导入相关类
from ...models.attention_processor import (
    AttnProcessor2_0,
    XFormersAttnProcessor,
)
# 从 Lora 模块导入调整 Lora 规模的函数
from ...models.lora import adjust_lora_scale_text_encoder
# 从调度器模块导入相关类
from ...schedulers import DDPMScheduler, KarrasDiffusionSchedulers
# 从工具模块导入各种工具函数
from ...utils import USE_PEFT_BACKEND, deprecate, logging, scale_lora_layers, unscale_lora_layers
# 从 Torch 工具模块导入随机张量生成函数
from ...utils.torch_utils import randn_tensor
# 从管道工具模块导入相关类
from ..pipeline_utils import DiffusionPipeline, StableDiffusionMixin
# 从当前包导入稳定扩散管道输出
from . import StableDiffusionPipelineOutput

# 创建日志记录器,用于记录当前模块的日志
logger = logging.get_logger(__name__)  # pylint: disable=invalid-name


# 定义预处理函数,处理输入图像
def preprocess(image):
    # 发出警告,提示该方法已弃用,未来版本将被移除
    warnings.warn(
        "The preprocess method is deprecated and will be removed in a future version. Please"
        " use VaeImageProcessor.preprocess instead",
        FutureWarning,
    )
    # 如果输入是张量,直接返回
    if isinstance(image, torch.Tensor):
        return image
    # 如果输入是 PIL 图像,转换为列表
    elif isinstance(image, PIL.Image.Image):
        image = [image]

    # 检查列表中第一个元素是否为 PIL 图像
    if isinstance(image[0], PIL.Image.Image):
        # 获取图像的宽和高
        w, h = image[0].size
        # 将宽高调整为 64 的整数倍
        w, h = (x - x % 64 for x in (w, h))  # resize to integer multiple of 64

        # 调整所有图像的大小并转换为 NumPy 数组
        image = [np.array(i.resize((w, h)))[None, :] for i in image]
        # 将所有图像数组沿第一个轴连接
        image = np.concatenate(image, axis=0)
        # 将数组转换为浮点型并归一化到 [0, 1]
        image = np.array(image).astype(np.float32) / 255.0
        # 调整数组维度顺序为 (N, C, H, W)
        image = image.transpose(0, 3, 1, 2)
        # 将像素值范围从 [0, 1] 映射到 [-1, 1]
        image = 2.0 * image - 1.0
        # 将 NumPy 数组转换为 PyTorch 张量
        image = torch.from_numpy(image)
    # 如果列表中第一个元素是张量,沿着第一个维度连接它们
    elif isinstance(image[0], torch.Tensor):
        image = torch.cat(image, dim=0)
    # 返回处理后的图像
    return image


# 定义 StableDiffusionUpscalePipeline 类,继承多个基类
class StableDiffusionUpscalePipeline(
    DiffusionPipeline,
    StableDiffusionMixin,
    TextualInversionLoaderMixin,
    StableDiffusionLoraLoaderMixin,
    FromSingleFileMixin,
):
    r"""
    使用 Stable Diffusion 2 进行文本引导的图像超分辨率的管道。

    此模型继承自 [`DiffusionPipeline`]。请查阅超类文档以获取所有管道通用方法的实现(下载、保存、在特定设备上运行等)。
    # 管道还继承了以下加载方法:
        # - `~loaders.TextualInversionLoaderMixin.load_textual_inversion` 用于加载文本反转嵌入
        # - `~loaders.StableDiffusionLoraLoaderMixin.load_lora_weights` 用于加载 LoRA 权重
        # - `~loaders.StableDiffusionLoraLoaderMixin.save_lora_weights` 用于保存 LoRA 权重
        # - `~loaders.FromSingleFileMixin.from_single_file` 用于加载 `.ckpt` 文件

    # 参数说明
    Args:
        vae ([`AutoencoderKL`]):  # 变分自编码器(VAE)模型,用于将图像编码和解码为潜在表示
            Variational Auto-Encoder (VAE) model to encode and decode images to and from latent representations.
        text_encoder ([`~transformers.CLIPTextModel`]):  # 冻结的文本编码器
            Frozen text-encoder ([clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14)).
        tokenizer ([`~transformers.CLIPTokenizer`]):  # 用于标记化文本的 CLIPTokenizer
            A `CLIPTokenizer` to tokenize text.
        unet ([`UNet2DConditionModel`]):  # 用于去噪编码图像潜在的 UNet2DConditionModel
            A `UNet2DConditionModel` to denoise the encoded image latents.
        low_res_scheduler ([`SchedulerMixin`]):  # 用于向低分辨率条件图像添加初始噪声的调度器,必须是 DDPMScheduler 的实例
            A scheduler used to add initial noise to the low resolution conditioning image. It must be an instance of
            [`DDPMScheduler`].
        scheduler ([`SchedulerMixin`]):  # 与 UNet 一起使用的调度器,用于去噪编码图像潜在
            A scheduler to be used in combination with `unet` to denoise the encoded image latents. Can be one of
            [`DDIMScheduler`], [`LMSDiscreteScheduler`], or [`PNDMScheduler`].
    """

    # 定义 CPU 离线加载的模型顺序
    model_cpu_offload_seq = "text_encoder->unet->vae"
    # 定义可选组件的列表
    _optional_components = ["watermarker", "safety_checker", "feature_extractor"]
    # 定义不包含在 CPU 离线加载中的组件
    _exclude_from_cpu_offload = ["safety_checker"]

    # 初始化方法
    def __init__(  # 定义初始化方法
        self,
        vae: AutoencoderKL,  # 接收变分自编码器(VAE)模型
        text_encoder: CLIPTextModel,  # 接收文本编码器
        tokenizer: CLIPTokenizer,  # 接收标记化器
        unet: UNet2DConditionModel,  # 接收 UNet 模型
        low_res_scheduler: DDPMScheduler,  # 接收低分辨率调度器
        scheduler: KarrasDiffusionSchedulers,  # 接收调度器
        safety_checker: Optional[Any] = None,  # 可选的安全检查器
        feature_extractor: Optional[CLIPImageProcessor] = None,  # 可选的特征提取器
        watermarker: Optional[Any] = None,  # 可选的水印处理器
        max_noise_level: int = 350,  # 最大噪声级别,默认为350
    ):
        # 初始化父类
        super().__init__()

        # 检查 VAE 是否有配置属性
        if hasattr(
            vae, "config"
        ):  # 检查 VAE 是否具有配置属性 `scaling_factor`,如果未设置为 0.08333,则设置为 0.08333 并发出弃用警告
            # 确定 `scaling_factor` 是否已设置为 0.08333
            is_vae_scaling_factor_set_to_0_08333 = (
                hasattr(vae.config, "scaling_factor") and vae.config.scaling_factor == 0.08333
            )
            # 如果 `scaling_factor` 未设置为 0.08333,则执行以下操作
            if not is_vae_scaling_factor_set_to_0_08333:
                # 创建弃用消息,说明配置问题及建议
                deprecation_message = (
                    "The configuration file of the vae does not contain `scaling_factor` or it is set to"
                    f" {vae.config.scaling_factor}, which seems highly unlikely. If your checkpoint is a fine-tuned"
                    " version of `stabilityai/stable-diffusion-x4-upscaler` you should change 'scaling_factor' to"
                    " 0.08333 Please make sure to update the config accordingly, as not doing so might lead to"
                    " incorrect results in future versions. If you have downloaded this checkpoint from the Hugging"
                    " Face Hub, it would be very nice if you could open a Pull Request for the `vae/config.json` file"
                )
                # 记录弃用警告,并更新 `scaling_factor` 为 0.08333
                deprecate("wrong scaling_factor", "1.0.0", deprecation_message, standard_warn=False)
                vae.register_to_config(scaling_factor=0.08333)

        # 注册各个模块到当前配置中
        self.register_modules(
            vae=vae,
            text_encoder=text_encoder,
            tokenizer=tokenizer,
            unet=unet,
            low_res_scheduler=low_res_scheduler,
            scheduler=scheduler,
            safety_checker=safety_checker,
            watermarker=watermarker,
            feature_extractor=feature_extractor,
        )
        # 计算 VAE 的缩放因子
        self.vae_scale_factor = 2 ** (len(self.vae.config.block_out_channels) - 1)
        # 创建 VAE 图像处理器实例,使用双三次插值法
        self.image_processor = VaeImageProcessor(vae_scale_factor=self.vae_scale_factor, resample="bicubic")
        # 将最大噪声水平注册到配置中
        self.register_to_config(max_noise_level=max_noise_level)

    def run_safety_checker(self, image, device, dtype):
        # 如果存在安全检查器,则执行安全检查
        if self.safety_checker is not None:
            # 对输入图像进行后处理以适配安全检查器
            feature_extractor_input = self.image_processor.postprocess(image, output_type="pil")
            # 将后处理后的图像转换为张量,并移动到指定设备
            safety_checker_input = self.feature_extractor(feature_extractor_input, return_tensors="pt").to(device)
            # 进行安全检查,获取处理后的图像及检测结果
            image, nsfw_detected, watermark_detected = self.safety_checker(
                images=image,
                clip_input=safety_checker_input.pixel_values.to(dtype=dtype),
            )
        else:
            # 如果没有安全检查器,则将检测结果设置为 None
            nsfw_detected = None
            watermark_detected = None

            # 如果存在 UNet 的卸载钩子,则执行卸载操作
            if hasattr(self, "unet_offload_hook") and self.unet_offload_hook is not None:
                self.unet_offload_hook.offload()

        # 返回处理后的图像及检测结果
        return image, nsfw_detected, watermark_detected

    # 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline._encode_prompt 复制的代码
    # 定义一个私有方法用于编码提示信息
    def _encode_prompt(
        self,  # 方法的第一个参数,表示实例本身
        prompt,  # 要编码的提示信息
        device,  # 设备(如 CPU 或 GPU)
        num_images_per_prompt,  # 每个提示生成的图像数量
        do_classifier_free_guidance,  # 是否使用无分类器引导
        negative_prompt=None,  # 可选的负面提示信息
        prompt_embeds: Optional[torch.Tensor] = None,  # 可选的提示嵌入
        negative_prompt_embeds: Optional[torch.Tensor] = None,  # 可选的负面提示嵌入
        lora_scale: Optional[float] = None,  # 可选的 LoRA 缩放因子
        **kwargs,  # 其他可选参数
    ):
        # 警告信息,表示此方法已被弃用,未来版本将删除
        deprecation_message = "`_encode_prompt()` is deprecated and it will be removed in a future version. Use `encode_prompt()` instead. Also, be aware that the output format changed from a concatenated tensor to a tuple."
        # 调用 deprecate 方法记录弃用信息
        deprecate("_encode_prompt()", "1.0.0", deprecation_message, standard_warn=False)

        # 调用 encode_prompt 方法编码提示,返回一个元组
        prompt_embeds_tuple = self.encode_prompt(
            prompt=prompt,  # 传递提示信息
            device=device,  # 传递设备
            num_images_per_prompt=num_images_per_prompt,  # 传递每个提示的图像数量
            do_classifier_free_guidance=do_classifier_free_guidance,  # 传递无分类器引导标志
            negative_prompt=negative_prompt,  # 传递负面提示
            prompt_embeds=prompt_embeds,  # 传递提示嵌入
            negative_prompt_embeds=negative_prompt_embeds,  # 传递负面提示嵌入
            lora_scale=lora_scale,  # 传递 LoRA 缩放因子
            **kwargs,  # 传递其他参数
        )

        # 为向后兼容,将元组中的提示嵌入连接在一起
        prompt_embeds = torch.cat([prompt_embeds_tuple[1], prompt_embeds_tuple[0]])

        # 返回连接后的提示嵌入
        return prompt_embeds

    # 从 StableDiffusionPipeline 类中复制的编码提示方法
    def encode_prompt(
        self,  # 方法的第一个参数,表示实例本身
        prompt,  # 要编码的提示信息
        device,  # 设备(如 CPU 或 GPU)
        num_images_per_prompt,  # 每个提示生成的图像数量
        do_classifier_free_guidance,  # 是否使用无分类器引导
        negative_prompt=None,  # 可选的负面提示信息
        prompt_embeds: Optional[torch.Tensor] = None,  # 可选的提示嵌入
        negative_prompt_embeds: Optional[torch.Tensor] = None,  # 可选的负面提示嵌入
        lora_scale: Optional[float] = None,  # 可选的 LoRA 缩放因子
        clip_skip: Optional[int] = None,  # 可选的剪切跳过参数
    # 从 StableDiffusionPipeline 类中复制的准备额外步骤参数的方法
    def prepare_extra_step_kwargs(self, generator, eta):  # 准备额外的调度器步骤参数
        # 为调度器步骤准备额外的参数,因为并非所有调度器的签名相同
        # eta (η) 仅在 DDIMScheduler 中使用,对其他调度器将被忽略
        # eta 对应于 DDIM 论文中的 η: https://arxiv.org/abs/2010.02502
        # 应在 [0, 1] 之间

        # 检查调度器步骤是否接受 eta 参数
        accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())
        extra_step_kwargs = {}  # 初始化一个空字典以存储额外参数
        if accepts_eta:  # 如果接受 eta
            extra_step_kwargs["eta"] = eta  # 将 eta 添加到字典中

        # 检查调度器步骤是否接受生成器参数
        accepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys())
        if accepts_generator:  # 如果接受生成器
            extra_step_kwargs["generator"] = generator  # 将生成器添加到字典中
        # 返回准备好的额外步骤参数
        return extra_step_kwargs

    # 从 StableDiffusionPipeline 类中复制的解码潜在变量的方法
    # 解码潜在表示
    def decode_latents(self, latents):
        # 定义一个弃用提示消息,告知用户该方法即将被移除
        deprecation_message = "The decode_latents method is deprecated and will be removed in 1.0.0. Please use VaeImageProcessor.postprocess(...) instead"
        # 调用弃用函数,记录使用该方法的警告信息
        deprecate("decode_latents", "1.0.0", deprecation_message, standard_warn=False)
    
        # 根据 VAE 的缩放因子调整潜在表示的值
        latents = 1 / self.vae.config.scaling_factor * latents
        # 解码潜在表示,返回的第一项为解码后的图像
        image = self.vae.decode(latents, return_dict=False)[0]
        # 将图像值缩放到 [0, 1] 范围并进行裁剪
        image = (image / 2 + 0.5).clamp(0, 1)
        # 将图像转换为 float32 格式,以提高兼容性
        image = image.cpu().permute(0, 2, 3, 1).float().numpy()
        # 返回处理后的图像
        return image
    
    # 检查输入参数的有效性
    def check_inputs(
        self,
        prompt,  # 输入提示
        image,  # 输入图像
        noise_level,  # 噪声水平
        callback_steps,  # 回调步骤
        negative_prompt=None,  # 可选的负面提示
        prompt_embeds=None,  # 可选的提示嵌入
        negative_prompt_embeds=None,  # 可选的负面提示嵌入
    )
    
    # 准备潜在表示
    def prepare_latents(self, batch_size, num_channels_latents, height, width, dtype, device, generator, latents=None):
        # 定义潜在表示的形状
        shape = (batch_size, num_channels_latents, height, width)
        # 如果没有提供潜在表示,随机生成
        if latents is None:
            latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
        else:
            # 如果提供的潜在表示形状不匹配,则引发错误
            if latents.shape != shape:
                raise ValueError(f"Unexpected latents shape, got {latents.shape}, expected {shape}")
            # 将潜在表示移动到指定设备
            latents = latents.to(device)
    
        # 按调度器所需的标准差缩放初始噪声
        latents = latents * self.scheduler.init_noise_sigma
        # 返回处理后的潜在表示
        return latents
    
    # 升级 VAE 的数据类型
    def upcast_vae(self):
        # 获取 VAE 的当前数据类型
        dtype = self.vae.dtype
        # 将 VAE 转换为 float32 数据类型
        self.vae.to(dtype=torch.float32)
        # 检查是否使用了特定的注意力处理器
        use_torch_2_0_or_xformers = isinstance(
            self.vae.decoder.mid_block.attentions[0].processor,
            (
                AttnProcessor2_0,
                XFormersAttnProcessor,
            ),
        )
        # 如果使用 xformers 或 torch_2_0,则注意力块不需要是 float32,从而节省内存
        if use_torch_2_0_or_xformers:
            # 将后量化卷积层转换为相同数据类型
            self.vae.post_quant_conv.to(dtype)
            # 将解码器输入卷积层转换为相同数据类型
            self.vae.decoder.conv_in.to(dtype)
            # 将解码器中间块转换为相同数据类型
            self.vae.decoder.mid_block.to(dtype)
    
    # 关闭梯度计算以节省内存
    @torch.no_grad()
    # 定义一个可调用的方法,允许传入多个参数进行处理
        def __call__(
            # 用户输入的提示信息,可以是字符串或字符串列表
            self,
            prompt: Union[str, List[str]] = None,
            # 输入的图像数据,类型为 PipelineImageInput
            image: PipelineImageInput = None,
            # 推理步骤的数量,默认值为 75
            num_inference_steps: int = 75,
            # 指导比例,用于控制生成图像的样式,默认值为 9.0
            guidance_scale: float = 9.0,
            # 噪声级别,影响生成图像的随机性,默认值为 20
            noise_level: int = 20,
            # 负提示信息,可以是字符串或字符串列表,控制生成图像的方向
            negative_prompt: Optional[Union[str, List[str]]] = None,
            # 每个提示生成的图像数量,默认为 1
            num_images_per_prompt: Optional[int] = 1,
            # 额外的参数,影响生成过程,默认为 0.0
            eta: float = 0.0,
            # 随机数生成器,可以是单个或多个 torch.Generator
            generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
            # 预先计算的潜在表示,类型为 torch.Tensor
            latents: Optional[torch.Tensor] = None,
            # 预先计算的提示嵌入,类型为 torch.Tensor
            prompt_embeds: Optional[torch.Tensor] = None,
            # 预先计算的负提示嵌入,类型为 torch.Tensor
            negative_prompt_embeds: Optional[torch.Tensor] = None,
            # 输出类型,默认为 "pil",表示以 PIL 格式返回
            output_type: Optional[str] = "pil",
            # 是否返回字典格式的结果,默认为 True
            return_dict: bool = True,
            # 回调函数,可用于处理生成过程中的状态,返回 None
            callback: Optional[Callable[[int, int, torch.Tensor], None]] = None,
            # 回调函数的调用频率,默认为每一步 1 次
            callback_steps: int = 1,
            # 跨注意力的额外参数,可以为字典类型
            cross_attention_kwargs: Optional[Dict[str, Any]] = None,
            # 指定跳过的剪切层级,默认为 None
            clip_skip: int = None,

.\diffusers\pipelines\stable_diffusion\pipeline_stable_unclip.py

# 版权信息,表示此文件的版权归 HuggingFace 团队所有
# Copyright 2024 The HuggingFace Team. All rights reserved.
#
# 根据 Apache 2.0 许可证发布的声明
# Licensed under the Apache License, Version 2.0 (the "License");
# 只有在遵循许可证的情况下才能使用此文件
# you may not use this file except in compliance with the License.
# 可在以下链接获取许可证副本
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# 除非法律适用或书面同意,否则本软件按 "现状" 方式分发
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# 不提供任何形式的担保或条件
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# 详见许可证中对权限和限制的具体规定
# See the License for the specific language governing permissions and
# limitations under the License.

# 导入 inspect 模块,用于检查对象的属性和方法
import inspect
# 从 typing 模块导入各种类型注解,方便类型检查
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

# 导入 PyTorch 库
import torch
# 导入 CLIP 文本模型和分词器
from transformers import CLIPTextModel, CLIPTextModelWithProjection, CLIPTokenizer
# 从 CLIP 模型中导入文本模型输出的类型
from transformers.models.clip.modeling_clip import CLIPTextModelOutput

# 导入图像处理器
from ...image_processor import VaeImageProcessor
# 导入加载器的混合类,用于处理特定加载逻辑
from ...loaders import StableDiffusionLoraLoaderMixin, TextualInversionLoaderMixin
# 导入自动编码器和其他模型
from ...models import AutoencoderKL, PriorTransformer, UNet2DConditionModel
# 导入时间步嵌入函数
from ...models.embeddings import get_timestep_embedding
# 导入 LoRA 相关的调整函数
from ...models.lora import adjust_lora_scale_text_encoder
# 导入 Karras 扩散调度器
from ...schedulers import KarrasDiffusionSchedulers
# 导入工具函数,处理各种实用功能
from ...utils import (
    USE_PEFT_BACKEND,
    deprecate,
    logging,
    replace_example_docstring,
    scale_lora_layers,
    unscale_lora_layers,
)
# 导入用于生成随机张量的函数
from ...utils.torch_utils import randn_tensor
# 导入扩散管道相关类
from ..pipeline_utils import DiffusionPipeline, ImagePipelineOutput, StableDiffusionMixin
# 导入稳定反向图像归一化器
from .stable_unclip_image_normalizer import StableUnCLIPImageNormalizer

# 创建日志记录器,记录该模块的日志信息
logger = logging.get_logger(__name__)  # pylint: disable=invalid-name

# 示例文档字符串,展示如何使用 StableUnCLIPPipeline
EXAMPLE_DOC_STRING = """
    Examples:
        ```py
        >>> import torch
        >>> from diffusers import StableUnCLIPPipeline

        >>> pipe = StableUnCLIPPipeline.from_pretrained(
        ...     "fusing/stable-unclip-2-1-l", torch_dtype=torch.float16
        ... )  # TODO update model path
        >>> pipe = pipe.to("cuda")

        >>> prompt = "a photo of an astronaut riding a horse on mars"
        >>> images = pipe(prompt).images
        >>> images[0].save("astronaut_horse.png")
        ```py
"""

# 定义 StableUnCLIPPipeline 类,继承多个混合类
class StableUnCLIPPipeline(
    DiffusionPipeline, StableDiffusionMixin, TextualInversionLoaderMixin, StableDiffusionLoraLoaderMixin
):
    """
    使用稳定反向 CLIP 的文本到图像生成管道。

    此模型继承自 [`DiffusionPipeline`]。请查阅超类文档以获取所有管道实现的通用方法
    (下载、保存、在特定设备上运行等)。

    该管道还继承以下加载方法:
        - [`~loaders.TextualInversionLoaderMixin.load_textual_inversion`] 用于加载文本反转嵌入
        - [`~loaders.StableDiffusionLoraLoaderMixin.load_lora_weights`] 用于加载 LoRA 权重
        - [`~loaders.StableDiffusionLoraLoaderMixin.save_lora_weights`] 用于保存 LoRA 权重
    # 定义函数参数
    Args:
        prior_tokenizer ([`CLIPTokenizer`]):  # 指定用于文本的 CLIP 标记器
            A [`CLIPTokenizer`].  # 说明这是一个 CLIP 标记器
        prior_text_encoder ([`CLIPTextModelWithProjection`]):  # 指定冻结的 CLIP 文本编码器
            Frozen [`CLIPTextModelWithProjection`] text-encoder.  # 说明这是一个冻结的文本编码器
        prior ([`PriorTransformer`]):  # 指定用于图像嵌入的 unCLIP 先验模型
            The canonical unCLIP prior to approximate the image embedding from the text embedding.  # 说明这是一个标准的 unCLIP 先验,用于从文本嵌入近似图像嵌入
        prior_scheduler ([`KarrasDiffusionSchedulers`]):  # 指定用于去噪过程的调度器
            Scheduler used in the prior denoising process.  # 说明这是在先验去噪过程中使用的调度器
        image_normalizer ([`StableUnCLIPImageNormalizer`]):  # 指定用于标准化图像嵌入的标准化器
            Used to normalize the predicted image embeddings before the noise is applied and un-normalize the image  # 说明用于在应用噪声之前标准化预测的图像嵌入,并在应用噪声后反标准化图像嵌入
            embeddings after the noise has been applied.  # 继续说明标准化的过程
        image_noising_scheduler ([`KarrasDiffusionSchedulers`]):  # 指定用于添加噪声的调度器
            Noise schedule for adding noise to the predicted image embeddings. The amount of noise to add is determined  # 说明这是用于对预测的图像嵌入添加噪声的调度器,噪声量由 `noise_level` 决定
            by the `noise_level`.  # 说明噪声量的确定依据
        tokenizer ([`CLIPTokenizer`]):  # 指定用于文本的 CLIP 标记器
            A [`CLIPTokenizer`].  # 说明这是一个 CLIP 标记器
        text_encoder ([`CLIPTextModel`]):  # 指定冻结的 CLIP 文本编码器
            Frozen [`CLIPTextModel`] text-encoder.  # 说明这是一个冻结的文本编码器
        unet ([`UNet2DConditionModel`]):  # 指定用于去噪的 UNet 模型
            A [`UNet2DConditionModel`] to denoise the encoded image latents.  # 说明这是一个用于去噪编码图像潜变量的 UNet 模型
        scheduler ([`KarrasDiffusionSchedulers`]):  # 指定与 UNet 结合使用的调度器
            A scheduler to be used in combination with `unet` to denoise the encoded image latents.  # 说明这是与 UNet 结合使用的去噪调度器
        vae ([`AutoencoderKL`]):  # 指定变分自编码器模型
            Variational Auto-Encoder (VAE) Model to encode and decode images to and from latent representations.  # 说明这是一个变分自编码器模型,用于将图像编码和解码为潜在表示
    """  # 结束文档字符串

    _exclude_from_cpu_offload = ["prior", "image_normalizer"]  # 指定不进行 CPU 卸载的组件列表
    model_cpu_offload_seq = "text_encoder->prior_text_encoder->unet->vae"  # 定义模型的 CPU 卸载顺序

    # prior components  # 注释说明以下是先验组件
    prior_tokenizer: CLIPTokenizer  # 声明 prior_tokenizer 为 CLIPTokenizer 类型
    prior_text_encoder: CLIPTextModelWithProjection  # 声明 prior_text_encoder 为 CLIPTextModelWithProjection 类型
    prior: PriorTransformer  # 声明 prior 为 PriorTransformer 类型
    prior_scheduler: KarrasDiffusionSchedulers  # 声明 prior_scheduler 为 KarrasDiffusionSchedulers 类型

    # image noising components  # 注释说明以下是图像噪声组件
    image_normalizer: StableUnCLIPImageNormalizer  # 声明 image_normalizer 为 StableUnCLIPImageNormalizer 类型
    image_noising_scheduler: KarrasDiffusionSchedulers  # 声明 image_noising_scheduler 为 KarrasDiffusionSchedulers 类型

    # regular denoising components  # 注释说明以下是常规去噪组件
    tokenizer: CLIPTokenizer  # 声明 tokenizer 为 CLIPTokenizer 类型
    text_encoder: CLIPTextModel  # 声明 text_encoder 为 CLIPTextModel 类型
    unet: UNet2DConditionModel  # 声明 unet 为 UNet2DConditionModel 类型
    scheduler: KarrasDiffusionSchedulers  # 声明 scheduler 为 KarrasDiffusionSchedulers 类型

    vae: AutoencoderKL  # 声明 vae 为 AutoencoderKL 类型

    def __init__(  # 定义构造函数
        self,  # 指向实例本身
        # prior components  # 注释说明以下是先验组件
        prior_tokenizer: CLIPTokenizer,  # 指定构造函数参数 prior_tokenizer 为 CLIPTokenizer 类型
        prior_text_encoder: CLIPTextModelWithProjection,  # 指定构造函数参数 prior_text_encoder 为 CLIPTextModelWithProjection 类型
        prior: PriorTransformer,  # 指定构造函数参数 prior 为 PriorTransformer 类型
        prior_scheduler: KarrasDiffusionSchedulers,  # 指定构造函数参数 prior_scheduler 为 KarrasDiffusionSchedulers 类型
        # image noising components  # 注释说明以下是图像噪声组件
        image_normalizer: StableUnCLIPImageNormalizer,  # 指定构造函数参数 image_normalizer 为 StableUnCLIPImageNormalizer 类型
        image_noising_scheduler: KarrasDiffusionSchedulers,  # 指定构造函数参数 image_noising_scheduler 为 KarrasDiffusionSchedulers 类型
        # regular denoising components  # 注释说明以下是常规去噪组件
        tokenizer: CLIPTokenizer,  # 指定构造函数参数 tokenizer 为 CLIPTokenizer 类型
        text_encoder: CLIPTextModelWithProjection,  # 指定构造函数参数 text_encoder 为 CLIPTextModelWithProjection 类型
        unet: UNet2DConditionModel,  # 指定构造函数参数 unet 为 UNet2DConditionModel 类型
        scheduler: KarrasDiffusionSchedulers,  # 指定构造函数参数 scheduler 为 KarrasDiffusionSchedulers 类型
        # vae  # 注释说明以下是变分自编码器
        vae: AutoencoderKL,  # 指定构造函数参数 vae 为 AutoencoderKL 类型
    # 结束括号,表示类构造函数的参数列表结束
    ):
        # 调用父类构造函数
        super().__init__()

        # 注册所需模块及其参数
        self.register_modules(
            prior_tokenizer=prior_tokenizer,  # 注册先前的分词器
            prior_text_encoder=prior_text_encoder,  # 注册先前的文本编码器
            prior=prior,  # 注册先前模型
            prior_scheduler=prior_scheduler,  # 注册先前的调度器
            image_normalizer=image_normalizer,  # 注册图像归一化器
            image_noising_scheduler=image_noising_scheduler,  # 注册图像噪声调度器
            tokenizer=tokenizer,  # 注册当前的分词器
            text_encoder=text_encoder,  # 注册当前的文本编码器
            unet=unet,  # 注册 U-Net 模型
            scheduler=scheduler,  # 注册调度器
            vae=vae,  # 注册变分自编码器
        )

        # 计算 VAE 的缩放因子,基于其配置中的输出通道数
        self.vae_scale_factor = 2 ** (len(self.vae.config.block_out_channels) - 1)
        # 创建图像处理器,使用 VAE 缩放因子
        self.image_processor = VaeImageProcessor(vae_scale_factor=self.vae_scale_factor)

    # 从 UnCLIPPipeline 复制的方法,调整为处理先前的提示
    def _encode_prior_prompt(
        self,
        prompt,  # 输入提示
        device,  # 设备信息
        num_images_per_prompt,  # 每个提示生成的图像数量
        do_classifier_free_guidance,  # 是否使用无分类器自由引导
        text_model_output: Optional[Union[CLIPTextModelOutput, Tuple]] = None,  # 可选的文本模型输出
        text_attention_mask: Optional[torch.Tensor] = None,  # 可选的文本注意力掩码
    # 从 StableDiffusionPipeline 复制的方法,调整为处理当前的提示
    def _encode_prompt(
        self,
        prompt,  # 输入提示
        device,  # 设备信息
        num_images_per_prompt,  # 每个提示生成的图像数量
        do_classifier_free_guidance,  # 是否使用无分类器自由引导
        negative_prompt=None,  # 可选的负面提示
        prompt_embeds: Optional[torch.Tensor] = None,  # 可选的提示嵌入
        negative_prompt_embeds: Optional[torch.Tensor] = None,  # 可选的负面提示嵌入
        lora_scale: Optional[float] = None,  # 可选的 Lora 缩放因子
        **kwargs,  # 其他可选参数
    ):
        # 发出弃用警告,提示将来版本中移除此方法
        deprecation_message = "`_encode_prompt()` is deprecated and it will be removed in a future version. Use `encode_prompt()` instead. Also, be aware that the output format changed from a concatenated tensor to a tuple."
        deprecate("_encode_prompt()", "1.0.0", deprecation_message, standard_warn=False)

        # 调用新的编码提示方法,生成嵌入元组
        prompt_embeds_tuple = self.encode_prompt(
            prompt=prompt,  # 输入提示
            device=device,  # 设备信息
            num_images_per_prompt=num_images_per_prompt,  # 每个提示生成的图像数量
            do_classifier_free_guidance=do_classifier_free_guidance,  # 是否使用无分类器自由引导
            negative_prompt=negative_prompt,  # 负面提示
            prompt_embeds=prompt_embeds,  # 提示嵌入
            negative_prompt_embeds=negative_prompt_embeds,  # 负面提示嵌入
            lora_scale=lora_scale,  # Lora 缩放因子
            **kwargs,  # 其他参数
        )

        # 将嵌入元组的内容连接以兼容旧版本
        prompt_embeds = torch.cat([prompt_embeds_tuple[1], prompt_embeds_tuple[0]])

        # 返回合并后的提示嵌入
        return prompt_embeds

    # 从 StableDiffusionPipeline 复制的方法,用于编码提示
    # 定义一个编码提示的函数
        def encode_prompt(
            # 提示内容
            self,
            prompt,
            # 设备信息
            device,
            # 每个提示生成的图像数量
            num_images_per_prompt,
            # 是否进行无分类器自由引导
            do_classifier_free_guidance,
            # 可选的负面提示
            negative_prompt=None,
            # 可选的提示嵌入
            prompt_embeds: Optional[torch.Tensor] = None,
            # 可选的负面提示嵌入
            negative_prompt_embeds: Optional[torch.Tensor] = None,
            # 可选的 Lora 缩放因子
            lora_scale: Optional[float] = None,
            # 可选的剪辑跳过参数
            clip_skip: Optional[int] = None,
        # 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.decode_latents 复制
        def decode_latents(self, latents):
            # 定义过时消息
            deprecation_message = "The decode_latents method is deprecated and will be removed in 1.0.0. Please use VaeImageProcessor.postprocess(...) instead"
            # 调用 deprecate 函数以显示警告信息
            deprecate("decode_latents", "1.0.0", deprecation_message, standard_warn=False)
    
            # 根据 VAE 配置缩放因子调整潜变量
            latents = 1 / self.vae.config.scaling_factor * latents
            # 解码潜变量生成图像
            image = self.vae.decode(latents, return_dict=False)[0]
            # 将图像数据归一化到 [0, 1] 之间
            image = (image / 2 + 0.5).clamp(0, 1)
            # 将图像转换为 float32 格式以便兼容 bfloat16
            image = image.cpu().permute(0, 2, 3, 1).float().numpy()
            # 返回解码后的图像
            return image
    
        # 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_extra_step_kwargs 复制
        def prepare_prior_extra_step_kwargs(self, generator, eta):
            # 准备用于 prior_scheduler 步骤的额外参数,因为并非所有的 prior_schedulers 具有相同的参数签名
            # eta 仅在 DDIMScheduler 中使用,其他 prior_schedulers 会忽略它
            # eta 在 DDIM 论文中的对应符号为 η: https://arxiv.org/abs/2010.02502
            # eta 的值应在 [0, 1] 之间
    
            # 检查 prior_scheduler 是否接受 eta 参数
            accepts_eta = "eta" in set(inspect.signature(self.prior_scheduler.step).parameters.keys())
            # 初始化额外参数字典
            extra_step_kwargs = {}
            # 如果接受 eta,添加到额外参数字典中
            if accepts_eta:
                extra_step_kwargs["eta"] = eta
    
            # 检查 prior_scheduler 是否接受 generator 参数
            accepts_generator = "generator" in set(inspect.signature(self.prior_scheduler.step).parameters.keys())
            # 如果接受 generator,添加到额外参数字典中
            if accepts_generator:
                extra_step_kwargs["generator"] = generator
            # 返回准备好的额外参数
            return extra_step_kwargs
    
        # 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_extra_step_kwargs 复制
    # 定义准备额外参数的函数,供调度器步骤使用
    def prepare_extra_step_kwargs(self, generator, eta):
        # 准备调度器步骤所需的额外参数,因为不同调度器的参数签名不同
        # eta (η) 仅在 DDIMScheduler 中使用,对于其他调度器将被忽略
        # eta 对应于 DDIM 论文中的 η: https://arxiv.org/abs/2010.02502
        # 其值应在 [0, 1] 之间
    
        # 检查调度器的步骤函数是否接受 eta 参数
        accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())
        # 初始化存储额外参数的字典
        extra_step_kwargs = {}
        # 如果接受 eta 参数,则将其添加到额外参数字典中
        if accepts_eta:
            extra_step_kwargs["eta"] = eta
    
        # 检查调度器的步骤函数是否接受 generator 参数
        accepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys())
        # 如果接受 generator 参数,则将其添加到额外参数字典中
        if accepts_generator:
            extra_step_kwargs["generator"] = generator
        # 返回构建好的额外参数字典
        return extra_step_kwargs
    
    # 定义检查输入参数的函数
    def check_inputs(
        self,
        prompt,  # 输入提示
        height,  # 生成图像的高度
        width,   # 生成图像的宽度
        callback_steps,  # 回调的步数
        noise_level,  # 噪声级别
        negative_prompt=None,  # 可选的负面提示
        prompt_embeds=None,  # 可选的提示嵌入
        negative_prompt_embeds=None,  # 可选的负面提示嵌入
    ):
        # 检查高度和宽度是否为8的倍数,如果不是,则抛出值错误
        if height % 8 != 0 or width % 8 != 0:
            raise ValueError(f"`height` and `width` have to be divisible by 8 but are {height} and {width}.")

        # 检查回调步骤是否为正整数,如果不是,则抛出值错误
        if (callback_steps is None) or (
            callback_steps is not None and (not isinstance(callback_steps, int) or callback_steps <= 0)
        ):
            raise ValueError(
                f"`callback_steps` has to be a positive integer but is {callback_steps} of type"
                f" {type(callback_steps)}."
            )

        # 检查是否同时提供了提示和提示嵌入,如果是,则抛出值错误
        if prompt is not None and prompt_embeds is not None:
            raise ValueError(
                "Provide either `prompt` or `prompt_embeds`. Please make sure to define only one of the two."
            )

        # 检查提示和提示嵌入是否都未定义,如果是,则抛出值错误
        if prompt is None and prompt_embeds is None:
            raise ValueError(
                "Provide either `prompt` or `prompt_embeds`. Cannot leave both `prompt` and `prompt_embeds` undefined."
            )

        # 检查提示是否为字符串或列表,如果不是,则抛出值错误
        if prompt is not None and (not isinstance(prompt, str) and not isinstance(prompt, list)):
            raise ValueError(f"`prompt` has to be of type `str` or `list` but is {type(prompt)}")

        # 检查是否同时提供了负提示和负提示嵌入,如果是,则抛出值错误
        if negative_prompt is not None and negative_prompt_embeds is not None:
            raise ValueError(
                "Provide either `negative_prompt` or `negative_prompt_embeds`. Cannot leave both `negative_prompt` and `negative_prompt_embeds` undefined."
            )

        # 检查是否同时提供了提示和负提示,如果是,则检查类型是否一致
        if prompt is not None and negative_prompt is not None:
            if type(prompt) is not type(negative_prompt):
                raise TypeError(
                    f"`negative_prompt` should be the same type to `prompt`, but got {type(negative_prompt)} !="
                    f" {type(prompt)}."
                )

        # 检查提示嵌入和负提示嵌入的形状是否一致,如果不一致则抛出值错误
        if prompt_embeds is not None and negative_prompt_embeds is not None:
            if prompt_embeds.shape != negative_prompt_embeds.shape:
                raise ValueError(
                    "`prompt_embeds` and `negative_prompt_embeds` must have the same shape when passed directly, but"
                    f" got: `prompt_embeds` {prompt_embeds.shape} != `negative_prompt_embeds`"
                    f" {negative_prompt_embeds.shape}."
                )

        # 检查噪声级别是否在有效范围内,如果不在,则抛出值错误
        if noise_level < 0 or noise_level >= self.image_noising_scheduler.config.num_train_timesteps:
            raise ValueError(
                f"`noise_level` must be between 0 and {self.image_noising_scheduler.config.num_train_timesteps - 1}, inclusive."
            )

    # 从 diffusers.pipelines.unclip.pipeline_unclip.UnCLIPPipeline.prepare_latents 复制的代码
    # 准备潜在向量,参数包括形状、数据类型、设备、生成器、潜在向量和调度器
    def prepare_latents(self, shape, dtype, device, generator, latents, scheduler):
        # 如果没有给定潜在向量,则随机生成一个
        if latents is None:
            latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
        else:
            # 检查给定潜在向量的形状是否与预期形状一致
            if latents.shape != shape:
                raise ValueError(f"Unexpected latents shape, got {latents.shape}, expected {shape}")
            # 将潜在向量移动到指定设备
            latents = latents.to(device)

        # 将潜在向量与调度器的初始噪声标准差相乘
        latents = latents * scheduler.init_noise_sigma
        # 返回处理后的潜在向量
        return latents

    # 为图像嵌入添加噪声,噪声量由噪声级别控制
    def noise_image_embeddings(
        self,
        image_embeds: torch.Tensor,
        noise_level: int,
        noise: Optional[torch.Tensor] = None,
        generator: Optional[torch.Generator] = None,
    ):
        """
        向图像嵌入添加噪声,噪声量由 `noise_level` 输入控制。较高的
        `noise_level` 会增加最终未去噪图像的方差。

        噪声的应用有两种方式:
        1. 噪声调度直接应用于嵌入。
        2. 将正弦时间嵌入向量附加到输出。

        在这两种情况下,噪声量均由相同的 `noise_level` 控制。

        嵌入在应用噪声之前会被归一化,应用噪声后会被反归一化。
        """
        # 如果没有提供噪声,则随机生成噪声
        if noise is None:
            noise = randn_tensor(
                image_embeds.shape, generator=generator, device=image_embeds.device, dtype=image_embeds.dtype
            )

        # 创建与图像嵌入数量相同的噪声级别张量
        noise_level = torch.tensor([noise_level] * image_embeds.shape[0], device=image_embeds.device)

        # 将图像归一化器移动到图像嵌入所在设备
        self.image_normalizer.to(image_embeds.device)
        # 对图像嵌入进行归一化处理
        image_embeds = self.image_normalizer.scale(image_embeds)

        # 使用噪声调度器将噪声添加到图像嵌入
        image_embeds = self.image_noising_scheduler.add_noise(image_embeds, timesteps=noise_level, noise=noise)

        # 对图像嵌入进行反归一化处理
        image_embeds = self.image_normalizer.unscale(image_embeds)

        # 获取时间步嵌入,控制噪声的时间步
        noise_level = get_timestep_embedding(
            timesteps=noise_level, embedding_dim=image_embeds.shape[-1], flip_sin_to_cos=True, downscale_freq_shift=0
        )

        # 将时间步嵌入转换为与图像嵌入相同的数据类型
        noise_level = noise_level.to(image_embeds.dtype)

        # 将时间步嵌入与图像嵌入拼接在一起
        image_embeds = torch.cat((image_embeds, noise_level), 1)

        # 返回包含噪声的图像嵌入
        return image_embeds

    # 禁用梯度计算以提高推理性能
    @torch.no_grad()
    # 用示例文档字符串替换当前文档字符串
    @replace_example_docstring(EXAMPLE_DOC_STRING)
    # 定义一个可调用的方法,用于处理去噪过程
    def __call__(
        self,
        # 正常的去噪过程参数
        # 提示文本,可以是字符串或字符串列表,默认为 None
        prompt: Optional[Union[str, List[str]]] = None,
        # 输出图像的高度,默认为 None
        height: Optional[int] = None,
        # 输出图像的宽度,默认为 None
        width: Optional[int] = None,
        # 推理步骤的数量,默认为 20
        num_inference_steps: int = 20,
        # 引导比例,默认为 10.0
        guidance_scale: float = 10.0,
        # 负提示文本,可以是字符串或字符串列表,默认为 None
        negative_prompt: Optional[Union[str, List[str]]] = None,
        # 每个提示生成的图像数量,默认为 1
        num_images_per_prompt: Optional[int] = 1,
        # 控制噪声的参数,默认为 0.0
        eta: float = 0.0,
        # 用于随机数生成的生成器,默认为 None
        generator: Optional[torch.Generator] = None,
        # 潜在变量张量,默认为 None
        latents: Optional[torch.Tensor] = None,
        # 提示嵌入张量,默认为 None
        prompt_embeds: Optional[torch.Tensor] = None,
        # 负提示嵌入张量,默认为 None
        negative_prompt_embeds: Optional[torch.Tensor] = None,
        # 输出类型,默认为 "pil"
        output_type: Optional[str] = "pil",
        # 是否返回字典格式,默认为 True
        return_dict: bool = True,
        # 可选的回调函数,接收步骤和张量,默认为 None
        callback: Optional[Callable[[int, int, torch.Tensor], None]] = None,
        # 回调函数调用的步骤间隔,默认为 1
        callback_steps: int = 1,
        # 跨注意力的可选参数,默认为 None
        cross_attention_kwargs: Optional[Dict[str, Any]] = None,
        # 噪声水平,默认为 0
        noise_level: int = 0,
        # 先验参数
        # 先验推理步骤的数量,默认为 25
        prior_num_inference_steps: int = 25,
        # 先验引导比例,默认为 4.0
        prior_guidance_scale: float = 4.0,
        # 先验潜在变量张量,默认为 None
        prior_latents: Optional[torch.Tensor] = None,
        # 可选的跳过剪辑步骤,默认为 None
        clip_skip: Optional[int] = None,

.\diffusers\pipelines\stable_diffusion\pipeline_stable_unclip_img2img.py

# 版权声明,2024年由 HuggingFace 团队保留所有权利。
# 
# 根据 Apache 许可证第 2.0 版(“许可证”)授权;
# 除非遵守该许可证,否则您不得使用此文件。
# 您可以在以下地址获取许可证副本:
# 
#     http://www.apache.org/licenses/LICENSE-2.0
# 
# 除非适用法律要求或书面协议另有约定,根据许可证分发的软件是按“原样”提供的,
# 不提供任何种类的担保或条件,无论是明示还是暗示。
# 请参阅许可证以了解特定语言规定的权限和限制。

import inspect  # 导入 inspect 模块以获取有关活跃对象的信息
from typing import Any, Callable, Dict, List, Optional, Union  # 导入类型提示,便于静态类型检查

import PIL.Image  # 导入 PIL.Image 用于图像处理
import torch  # 导入 PyTorch 以使用深度学习功能
from transformers import CLIPImageProcessor, CLIPTextModel, CLIPTokenizer, CLIPVisionModelWithProjection  # 从 transformers 导入 CLIP 相关模型和处理器

from ...image_processor import VaeImageProcessor  # 导入 VAE 图像处理器
from ...loaders import StableDiffusionLoraLoaderMixin, TextualInversionLoaderMixin  # 导入稳定扩散和文本反转加载混合器
from ...models import AutoencoderKL, UNet2DConditionModel  # 导入模型类
from ...models.embeddings import get_timestep_embedding  # 导入获取时间步嵌入的函数
from ...models.lora import adjust_lora_scale_text_encoder  # 导入调整 LoRA 缩放的函数
from ...schedulers import KarrasDiffusionSchedulers  # 导入 Karras 扩散调度器
from ...utils import (  # 导入工具函数
    USE_PEFT_BACKEND,  # 导入用于 PEFT 后端的常量
    deprecate,  # 导入用于标记弃用功能的装饰器
    logging,  # 导入日志记录模块
    replace_example_docstring,  # 导入用于替换示例文档字符串的函数
    scale_lora_layers,  # 导入缩放 LoRA 层的函数
    unscale_lora_layers,  # 导入取消缩放 LoRA 层的函数
)
from ...utils.torch_utils import randn_tensor  # 从自定义 PyTorch 工具中导入随机张量生成函数
from ..pipeline_utils import DiffusionPipeline, ImagePipelineOutput, StableDiffusionMixin  # 导入扩散管道及其输出类
from .stable_unclip_image_normalizer import StableUnCLIPImageNormalizer  # 导入稳定 UnCLIP 图像归一化器

logger = logging.get_logger(__name__)  # 创建一个用于当前模块的日志记录器,禁用 pylint 名称检查

EXAMPLE_DOC_STRING = """  # 定义示例文档字符串,用于展示如何使用管道
    Examples:  # 示例的说明
        ```py  # 示例代码块的开始
        >>> import requests  # 导入 requests 库用于发起 HTTP 请求
        >>> import torch  # 导入 PyTorch 库
        >>> from PIL import Image  # 从 PIL 导入图像处理模块
        >>> from io import BytesIO  # 导入 BytesIO 用于处理字节流

        >>> from diffusers import StableUnCLIPImg2ImgPipeline  # 从 diffusers 导入 StableUnCLIPImg2ImgPipeline 类

        >>> pipe = StableUnCLIPImg2ImgPipeline.from_pretrained(  # 从预训练模型加载管道
        ...     "stabilityai/stable-diffusion-2-1-unclip-small", torch_dtype=torch.float16  # 指定模型名称和数据类型
        ... )
        >>> pipe = pipe.to("cuda")  # 将管道移动到 CUDA 设备以加速计算

        >>> url = "https://raw.githubusercontent.com/CompVis/stable-diffusion/main/assets/stable-samples/img2img/sketch-mountains-input.jpg"  # 定义要加载的图像 URL

        >>> response = requests.get(url)  # 发起 GET 请求以获取图像
        >>> init_image = Image.open(BytesIO(response.content)).convert("RGB")  # 打开图像并转换为 RGB 格式
        >>> init_image = init_image.resize((768, 512))  # 将图像调整为指定大小

        >>> prompt = "A fantasy landscape, trending on artstation"  # 定义生成图像的提示文本

        >>> images = pipe(init_image, prompt).images  # 使用管道生成图像
        >>> images[0].save("fantasy_landscape.png")  # 保存生成的图像为 PNG 文件
        ```py  # 示例代码块的结束
"""

class StableUnCLIPImg2ImgPipeline(  # 定义 StableUnCLIPImg2ImgPipeline 类
    DiffusionPipeline, StableDiffusionMixin, TextualInversionLoaderMixin, StableDiffusionLoraLoaderMixin  # 继承多个混合器以实现功能组合
):
    """
    使用稳定的 unCLIP 进行文本引导的图像到图像生成的管道。  # 类文档字符串,说明该类的功能

    该模型继承自 [`DiffusionPipeline`]。请查看超类文档以了解所有管道的通用方法(下载、保存、在特定设备上运行等)。  # 提供有关超类的信息
    # 管道还继承以下加载方法:
    # - [`~loaders.TextualInversionLoaderMixin.load_textual_inversion`] 用于加载文本反转嵌入
    # - [`~loaders.StableDiffusionLoraLoaderMixin.load_lora_weights`] 用于加载 LoRA 权重
    # - [`~loaders.StableDiffusionLoraLoaderMixin.save_lora_weights`] 用于保存 LoRA 权重
    
    # 参数说明:
    # feature_extractor ([`CLIPImageProcessor`]):
    # 图像预处理的特征提取器,编码之前使用。
    # image_encoder ([`CLIPVisionModelWithProjection`]):
    # CLIP 视觉模型,用于编码图像。
    # image_normalizer ([`StableUnCLIPImageNormalizer`]):
    # 用于在添加噪声之前规范化预测的图像嵌入,并在添加噪声后反规范化图像嵌入。
    # image_noising_scheduler ([`KarrasDiffusionSchedulers`]):
    # 添加噪声到预测的图像嵌入的噪声调度器,噪声量由 `noise_level` 决定。
    # tokenizer (`~transformers.CLIPTokenizer`):
    # 一个 [`~transformers.CLIPTokenizer`]。
    # text_encoder ([`~transformers.CLIPTextModel`]):
    # 冻结的 [`~transformers.CLIPTextModel`] 文本编码器。
    # unet ([`UNet2DConditionModel`]):
    # 用于去噪编码图像潜变量的 [`UNet2DConditionModel`]。
    # scheduler ([`KarrasDiffusionSchedulers`]):
    # 与 `unet` 结合使用的调度器,用于去噪编码图像潜变量。
    # vae ([`AutoencoderKL`]):
    # 变分自编码器 (VAE) 模型,用于将图像编码和解码为潜在表示。
    # """
    
    # 定义模型在 CPU 上卸载的顺序
    model_cpu_offload_seq = "text_encoder->image_encoder->unet->vae"
    # 要从 CPU 卸载中排除的组件
    _exclude_from_cpu_offload = ["image_normalizer"]
    
    # 图像编码组件
    feature_extractor: CLIPImageProcessor
    image_encoder: CLIPVisionModelWithProjection
    
    # 图像加噪声组件
    image_normalizer: StableUnCLIPImageNormalizer
    image_noising_scheduler: KarrasDiffusionSchedulers
    
    # 常规去噪组件
    tokenizer: CLIPTokenizer
    text_encoder: CLIPTextModel
    unet: UNet2DConditionModel
    scheduler: KarrasDiffusionSchedulers
    
    # 变分自编码器
    vae: AutoencoderKL
    
    # 初始化方法
    def __init__(
        # 图像编码组件
        feature_extractor: CLIPImageProcessor,
        image_encoder: CLIPVisionModelWithProjection,
        # 图像加噪声组件
        image_normalizer: StableUnCLIPImageNormalizer,
        image_noising_scheduler: KarrasDiffusionSchedulers,
        # 常规去噪组件
        tokenizer: CLIPTokenizer,
        text_encoder: CLIPTextModel,
        unet: UNet2DConditionModel,
        scheduler: KarrasDiffusionSchedulers,
        # 变分自编码器
        vae: AutoencoderKL,
    # 初始化父类
        ):
            super().__init__()
    
            # 注册模块,包括特征提取器、图像编码器等
            self.register_modules(
                feature_extractor=feature_extractor,
                image_encoder=image_encoder,
                image_normalizer=image_normalizer,
                image_noising_scheduler=image_noising_scheduler,
                tokenizer=tokenizer,
                text_encoder=text_encoder,
                unet=unet,
                scheduler=scheduler,
                vae=vae,
            )
    
            # 计算 VAE 的缩放因子
            self.vae_scale_factor = 2 ** (len(self.vae.config.block_out_channels) - 1)
            # 创建图像处理器,使用 VAE 的缩放因子
            self.image_processor = VaeImageProcessor(vae_scale_factor=self.vae_scale_factor)
    
        # 从 StableDiffusionPipeline 复制的编码提示函数
        def _encode_prompt(
            self,
            prompt,
            device,
            num_images_per_prompt,
            do_classifier_free_guidance,
            negative_prompt=None,
            prompt_embeds: Optional[torch.Tensor] = None,
            negative_prompt_embeds: Optional[torch.Tensor] = None,
            lora_scale: Optional[float] = None,
            **kwargs,
        ):
            # 警告用户该函数已弃用,未来版本将被移除
            deprecation_message = "`_encode_prompt()` is deprecated and it will be removed in a future version. Use `encode_prompt()` instead. Also, be aware that the output format changed from a concatenated tensor to a tuple."
            deprecate("_encode_prompt()", "1.0.0", deprecation_message, standard_warn=False)
    
            # 调用 encode_prompt 函数以编码提示
            prompt_embeds_tuple = self.encode_prompt(
                prompt=prompt,
                device=device,
                num_images_per_prompt=num_images_per_prompt,
                do_classifier_free_guidance=do_classifier_free_guidance,
                negative_prompt=negative_prompt,
                prompt_embeds=prompt_embeds,
                negative_prompt_embeds=negative_prompt_embeds,
                lora_scale=lora_scale,
                **kwargs,
            )
    
            # 连接提示嵌入以便于向后兼容
            prompt_embeds = torch.cat([prompt_embeds_tuple[1], prompt_embeds_tuple[0]])
    
            # 返回连接后的提示嵌入
            return prompt_embeds
    
        # 编码图像的函数
        def _encode_image(
            self,
            image,
            device,
            batch_size,
            num_images_per_prompt,
            do_classifier_free_guidance,
            noise_level,
            generator,
            image_embeds,
    ):
        # 获取图像编码器参数的数据类型
        dtype = next(self.image_encoder.parameters()).dtype

        # 检查输入的图像是否为 PIL 图像类型
        if isinstance(image, PIL.Image.Image):
            # 如果是 PIL 图像,则重复次数与批量大小相同
            repeat_by = batch_size
        else:
            # 否则,假设图像输入已经正确批处理,只需重复以匹配每个提示的图像数量
            #
            # 注意:这可能缺少一些边缘情况,比如已批处理和未批处理的 `image_embeds`。
            # 如果这些情况比较常见,需要仔细考虑输入的预期维度及其编码处理。
            repeat_by = num_images_per_prompt

        # 检查图像嵌入是否为 None
        if image_embeds is None:
            # 如果输入图像不是张量,则使用特征提取器将其转换为张量
            if not isinstance(image, torch.Tensor):
                image = self.feature_extractor(images=image, return_tensors="pt").pixel_values

            # 将图像转移到指定设备,并转换为适当的数据类型
            image = image.to(device=device, dtype=dtype)
            # 使用图像编码器对图像进行编码,得到图像嵌入
            image_embeds = self.image_encoder(image).image_embeds

        # 对图像嵌入应用噪声图像嵌入处理
        image_embeds = self.noise_image_embeddings(
            image_embeds=image_embeds,
            noise_level=noise_level,
            generator=generator,
        )

        # 为每个生成的提示复制图像嵌入,使用适合 mps 的方法
        image_embeds = image_embeds.unsqueeze(1)
        # 获取嵌入的批量大小、序列长度和最后一个维度
        bs_embed, seq_len, _ = image_embeds.shape
        # 根据 repeat_by 重复图像嵌入
        image_embeds = image_embeds.repeat(1, repeat_by, 1)
        # 重新调整图像嵌入的形状
        image_embeds = image_embeds.view(bs_embed * repeat_by, seq_len, -1)
        # 挤出多余的维度
        image_embeds = image_embeds.squeeze(1)

        # 如果需要进行无分类器引导
        if do_classifier_free_guidance:
            # 创建与图像嵌入形状相同的零张量作为负提示嵌入
            negative_prompt_embeds = torch.zeros_like(image_embeds)

            # 对于无分类器引导,我们需要进行两次前向传播
            # 这里将无条件和文本嵌入拼接到一个批次中,以避免进行两次前向传播
            image_embeds = torch.cat([negative_prompt_embeds, image_embeds])

        # 返回处理后的图像嵌入
        return image_embeds

    # 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.encode_prompt 复制的函数
    def encode_prompt(
        self,
        # 输入的提示文本
        prompt,
        # 设备类型
        device,
        # 每个提示的图像数量
        num_images_per_prompt,
        # 是否进行无分类器引导
        do_classifier_free_guidance,
        # 可选的负提示文本
        negative_prompt=None,
        # 可选的提示嵌入
        prompt_embeds: Optional[torch.Tensor] = None,
        # 可选的负提示嵌入
        negative_prompt_embeds: Optional[torch.Tensor] = None,
        # 可选的 LoRA 缩放因子
        lora_scale: Optional[float] = None,
        # 可选的剪切跳过参数
        clip_skip: Optional[int] = None,
    # 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.decode_latents 复制的部分
    # 解码潜在向量
    def decode_latents(self, latents):
        # 定义过时警告信息,提示用户该方法将在 1.0.0 中被移除
        deprecation_message = "The decode_latents method is deprecated and will be removed in 1.0.0. Please use VaeImageProcessor.postprocess(...) instead"
        # 触发过时警告,告知用户替代方法
        deprecate("decode_latents", "1.0.0", deprecation_message, standard_warn=False)
    
        # 根据缩放因子调整潜在向量
        latents = 1 / self.vae.config.scaling_factor * latents
        # 解码潜在向量,返回的第一个元素是图像数据
        image = self.vae.decode(latents, return_dict=False)[0]
        # 归一化图像数据到 [0, 1] 范围
        image = (image / 2 + 0.5).clamp(0, 1)
        # 将图像数据从 GPU 转移到 CPU,调整维度顺序并转换为 float32 格式,返回为 NumPy 数组
        image = image.cpu().permute(0, 2, 3, 1).float().numpy()
        # 返回处理后的图像
        return image
    
        # 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_extra_step_kwargs 复制的方法
        def prepare_extra_step_kwargs(self, generator, eta):
            # 为调度器步骤准备额外的参数,因为并非所有调度器都有相同的签名
            # eta (η) 仅在 DDIMScheduler 中使用,其他调度器将被忽略
            # eta 对应于 DDIM 论文中的 η: https://arxiv.org/abs/2010.02502
            # 应该在 [0, 1] 范围内
    
            # 检查调度器是否接受 eta 参数
            accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())
            # 创建额外参数字典
            extra_step_kwargs = {}
            # 如果接受 eta,添加到额外参数字典中
            if accepts_eta:
                extra_step_kwargs["eta"] = eta
    
            # 检查调度器是否接受 generator 参数
            accepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys())
            # 如果接受 generator,添加到额外参数字典中
            if accepts_generator:
                extra_step_kwargs["generator"] = generator
            # 返回额外参数字典
            return extra_step_kwargs
    
        # 检查输入参数
        def check_inputs(
            self,
            prompt,  # 输入的提示文本
            image,  # 输入的图像数据
            height,  # 图像高度
            width,  # 图像宽度
            callback_steps,  # 回调步骤
            noise_level,  # 噪声水平
            negative_prompt=None,  # 可选的负面提示文本
            prompt_embeds=None,  # 可选的提示嵌入
            negative_prompt_embeds=None,  # 可选的负面提示嵌入
            image_embeds=None,  # 可选的图像嵌入
        # 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_latents 复制的部分
    # 准备潜在空间的张量,输入包括批量大小、通道数、图像高度和宽度等参数
    def prepare_latents(self, batch_size, num_channels_latents, height, width, dtype, device, generator, latents=None):
        # 根据输入参数计算潜在张量的形状
        shape = (
            batch_size,
            num_channels_latents,
            int(height) // self.vae_scale_factor,
            int(width) // self.vae_scale_factor,
        )
        # 检查生成器是否为列表且长度与批量大小匹配,若不匹配则抛出错误
        if isinstance(generator, list) and len(generator) != batch_size:
            raise ValueError(
                f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
                f" size of {batch_size}. Make sure the batch size matches the length of the generators."
            )
    
        # 如果未提供潜在张量,则生成一个随机的潜在张量
        if latents is None:
            latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
        else:
            # 如果提供了潜在张量,则将其移动到指定设备
            latents = latents.to(device)
    
        # 根据调度器所需的标准差缩放初始噪声
        latents = latents * self.scheduler.init_noise_sigma
        # 返回处理后的潜在张量
        return latents
    
        # 从 diffusers.pipelines.stable_diffusion.pipeline_stable_unclip.StableUnCLIPPipeline.noise_image_embeddings 复制的函数
        def noise_image_embeddings(
            self,
            image_embeds: torch.Tensor,
            noise_level: int,
            noise: Optional[torch.Tensor] = None,
            generator: Optional[torch.Generator] = None,
    ):
        """
        向图像嵌入添加噪声。噪声的量由 `noise_level` 输入控制。较高的
        `noise_level` 增加最终无噪声图像的方差。

        噪声通过两种方式应用:
        1. 噪声调度直接应用于嵌入。
        2. 一个正弦时间嵌入向量附加到输出中。

        在这两种情况下,噪声的量由相同的 `noise_level` 控制。

        在应用噪声之前,嵌入会被归一化,在应用噪声之后再进行反归一化。
        """
        # 如果未提供噪声,则生成与图像嵌入形状相同的随机噪声
        if noise is None:
            noise = randn_tensor(
                # 生成随机噪声的形状与图像嵌入相同
                image_embeds.shape, generator=generator, device=image_embeds.device, dtype=image_embeds.dtype
            )

        # 创建一个与图像嵌入数量相同的噪声水平张量
        noise_level = torch.tensor([noise_level] * image_embeds.shape[0], device=image_embeds.device)

        # 将图像归一化器移动到与图像嵌入相同的设备
        self.image_normalizer.to(image_embeds.device)
        # 对图像嵌入进行归一化处理
        image_embeds = self.image_normalizer.scale(image_embeds)

        # 向图像嵌入添加噪声,使用噪声调度器
        image_embeds = self.image_noising_scheduler.add_noise(image_embeds, timesteps=noise_level, noise=noise)

        # 对添加噪声后的图像嵌入进行反归一化处理
        image_embeds = self.image_normalizer.unscale(image_embeds)

        # 获取时间步嵌入,并将其应用于图像嵌入
        noise_level = get_timestep_embedding(
            # 传入时间步、嵌入维度等参数来生成时间步嵌入
            timesteps=noise_level, embedding_dim=image_embeds.shape[-1], flip_sin_to_cos=True, downscale_freq_shift=0
        )

        # `get_timestep_embeddings` 不包含任何权重,始终返回 f32 张量,
        # 但我们可能在 fp16 中运行,因此需要在这里转换类型。
        # 可能有更好的封装方式。
        noise_level = noise_level.to(image_embeds.dtype)

        # 将时间步嵌入与图像嵌入在维度1上进行拼接
        image_embeds = torch.cat((image_embeds, noise_level), 1)

        # 返回处理后的图像嵌入
        return image_embeds

    # 不计算梯度以节省内存和加快计算
    @torch.no_grad()
    # 用于替换示例文档字符串的装饰器
    @replace_example_docstring(EXAMPLE_DOC_STRING)
    def __call__(
        # 接收的图像,可以是张量或 PIL 图像
        image: Union[torch.Tensor, PIL.Image.Image] = None,
        # 用户提供的提示文本,可以是字符串或字符串列表
        prompt: Union[str, List[str]] = None,
        # 生成图像的高度,可选
        height: Optional[int] = None,
        # 生成图像的宽度,可选
        width: Optional[int] = None,
        # 推理步骤的数量,默认为20
        num_inference_steps: int = 20,
        # 引导缩放因子,控制生成图像与提示的匹配程度
        guidance_scale: float = 10,
        # 可选的负面提示文本,可以是字符串或字符串列表
        negative_prompt: Optional[Union[str, List[str]]] = None,
        # 每个提示生成的图像数量,默认为1
        num_images_per_prompt: Optional[int] = 1,
        # 调整生成图像的随机性参数,默认为0.0
        eta: float = 0.0,
        # 可选的随机数生成器
        generator: Optional[torch.Generator] = None,
        # 可选的潜在张量,通常用于输入
        latents: Optional[torch.Tensor] = None,
        # 可选的提示嵌入张量
        prompt_embeds: Optional[torch.Tensor] = None,
        # 可选的负面提示嵌入张量
        negative_prompt_embeds: Optional[torch.Tensor] = None,
        # 输出类型,可选,默认为"pil"
        output_type: Optional[str] = "pil",
        # 是否返回字典格式的输出,默认为True
        return_dict: bool = True,
        # 可选的回调函数,在特定步骤调用
        callback: Optional[Callable[[int, int, torch.Tensor], None]] = None,
        # 回调函数调用的步数间隔
        callback_steps: int = 1,
        # 可选的交叉注意力参数
        cross_attention_kwargs: Optional[Dict[str, Any]] = None,
        # 噪声水平,控制噪声的强度
        noise_level: int = 0,
        # 可选的图像嵌入张量
        image_embeds: Optional[torch.Tensor] = None,
        # 可选的跳过的剪辑步骤
        clip_skip: Optional[int] = None,

.\diffusers\pipelines\stable_diffusion\safety_checker.py

# 版权声明,表明该代码的版权所有者及其权利
# Copyright 2024 The HuggingFace Team. All rights reserved.
#
# 在 Apache 2.0 许可证下授权("许可证");
# 除非遵循许可证,否则您不得使用此文件。
# 您可以在以下网址获取许可证副本:
# 
#     http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律或书面协议另有约定,
# 否则根据许可证分发的软件是按“原样”基础提供的,
# 不提供任何明示或暗示的担保或条件。
# 有关许可证所 governing 权限和限制的详细信息,请参见许可证。

# 导入 NumPy 库,用于数值计算
import numpy as np
# 导入 PyTorch 库
import torch
# 导入 PyTorch 神经网络模块
import torch.nn as nn
# 从 transformers 库导入 CLIP 配置、视觉模型和预训练模型
from transformers import CLIPConfig, CLIPVisionModel, PreTrainedModel

# 从上级目录导入 logging 工具
from ...utils import logging

# 获取当前模块的日志记录器
logger = logging.get_logger(__name__)

# 定义计算余弦距离的函数
def cosine_distance(image_embeds, text_embeds):
    # 对图像嵌入进行归一化处理
    normalized_image_embeds = nn.functional.normalize(image_embeds)
    # 对文本嵌入进行归一化处理
    normalized_text_embeds = nn.functional.normalize(text_embeds)
    # 返回归一化的图像嵌入和文本嵌入的矩阵乘法结果
    return torch.mm(normalized_image_embeds, normalized_text_embeds.t())

# 定义 StableDiffusionSafetyChecker 类,继承自 PreTrainedModel
class StableDiffusionSafetyChecker(PreTrainedModel):
    # 设置配置类为 CLIPConfig
    config_class = CLIPConfig
    # 指定主要输入名称为 "clip_input"
    main_input_name = "clip_input"

    # 指定不需要拆分的模块列表
    _no_split_modules = ["CLIPEncoderLayer"]

    # 初始化方法,接收 CLIPConfig 配置
    def __init__(self, config: CLIPConfig):
        # 调用父类的初始化方法
        super().__init__(config)

        # 初始化视觉模型,使用配置中的视觉部分
        self.vision_model = CLIPVisionModel(config.vision_config)
        # 创建线性层进行视觉投影,输入维度为 hidden_size,输出维度为 projection_dim
        self.visual_projection = nn.Linear(config.vision_config.hidden_size, config.projection_dim, bias=False)

        # 创建概念嵌入的可学习参数,维度为 (17, projection_dim),并设置为不需要梯度更新
        self.concept_embeds = nn.Parameter(torch.ones(17, config.projection_dim), requires_grad=False)
        # 创建特殊关心的嵌入参数,维度为 (3, projection_dim),并设置为不需要梯度更新
        self.special_care_embeds = nn.Parameter(torch.ones(3, config.projection_dim), requires_grad=False)

        # 创建概念嵌入权重参数,维度为 (17),并设置为不需要梯度更新
        self.concept_embeds_weights = nn.Parameter(torch.ones(17), requires_grad=False)
        # 创建特殊关心嵌入权重参数,维度为 (3),并设置为不需要梯度更新
        self.special_care_embeds_weights = nn.Parameter(torch.ones(3), requires_grad=False)

    # 该装饰器指示后续方法不需要计算梯度
    @torch.no_grad()
    # 前向传播方法,接收 CLIP 输入和图像
    def forward(self, clip_input, images):
        # 使用视觉模型处理 CLIP 输入,获取池化输出
        pooled_output = self.vision_model(clip_input)[1]  # pooled_output
        # 将池化输出通过视觉投影层,生成图像嵌入
        image_embeds = self.visual_projection(pooled_output)

        # 始终将结果转换为 float32,避免显著的开销,且与 bfloat16 兼容
        special_cos_dist = cosine_distance(image_embeds, self.special_care_embeds).cpu().float().numpy()
        # 计算图像嵌入与概念嵌入之间的余弦距离
        cos_dist = cosine_distance(image_embeds, self.concept_embeds).cpu().float().numpy()

        # 初始化结果列表
        result = []
        # 获取批次大小
        batch_size = image_embeds.shape[0]
        # 遍历每一张图像
        for i in range(batch_size):
            # 初始化结果字典,包含特殊分数、特殊关心和概念分数
            result_img = {"special_scores": {}, "special_care": [], "concept_scores": {}, "bad_concepts": []}

            # 增加此值以创建更强的 `nfsw` 过滤器
            # 代价是增加过滤良性图像的可能性
            adjustment = 0.0

            # 遍历每个特殊概念的余弦距离
            for concept_idx in range(len(special_cos_dist[0])):
                # 获取当前概念的余弦距离
                concept_cos = special_cos_dist[i][concept_idx]
                # 获取当前概念的阈值
                concept_threshold = self.special_care_embeds_weights[concept_idx].item()
                # 计算并存储特殊分数
                result_img["special_scores"][concept_idx] = round(concept_cos - concept_threshold + adjustment, 3)
                # 如果特殊分数大于0,添加到特殊关心列表
                if result_img["special_scores"][concept_idx] > 0:
                    result_img["special_care"].append({concept_idx, result_img["special_scores"][concept_idx]})
                    # 增加调整值
                    adjustment = 0.01

            # 遍历每个概念的余弦距离
            for concept_idx in range(len(cos_dist[0])):
                # 获取当前概念的余弦距离
                concept_cos = cos_dist[i][concept_idx]
                # 获取当前概念的阈值
                concept_threshold = self.concept_embeds_weights[concept_idx].item()
                # 计算并存储概念分数
                result_img["concept_scores"][concept_idx] = round(concept_cos - concept_threshold + adjustment, 3)
                # 如果概念分数大于0,添加到不良概念列表
                if result_img["concept_scores"][concept_idx] > 0:
                    result_img["bad_concepts"].append(concept_idx)

            # 将当前图像的结果添加到结果列表
            result.append(result_img)

        # 检查是否存在任何不适宜内容的概念
        has_nsfw_concepts = [len(res["bad_concepts"]) > 0 for res in result]

        # 遍历每个结果,处理不适宜内容的图像
        for idx, has_nsfw_concept in enumerate(has_nsfw_concepts):
            if has_nsfw_concept:
                # 如果 images 是张量,将该图像替换为黑图
                if torch.is_tensor(images) or torch.is_tensor(images[0]):
                    images[idx] = torch.zeros_like(images[idx])  # black image
                # 如果 images 是 NumPy 数组,将该图像替换为黑图
                else:
                    images[idx] = np.zeros(images[idx].shape)  # black image

        # 如果检测到任何不适宜内容,记录警告信息
        if any(has_nsfw_concepts):
            logger.warning(
                "Potential NSFW content was detected in one or more images. A black image will be returned instead."
                " Try again with a different prompt and/or seed."
            )

        # 返回处理后的图像和不适宜内容的概念标识
        return images, has_nsfw_concepts

    # 在不计算梯度的情况下进行操作
    @torch.no_grad()
    # 定义一个处理输入的前向传播方法,接收 CLIP 输入和图像张量
    def forward_onnx(self, clip_input: torch.Tensor, images: torch.Tensor):
        # 通过视觉模型处理 CLIP 输入,获取池化输出
        pooled_output = self.vision_model(clip_input)[1]  # pooled_output
        # 将池化输出通过视觉投影生成图像嵌入
        image_embeds = self.visual_projection(pooled_output)
    
        # 计算图像嵌入与特殊关心嵌入之间的余弦距离
        special_cos_dist = cosine_distance(image_embeds, self.special_care_embeds)
        # 计算图像嵌入与概念嵌入之间的余弦距离
        cos_dist = cosine_distance(image_embeds, self.concept_embeds)
    
        # 增加此值以创建更强的 NSFW 过滤器
        # 代价是增加过滤良性图像的可能性
        adjustment = 0.0
    
        # 计算特殊分数,考虑余弦距离、特殊关心嵌入权重和调整值
        special_scores = special_cos_dist - self.special_care_embeds_weights + adjustment
        # 对特殊分数进行四舍五入(注释掉的代码)
        # special_scores = special_scores.round(decimals=3)
        # 检查特殊分数是否大于零,返回布尔值表示是否关注
        special_care = torch.any(special_scores > 0, dim=1)
        # 如果特殊关心成立,调整值增加
        special_adjustment = special_care * 0.01
        # 扩展调整值以匹配余弦距离的形状
        special_adjustment = special_adjustment.unsqueeze(1).expand(-1, cos_dist.shape[1])
    
        # 计算概念分数,考虑余弦距离、概念嵌入权重和特殊调整
        concept_scores = (cos_dist - self.concept_embeds_weights) + special_adjustment
        # 对概念分数进行四舍五入(注释掉的代码)
        # concept_scores = concept_scores.round(decimals=3)
        # 检查概念分数是否大于零,返回布尔值表示是否有 NSFW 概念
        has_nsfw_concepts = torch.any(concept_scores > 0, dim=1)
    
        # 将具有 NSFW 概念的图像设置为黑色图像
        images[has_nsfw_concepts] = 0.0  # black image
    
        # 返回处理后的图像和 NSFW 概念的布尔值
        return images, has_nsfw_concepts

.\diffusers\pipelines\stable_diffusion\safety_checker_flax.py

# 版权声明,表示此代码归 HuggingFace 团队所有,保留所有权利。
# 
# 根据 Apache 2.0 许可证(“许可证”)授权;
# 除非遵循该许可证,否则您不得使用此文件。
# 您可以在以下网址获取许可证的副本:
# 
#     http://www.apache.org/licenses/LICENSE-2.0
# 
# 除非适用法律或书面协议另有约定,否则根据许可证分发的软件是以“原样”基础分发的,
# 不提供任何明示或暗示的保证或条件。
# 有关许可证下特定语言的权限和限制,请参阅许可证。

# 导入可选类型和元组类型
from typing import Optional, Tuple

# 导入 jax 库及其 numpy 模块
import jax
import jax.numpy as jnp
# 导入 flax 的模块和相关类
from flax import linen as nn
from flax.core.frozen_dict import FrozenDict
# 导入 CLIP 配置和模型基类
from transformers import CLIPConfig, FlaxPreTrainedModel
# 导入 CLIP 视觉模块
from transformers.models.clip.modeling_flax_clip import FlaxCLIPVisionModule


# 定义计算两个嵌入之间的余弦距离的函数
def jax_cosine_distance(emb_1, emb_2, eps=1e-12):
    # 归一化第一个嵌入,防止除以零
    norm_emb_1 = jnp.divide(emb_1.T, jnp.clip(jnp.linalg.norm(emb_1, axis=1), a_min=eps)).T
    # 归一化第二个嵌入,防止除以零
    norm_emb_2 = jnp.divide(emb_2.T, jnp.clip(jnp.linalg.norm(emb_2, axis=1), a_min=eps)).T
    # 返回两个归一化嵌入的点积,作为余弦相似度
    return jnp.matmul(norm_emb_1, norm_emb_2.T)


# 定义 Flax 稳定扩散安全检查器模块的类
class FlaxStableDiffusionSafetyCheckerModule(nn.Module):
    # 定义配置和数据类型属性
    config: CLIPConfig
    dtype: jnp.dtype = jnp.float32

    # 设置模块的组件
    def setup(self):
        # 初始化视觉模型
        self.vision_model = FlaxCLIPVisionModule(self.config.vision_config)
        # 定义视觉投影层,使用无偏置和指定数据类型
        self.visual_projection = nn.Dense(self.config.projection_dim, use_bias=False, dtype=self.dtype)

        # 定义概念嵌入的参数,初始化为全1的矩阵
        self.concept_embeds = self.param("concept_embeds", jax.nn.initializers.ones, (17, self.config.projection_dim))
        # 定义特殊关怀嵌入的参数,初始化为全1的矩阵
        self.special_care_embeds = self.param(
            "special_care_embeds", jax.nn.initializers.ones, (3, self.config.projection_dim)
        )

        # 定义概念嵌入权重的参数,初始化为全1的向量
        self.concept_embeds_weights = self.param("concept_embeds_weights", jax.nn.initializers.ones, (17,))
        # 定义特殊关怀嵌入权重的参数,初始化为全1的向量
        self.special_care_embeds_weights = self.param("special_care_embeds_weights", jax.nn.initializers.ones, (3,))
    # 定义调用方法,接受输入片段
        def __call__(self, clip_input):
            # 通过视觉模型处理输入片段,获取池化输出
            pooled_output = self.vision_model(clip_input)[1]
            # 将池化输出映射到图像嵌入
            image_embeds = self.visual_projection(pooled_output)
    
            # 计算图像嵌入与特殊关怀嵌入之间的余弦距离
            special_cos_dist = jax_cosine_distance(image_embeds, self.special_care_embeds)
            # 计算图像嵌入与概念嵌入之间的余弦距离
            cos_dist = jax_cosine_distance(image_embeds, self.concept_embeds)
    
            # 增加该值可创建更强的 `nfsw` 过滤器
            # 但可能会增加过滤良性图像输入的可能性
            adjustment = 0.0
    
            # 计算特殊关怀分数,考虑权重和调整值
            special_scores = special_cos_dist - self.special_care_embeds_weights[None, :] + adjustment
            # 将特殊关怀分数四舍五入到小数点后三位
            special_scores = jnp.round(special_scores, 3)
            # 判断是否有特殊关怀分数大于0
            is_special_care = jnp.any(special_scores > 0, axis=1, keepdims=True)
            # 如果图像有任何特殊关怀概念,使用较低的阈值
            special_adjustment = is_special_care * 0.01
    
            # 计算概念分数,考虑权重和特殊调整值
            concept_scores = cos_dist - self.concept_embeds_weights[None, :] + special_adjustment
            # 将概念分数四舍五入到小数点后三位
            concept_scores = jnp.round(concept_scores, 3)
            # 判断是否有 nfsw 概念分数大于0
            has_nsfw_concepts = jnp.any(concept_scores > 0, axis=1)
    
            # 返回是否包含 nfsw 概念的布尔值
            return has_nsfw_concepts
# 定义一个 Flax 稳定扩散安全检查器类,继承自 FlaxPreTrainedModel
class FlaxStableDiffusionSafetyChecker(FlaxPreTrainedModel):
    # 指定配置类为 CLIPConfig
    config_class = CLIPConfig
    # 指定主输入名称为 "clip_input"
    main_input_name = "clip_input"
    # 指定模块类为 FlaxStableDiffusionSafetyCheckerModule
    module_class = FlaxStableDiffusionSafetyCheckerModule

    # 初始化方法,接受配置和其他参数
    def __init__(
        self,
        config: CLIPConfig,
        input_shape: Optional[Tuple] = None,
        seed: int = 0,
        dtype: jnp.dtype = jnp.float32,
        _do_init: bool = True,
        **kwargs,
    ):
        # 如果输入形状未提供,使用默认值 (1, 224, 224, 3)
        if input_shape is None:
            input_shape = (1, 224, 224, 3)
        # 创建模块实例,传入配置和数据类型
        module = self.module_class(config=config, dtype=dtype, **kwargs)
        # 调用父类的初始化方法
        super().__init__(config, module, input_shape=input_shape, seed=seed, dtype=dtype, _do_init=_do_init)

    # 初始化权重的方法
    def init_weights(self, rng: jax.Array, input_shape: Tuple, params: FrozenDict = None) -> FrozenDict:
        # 生成输入张量,使用随机正态分布
        clip_input = jax.random.normal(rng, input_shape)

        # 分割随机数生成器以获得不同的随机种子
        params_rng, dropout_rng = jax.random.split(rng)
        # 定义随机种子字典
        rngs = {"params": params_rng, "dropout": dropout_rng}

        # 初始化模块参数
        random_params = self.module.init(rngs, clip_input)["params"]

        # 返回初始化的参数
        return random_params

    # 定义调用方法
    def __call__(
        self,
        clip_input,
        params: dict = None,
    ):
        # 转置输入张量的维度
        clip_input = jnp.transpose(clip_input, (0, 2, 3, 1))

        # 应用模块并返回结果
        return self.module.apply(
            {"params": params or self.params},
            jnp.array(clip_input, dtype=jnp.float32),
            rngs={},
        )

.\diffusers\pipelines\stable_diffusion\stable_unclip_image_normalizer.py

# 版权声明,说明版权信息和许可条款
# Copyright 2024 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# 该文件的使用须遵循许可证的规定
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# 除非在适用的情况下或以书面形式达成一致,软件
# 在许可证下分发是基于 "按现状" 的基础,不提供任何形式的保证或条件
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# 见许可证了解特定语言的权限和限制
# See the License for the specific language governing permissions and
# limitations under the License.

# 从 typing 模块导入可选和联合类型
from typing import Optional, Union

# 导入 PyTorch 库
import torch
# 从 torch 库中导入神经网络模块
from torch import nn

# 从配置工具中导入基类和注册装饰器
from ...configuration_utils import ConfigMixin, register_to_config
# 从模型工具中导入模型基类
from ...models.modeling_utils import ModelMixin


# 定义 StableUnCLIPImageNormalizer 类,继承自 ModelMixin 和 ConfigMixin
class StableUnCLIPImageNormalizer(ModelMixin, ConfigMixin):
    """
    该类用于保存用于稳定 unCLIP 的 CLIP 嵌入器的均值和标准差。

    在应用噪声之前,用于对图像嵌入进行标准化,并在去除标准化后的噪声图像
    嵌入时使用。
    """

    # 使用注册装饰器将此方法注册到配置中
    @register_to_config
    def __init__(
        self,
        embedding_dim: int = 768,  # 初始化时设定嵌入维度,默认值为 768
    ):
        # 调用父类构造函数
        super().__init__()

        # 定义均值参数,初始化为零的张量,形状为 (1, embedding_dim)
        self.mean = nn.Parameter(torch.zeros(1, embedding_dim))
        # 定义标准差参数,初始化为一的张量,形状为 (1, embedding_dim)
        self.std = nn.Parameter(torch.ones(1, embedding_dim))

    # 定义设备转换方法,可以将均值和标准差移动到指定设备和数据类型
    def to(
        self,
        torch_device: Optional[Union[str, torch.device]] = None,  # 可选的设备参数
        torch_dtype: Optional[torch.dtype] = None,  # 可选的数据类型参数
    ):
        # 将均值参数转换到指定的设备和数据类型
        self.mean = nn.Parameter(self.mean.to(torch_device).to(torch_dtype))
        # 将标准差参数转换到指定的设备和数据类型
        self.std = nn.Parameter(self.std.to(torch_device).to(torch_dtype))
        # 返回当前对象
        return self

    # 定义缩放方法,对嵌入进行标准化
    def scale(self, embeds):
        # 根据均值和标准差标准化嵌入
        embeds = (embeds - self.mean) * 1.0 / self.std
        # 返回标准化后的嵌入
        return embeds

    # 定义反缩放方法,将标准化的嵌入恢复为原始值
    def unscale(self, embeds):
        # 根据标准差和均值反标准化嵌入
        embeds = (embeds * self.std) + self.mean
        # 返回恢复后的嵌入
        return embeds

.\diffusers\pipelines\stable_diffusion\__init__.py

# 导入类型检查相关的模块
from typing import TYPE_CHECKING

# 从工具模块导入多个依赖项和函数
from ...utils import (
    DIFFUSERS_SLOW_IMPORT,  # 慢速导入标志
    OptionalDependencyNotAvailable,  # 可选依赖未找到的异常
    _LazyModule,  # 懒加载模块的工具
    get_objects_from_module,  # 从模块获取对象的函数
    is_flax_available,  # 检查 Flax 库是否可用
    is_k_diffusion_available,  # 检查 K-Diffusion 库是否可用
    is_k_diffusion_version,  # 检查 K-Diffusion 版本
    is_onnx_available,  # 检查 ONNX 库是否可用
    is_torch_available,  # 检查 PyTorch 库是否可用
    is_transformers_available,  # 检查 Transformers 库是否可用
    is_transformers_version,  # 检查 Transformers 版本
)

# 创建空字典以存储虚拟对象
_dummy_objects = {}
# 创建空字典以存储额外导入
_additional_imports = {}
# 定义初始导入结构,包含管道输出
_import_structure = {"pipeline_output": ["StableDiffusionPipelineOutput"]}

# 如果 Transformers 和 Flax 可用,则扩展管道输出
if is_transformers_available() and is_flax_available():
    _import_structure["pipeline_output"].extend(["FlaxStableDiffusionPipelineOutput"])
# 尝试检查 PyTorch 和 Transformers 的可用性
try:
    if not (is_transformers_available() and is_torch_available()):
        raise OptionalDependencyNotAvailable()  # 如果不可用则引发异常
except OptionalDependencyNotAvailable:
    # 导入虚拟对象以处理缺少的依赖项
    from ...utils import dummy_torch_and_transformers_objects  # noqa F403

    # 更新虚拟对象字典
    _dummy_objects.update(get_objects_from_module(dummy_torch_and_transformers_objects))
else:
    # 如果可用,更新导入结构,添加多个管道
    _import_structure["clip_image_project_model"] = ["CLIPImageProjection"]
    _import_structure["pipeline_cycle_diffusion"] = ["CycleDiffusionPipeline"]
    _import_structure["pipeline_stable_diffusion"] = ["StableDiffusionPipeline"]
    _import_structure["pipeline_stable_diffusion_attend_and_excite"] = ["StableDiffusionAttendAndExcitePipeline"]
    _import_structure["pipeline_stable_diffusion_gligen"] = ["StableDiffusionGLIGENPipeline"]
    _import_structure["pipeline_stable_diffusion_gligen_text_image"] = ["StableDiffusionGLIGENTextImagePipeline"]
    _import_structure["pipeline_stable_diffusion_img2img"] = ["StableDiffusionImg2ImgPipeline"]
    _import_structure["pipeline_stable_diffusion_inpaint"] = ["StableDiffusionInpaintPipeline"]
    _import_structure["pipeline_stable_diffusion_inpaint_legacy"] = ["StableDiffusionInpaintPipelineLegacy"]
    _import_structure["pipeline_stable_diffusion_instruct_pix2pix"] = ["StableDiffusionInstructPix2PixPipeline"]
    _import_structure["pipeline_stable_diffusion_latent_upscale"] = ["StableDiffusionLatentUpscalePipeline"]
    _import_structure["pipeline_stable_diffusion_model_editing"] = ["StableDiffusionModelEditingPipeline"]
    _import_structure["pipeline_stable_diffusion_paradigms"] = ["StableDiffusionParadigmsPipeline"]
    _import_structure["pipeline_stable_diffusion_upscale"] = ["StableDiffusionUpscalePipeline"]
    _import_structure["pipeline_stable_unclip"] = ["StableUnCLIPPipeline"]
    _import_structure["pipeline_stable_unclip_img2img"] = ["StableUnCLIPImg2ImgPipeline"]
    _import_structure["safety_checker"] = ["StableDiffusionSafetyChecker"]
    _import_structure["stable_unclip_image_normalizer"] = ["StableUnCLIPImageNormalizer"]
# 尝试检查更严格的 Transformers 和 PyTorch 依赖条件
try:
    if not (is_transformers_available() and is_torch_available() and is_transformers_version(">=", "4.25.0")):
        raise OptionalDependencyNotAvailable()  # 如果条件不满足则引发异常
except OptionalDependencyNotAvailable:
    # 导入缺失的管道以处理可选依赖
    from ...utils.dummy_torch_and_transformers_objects import (
        StableDiffusionImageVariationPipeline,  # 导入图像变体管道
    )
    # 更新 _dummy_objects 字典,将键 "StableDiffusionImageVariationPipeline" 映射到 StableDiffusionImageVariationPipeline 对象
    _dummy_objects.update({"StableDiffusionImageVariationPipeline": StableDiffusionImageVariationPipeline})
# 如果没有可选依赖,则添加 StableDiffusionImageVariationPipeline 到导入结构
else:
    _import_structure["pipeline_stable_diffusion_image_variation"] = ["StableDiffusionImageVariationPipeline"]

# 尝试检查依赖条件是否满足
try:
    # 判断 transformers 和 torch 是否可用,且 transformers 版本是否满足条件
    if not (is_transformers_available() and is_torch_available() and is_transformers_version(">=", "4.26.0")):
        # 如果条件不满足,抛出依赖不可用异常
        raise OptionalDependencyNotAvailable()
# 捕获依赖不可用的异常
except OptionalDependencyNotAvailable:
    # 从虚拟对象模块导入 StableDiffusionDepth2ImgPipeline
    from ...utils.dummy_torch_and_transformers_objects import (
        StableDiffusionDepth2ImgPipeline,
    )

    # 更新虚拟对象字典,添加 StableDiffusionDepth2ImgPipeline
    _dummy_objects.update(
        {
            "StableDiffusionDepth2ImgPipeline": StableDiffusionDepth2ImgPipeline,
        }
    )
# 如果没有抛出异常,则添加 StableDiffusionDepth2ImgPipeline 到导入结构
else:
    _import_structure["pipeline_stable_diffusion_depth2img"] = ["StableDiffusionDepth2ImgPipeline"]

# 尝试检查其他依赖条件是否满足
try:
    # 判断 transformers 和 onnx 是否可用
    if not (is_transformers_available() and is_onnx_available()):
        # 如果条件不满足,抛出依赖不可用异常
        raise OptionalDependencyNotAvailable()
# 捕获依赖不可用的异常
except OptionalDependencyNotAvailable:
    # 从虚拟 ONNX 对象模块导入
    from ...utils import dummy_onnx_objects  # noqa F403

    # 更新虚拟对象字典,添加 ONNX 对象
    _dummy_objects.update(get_objects_from_module(dummy_onnx_objects))
# 如果没有抛出异常,则添加 ONNX 相关的导入结构
else:
    _import_structure["pipeline_onnx_stable_diffusion"] = [
        "OnnxStableDiffusionPipeline",
        "StableDiffusionOnnxPipeline",
    ]
    _import_structure["pipeline_onnx_stable_diffusion_img2img"] = ["OnnxStableDiffusionImg2ImgPipeline"]
    _import_structure["pipeline_onnx_stable_diffusion_inpaint"] = ["OnnxStableDiffusionInpaintPipeline"]
    _import_structure["pipeline_onnx_stable_diffusion_inpaint_legacy"] = ["OnnxStableDiffusionInpaintPipelineLegacy"]
    _import_structure["pipeline_onnx_stable_diffusion_upscale"] = ["OnnxStableDiffusionUpscalePipeline"]

# 检查 transformers 和 flax 是否可用
if is_transformers_available() and is_flax_available():
    # 从调度器模块导入 PNDMSchedulerState
    from ...schedulers.scheduling_pndm_flax import PNDMSchedulerState

    # 更新额外导入字典,添加 PNDMSchedulerState
    _additional_imports.update({"PNDMSchedulerState": PNDMSchedulerState})
    # 添加 flax 相关的导入结构
    _import_structure["pipeline_flax_stable_diffusion"] = ["FlaxStableDiffusionPipeline"]
    _import_structure["pipeline_flax_stable_diffusion_img2img"] = ["FlaxStableDiffusionImg2ImgPipeline"]
    _import_structure["pipeline_flax_stable_diffusion_inpaint"] = ["FlaxStableDiffusionInpaintPipeline"]
    _import_structure["safety_checker_flax"] = ["FlaxStableDiffusionSafetyChecker"]

# 检查类型检查或慢导入条件
if TYPE_CHECKING or DIFFUSERS_SLOW_IMPORT:
    # 尝试检查依赖条件
    try:
        # 判断 transformers 和 torch 是否可用
        if not (is_transformers_available() and is_torch_available()):
            # 如果条件不满足,抛出依赖不可用异常
            raise OptionalDependencyNotAvailable()

    # 捕获依赖不可用的异常
    except OptionalDependencyNotAvailable:
        # 从虚拟对象模块导入所有内容
        from ...utils.dummy_torch_and_transformers_objects import *
    else:
        # 从模块中导入 CLIPImageProjection 类
        from .clip_image_project_model import CLIPImageProjection
        # 从模块中导入 StableDiffusionPipeline 和 StableDiffusionPipelineOutput 类
        from .pipeline_stable_diffusion import (
            StableDiffusionPipeline,
            StableDiffusionPipelineOutput,
        )
        # 从模块中导入 StableDiffusionImg2ImgPipeline 类
        from .pipeline_stable_diffusion_img2img import StableDiffusionImg2ImgPipeline
        # 从模块中导入 StableDiffusionInpaintPipeline 类
        from .pipeline_stable_diffusion_inpaint import StableDiffusionInpaintPipeline
        # 从模块中导入 StableDiffusionInstructPix2PixPipeline 类
        from .pipeline_stable_diffusion_instruct_pix2pix import (
            StableDiffusionInstructPix2PixPipeline,
        )
        # 从模块中导入 StableDiffusionLatentUpscalePipeline 类
        from .pipeline_stable_diffusion_latent_upscale import (
            StableDiffusionLatentUpscalePipeline,
        )
        # 从模块中导入 StableDiffusionUpscalePipeline 类
        from .pipeline_stable_diffusion_upscale import StableDiffusionUpscalePipeline
        # 从模块中导入 StableUnCLIPPipeline 类
        from .pipeline_stable_unclip import StableUnCLIPPipeline
        # 从模块中导入 StableUnCLIPImg2ImgPipeline 类
        from .pipeline_stable_unclip_img2img import StableUnCLIPImg2ImgPipeline
        # 从模块中导入 StableDiffusionSafetyChecker 类
        from .safety_checker import StableDiffusionSafetyChecker
        # 从模块中导入 StableUnCLIPImageNormalizer 类
        from .stable_unclip_image_normalizer import StableUnCLIPImageNormalizer

    try:
        # 检查是否可用所需的 transformers 和 torch 库及其版本
        if not (is_transformers_available() and is_torch_available() and is_transformers_version(">=", "4.25.0")):
            # 抛出可选依赖不可用异常
            raise OptionalDependencyNotAvailable()
    except OptionalDependencyNotAvailable:
        # 导入 dummy_torch_and_transformers_objects 中的 StableDiffusionImageVariationPipeline 类
        from ...utils.dummy_torch_and_transformers_objects import (
            StableDiffusionImageVariationPipeline,
        )
    else:
        # 从模块中导入 StableDiffusionImageVariationPipeline 类
        from .pipeline_stable_diffusion_image_variation import (
            StableDiffusionImageVariationPipeline,
        )

    try:
        # 检查是否可用所需的 transformers 和 torch 库及其版本
        if not (is_transformers_available() and is_torch_available() and is_transformers_version(">=", "4.26.0")):
            # 抛出可选依赖不可用异常
            raise OptionalDependencyNotAvailable()
    except OptionalDependencyNotAvailable:
        # 导入 dummy_torch_and_transformers_objects 中的 StableDiffusionDepth2ImgPipeline 类
        from ...utils.dummy_torch_and_transformers_objects import StableDiffusionDepth2ImgPipeline
    else:
        # 从模块中导入 StableDiffusionDepth2ImgPipeline 类
        from .pipeline_stable_diffusion_depth2img import (
            StableDiffusionDepth2ImgPipeline,
        )

    try:
        # 检查是否可用所需的 transformers 和 onnx 库
        if not (is_transformers_available() and is_onnx_available()):
            # 抛出可选依赖不可用异常
            raise OptionalDependencyNotAvailable()
    except OptionalDependencyNotAvailable:
        # 从 dummy_onnx_objects 中导入所有对象
        from ...utils.dummy_onnx_objects import *
    else:
        # 从模块中导入 OnnxStableDiffusionPipeline 和 StableDiffusionOnnxPipeline 类
        from .pipeline_onnx_stable_diffusion import (
            OnnxStableDiffusionPipeline,
            StableDiffusionOnnxPipeline,
        )
        # 从模块中导入 OnnxStableDiffusionImg2ImgPipeline 类
        from .pipeline_onnx_stable_diffusion_img2img import (
            OnnxStableDiffusionImg2ImgPipeline,
        )
        # 从模块中导入 OnnxStableDiffusionInpaintPipeline 类
        from .pipeline_onnx_stable_diffusion_inpaint import (
            OnnxStableDiffusionInpaintPipeline,
        )
        # 从模块中导入 OnnxStableDiffusionUpscalePipeline 类
        from .pipeline_onnx_stable_diffusion_upscale import (
            OnnxStableDiffusionUpscalePipeline,
        )

    try:
        # 检查是否可用所需的 transformers 和 flax 库
        if not (is_transformers_available() and is_flax_available()):
            # 抛出可选依赖不可用异常
            raise OptionalDependencyNotAvailable()
    except OptionalDependencyNotAvailable:
        # 从 dummy_flax_objects 中导入所有对象
        from ...utils.dummy_flax_objects import *
    # 如果条件不满足,则执行以下导入操作
        else:
            # 从模块中导入 FlaxStableDiffusionPipeline 类
            from .pipeline_flax_stable_diffusion import FlaxStableDiffusionPipeline
            # 从模块中导入 FlaxStableDiffusionImg2ImgPipeline 类
            from .pipeline_flax_stable_diffusion_img2img import (
                FlaxStableDiffusionImg2ImgPipeline,
            )
            # 从模块中导入 FlaxStableDiffusionInpaintPipeline 类
            from .pipeline_flax_stable_diffusion_inpaint import (
                FlaxStableDiffusionInpaintPipeline,
            )
            # 从模块中导入 FlaxStableDiffusionPipelineOutput 类
            from .pipeline_output import FlaxStableDiffusionPipelineOutput
            # 从模块中导入 FlaxStableDiffusionSafetyChecker 类
            from .safety_checker_flax import FlaxStableDiffusionSafetyChecker
# 否则分支,处理模块的懒加载
else:
    # 导入 sys 模块以便操作模块相关功能
    import sys

    # 将当前模块名映射到一个懒加载模块实例
    sys.modules[__name__] = _LazyModule(
        __name__,  # 当前模块的名称
        globals()["__file__"],  # 当前模块的文件路径
        _import_structure,  # 定义的导入结构
        module_spec=__spec__,  # 当前模块的规格
    )

    # 遍历虚拟对象字典,将每个对象设置到当前模块中
    for name, value in _dummy_objects.items():
        setattr(sys.modules[__name__], name, value)
    # 遍历附加导入字典,将每个对象设置到当前模块中
    for name, value in _additional_imports.items():
        setattr(sys.modules[__name__], name, value)

.\diffusers\pipelines\stable_diffusion_3\pipeline_output.py

# 从 dataclasses 模块导入 dataclass 装饰器
from dataclasses import dataclass
# 从 typing 模块导入 List 和 Union 类型提示
from typing import List, Union

# 导入 numpy 库并简化为 np
import numpy as np
# 导入 PIL.Image 模块,用于处理图像
import PIL.Image

# 从上级模块的 utils 中导入 BaseOutput 类
from ...utils import BaseOutput


# 定义一个数据类 StableDiffusion3PipelineOutput,继承自 BaseOutput
@dataclass
class StableDiffusion3PipelineOutput(BaseOutput):
    """
    Stable Diffusion 管道的输出类。

    参数:
        images (`List[PIL.Image.Image]` 或 `np.ndarray`)
            长度为 `batch_size` 的去噪 PIL 图像列表或形状为 `(batch_size, height, width,
            num_channels)` 的 numpy 数组。PIL 图像或 numpy 数组表示扩散管道的去噪图像。
    """

    # 定义一个属性 images,类型为列表或 numpy 数组
    images: Union[List[PIL.Image.Image], np.ndarray]

.\diffusers\pipelines\stable_diffusion_3\pipeline_stable_diffusion_3.py

# 版权声明,表明版权归 2024 Stability AI 和 HuggingFace Team 所有
# 
# 根据 Apache 许可证,版本 2.0("许可证")进行许可;
# 除非遵循许可证,否则您不能使用此文件。
# 您可以在以下网址获取许可证副本:
# 
#     http://www.apache.org/licenses/LICENSE-2.0
# 
# 除非适用法律或书面同意,否则根据许可证分发的软件是以“现状”基础提供的,
# 不提供任何形式的保证或条件,无论是明示或暗示的。
# 有关许可证下权限和限制的具体条款,请参见许可证。

# 导入 inspect 模块,用于获取对象的各种信息
import inspect
# 从 typing 模块导入类型提示相关的类型
from typing import Any, Callable, Dict, List, Optional, Union

# 导入 PyTorch 库
import torch
# 从 transformers 库导入相关的模型和分词器
from transformers import (
    CLIPTextModelWithProjection,  # 导入 CLIP 文本模型
    CLIPTokenizer,                 # 导入 CLIP 分词器
    T5EncoderModel,                # 导入 T5 编码器模型
    T5TokenizerFast,               # 导入快速 T5 分词器
)

# 从本地模块导入图像处理器
from ...image_processor import VaeImageProcessor
# 从本地模块导入加载器混合类
from ...loaders import FromSingleFileMixin, SD3LoraLoaderMixin
# 从本地模块导入自动编码器模型
from ...models.autoencoders import AutoencoderKL
# 从本地模块导入变换器模型
from ...models.transformers import SD3Transformer2DModel
# 从本地模块导入调度器
from ...schedulers import FlowMatchEulerDiscreteScheduler
# 从本地模块导入各种工具函数
from ...utils import (
    USE_PEFT_BACKEND,              # 导入 PEFT 后端使用标志
    is_torch_xla_available,        # 导入检查 XLA 可用性的函数
    logging,                       # 导入日志记录工具
    replace_example_docstring,     # 导入替换示例文档字符串的工具
    scale_lora_layers,             # 导入缩放 LoRA 层的工具
    unscale_lora_layers,           # 导入取消缩放 LoRA 层的工具
)
# 从本地模块导入随机张量生成工具
from ...utils.torch_utils import randn_tensor
# 从本地模块导入扩散管道工具
from ..pipeline_utils import DiffusionPipeline
# 从本地模块导入稳定扩散管道输出类
from .pipeline_output import StableDiffusion3PipelineOutput

# 检查是否可用 torch_xla 库,若可用则导入
if is_torch_xla_available():
    import torch_xla.core.xla_model as xm  # 导入 XLA 模型相关功能

    XLA_AVAILABLE = True  # 设置 XLA 可用标志为 True
else:
    XLA_AVAILABLE = False  # 设置 XLA 可用标志为 False

# 获取当前模块的日志记录器
logger = logging.get_logger(__name__)  # pylint: disable=invalid-name

# 示例文档字符串,用于说明如何使用 StableDiffusion3Pipeline
EXAMPLE_DOC_STRING = """
    Examples:
        ```py
        >>> import torch
        >>> from diffusers import StableDiffusion3Pipeline

        >>> pipe = StableDiffusion3Pipeline.from_pretrained(
        ...     "stabilityai/stable-diffusion-3-medium-diffusers", torch_dtype=torch.float16
        ... )
        >>> pipe.to("cuda")  # 将管道移动到 GPU
        >>> prompt = "A cat holding a sign that says hello world"  # 定义生成图像的提示
        >>> image = pipe(prompt).images[0]  # 生成图像并提取第一张
        >>> image.save("sd3.png")  # 保存生成的图像
        ```py
"""

# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion 中复制的函数
def retrieve_timesteps(
    scheduler,                       # 调度器对象,用于管理时间步
    num_inference_steps: Optional[int] = None,  # 可选参数,推理步骤数
    device: Optional[Union[str, torch.device]] = None,  # 可选参数,设备类型
    timesteps: Optional[List[int]] = None,  # 可选参数,自定义时间步列表
    sigmas: Optional[List[float]] = None,    # 可选参数,自定义 sigma 值列表
    **kwargs,                           # 额外的关键字参数
):
    """
    调用调度器的 `set_timesteps` 方法并在调用后从调度器中检索时间步。处理
    自定义时间步。所有的关键字参数将传递给 `scheduler.set_timesteps`。
    # 定义参数的说明
        Args:
            scheduler (`SchedulerMixin`):  # 调度器,用于获取时间步
                The scheduler to get timesteps from.
            num_inference_steps (`int`):  # 生成样本时使用的扩散步骤数
                The number of diffusion steps used when generating samples with a pre-trained model. If used, `timesteps`
                must be `None`.
            device (`str` or `torch.device`, *optional*):  # 指定时间步要移动到的设备
                The device to which the timesteps should be moved to. If `None`, the timesteps are not moved.
            timesteps (`List[int]`, *optional*):  # 自定义时间步以覆盖调度器的时间步间隔策略
                Custom timesteps used to override the timestep spacing strategy of the scheduler. If `timesteps` is passed,
                `num_inference_steps` and `sigmas` must be `None`.
            sigmas (`List[float]`, *optional*):  # 自定义sigma以覆盖调度器的时间步间隔策略
                Custom sigmas used to override the timestep spacing strategy of the scheduler. If `sigmas` is passed,
                `num_inference_steps` and `timesteps` must be `None`.
    
        Returns:
            `Tuple[torch.Tensor, int]`: 返回一个元组,第一个元素是调度器的时间步调度,第二个元素是推理步骤的数量。
        """
        # 检查是否同时传入了时间步和sigma,若是则抛出错误
        if timesteps is not None and sigmas is not None:
            raise ValueError("Only one of `timesteps` or `sigmas` can be passed. Please choose one to set custom values")
        # 检查是否传入了时间步
        if timesteps is not None:
            # 检查调度器是否支持自定义时间步
            accepts_timesteps = "timesteps" in set(inspect.signature(scheduler.set_timesteps).parameters.keys())
            if not accepts_timesteps:  # 不支持则抛出错误
                raise ValueError(
                    f"The current scheduler class {scheduler.__class__}'s `set_timesteps` does not support custom"
                    f" timestep schedules. Please check whether you are using the correct scheduler."
                )
            # 设置调度器的时间步
            scheduler.set_timesteps(timesteps=timesteps, device=device, **kwargs)
            # 获取当前调度器的时间步
            timesteps = scheduler.timesteps
            # 计算推理步骤的数量
            num_inference_steps = len(timesteps)
        # 检查是否传入了sigma
        elif sigmas is not None:
            # 检查调度器是否支持自定义sigma
            accept_sigmas = "sigmas" in set(inspect.signature(scheduler.set_timesteps).parameters.keys())
            if not accept_sigmas:  # 不支持则抛出错误
                raise ValueError(
                    f"The current scheduler class {scheduler.__class__}'s `set_timesteps` does not support custom"
                    f" sigmas schedules. Please check whether you are using the correct scheduler."
                )
            # 设置调度器的sigma
            scheduler.set_timesteps(sigmas=sigmas, device=device, **kwargs)
            # 获取当前调度器的时间步
            timesteps = scheduler.timesteps
            # 计算推理步骤的数量
            num_inference_steps = len(timesteps)
        # 如果没有传入时间步和sigma
        else:
            # 使用推理步骤数量设置调度器的时间步
            scheduler.set_timesteps(num_inference_steps, device=device, **kwargs)
            # 获取当前调度器的时间步
            timesteps = scheduler.timesteps
        # 返回时间步和推理步骤数量
        return timesteps, num_inference_steps
# 定义 StableDiffusion3Pipeline 类,继承自 DiffusionPipeline、SD3LoraLoaderMixin 和 FromSingleFileMixin
class StableDiffusion3Pipeline(DiffusionPipeline, SD3LoraLoaderMixin, FromSingleFileMixin):
    r"""  # 文档字符串,描述类的参数及其作用
    Args:  # 指明构造函数参数的开始
        transformer ([`SD3Transformer2DModel`]):  # 条件 Transformer(MMDiT)架构,用于去噪编码后的图像潜变量
            Conditional Transformer (MMDiT) architecture to denoise the encoded image latents.
        scheduler ([`FlowMatchEulerDiscreteScheduler`]):  # 与 transformer 结合使用的调度器,用于去噪图像潜变量
            A scheduler to be used in combination with `transformer` to denoise the encoded image latents.
        vae ([`AutoencoderKL`]):  # 用于图像编码和解码的变分自编码器模型
            Variational Auto-Encoder (VAE) Model to encode and decode images to and from latent representations.
        text_encoder ([`CLIPTextModelWithProjection`]):  # 特定 CLIP 变体的文本编码器,具有额外的投影层
            [CLIP](https://huggingface.co/docs/transformers/model_doc/clip#transformers.CLIPTextModelWithProjection),
            specifically the [clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14) variant,
            with an additional added projection layer that is initialized with a diagonal matrix with the `hidden_size`
            as its dimension.
        text_encoder_2 ([`CLIPTextModelWithProjection`]):  # 第二个特定 CLIP 变体的文本编码器
            [CLIP](https://huggingface.co/docs/transformers/model_doc/clip#transformers.CLIPTextModelWithProjection),
            specifically the
            [laion/CLIP-ViT-bigG-14-laion2B-39B-b160k](https://huggingface.co/laion/CLIP-ViT-bigG-14-laion2B-39B-b160k)
            variant.
        text_encoder_3 ([`T5EncoderModel`]):  # 冻结的文本编码器,使用 T5 变体
            Frozen text-encoder. Stable Diffusion 3 uses
            [T5](https://huggingface.co/docs/transformers/model_doc/t5#transformers.T5EncoderModel), specifically the
            [t5-v1_1-xxl](https://huggingface.co/google/t5-v1_1-xxl) variant.
        tokenizer (`CLIPTokenizer`):  # CLIPTokenizer 类的标记器
            Tokenizer of class
            [CLIPTokenizer](https://huggingface.co/docs/transformers/v4.21.0/en/model_doc/clip#transformers.CLIPTokenizer).
        tokenizer_2 (`CLIPTokenizer`):  # 第二个 CLIPTokenizer 类的标记器
            Second Tokenizer of class
            [CLIPTokenizer](https://huggingface.co/docs/transformers/v4.21.0/en/model_doc/clip#transformers.CLIPTokenizer).
        tokenizer_3 (`T5TokenizerFast`):  # T5Tokenizer 类的标记器
            Tokenizer of class
            [T5Tokenizer](https://huggingface.co/docs/transformers/model_doc/t5#transformers.T5Tokenizer).
    """  # 文档字符串结束

    # 定义模型的 CPU 离线加载顺序
    model_cpu_offload_seq = "text_encoder->text_encoder_2->text_encoder_3->transformer->vae"
    # 定义可选组件为空列表
    _optional_components = []
    # 定义回调张量输入的列表
    _callback_tensor_inputs = ["latents", "prompt_embeds", "negative_prompt_embeds", "negative_pooled_prompt_embeds"]

    # 初始化方法,定义类的构造函数
    def __init__(
        self,
        transformer: SD3Transformer2DModel,  # 接收的 transformer 参数,类型为 SD3Transformer2DModel
        scheduler: FlowMatchEulerDiscreteScheduler,  # 接收的调度器参数,类型为 FlowMatchEulerDiscreteScheduler
        vae: AutoencoderKL,  # 接收的 VAE 参数,类型为 AutoencoderKL
        text_encoder: CLIPTextModelWithProjection,  # 接收的文本编码器参数,类型为 CLIPTextModelWithProjection
        tokenizer: CLIPTokenizer,  # 接收的标记器参数,类型为 CLIPTokenizer
        text_encoder_2: CLIPTextModelWithProjection,  # 接收的第二个文本编码器参数,类型为 CLIPTextModelWithProjection
        tokenizer_2: CLIPTokenizer,  # 接收的第二个标记器参数,类型为 CLIPTokenizer
        text_encoder_3: T5EncoderModel,  # 接收的第三个文本编码器参数,类型为 T5EncoderModel
        tokenizer_3: T5TokenizerFast,  # 接收的第三个标记器参数,类型为 T5TokenizerFast
    # 初始化父类
        ):
            super().__init__()
    
            # 注册多个模块,包括变分自编码器、文本编码器和调度器
            self.register_modules(
                vae=vae,
                text_encoder=text_encoder,
                text_encoder_2=text_encoder_2,
                text_encoder_3=text_encoder_3,
                tokenizer=tokenizer,
                tokenizer_2=tokenizer_2,
                tokenizer_3=tokenizer_3,
                transformer=transformer,
                scheduler=scheduler,
            )
            # 计算 VAE 缩放因子,基于配置的块输出通道数
            self.vae_scale_factor = (
                2 ** (len(self.vae.config.block_out_channels) - 1) if hasattr(self, "vae") and self.vae is not None else 8
            )
            # 创建图像处理器,使用计算得出的 VAE 缩放因子
            self.image_processor = VaeImageProcessor(vae_scale_factor=self.vae_scale_factor)
            # 获取最大标记长度,如果有 tokenizer 的话
            self.tokenizer_max_length = (
                self.tokenizer.model_max_length if hasattr(self, "tokenizer") and self.tokenizer is not None else 77
            )
            # 设置默认样本大小,基于 transformer 的配置
            self.default_sample_size = (
                self.transformer.config.sample_size
                if hasattr(self, "transformer") and self.transformer is not None
                else 128
            )
    
        # 定义获取 T5 提示嵌入的方法
        def _get_t5_prompt_embeds(
            self,
            # 提示文本,支持单个字符串或字符串列表
            prompt: Union[str, List[str]] = None,
            # 每个提示生成的图像数量
            num_images_per_prompt: int = 1,
            # 最大序列长度限制
            max_sequence_length: int = 256,
            # 设备类型选择
            device: Optional[torch.device] = None,
            # 数据类型选择
            dtype: Optional[torch.dtype] = None,
    # 处理设备和数据类型的设置
        ):
            # 如果未指定设备,则使用默认执行设备
            device = device or self._execution_device
            # 如果未指定数据类型,则使用文本编码器的数据类型
            dtype = dtype or self.text_encoder.dtype
    
            # 将输入的 prompt 转换为列表形式(如果是字符串则包裹在列表中)
            prompt = [prompt] if isinstance(prompt, str) else prompt
            # 获取 prompt 的批处理大小
            batch_size = len(prompt)
    
            # 如果没有第三个文本编码器,则返回零张量
            if self.text_encoder_3 is None:
                return torch.zeros(
                    # 创建一个零张量,形状由批处理大小和其他参数决定
                    (
                        batch_size * num_images_per_prompt,
                        self.tokenizer_max_length,
                        self.transformer.config.joint_attention_dim,
                    ),
                    device=device,
                    dtype=dtype,
                )
    
            # 使用第三个文本编码器对 prompt 进行编码,返回张量格式
            text_inputs = self.tokenizer_3(
                prompt,
                padding="max_length",  # 填充到最大长度
                max_length=max_sequence_length,  # 最大序列长度
                truncation=True,  # 启用截断
                add_special_tokens=True,  # 添加特殊标记
                return_tensors="pt",  # 返回 PyTorch 张量
            )
            # 提取输入 ID
            text_input_ids = text_inputs.input_ids
            # 获取未截断的 ID
            untruncated_ids = self.tokenizer_3(prompt, padding="longest", return_tensors="pt").input_ids
    
            # 检查未截断 ID 是否长于或等于输入 ID,且不相等
            if untruncated_ids.shape[-1] >= text_input_ids.shape[-1] and not torch.equal(text_input_ids, untruncated_ids):
                # 解码被截断的部分并记录警告
                removed_text = self.tokenizer_3.batch_decode(untruncated_ids[:, self.tokenizer_max_length - 1 : -1])
                logger.warning(
                    "The following part of your input was truncated because `max_sequence_length` is set to "
                    f" {max_sequence_length} tokens: {removed_text}"
                )
    
            # 获取文本编码器对输入 ID 的嵌入
            prompt_embeds = self.text_encoder_3(text_input_ids.to(device))[0]
    
            # 设置数据类型
            dtype = self.text_encoder_3.dtype
            # 将嵌入转换为指定的数据类型和设备
            prompt_embeds = prompt_embeds.to(dtype=dtype, device=device)
    
            # 获取嵌入的形状信息
            _, seq_len, _ = prompt_embeds.shape
    
            # 为每个 prompt 生成的图像重复文本嵌入和注意力掩码,采用适合 mps 的方法
            prompt_embeds = prompt_embeds.repeat(1, num_images_per_prompt, 1)
            # 调整嵌入的形状
            prompt_embeds = prompt_embeds.view(batch_size * num_images_per_prompt, seq_len, -1)
    
            # 返回最终的文本嵌入
            return prompt_embeds
    
        # 获取 CLIP 的提示嵌入
        def _get_clip_prompt_embeds(
            self,
            # 接受字符串或字符串列表作为提示
            prompt: Union[str, List[str]],
            # 每个提示生成的图像数量,默认值为 1
            num_images_per_prompt: int = 1,
            # 设备选择
            device: Optional[torch.device] = None,
            # CLIP 跳过的层数(可选)
            clip_skip: Optional[int] = None,
            # CLIP 模型索引,默认值为 0
            clip_model_index: int = 0,
    ):
        # 确定设备,如果没有指定,则使用默认执行设备
        device = device or self._execution_device

        # 定义 CLIP 的两个分词器
        clip_tokenizers = [self.tokenizer, self.tokenizer_2]
        # 定义 CLIP 的两个文本编码器
        clip_text_encoders = [self.text_encoder, self.text_encoder_2]

        # 根据给定的模型索引选择对应的分词器
        tokenizer = clip_tokenizers[clip_model_index]
        # 根据给定的模型索引选择对应的文本编码器
        text_encoder = clip_text_encoders[clip_model_index]

        # 如果 prompt 是字符串,则将其放入列表中,否则保持原样
        prompt = [prompt] if isinstance(prompt, str) else prompt
        # 计算批处理大小,即 prompt 的数量
        batch_size = len(prompt)

        # 使用选定的分词器对 prompt 进行编码,返回张量
        text_inputs = tokenizer(
            prompt,
            padding="max_length",  # 填充到最大长度
            max_length=self.tokenizer_max_length,  # 最大长度限制
            truncation=True,  # 允许截断
            return_tensors="pt",  # 返回 PyTorch 张量
        )

        # 提取编码后的输入 ID
        text_input_ids = text_inputs.input_ids
        # 对原始 prompt 进行长格式编码以获取未截断的 ID
        untruncated_ids = tokenizer(prompt, padding="longest", return_tensors="pt").input_ids
        # 检查是否需要警告用户输入被截断
        if untruncated_ids.shape[-1] >= text_input_ids.shape[-1] and not torch.equal(text_input_ids, untruncated_ids):
            # 解码被截断的文本
            removed_text = tokenizer.batch_decode(untruncated_ids[:, self.tokenizer_max_length - 1 : -1])
            # 记录警告,通知用户被截断的部分
            logger.warning(
                "The following part of your input was truncated because CLIP can only handle sequences up to"
                f" {self.tokenizer_max_length} tokens: {removed_text}"
            )
        # 使用选定的文本编码器生成提示的嵌入表示
        prompt_embeds = text_encoder(text_input_ids.to(device), output_hidden_states=True)
        # 提取 pooled 提示嵌入
        pooled_prompt_embeds = prompt_embeds[0]

        # 根据是否指定 clip_skip 来选择对应的隐藏状态
        if clip_skip is None:
            prompt_embeds = prompt_embeds.hidden_states[-2]
        else:
            prompt_embeds = prompt_embeds.hidden_states[-(clip_skip + 2)]

        # 将嵌入转换为指定的数据类型和设备
        prompt_embeds = prompt_embeds.to(dtype=self.text_encoder.dtype, device=device)

        # 获取提示嵌入的形状
        _, seq_len, _ = prompt_embeds.shape
        # 对每个 prompt 生成多个图像时复制文本嵌入,使用兼容 mps 的方法
        prompt_embeds = prompt_embeds.repeat(1, num_images_per_prompt, 1)
        # 重塑为批处理大小与生成图像数量的组合
        prompt_embeds = prompt_embeds.view(batch_size * num_images_per_prompt, seq_len, -1)

        # 复制 pooled 提示嵌入以适应多个生成的图像
        pooled_prompt_embeds = pooled_prompt_embeds.repeat(1, num_images_per_prompt, 1)
        # 重塑为批处理大小与生成图像数量的组合
        pooled_prompt_embeds = pooled_prompt_embeds.view(batch_size * num_images_per_prompt, -1)

        # 返回提示嵌入和 pooled 提示嵌入
        return prompt_embeds, pooled_prompt_embeds
    # 定义一个方法用于编码提示信息
        def encode_prompt(
            self,  # 方法参数开始
            prompt: Union[str, List[str]],  # 第一个提示,支持字符串或字符串列表
            prompt_2: Union[str, List[str]],  # 第二个提示,支持字符串或字符串列表
            prompt_3: Union[str, List[str]],  # 第三个提示,支持字符串或字符串列表
            device: Optional[torch.device] = None,  # 可选参数,指定设备
            num_images_per_prompt: int = 1,  # 每个提示生成的图像数量,默认1
            do_classifier_free_guidance: bool = True,  # 是否执行无分类器引导,默认是
            negative_prompt: Optional[Union[str, List[str]]] = None,  # 可选的负面提示
            negative_prompt_2: Optional[Union[str, List[str]]] = None,  # 第二个负面提示
            negative_prompt_3: Optional[Union[str, List[str]]] = None,  # 第三个负面提示
            prompt_embeds: Optional[torch.FloatTensor] = None,  # 可选的提示嵌入
            negative_prompt_embeds: Optional[torch.FloatTensor] = None,  # 可选的负面提示嵌入
            pooled_prompt_embeds: Optional[torch.FloatTensor] = None,  # 可选的池化提示嵌入
            negative_pooled_prompt_embeds: Optional[torch.FloatTensor] = None,  # 可选的负面池化提示嵌入
            clip_skip: Optional[int] = None,  # 可选参数,指定跳过的剪辑步骤
            max_sequence_length: int = 256,  # 最大序列长度,默认256
        # 定义一个检查输入的方法
        def check_inputs(
            self,  # 方法参数开始
            prompt,  # 第一个提示
            prompt_2,  # 第二个提示
            prompt_3,  # 第三个提示
            height,  # 图像高度
            width,  # 图像宽度
            negative_prompt=None,  # 可选的负面提示
            negative_prompt_2=None,  # 第二个负面提示
            negative_prompt_3=None,  # 第三个负面提示
            prompt_embeds=None,  # 可选的提示嵌入
            negative_prompt_embeds=None,  # 可选的负面提示嵌入
            pooled_prompt_embeds=None,  # 可选的池化提示嵌入
            negative_pooled_prompt_embeds=None,  # 可选的负面池化提示嵌入
            callback_on_step_end_tensor_inputs=None,  # 可选的回调输入
            max_sequence_length=None,  # 可选的最大序列长度
        # 定义一个准备潜在变量的方法
        def prepare_latents(
            self,  # 方法参数开始
            batch_size,  # 批次大小
            num_channels_latents,  # 潜在变量通道数
            height,  # 图像高度
            width,  # 图像宽度
            dtype,  # 数据类型
            device,  # 设备
            generator,  # 随机生成器
            latents=None,  # 可选的潜在变量
        ):
            if latents is not None:  # 检查是否提供了潜在变量
                return latents.to(device=device, dtype=dtype)  # 转移潜在变量到指定设备和数据类型
    
            shape = (  # 定义潜在变量的形状
                batch_size,  # 批次大小
                num_channels_latents,  # 潜在变量通道数
                int(height) // self.vae_scale_factor,  # 高度缩放
                int(width) // self.vae_scale_factor,  # 宽度缩放
            )
    
            if isinstance(generator, list) and len(generator) != batch_size:  # 检查生成器列表的长度
                raise ValueError(  # 如果长度不匹配则抛出错误
                    f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
                    f" size of {batch_size}. Make sure the batch size matches the length of the generators."
                )
    
            latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)  # 生成潜在变量
    
            return latents  # 返回生成的潜在变量
    
        @property
        def guidance_scale(self):  # 定义一个属性用于获取引导比例
            return self._guidance_scale  # 返回引导比例
    
        @property
        def clip_skip(self):  # 定义一个属性用于获取剪辑跳过值
            return self._clip_skip  # 返回剪辑跳过值
    
        # 这里`guidance_scale`定义类似于Imagen论文中方程(2)的引导权重`w`
        # https://arxiv.org/pdf/2205.11487.pdf . `guidance_scale = 1`
        # 表示不执行无分类器引导。
        @property
        def do_classifier_free_guidance(self):  # 定义一个属性判断是否执行无分类器引导
            return self._guidance_scale > 1  # 如果引导比例大于1,则返回True
    
        @property
        def joint_attention_kwargs(self):  # 定义一个属性用于获取联合注意力参数
            return self._joint_attention_kwargs  # 返回联合注意力参数
    
        @property
        def num_timesteps(self):  # 定义一个属性用于获取时间步数
            return self._num_timesteps  # 返回时间步数
    
        @property
        def interrupt(self):  # 定义一个属性用于获取中断状态
            return self._interrupt  # 返回中断状态
    # 禁用梯度计算,以节省内存和加快计算速度
    @torch.no_grad()
    # 装饰器,用于替换示例文档字符串为预定义的文档字符串
    @replace_example_docstring(EXAMPLE_DOC_STRING)
    # 定义可调用对象的方法,接收多个参数用于生成图像
    def __call__(
        # 提示文本,可以是字符串或字符串列表,默认为 None
        self,
        prompt: Union[str, List[str]] = None,
        # 第二个提示文本,类型和默认值与 prompt 相同
        prompt_2: Optional[Union[str, List[str]]] = None,
        # 第三个提示文本,类型和默认值与 prompt 相同
        prompt_3: Optional[Union[str, List[str]]] = None,
        # 输出图像的高度,可选参数,默认为 None
        height: Optional[int] = None,
        # 输出图像的宽度,可选参数,默认为 None
        width: Optional[int] = None,
        # 进行推理的步骤数,默认为 28
        num_inference_steps: int = 28,
        # 指定时间步的列表,默认为 None
        timesteps: List[int] = None,
        # 引导强度,默认为 7.0
        guidance_scale: float = 7.0,
        # 负面提示文本,可以是字符串或字符串列表,默认为 None
        negative_prompt: Optional[Union[str, List[str]]] = None,
        # 第二个负面提示文本,类型和默认值与 negative_prompt 相同
        negative_prompt_2: Optional[Union[str, List[str]]] = None,
        # 第三个负面提示文本,类型和默认值与 negative_prompt 相同
        negative_prompt_3: Optional[Union[str, List[str]]] = None,
        # 每个提示生成的图像数量,默认为 1
        num_images_per_prompt: Optional[int] = 1,
        # 随机数生成器,可以是单个生成器或生成器列表,默认为 None
        generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
        # 潜在表示,可选的浮点张量,默认为 None
        latents: Optional[torch.FloatTensor] = None,
        # 提示嵌入,可选的浮点张量,默认为 None
        prompt_embeds: Optional[torch.FloatTensor] = None,
        # 负面提示嵌入,可选的浮点张量,默认为 None
        negative_prompt_embeds: Optional[torch.FloatTensor] = None,
        # 聚合的提示嵌入,可选的浮点张量,默认为 None
        pooled_prompt_embeds: Optional[torch.FloatTensor] = None,
        # 负面聚合提示嵌入,可选的浮点张量,默认为 None
        negative_pooled_prompt_embeds: Optional[torch.FloatTensor] = None,
        # 输出类型,默认为 "pil"
        output_type: Optional[str] = "pil",
        # 是否返回字典,默认为 True
        return_dict: bool = True,
        # 联合注意力的额外参数,默认为 None
        joint_attention_kwargs: Optional[Dict[str, Any]] = None,
        # 跳过的剪辑步骤,默认为 None
        clip_skip: Optional[int] = None,
        # 每步结束时的回调函数,默认为 None
        callback_on_step_end: Optional[Callable[[int, int, Dict], None]] = None,
        # 在每步结束时要回调的张量输入列表,默认为 ["latents"]
        callback_on_step_end_tensor_inputs: List[str] = ["latents"],
        # 最大序列长度,默认为 256
        max_sequence_length: int = 256,