diffusers-源码解析-五十一-

龙哥盟 / 2024-11-09 / 原文

diffusers 源码解析(五十一)

.\diffusers\pipelines\stable_diffusion_xl\pipeline_output.py

# 从 dataclasses 模块导入 dataclass 装饰器,用于简化类的定义
from dataclasses import dataclass
# 从 typing 模块导入 List 和 Union 类型提示
from typing import List, Union

# 导入 numpy 库,通常用于数值计算
import numpy as np
# 导入 PIL.Image 模块,用于处理图像
import PIL.Image

# 从上级模块导入 BaseOutput 类和 is_flax_available 函数
from ...utils import BaseOutput, is_flax_available


# 定义一个数据类,用于存储 Stable Diffusion 管道的输出
@dataclass
class StableDiffusionXLPipelineOutput(BaseOutput):
    """
    Stable Diffusion 管道的输出类。

    参数:
        images (`List[PIL.Image.Image]` 或 `np.ndarray`)
            包含去噪后的 PIL 图像的列表,长度为 `batch_size`,或形状为 `(batch_size, height, width,
            num_channels)` 的 numpy 数组。PIL 图像或 numpy 数组表示扩散管道的去噪图像。
    """

    # 定义一个属性 images,可以是 PIL 图像列表或 numpy 数组
    images: Union[List[PIL.Image.Image], np.ndarray]


# 检查是否可用 Flax 库
if is_flax_available():
    # 导入 flax 库,通常用于深度学习框架
    import flax

    # 定义一个数据类,用于存储 Flax Stable Diffusion XL 管道的输出
    @flax.struct.dataclass
    class FlaxStableDiffusionXLPipelineOutput(BaseOutput):
        """
        Flax Stable Diffusion XL 管道的输出类。

        参数:
            images (`np.ndarray`)
                形状为 `(batch_size, height, width, num_channels)` 的数组,包含来自扩散管道的图像。
        """

        # 定义一个属性 images,类型为 numpy 数组
        images: np.ndarray

.\diffusers\pipelines\stable_diffusion_xl\pipeline_stable_diffusion_xl.py

# 版权所有 2024 The HuggingFace Team. 保留所有权利。
#
# 根据 Apache License, Version 2.0("许可证")许可;
# 除非遵守许可证,否则不得使用此文件。
# 可以在以下地址获取许可证副本:
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律或书面协议另有约定,
# 否则根据许可证分发的软件是以“按现状”基础分发的,
# 不提供任何种类的担保或条件,无论是明示或暗示的。
# 有关许可证所管理的权限和限制的具体信息,请参见许可证。

import inspect  # 导入 inspect 模块,用于获取对象的接口信息
from typing import Any, Callable, Dict, List, Optional, Tuple, Union  # 从 typing 导入类型提示所需的类型

import torch  # 导入 PyTorch 库,进行张量运算和深度学习任务
from transformers import (  # 从 transformers 库导入以下组件
    CLIPImageProcessor,  # CLIP 图像处理器
    CLIPTextModel,  # CLIP 文本模型
    CLIPTextModelWithProjection,  # 带投影的 CLIP 文本模型
    CLIPTokenizer,  # CLIP 分词器
    CLIPVisionModelWithProjection,  # 带投影的 CLIP 视觉模型
)

from ...callbacks import MultiPipelineCallbacks, PipelineCallback  # 从回调模块导入多管道回调和管道回调
from ...image_processor import PipelineImageInput, VaeImageProcessor  # 从图像处理模块导入管道图像输入和变分自编码器图像处理器
from ...loaders import (  # 从加载器模块导入以下混合类
    FromSingleFileMixin,  # 从单文件加载的混合类
    IPAdapterMixin,  # IP 适配器混合类
    StableDiffusionXLLoraLoaderMixin,  # 稳定扩散 XL Lora 加载器混合类
    TextualInversionLoaderMixin,  # 文本反演加载器混合类
)
from ...models import AutoencoderKL, ImageProjection, UNet2DConditionModel  # 从模型模块导入自编码器、图像投影和条件 UNet 模型
from ...models.attention_processor import (  # 从注意力处理模块导入以下处理器
    AttnProcessor2_0,  # 注意力处理器版本 2.0
    FusedAttnProcessor2_0,  # 融合的注意力处理器版本 2.0
    XFormersAttnProcessor,  # XFormers 注意力处理器
)
from ...models.lora import adjust_lora_scale_text_encoder  # 从 Lora 模型导入调整文本编码器的 Lora 比例函数
from ...schedulers import KarrasDiffusionSchedulers  # 从调度器模块导入 Karras 扩散调度器
from ...utils import (  # 从工具模块导入以下工具函数和常量
    USE_PEFT_BACKEND,  # 使用 PEFT 后端的标志
    deprecate,  # 用于标记过时功能的工具
    is_invisible_watermark_available,  # 检查隐形水印是否可用的函数
    is_torch_xla_available,  # 检查 Torch XLA 是否可用的函数
    logging,  # 日志记录工具
    replace_example_docstring,  # 替换示例文档字符串的工具
    scale_lora_layers,  # 调整 Lora 层规模的函数
    unscale_lora_layers,  # 恢复 Lora 层规模的函数
)
from ...utils.torch_utils import randn_tensor  # 从 PyTorch 工具模块导入生成随机张量的函数
from ..pipeline_utils import DiffusionPipeline, StableDiffusionMixin  # 从管道工具模块导入扩散管道和稳定扩散混合类
from .pipeline_output import StableDiffusionXLPipelineOutput  # 从管道输出模块导入稳定扩散 XL 管道输出类


if is_invisible_watermark_available():  # 如果隐形水印可用
    from .watermark import StableDiffusionXLWatermarker  # 导入稳定扩散 XL 水印工具

if is_torch_xla_available():  # 如果 Torch XLA 可用
    import torch_xla.core.xla_model as xm  # 导入 XLA 模型核心

    XLA_AVAILABLE = True  # 设置 XLA 可用标志为真
else:
    XLA_AVAILABLE = False  # 设置 XLA 可用标志为假


logger = logging.get_logger(__name__)  # 创建一个记录器,使用当前模块的名称作为标识

EXAMPLE_DOC_STRING = """  # 示例文档字符串,用于展示代码示例
    Examples:  # 示例说明
        ```py  # Python 代码示例开始
        >>> import torch  # 导入 PyTorch 库
        >>> from diffusers import StableDiffusionXLPipeline  # 从 diffusers 导入稳定扩散 XL 管道

        >>> pipe = StableDiffusionXLPipeline.from_pretrained(  # 从预训练模型加载管道
        ...     "stabilityai/stable-diffusion-xl-base-1.0", torch_dtype=torch.float16  # 指定模型名称和数据类型
        ... )
        >>> pipe = pipe.to("cuda")  # 将管道转移到 GPU

        >>> prompt = "a photo of an astronaut riding a horse on mars"  # 定义生成图像的提示
        >>> image = pipe(prompt).images[0]  # 生成图像并获取第一个图像
        ```py
"""


# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.rescale_noise_cfg 复制的函数
def rescale_noise_cfg(noise_cfg, noise_pred_text, guidance_rescale=0.0):  # 定义重标定噪声配置的函数,接受噪声配置、预测文本和指导重标定参数
    """
    根据 `guidance_rescale` 重标定 `noise_cfg`。基于[常见扩散噪声调度和样本步骤存在缺陷](https://arxiv.org/pdf/2305.08891.pdf)中的发现。见第 3.4 节
    """  # 函数文档字符串,解释功能和引用文献
    # 计算噪声预测文本的标准差,沿指定维度进行计算,并保持维度
    std_text = noise_pred_text.std(dim=list(range(1, noise_pred_text.ndim)), keepdim=True)
    # 计算噪声配置的标准差,沿指定维度进行计算,并保持维度
    std_cfg = noise_cfg.std(dim=list(range(1, noise_cfg.ndim)), keepdim=True)
    # 根据标准差重新缩放来自指导的结果,以修正过度曝光问题
    noise_pred_rescaled = noise_cfg * (std_text / std_cfg)
    # 通过指导缩放因子将重新缩放的噪声与原始指导结果混合,以避免生成“普通”外观的图像
    noise_cfg = guidance_rescale * noise_pred_rescaled + (1 - guidance_rescale) * noise_cfg
    # 返回最终的噪声配置
    return noise_cfg
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion 复制的代码
def retrieve_timesteps(
    # 调度器对象,用于获取时间步
    scheduler,
    # 生成样本时使用的扩散步骤数量,默认为 None
    num_inference_steps: Optional[int] = None,
    # 时间步要移动到的设备,可以是字符串或 torch.device 类型,默认为 None
    device: Optional[Union[str, torch.device]] = None,
    # 自定义时间步,默认为 None
    timesteps: Optional[List[int]] = None,
    # 自定义 sigma 值,默认为 None
    sigmas: Optional[List[float]] = None,
    # 其他关键字参数,将传递给调度器的 set_timesteps 方法
    **kwargs,
):
    """
    调用调度器的 `set_timesteps` 方法,并在调用后从调度器检索时间步。处理
    自定义时间步。任何关键字参数将被提供给 `scheduler.set_timesteps`。

    参数:
        scheduler (`SchedulerMixin`):
            用于获取时间步的调度器。
        num_inference_steps (`int`):
            生成样本时使用的扩散步骤数量。如果使用,则 `timesteps` 必须为 `None`。
        device (`str` 或 `torch.device`,*可选*):
            时间步要移动到的设备。如果为 `None`,则不移动时间步。
        timesteps (`List[int]`,*可选*):
            自定义时间步,用于覆盖调度器的时间步间隔策略。如果传递了 `timesteps`,
            则 `num_inference_steps` 和 `sigmas` 必须为 `None`。
        sigmas (`List[float]`,*可选*):
            自定义 sigma 值,用于覆盖调度器的时间步间隔策略。如果传递了 `sigmas`,
            则 `num_inference_steps` 和 `timesteps` 必须为 `None`。

    返回:
        `Tuple[torch.Tensor, int]`: 一个元组,第一个元素是来自调度器的时间步调度,
        第二个元素是推理步骤的数量。
    """
    # 如果同时传入了自定义时间步和 sigma,抛出错误
    if timesteps is not None and sigmas is not None:
        raise ValueError("Only one of `timesteps` or `sigmas` can be passed. Please choose one to set custom values")
    # 如果传入了自定义时间步
    if timesteps is not None:
        # 检查当前调度器是否接受自定义时间步
        accepts_timesteps = "timesteps" in set(inspect.signature(scheduler.set_timesteps).parameters.keys())
        # 如果不支持自定义时间步,抛出错误
        if not accepts_timesteps:
            raise ValueError(
                f"The current scheduler class {scheduler.__class__}'s `set_timesteps` does not support custom"
                f" timestep schedules. Please check whether you are using the correct scheduler."
            )
        # 设置自定义时间步,移动到指定设备
        scheduler.set_timesteps(timesteps=timesteps, device=device, **kwargs)
        # 从调度器获取时间步
        timesteps = scheduler.timesteps
        # 计算推理步骤的数量
        num_inference_steps = len(timesteps)
    # 如果传入了自定义 sigma
    elif sigmas is not None:
        # 检查当前调度器是否接受自定义 sigma
        accept_sigmas = "sigmas" in set(inspect.signature(scheduler.set_timesteps).parameters.keys())
        # 如果不支持自定义 sigma,抛出错误
        if not accept_sigmas:
            raise ValueError(
                f"The current scheduler class {scheduler.__class__}'s `set_timesteps` does not support custom"
                f" sigmas schedules. Please check whether you are using the correct scheduler."
            )
        # 设置自定义 sigma,移动到指定设备
        scheduler.set_timesteps(sigmas=sigmas, device=device, **kwargs)
        # 从调度器获取时间步
        timesteps = scheduler.timesteps
        # 计算推理步骤的数量
        num_inference_steps = len(timesteps)
    # 如果不是特定条件,设置调度器的时间步长
    else:
        # 调用调度器的方法,设置推理步骤数和设备信息,同时传递其他关键字参数
        scheduler.set_timesteps(num_inference_steps, device=device, **kwargs)
        # 获取调度器的时间步长
        timesteps = scheduler.timesteps
    # 返回时间步长和推理步骤数
    return timesteps, num_inference_steps
# 定义一个名为 StableDiffusionXLPipeline 的类,继承多个混合类以实现功能
class StableDiffusionXLPipeline(
    # 继承自 DiffusionPipeline,提供基础的扩散模型功能
    DiffusionPipeline,
    # 继承自 StableDiffusionMixin,添加与稳定扩散相关的方法
    StableDiffusionMixin,
    # 继承自 FromSingleFileMixin,支持从单个文件加载模型
    FromSingleFileMixin,
    # 继承自 StableDiffusionXLLoraLoaderMixin,添加加载和保存 LoRA 权重的方法
    StableDiffusionXLLoraLoaderMixin,
    # 继承自 TextualInversionLoaderMixin,添加加载文本反转嵌入的方法
    TextualInversionLoaderMixin,
    # 继承自 IPAdapterMixin,支持加载 IP 适配器
    IPAdapterMixin,
):
    # 文档字符串,描述该类的功能
    r"""
    使用 Stable Diffusion XL 进行文本到图像生成的管道。

    该模型继承自 [`DiffusionPipeline`]。查看超类文档以了解库实现的所有管道的通用方法
    (例如下载或保存,在特定设备上运行等)。

    该管道还继承以下加载方法:
        - [`~loaders.TextualInversionLoaderMixin.load_textual_inversion`] 用于加载文本反转嵌入
        - [`~loaders.FromSingleFileMixin.from_single_file`] 用于加载 `.ckpt` 文件
        - [`~loaders.StableDiffusionXLLoraLoaderMixin.load_lora_weights`] 用于加载 LoRA 权重
        - [`~loaders.StableDiffusionXLLoraLoaderMixin.save_lora_weights`] 用于保存 LoRA 权重
        - [`~loaders.IPAdapterMixin.load_ip_adapter`] 用于加载 IP 适配器
    # 参数说明
    Args:
        # 变分自编码器模型,用于将图像编码为潜在表示并解码回图像
        vae ([`AutoencoderKL`]):
            Variational Auto-Encoder (VAE) Model to encode and decode images to and from latent representations.
        # 冻结的文本编码器,Stable Diffusion XL 使用 CLIP 模型的文本部分
        text_encoder ([`CLIPTextModel`]):
            Frozen text-encoder. Stable Diffusion XL uses the text portion of
            [CLIP](https://huggingface.co/docs/transformers/model_doc/clip#transformers.CLIPTextModel), specifically
            the [clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14) variant.
        # 第二个冻结的文本编码器,Stable Diffusion XL 使用 CLIP 模型的文本和池化部分
        text_encoder_2 ([` CLIPTextModelWithProjection`]):
            Second frozen text-encoder. Stable Diffusion XL uses the text and pool portion of
            [CLIP](https://huggingface.co/docs/transformers/model_doc/clip#transformers.CLIPTextModelWithProjection),
            specifically the
            [laion/CLIP-ViT-bigG-14-laion2B-39B-b160k](https://huggingface.co/laion/CLIP-ViT-bigG-14-laion2B-39B-b160k)
            variant.
        # CLIP 的分词器,用于将文本转换为模型可处理的输入
        tokenizer (`CLIPTokenizer`):
            Tokenizer of class
            [CLIPTokenizer](https://huggingface.co/docs/transformers/v4.21.0/en/model_doc/clip#transformers.CLIPTokenizer).
        # 第二个 CLIP 分词器
        tokenizer_2 (`CLIPTokenizer`):
            Second Tokenizer of class
            [CLIPTokenizer](https://huggingface.co/docs/transformers/v4.21.0/en/model_doc/clip#transformers.CLIPTokenizer).
        # 条件 U-Net 架构,用于对编码的图像潜在表示进行去噪
        unet ([`UNet2DConditionModel`]): Conditional U-Net architecture to denoise the encoded image latents.
        # 调度器,与 U-Net 结合使用以去噪编码的图像潜在表示
        scheduler ([`SchedulerMixin`]):
            A scheduler to be used in combination with `unet` to denoise the encoded image latents. Can be one of
            [`DDIMScheduler`], [`LMSDiscreteScheduler`], or [`PNDMScheduler`].
        # 是否强制将负提示嵌入设置为 0
        force_zeros_for_empty_prompt (`bool`, *optional*, defaults to `"True"`):
            Whether the negative prompt embeddings shall be forced to always be set to 0. Also see the config of
            `stabilityai/stable-diffusion-xl-base-1-0`.
        # 是否使用隐形水印库对输出图像进行水印处理
        add_watermarker (`bool`, *optional*):
            Whether to use the [invisible_watermark library](https://github.com/ShieldMnt/invisible-watermark/) to
            watermark output images. If not defined, it will default to True if the package is installed, otherwise no
            watermarker will be used.
    """
    # 定义模型在 CPU 上的卸载顺序
    model_cpu_offload_seq = "text_encoder->text_encoder_2->image_encoder->unet->vae"
    # 可选组件的列表,包含一些可选的模型组件
    _optional_components = [
        "tokenizer",
        "tokenizer_2",
        "text_encoder",
        "text_encoder_2",
        "image_encoder",
        "feature_extractor",
    ]
    # 回调张量输入的列表,包含模型输入张量的名称
    _callback_tensor_inputs = [
        "latents",
        "prompt_embeds",
        "negative_prompt_embeds",
        "add_text_embeds",
        "add_time_ids",
        "negative_pooled_prompt_embeds",
        "negative_add_time_ids",
    ]
    # 初始化方法,用于创建类的实例并设置属性
        def __init__(
            self,
            vae: AutoencoderKL,  # 变分自编码器模型
            text_encoder: CLIPTextModel,  # 文本编码器模型
            text_encoder_2: CLIPTextModelWithProjection,  # 第二文本编码器模型
            tokenizer: CLIPTokenizer,  # 文本分词器
            tokenizer_2: CLIPTokenizer,  # 第二文本分词器
            unet: UNet2DConditionModel,  # U-Net 模型用于生成图像
            scheduler: KarrasDiffusionSchedulers,  # 扩散调度器
            image_encoder: CLIPVisionModelWithProjection = None,  # 可选的图像编码器
            feature_extractor: CLIPImageProcessor = None,  # 可选的特征提取器
            force_zeros_for_empty_prompt: bool = True,  # 是否对空提示强制使用零
            add_watermarker: Optional[bool] = None,  # 可选的水印添加标志
        ):
            super().__init__()  # 调用父类的初始化方法
    
            # 注册模块,初始化必要的组件
            self.register_modules(
                vae=vae,
                text_encoder=text_encoder,
                text_encoder_2=text_encoder_2,
                tokenizer=tokenizer,
                tokenizer_2=tokenizer_2,
                unet=unet,
                scheduler=scheduler,
                image_encoder=image_encoder,
                feature_extractor=feature_extractor,
            )
            # 注册配置参数,包含强制零的标志
            self.register_to_config(force_zeros_for_empty_prompt=force_zeros_for_empty_prompt)
            # 计算 VAE 的缩放因子,基于块输出通道的数量
            self.vae_scale_factor = 2 ** (len(self.vae.config.block_out_channels) - 1)
            # 创建图像处理器,使用计算得出的缩放因子
            self.image_processor = VaeImageProcessor(vae_scale_factor=self.vae_scale_factor)
    
            # 设置默认采样大小,来自 U-Net 的配置
            self.default_sample_size = self.unet.config.sample_size
    
            # 如果没有提供水印标志,则根据可用性决定
            add_watermarker = add_watermarker if add_watermarker is not None else is_invisible_watermark_available()
    
            # 根据水印标志,初始化水印器
            if add_watermarker:
                self.watermark = StableDiffusionXLWatermarker()  # 初始化水印器
            else:
                self.watermark = None  # 不使用水印
    
        # 编码提示的方法,用于生成嵌入
        def encode_prompt(
            self,
            prompt: str,  # 主提示字符串
            prompt_2: Optional[str] = None,  # 可选的第二提示字符串
            device: Optional[torch.device] = None,  # 设备信息,默认为 None
            num_images_per_prompt: int = 1,  # 每个提示生成的图像数量
            do_classifier_free_guidance: bool = True,  # 是否进行无分类器引导
            negative_prompt: Optional[str] = None,  # 可选的负提示字符串
            negative_prompt_2: Optional[str] = None,  # 可选的第二负提示字符串
            prompt_embeds: Optional[torch.Tensor] = None,  # 可选的提示嵌入
            negative_prompt_embeds: Optional[torch.Tensor] = None,  # 可选的负提示嵌入
            pooled_prompt_embeds: Optional[torch.Tensor] = None,  # 可选的池化提示嵌入
            negative_pooled_prompt_embeds: Optional[torch.Tensor] = None,  # 可选的负池化提示嵌入
            lora_scale: Optional[float] = None,  # 可选的 Lora 缩放因子
            clip_skip: Optional[int] = None,  # 可选的跳过 Clip 层
        # 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.encode_image 复制
    # 定义编码图像的函数,接受图像、设备、每个提示的图像数量和可选的隐藏状态
        def encode_image(self, image, device, num_images_per_prompt, output_hidden_states=None):
            # 获取图像编码器参数的类型
            dtype = next(self.image_encoder.parameters()).dtype
    
            # 如果输入的图像不是张量,则使用特征提取器处理并返回张量格式
            if not isinstance(image, torch.Tensor):
                image = self.feature_extractor(image, return_tensors="pt").pixel_values
    
            # 将图像转移到指定设备并转换为正确的数据类型
            image = image.to(device=device, dtype=dtype)
            # 如果请求输出隐藏状态,则获取编码后的图像隐藏状态
            if output_hidden_states:
                image_enc_hidden_states = self.image_encoder(image, output_hidden_states=True).hidden_states[-2]
                # 通过重复每个图像的隐藏状态来匹配提示数量
                image_enc_hidden_states = image_enc_hidden_states.repeat_interleave(num_images_per_prompt, dim=0)
                # 对于未条件图像,获取对应的编码隐藏状态
                uncond_image_enc_hidden_states = self.image_encoder(
                    torch.zeros_like(image), output_hidden_states=True
                ).hidden_states[-2]
                # 同样地重复未条件图像的隐藏状态
                uncond_image_enc_hidden_states = uncond_image_enc_hidden_states.repeat_interleave(
                    num_images_per_prompt, dim=0
                )
                # 返回条件和未条件的隐藏状态
                return image_enc_hidden_states, uncond_image_enc_hidden_states
            else:
                # 如果不需要隐藏状态,直接获取图像嵌入
                image_embeds = self.image_encoder(image).image_embeds
                # 通过重复嵌入来匹配提示数量
                image_embeds = image_embeds.repeat_interleave(num_images_per_prompt, dim=0)
                # 创建与图像嵌入相同形状的零张量作为未条件嵌入
                uncond_image_embeds = torch.zeros_like(image_embeds)
    
                # 返回条件和未条件的图像嵌入
                return image_embeds, uncond_image_embeds
    
        # 从 StableDiffusionPipeline 中复制的函数,用于准备图像嵌入
        def prepare_ip_adapter_image_embeds(
            self, ip_adapter_image, ip_adapter_image_embeds, device, num_images_per_prompt, do_classifier_free_guidance
    ):
        # 初始化一个空列表,用于存储图像嵌入
        image_embeds = []
        # 如果启用分类器自由引导,则初始化一个空列表,用于存储负面图像嵌入
        if do_classifier_free_guidance:
            negative_image_embeds = []
        # 如果输入适配器图像嵌入为空
        if ip_adapter_image_embeds is None:
            # 检查输入适配器图像是否为列表,如果不是则转换为列表
            if not isinstance(ip_adapter_image, list):
                ip_adapter_image = [ip_adapter_image]

            # 检查输入适配器图像的长度是否与 IP 适配器数量匹配
            if len(ip_adapter_image) != len(self.unet.encoder_hid_proj.image_projection_layers):
                # 如果不匹配,抛出错误并提供详细信息
                raise ValueError(
                    f"`ip_adapter_image` must have same length as the number of IP Adapters. Got {len(ip_adapter_image)} images and {len(self.unet.encoder_hid_proj.image_projection_layers)} IP Adapters."
                )

            # 遍历每个单独的适配器图像和其对应的图像投影层
            for single_ip_adapter_image, image_proj_layer in zip(
                ip_adapter_image, self.unet.encoder_hid_proj.image_projection_layers
            ):
                # 判断是否输出隐藏状态
                output_hidden_state = not isinstance(image_proj_layer, ImageProjection)
                # 对单个适配器图像进行编码,返回图像嵌入和负面图像嵌入
                single_image_embeds, single_negative_image_embeds = self.encode_image(
                    single_ip_adapter_image, device, 1, output_hidden_state
                )

                # 将单个图像嵌入添加到列表中
                image_embeds.append(single_image_embeds[None, :])
                # 如果启用分类器自由引导,则将负面图像嵌入添加到列表中
                if do_classifier_free_guidance:
                    negative_image_embeds.append(single_negative_image_embeds[None, :])
        else:
            # 如果已有输入适配器图像嵌入
            for single_image_embeds in ip_adapter_image_embeds:
                # 如果启用分类器自由引导,将负面和正面图像嵌入分开
                if do_classifier_free_guidance:
                    single_negative_image_embeds, single_image_embeds = single_image_embeds.chunk(2)
                    # 添加负面图像嵌入到列表中
                    negative_image_embeds.append(single_negative_image_embeds)
                # 将正面图像嵌入添加到列表中
                image_embeds.append(single_image_embeds)

        # 初始化一个空列表,用于存储最终的 IP 适配器图像嵌入
        ip_adapter_image_embeds = []
        # 遍历每个图像嵌入
        for i, single_image_embeds in enumerate(image_embeds):
            # 将单个图像嵌入重复 num_images_per_prompt 次
            single_image_embeds = torch.cat([single_image_embeds] * num_images_per_prompt, dim=0)
            # 如果启用分类器自由引导,处理负面图像嵌入
            if do_classifier_free_guidance:
                single_negative_image_embeds = torch.cat([negative_image_embeds[i]] * num_images_per_prompt, dim=0)
                # 将负面图像嵌入与正面图像嵌入拼接
                single_image_embeds = torch.cat([single_negative_image_embeds, single_image_embeds], dim=0)

            # 将图像嵌入移动到指定设备
            single_image_embeds = single_image_embeds.to(device=device)
            # 将处理后的图像嵌入添加到结果列表中
            ip_adapter_image_embeds.append(single_image_embeds)

        # 返回所有的 IP 适配器图像嵌入
        return ip_adapter_image_embeds

    # 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_extra_step_kwargs 复制的内容
    # 准备调度器步骤的额外参数,因为并非所有调度器的参数签名相同
    def prepare_extra_step_kwargs(self, generator, eta):
        # eta (η) 仅在 DDIMScheduler 中使用,其他调度器将忽略此参数
        # eta 对应于 DDIM 论文中的 η: https://arxiv.org/abs/2010.02502
        # 并且应在 [0, 1] 范围内
    
        # 检查调度器的 step 方法是否接受 eta 参数
        accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())
        # 初始化额外参数字典
        extra_step_kwargs = {}
        # 如果接受 eta,则将其添加到额外参数字典
        if accepts_eta:
            extra_step_kwargs["eta"] = eta
    
        # 检查调度器的 step 方法是否接受 generator 参数
        accepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys())
        # 如果接受 generator,则将其添加到额外参数字典
        if accepts_generator:
            extra_step_kwargs["generator"] = generator
        # 返回包含额外参数的字典
        return extra_step_kwargs
    
    # 检查输入参数的有效性
    def check_inputs(
        self,
        prompt,
        prompt_2,
        height,
        width,
        callback_steps,
        negative_prompt=None,
        negative_prompt_2=None,
        prompt_embeds=None,
        negative_prompt_embeds=None,
        pooled_prompt_embeds=None,
        negative_pooled_prompt_embeds=None,
        ip_adapter_image=None,
        ip_adapter_image_embeds=None,
        callback_on_step_end_tensor_inputs=None,
    ):
        # 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_latents 复制的部分
        def prepare_latents(self, batch_size, num_channels_latents, height, width, dtype, device, generator, latents=None):
            # 定义潜在张量的形状,包括批量大小和通道数
            shape = (
                batch_size,
                num_channels_latents,
                int(height) // self.vae_scale_factor,
                int(width) // self.vae_scale_factor,
            )
            # 检查生成器是否为列表,并且其长度与批量大小一致
            if isinstance(generator, list) and len(generator) != batch_size:
                # 如果不一致,抛出值错误
                raise ValueError(
                    f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
                    f" size of {batch_size}. Make sure the batch size matches the length of the generators."
                )
    
            # 如果没有提供潜在张量,则随机生成一个
            if latents is None:
                latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
            else:
                # 如果提供了潜在张量,则将其移动到指定设备
                latents = latents.to(device)
    
            # 按照调度器所需的标准差缩放初始噪声
            latents = latents * self.scheduler.init_noise_sigma
            # 返回处理后的潜在张量
            return latents
    
        # 定义获取时间标识的函数
        def _get_add_time_ids(
            self, original_size, crops_coords_top_left, target_size, dtype, text_encoder_projection_dim=None
    ):
        # 创建一个包含原始大小、裁剪左上角坐标和目标大小的列表
        add_time_ids = list(original_size + crops_coords_top_left + target_size)

        # 计算通过添加时间嵌入维度得到的总嵌入维度
        passed_add_embed_dim = (
            self.unet.config.addition_time_embed_dim * len(add_time_ids) + text_encoder_projection_dim
        )
        # 获取 UNet 中添加嵌入的预期输入维度
        expected_add_embed_dim = self.unet.add_embedding.linear_1.in_features

        # 检查预期的和实际的嵌入维度是否匹配
        if expected_add_embed_dim != passed_add_embed_dim:
            raise ValueError(
                f"Model expects an added time embedding vector of length {expected_add_embed_dim}, but a vector of {passed_add_embed_dim} was created. The model has an incorrect config. Please check `unet.config.time_embedding_type` and `text_encoder_2.config.projection_dim`."
            )

        # 将添加的时间 ID 转换为张量,指定数据类型
        add_time_ids = torch.tensor([add_time_ids], dtype=dtype)
        # 返回添加的时间 ID 张量
        return add_time_ids

    def upcast_vae(self):
        # 获取 VAE 的数据类型
        dtype = self.vae.dtype
        # 将 VAE 转换为浮点数类型
        self.vae.to(dtype=torch.float32)
        # 检查是否使用 Torch 2.0 或 XFormers 的注意力处理器
        use_torch_2_0_or_xformers = isinstance(
            self.vae.decoder.mid_block.attentions[0].processor,
            (
                AttnProcessor2_0,
                XFormersAttnProcessor,
                FusedAttnProcessor2_0,
            ),
        )
        # 如果使用 XFormers 或 Torch 2.0,注意力块不需要为浮点32,这样可以节省大量内存
        if use_torch_2_0_or_xformers:
            self.vae.post_quant_conv.to(dtype)
            self.vae.decoder.conv_in.to(dtype)
            self.vae.decoder.mid_block.to(dtype)

    # 从 diffusers.pipelines.latent_consistency_models.pipeline_latent_consistency_text2img 中复制的方法
    def get_guidance_scale_embedding(
        self, w: torch.Tensor, embedding_dim: int = 512, dtype: torch.dtype = torch.float32
    ) -> torch.Tensor:
        """
        查看 https://github.com/google-research/vdm/blob/dc27b98a554f65cdc654b800da5aa1846545d41b/model_vdm.py#L298

        参数:
            w (`torch.Tensor`):
                使用指定的引导比例生成嵌入向量,以丰富时间步嵌入。
            embedding_dim (`int`, *可选*, 默认值为 512):
                要生成的嵌入的维度。
            dtype (`torch.dtype`, *可选*, 默认值为 `torch.float32`):
                生成的嵌入的数据类型。

        返回:
            `torch.Tensor`: 形状为 `(len(w), embedding_dim)` 的嵌入向量。
        """
        # 确保输入张量 w 是一维的
        assert len(w.shape) == 1
        # 将 w 的值扩大 1000 倍
        w = w * 1000.0

        # 计算一半的嵌入维度
        half_dim = embedding_dim // 2
        # 计算嵌入的基础值
        emb = torch.log(torch.tensor(10000.0)) / (half_dim - 1)
        # 计算指数以生成嵌入
        emb = torch.exp(torch.arange(half_dim, dtype=dtype) * -emb)
        # 生成最终的嵌入,通过将 w 和基础值相乘
        emb = w.to(dtype)[:, None] * emb[None, :]
        # 合并正弦和余弦嵌入
        emb = torch.cat([torch.sin(emb), torch.cos(emb)], dim=1)
        # 如果嵌入维度是奇数,进行零填充
        if embedding_dim % 2 == 1:  # zero pad
            emb = torch.nn.functional.pad(emb, (0, 1))
        # 确保生成的嵌入形状正确
        assert emb.shape == (w.shape[0], embedding_dim)
        # 返回生成的嵌入
        return emb

    @property
    # 定义方法以获取指导比例
    def guidance_scale(self):
        # 返回指导比例的值
        return self._guidance_scale

    # 将指导重缩放定义为属性
    @property
    def guidance_rescale(self):
        # 返回重缩放值
        return self._guidance_rescale

    # 将剪切跳过定义为属性
    @property
    def clip_skip(self):
        # 返回剪切跳过的值
        return self._clip_skip

    # 定义属性以判断是否执行无分类器自由引导
    @property
    def do_classifier_free_guidance(self):
        # 判断指导比例是否大于 1 且时间条件投影维度是否为 None
        return self._guidance_scale > 1 and self.unet.config.time_cond_proj_dim is None

    # 将交叉注意力关键字参数定义为属性
    @property
    def cross_attention_kwargs(self):
        # 返回交叉注意力关键字参数
        return self._cross_attention_kwargs

    # 将去噪结束定义为属性
    @property
    def denoising_end(self):
        # 返回去噪结束的值
        return self._denoising_end

    # 将时间步数定义为属性
    @property
    def num_timesteps(self):
        # 返回时间步数
        return self._num_timesteps

    # 将中断定义为属性
    @property
    def interrupt(self):
        # 返回中断状态
        return self._interrupt

    # 使用装饰器禁用梯度计算
    @torch.no_grad()
    # 替换示例文档字符串
    @replace_example_docstring(EXAMPLE_DOC_STRING)
    # 定义可调用方法以处理输入
    def __call__(
        # 输入提示,可以是字符串或字符串列表
        prompt: Union[str, List[str]] = None,
        # 第二个提示,可选
        prompt_2: Optional[Union[str, List[str]]] = None,
        # 可选的高度
        height: Optional[int] = None,
        # 可选的宽度
        width: Optional[int] = None,
        # 推理步骤的数量,默认值为 50
        num_inference_steps: int = 50,
        # 时间步列表,可选
        timesteps: List[int] = None,
        # sigma 列表,可选
        sigmas: List[float] = None,
        # 可选的去噪结束值
        denoising_end: Optional[float] = None,
        # 指导比例,默认为 5.0
        guidance_scale: float = 5.0,
        # 可选的负提示,可以是字符串或字符串列表
        negative_prompt: Optional[Union[str, List[str]]] = None,
        # 第二个负提示,可选
        negative_prompt_2: Optional[Union[str, List[str]]] = None,
        # 每个提示生成的图像数量,默认为 1
        num_images_per_prompt: Optional[int] = 1,
        # eta 值,默认为 0.0
        eta: float = 0.0,
        # 随机生成器,可选
        generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
        # 潜在表示,可选
        latents: Optional[torch.Tensor] = None,
        # 提示嵌入,可选
        prompt_embeds: Optional[torch.Tensor] = None,
        # 负提示嵌入,可选
        negative_prompt_embeds: Optional[torch.Tensor] = None,
        # 池化的提示嵌入,可选
        pooled_prompt_embeds: Optional[torch.Tensor] = None,
        # 负池化的提示嵌入,可选
        negative_pooled_prompt_embeds: Optional[torch.Tensor] = None,
        # 输入适配器图像,可选
        ip_adapter_image: Optional[PipelineImageInput] = None,
        # 输入适配器图像嵌入,可选
        ip_adapter_image_embeds: Optional[List[torch.Tensor]] = None,
        # 输出类型,默认为 "pil"
        output_type: Optional[str] = "pil",
        # 返回字典的标志,默认为 True
        return_dict: bool = True,
        # 交叉注意力关键字参数,可选
        cross_attention_kwargs: Optional[Dict[str, Any]] = None,
        # 原始重缩放值,默认为 0.0
        guidance_rescale: float = 0.0,
        # 可选的原始大小
        original_size: Optional[Tuple[int, int]] = None,
        # 左上角裁剪坐标,默认为 (0, 0)
        crops_coords_top_left: Tuple[int, int] = (0, 0),
        # 可选的目标大小
        target_size: Optional[Tuple[int, int]] = None,
        # 可选的负原始大小
        negative_original_size: Optional[Tuple[int, int]] = None,
        # 负裁剪左上角坐标,默认为 (0, 0)
        negative_crops_coords_top_left: Tuple[int, int] = (0, 0),
        # 可选的负目标大小
        negative_target_size: Optional[Tuple[int, int]] = None,
        # 可选的剪切跳过值
        clip_skip: Optional[int] = None,
        # 可选的回调函数,在步骤结束时调用
        callback_on_step_end: Optional[
            Union[Callable[[int, int, Dict], None], PipelineCallback, MultiPipelineCallbacks]
        ] = None,
        # 默认输入张量名称列表
        callback_on_step_end_tensor_inputs: List[str] = ["latents"],
        # 其他关键字参数
        **kwargs,

.\diffusers\pipelines\stable_diffusion_xl\pipeline_stable_diffusion_xl_img2img.py

# 版权声明,表明版权属于 HuggingFace 团队,所有权利保留
# 
# 根据 Apache 2.0 许可证授权("许可证"); 
# 除非遵循许可证,否则不得使用此文件。
# 可通过以下网址获取许可证副本:
# 
#     http://www.apache.org/licenses/LICENSE-2.0
# 
# 除非适用法律或书面协议另有约定, 
# 否则根据许可证分发的软件是以“原样”基础分发,不附有任何类型的保证或条件,
# 无论是明示或暗示的。
# 请参见许可证以获取关于权限和限制的具体语言。

import inspect  # 导入 inspect 模块,用于检查对象
from typing import Any, Callable, Dict, List, Optional, Tuple, Union  # 导入类型提示,定义多种数据类型

import PIL.Image  # 导入 PIL.Image 模块,用于图像处理
import torch  # 导入 PyTorch 库,用于深度学习操作
from transformers import (  # 从 transformers 库导入多个模型和处理器
    CLIPImageProcessor,  # CLIP 图像处理器
    CLIPTextModel,  # CLIP 文本模型
    CLIPTextModelWithProjection,  # 带投影的 CLIP 文本模型
    CLIPTokenizer,  # CLIP 标记器
    CLIPVisionModelWithProjection,  # 带投影的 CLIP 视觉模型
)

from ...callbacks import MultiPipelineCallbacks, PipelineCallback  # 导入回调相关的类
from ...image_processor import PipelineImageInput, VaeImageProcessor  # 导入图像处理相关的类
from ...loaders import (  # 导入加载器相关的混入类
    FromSingleFileMixin,  # 单文件加载混入
    IPAdapterMixin,  # IP 适配器混入
    StableDiffusionXLLoraLoaderMixin,  # StableDiffusion XL Lora 加载混入
    TextualInversionLoaderMixin,  # 文本反转加载混入
)
from ...models import AutoencoderKL, ImageProjection, UNet2DConditionModel  # 导入模型相关的类
from ...models.attention_processor import (  # 导入注意力处理器相关的类
    AttnProcessor2_0,  # 注意力处理器版本 2.0
    XFormersAttnProcessor,  # XFormers 注意力处理器
)
from ...models.lora import adjust_lora_scale_text_encoder  # 导入调整 Lora 规模的文本编码器的函数
from ...schedulers import KarrasDiffusionSchedulers  # 导入 Karras 扩散调度器
from ...utils import (  # 导入工具类中的多个函数和常量
    USE_PEFT_BACKEND,  # 是否使用 PEFT 后端
    deprecate,  # 用于标记已弃用功能的装饰器
    is_invisible_watermark_available,  # 检查是否可用隐形水印
    is_torch_xla_available,  # 检查是否可用 Torch XLA
    logging,  # 导入日志记录工具
    replace_example_docstring,  # 替换示例文档字符串的工具
    scale_lora_layers,  # 缩放 Lora 层的函数
    unscale_lora_layers,  # 反缩放 Lora 层的函数
)
from ...utils.torch_utils import randn_tensor  # 导入生成随机张量的工具
from ..pipeline_utils import DiffusionPipeline, StableDiffusionMixin  # 导入扩散管道和稳定扩散混合类
from .pipeline_output import StableDiffusionXLPipelineOutput  # 导入稳定扩散 XL 管道输出类

# 检查隐形水印功能是否可用,如果可用,则导入水印处理器
if is_invisible_watermark_available():
    from .watermark import StableDiffusionXLWatermarker  # 导入稳定扩散 XL 水印处理器

# 检查 Torch XLA 是否可用,若可用,则导入其核心模型
if is_torch_xla_available():
    import torch_xla.core.xla_model as xm  # 导入 XLA 模型核心

    XLA_AVAILABLE = True  # 设置标志,指示 XLA 可用
else:
    XLA_AVAILABLE = False  # 设置标志,指示 XLA 不可用

logger = logging.get_logger(__name__)  # 初始化日志记录器,使用当前模块名作为标识

EXAMPLE_DOC_STRING = """  # 示例文档字符串,展示如何使用管道
    Examples:
        ```py
        >>> import torch  # 导入 PyTorch 库
        >>> from diffusers import StableDiffusionXLImg2ImgPipeline  # 导入 StableDiffusion XL 图像到图像管道
        >>> from diffusers.utils import load_image  # 导入加载图像的工具

        >>> pipe = StableDiffusionXLImg2ImgPipeline.from_pretrained(  # 从预训练模型创建管道
        ...     "stabilityai/stable-diffusion-xl-refiner-1.0", torch_dtype=torch.float16  # 指定模型和数据类型
        ... )
        >>> pipe = pipe.to("cuda")  # 将管道移动到 GPU
        >>> url = "https://huggingface.co/datasets/patrickvonplaten/images/resolve/main/aa_xl/000000009.png"  # 图像的 URL

        >>> init_image = load_image(url).convert("RGB")  # 加载并转换图像为 RGB 模式
        >>> prompt = "a photo of an astronaut riding a horse on mars"  # 定义生成图像的提示
        >>> image = pipe(prompt, image=init_image).images[0]  # 使用管道生成图像
        ```py
"""

# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.rescale_noise_cfg 复制的内容
# 根据给定的噪声配置和文本预测噪声进行重新缩放
def rescale_noise_cfg(noise_cfg, noise_pred_text, guidance_rescale=0.0):
    """
    根据 `guidance_rescale` 重新缩放 `noise_cfg`。基于论文 [Common Diffusion Noise Schedules and
    Sample Steps are Flawed](https://arxiv.org/pdf/2305.08891.pdf) 的发现。参见第 3.4 节
    """
    # 计算噪声预测文本的标准差,保持维度
    std_text = noise_pred_text.std(dim=list(range(1, noise_pred_text.ndim)), keepdim=True)
    # 计算噪声配置的标准差,保持维度
    std_cfg = noise_cfg.std(dim=list(range(1, noise_cfg.ndim)), keepdim=True)
    # 根据标准差调整噪声预测,修正过曝问题
    noise_pred_rescaled = noise_cfg * (std_text / std_cfg)
    # 将调整后的噪声与原始噪声按 `guidance_rescale` 因子混合,以避免图像看起来“普通”
    noise_cfg = guidance_rescale * noise_pred_rescaled + (1 - guidance_rescale) * noise_cfg
    # 返回重新缩放后的噪声配置
    return noise_cfg


# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion_img2img.retrieve_latents 复制的函数
def retrieve_latents(
    encoder_output: torch.Tensor, generator: Optional[torch.Generator] = None, sample_mode: str = "sample"
):
    # 如果 encoder_output 有 latent_dist 属性并且采样模式为 "sample"
    if hasattr(encoder_output, "latent_dist") and sample_mode == "sample":
        # 从潜在分布中采样并返回
        return encoder_output.latent_dist.sample(generator)
    # 如果 encoder_output 有 latent_dist 属性并且采样模式为 "argmax"
    elif hasattr(encoder_output, "latent_dist") and sample_mode == "argmax":
        # 返回潜在分布的众数
        return encoder_output.latent_dist.mode()
    # 如果 encoder_output 有 latents 属性
    elif hasattr(encoder_output, "latents"):
        # 直接返回潜在向量
        return encoder_output.latents
    # 如果都没有,抛出属性错误
    else:
        raise AttributeError("Could not access latents of provided encoder_output")


# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.retrieve_timesteps 复制的函数
def retrieve_timesteps(
    scheduler,
    num_inference_steps: Optional[int] = None,
    device: Optional[Union[str, torch.device]] = None,
    timesteps: Optional[List[int]] = None,
    sigmas: Optional[List[float]] = None,
    **kwargs,
):
    """
    调用调度器的 `set_timesteps` 方法并在调用后从调度器获取时间步。处理自定义时间步。任何额外参数将传递给 `scheduler.set_timesteps`。

    参数:
        scheduler (`SchedulerMixin`):
            获取时间步的调度器。
        num_inference_steps (`int`):
            生成样本时使用的扩散步骤数量。如果使用,则 `timesteps` 必须为 `None`。
        device (`str` 或 `torch.device`,*可选*):
            将时间步移动到的设备。如果为 `None`,则时间步不移动。
        timesteps (`List[int]`,*可选*):
            自定义时间步,用于覆盖调度器的时间步间隔策略。如果传入 `timesteps`,则 `num_inference_steps` 和 `sigmas` 必须为 `None`。
        sigmas (`List[float]`,*可选*):
            自定义 sigmas,用于覆盖调度器的时间步间隔策略。如果传入 `sigmas`,则 `num_inference_steps` 和 `timesteps` 必须为 `None`。
    # 返回值说明:返回一个元组,第一个元素是调度器的时间步调度,第二个元素是推理步骤的数量
    """
    # 检查是否同时传入了时间步和 sigma,抛出错误
    if timesteps is not None and sigmas is not None:
        raise ValueError("Only one of `timesteps` or `sigmas` can be passed. Please choose one to set custom values")
    # 如果时间步不为空,进行相应处理
    if timesteps is not None:
        # 检查调度器是否接受自定义时间步
        accepts_timesteps = "timesteps" in set(inspect.signature(scheduler.set_timesteps).parameters.keys())
        # 如果不接受,抛出错误提示
        if not accepts_timesteps:
            raise ValueError(
                f"The current scheduler class {scheduler.__class__}'s `set_timesteps` does not support custom"
                f" timestep schedules. Please check whether you are using the correct scheduler."
            )
        # 设置调度器的时间步
        scheduler.set_timesteps(timesteps=timesteps, device=device, **kwargs)
        # 获取当前调度器的时间步
        timesteps = scheduler.timesteps
        # 计算推理步骤的数量
        num_inference_steps = len(timesteps)
    # 如果 sigma 不为空,进行相应处理
    elif sigmas is not None:
        # 检查调度器是否接受自定义 sigma
        accept_sigmas = "sigmas" in set(inspect.signature(scheduler.set_timesteps).parameters.keys())
        # 如果不接受,抛出错误提示
        if not accept_sigmas:
            raise ValueError(
                f"The current scheduler class {scheduler.__class__}'s `set_timesteps` does not support custom"
                f" sigmas schedules. Please check whether you are using the correct scheduler."
            )
        # 设置调度器的 sigma
        scheduler.set_timesteps(sigmas=sigmas, device=device, **kwargs)
        # 获取当前调度器的时间步
        timesteps = scheduler.timesteps
        # 计算推理步骤的数量
        num_inference_steps = len(timesteps)
    # 如果都不为空,则设置默认的时间步
    else:
        scheduler.set_timesteps(num_inference_steps, device=device, **kwargs)
        # 获取当前调度器的时间步
        timesteps = scheduler.timesteps
    # 返回时间步和推理步骤的数量
    return timesteps, num_inference_steps
# 定义一个类,名为 StableDiffusionXLImg2ImgPipeline,继承多个父类
class StableDiffusionXLImg2ImgPipeline(
    # 继承 DiffusionPipeline 类
    DiffusionPipeline,
    # 继承 StableDiffusionMixin 类
    StableDiffusionMixin,
    # 继承 TextualInversionLoaderMixin 类
    TextualInversionLoaderMixin,
    # 继承 FromSingleFileMixin 类
    FromSingleFileMixin,
    # 继承 StableDiffusionXLLoraLoaderMixin 类
    StableDiffusionXLLoraLoaderMixin,
    # 继承 IPAdapterMixin 类
    IPAdapterMixin,
):
    # 文档字符串,描述该管道的功能
    r"""
    Pipeline for text-to-image generation using Stable Diffusion XL.

    This model inherits from [`DiffusionPipeline`]. Check the superclass documentation for the generic methods the
    library implements for all the pipelines (such as downloading or saving, running on a particular device, etc.)

    The pipeline also inherits the following loading methods:
        - [`~loaders.TextualInversionLoaderMixin.load_textual_inversion`] for loading textual inversion embeddings
        - [`~loaders.FromSingleFileMixin.from_single_file`] for loading `.ckpt` files
        - [`~loaders.StableDiffusionXLLoraLoaderMixin.load_lora_weights`] for loading LoRA weights
        - [`~loaders.StableDiffusionXLLoraLoaderMixin.save_lora_weights`] for saving LoRA weights
        - [`~loaders.IPAdapterMixin.load_ip_adapter`] for loading IP Adapters
    """
    # 文档字符串,描述函数的参数及其作用
        Args:
            vae ([`AutoencoderKL`]):
                变分自编码器(VAE)模型,用于将图像编码和解码为潜在表示。
            text_encoder ([`CLIPTextModel`]):
                冻结的文本编码器。Stable Diffusion XL 使用
                [CLIP](https://huggingface.co/docs/transformers/model_doc/clip#transformers.CLIPTextModel) 的文本部分,
                特别是 [clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14) 变体。
            text_encoder_2 ([` CLIPTextModelWithProjection`]):
                第二个冻结文本编码器。Stable Diffusion XL 使用
                [CLIP](https://huggingface.co/docs/transformers/model_doc/clip#transformers.CLIPTextModelWithProjection) 的文本和池部分,
                特别是 [laion/CLIP-ViT-bigG-14-laion2B-39B-b160k](https://huggingface.co/laion/CLIP-ViT-bigG-14-laion2B-39B-b160k) 变体。
            tokenizer (`CLIPTokenizer`):
                类 [CLIPTokenizer](https://huggingface.co/docs/transformers/v4.21.0/en/model_doc/clip#transformers.CLIPTokenizer) 的分词器。
            tokenizer_2 (`CLIPTokenizer`):
                第二个类 [CLIPTokenizer](https://huggingface.co/docs/transformers/v4.21.0/en/model_doc/clip#transformers.CLIPTokenizer) 的分词器。
            unet ([`UNet2DConditionModel`]): 条件 U-Net 架构,用于去噪编码后的图像潜在表示。
            scheduler ([`SchedulerMixin`]):
                用于与 `unet` 结合使用的调度器,以去噪编码后的图像潜在表示。可以是
                [`DDIMScheduler`], [`LMSDiscreteScheduler`] 或 [`PNDMScheduler`] 之一。
            requires_aesthetics_score (`bool`, *optional*, defaults to `"False"`):
                `unet` 是否需要在推理过程中传递 `aesthetic_score` 条件。另见
                `stabilityai/stable-diffusion-xl-refiner-1-0` 的配置。
            force_zeros_for_empty_prompt (`bool`, *optional*, defaults to `"True"`):
                是否强制将负提示嵌入始终设置为 0。另见
                `stabilityai/stable-diffusion-xl-base-1-0` 的配置。
            add_watermarker (`bool`, *optional*):
                是否使用 [invisible_watermark library](https://github.com/ShieldMnt/invisible-watermark/) 为输出图像加水印。如果未定义,
                如果安装了该包,则默认为 True,否则不使用水印。
        """
    
        # 定义模型的 CPU 卸载顺序
        model_cpu_offload_seq = "text_encoder->text_encoder_2->image_encoder->unet->vae"
        # 定义可选组件列表
        _optional_components = [
            "tokenizer",
            "tokenizer_2",
            "text_encoder",
            "text_encoder_2",
            "image_encoder",
            "feature_extractor",
        ]
    # 存储输入张量的回调名称列表
        _callback_tensor_inputs = [
            "latents",  # 潜在变量的名称
            "prompt_embeds",  # 正面提示的嵌入
            "negative_prompt_embeds",  # 负面提示的嵌入
            "add_text_embeds",  # 附加文本的嵌入
            "add_time_ids",  # 附加时间标识
            "negative_pooled_prompt_embeds",  # 负面池化提示的嵌入
            "add_neg_time_ids",  # 附加负时间标识
        ]
    
        # 初始化函数,设置模型及其参数
        def __init__(
            self,
            vae: AutoencoderKL,  # 变分自编码器模型
            text_encoder: CLIPTextModel,  # 文本编码器模型
            text_encoder_2: CLIPTextModelWithProjection,  # 第二文本编码器,带有投影
            tokenizer: CLIPTokenizer,  # 主要的分词器
            tokenizer_2: CLIPTokenizer,  # 第二分词器
            unet: UNet2DConditionModel,  # 条件 U-Net 模型
            scheduler: KarrasDiffusionSchedulers,  # 扩散调度器
            image_encoder: CLIPVisionModelWithProjection = None,  # 可选的图像编码器,带有投影
            feature_extractor: CLIPImageProcessor = None,  # 可选的特征提取器
            requires_aesthetics_score: bool = False,  # 是否需要美学评分
            force_zeros_for_empty_prompt: bool = True,  # 是否对空提示强制设置为零
            add_watermarker: Optional[bool] = None,  # 可选的水印添加标志
        ):
            # 调用父类的初始化方法
            super().__init__()
    
            # 注册各个模块,便于后续调用
            self.register_modules(
                vae=vae,
                text_encoder=text_encoder,
                text_encoder_2=text_encoder_2,
                tokenizer=tokenizer,
                tokenizer_2=tokenizer_2,
                unet=unet,
                image_encoder=image_encoder,
                feature_extractor=feature_extractor,
                scheduler=scheduler,
            )
            # 将强制空提示为零的配置注册到系统中
            self.register_to_config(force_zeros_for_empty_prompt=force_zeros_for_empty_prompt)
            # 将美学评分的配置注册到系统中
            self.register_to_config(requires_aesthetics_score=requires_aesthetics_score)
            # 计算 VAE 的缩放因子
            self.vae_scale_factor = 2 ** (len(self.vae.config.block_out_channels) - 1)
            # 创建图像处理器实例,使用缩放因子
            self.image_processor = VaeImageProcessor(vae_scale_factor=self.vae_scale_factor)
    
            # 根据条件决定是否添加水印
            add_watermarker = add_watermarker if add_watermarker is not None else is_invisible_watermark_available()
    
            # 根据是否添加水印,初始化水印对象
            if add_watermarker:
                self.watermark = StableDiffusionXLWatermarker()  # 创建水印对象
            else:
                self.watermark = None  # 不添加水印
    
        # 从稳定扩散 XL 管道复制的提示编码函数
        def encode_prompt(
            self,
            prompt: str,  # 正面提示字符串
            prompt_2: Optional[str] = None,  # 可选的第二个提示字符串
            device: Optional[torch.device] = None,  # 设备选择(CPU或GPU)
            num_images_per_prompt: int = 1,  # 每个提示生成的图像数量
            do_classifier_free_guidance: bool = True,  # 是否执行无分类器引导
            negative_prompt: Optional[str] = None,  # 可选的负面提示字符串
            negative_prompt_2: Optional[str] = None,  # 可选的第二个负面提示字符串
            prompt_embeds: Optional[torch.Tensor] = None,  # 预先计算的正面提示嵌入
            negative_prompt_embeds: Optional[torch.Tensor] = None,  # 预先计算的负面提示嵌入
            pooled_prompt_embeds: Optional[torch.Tensor] = None,  # 预先计算的池化提示嵌入
            negative_pooled_prompt_embeds: Optional[torch.Tensor] = None,  # 预先计算的负面池化提示嵌入
            lora_scale: Optional[float] = None,  # 可选的 LORA 缩放因子
            clip_skip: Optional[int] = None,  # 可选的跳过的 CLIP 层数
        # 从稳定扩散管道复制的额外步骤参数准备函数
    # 准备额外的参数以供调度器步骤使用,因为不同的调度器具有不同的参数签名
    def prepare_extra_step_kwargs(self, generator, eta):
        # 检查调度器的步骤函数是否接受 eta 参数
        accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())
        # 初始化额外参数的字典
        extra_step_kwargs = {}
        # 如果调度器接受 eta 参数,则将其添加到额外参数字典中
        if accepts_eta:
            extra_step_kwargs["eta"] = eta
    
        # 检查调度器的步骤函数是否接受 generator 参数
        accepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys())
        # 如果调度器接受 generator 参数,则将其添加到额外参数字典中
        if accepts_generator:
            extra_step_kwargs["generator"] = generator
        # 返回包含额外参数的字典
        return extra_step_kwargs
    
    # 检查输入参数的有效性和类型
    def check_inputs(
        self,
        prompt,  # 输入的提示文本
        prompt_2,  # 第二个输入的提示文本
        strength,  # 强度参数,控制效果的强弱
        num_inference_steps,  # 推理步骤的数量
        callback_steps,  # 回调步骤,控制回调的频率
        negative_prompt=None,  # 可选的负面提示文本
        negative_prompt_2=None,  # 第二个可选的负面提示文本
        prompt_embeds=None,  # 可选的提示嵌入表示
        negative_prompt_embeds=None,  # 可选的负面提示嵌入表示
        ip_adapter_image=None,  # 可选的适配器图像
        ip_adapter_image_embeds=None,  # 可选的适配器图像嵌入表示
        callback_on_step_end_tensor_inputs=None,  # 可选的回调输入张量
    # 获取时间步长,用于推理过程
        def get_timesteps(self, num_inference_steps, strength, device, denoising_start=None):
            # 根据初始时间步计算原始时间步
            if denoising_start is None:
                # 计算初始时间步,取 num_inference_steps 与 strength 的乘积和 num_inference_steps 的较小值
                init_timestep = min(int(num_inference_steps * strength), num_inference_steps)
                # 计算起始时间步,确保不小于零
                t_start = max(num_inference_steps - init_timestep, 0)
            else:
                # 如果给定 denoising_start,起始时间步设为零
                t_start = 0
    
            # 从调度器中获取时间步,基于起始时间步和调度器的顺序
            timesteps = self.scheduler.timesteps[t_start * self.scheduler.order :]
    
            # 如果提供了 denoising_start,强度将不相关
            if denoising_start is not None:
                # 计算离散时间步截止点
                discrete_timestep_cutoff = int(
                    round(
                        self.scheduler.config.num_train_timesteps
                        - (denoising_start * self.scheduler.config.num_train_timesteps)
                    )
                )
    
                # 计算有效的推理时间步数
                num_inference_steps = (timesteps < discrete_timestep_cutoff).sum().item()
                # 检查是否为二阶调度器,并处理时间步数的奇偶性
                if self.scheduler.order == 2 and num_inference_steps % 2 == 0:
                    # 若时间步数为偶数,增加1以避免中断导致错误结果
                    num_inference_steps = num_inference_steps + 1
    
                # 从时间步的末尾切片,确保顺序正确
                timesteps = timesteps[-num_inference_steps:]
                return timesteps, num_inference_steps
    
            # 返回未经过 denoising_start 调整的时间步和时间步数
            return timesteps, num_inference_steps - t_start
    
        # 准备潜在变量的函数
        def prepare_latents(
            self, image, timestep, batch_size, num_images_per_prompt, dtype, device, generator=None, add_noise=True
        # 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.encode_image 复制
    # 定义一个方法,用于对输入图像进行编码
        def encode_image(self, image, device, num_images_per_prompt, output_hidden_states=None):
            # 获取图像编码器参数的数据类型
            dtype = next(self.image_encoder.parameters()).dtype
    
            # 如果输入的图像不是张量,使用特征提取器将其转换为张量
            if not isinstance(image, torch.Tensor):
                image = self.feature_extractor(image, return_tensors="pt").pixel_values
    
            # 将图像移动到指定的设备,并转换为适当的数据类型
            image = image.to(device=device, dtype=dtype)
    
            # 如果需要输出隐藏状态
            if output_hidden_states:
                # 通过图像编码器编码图像,并获取倒数第二层的隐藏状态
                image_enc_hidden_states = self.image_encoder(image, output_hidden_states=True).hidden_states[-2]
                # 将隐藏状态沿第0维重复指定的次数
                image_enc_hidden_states = image_enc_hidden_states.repeat_interleave(num_images_per_prompt, dim=0)
                # 创建一个与图像形状相同的全零张量,并通过编码器获取其隐藏状态
                uncond_image_enc_hidden_states = self.image_encoder(
                    torch.zeros_like(image), output_hidden_states=True
                ).hidden_states[-2]
                # 将无条件图像的隐藏状态沿第0维重复指定的次数
                uncond_image_enc_hidden_states = uncond_image_enc_hidden_states.repeat_interleave(
                    num_images_per_prompt, dim=0
                )
                # 返回有条件和无条件的图像编码隐藏状态
                return image_enc_hidden_states, uncond_image_enc_hidden_states
            else:
                # 如果不需要输出隐藏状态,直接编码图像并获取图像嵌入
                image_embeds = self.image_encoder(image).image_embeds
                # 将图像嵌入沿第0维重复指定的次数
                image_embeds = image_embeds.repeat_interleave(num_images_per_prompt, dim=0)
                # 创建一个与图像嵌入形状相同的全零张量作为无条件图像嵌入
                uncond_image_embeds = torch.zeros_like(image_embeds)
    
                # 返回有条件和无条件的图像嵌入
                return image_embeds, uncond_image_embeds
    
        # 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_ip_adapter_image_embeds 复制的方法
        def prepare_ip_adapter_image_embeds(
            # 定义方法参数,包括输入适配器图像、适配器图像嵌入、设备、每个提示的图像数量和是否进行分类器自由引导
            self, ip_adapter_image, ip_adapter_image_embeds, device, num_images_per_prompt, do_classifier_free_guidance
    # 方法的开始,接收各种参数
        ):
            # 初始化图像嵌入列表
            image_embeds = []
            # 如果启用无分类器自由引导,则初始化负图像嵌入列表
            if do_classifier_free_guidance:
                negative_image_embeds = []
            # 如果 IP 适配器图像嵌入为空
            if ip_adapter_image_embeds is None:
                # 检查 ip_adapter_image 是否为列表,如果不是则将其转为列表
                if not isinstance(ip_adapter_image, list):
                    ip_adapter_image = [ip_adapter_image]
    
                # 检查 ip_adapter_image 的长度是否与 IP 适配器数量一致
                if len(ip_adapter_image) != len(self.unet.encoder_hid_proj.image_projection_layers):
                    raise ValueError(
                        f"`ip_adapter_image` must have same length as the number of IP Adapters. Got {len(ip_adapter_image)} images and {len(self.unet.encoder_hid_proj.image_projection_layers)} IP Adapters."
                    )
                
                # 遍历每个 IP 适配器图像和对应的图像投影层
                for single_ip_adapter_image, image_proj_layer in zip(
                    ip_adapter_image, self.unet.encoder_hid_proj.image_projection_layers
                ):
                    # 判断输出隐藏状态是否为真,若图像投影层不是 ImageProjection 类型
                    output_hidden_state = not isinstance(image_proj_layer, ImageProjection)
                    # 对单个图像进行编码,返回图像嵌入和负图像嵌入
                    single_image_embeds, single_negative_image_embeds = self.encode_image(
                        single_ip_adapter_image, device, 1, output_hidden_state
                    )
    
                    # 将单个图像嵌入添加到图像嵌入列表
                    image_embeds.append(single_image_embeds[None, :])
                    # 如果启用无分类器自由引导,则添加负图像嵌入
                    if do_classifier_free_guidance:
                        negative_image_embeds.append(single_negative_image_embeds[None, :])
            else:
                # 如果 IP 适配器图像嵌入不为空,则遍历其内容
                for single_image_embeds in ip_adapter_image_embeds:
                    # 如果启用无分类器自由引导,则将单个图像嵌入拆分为负和正
                    if do_classifier_free_guidance:
                        single_negative_image_embeds, single_image_embeds = single_image_embeds.chunk(2)
                        negative_image_embeds.append(single_negative_image_embeds)
                    # 将图像嵌入添加到列表
                    image_embeds.append(single_image_embeds)
    
            # 初始化 IP 适配器图像嵌入列表
            ip_adapter_image_embeds = []
            # 遍历图像嵌入
            for i, single_image_embeds in enumerate(image_embeds):
                # 根据每个提示的图像数量重复图像嵌入
                single_image_embeds = torch.cat([single_image_embeds] * num_images_per_prompt, dim=0)
                # 如果启用无分类器自由引导,则重复负图像嵌入
                if do_classifier_free_guidance:
                    single_negative_image_embeds = torch.cat([negative_image_embeds[i]] * num_images_per_prompt, dim=0)
                    single_image_embeds = torch.cat([single_negative_image_embeds, single_image_embeds], dim=0)
    
                # 将图像嵌入移动到指定设备
                single_image_embeds = single_image_embeds.to(device=device)
                # 将处理后的图像嵌入添加到 IP 适配器图像嵌入列表
                ip_adapter_image_embeds.append(single_image_embeds)
    
            # 返回 IP 适配器图像嵌入列表
            return ip_adapter_image_embeds
    
        # 定义获取附加时间 ID 的方法
        def _get_add_time_ids(
            self,
            # 原始图像的大小
            original_size,
            # 裁剪区域的左上角坐标
            crops_coords_top_left,
            # 目标图像大小
            target_size,
            # 审美分数
            aesthetic_score,
            # 负审美分数
            negative_aesthetic_score,
            # 负原始图像大小
            negative_original_size,
            # 负裁剪区域的左上角坐标
            negative_crops_coords_top_left,
            # 负目标图像大小
            negative_target_size,
            # 数据类型
            dtype,
            # 文本编码器投影维度(可选)
            text_encoder_projection_dim=None,
    # 结束当前方法定义
    ):
        # 检查配置是否需要美学评分
        if self.config.requires_aesthetics_score:
            # 组合原始尺寸、裁剪坐标和美学评分,转换为列表
            add_time_ids = list(original_size + crops_coords_top_left + (aesthetic_score,))
            # 组合负原始尺寸、负裁剪坐标和负美学评分,转换为列表
            add_neg_time_ids = list(
                negative_original_size + negative_crops_coords_top_left + (negative_aesthetic_score,)
            )
        else:
            # 组合原始尺寸、裁剪坐标和目标尺寸,转换为列表
            add_time_ids = list(original_size + crops_coords_top_left + target_size)
            # 组合负原始尺寸、裁剪坐标和负目标尺寸,转换为列表
            add_neg_time_ids = list(negative_original_size + crops_coords_top_left + negative_target_size)

        # 计算传递的附加嵌入维度
        passed_add_embed_dim = (
            self.unet.config.addition_time_embed_dim * len(add_time_ids) + text_encoder_projection_dim
        )
        # 获取期望的附加嵌入维度
        expected_add_embed_dim = self.unet.add_embedding.linear_1.in_features

        # 检查期望的附加嵌入维度是否大于传递的维度,并且两者差值符合配置要求
        if (
            expected_add_embed_dim > passed_add_embed_dim
            and (expected_add_embed_dim - passed_add_embed_dim) == self.unet.config.addition_time_embed_dim
        ):
            # 抛出值错误,提示维度不匹配,需启用美学评分
            raise ValueError(
                f"Model expects an added time embedding vector of length {expected_add_embed_dim}, but a vector of {passed_add_embed_dim} was created. Please make sure to enable `requires_aesthetics_score` with `pipe.register_to_config(requires_aesthetics_score=True)` to make sure `aesthetic_score` {aesthetic_score} and `negative_aesthetic_score` {negative_aesthetic_score} is correctly used by the model."
            )
        # 检查期望的附加嵌入维度是否小于传递的维度,并且两者差值符合配置要求
        elif (
            expected_add_embed_dim < passed_add_embed_dim
            and (passed_add_embed_dim - expected_add_embed_dim) == self.unet.config.addition_time_embed_dim
        ):
            # 抛出值错误,提示维度不匹配,需禁用美学评分
            raise ValueError(
                f"Model expects an added time embedding vector of length {expected_add_embed_dim}, but a vector of {passed_add_embed_dim} was created. Please make sure to disable `requires_aesthetics_score` with `pipe.register_to_config(requires_aesthetics_score=False)` to make sure `target_size` {target_size} is correctly used by the model."
            )
        # 检查期望的附加嵌入维度是否与传递的维度不相等
        elif expected_add_embed_dim != passed_add_embed_dim:
            # 抛出值错误,提示模型配置不正确
            raise ValueError(
                f"Model expects an added time embedding vector of length {expected_add_embed_dim}, but a vector of {passed_add_embed_dim} was created. The model has an incorrect config. Please check `unet.config.time_embedding_type` and `text_encoder_2.config.projection_dim`."
            )

        # 将附加时间 ID 转换为张量,指定数据类型
        add_time_ids = torch.tensor([add_time_ids], dtype=dtype)
        # 将附加负时间 ID 转换为张量,指定数据类型
        add_neg_time_ids = torch.tensor([add_neg_time_ids], dtype=dtype)

        # 返回附加时间 ID 和附加负时间 ID
        return add_time_ids, add_neg_time_ids

    # 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion_upscale.StableDiffusionUpscalePipeline.upcast_vae 中复制
    # 定义一个用于上行转换 VAE 的方法
        def upcast_vae(self):
            # 获取 VAE 的数据类型
            dtype = self.vae.dtype
            # 将 VAE 转换为 float32 类型
            self.vae.to(dtype=torch.float32)
            # 判断是否使用 Torch 2.0 或 XFormers 的注意力处理器
            use_torch_2_0_or_xformers = isinstance(
                self.vae.decoder.mid_block.attentions[0].processor,
                (
                    AttnProcessor2_0,
                    XFormersAttnProcessor,
                ),
            )
            # 如果使用 xformers 或 torch_2_0,注意力块无需为 float32,可以节省大量内存
            if use_torch_2_0_or_xformers:
                # 将后量化卷积层转换为原始数据类型
                self.vae.post_quant_conv.to(dtype)
                # 将解码器的输入卷积层转换为原始数据类型
                self.vae.decoder.conv_in.to(dtype)
                # 将解码器中间块转换为原始数据类型
                self.vae.decoder.mid_block.to(dtype)
    
        # 从 diffusers.pipelines.latent_consistency_models.pipeline_latent_consistency_text2img 中复制的方法
        def get_guidance_scale_embedding(
            self, w: torch.Tensor, embedding_dim: int = 512, dtype: torch.dtype = torch.float32
        ) -> torch.Tensor:
            """
            参见 https://github.com/google-research/vdm/blob/dc27b98a554f65cdc654b800da5aa1846545d41b/model_vdm.py#L298
    
            参数:
                w (`torch.Tensor`):
                    用指定的引导比例生成嵌入向量,以随后丰富时间步嵌入。
                embedding_dim (`int`, *可选*, 默认为 512):
                    要生成的嵌入维度。
                dtype (`torch.dtype`, *可选*, 默认为 `torch.float32`):
                    生成嵌入的类型。
    
            返回:
                `torch.Tensor`: 形状为 `(len(w), embedding_dim)` 的嵌入向量。
            """
            # 确保输入张量 w 的维度为 1
            assert len(w.shape) == 1
            # 将 w 扩大 1000 倍
            w = w * 1000.0
    
            # 计算半维度
            half_dim = embedding_dim // 2
            # 计算嵌入的缩放因子
            emb = torch.log(torch.tensor(10000.0)) / (half_dim - 1)
            # 生成指数衰减的嵌入
            emb = torch.exp(torch.arange(half_dim, dtype=dtype) * -emb)
            # 计算最终嵌入矩阵
            emb = w.to(dtype)[:, None] * emb[None, :]
            # 将正弦和余弦嵌入合并
            emb = torch.cat([torch.sin(emb), torch.cos(emb)], dim=1)
            # 如果嵌入维度为奇数,则进行零填充
            if embedding_dim % 2 == 1:  # zero pad
                emb = torch.nn.functional.pad(emb, (0, 1))
            # 确保最终嵌入的形状正确
            assert emb.shape == (w.shape[0], embedding_dim)
            # 返回生成的嵌入
            return emb
    
        # 定义一个属性,返回当前的引导比例
        @property
        def guidance_scale(self):
            return self._guidance_scale
    
        # 定义一个属性,返回当前的引导重标定值
        @property
        def guidance_rescale(self):
            return self._guidance_rescale
    
        # 定义一个属性,返回当前的剪切跳过值
        @property
        def clip_skip(self):
            return self._clip_skip
    
        # 这里定义的 `guidance_scale` 类似于公式 (2) 中的引导权重 `w`
        # 来源于 Imagen 论文: https://arxiv.org/pdf/2205.11487.pdf。`guidance_scale = 1`
        # 表示不进行分类器自由引导。
        @property
        def do_classifier_free_guidance(self):
            return self._guidance_scale > 1 and self.unet.config.time_cond_proj_dim is None
    
        # 定义一个属性,返回当前的交叉注意力关键字参数
        @property
        def cross_attention_kwargs(self):
            return self._cross_attention_kwargs
    
        # 定义一个属性,返回当前的去噪结束值
        @property
        def denoising_end(self):
            return self._denoising_end
    
        # 定义一个属性
    # 定义方法 denoising_start,返回去噪开始的值
    def denoising_start(self):
        return self._denoising_start

    # 定义属性 num_timesteps,返回时间步数的值
    @property
    def num_timesteps(self):
        return self._num_timesteps

    # 定义属性 interrupt,返回中断状态的值
    @property
    def interrupt(self):
        return self._interrupt

    # 装饰器,禁用梯度计算以减少内存使用
    @torch.no_grad()
    # 替换文档字符串为示例文档字符串
    @replace_example_docstring(EXAMPLE_DOC_STRING)
    # 定义可调用方法 __call__,接收多个参数以执行具体操作
    def __call__(
        # 输入提示,可以是字符串或字符串列表
        self,
        prompt: Union[str, List[str]] = None,
        # 第二个提示,可以是字符串或字符串列表
        prompt_2: Optional[Union[str, List[str]]] = None,
        # 输入图像,可以是特定格式的图像输入
        image: PipelineImageInput = None,
        # 去噪强度,默认值为 0.3
        strength: float = 0.3,
        # 推理步骤数,默认值为 50
        num_inference_steps: int = 50,
        # 时间步列表,默认为 None
        timesteps: List[int] = None,
        # Sigma 值列表,默认为 None
        sigmas: List[float] = None,
        # 可选的去噪开始值
        denoising_start: Optional[float] = None,
        # 可选的去噪结束值
        denoising_end: Optional[float] = None,
        # 引导比例,默认值为 5.0
        guidance_scale: float = 5.0,
        # 可选的负提示,可以是字符串或字符串列表
        negative_prompt: Optional[Union[str, List[str]]] = None,
        # 可选的第二个负提示
        negative_prompt_2: Optional[Union[str, List[str]]] = None,
        # 每个提示生成的图像数量,默认为 1
        num_images_per_prompt: Optional[int] = 1,
        # eta 值,默认值为 0.0
        eta: float = 0.0,
        # 随机数生成器,可以是单个或多个生成器
        generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
        # 潜在变量,可以是 PyTorch 张量
        latents: Optional[torch.Tensor] = None,
        # 提示的嵌入表示,可以是 PyTorch 张量
        prompt_embeds: Optional[torch.Tensor] = None,
        # 负提示的嵌入表示,可以是 PyTorch 张量
        negative_prompt_embeds: Optional[torch.Tensor] = None,
        # 池化后的提示嵌入,可以是 PyTorch 张量
        pooled_prompt_embeds: Optional[torch.Tensor] = None,
        # 负池化提示嵌入,可以是 PyTorch 张量
        negative_pooled_prompt_embeds: Optional[torch.Tensor] = None,
        # 可选的图像输入适配器图像
        ip_adapter_image: Optional[PipelineImageInput] = None,
        # 可选的图像适配器嵌入列表
        ip_adapter_image_embeds: Optional[List[torch.Tensor]] = None,
        # 输出类型,默认值为 "pil"
        output_type: Optional[str] = "pil",
        # 是否返回字典,默认为 True
        return_dict: bool = True,
        # 可选的交叉注意力参数字典
        cross_attention_kwargs: Optional[Dict[str, Any]] = None,
        # 引导重新缩放,默认值为 0.0
        guidance_rescale: float = 0.0,
        # 原始大小的元组,默认为 None
        original_size: Tuple[int, int] = None,
        # 左上角裁剪坐标,默认值为 (0, 0)
        crops_coords_top_left: Tuple[int, int] = (0, 0),
        # 目标大小的元组,默认为 None
        target_size: Tuple[int, int] = None,
        # 可选的负原始大小元组
        negative_original_size: Optional[Tuple[int, int]] = None,
        # 负裁剪左上角坐标,默认值为 (0, 0)
        negative_crops_coords_top_left: Tuple[int, int] = (0, 0),
        # 可选的负目标大小元组
        negative_target_size: Optional[Tuple[int, int]] = None,
        # 美学分数,默认值为 6.0
        aesthetic_score: float = 6.0,
        # 负美学分数,默认值为 2.5
        negative_aesthetic_score: float = 2.5,
        # 可选的跳过裁剪的数量
        clip_skip: Optional[int] = None,
        # 可选的步骤结束时回调
        callback_on_step_end: Optional[
            Union[Callable[[int, int, Dict], None], PipelineCallback, MultiPipelineCallbacks]
        ] = None,
        # 默认的步骤结束时回调输入的张量名称
        callback_on_step_end_tensor_inputs: List[str] = ["latents"],
        # 可接受的其他参数
        **kwargs,

.\diffusers\pipelines\stable_diffusion_xl\pipeline_stable_diffusion_xl_inpaint.py

# 版权声明,指明该代码属于 HuggingFace 团队,受版权保护
# 本文件根据 Apache 许可证第 2.0 版授权,使用需遵循许可证条款
# 许可证的副本可以在以下网址获取
#     http://www.apache.org/licenses/LICENSE-2.0
# 除非法律要求或书面同意,软件按 "原样" 方式分发,不提供任何形式的保证或条件
# 详见许可证中关于权限和限制的具体条款

# 导入 inspect 模块,用于获取活跃的对象的源代码和文档
import inspect
# 从 typing 模块导入多种类型提示工具
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

# 导入 numpy 库,通常用于数值计算和数组操作
import numpy as np
# 导入 PIL.Image 库,用于处理图像
import PIL.Image
# 导入 torch 库,PyTorch 深度学习框架
import torch
# 从 transformers 库导入各种 CLIP 模型和处理器
from transformers import (
    CLIPImageProcessor,  # 图像处理器
    CLIPTextModel,  # 文本模型
    CLIPTextModelWithProjection,  # 带投影的文本模型
    CLIPTokenizer,  # 文本分词器
    CLIPVisionModelWithProjection,  # 带投影的视觉模型
)

# 导入回调函数和处理器类
from ...callbacks import MultiPipelineCallbacks, PipelineCallback
# 导入图像处理器类
from ...image_processor import PipelineImageInput, VaeImageProcessor
# 导入加载器类,负责从文件加载模型
from ...loaders import (
    FromSingleFileMixin,  # 从单个文件加载混合
    IPAdapterMixin,  # IP 适配器混合
    StableDiffusionXLLoraLoaderMixin,  # 稳定扩散 XL Lora 加载器混合
    TextualInversionLoaderMixin,  # 文本反演加载器混合
)
# 导入模型类,用于图像生成
from ...models import AutoencoderKL, ImageProjection, UNet2DConditionModel
# 导入注意力处理器类
from ...models.attention_processor import (
    AttnProcessor2_0,  # 注意力处理器版本 2.0
    XFormersAttnProcessor,  # XFormers 注意力处理器
)
# 导入调度器类,用于控制生成过程
from ...schedulers import KarrasDiffusionSchedulers
# 导入实用工具,提供额外功能
from ...utils import (
    USE_PEFT_BACKEND,  # 使用 PEFT 后端的标志
    deprecate,  # 标记已弃用的功能
    is_invisible_watermark_available,  # 检查是否支持隐形水印
    is_torch_xla_available,  # 检查是否可用 Torch XLA
    logging,  # 日志记录模块
    replace_example_docstring,  # 替换示例文档字符串
    scale_lora_layers,  # 调整 Lora 层的缩放
    unscale_lora_layers,  # 恢复 Lora 层的缩放
)
# 从 torch_utils 模块导入随机张量生成函数
from ...utils.torch_utils import randn_tensor
# 导入扩散管道和稳定扩散混合的基类
from ..pipeline_utils import DiffusionPipeline, StableDiffusionMixin
# 导入管道输出类
from .pipeline_output import StableDiffusionXLPipelineOutput

# 如果隐形水印可用,则导入水印处理器
if is_invisible_watermark_available():
    from .watermark import StableDiffusionXLWatermarker

# 如果 Torch XLA 可用,则导入相关核心功能
if is_torch_xla_available():
    import torch_xla.core.xla_model as xm  # 导入 Torch XLA 核心模块

    XLA_AVAILABLE = True  # 设置标志表示 XLA 可用
else:
    XLA_AVAILABLE = False  # 设置标志表示 XLA 不可用

# 创建日志记录器,用于记录模块内的日志信息
logger = logging.get_logger(__name__)  # pylint: disable=invalid-name

# 示例文档字符串,用于说明示例代码或函数的作用
EXAMPLE_DOC_STRING = """
    # 示例代码使用了 Stable Diffusion XL 进行图像修复
        Examples:
            ```py
            # 导入 PyTorch 库
            >>> import torch
            # 从 diffusers 库导入 StableDiffusionXLInpaintPipeline 类
            >>> from diffusers import StableDiffusionXLInpaintPipeline
            # 从 diffusers.utils 导入 load_image 函数
            >>> from diffusers.utils import load_image
    
            # 创建 StableDiffusionXLInpaintPipeline 的实例,加载预训练模型
            >>> pipe = StableDiffusionXLInpaintPipeline.from_pretrained(
            ...     "stabilityai/stable-diffusion-xl-base-1.0",  # 模型的名称
            ...     torch_dtype=torch.float16,  # 使用半精度浮点数
            ...     variant="fp16",  # 指定模型变种为 fp16
            ...     use_safetensors=True,  # 启用安全张量
            ... )
            # 将管道转移到 CUDA 设备以加速计算
            >>> pipe.to("cuda")
    
            # 定义要修复的图像的 URL
            >>> img_url = "https://raw.githubusercontent.com/CompVis/latent-diffusion/main/data/inpainting_examples/overture-creations-5sI6fQgYIuo.png"
            # 定义图像的掩码 URL
            >>> mask_url = "https://raw.githubusercontent.com/CompVis/latent-diffusion/main/data/inpainting_examples/overture-creations-5sI6fQgYIuo_mask.png"
    
            # 加载并转换初始图像为 RGB 格式
            >>> init_image = load_image(img_url).convert("RGB")
            # 加载并转换掩码图像为 RGB 格式
            >>> mask_image = load_image(mask_url).convert("RGB")
    
            # 定义提示文本,用于指导图像生成
            >>> prompt = "A majestic tiger sitting on a bench"
            # 使用管道生成图像,输入提示、初始图像、掩码图像,并设置推理步骤和强度
            >>> image = pipe(
            ...     prompt=prompt, image=init_image, mask_image=mask_image, num_inference_steps=50, strength=0.80
            ... ).images[0]  # 获取生成的第一张图像
"""
# 多行字符串,可能用于文档说明或注释,内容未给出

# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.rescale_noise_cfg 复制的函数
def rescale_noise_cfg(noise_cfg, noise_pred_text, guidance_rescale=0.0):
    """
    根据 `guidance_rescale` 对 `noise_cfg` 进行重新缩放。基于[Common Diffusion Noise Schedules and
    Sample Steps are Flawed](https://arxiv.org/pdf/2305.08891.pdf)的研究结果。详见第 3.4 节
    """
    # 计算 noise_pred_text 沿所有维度(除第 0 维)标准差,保持维度
    std_text = noise_pred_text.std(dim=list(range(1, noise_pred_text.ndim)), keepdim=True)
    # 计算 noise_cfg 沿所有维度(除第 0 维)标准差,保持维度
    std_cfg = noise_cfg.std(dim=list(range(1, noise_cfg.ndim)), keepdim=True)
    # 重新缩放指导结果(修正过曝问题)
    noise_pred_rescaled = noise_cfg * (std_text / std_cfg)
    # 根据指导缩放因子 mix 原始结果,避免生成“普通”的图像
    noise_cfg = guidance_rescale * noise_pred_rescaled + (1 - guidance_rescale) * noise_cfg
    # 返回重新缩放后的噪声配置
    return noise_cfg


# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion_img2img.retrieve_latents 复制的函数
def mask_pil_to_torch(mask, height, width):
    # 预处理 mask
    if isinstance(mask, (PIL.Image.Image, np.ndarray)):
        # 如果 mask 是 PIL 图像或 NumPy 数组,将其放入列表中
        mask = [mask]

    if isinstance(mask, list) and isinstance(mask[0], PIL.Image.Image):
        # 如果 mask 是 PIL 图像列表,调整每个图像的尺寸
        mask = [i.resize((width, height), resample=PIL.Image.LANCZOS) for i in mask]
        # 将图像转换为灰度并堆叠成 NumPy 数组
        mask = np.concatenate([np.array(m.convert("L"))[None, None, :] for m in mask], axis=0)
        # 将数组的数据类型转换为 float32,并归一化到 [0, 1] 范围
        mask = mask.astype(np.float32) / 255.0
    elif isinstance(mask, list) and isinstance(mask[0], np.ndarray):
        # 如果 mask 是 NumPy 数组列表,堆叠数组
        mask = np.concatenate([m[None, None, :] for m in mask], axis=0)

    # 将 NumPy 数组转换为 PyTorch 张量
    mask = torch.from_numpy(mask)
    # 返回转换后的张量
    return mask


# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.retrieve_timesteps 复制的函数
def retrieve_latents(
    encoder_output: torch.Tensor, generator: Optional[torch.Generator] = None, sample_mode: str = "sample"
):
    # 如果 encoder_output 有 latent_dist 属性并且采样模式是“sample”
    if hasattr(encoder_output, "latent_dist") and sample_mode == "sample":
        # 返回采样的潜在分布
        return encoder_output.latent_dist.sample(generator)
    # 如果 encoder_output 有 latent_dist 属性并且采样模式是“argmax”
    elif hasattr(encoder_output, "latent_dist") and sample_mode == "argmax":
        # 返回潜在分布的众数
        return encoder_output.latent_dist.mode()
    # 如果 encoder_output 有 latents 属性
    elif hasattr(encoder_output, "latents"):
        # 返回潜在向量
        return encoder_output.latents
    else:
        # 如果都没有,抛出属性错误
        raise AttributeError("Could not access latents of provided encoder_output")


# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.retrieve_timesteps 复制的函数
def retrieve_timesteps(
    scheduler,
    num_inference_steps: Optional[int] = None,
    device: Optional[Union[str, torch.device]] = None,
    timesteps: Optional[List[int]] = None,
    sigmas: Optional[List[float]] = None,
    **kwargs,
):
    """
    调用调度器的 `set_timesteps` 方法并在调用后从调度器获取时间步。处理自定义时间步。任何关键字参数将传递给 `scheduler.set_timesteps`。
    # 参数说明部分
    Args:
        scheduler (`SchedulerMixin`):  # 调度器,用于获取时间步
            The scheduler to get timesteps from.  # 描述调度器的功能
        num_inference_steps (`int`):  # 生成样本时使用的扩散步骤数
            The number of diffusion steps used when generating samples with a pre-trained model. If used, `timesteps`
            must be `None`.  # 如果使用此参数,timesteps必须为None
        device (`str` or `torch.device`, *optional*):  # 指定要移动时间步的设备
            The device to which the timesteps should be moved to. If `None`, the timesteps are not moved.  # 如果为None,则不移动时间步
        timesteps (`List[int]`, *optional*):  # 自定义时间步,用于覆盖调度器的时间步间距策略
            Custom timesteps used to override the timestep spacing strategy of the scheduler. If `timesteps` is passed,
            `num_inference_steps` and `sigmas` must be `None`.  # 如果提供该参数,则num_inference_steps和sigmas必须为None
        sigmas (`List[float]`, *optional*):  # 自定义sigmas,用于覆盖调度器的时间步间距策略
            Custom sigmas used to override the timestep spacing strategy of the scheduler. If `sigmas` is passed,
            `num_inference_steps` and `timesteps` must be `None`.  # 如果提供该参数,则num_inference_steps和timesteps必须为None

    Returns:  # 返回值说明部分
        `Tuple[torch.Tensor, int]`: A tuple where the first element is the timestep schedule from the scheduler and the
        second element is the number of inference steps.  # 返回一个元组,包含时间步调度和推断步骤数
    """
    # 检查是否同时提供了timesteps和sigmas
    if timesteps is not None and sigmas is not None:  # 如果两个参数都不为None
        raise ValueError("Only one of `timesteps` or `sigmas` can be passed. Please choose one to set custom values")  # 抛出错误,告知只能选择一个参数
    if timesteps is not None:  # 如果timesteps不为None
        # 检查当前调度器是否支持timesteps
        accepts_timesteps = "timesteps" in set(inspect.signature(scheduler.set_timesteps).parameters.keys())  # 检查set_timesteps方法的参数中是否包含timesteps
        if not accepts_timesteps:  # 如果不支持
            raise ValueError(  # 抛出错误,告知不支持自定义时间步
                f"The current scheduler class {scheduler.__class__}'s `set_timesteps` does not support custom"
                f" timestep schedules. Please check whether you are using the correct scheduler."
            )
        # 设置自定义的时间步
        scheduler.set_timesteps(timesteps=timesteps, device=device, **kwargs)  # 调用调度器设置时间步
        timesteps = scheduler.timesteps  # 更新时间步为调度器中的值
        num_inference_steps = len(timesteps)  # 计算推断步骤数
    elif sigmas is not None:  # 如果sigmas不为None
        # 检查当前调度器是否支持sigmas
        accept_sigmas = "sigmas" in set(inspect.signature(scheduler.set_timesteps).parameters.keys())  # 检查set_timesteps方法的参数中是否包含sigmas
        if not accept_sigmas:  # 如果不支持
            raise ValueError(  # 抛出错误,告知不支持自定义sigmas
                f"The current scheduler class {scheduler.__class__}'s `set_timesteps` does not support custom"
                f" sigmas schedules. Please check whether you are using the correct scheduler."
            )
        # 设置自定义的sigmas
        scheduler.set_timesteps(sigmas=sigmas, device=device, **kwargs)  # 调用调度器设置sigmas
        timesteps = scheduler.timesteps  # 更新时间步为调度器中的值
        num_inference_steps = len(timesteps)  # 计算推断步骤数
    else:  # 如果两个参数都为None
        # 设置默认的推断步骤
        scheduler.set_timesteps(num_inference_steps, device=device, **kwargs)  # 调用调度器设置默认推断步骤
        timesteps = scheduler.timesteps  # 更新时间步为调度器中的值
    return timesteps, num_inference_steps  # 返回时间步和推断步骤数
# 定义一个名为 StableDiffusionXLInpaintPipeline 的类,继承多个基类
class StableDiffusionXLInpaintPipeline(
    # 继承自 DiffusionPipeline,提供扩散过程的基本功能
    DiffusionPipeline,
    # 继承自 StableDiffusionMixin,提供与稳定扩散相关的功能
    StableDiffusionMixin,
    # 继承自 TextualInversionLoaderMixin,允许加载文本反演嵌入
    TextualInversionLoaderMixin,
    # 继承自 StableDiffusionXLLoraLoaderMixin,允许加载和保存 LoRA 权重
    StableDiffusionXLLoraLoaderMixin,
    # 继承自 FromSingleFileMixin,允许从单个文件加载模型
    FromSingleFileMixin,
    # 继承自 IPAdapterMixin,允许加载 IP 适配器
    IPAdapterMixin,
):
    # 文档字符串,描述该类的用途和继承的功能
    r"""
    Pipeline for text-to-image generation using Stable Diffusion XL.

    This model inherits from [`DiffusionPipeline`]. Check the superclass documentation for the generic methods the
    library implements for all the pipelines (such as downloading or saving, running on a particular device, etc.)

    The pipeline also inherits the following loading methods:
        - [`~loaders.TextualInversionLoaderMixin.load_textual_inversion`] for loading textual inversion embeddings
        - [`~loaders.FromSingleFileMixin.from_single_file`] for loading `.ckpt` files
        - [`~loaders.StableDiffusionXLLoraLoaderMixin.load_lora_weights`] for loading LoRA weights
        - [`~loaders.StableDiffusionXLLoraLoaderMixin.save_lora_weights`] for saving LoRA weights
        - [`~loaders.IPAdapterMixin.load_ip_adapter`] for loading IP Adapters
    # 参数定义部分,说明各参数的用途
    Args:
        vae ([`AutoencoderKL`]):  # 变分自编码器模型,用于将图像编码和解码为潜在表示
            Variational Auto-Encoder (VAE) Model to encode and decode images to and from latent representations.
        text_encoder ([`CLIPTextModel`]):  # 冻结的文本编码器,使用 CLIP 的文本部分
            Frozen text-encoder. Stable Diffusion XL uses the text portion of
            [CLIP](https://huggingface.co/docs/transformers/model_doc/clip#transformers.CLIPTextModel), specifically
            the [clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14) variant.
        text_encoder_2 ([` CLIPTextModelWithProjection`]):  # 第二个冻结文本编码器,使用 CLIP 的文本和池化部分
            Second frozen text-encoder. Stable Diffusion XL uses the text and pool portion of
            [CLIP](https://huggingface.co/docs/transformers/model_doc/clip#transformers.CLIPTextModelWithProjection),
            specifically the
            [laion/CLIP-ViT-bigG-14-laion2B-39B-b160k](https://huggingface.co/laion/CLIP-ViT-bigG-14-laion2B-39B-b160k)
            variant.
        tokenizer (`CLIPTokenizer`):  # CLIPTokenizer 类的分词器
            Tokenizer of class
            [CLIPTokenizer](https://huggingface.co/docs/transformers/v4.21.0/en/model_doc/clip#transformers.CLIPTokenizer).
        tokenizer_2 (`CLIPTokenizer`):  # 第二个 CLIPTokenizer 类的分词器
            Second Tokenizer of class
            [CLIPTokenizer](https://huggingface.co/docs/transformers/v4.21.0/en/model_doc/clip#transformers.CLIPTokenizer).
        unet ([`UNet2DConditionModel`]):  # 条件 U-Net 架构,用于去噪编码后的图像潜在表示
            Conditional U-Net architecture to denoise the encoded image latents.
        scheduler ([`SchedulerMixin`]):  # 用于与 `unet` 结合使用的调度器
            A scheduler to be used in combination with `unet` to denoise the encoded image latents. Can be one of
            [`DDIMScheduler`], [`LMSDiscreteScheduler`], or [`PNDMScheduler`].
        requires_aesthetics_score (`bool`, *optional*, defaults to `"False"`):  # 是否需要美学评分条件
            Whether the `unet` requires a aesthetic_score condition to be passed during inference. Also see the config
            of `stabilityai/stable-diffusion-xl-refiner-1-0`.
        force_zeros_for_empty_prompt (`bool`, *optional*, defaults to `"True"`):  # 是否将负提示嵌入强制设为 0
            Whether the negative prompt embeddings shall be forced to always be set to 0. Also see the config of
            `stabilityai/stable-diffusion-xl-base-1-0`.
        add_watermarker (`bool`, *optional*):  # 是否使用隐形水印库对输出图像进行水印处理
            Whether to use the [invisible_watermark library](https://github.com/ShieldMnt/invisible-watermark/) to
            watermark output images. If not defined, it will default to True if the package is installed, otherwise no
            watermarker will be used.
    """  # 文档字符串的结束

    # 定义模型的 CPU 卸载顺序
    model_cpu_offload_seq = "text_encoder->text_encoder_2->image_encoder->unet->vae"

    # 可选组件的列表,用于后续处理
    _optional_components = [
        "tokenizer",  # 分词器
        "tokenizer_2",  # 第二个分词器
        "text_encoder",  # 文本编码器
        "text_encoder_2",  # 第二个文本编码器
        "image_encoder",  # 图像编码器
        "feature_extractor",  # 特征提取器
    ]
    # 定义一个回调张量输入的列表,包含多个输入项的名称
    _callback_tensor_inputs = [
        "latents",  # 潜在表示
        "prompt_embeds",  # 提示嵌入
        "negative_prompt_embeds",  # 负提示嵌入
        "add_text_embeds",  # 附加文本嵌入
        "add_time_ids",  # 附加时间标识
        "negative_pooled_prompt_embeds",  # 负池化提示嵌入
        "add_neg_time_ids",  # 附加负时间标识
        "mask",  # 掩码
        "masked_image_latents",  # 被掩蔽的图像潜在表示
    ]

    # 初始化方法,设置各个组件和参数
    def __init__(
        # 定义自编码器
        vae: AutoencoderKL,
        # 定义文本编码器
        text_encoder: CLIPTextModel,
        # 定义第二个文本编码器
        text_encoder_2: CLIPTextModelWithProjection,
        # 定义分词器
        tokenizer: CLIPTokenizer,
        # 定义第二个分词器
        tokenizer_2: CLIPTokenizer,
        # 定义 UNet 模型
        unet: UNet2DConditionModel,
        # 定义调度器
        scheduler: KarrasDiffusionSchedulers,
        # 定义可选的图像编码器
        image_encoder: CLIPVisionModelWithProjection = None,
        # 定义可选的特征提取器
        feature_extractor: CLIPImageProcessor = None,
        # 定义是否需要美学评分的标志
        requires_aesthetics_score: bool = False,
        # 强制空提示为零的标志
        force_zeros_for_empty_prompt: bool = True,
        # 可选的水印添加标志
        add_watermarker: Optional[bool] = None,
    ):
        # 调用父类的初始化方法
        super().__init__()

        # 注册各个模块到当前实例中
        self.register_modules(
            vae=vae,  # 注册自编码器
            text_encoder=text_encoder,  # 注册文本编码器
            text_encoder_2=text_encoder_2,  # 注册第二个文本编码器
            tokenizer=tokenizer,  # 注册分词器
            tokenizer_2=tokenizer_2,  # 注册第二个分词器
            unet=unet,  # 注册 UNet 模型
            image_encoder=image_encoder,  # 注册图像编码器
            feature_extractor=feature_extractor,  # 注册特征提取器
            scheduler=scheduler,  # 注册调度器
        )
        # 将强制空提示为零的标志注册到配置中
        self.register_to_config(force_zeros_for_empty_prompt=force_zeros_for_empty_prompt)
        # 将是否需要美学评分的标志注册到配置中
        self.register_to_config(requires_aesthetics_score=requires_aesthetics_score)
        # 计算 VAE 缩放因子,基于块输出通道的数量
        self.vae_scale_factor = 2 ** (len(self.vae.config.block_out_channels) - 1)
        # 初始化图像处理器,用于处理 VAE 缩放因子
        self.image_processor = VaeImageProcessor(vae_scale_factor=self.vae_scale_factor)
        # 初始化掩码处理器,进行特殊处理
        self.mask_processor = VaeImageProcessor(
            vae_scale_factor=self.vae_scale_factor, do_normalize=False, do_binarize=True, do_convert_grayscale=True
        )

        # 确定是否添加水印,如果未指定则根据是否可用来设置
        add_watermarker = add_watermarker if add_watermarker is not None else is_invisible_watermark_available()

        # 根据标志设置水印处理器
        if add_watermarker:
            self.watermark = StableDiffusionXLWatermarker()  # 创建水印处理器
        else:
            self.watermark = None  # 不创建水印处理器

    # 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline 复制的编码图像的方法
    # 定义一个方法,用于编码图像
        def encode_image(self, image, device, num_images_per_prompt, output_hidden_states=None):
            # 获取图像编码器参数的数据类型
            dtype = next(self.image_encoder.parameters()).dtype
    
            # 检查输入的图像是否为张量类型
            if not isinstance(image, torch.Tensor):
                # 如果不是,则使用特征提取器处理图像并返回张量
                image = self.feature_extractor(image, return_tensors="pt").pixel_values
    
            # 将图像移动到指定的设备上,并转换为正确的数据类型
            image = image.to(device=device, dtype=dtype)
    
            # 如果需要输出隐藏状态
            if output_hidden_states:
                # 对图像进行编码,获取倒数第二层的隐藏状态
                image_enc_hidden_states = self.image_encoder(image, output_hidden_states=True).hidden_states[-2]
                # 将隐藏状态重复以匹配每个提示的图像数量
                image_enc_hidden_states = image_enc_hidden_states.repeat_interleave(num_images_per_prompt, dim=0)
                # 对全零的图像进行编码,获取其隐藏状态
                uncond_image_enc_hidden_states = self.image_encoder(
                    torch.zeros_like(image), output_hidden_states=True
                ).hidden_states[-2]
                # 将全零图像的隐藏状态重复以匹配每个提示的图像数量
                uncond_image_enc_hidden_states = uncond_image_enc_hidden_states.repeat_interleave(
                    num_images_per_prompt, dim=0
                )
                # 返回编码的图像隐藏状态和全零图像的隐藏状态
                return image_enc_hidden_states, uncond_image_enc_hidden_states
            else:
                # 如果不需要输出隐藏状态,则直接获取图像嵌入
                image_embeds = self.image_encoder(image).image_embeds
                # 将图像嵌入重复以匹配每个提示的图像数量
                image_embeds = image_embeds.repeat_interleave(num_images_per_prompt, dim=0)
                # 创建与图像嵌入形状相同的全零张量作为无条件图像嵌入
                uncond_image_embeds = torch.zeros_like(image_embeds)
    
                # 返回图像嵌入和无条件图像嵌入
                return image_embeds, uncond_image_embeds
    
        # 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_ip_adapter_image_embeds 复制的代码
        def prepare_ip_adapter_image_embeds(
            # 定义方法参数,包括适配器图像、适配器图像嵌入、设备、每个提示的图像数量和是否执行分类器自由引导
            self, ip_adapter_image, ip_adapter_image_embeds, device, num_images_per_prompt, do_classifier_free_guidance
    # 开始处理图像嵌入的逻辑
        ):
            # 初始化图像嵌入列表
            image_embeds = []
            # 如果启用分类器自由引导,初始化负图像嵌入列表
            if do_classifier_free_guidance:
                negative_image_embeds = []
            # 如果适配器图像嵌入为空
            if ip_adapter_image_embeds is None:
                # 检查输入图像是否为列表,如果不是则转换为列表
                if not isinstance(ip_adapter_image, list):
                    ip_adapter_image = [ip_adapter_image]
    
                # 确保适配器图像长度与 IP 适配器数量一致
                if len(ip_adapter_image) != len(self.unet.encoder_hid_proj.image_projection_layers):
                    raise ValueError(
                        f"`ip_adapter_image` must have same length as the number of IP Adapters. Got {len(ip_adapter_image)} images and {len(self.unet.encoder_hid_proj.image_projection_layers)} IP Adapters."
                    )
    
                # 遍历每个适配器图像和对应的图像投影层
                for single_ip_adapter_image, image_proj_layer in zip(
                    ip_adapter_image, self.unet.encoder_hid_proj.image_projection_layers
                ):
                    # 判断输出隐藏状态是否需要
                    output_hidden_state = not isinstance(image_proj_layer, ImageProjection)
                    # 编码单个适配器图像,获取嵌入和负嵌入
                    single_image_embeds, single_negative_image_embeds = self.encode_image(
                        single_ip_adapter_image, device, 1, output_hidden_state
                    )
    
                    # 将单个图像嵌入添加到列表
                    image_embeds.append(single_image_embeds[None, :])
                    # 如果启用分类器自由引导,添加负图像嵌入
                    if do_classifier_free_guidance:
                        negative_image_embeds.append(single_negative_image_embeds[None, :])
            else:
                # 如果已有适配器图像嵌入,遍历它们
                for single_image_embeds in ip_adapter_image_embeds:
                    # 如果启用分类器自由引导,分离负图像嵌入和正图像嵌入
                    if do_classifier_free_guidance:
                        single_negative_image_embeds, single_image_embeds = single_image_embeds.chunk(2)
                        negative_image_embeds.append(single_negative_image_embeds)
                    # 添加正图像嵌入到列表
                    image_embeds.append(single_image_embeds)
    
            # 初始化最终的适配器图像嵌入列表
            ip_adapter_image_embeds = []
            # 遍历图像嵌入
            for i, single_image_embeds in enumerate(image_embeds):
                # 扩展每个图像嵌入至每个提示所需的数量
                single_image_embeds = torch.cat([single_image_embeds] * num_images_per_prompt, dim=0)
                # 如果启用分类器自由引导,扩展负图像嵌入
                if do_classifier_free_guidance:
                    single_negative_image_embeds = torch.cat([negative_image_embeds[i]] * num_images_per_prompt, dim=0)
                    # 将负图像嵌入和正图像嵌入拼接在一起
                    single_image_embeds = torch.cat([single_negative_image_embeds, single_image_embeds], dim=0)
    
                # 将图像嵌入移动到指定设备
                single_image_embeds = single_image_embeds.to(device=device)
                # 将处理后的图像嵌入添加到最终列表
                ip_adapter_image_embeds.append(single_image_embeds)
    
            # 返回适配器图像嵌入
            return ip_adapter_image_embeds
    
        # 从 diffusers.pipelines.stable_diffusion_xl.pipeline_stable_diffusion_xl.StableDiffusionXLPipeline.encode_prompt 复制的代码
    # 定义一个编码提示的函数,接受多个参数来生成图像
    def encode_prompt(
        self,
        # 主提示字符串
        prompt: str,
        # 可选的第二个提示字符串
        prompt_2: Optional[str] = None,
        # 可选的设备参数,指定计算的设备
        device: Optional[torch.device] = None,
        # 每个提示生成的图像数量,默认为1
        num_images_per_prompt: int = 1,
        # 是否进行分类器自由引导,默认为True
        do_classifier_free_guidance: bool = True,
        # 可选的负面提示字符串
        negative_prompt: Optional[str] = None,
        # 可选的第二个负面提示字符串
        negative_prompt_2: Optional[str] = None,
        # 可选的提示嵌入,预先计算的提示向量
        prompt_embeds: Optional[torch.Tensor] = None,
        # 可选的负面提示嵌入,预先计算的负向量
        negative_prompt_embeds: Optional[torch.Tensor] = None,
        # 可选的池化提示嵌入,预先计算的池化向量
        pooled_prompt_embeds: Optional[torch.Tensor] = None,
        # 可选的池化负面提示嵌入,预先计算的负池化向量
        negative_pooled_prompt_embeds: Optional[torch.Tensor] = None,
        # 可选的Lora缩放因子
        lora_scale: Optional[float] = None,
        # 可选的跳过剪辑参数
        clip_skip: Optional[int] = None,
    # 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_extra_step_kwargs 复制的
    # 准备额外的步骤关键字参数,适用于调度器步骤,因为并非所有调度器都有相同的签名
    def prepare_extra_step_kwargs(self, generator, eta):
        # eta (η) 仅用于 DDIMScheduler,对于其他调度器将被忽略
        # eta 在 DDIM 论文中的对应值应在 [0, 1] 范围内
        accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())
        # 初始化额外步骤参数的字典
        extra_step_kwargs = {}
        if accepts_eta:
            # 如果调度器接受eta参数,则将其添加到字典中
            extra_step_kwargs["eta"] = eta

        # 检查调度器是否接受生成器
        accepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys())
        if accepts_generator:
            # 如果调度器接受生成器参数,则将其添加到字典中
            extra_step_kwargs["generator"] = generator
        # 返回准备好的额外步骤参数
        return extra_step_kwargs

    # 定义检查输入的函数,用于验证各种输入参数的有效性
    def check_inputs(
        self,
        # 主提示字符串
        prompt,
        # 可选的第二个提示字符串
        prompt_2,
        # 输入图像
        image,
        # 输入掩码图像
        mask_image,
        # 图像高度
        height,
        # 图像宽度
        width,
        # 强度参数
        strength,
        # 回调步骤
        callback_steps,
        # 输出类型
        output_type,
        # 可选的负面提示字符串
        negative_prompt=None,
        # 可选的第二个负面提示字符串
        negative_prompt_2=None,
        # 可选的提示嵌入
        prompt_embeds=None,
        # 可选的负面提示嵌入
        negative_prompt_embeds=None,
        # 可选的适配器图像
        ip_adapter_image=None,
        # 可选的适配器图像嵌入
        ip_adapter_image_embeds=None,
        # 可选的在步骤结束时回调的张量输入
        callback_on_step_end_tensor_inputs=None,
        # 可选的填充掩码裁剪参数
        padding_mask_crop=None,
    # 定义准备潜在变量的函数,用于生成潜在表示
    def prepare_latents(
        # 批大小
        batch_size,
        # 潜在通道数
        num_channels_latents,
        # 图像高度
        height,
        # 图像宽度
        width,
        # 数据类型
        dtype,
        # 设备参数
        device,
        # 随机数生成器
        generator,
        # 可选的潜在变量
        latents=None,
        # 可选的输入图像
        image=None,
        # 时间步
        timestep=None,
        # 是否为最大强度,默认为True
        is_strength_max=True,
        # 是否添加噪声,默认为True
        add_noise=True,
        # 是否返回噪声,默认为False
        return_noise=False,
        # 是否返回图像潜在表示,默认为False
        return_image_latents=False,
    ):
        # 定义输出张量的形状,包含批量大小、通道数、以及缩放后的高度和宽度
        shape = (
            batch_size,
            num_channels_latents,
            int(height) // self.vae_scale_factor,
            int(width) // self.vae_scale_factor,
        )
        # 检查生成器是否为列表,并且其长度是否与批量大小匹配
        if isinstance(generator, list) and len(generator) != batch_size:
            # 抛出值错误,提示生成器长度与请求的批量大小不符
            raise ValueError(
                f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
                f" size of {batch_size}. Make sure the batch size matches the length of the generators."
            )

        # 检查图像或时间步是否为 None,且强度不为最大值
        if (image is None or timestep is None) and not is_strength_max:
            # 抛出值错误,提示图像或噪声时间步未提供
            raise ValueError(
                "Since strength < 1. initial latents are to be initialised as a combination of Image + Noise."
                "However, either the image or the noise timestep has not been provided."
            )

        # 检查图像的通道数是否为 4
        if image.shape[1] == 4:
            # 将图像转换为指定设备和数据类型的张量
            image_latents = image.to(device=device, dtype=dtype)
            # 重复图像张量以匹配批量大小
            image_latents = image_latents.repeat(batch_size // image_latents.shape[0], 1, 1, 1)
        # 如果需要返回图像潜变量或潜变量为 None 且强度不为最大值
        elif return_image_latents or (latents is None and not is_strength_max):
            # 将图像转换为指定设备和数据类型的张量
            image = image.to(device=device, dtype=dtype)
            # 编码图像为 VAE 潜变量
            image_latents = self._encode_vae_image(image=image, generator=generator)
            # 重复图像潜变量以匹配批量大小
            image_latents = image_latents.repeat(batch_size // image_latents.shape[0], 1, 1, 1)

        # 如果潜变量为 None 且需要添加噪声
        if latents is None and add_noise:
            # 生成随机噪声张量
            noise = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
            # 如果强度为 1,初始化潜变量为噪声,否则初始化为图像和噪声的组合
            latents = noise if is_strength_max else self.scheduler.add_noise(image_latents, noise, timestep)
            # 如果强度为最大,则按调度器的初始化 sigma 缩放潜变量
            latents = latents * self.scheduler.init_noise_sigma if is_strength_max else latents
        # 如果需要添加噪声但潜变量不为 None
        elif add_noise:
            # 将潜变量转换为指定设备的张量
            noise = latents.to(device)
            # 按调度器的初始化 sigma 缩放潜变量
            latents = noise * self.scheduler.init_noise_sigma
        # 如果不需要添加噪声
        else:
            # 生成随机噪声张量
            noise = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
            # 将图像潜变量转换为指定设备的张量
            latents = image_latents.to(device)

        # 创建输出元组,初始包含潜变量
        outputs = (latents,)

        # 如果需要返回噪声,将其添加到输出元组中
        if return_noise:
            outputs += (noise,)

        # 如果需要返回图像潜变量,将其添加到输出元组中
        if return_image_latents:
            outputs += (image_latents,)

        # 返回最终的输出元组
        return outputs
    # 定义一个编码 VAE 图像的私有方法,接受图像和生成器作为参数
    def _encode_vae_image(self, image: torch.Tensor, generator: torch.Generator):
        # 获取输入图像的数值类型
        dtype = image.dtype
        # 如果配置强制提升类型,则将图像转换为浮点型,并将 VAE 转为浮点32类型
        if self.vae.config.force_upcast:
            image = image.float()
            self.vae.to(dtype=torch.float32)
    
        # 检查生成器是否为列表
        if isinstance(generator, list):
            # 对于每个图像,编码并检索其潜变量
            image_latents = [
                retrieve_latents(self.vae.encode(image[i : i + 1]), generator=generator[i])
                for i in range(image.shape[0])
            ]
            # 将所有潜变量在第0维度上连接成一个张量
            image_latents = torch.cat(image_latents, dim=0)
        else:
            # 如果生成器不是列表,直接编码图像并检索潜变量
            image_latents = retrieve_latents(self.vae.encode(image), generator=generator)
    
        # 如果配置强制提升类型,则将 VAE 恢复到原来的数值类型
        if self.vae.config.force_upcast:
            self.vae.to(dtype)
    
        # 将潜变量转换为原来的数据类型
        image_latents = image_latents.to(dtype)
        # 根据配置的缩放因子调整潜变量
        image_latents = self.vae.config.scaling_factor * image_latents
    
        # 返回处理后的潜变量
        return image_latents
    
    # 定义一个准备掩码潜变量的公共方法,接受多个参数
    def prepare_mask_latents(
        self, mask, masked_image, batch_size, height, width, dtype, device, generator, do_classifier_free_guidance
    ):
        # 将掩膜调整为与潜在变量形状一致,以便将掩膜与潜在变量拼接
        # 在转换数据类型之前执行此操作,以避免在使用 cpu_offload 和半精度时出现问题
        mask = torch.nn.functional.interpolate(
            # 调整掩膜大小,使其与潜在变量匹配,缩放因子由 self.vae_scale_factor 控制
            mask, size=(height // self.vae_scale_factor, width // self.vae_scale_factor)
        )
        # 将掩膜移动到指定设备并设置数据类型
        mask = mask.to(device=device, dtype=dtype)

        # 为每个提示生成重复掩膜和被掩膜图像潜在变量,使用适合 mps 的方法
        if mask.shape[0] < batch_size:
            # 检查掩膜数量是否能够整除批量大小
            if not batch_size % mask.shape[0] == 0:
                # 如果不匹配,抛出错误提示
                raise ValueError(
                    "The passed mask and the required batch size don't match. Masks are supposed to be duplicated to"
                    f" a total batch size of {batch_size}, but {mask.shape[0]} masks were passed. Make sure the number"
                    " of masks that you pass is divisible by the total requested batch size."
                )
            # 通过重复掩膜以匹配批量大小
            mask = mask.repeat(batch_size // mask.shape[0], 1, 1, 1)

        # 如果进行无分类器引导,则将掩膜复制两次;否则保持不变
        mask = torch.cat([mask] * 2) if do_classifier_free_guidance else mask

        # 检查被掩膜图像是否存在且其通道数为4
        if masked_image is not None and masked_image.shape[1] == 4:
            # 如果是,则直接将其赋值给 masked_image_latents
            masked_image_latents = masked_image
        else:
            # 否则初始化为 None
            masked_image_latents = None

        # 如果被掩膜图像存在
        if masked_image is not None:
            # 如果潜在变量为 None,则编码被掩膜图像
            if masked_image_latents is None:
                # 将被掩膜图像移动到指定设备并设置数据类型
                masked_image = masked_image.to(device=device, dtype=dtype)
                # 使用 VAE 编码器将被掩膜图像编码为潜在变量
                masked_image_latents = self._encode_vae_image(masked_image, generator=generator)

            # 检查编码后的潜在变量数量是否能够整除批量大小
            if masked_image_latents.shape[0] < batch_size:
                if not batch_size % masked_image_latents.shape[0] == 0:
                    # 如果不匹配,抛出错误提示
                    raise ValueError(
                        "The passed images and the required batch size don't match. Images are supposed to be duplicated"
                        f" to a total batch size of {batch_size}, but {masked_image_latents.shape[0]} images were passed."
                        " Make sure the number of images that you pass is divisible by the total requested batch size."
                    )
                # 通过重复潜在变量以匹配批量大小
                masked_image_latents = masked_image_latents.repeat(
                    batch_size // masked_image_latents.shape[0], 1, 1, 1
                )

            # 如果进行无分类器引导,则将潜在变量复制两次;否则保持不变
            masked_image_latents = (
                torch.cat([masked_image_latents] * 2) if do_classifier_free_guidance else masked_image_latents
            )

            # 将潜在变量移动到指定设备并设置数据类型,以防在与潜在模型输入拼接时出现设备错误
            masked_image_latents = masked_image_latents.to(device=device, dtype=dtype)

        # 返回处理后的掩膜和被掩膜图像的潜在变量
        return mask, masked_image_latents

    # 从 diffusers.pipelines.stable_diffusion_xl.pipeline_stable_diffusion_xl_img2img.StableDiffusionXLImg2ImgPipeline.get_timesteps 复制
    # 获取时间步长的函数,计算推理步骤、强度和设备相关的时间步长
        def get_timesteps(self, num_inference_steps, strength, device, denoising_start=None):
            # 如果没有指定去噪开始时间,则计算初始时间步
            if denoising_start is None:
                # 根据强度和推理步骤计算初始时间步,确保不超过总推理步骤
                init_timestep = min(int(num_inference_steps * strength), num_inference_steps)
                # 计算开始时间步,确保不小于0
                t_start = max(num_inference_steps - init_timestep, 0)
            else:
                # 如果指定了去噪开始时间,开始时间步为0
                t_start = 0
    
            # 根据开始时间步和调度器的顺序获取时间步长
            timesteps = self.scheduler.timesteps[t_start * self.scheduler.order :]
    
            # 如果指定了去噪开始时间,则强度不再重要,直接使用去噪开始时间
            if denoising_start is not None:
                # 计算离散时间步截止点
                discrete_timestep_cutoff = int(
                    round(
                        self.scheduler.config.num_train_timesteps
                        - (denoising_start * self.scheduler.config.num_train_timesteps)
                    )
                )
    
                # 计算有效的推理步骤数量
                num_inference_steps = (timesteps < discrete_timestep_cutoff).sum().item()
                # 如果调度器是二阶调度器且推理步骤为偶数,则需要加1以确保正确性
                if self.scheduler.order == 2 and num_inference_steps % 2 == 0:
                    # 添加1以确保去噪过程在调度器的二阶导数步骤之后结束
                    num_inference_steps = num_inference_steps + 1
    
                # 从末尾切片获取推理步骤的时间步
                timesteps = timesteps[-num_inference_steps:]
                # 返回时间步和推理步骤数量
                return timesteps, num_inference_steps
    
            # 如果没有去噪开始时间,返回时间步和调整后的推理步骤数量
            return timesteps, num_inference_steps - t_start
    
        # 从 diffusers.pipelines.stable_diffusion_xl.pipeline_stable_diffusion_xl_img2img.StableDiffusionXLImg2ImgPipeline._get_add_time_ids 复制的函数
        def _get_add_time_ids(
            self,
            original_size,
            crops_coords_top_left,
            target_size,
            aesthetic_score,
            negative_aesthetic_score,
            negative_original_size,
            negative_crops_coords_top_left,
            negative_target_size,
            dtype,
            text_encoder_projection_dim=None,
    ):
        # 检查配置是否需要美学评分
        if self.config.requires_aesthetics_score:
            # 创建包含原始尺寸、裁剪坐标左上角和美学评分的列表
            add_time_ids = list(original_size + crops_coords_top_left + (aesthetic_score,))
            # 创建负样本的原始尺寸、裁剪坐标左上角和负美学评分的列表
            add_neg_time_ids = list(
                negative_original_size + negative_crops_coords_top_left + (negative_aesthetic_score,)
            )
        else:
            # 创建包含原始尺寸、裁剪坐标左上角和目标尺寸的列表
            add_time_ids = list(original_size + crops_coords_top_left + target_size)
            # 创建负样本的原始尺寸、裁剪坐标左上角和负目标尺寸的列表
            add_neg_time_ids = list(negative_original_size + crops_coords_top_left + negative_target_size)

        # 计算传入的附加嵌入维度
        passed_add_embed_dim = (
            self.unet.config.addition_time_embed_dim * len(add_time_ids) + text_encoder_projection_dim
        )
        # 获取模型期望的附加嵌入维度
        expected_add_embed_dim = self.unet.add_embedding.linear_1.in_features

        # 检查期望的附加嵌入维度是否大于实际传入的维度
        if (
            expected_add_embed_dim > passed_add_embed_dim
            and (expected_add_embed_dim - passed_add_embed_dim) == self.unet.config.addition_time_embed_dim
        ):
            # 抛出错误,提示需要启用美学评分
            raise ValueError(
                f"Model expects an added time embedding vector of length {expected_add_embed_dim}, but a vector of {passed_add_embed_dim} was created. Please make sure to enable `requires_aesthetics_score` with `pipe.register_to_config(requires_aesthetics_score=True)` to make sure `aesthetic_score` {aesthetic_score} and `negative_aesthetic_score` {negative_aesthetic_score} is correctly used by the model."
            )
        # 检查期望的附加嵌入维度是否小于实际传入的维度
        elif (
            expected_add_embed_dim < passed_add_embed_dim
            and (passed_add_embed_dim - expected_add_embed_dim) == self.unet.config.addition_time_embed_dim
        ):
            # 抛出错误,提示需要禁用美学评分
            raise ValueError(
                f"Model expects an added time embedding vector of length {expected_add_embed_dim}, but a vector of {passed_add_embed_dim} was created. Please make sure to disable `requires_aesthetics_score` with `pipe.register_to_config(requires_aesthetics_score=False)` to make sure `target_size` {target_size} is correctly used by the model."
            )
        # 检查传入的附加嵌入维度是否与期望的不同
        elif expected_add_embed_dim != passed_add_embed_dim:
            # 抛出错误,提示模型配置不正确
            raise ValueError(
                f"Model expects an added time embedding vector of length {expected_add_embed_dim}, but a vector of {passed_add_embed_dim} was created. The model has an incorrect config. Please check `unet.config.time_embedding_type` and `text_encoder_2.config.projection_dim`."
            )

        # 将 add_time_ids 转换为张量,并指定数据类型
        add_time_ids = torch.tensor([add_time_ids], dtype=dtype)
        # 将 add_neg_time_ids 转换为张量,并指定数据类型
        add_neg_time_ids = torch.tensor([add_neg_time_ids], dtype=dtype)

        # 返回添加的时间 ID 和负样本的时间 ID
        return add_time_ids, add_neg_time_ids

    # 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion_upscale.StableDiffusionUpscalePipeline.upcast_vae 复制的代码
    # 定义一个方法来将 VAE 模型的类型上升到指定的浮点数精度
        def upcast_vae(self):
            # 获取当前 VAE 的数据类型
            dtype = self.vae.dtype
            # 将 VAE 转换为浮点32类型
            self.vae.to(dtype=torch.float32)
            # 检查 VAE 解码器的中间块注意力处理器是否为指定的类型
            use_torch_2_0_or_xformers = isinstance(
                self.vae.decoder.mid_block.attentions[0].processor,
                (
                    AttnProcessor2_0,
                    XFormersAttnProcessor,
                ),
            )
            # 如果使用的是 xformers 或 torch_2_0,则注意力块不需要为浮点32,这可以节省大量内存
            if use_torch_2_0_or_xformers:
                # 将后量化卷积转换为当前数据类型
                self.vae.post_quant_conv.to(dtype)
                # 将解码器的输入卷积转换为当前数据类型
                self.vae.decoder.conv_in.to(dtype)
                # 将解码器的中间块转换为当前数据类型
                self.vae.decoder.mid_block.to(dtype)
    
        # 从 LatentConsistencyModelPipeline 中复制的方法,用于获取引导缩放嵌入
        def get_guidance_scale_embedding(
            self, w: torch.Tensor, embedding_dim: int = 512, dtype: torch.dtype = torch.float32
        ) -> torch.Tensor:
            """
            参考链接:模型的指导缩放嵌入生成方法。
    
            参数:
                w (`torch.Tensor`):
                    生成带有指定指导缩放的嵌入向量,以随后丰富时间步嵌入。
                embedding_dim (`int`, *可选*, 默认为 512):
                    要生成的嵌入维度。
                dtype (`torch.dtype`, *可选*, 默认为 `torch.float32`):
                    生成嵌入的数据类型。
    
            返回:
                `torch.Tensor`: 形状为 `(len(w), embedding_dim)` 的嵌入向量。
            """
            # 确保输入张量是一个一维向量
            assert len(w.shape) == 1
            # 将 w 的值乘以 1000.0 以增强缩放
            w = w * 1000.0
    
            # 计算嵌入的一半维度
            half_dim = embedding_dim // 2
            # 计算对数的常数值
            emb = torch.log(torch.tensor(10000.0)) / (half_dim - 1)
            # 生成指数衰减的嵌入
            emb = torch.exp(torch.arange(half_dim, dtype=dtype) * -emb)
            # 生成嵌入的最终形状,通过广播机制结合 w
            emb = w.to(dtype)[:, None] * emb[None, :]
            # 将正弦和余弦嵌入合并到一起
            emb = torch.cat([torch.sin(emb), torch.cos(emb)], dim=1)
            # 如果嵌入维度为奇数,则进行零填充
            if embedding_dim % 2 == 1:  # zero pad
                emb = torch.nn.functional.pad(emb, (0, 1))
            # 确保生成的嵌入形状符合预期
            assert emb.shape == (w.shape[0], embedding_dim)
            # 返回最终生成的嵌入
            return emb
    
        # 返回当前的指导缩放值
        @property
        def guidance_scale(self):
            return self._guidance_scale
    
        # 返回当前的指导重缩放值
        @property
        def guidance_rescale(self):
            return self._guidance_rescale
    
        # 返回当前的跳过剪辑值
        @property
        def clip_skip(self):
            return self._clip_skip
    
        # 定义不使用分类器自由指导的条件
        @property
        def do_classifier_free_guidance(self):
            return self._guidance_scale > 1 and self.unet.config.time_cond_proj_dim is None
    
        # 返回交叉注意力的额外参数
        @property
        def cross_attention_kwargs(self):
            return self._cross_attention_kwargs
    
        # 返回去噪结束的值
        @property
        def denoising_end(self):
            return self._denoising_end
    
        # 这里省略了剩余的属性定义
    # 定义去噪的起始方法
        def denoising_start(self):
            # 返回去噪的起始值
            return self._denoising_start
    
        # 定义 num_timesteps 属性的获取方法
        @property
        def num_timesteps(self):
            # 返回时间步数
            return self._num_timesteps
    
        # 定义 interrupt 属性的获取方法
        @property
        def interrupt(self):
            # 返回中断状态
            return self._interrupt
    
        # 关闭梯度计算以提高性能,并替换示例文档字符串
        @torch.no_grad()
        @replace_example_docstring(EXAMPLE_DOC_STRING)
        # 定义可调用方法,接受多种输入参数
        def __call__(
            # 输入提示文本,支持字符串或字符串列表
            self,
            prompt: Union[str, List[str]] = None,
            # 第二个提示文本,支持字符串或字符串列表
            prompt_2: Optional[Union[str, List[str]]] = None,
            # 输入图像
            image: PipelineImageInput = None,
            # 输入掩码图像
            mask_image: PipelineImageInput = None,
            # 掩码图像的潜在张量
            masked_image_latents: torch.Tensor = None,
            # 图像高度
            height: Optional[int] = None,
            # 图像宽度
            width: Optional[int] = None,
            # 填充掩码裁剪参数
            padding_mask_crop: Optional[int] = None,
            # 强度参数
            strength: float = 0.9999,
            # 推理步骤数量
            num_inference_steps: int = 50,
            # 时间步列表
            timesteps: List[int] = None,
            # sigma值列表
            sigmas: List[float] = None,
            # 去噪开始值
            denoising_start: Optional[float] = None,
            # 去噪结束值
            denoising_end: Optional[float] = None,
            # 引导比例
            guidance_scale: float = 7.5,
            # 负提示文本,支持字符串或字符串列表
            negative_prompt: Optional[Union[str, List[str]]] = None,
            # 第二个负提示文本,支持字符串或字符串列表
            negative_prompt_2: Optional[Union[str, List[str]]] = None,
            # 每个提示生成的图像数量
            num_images_per_prompt: Optional[int] = 1,
            # eta值
            eta: float = 0.0,
            # 随机数生成器
            generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
            # 潜在张量
            latents: Optional[torch.Tensor] = None,
            # 提示嵌入
            prompt_embeds: Optional[torch.Tensor] = None,
            # 负提示嵌入
            negative_prompt_embeds: Optional[torch.Tensor] = None,
            # 池化后的提示嵌入
            pooled_prompt_embeds: Optional[torch.Tensor] = None,
            # 负池化提示嵌入
            negative_pooled_prompt_embeds: Optional[torch.Tensor] = None,
            # 图像适配器输入
            ip_adapter_image: Optional[PipelineImageInput] = None,
            # 图像适配器嵌入
            ip_adapter_image_embeds: Optional[List[torch.Tensor]] = None,
            # 输出类型,默认为"pil"
            output_type: Optional[str] = "pil",
            # 是否返回字典格式
            return_dict: bool = True,
            # 交叉注意力的额外参数
            cross_attention_kwargs: Optional[Dict[str, Any]] = None,
            # 引导重缩放参数
            guidance_rescale: float = 0.0,
            # 原始图像尺寸
            original_size: Tuple[int, int] = None,
            # 裁剪坐标,默认为(0, 0)
            crops_coords_top_left: Tuple[int, int] = (0, 0),
            # 目标图像尺寸
            target_size: Tuple[int, int] = None,
            # 负样本的原始尺寸
            negative_original_size: Optional[Tuple[int, int]] = None,
            # 负样本裁剪坐标,默认为(0, 0)
            negative_crops_coords_top_left: Tuple[int, int] = (0, 0),
            # 负样本目标尺寸
            negative_target_size: Optional[Tuple[int, int]] = None,
            # 美学评分
            aesthetic_score: float = 6.0,
            # 负样本美学评分
            negative_aesthetic_score: float = 2.5,
            # 跳过的剪辑层
            clip_skip: Optional[int] = None,
            # 每步结束时的回调
            callback_on_step_end: Optional[
                Union[Callable[[int, int, Dict], None], PipelineCallback, MultiPipelineCallbacks]
            ] = None,
            # 回调时的张量输入列表
            callback_on_step_end_tensor_inputs: List[str] = ["latents"],
            # 其他额外参数
            **kwargs,