diffusers-源码解析-二十二-

龙哥盟 / 2024-11-09 / 原文

diffusers 源码解析(二十二)

.\diffusers\pipelines\controlnet\pipeline_controlnet_inpaint.py

# 版权声明,表明版权归 HuggingFace 团队所有
# 
# 根据 Apache License, Version 2.0 许可协议进行许可;
# 除非符合许可协议,否则不可使用此文件。
# 可以在以下地址获取许可协议:
# 
#     http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律或书面协议另有规定,软件按 "原样" 提供,
# 不提供任何形式的保证或条件,明示或暗示。
# 请参阅许可协议以获取特定语言的权限和
# 限制条款。

# 此模型实现受 https://github.com/haofanwang/ControlNet-for-Diffusers/ 启发

import inspect  # 导入 inspect 模块,用于获取对象信息
from typing import Any, Callable, Dict, List, Optional, Tuple, Union  # 导入类型提示工具

import numpy as np  # 导入 numpy,用于数值计算
import PIL.Image  # 导入 PIL.Image,用于图像处理
import torch  # 导入 PyTorch 库
import torch.nn.functional as F  # 导入 PyTorch 中的功能性神经网络模块
from transformers import CLIPImageProcessor, CLIPTextModel, CLIPTokenizer, CLIPVisionModelWithProjection  # 导入 Transformers 库中的 CLIP 相关组件

from ...callbacks import MultiPipelineCallbacks, PipelineCallback  # 从回调模块导入多管道回调类
from ...image_processor import PipelineImageInput, VaeImageProcessor  # 从图像处理模块导入图像输入和 VAE 图像处理器
from ...loaders import FromSingleFileMixin, IPAdapterMixin, StableDiffusionLoraLoaderMixin, TextualInversionLoaderMixin  # 从加载器模块导入混合类
from ...models import AutoencoderKL, ControlNetModel, ImageProjection, UNet2DConditionModel  # 从模型模块导入各种模型
from ...models.lora import adjust_lora_scale_text_encoder  # 从 LoRA 模块导入调整文本编码器的函数
from ...schedulers import KarrasDiffusionSchedulers  # 从调度器模块导入 Karras 采样调度器
from ...utils import (  # 从工具模块导入各种实用函数
    USE_PEFT_BACKEND,  # 使用 PEFT 后端的标志
    deprecate,  # 警告使用过时功能的函数
    logging,  # 日志记录模块
    replace_example_docstring,  # 替换示例文档字符串的函数
    scale_lora_layers,  # 缩放 LoRA 层的函数
    unscale_lora_layers,  # 还原 LoRA 层缩放的函数
)
from ...utils.torch_utils import is_compiled_module, randn_tensor  # 从 PyTorch 工具模块导入相关功能
from ..pipeline_utils import DiffusionPipeline, StableDiffusionMixin  # 从管道工具模块导入扩散管道和稳定扩散混合类
from ..stable_diffusion import StableDiffusionPipelineOutput  # 从稳定扩散模块导入管道输出类
from ..stable_diffusion.safety_checker import StableDiffusionSafetyChecker  # 从稳定扩散安全检查模块导入安全检查器
from .multicontrolnet import MultiControlNetModel  # 从多控制网络模块导入多控制网络模型

logger = logging.get_logger(__name__)  # 获取当前模块的日志记录器实例,禁用 pylint 检查

EXAMPLE_DOC_STRING = """  # 定义一个多行字符串变量 EXAMPLE_DOC_STRING
```  # 多行字符串结束符
```py  # 多行字符串结束符
```  # 多行字符串结束符
```py  # 多行字符串结束符
```  # 多行字符串结束符
```py  # 多行字符串结束符
```  # 多行字符串结束符
```py  # 多行字符串结束符
```  # 多行字符串结束符
```py  # 多行字符串结束符
```  # 多行字符串结束符
```py  # 多行字符串结束符
```  # 多行字符串结束符
```py  # 多行字符串结束符
```  # 多行字符串结束符
```py  # 多行字符串结束符
```  # 多行字符串结束符
```py  # 多行字符串结束符
```  # 多行字符串结束符
```py  # 多行字符串结束符
```  # 多行字符串结束符
```py  # 多行字符串结束符
```  # 多行字符串结束符
```py  # 多行字符串结束符
```  # 多行字符串结束符
```py  # 多行字符串结束符
```  # 多行字符串结束符
```py  # 多行字符串结束符
```  # 多行字符串结束符
```py  # 多行字符串结束符
```  # 多行字符串结束符
```py  # 多行字符串结束符
```  # 多行字符串结束符
```py  # 多行字符串结束符
```  # 多行字符串结束符
```py  # 多行字符串结束符
```  # 多行字符串结束符
```py  # 多行字符串结束符
```  # 多行字符串结束符
```py  # 多行字符串结束符
```  # 多行字符串结束符
```py  # 多行字符串结束符
```  # 多行字符串结束符
```py  # 多行字符串结束符
```  # 多行字符串结束符
```py  # 多行字符串结束符
```  # 多行字符串结束符
```py  # 多行字符串结束符
```  # 多行字符串结束符
```py  # 多行字符串结束符
```  # 多行字符串结束符
```py  # 多行字符串结束符
```  # 多行字符串结束符
```py  # 多行字符串结束符
```  # 多行字符串结束符
```py  # 多行字符串结束符
```  # 多行字符串结束符
```py  # 多行字符串结束符
```  # 多行字符串结束符
```py  # 多行字符串结束符
```  # 多行字符串结束符
```py  # 多行字符串结束符
```  # 多行字符串结束符
```py  # 多行字符串结束符
```  # 多行字符串结束符
```py  # 多行字符串结束符
```  # 多行字符串结束符
```py  # 多行字符串结束符
```  # 多行字符串结束符
```py  # 多行字符串结束符
```  # 多行字符串结束符
```py  # 多行字符串结束符
```  # 多行字符串结束符
```py  # 多行字符串结束符
```  # 多行字符串结束符
```py  # 多行字符串结束符
```  # 多行字符串结束符
```py  # 多行字符串结束符
```  # 多行字符串结束符
```py  # 多行字符串结束符
```  # 多行字符串结束符
```py  # 多行字符串结束符
```  # 多行字符串结束符
```py  # 多行字符串结束符
```  # 多行字符串结束符
```py  # 多行字符串结束符
```  # 多行字符串结束符
```py  # 多行字符串结束符
```  # 多行字符串结束符
```py  # 多行字符串结束符
```  # 多行字符串结束符
```py  # 多行字符串结束符
```  # 多行字符串结束符
```py  # 多行字符串结束符
```  # 多行字符串结束符
```py  # 多行字符串结束符
```  # 多行字符串结束符
```py  # 多行字符串结束符
```  # 多行字符串结束符
```py  # 多行字符串结束符
```  # 多行字符串结束符
```py  # 多行字符串结束符
```  # 多行字符串结束符
```py  # 多行字符串结束符
```  # 多行字符串结束符
```py  # 多行字符串结束符
```  # 多行字符串结束符
```py  # 多行字符串结束符
```  # 多行字符串结束符
```py  # 多行字符串结束符
```  # 多行字符串结束符
```py  # 多行字符串结束符
```  # 多行字符串结束符
```py  # 多行字符串结束符
```  # 多行字符串结束符
```py  # 多行字符串结束符
```  # 多行字符串结束符
```py  # 多行字符串结束符
```  # 多行字符串结束符
```py  # 多行字符串结束符
```  # 多行字符串结束符
```py  # 多行字符串结束符
```  # 多行字符串结束符
```py  # 多行字符串结束符
```  # 多行字符串结束符
```py  # 多行字符串结束符
```  # 多行字符串结束符
```py  # 多行字符串结束符
```  # 多行字符串结束符
```py  # 多行字符串结束符
```  # 多行字符串结束符
```py  # 多行字符串结束符
```  # 多行字符串结束符
```py  # 多行字符串结束符
```  # 多行字符串结束符
```py  # 多行字符串结束符
```  # 多行字符串结束符
```py  # 多行字符串结束符
```  # 多行字符串结束符
    # 示例代码片段,展示如何使用相关库生成图像
        Examples:
            ```py
            >>> # 安装所需的库,transformers 和 accelerate
            >>> from diffusers import StableDiffusionControlNetInpaintPipeline, ControlNetModel, DDIMScheduler
            # 从 diffusers 库导入相关类,用于图像处理和生成
            >>> from diffusers.utils import load_image
            # 从 diffusers.utils 导入 load_image 函数,用于加载图像
            >>> import numpy as np
            # 导入 numpy 库,用于数组操作
            >>> import torch
            # 导入 PyTorch 库,用于深度学习
    
            >>> init_image = load_image(
            ...     "https://huggingface.co/datasets/diffusers/test-arrays/resolve/main/stable_diffusion_inpaint/boy.png"
            ... )
            # 加载初始图像并存储在 init_image 变量中
            >>> init_image = init_image.resize((512, 512))
            # 将初始图像调整为 512x512 的尺寸
    
            >>> generator = torch.Generator(device="cpu").manual_seed(1)
            # 创建一个 CPU 上的随机数生成器,并设置种子为 1
    
            >>> mask_image = load_image(
            ...     "https://huggingface.co/datasets/diffusers/test-arrays/resolve/main/stable_diffusion_inpaint/boy_mask.png"
            ... )
            # 加载掩模图像并存储在 mask_image 变量中
            >>> mask_image = mask_image.resize((512, 512))
            # 将掩模图像调整为 512x512 的尺寸
    
            >>> def make_canny_condition(image):
            ...     image = np.array(image)
            # 将输入图像转换为 numpy 数组
            ...     image = cv2.Canny(image, 100, 200)
            # 使用 Canny 算法进行边缘检测
            ...     image = image[:, :, None]
            # 在数组最后添加一个新维度,使其适应后续操作
            ...     image = np.concatenate([image, image, image], axis=2)
            # 将边缘检测结果复制到三个通道,生成三通道图像
            ...     image = Image.fromarray(image)
            # 将 numpy 数组转换回图像格式
            ...     return image
            # 返回处理后的图像
    
            >>> control_image = make_canny_condition(init_image)
            # 对初始图像应用 Canny 边缘检测,生成控制图像
    
            >>> controlnet = ControlNetModel.from_pretrained(
            ...     "lllyasviel/control_v11p_sd15_inpaint", torch_dtype=torch.float16
            ... )
            # 从预训练模型加载 ControlNetModel,并设置数据类型为 float16
            >>> pipe = StableDiffusionControlNetInpaintPipeline.from_pretrained(
            ...     "runwayml/stable-diffusion-v1-5", controlnet=controlnet, torch_dtype=torch.float16
            ... )
            # 创建一个用于图像生成的管道,使用预训练的 Stable Diffusion 模型和 ControlNet
    
            >>> pipe.scheduler = DDIMScheduler.from_config(pipe.scheduler.config)
            # 将管道的调度器设置为从配置中加载的 DDIMScheduler
            >>> pipe.enable_model_cpu_offload()
            # 启用模型的 CPU 离线加载,以节省内存
    
            >>> # 生成图像
            >>> image = pipe(
            ...     "a handsome man with ray-ban sunglasses",
            ...     num_inference_steps=20,
            ...     generator=generator,
            ...     eta=1.0,
            ...     image=init_image,
            ...     mask_image=mask_image,
            ...     control_image=control_image,
            ... ).images[0]
            # 使用管道生成一幅图像,传入描述、推理步骤、随机生成器、初始图像、掩模和控制图像
            ``` 
"""
# 文档字符串,通常用于描述模块、类或方法的功能
# 这里没有具体内容,可能是留作注释或文档

# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion_img2img.retrieve_latents 复制的函数
def retrieve_latents(
    # 定义函数接收一个张量类型的编码器输出和可选的随机数生成器
    encoder_output: torch.Tensor, generator: Optional[torch.Generator] = None, sample_mode: str = "sample"
):
    # 检查 encoder_output 是否具有 latent_dist 属性并且采样模式为 "sample"
    if hasattr(encoder_output, "latent_dist") and sample_mode == "sample":
        # 从 latent_dist 中采样并返回结果
        return encoder_output.latent_dist.sample(generator)
    # 检查 encoder_output 是否具有 latent_dist 属性并且采样模式为 "argmax"
    elif hasattr(encoder_output, "latent_dist") and sample_mode == "argmax":
        # 返回 latent_dist 的模式值
        return encoder_output.latent_dist.mode()
    # 检查 encoder_output 是否具有 latents 属性
    elif hasattr(encoder_output, "latents"):
        # 返回 latents 属性的值
        return encoder_output.latents
    # 如果以上条件都不满足,则抛出异常
    else:
        raise AttributeError("Could not access latents of provided encoder_output")


# 定义一个图像修复管道类,使用带有 ControlNet 指导的 Stable Diffusion
class StableDiffusionControlNetInpaintPipeline(
    # 继承自 DiffusionPipeline 和其他多个混合类
    DiffusionPipeline,
    StableDiffusionMixin,
    TextualInversionLoaderMixin,
    StableDiffusionLoraLoaderMixin,
    IPAdapterMixin,
    FromSingleFileMixin,
):
    # 文档字符串,描述该管道的功能
    r"""
    使用 ControlNet 指导的 Stable Diffusion 进行图像修复的管道。

    该模型继承自 [`DiffusionPipeline`]。请查看超类文档,以获取所有管道实现的通用方法
    (下载、保存、在特定设备上运行等)。

    该管道还继承以下加载方法:
        - [`~loaders.TextualInversionLoaderMixin.load_textual_inversion`] 用于加载文本反转嵌入
        - [`~loaders.StableDiffusionLoraLoaderMixin.load_lora_weights`] 用于加载 LoRA 权重
        - [`~loaders.StableDiffusionLoraLoaderMixin.save_lora_weights`] 用于保存 LoRA 权重
        - [`~loaders.FromSingleFileMixin.from_single_file`] 用于加载 `.ckpt` 文件
        - [`~loaders.IPAdapterMixin.load_ip_adapter`] 用于加载 IP 适配器

    <Tip>

    该管道可以与专门为修复微调的检查点一起使用
    ([runwayml/stable-diffusion-inpainting](https://huggingface.co/runwayml/stable-diffusion-inpainting))以及
    默认的文本到图像 Stable Diffusion 检查点
    ([runwayml/stable-diffusion-v1-5](https://huggingface.co/runwayml/stable-diffusion-v1-5))。 默认的文本到图像
    Stable Diffusion 检查点可能更适合已经在这些检查点上微调的 ControlNet,例如
    [lllyasviel/control_v11p_sd15_inpaint](https://huggingface.co/lllyasviel/control_v11p_sd15_inpaint)。

    </Tip>
    # 参数说明
    Args:
        vae ([`AutoencoderKL`]):
            # 变分自编码器(VAE)模型,用于对图像进行编码和解码,转换为潜在表示。
        text_encoder ([`~transformers.CLIPTextModel`]):
            # 冻结的文本编码器,使用 [clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14)。
        tokenizer ([`~transformers.CLIPTokenizer`]):
            # 用于对文本进行分词的 `CLIPTokenizer`。
        unet ([`UNet2DConditionModel`]):
            # 用于对编码的图像潜在空间进行去噪的 `UNet2DConditionModel`。
        controlnet ([`ControlNetModel`] 或 `List[ControlNetModel]`):
            # 在去噪过程中为 `unet` 提供额外的条件。如果设置多个 ControlNet 作为列表,则每个 ControlNet 的输出会相加,创建一个组合的额外条件。
        scheduler ([`SchedulerMixin`]):
            # 用于与 `unet` 结合,去噪编码图像潜在空间的调度器。可以是 [`DDIMScheduler`]、[`LMSDiscreteScheduler`] 或 [`PNDMScheduler`] 之一。
        safety_checker ([`StableDiffusionSafetyChecker`]):
            # 分类模块,用于估计生成的图像是否可能被认为是冒犯性或有害的。
            # 有关模型潜在危害的更多详细信息,请参考 [模型卡片](https://huggingface.co/runwayml/stable-diffusion-v1-5)。
        feature_extractor ([`~transformers.CLIPImageProcessor`]):
            # `CLIPImageProcessor` 用于从生成的图像中提取特征;作为 `safety_checker` 的输入。
    """

    # 定义模型的 CPU 离线加载顺序
    model_cpu_offload_seq = "text_encoder->image_encoder->unet->vae"
    # 可选组件列表,包含安全检查器、特征提取器和图像编码器
    _optional_components = ["safety_checker", "feature_extractor", "image_encoder"]
    # 从 CPU 离线加载中排除的组件
    _exclude_from_cpu_offload = ["safety_checker"]
    # 回调张量输入的列表
    _callback_tensor_inputs = ["latents", "prompt_embeds", "negative_prompt_embeds"]

    # 构造函数初始化
    def __init__(
        # 变分自编码器实例
        vae: AutoencoderKL,
        # 文本编码器实例
        text_encoder: CLIPTextModel,
        # 分词器实例
        tokenizer: CLIPTokenizer,
        # UNet 模型实例
        unet: UNet2DConditionModel,
        # ControlNet 模型或模型列表
        controlnet: Union[ControlNetModel, List[ControlNetModel], Tuple[ControlNetModel], MultiControlNetModel],
        # 调度器实例
        scheduler: KarrasDiffusionSchedulers,
        # 安全检查器实例
        safety_checker: StableDiffusionSafetyChecker,
        # 特征提取器实例
        feature_extractor: CLIPImageProcessor,
        # 可选的图像编码器实例,默认为 None
        image_encoder: CLIPVisionModelWithProjection = None,
        # 是否需要安全检查器,默认为 True
        requires_safety_checker: bool = True,
    # 结束函数定义,初始化父类
    ):
        super().__init__()

        # 检查安全检查器是否为 None,且需要安全检查器的情况下发出警告
        if safety_checker is None and requires_safety_checker:
            logger.warning(
                # 日志警告信息,提醒用户禁用安全检查器的后果
                f"You have disabled the safety checker for {self.__class__} by passing `safety_checker=None`. Ensure"
                " that you abide to the conditions of the Stable Diffusion license and do not expose unfiltered"
                " results in services or applications open to the public. Both the diffusers team and Hugging Face"
                " strongly recommend to keep the safety filter enabled in all public facing circumstances, disabling"
                " it only for use-cases that involve analyzing network behavior or auditing its results. For more"
                " information, please have a look at https://github.com/huggingface/diffusers/pull/254 ."
            )

        # 检查安全检查器不为 None 但特征提取器为 None 时引发错误
        if safety_checker is not None and feature_extractor is None:
            raise ValueError(
                # 报错信息,提示用户需要定义特征提取器
                "Make sure to define a feature extractor when loading {self.__class__} if you want to use the safety"
                " checker. If you do not want to use the safety checker, you can pass `'safety_checker=None'` instead."
            )

        # 如果 controlnet 是列表或元组,则将其转换为 MultiControlNetModel
        if isinstance(controlnet, (list, tuple)):
            controlnet = MultiControlNetModel(controlnet)

        # 注册模块,传入各种组件
        self.register_modules(
            vae=vae,
            text_encoder=text_encoder,
            tokenizer=tokenizer,
            unet=unet,
            controlnet=controlnet,
            scheduler=scheduler,
            safety_checker=safety_checker,
            feature_extractor=feature_extractor,
            image_encoder=image_encoder,
        )
        # 计算 VAE 的缩放因子
        self.vae_scale_factor = 2 ** (len(self.vae.config.block_out_channels) - 1)
        # 创建用于图像处理的 VAE 图像处理器
        self.image_processor = VaeImageProcessor(vae_scale_factor=self.vae_scale_factor)
        # 创建用于掩码处理的 VAE 图像处理器,设置不同的处理选项
        self.mask_processor = VaeImageProcessor(
            vae_scale_factor=self.vae_scale_factor, do_normalize=False, do_binarize=True, do_convert_grayscale=True
        )
        # 创建用于控制图像处理的 VAE 图像处理器
        self.control_image_processor = VaeImageProcessor(
            vae_scale_factor=self.vae_scale_factor, do_convert_rgb=True, do_normalize=False
        )
        # 注册配置,记录是否需要安全检查器
        self.register_to_config(requires_safety_checker=requires_safety_checker)

    # 从 StableDiffusionPipeline 复制的编码提示的方法
    def _encode_prompt(
        self,
        prompt,
        device,
        num_images_per_prompt,
        do_classifier_free_guidance,
        negative_prompt=None,
        prompt_embeds: Optional[torch.Tensor] = None,
        negative_prompt_embeds: Optional[torch.Tensor] = None,
        lora_scale: Optional[float] = None,
        **kwargs,
    # 结束函数参数列表
        ):
            # 定义弃用消息,告知用户该函数将来会被移除
            deprecation_message = "`_encode_prompt()` is deprecated and it will be removed in a future version. Use `encode_prompt()` instead. Also, be aware that the output format changed from a concatenated tensor to a tuple."
            # 调用弃用警告函数,标记该方法为过时
            deprecate("_encode_prompt()", "1.0.0", deprecation_message, standard_warn=False)
    
            # 调用 encode_prompt 函数,获取提示的嵌入元组
            prompt_embeds_tuple = self.encode_prompt(
                # 输入提示文本
                prompt=prompt,
                # 指定设备
                device=device,
                # 每个提示生成的图像数量
                num_images_per_prompt=num_images_per_prompt,
                # 是否使用无分类器自由引导
                do_classifier_free_guidance=do_classifier_free_guidance,
                # 负面提示文本
                negative_prompt=negative_prompt,
                # 提示嵌入,若有
                prompt_embeds=prompt_embeds,
                # 负面提示嵌入,若有
                negative_prompt_embeds=negative_prompt_embeds,
                # Lora 缩放因子,若有
                lora_scale=lora_scale,
                # 其他关键字参数
                **kwargs,
            )
    
            # 连接提示嵌入元组中的元素,以便向后兼容
            prompt_embeds = torch.cat([prompt_embeds_tuple[1], prompt_embeds_tuple[0]])
    
            # 返回连接后的提示嵌入
            return prompt_embeds
    
        # 从 diffusers 库中复制的 encode_prompt 方法
        def encode_prompt(
            # 输入参数列表
            self,
            prompt,
            device,
            num_images_per_prompt,
            do_classifier_free_guidance,
            negative_prompt=None,
            prompt_embeds: Optional[torch.Tensor] = None,
            negative_prompt_embeds: Optional[torch.Tensor] = None,
            lora_scale: Optional[float] = None,
            clip_skip: Optional[int] = None,
        # 从 diffusers 库中复制的 encode_image 方法
        def encode_image(self, image, device, num_images_per_prompt, output_hidden_states=None):
            # 获取图像编码器参数的数据类型
            dtype = next(self.image_encoder.parameters()).dtype
    
            # 如果输入图像不是张量,则通过特征提取器转换为张量
            if not isinstance(image, torch.Tensor):
                image = self.feature_extractor(image, return_tensors="pt").pixel_values
    
            # 将图像移动到指定设备并设置数据类型
            image = image.to(device=device, dtype=dtype)
            # 如果需要输出隐藏状态
            if output_hidden_states:
                # 获取图像的隐藏状态,并按每个提示图像的数量重复
                image_enc_hidden_states = self.image_encoder(image, output_hidden_states=True).hidden_states[-2]
                image_enc_hidden_states = image_enc_hidden_states.repeat_interleave(num_images_per_prompt, dim=0)
                # 获取无条件图像的隐藏状态
                uncond_image_enc_hidden_states = self.image_encoder(
                    torch.zeros_like(image), output_hidden_states=True
                ).hidden_states[-2]
                uncond_image_enc_hidden_states = uncond_image_enc_hidden_states.repeat_interleave(
                    num_images_per_prompt, dim=0
                )
                # 返回图像和无条件图像的隐藏状态
                return image_enc_hidden_states, uncond_image_enc_hidden_states
            else:
                # 获取图像嵌入
                image_embeds = self.image_encoder(image).image_embeds
                # 按每个提示图像的数量重复图像嵌入
                image_embeds = image_embeds.repeat_interleave(num_images_per_prompt, dim=0)
                # 创建与图像嵌入相同形状的全零张量作为无条件图像嵌入
                uncond_image_embeds = torch.zeros_like(image_embeds)
    
                # 返回图像嵌入和无条件图像嵌入
                return image_embeds, uncond_image_embeds
    
        # 从 diffusers 库中复制的 prepare_ip_adapter_image_embeds 方法
    # 准备图像嵌入以供 IP 适配器使用
        def prepare_ip_adapter_image_embeds(
            self, ip_adapter_image, ip_adapter_image_embeds, device, num_images_per_prompt, do_classifier_free_guidance
        ):
            # 初始化图像嵌入列表
            image_embeds = []
            # 如果启用无分类器自由引导,则初始化负图像嵌入列表
            if do_classifier_free_guidance:
                negative_image_embeds = []
            # 如果没有提供图像嵌入,则处理给定的 IP 适配器图像
            if ip_adapter_image_embeds is None:
                # 如果给定的图像不是列表,则将其转换为列表
                if not isinstance(ip_adapter_image, list):
                    ip_adapter_image = [ip_adapter_image]
                # 检查给定图像数量与 IP 适配器层数量是否匹配
                if len(ip_adapter_image) != len(self.unet.encoder_hid_proj.image_projection_layers):
                    raise ValueError(
                        f"`ip_adapter_image` must have same length as the number of IP Adapters. Got {len(ip_adapter_image)} images and {len(self.unet.encoder_hid_proj.image_projection_layers)} IP Adapters."
                    )
                # 遍历每个 IP 适配器图像和对应的图像投影层
                for single_ip_adapter_image, image_proj_layer in zip(
                    ip_adapter_image, self.unet.encoder_hid_proj.image_projection_layers
                ):
                    # 判断是否需要输出隐藏状态
                    output_hidden_state = not isinstance(image_proj_layer, ImageProjection)
                    # 对单个图像进行编码,获取图像嵌入和负图像嵌入
                    single_image_embeds, single_negative_image_embeds = self.encode_image(
                        single_ip_adapter_image, device, 1, output_hidden_state
                    )
                    # 将单个图像嵌入添加到列表中
                    image_embeds.append(single_image_embeds[None, :])
                    # 如果启用无分类器自由引导,将负图像嵌入添加到列表中
                    if do_classifier_free_guidance:
                        negative_image_embeds.append(single_negative_image_embeds[None, :])
            else:
                # 遍历提供的图像嵌入
                for single_image_embeds in ip_adapter_image_embeds:
                    # 如果启用无分类器自由引导,将图像嵌入拆分为负和正图像嵌入
                    if do_classifier_free_guidance:
                        single_negative_image_embeds, single_image_embeds = single_image_embeds.chunk(2)
                        negative_image_embeds.append(single_negative_image_embeds)
                    # 将图像嵌入添加到列表中
                    image_embeds.append(single_image_embeds)
            # 初始化最终图像嵌入列表
            ip_adapter_image_embeds = []
            # 遍历每个图像嵌入
            for i, single_image_embeds in enumerate(image_embeds):
                # 重复图像嵌入以满足每个提示的图像数量
                single_image_embeds = torch.cat([single_image_embeds] * num_images_per_prompt, dim=0)
                # 如果启用无分类器自由引导,重复负图像嵌入
                if do_classifier_free_guidance:
                    single_negative_image_embeds = torch.cat([negative_image_embeds[i]] * num_images_per_prompt, dim=0)
                    # 将负图像嵌入与正图像嵌入合并
                    single_image_embeds = torch.cat([single_negative_image_embeds, single_image_embeds], dim=0)
                # 将图像嵌入移动到指定设备
                single_image_embeds = single_image_embeds.to(device=device)
                # 将处理后的图像嵌入添加到最终列表中
                ip_adapter_image_embeds.append(single_image_embeds)
            # 返回最终的图像嵌入列表
            return ip_adapter_image_embeds
    
        # 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.run_safety_checker 复制
    # 运行安全检查器,检测图像是否包含不适宜内容
    def run_safety_checker(self, image, device, dtype):
        # 如果安全检查器未定义,则设置 NSFW 概念为 None
        if self.safety_checker is None:
            has_nsfw_concept = None
        else:
            # 如果输入是张量格式,进行图像处理,转为 PIL 格式
            if torch.is_tensor(image):
                feature_extractor_input = self.image_processor.postprocess(image, output_type="pil")
            else:
                # 如果输入是 numpy 格式,转为 PIL 格式
                feature_extractor_input = self.image_processor.numpy_to_pil(image)
            # 使用特征提取器处理图像,并将结果转移到指定设备
            safety_checker_input = self.feature_extractor(feature_extractor_input, return_tensors="pt").to(device)
            # 使用安全检查器检查图像,并返回处理后的图像和 NSFW 概念
            image, has_nsfw_concept = self.safety_checker(
                images=image, clip_input=safety_checker_input.pixel_values.to(dtype)
            )
        # 返回处理后的图像及是否含有 NSFW 概念
        return image, has_nsfw_concept
    
        # 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.decode_latents 复制
        # 解码潜在变量,生成对应的图像
        def decode_latents(self, latents):
            # 显示解码方法已弃用的警告信息
            deprecation_message = "The decode_latents method is deprecated and will be removed in 1.0.0. Please use VaeImageProcessor.postprocess(...) instead"
            deprecate("decode_latents", "1.0.0", deprecation_message, standard_warn=False)
    
            # 根据 VAE 配置的缩放因子调整潜在变量
            latents = 1 / self.vae.config.scaling_factor * latents
            # 解码潜在变量生成图像
            image = self.vae.decode(latents, return_dict=False)[0]
            # 将图像数据归一化到 [0, 1] 范围内
            image = (image / 2 + 0.5).clamp(0, 1)
            # 将图像转换为 float32 格式,以便与 bfloat16 兼容
            image = image.cpu().permute(0, 2, 3, 1).float().numpy()
            # 返回处理后的图像
            return image
    
        # 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_extra_step_kwargs 复制
        # 准备额外的参数以供调度器步骤使用
        def prepare_extra_step_kwargs(self, generator, eta):
            # 为调度器步骤准备额外的参数,因为不同调度器的签名可能不同
            # eta (η) 仅在 DDIMScheduler 中使用,其他调度器会忽略
            # eta 在 DDIM 论文中对应于 η,应在 [0, 1] 范围内
    
            # 检查调度器是否接受 eta 参数
            accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())
            extra_step_kwargs = {}
            if accepts_eta:
                # 如果接受,则将 eta 添加到额外参数中
                extra_step_kwargs["eta"] = eta
    
            # 检查调度器是否接受 generator 参数
            accepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys())
            if accepts_generator:
                # 如果接受,则将 generator 添加到额外参数中
                extra_step_kwargs["generator"] = generator
            # 返回准备好的额外参数
            return extra_step_kwargs
    
        # 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion_img2img.StableDiffusionImg2ImgPipeline.get_timesteps 复制
    # 定义获取时间步长的方法,接受推理步骤数、强度和设备作为参数
    def get_timesteps(self, num_inference_steps, strength, device):
        # 计算初始时间步,确保不超过总推理步骤
        init_timestep = min(int(num_inference_steps * strength), num_inference_steps)

        # 计算开始时间步,确保不小于零
        t_start = max(num_inference_steps - init_timestep, 0)
        # 从调度器中提取相关时间步
        timesteps = self.scheduler.timesteps[t_start * self.scheduler.order :]
        # 如果调度器有设置开始索引的方法,则调用该方法
        if hasattr(self.scheduler, "set_begin_index"):
            self.scheduler.set_begin_index(t_start * self.scheduler.order)

        # 返回时间步和剩余的推理步骤数
        return timesteps, num_inference_steps - t_start

    # 定义检查输入参数的方法,接受多个参数
    def check_inputs(
        self,
        prompt,
        image,
        mask_image,
        height,
        width,
        callback_steps,
        output_type,
        negative_prompt=None,
        prompt_embeds=None,
        negative_prompt_embeds=None,
        ip_adapter_image=None,
        ip_adapter_image_embeds=None,
        controlnet_conditioning_scale=1.0,
        control_guidance_start=0.0,
        control_guidance_end=1.0,
        callback_on_step_end_tensor_inputs=None,
        padding_mask_crop=None,
    # 从 diffusers.pipelines.controlnet.pipeline_controlnet.StableDiffusionControlNetPipeline.copy 的检查图像方法
    def check_image(self, image, prompt, prompt_embeds):
        # 检查输入图像是否为 PIL 图像对象
        image_is_pil = isinstance(image, PIL.Image.Image)
        # 检查输入图像是否为 PyTorch 张量
        image_is_tensor = isinstance(image, torch.Tensor)
        # 检查输入图像是否为 NumPy 数组
        image_is_np = isinstance(image, np.ndarray)
        # 检查输入图像是否为 PIL 图像列表
        image_is_pil_list = isinstance(image, list) and isinstance(image[0], PIL.Image.Image)
        # 检查输入图像是否为张量列表
        image_is_tensor_list = isinstance(image, list) and isinstance(image[0], torch.Tensor)
        # 检查输入图像是否为 NumPy 数组列表
        image_is_np_list = isinstance(image, list) and isinstance(image[0], np.ndarray)

        # 检查输入图像类型是否合法,若不合法则抛出类型错误
        if (
            not image_is_pil
            and not image_is_tensor
            and not image_is_np
            and not image_is_pil_list
            and not image_is_tensor_list
            and not image_is_np_list
        ):
            raise TypeError(
                f"image must be passed and be one of PIL image, numpy array, torch tensor, list of PIL images, list of numpy arrays or list of torch tensors, but is {type(image)}"
            )

        # 如果图像是 PIL 图像,批量大小为 1
        if image_is_pil:
            image_batch_size = 1
        else:
            # 否则,批量大小为图像列表的长度
            image_batch_size = len(image)

        # 检查提示内容的批量大小
        if prompt is not None and isinstance(prompt, str):
            prompt_batch_size = 1
        elif prompt is not None and isinstance(prompt, list):
            prompt_batch_size = len(prompt)
        elif prompt_embeds is not None:
            prompt_batch_size = prompt_embeds.shape[0]

        # 如果图像批量大小不为 1,且与提示批量大小不一致,则抛出值错误
        if image_batch_size != 1 and image_batch_size != prompt_batch_size:
            raise ValueError(
                f"If image batch size is not 1, image batch size must be same as prompt batch size. image batch size: {image_batch_size}, prompt batch size: {prompt_batch_size}"
            )
    # 准备控制图像的函数
        def prepare_control_image(
            self,
            image,  # 输入图像
            width,  # 目标宽度
            height,  # 目标高度
            batch_size,  # 批处理大小
            num_images_per_prompt,  # 每个提示生成的图像数量
            device,  # 设备类型(CPU或GPU)
            dtype,  # 数据类型
            crops_coords,  # 裁剪坐标
            resize_mode,  # 调整大小的模式
            do_classifier_free_guidance=False,  # 是否使用无分类器引导
            guess_mode=False,  # 是否启用猜测模式
        ):
            # 预处理图像,包括调整大小和裁剪,并转换为浮点32格式
            image = self.control_image_processor.preprocess(
                image, height=height, width=width, crops_coords=crops_coords, resize_mode=resize_mode
            ).to(dtype=torch.float32)
            # 获取图像的批处理大小
            image_batch_size = image.shape[0]
    
            if image_batch_size == 1:  # 如果批处理大小为1
                repeat_by = batch_size  # 设置重复次数为批处理大小
            else:
                # 如果图像批处理大小与提示批处理大小相同
                repeat_by = num_images_per_prompt  # 设置重复次数为每个提示生成的图像数量
    
            # 沿着第0维度重复图像
            image = image.repeat_interleave(repeat_by, dim=0)
    
            # 将图像移动到指定的设备并设置数据类型
            image = image.to(device=device, dtype=dtype)
    
            if do_classifier_free_guidance and not guess_mode:  # 如果启用无分类器引导且未启用猜测模式
                # 复制图像并将其连接在一起
                image = torch.cat([image] * 2)
    
            # 返回处理后的图像
            return image
    
        # 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion_inpaint.StableDiffusionInpaintPipeline.prepare_latents 复制的函数
        def prepare_latents(
            self,
            batch_size,  # 批处理大小
            num_channels_latents,  # 潜在通道数
            height,  # 高度
            width,  # 宽度
            dtype,  # 数据类型
            device,  # 设备类型
            generator,  # 随机数生成器
            latents=None,  # 潜在变量(可选)
            image=None,  # 输入图像(可选)
            timestep=None,  # 时间步(可选)
            is_strength_max=True,  # 是否最大强度
            return_noise=False,  # 是否返回噪声
            return_image_latents=False,  # 是否返回图像潜在变量
    ):
        # 定义输出形状,包括批处理大小、通道数、高度和宽度
        shape = (
            batch_size,  # 批处理大小
            num_channels_latents,  # 潜在变量的通道数
            int(height) // self.vae_scale_factor,  # 高度缩放后的值
            int(width) // self.vae_scale_factor,  # 宽度缩放后的值
        )
        # 检查生成器是否为列表且其长度与批处理大小不匹配
        if isinstance(generator, list) and len(generator) != batch_size:
            # 如果不匹配,抛出值错误,提示用户
            raise ValueError(
                f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
                f" size of {batch_size}. Make sure the batch size matches the length of the generators."
            )

        # 检查图像或时间步是否为 None,且最大强度为 False
        if (image is None or timestep is None) and not is_strength_max:
            # 如果是,则抛出值错误,提示必须提供图像或噪声时间步
            raise ValueError(
                "Since strength < 1. initial latents are to be initialised as a combination of Image + Noise."
                "However, either the image or the noise timestep has not been provided."
            )

        # 检查是否需要返回图像潜在变量,或者潜在变量为 None 且最大强度为 False
        if return_image_latents or (latents is None and not is_strength_max):
            # 将图像转换为指定设备和数据类型
            image = image.to(device=device, dtype=dtype)

            # 检查图像的通道数是否为 4
            if image.shape[1] == 4:
                # 如果是,则将图像潜在变量设为图像本身
                image_latents = image
            else:
                # 否则,通过 VAE 编码图像生成潜在变量
                image_latents = self._encode_vae_image(image=image, generator=generator)
            # 根据批处理大小重复图像潜在变量以匹配批处理大小
            image_latents = image_latents.repeat(batch_size // image_latents.shape[0], 1, 1, 1)

        # 如果潜在变量为 None
        if latents is None:
            # 生成随机噪声张量
            noise = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
            # 如果强度为 1,则初始化潜在变量为噪声,否则为图像和噪声的组合
            latents = noise if is_strength_max else self.scheduler.add_noise(image_latents, noise, timestep)
            # 如果是纯噪声,则将初始化的潜在变量乘以调度器的初始 sigma
            latents = latents * self.scheduler.init_noise_sigma if is_strength_max else latents
        else:
            # 如果潜在变量不为 None,则将其转移到设备上
            noise = latents.to(device)
            # 根据调度器的初始 sigma 缩放潜在变量
            latents = noise * self.scheduler.init_noise_sigma

        # 将潜在变量放入输出元组中
        outputs = (latents,)

        # 如果需要返回噪声,将其添加到输出中
        if return_noise:
            outputs += (noise,)

        # 如果需要返回图像潜在变量,将其添加到输出中
        if return_image_latents:
            outputs += (image_latents,)

        # 返回输出元组
        return outputs

    # 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion_inpaint.StableDiffusionInpaintPipeline.prepare_mask_latents 复制
    def prepare_mask_latents(
        # 定义方法的参数,包括掩码、被遮挡图像、批处理大小、高度、宽度、数据类型、设备、生成器和是否进行分类器自由引导
        self, mask, masked_image, batch_size, height, width, dtype, device, generator, do_classifier_free_guidance
    # 函数的闭合部分,处理掩膜和图像的形状和数据类型
        ):
            # 将掩膜调整为与潜在空间的形状,以便在连接时不会出错
            # 在转换数据类型之前进行调整,以避免在使用 cpu_offload 和半精度时出现问题
            mask = torch.nn.functional.interpolate(
                # 将掩膜的大小调整为经过 VAE 缩放因子的高度和宽度
                mask, size=(height // self.vae_scale_factor, width // self.vae_scale_factor)
            )
            # 将掩膜移动到指定设备并转换为指定数据类型
            mask = mask.to(device=device, dtype=dtype)
    
            # 将掩膜图像移动到指定设备并转换为指定数据类型
            masked_image = masked_image.to(device=device, dtype=dtype)
    
            # 如果掩膜图像有四个通道,则直接使用掩膜图像作为潜在表示
            if masked_image.shape[1] == 4:
                masked_image_latents = masked_image
            else:
                # 否则,通过 VAE 编码掩膜图像以获得潜在表示
                masked_image_latents = self._encode_vae_image(masked_image, generator=generator)
    
            # 为每个提示复制掩膜和潜在图像,使用适合 MPS 的方法
            if mask.shape[0] < batch_size:
                # 如果掩膜数量不能整除批处理大小,则引发错误
                if not batch_size % mask.shape[0] == 0:
                    raise ValueError(
                        "传入的掩膜数量与所需批处理大小不匹配。掩膜应复制到"
                        f" 总批处理大小 {batch_size},但传入了 {mask.shape[0]} 个掩膜。确保传入的掩膜数量能被总请求的批处理大小整除。"
                    )
                # 复制掩膜以匹配批处理大小
                mask = mask.repeat(batch_size // mask.shape[0], 1, 1, 1)
            # 如果潜在图像数量不能整除批处理大小,则引发错误
            if masked_image_latents.shape[0] < batch_size:
                if not batch_size % masked_image_latents.shape[0] == 0:
                    raise ValueError(
                        "传入的图像数量与所需批处理大小不匹配。图像应复制到"
                        f" 总批处理大小 {batch_size},但传入了 {masked_image_latents.shape[0]} 个图像。确保传入的图像数量能被总请求的批处理大小整除。"
                    )
                # 复制潜在图像以匹配批处理大小
                masked_image_latents = masked_image_latents.repeat(batch_size // masked_image_latents.shape[0], 1, 1, 1)
    
            # 如果启用无分类器引导,则重复掩膜两次
            mask = torch.cat([mask] * 2) if do_classifier_free_guidance else mask
            # 如果启用无分类器引导,则重复潜在图像两次
            masked_image_latents = (
                torch.cat([masked_image_latents] * 2) if do_classifier_free_guidance else masked_image_latents
            )
    
            # 对齐设备,以防在与潜在模型输入连接时出现设备错误
            masked_image_latents = masked_image_latents.to(device=device, dtype=dtype)
            # 返回掩膜和潜在图像
            return mask, masked_image_latents
    
        # 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion_inpaint.StableDiffusionInpaintPipeline._encode_vae_image 复制的内容
    # 定义一个私有方法,用于编码变分自编码器(VAE)图像
    def _encode_vae_image(self, image: torch.Tensor, generator: torch.Generator):
        # 检查生成器是否为列表
        if isinstance(generator, list):
            # 对每个图像批次进行编码并提取潜在表示
            image_latents = [
                retrieve_latents(self.vae.encode(image[i : i + 1]), generator=generator[i])
                for i in range(image.shape[0])  # 遍历图像的每一张
            ]
            # 将潜在表示沿第0维(批次维度)进行拼接
            image_latents = torch.cat(image_latents, dim=0)
        else:
            # 如果生成器不是列表,则对整个图像进行编码并提取潜在表示
            image_latents = retrieve_latents(self.vae.encode(image), generator=generator)

        # 根据 VAE 配置的缩放因子调整潜在表示
        image_latents = self.vae.config.scaling_factor * image_latents

        # 返回最终的潜在表示
        return image_latents

    # 定义一个只读属性,返回指导比例
    @property
    def guidance_scale(self):
        return self._guidance_scale  # 返回内部存储的指导比例

    # 定义一个只读属性,返回剪切跳过的参数
    @property
    def clip_skip(self):
        return self._clip_skip  # 返回内部存储的剪切跳过参数

    # 定义一个只读属性,判断是否进行无分类器引导
    @property
    def do_classifier_free_guidance(self):
        return self._guidance_scale > 1  # 当指导比例大于1时返回True

    # 定义一个只读属性,返回交叉注意力的关键字参数
    @property
    def cross_attention_kwargs(self):
        return self._cross_attention_kwargs  # 返回内部存储的交叉注意力参数

    # 定义一个只读属性,返回时间步数
    @property
    def num_timesteps(self):
        return self._num_timesteps  # 返回内部存储的时间步数

    # 装饰器,关闭梯度计算以提高效率
    @torch.no_grad()
    # 用于替换示例文档字符串的装饰器
    @replace_example_docstring(EXAMPLE_DOC_STRING)
    # 定义调用方法,处理生成图像的输入参数
    def __call__(
        # 定义提示内容,支持字符串或字符串列表
        prompt: Union[str, List[str]] = None,
        # 定义输入图像
        image: PipelineImageInput = None,
        # 定义掩膜图像
        mask_image: PipelineImageInput = None,
        # 定义控制图像
        control_image: PipelineImageInput = None,
        # 定义图像高度
        height: Optional[int] = None,
        # 定义图像宽度
        width: Optional[int] = None,
        # 定义填充掩膜裁剪参数
        padding_mask_crop: Optional[int] = None,
        # 定义强度参数
        strength: float = 1.0,
        # 定义推理步骤数
        num_inference_steps: int = 50,
        # 定义指导比例
        guidance_scale: float = 7.5,
        # 定义负提示内容,支持字符串或字符串列表
        negative_prompt: Optional[Union[str, List[str]]] = None,
        # 定义每个提示生成的图像数量
        num_images_per_prompt: Optional[int] = 1,
        # 定义η参数
        eta: float = 0.0,
        # 定义生成器,支持单个或多个生成器
        generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
        # 定义潜在表示
        latents: Optional[torch.Tensor] = None,
        # 定义提示嵌入
        prompt_embeds: Optional[torch.Tensor] = None,
        # 定义负提示嵌入
        negative_prompt_embeds: Optional[torch.Tensor] = None,
        # 定义图像适配器输入
        ip_adapter_image: Optional[PipelineImageInput] = None,
        # 定义图像适配器嵌入
        ip_adapter_image_embeds: Optional[List[torch.Tensor]] = None,
        # 定义输出类型,默认为 PIL 图像
        output_type: Optional[str] = "pil",
        # 定义是否返回字典格式的输出
        return_dict: bool = True,
        # 定义交叉注意力的关键字参数
        cross_attention_kwargs: Optional[Dict[str, Any]] = None,
        # 定义控制网络条件缩放比例,默认为0.5
        controlnet_conditioning_scale: Union[float, List[float]] = 0.5,
        # 定义是否使用猜测模式
        guess_mode: bool = False,
        # 定义控制引导开始比例,默认为0.0
        control_guidance_start: Union[float, List[float]] = 0.0,
        # 定义控制引导结束比例,默认为1.0
        control_guidance_end: Union[float, List[float]] = 1.0,
        # 定义剪切跳过的参数
        clip_skip: Optional[int] = None,
        # 定义步骤结束时的回调函数
        callback_on_step_end: Optional[
            Union[Callable[[int, int, Dict], None], PipelineCallback, MultiPipelineCallbacks]
        ] = None,
        # 定义步骤结束时的张量输入回调参数
        callback_on_step_end_tensor_inputs: List[str] = ["latents"],
        # 允许接收其他关键字参数
        **kwargs,

.\diffusers\pipelines\controlnet\pipeline_controlnet_inpaint_sd_xl.py

# 版权所有 2024 Harutatsu Akiyama, Jinbin Bai 和 HuggingFace 团队。保留所有权利。
#
# 根据 Apache 许可证,第 2.0 版(“许可证”)进行许可;
# 除非遵守许可证,否则您不得使用此文件。
# 您可以在以下网址获取许可证副本:
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律要求或书面协议另有约定,
# 否则根据许可证分发的软件是按“原样”提供的,
# 不提供任何形式的明示或暗示的担保或条件。
# 请参阅许可证以了解管理权限和
# 限制的具体条款。

# 导入 inspect 模块以获取对象的签名、源代码等信息
import inspect
# 从 typing 模块导入类型提示工具
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

# 导入 numpy 库用于数值计算
import numpy as np
# 导入 PIL.Image 模块用于图像处理
import PIL.Image
# 导入 PyTorch 库
import torch
# 导入 PyTorch 的功能性接口
import torch.nn.functional as F
# 从 transformers 库中导入 CLIP 相关模型和处理器
from transformers import (
    CLIPImageProcessor,  # 图像处理器
    CLIPTextModel,  # 文本模型
    CLIPTextModelWithProjection,  # 带投影的文本模型
    CLIPTokenizer,  # 分词器
    CLIPVisionModelWithProjection,  # 带投影的视觉模型
)

# 从 callbacks 模块导入多管道回调类
from ...callbacks import MultiPipelineCallbacks, PipelineCallback
# 从 image_processor 模块导入图像输入处理器
from ...image_processor import PipelineImageInput, VaeImageProcessor
# 从 loaders 模块导入各种加载器混合类
from ...loaders import (
    FromSingleFileMixin,  # 从单个文件加载
    IPAdapterMixin,  # IP 适配器混合
    StableDiffusionXLLoraLoaderMixin,  # 稳定扩散 XL LoRA 加载器混合
    TextualInversionLoaderMixin,  # 文本反转加载器混合
)
# 从 models 模块导入多种模型类
from ...models import AutoencoderKL, ControlNetModel, ImageProjection, UNet2DConditionModel
# 从注意力处理器模块导入不同版本的注意力处理器
from ...models.attention_processor import (
    AttnProcessor2_0,  # 注意力处理器版本 2.0
    XFormersAttnProcessor,  # XFormers 注意力处理器
)
# 从调度器模块导入 Karras 扩散调度器
from ...schedulers import KarrasDiffusionSchedulers
# 从 utils 模块导入实用函数和常量
from ...utils import (
    USE_PEFT_BACKEND,  # 使用 PEFT 后端的标志
    deprecate,  # 用于标记过时功能的装饰器
    is_invisible_watermark_available,  # 检查是否可用隐形水印
    logging,  # 日志记录模块
    replace_example_docstring,  # 替换示例文档字符串的函数
    scale_lora_layers,  # 缩放 LoRA 层的函数
    unscale_lora_layers,  # 取消缩放 LoRA 层的函数
)
# 从 torch_utils 模块导入特定的 PyTorch 实用工具函数
from ...utils.torch_utils import is_compiled_module, randn_tensor
# 从 pipeline_utils 模块导入扩散管道和稳定扩散混合类
from ..pipeline_utils import DiffusionPipeline, StableDiffusionMixin
# 从稳定扩散 XL 的输出模块导入管道输出类
from ..stable_diffusion_xl.pipeline_output import StableDiffusionXLPipelineOutput
# 从 multicontrolnet 模块导入多控制网络模型
from .multicontrolnet import MultiControlNetModel

# 如果可用隐形水印,则导入相应的水印处理器
if is_invisible_watermark_available():
    from diffusers.pipelines.stable_diffusion_xl.watermark import StableDiffusionXLWatermarker

# 获取当前模块的日志记录器
logger = logging.get_logger(__name__)  # pylint: disable=invalid-name

# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion_img2img.retrieve_latents 复制的函数
def retrieve_latents(
    encoder_output: torch.Tensor,  # 输入为编码器输出的张量
    generator: Optional[torch.Generator] = None,  # 可选的随机数生成器
    sample_mode: str = "sample"  # 采样模式,默认设置为 "sample"
):
    # 如果 encoder_output 具有 latent_dist 属性并且采样模式为 "sample"
    if hasattr(encoder_output, "latent_dist") and sample_mode == "sample":
        # 从 latent_dist 中采样并返回结果
        return encoder_output.latent_dist.sample(generator)
    # 如果 encoder_output 具有 latent_dist 属性并且采样模式为 "argmax"
    elif hasattr(encoder_output, "latent_dist") and sample_mode == "argmax":
        # 返回 latent_dist 的众数
        return encoder_output.latent_dist.mode()
    # 如果 encoder_output 具有 latents 属性
    elif hasattr(encoder_output, "latents"):
        # 返回 latents 属性
        return encoder_output.latents
    # 如果没有找到有效属性,则抛出异常
    else:
        raise AttributeError("Could not access latents of provided encoder_output")

# 示例文档字符串
EXAMPLE_DOC_STRING = """
    # 示例代码,展示如何使用Diffusers库进行图像处理
    Examples:
        ```py
        >>> # 安装必要的库
        >>> # !pip install transformers accelerate
        >>> # 导入所需的模块
        >>> from diffusers import StableDiffusionXLControlNetInpaintPipeline, ControlNetModel, DDIMScheduler
        >>> from diffusers.utils import load_image
        >>> from PIL import Image
        >>> import numpy as np
        >>> import torch

        >>> # 从指定URL加载初始图像
        >>> init_image = load_image(
        ...     "https://huggingface.co/datasets/diffusers/test-arrays/resolve/main/stable_diffusion_inpaint/boy.png"
        ... )
        >>> # 将图像调整为1024x1024像素
        >>> init_image = init_image.resize((1024, 1024))

        >>> # 创建一个生成器并设定随机种子
        >>> generator = torch.Generator(device="cpu").manual_seed(1)

        >>> # 从指定URL加载掩码图像
        >>> mask_image = load_image(
        ...     "https://huggingface.co/datasets/diffusers/test-arrays/resolve/main/stable_diffusion_inpaint/boy_mask.png"
        ... )
        >>> # 将掩码图像调整为1024x1024像素
        >>> mask_image = mask_image.resize((1024, 1024))

        >>> # 定义一个函数,用于生成Canny边缘图像
        >>> def make_canny_condition(image):
        ...     # 将图像转换为NumPy数组
        ...     image = np.array(image)
        ...     # 应用Canny边缘检测算法
        ...     image = cv2.Canny(image, 100, 200)
        ...     # 增加一个维度,以适应后续操作
        ...     image = image[:, :, None]
        ...     # 复制图像到三个通道,以便转换为RGB格式
        ...     image = np.concatenate([image, image, image], axis=2)
        ...     # 从数组创建图像对象
        ...     image = Image.fromarray(image)
        ...     return image

        >>> # 生成控制图像
        >>> control_image = make_canny_condition(init_image)

        >>> # 从预训练模型加载ControlNet模型
        >>> controlnet = ControlNetModel.from_pretrained(
        ...     "diffusers/controlnet-canny-sdxl-1.0", torch_dtype=torch.float16
        ... )
        >>> # 从预训练模型加载Stable Diffusion管道
        >>> pipe = StableDiffusionXLControlNetInpaintPipeline.from_pretrained(
        ...     "stabilityai/stable-diffusion-xl-base-1.0", controlnet=controlnet, torch_dtype=torch.float16
        ... )

        >>> # 启用模型CPU卸载,以节省内存
        >>> pipe.enable_model_cpu_offload()

        >>> # 生成图像
        >>> image = pipe(
        ...     "a handsome man with ray-ban sunglasses",
        ...     num_inference_steps=20,
        ...     generator=generator,
        ...     eta=1.0,
        ...     image=init_image,
        ...     mask_image=mask_image,
        ...     control_image=control_image,
        ... ).images[0]  # 从生成的图像列表中提取第一张图像
        ```py 
"""
# 文档字符串,描述该模块的功能或用法


# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.rescale_noise_cfg 复制的函数
def rescale_noise_cfg(noise_cfg, noise_pred_text, guidance_rescale=0.0):
    """
    根据 `guidance_rescale` 重新缩放 `noise_cfg`。基于[Common Diffusion Noise Schedules and
    Sample Steps are Flawed](https://arxiv.org/pdf/2305.08891.pdf)中的发现。参见第 3.4 节
    """
    # 计算 noise_pred_text 的标准差,保持维度
    std_text = noise_pred_text.std(dim=list(range(1, noise_pred_text.ndim)), keepdim=True)
    # 计算 noise_cfg 的标准差,保持维度
    std_cfg = noise_cfg.std(dim=list(range(1, noise_cfg.ndim)), keepdim=True)
    # 根据标准差重缩放来自指导的结果(修复过度曝光)
    noise_pred_rescaled = noise_cfg * (std_text / std_cfg)
    # 按照指导比例混合原始结果,以避免生成“平淡”的图像
    noise_cfg = guidance_rescale * noise_pred_rescaled + (1 - guidance_rescale) * noise_cfg
    # 返回重新缩放后的 noise_cfg
    return noise_cfg


# 定义一个用于文本到图像生成的 Stable Diffusion XL 控制网络插值管道类
class StableDiffusionXLControlNetInpaintPipeline(
    DiffusionPipeline,  # 继承自 DiffusionPipeline
    StableDiffusionMixin,  # 继承自 StableDiffusionMixin
    StableDiffusionXLLoraLoaderMixin,  # 继承自 StableDiffusionXLLoraLoaderMixin
    FromSingleFileMixin,  # 继承自 FromSingleFileMixin
    IPAdapterMixin,  # 继承自 IPAdapterMixin
    TextualInversionLoaderMixin,  # 继承自 TextualInversionLoaderMixin
):
    r"""
    用于使用 Stable Diffusion XL 进行文本到图像生成的管道。

    该模型继承自 [`DiffusionPipeline`]。有关库为所有管道实现的通用方法(例如下载或保存、在特定设备上运行等)的文档,请查阅超类文档。

    此管道还继承以下加载方法:
        - [`~loaders.TextualInversionLoaderMixin.load_textual_inversion`] 用于加载文本反转嵌入
        - [`~loaders.StableDiffusionXLLoraLoaderMixin.load_lora_weights`] 用于加载 LoRA 权重
        - [`~loaders.StableDiffusionXLLoraLoaderMixin.save_lora_weights`] 用于保存 LoRA 权重
        - [`~loaders.FromSingleFileMixin.from_single_file`] 用于加载 `.ckpt` 文件
        - [`~loaders.IPAdapterMixin.load_ip_adapter`] 用于加载 IP 适配器
    # 定义参数说明的文档字符串
        Args:
            vae ([`AutoencoderKL`]):
                定义用于编码和解码图像的变分自编码器模型,将图像转换为潜在表示。
            text_encoder ([`CLIPTextModel`]):
                冻结的文本编码器。Stable Diffusion XL使用
                [CLIP](https://huggingface.co/docs/transformers/model_doc/clip#transformers.CLIPTextModel)的文本部分,
                特别是[clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14)变体。
            text_encoder_2 ([` CLIPTextModelWithProjection`]):
                第二个冻结文本编码器。Stable Diffusion XL使用
                [CLIP](https://huggingface.co/docs/transformers/model_doc/clip#transformers.CLIPTextModelWithProjection)的文本和池部分,
                特别是[laion/CLIP-ViT-bigG-14-laion2B-39B-b160k](https://huggingface.co/laion/CLIP-ViT-bigG-14-laion2B-39B-b160k)变体。
            tokenizer (`CLIPTokenizer`):
                [CLIPTokenizer](https://huggingface.co/docs/transformers/v4.21.0/en/model_doc/clip#transformers.CLIPTokenizer)类的标记器。
            tokenizer_2 (`CLIPTokenizer`):
                第二个[CLIPTokenizer](https://huggingface.co/docs/transformers/v4.21.0/en/model_doc/clip#transformers.CLIPTokenizer)类的标记器。
            unet ([`UNet2DConditionModel`]): 条件U-Net架构,用于去噪编码的图像潜在表示。
            scheduler ([`SchedulerMixin`]):
                用于与`unet`结合使用的调度器,以去噪编码的图像潜在表示。可以是
                [`DDIMScheduler`], [`LMSDiscreteScheduler`]或[`PNDMScheduler`]之一。
        """
    
        # 定义模型组件的顺序,以便进行CPU卸载
        model_cpu_offload_seq = "text_encoder->text_encoder_2->unet->vae"
    
        # 定义可选组件的列表
        _optional_components = [
            "tokenizer",  # 标记器
            "tokenizer_2",  # 第二个标记器
            "text_encoder",  # 文本编码器
            "text_encoder_2",  # 第二个文本编码器
            "image_encoder",  # 图像编码器
            "feature_extractor",  # 特征提取器
        ]
        # 定义回调张量输入的列表
        _callback_tensor_inputs = [
            "latents",  # 潜在表示
            "prompt_embeds",  # 提示嵌入
            "negative_prompt_embeds",  # 负提示嵌入
            "add_text_embeds",  # 添加的文本嵌入
            "add_time_ids",  # 添加的时间ID
            "negative_pooled_prompt_embeds",  # 负池化提示嵌入
            "add_neg_time_ids",  # 添加的负时间ID
            "mask",  # 掩码
            "masked_image_latents",  # 被掩码的图像潜在表示
        ]
    
        # 初始化方法定义,接收多个模型和参数
        def __init__(
            self,
            vae: AutoencoderKL,  # 变分自编码器模型
            text_encoder: CLIPTextModel,  # 文本编码器
            text_encoder_2: CLIPTextModelWithProjection,  # 第二个文本编码器
            tokenizer: CLIPTokenizer,  # 第一个标记器
            tokenizer_2: CLIPTokenizer,  # 第二个标记器
            unet: UNet2DConditionModel,  # 条件U-Net模型
            controlnet: Union[ControlNetModel, List[ControlNetModel], Tuple[ControlNetModel], MultiControlNetModel],  # 控制网络模型或模型列表
            scheduler: KarrasDiffusionSchedulers,  # Karras调度器
            requires_aesthetics_score: bool = False,  # 是否需要美学评分的布尔值
            force_zeros_for_empty_prompt: bool = True,  # 对空提示强制使用零的布尔值
            add_watermarker: Optional[bool] = None,  # 可选的水印标记布尔值
            feature_extractor: Optional[CLIPImageProcessor] = None,  # 可选的特征提取器
            image_encoder: Optional[CLIPVisionModelWithProjection] = None,  # 可选的图像编码器
    # 初始化父类构造函数
        ):
            super().__init__()
    
            # 检查 controlnet 是否为列表或元组,若是则转换为 MultiControlNetModel 实例
            if isinstance(controlnet, (list, tuple)):
                controlnet = MultiControlNetModel(controlnet)
    
            # 注册各个模块以供使用
            self.register_modules(
                # 注册变分自编码器
                vae=vae,
                # 注册文本编码器
                text_encoder=text_encoder,
                # 注册第二文本编码器
                text_encoder_2=text_encoder_2,
                # 注册标记器
                tokenizer=tokenizer,
                # 注册第二标记器
                tokenizer_2=tokenizer_2,
                # 注册联合网络
                unet=unet,
                # 注册控制网络
                controlnet=controlnet,
                # 注册调度器
                scheduler=scheduler,
                # 注册特征提取器
                feature_extractor=feature_extractor,
                # 注册图像编码器
                image_encoder=image_encoder,
            )
            # 将强制零填充的参数注册到配置中
            self.register_to_config(force_zeros_for_empty_prompt=force_zeros_for_empty_prompt)
            # 将需要美学评分的参数注册到配置中
            self.register_to_config(requires_aesthetics_score=requires_aesthetics_score)
            # 计算 VAE 的缩放因子
            self.vae_scale_factor = 2 ** (len(self.vae.config.block_out_channels) - 1)
            # 创建图像处理器实例,使用 VAE 缩放因子
            self.image_processor = VaeImageProcessor(vae_scale_factor=self.vae_scale_factor)
            # 创建掩模处理器实例,配置不同的处理参数
            self.mask_processor = VaeImageProcessor(
                vae_scale_factor=self.vae_scale_factor, do_normalize=False, do_binarize=True, do_convert_grayscale=True
            )
            # 创建控制图像处理器实例,配置不同的处理参数
            self.control_image_processor = VaeImageProcessor(
                vae_scale_factor=self.vae_scale_factor, do_convert_rgb=True, do_normalize=False
            )
    
            # 确定是否添加水印,若未指定则检查是否可用
            add_watermarker = add_watermarker if add_watermarker is not None else is_invisible_watermark_available()
    
            # 若需要添加水印,则初始化水印器
            if add_watermarker:
                self.watermark = StableDiffusionXLWatermarker()
            else:
                # 否则水印设置为 None
                self.watermark = None
    
        # 从稳定扩散管道复制的函数,用于编码提示
        def encode_prompt(
            # 提示字符串
            prompt: str,
            # 第二个提示字符串,可选
            prompt_2: Optional[str] = None,
            # 设备类型,可选
            device: Optional[torch.device] = None,
            # 每个提示生成的图像数量
            num_images_per_prompt: int = 1,
            # 是否进行分类器自由引导
            do_classifier_free_guidance: bool = True,
            # 负面提示字符串,可选
            negative_prompt: Optional[str] = None,
            # 第二个负面提示字符串,可选
            negative_prompt_2: Optional[str] = None,
            # 提示嵌入张量,可选
            prompt_embeds: Optional[torch.Tensor] = None,
            # 负面提示嵌入张量,可选
            negative_prompt_embeds: Optional[torch.Tensor] = None,
            # 经过处理的提示嵌入张量,可选
            pooled_prompt_embeds: Optional[torch.Tensor] = None,
            # 经过处理的负面提示嵌入张量,可选
            negative_pooled_prompt_embeds: Optional[torch.Tensor] = None,
            # Lora 缩放因子,可选
            lora_scale: Optional[float] = None,
            # 跳过剪辑的参数,可选
            clip_skip: Optional[int] = None,
        # 从稳定扩散管道复制的函数,用于编码图像
    # 定义一个编码图像的函数,接收图像、设备、每个提示的图像数量以及可选的隐藏状态参数
    def encode_image(self, image, device, num_images_per_prompt, output_hidden_states=None):
        # 获取图像编码器参数的数据类型
        dtype = next(self.image_encoder.parameters()).dtype
    
        # 检查输入图像是否为张量,若不是,则使用特征提取器将其转换为张量
        if not isinstance(image, torch.Tensor):
            image = self.feature_extractor(image, return_tensors="pt").pixel_values
    
        # 将图像移动到指定设备,并转换为适当的数据类型
        image = image.to(device=device, dtype=dtype)
        # 如果需要输出隐藏状态
        if output_hidden_states:
            # 编码图像并获取倒数第二层的隐藏状态
            image_enc_hidden_states = self.image_encoder(image, output_hidden_states=True).hidden_states[-2]
            # 根据每个提示的图像数量重复隐藏状态
            image_enc_hidden_states = image_enc_hidden_states.repeat_interleave(num_images_per_prompt, dim=0)
            # 编码全零图像并获取倒数第二层的隐藏状态
            uncond_image_enc_hidden_states = self.image_encoder(
                torch.zeros_like(image), output_hidden_states=True
            ).hidden_states[-2]
            # 根据每个提示的图像数量重复隐藏状态
            uncond_image_enc_hidden_states = uncond_image_enc_hidden_states.repeat_interleave(
                num_images_per_prompt, dim=0
            )
            # 返回编码后的图像和无条件图像的隐藏状态
            return image_enc_hidden_states, uncond_image_enc_hidden_states
        else:
            # 编码图像并获取图像嵌入
            image_embeds = self.image_encoder(image).image_embeds
            # 根据每个提示的图像数量重复图像嵌入
            image_embeds = image_embeds.repeat_interleave(num_images_per_prompt, dim=0)
            # 创建与图像嵌入相同形状的全零张量作为无条件图像嵌入
            uncond_image_embeds = torch.zeros_like(image_embeds)
    
            # 返回编码后的图像嵌入和无条件图像嵌入
            return image_embeds, uncond_image_embeds
    
        # 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_ip_adapter_image_embeds 复制的函数
        def prepare_ip_adapter_image_embeds(
            # 函数参数定义,包括适配器图像、图像嵌入、设备、每个提示的图像数量和分类器自由引导的标志
            self, ip_adapter_image, ip_adapter_image_embeds, device, num_images_per_prompt, do_classifier_free_guidance
    # 函数体开始
        ):
            # 初始化用于存储图像嵌入的列表
            image_embeds = []
            # 如果使用分类器自由引导,则初始化负图像嵌入列表
            if do_classifier_free_guidance:
                negative_image_embeds = []
            # 检查适配器图像嵌入是否为 None
            if ip_adapter_image_embeds is None:
                # 确保适配器图像为列表形式
                if not isinstance(ip_adapter_image, list):
                    ip_adapter_image = [ip_adapter_image]
    
                # 检查适配器图像的长度是否与 IP 适配器数量匹配
                if len(ip_adapter_image) != len(self.unet.encoder_hid_proj.image_projection_layers):
                    raise ValueError(
                        f"`ip_adapter_image` must have same length as the number of IP Adapters. Got {len(ip_adapter_image)} images and {len(self.unet.encoder_hid_proj.image_projection_layers)} IP Adapters."
                    )
    
                # 遍历每个适配器图像和相应的图像投影层
                for single_ip_adapter_image, image_proj_layer in zip(
                    ip_adapter_image, self.unet.encoder_hid_proj.image_projection_layers
                ):
                    # 检查是否需要输出隐藏状态
                    output_hidden_state = not isinstance(image_proj_layer, ImageProjection)
                    # 对单个适配器图像进行编码,得到嵌入
                    single_image_embeds, single_negative_image_embeds = self.encode_image(
                        single_ip_adapter_image, device, 1, output_hidden_state
                    )
    
                    # 将单个图像嵌入添加到列表中
                    image_embeds.append(single_image_embeds[None, :])
                    # 如果使用分类器自由引导,添加负图像嵌入
                    if do_classifier_free_guidance:
                        negative_image_embeds.append(single_negative_image_embeds[None, :])
            else:
                # 如果已有适配器图像嵌入,遍历这些嵌入
                for single_image_embeds in ip_adapter_image_embeds:
                    # 如果使用分类器自由引导,拆分负和正图像嵌入
                    if do_classifier_free_guidance:
                        single_negative_image_embeds, single_image_embeds = single_image_embeds.chunk(2)
                        negative_image_embeds.append(single_negative_image_embeds)
                    # 将单个图像嵌入添加到列表中
                    image_embeds.append(single_image_embeds)
    
            # 初始化适配器图像嵌入的列表
            ip_adapter_image_embeds = []
            # 遍历每个图像嵌入,进行处理
            for i, single_image_embeds in enumerate(image_embeds):
                # 将每个图像嵌入复制指定次数以生成多图像嵌入
                single_image_embeds = torch.cat([single_image_embeds] * num_images_per_prompt, dim=0)
                # 如果使用分类器自由引导,处理负图像嵌入
                if do_classifier_free_guidance:
                    single_negative_image_embeds = torch.cat([negative_image_embeds[i]] * num_images_per_prompt, dim=0)
                    # 将负图像嵌入与正图像嵌入合并
                    single_image_embeds = torch.cat([single_negative_image_embeds, single_image_embeds], dim=0)
    
                # 将图像嵌入移动到指定设备
                single_image_embeds = single_image_embeds.to(device=device)
                # 将处理后的嵌入添加到适配器图像嵌入列表中
                ip_adapter_image_embeds.append(single_image_embeds)
    
            # 返回最终的适配器图像嵌入列表
            return ip_adapter_image_embeds
    
        # 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_extra_step_kwargs 复制的代码
    # 准备额外的参数用于调度器步骤,因为并不是所有调度器都有相同的参数签名
        def prepare_extra_step_kwargs(self, generator, eta):
            # eta (η) 仅在 DDIMScheduler 中使用,对于其他调度器将被忽略
            # eta 对应于 DDIM 论文中的 η: https://arxiv.org/abs/2010.02502
            # 取值应在 [0, 1] 之间
    
            # 检查调度器步骤是否接受 eta 参数
            accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())
            # 初始化额外参数字典
            extra_step_kwargs = {}
            # 如果接受 eta,则将其添加到额外参数字典中
            if accepts_eta:
                extra_step_kwargs["eta"] = eta
    
            # 检查调度器步骤是否接受 generator 参数
            accepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys())
            # 如果接受 generator,则将其添加到额外参数字典中
            if accepts_generator:
                extra_step_kwargs["generator"] = generator
            # 返回准备好的额外参数字典
            return extra_step_kwargs
    
        # 检查输入图像及其对应的提示和提示嵌入
        def check_image(self, image, prompt, prompt_embeds):
            # 检查图像是否为 PIL 图像类型
            image_is_pil = isinstance(image, PIL.Image.Image)
            # 检查图像是否为 PyTorch 张量类型
            image_is_tensor = isinstance(image, torch.Tensor)
            # 检查图像是否为 NumPy 数组类型
            image_is_np = isinstance(image, np.ndarray)
            # 检查图像是否为 PIL 图像列表
            image_is_pil_list = isinstance(image, list) and isinstance(image[0], PIL.Image.Image)
            # 检查图像是否为 PyTorch 张量列表
            image_is_tensor_list = isinstance(image, list) and isinstance(image[0], torch.Tensor)
            # 检查图像是否为 NumPy 数组列表
            image_is_np_list = isinstance(image, list) and isinstance(image[0], np.ndarray)
    
            # 如果图像不属于上述任何一种类型,则引发类型错误
            if (
                not image_is_pil
                and not image_is_tensor
                and not image_is_np
                and not image_is_pil_list
                and not image_is_tensor_list
                and not image_is_np_list
            ):
                raise TypeError(
                    f"image must be passed and be one of PIL image, numpy array, torch tensor, list of PIL images, list of numpy arrays or list of torch tensors, but is {type(image)}"
                )
    
            # 如果图像为 PIL 图像,则图像批量大小为 1
            if image_is_pil:
                image_batch_size = 1
            else:
                # 否则,图像批量大小为图像列表的长度
                image_batch_size = len(image)
    
            # 检查提示是否为字符串类型
            if prompt is not None and isinstance(prompt, str):
                prompt_batch_size = 1
            # 检查提示是否为列表类型
            elif prompt is not None and isinstance(prompt, list):
                prompt_batch_size = len(prompt)
            # 如果提示嵌入不为空,则根据其形状确定提示批量大小
            elif prompt_embeds is not None:
                prompt_batch_size = prompt_embeds.shape[0]
    
            # 如果图像批量大小不为 1,且与提示批量大小不匹配,则引发值错误
            if image_batch_size != 1 and image_batch_size != prompt_batch_size:
                raise ValueError(
                    f"If image batch size is not 1, image batch size must be same as prompt batch size. image batch size: {image_batch_size}, prompt batch size: {prompt_batch_size}"
                )
    # 定义检查输入参数的函数,包含多个参数
    def check_inputs(
        self,
        prompt,  # 输入的提示文本
        prompt_2,  # 第二个输入的提示文本
        image,  # 输入的图像
        mask_image,  # 输入的掩码图像
        strength,  # 强度参数
        num_inference_steps,  # 推理步骤数
        callback_steps,  # 回调步骤
        output_type,  # 输出类型
        negative_prompt=None,  # 负面提示文本(可选)
        negative_prompt_2=None,  # 第二个负面提示文本(可选)
        prompt_embeds=None,  # 提示的嵌入表示(可选)
        negative_prompt_embeds=None,  # 负面提示的嵌入表示(可选)
        ip_adapter_image=None,  # IP 适配器图像(可选)
        ip_adapter_image_embeds=None,  # IP 适配器图像嵌入(可选)
        pooled_prompt_embeds=None,  # 池化后的提示嵌入(可选)
        negative_pooled_prompt_embeds=None,  # 负面池化提示嵌入(可选)
        controlnet_conditioning_scale=1.0,  # ControlNet 的条件缩放因子
        control_guidance_start=0.0,  # Control 引导的起始值
        control_guidance_end=1.0,  # Control 引导的结束值
        callback_on_step_end_tensor_inputs=None,  # 步骤结束时的回调张量输入(可选)
        padding_mask_crop=None,  # 填充掩码裁剪(可选)
    # 定义准备控制图像的函数,包含多个参数
    def prepare_control_image(
        self,
        image,  # 输入的图像
        width,  # 图像的宽度
        height,  # 图像的高度
        batch_size,  # 批处理大小
        num_images_per_prompt,  # 每个提示的图像数量
        device,  # 设备类型(CPU/GPU)
        dtype,  # 数据类型
        crops_coords,  # 裁剪坐标
        resize_mode,  # 调整大小的模式
        do_classifier_free_guidance=False,  # 是否进行无分类器引导
        guess_mode=False,  # 是否启用猜测模式
    ):
        # 预处理图像,调整大小和裁剪,并转换为指定的数据类型
        image = self.control_image_processor.preprocess(
            image, height=height, width=width, crops_coords=crops_coords, resize_mode=resize_mode
        ).to(dtype=torch.float32)  # 转换为浮点型
        # 获取图像的批处理大小
        image_batch_size = image.shape[0]

        # 如果图像批处理大小为1,则重复次数为批处理大小
        if image_batch_size == 1:
            repeat_by = batch_size
        else:
            # 如果图像批处理大小与提示批处理大小相同
            repeat_by = num_images_per_prompt

        # 按指定维度重复图像
        image = image.repeat_interleave(repeat_by, dim=0)

        # 将图像移动到指定的设备和数据类型
        image = image.to(device=device, dtype=dtype)

        # 如果启用无分类器引导且未启用猜测模式,则复制图像
        if do_classifier_free_guidance and not guess_mode:
            image = torch.cat([image] * 2)  # 将图像重复连接

        # 返回处理后的图像
        return image

    # 定义准备潜变量的函数,包含多个参数
    def prepare_latents(
        self,
        batch_size,  # 批处理大小
        num_channels_latents,  # 潜变量的通道数
        height,  # 高度
        width,  # 宽度
        dtype,  # 数据类型
        device,  # 设备类型
        generator,  # 随机数生成器
        latents=None,  # 潜变量(可选)
        image=None,  # 输入图像(可选)
        timestep=None,  # 时间步(可选)
        is_strength_max=True,  # 强度是否达到最大值
        add_noise=True,  # 是否添加噪声
        return_noise=False,  # 是否返回噪声
        return_image_latents=False,  # 是否返回图像潜变量
    ):
        # 定义形状,包括批量大小、通道数和缩放后的高度和宽度
        shape = (
            batch_size,
            num_channels_latents,
            int(height) // self.vae_scale_factor,
            int(width) // self.vae_scale_factor,
        )
        # 检查生成器列表长度是否与批量大小匹配
        if isinstance(generator, list) and len(generator) != batch_size:
            raise ValueError(
                f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
                f" size of {batch_size}. Make sure the batch size matches the length of the generators."
            )

        # 如果图像或时间步长未提供且强度未达到最大值,抛出错误
        if (image is None or timestep is None) and not is_strength_max:
            raise ValueError(
                "Since strength < 1. initial latents are to be initialised as a combination of Image + Noise."
                "However, either the image or the noise timestep has not been provided."
            )

        # 如果需要返回图像潜变量或潜变量为空且强度未达到最大值
        if return_image_latents or (latents is None and not is_strength_max):
            # 将图像转换为指定设备和数据类型
            image = image.to(device=device, dtype=dtype)

            # 如果图像有四个通道,直接赋值给图像潜变量
            if image.shape[1] == 4:
                image_latents = image
            else:
                # 否则通过 VAE 编码图像以获取潜变量
                image_latents = self._encode_vae_image(image=image, generator=generator)
            # 根据批量大小重复潜变量
            image_latents = image_latents.repeat(batch_size // image_latents.shape[0], 1, 1, 1)

        # 如果潜变量为空且需要添加噪声
        if latents is None and add_noise:
            # 创建随机噪声张量
            noise = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
            # 如果强度为 1,则将潜变量初始化为噪声,否则将其初始化为图像和噪声的组合
            latents = noise if is_strength_max else self.scheduler.add_noise(image_latents, noise, timestep)
            # 如果强度为最大值,则根据调度器的初始 sigma 缩放潜变量
            latents = latents * self.scheduler.init_noise_sigma if is_strength_max else latents
        elif add_noise:
            # 如果需要添加噪声,将潜变量转换到指定设备
            noise = latents.to(device)
            # 用调度器的初始 sigma 缩放潜变量
            latents = noise * self.scheduler.init_noise_sigma
        else:
            # 创建随机噪声张量
            noise = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
            # 将图像潜变量转换到指定设备
            latents = image_latents.to(device)

        # 创建输出元组,初始仅包含潜变量
        outputs = (latents,)

        # 如果需要返回噪声,则将其添加到输出中
        if return_noise:
            outputs += (noise,)

        # 如果需要返回图像潜变量,则将其添加到输出中
        if return_image_latents:
            outputs += (image_latents,)

        # 返回最终的输出元组
        return outputs
    # 定义一个私有方法,用于编码变分自编码器(VAE)的图像
        def _encode_vae_image(self, image: torch.Tensor, generator: torch.Generator):
            # 获取输入图像的数值类型
            dtype = image.dtype
            # 如果配置要求强制转换数据类型为浮点型
            if self.vae.config.force_upcast:
                # 将图像转换为浮点型
                image = image.float()
                # 将 VAE 模型转换为浮点32位类型
                self.vae.to(dtype=torch.float32)
    
            # 如果生成器是一个列表
            if isinstance(generator, list):
                # 遍历每个图像,编码并获取对应的潜在变量
                image_latents = [
                    retrieve_latents(self.vae.encode(image[i : i + 1]), generator=generator[i])
                    for i in range(image.shape[0])  # 遍历图像的每一维
                ]
                # 将所有潜在变量在第0维上连接成一个张量
                image_latents = torch.cat(image_latents, dim=0)
            else:
                # 编码整个图像并获取潜在变量
                image_latents = retrieve_latents(self.vae.encode(image), generator=generator)
    
            # 如果配置要求强制转换数据类型为原始类型
            if self.vae.config.force_upcast:
                # 将 VAE 模型恢复为原始数据类型
                self.vae.to(dtype)
    
            # 将潜在变量转换为原始数据类型
            image_latents = image_latents.to(dtype)
            # 将潜在变量乘以缩放因子
            image_latents = self.vae.config.scaling_factor * image_latents
    
            # 返回编码后的潜在变量
            return image_latents
    
        # 定义一个方法,用于准备掩码潜在变量
        def prepare_mask_latents(
            # 接收掩码、被掩盖的图像、批次大小、高度、宽度、数据类型、设备、生成器和分类器自由引导的标志
            self, mask, masked_image, batch_size, height, width, dtype, device, generator, do_classifier_free_guidance
    ):
        # 将掩膜调整为与潜在形状相同,以便在连接掩膜与潜在时使用
        # 在转换为数据类型之前进行此操作,以避免在使用 cpu_offload 和半精度时出现问题
        mask = torch.nn.functional.interpolate(
            # 调整掩膜的大小,使其与潜在相匹配
            mask, size=(height // self.vae_scale_factor, width // self.vae_scale_factor)
        )
        # 将掩膜移动到指定设备,并设置数据类型
        mask = mask.to(device=device, dtype=dtype)

        # 为每个提示生成重复的掩膜和掩膜图像潜在,使用与 MPS 友好的方法
        if mask.shape[0] < batch_size:
            # 检查掩膜数量是否可以被批量大小整除
            if not batch_size % mask.shape[0] == 0:
                raise ValueError(
                    # 抛出错误,提示掩膜与批量大小不匹配
                    "The passed mask and the required batch size don't match. Masks are supposed to be duplicated to"
                    f" a total batch size of {batch_size}, but {mask.shape[0]} masks were passed. Make sure the number"
                    " of masks that you pass is divisible by the total requested batch size."
                )
            # 重复掩膜以匹配批量大小
            mask = mask.repeat(batch_size // mask.shape[0], 1, 1, 1)

        # 如果启用分类器自由引导,重复掩膜两次;否则保持不变
        mask = torch.cat([mask] * 2) if do_classifier_free_guidance else mask

        masked_image_latents = None
        if masked_image is not None:
            # 将掩膜图像移动到指定设备,并设置数据类型
            masked_image = masked_image.to(device=device, dtype=dtype)
            # 对掩膜图像进行 VAE 编码,生成潜在表示
            masked_image_latents = self._encode_vae_image(masked_image, generator=generator)
            # 检查掩膜图像潜在数量是否可以被批量大小整除
            if masked_image_latents.shape[0] < batch_size:
                if not batch_size % masked_image_latents.shape[0] == 0:
                    raise ValueError(
                        # 抛出错误,提示图像与批量大小不匹配
                        "The passed images and the required batch size don't match. Images are supposed to be duplicated"
                        f" to a total batch size of {batch_size}, but {masked_image_latents.shape[0]} images were passed."
                        " Make sure the number of images that you pass is divisible by the total requested batch size."
                    )
                # 重复掩膜图像潜在以匹配批量大小
                masked_image_latents = masked_image_latents.repeat(
                    batch_size // masked_image_latents.shape[0], 1, 1, 1
                )

            # 如果启用分类器自由引导,重复潜在表示两次;否则保持不变
            masked_image_latents = (
                torch.cat([masked_image_latents] * 2) if do_classifier_free_guidance else masked_image_latents
            )

            # 将掩膜图像潜在移动到指定设备,以防在与潜在模型输入连接时出现设备错误
            masked_image_latents = masked_image_latents.to(device=device, dtype=dtype)

        # 返回处理后的掩膜和掩膜图像潜在
        return mask, masked_image_latents

    # 从 diffusers.pipelines.stable_diffusion_xl.pipeline_stable_diffusion_xl_img2img.StableDiffusionXLImg2ImgPipeline.get_timesteps 复制的
    # 获取时间步长,包含推理步数、强度、设备和可选的去噪开始时间
    def get_timesteps(self, num_inference_steps, strength, device, denoising_start=None):
        # 如果没有提供去噪开始时间,则计算初始时间步
        if denoising_start is None:
            # 计算初始时间步,取强度和推理步数的乘积与推理步数中的最小值
            init_timestep = min(int(num_inference_steps * strength), num_inference_steps)
            # 计算开始时间步,确保不小于 0
            t_start = max(num_inference_steps - init_timestep, 0)
        else:
            # 如果提供了去噪开始时间,开始时间步设为 0
            t_start = 0

        # 根据调度器的时间步数组,从开始时间步切片获取时间步
        timesteps = self.scheduler.timesteps[t_start * self.scheduler.order :]

        # 如果有去噪开始时间,强度不再重要;
        # 此时强度由去噪开始时间决定
        if denoising_start is not None:
            # 计算离散时间步的截止值
            discrete_timestep_cutoff = int(
                round(
                    self.scheduler.config.num_train_timesteps
                    - (denoising_start * self.scheduler.config.num_train_timesteps)
                )
            )

            # 统计时间步小于截止值的数量,得到推理步数
            num_inference_steps = (timesteps < discrete_timestep_cutoff).sum().item()
            # 如果调度器为二阶调度器且推理步数为偶数,可能需要加 1
            if self.scheduler.order == 2 and num_inference_steps % 2 == 0:
                # 因为每个时间步(最高的时间步除外)被重复,如果推理步数为偶数,
                # 则可能会在去噪步骤中间切分时间步,导致结果不正确。
                # 加 1 确保去噪过程在调度器的二阶导数步骤后结束
                num_inference_steps = num_inference_steps + 1

            # 因为 t_n+1 >= t_n,从结束开始切片获取时间步
            timesteps = timesteps[-num_inference_steps:]
            # 返回时间步和推理步数
            return timesteps, num_inference_steps

        # 返回时间步和从开始时间步减去的推理步数
        return timesteps, num_inference_steps - t_start

    # 定义获取附加时间 ID 的私有方法,参数包括原始大小、裁剪坐标、目标大小、美学分数等
    def _get_add_time_ids(
        self,
        original_size,
        crops_coords_top_left,
        target_size,
        aesthetic_score,
        negative_aesthetic_score,
        dtype,
        text_encoder_projection_dim=None,
    ):
        # 检查配置是否需要美学评分
        if self.config.requires_aesthetics_score:
            # 创建添加时间 ID 列表,包括原始大小、裁剪坐标和美学评分
            add_time_ids = list(original_size + crops_coords_top_left + (aesthetic_score,))
            # 创建添加负美学评分 ID 列表,包括原始大小、裁剪坐标和负美学评分
            add_neg_time_ids = list(original_size + crops_coords_top_left + (negative_aesthetic_score,))
        else:
            # 创建添加时间 ID 列表,包括原始大小、裁剪坐标和目标大小
            add_time_ids = list(original_size + crops_coords_top_left + target_size)
            # 创建添加负时间 ID 列表,包括原始大小、裁剪坐标和目标大小
            add_neg_time_ids = list(original_size + crops_coords_top_left + target_size)

        # 计算通过的添加嵌入维度
        passed_add_embed_dim = (
            self.unet.config.addition_time_embed_dim * len(add_time_ids) + text_encoder_projection_dim
        )
        # 获取期望的添加嵌入维度
        expected_add_embed_dim = self.unet.add_embedding.linear_1.in_features

        # 检查期望的嵌入维度是否大于实际通过的嵌入维度,并且差值是否等于配置的添加时间嵌入维度
        if (
            expected_add_embed_dim > passed_add_embed_dim
            and (expected_add_embed_dim - passed_add_embed_dim) == self.unet.config.addition_time_embed_dim
        ):
            # 抛出错误,提示嵌入向量长度不匹配
            raise ValueError(
                f"Model expects an added time embedding vector of length {expected_add_embed_dim}, but a vector of {passed_add_embed_dim} was created. Please make sure to enable `requires_aesthetics_score` with `pipe.register_to_config(requires_aesthetics_score=True)` to make sure `aesthetic_score` {aesthetic_score} and `negative_aesthetic_score` {negative_aesthetic_score} is correctly used by the model."
            )
        # 检查期望的嵌入维度是否小于实际通过的嵌入维度,并且差值是否等于配置的添加时间嵌入维度
        elif (
            expected_add_embed_dim < passed_add_embed_dim
            and (passed_add_embed_dim - expected_add_embed_dim) == self.unet.config.addition_time_embed_dim
        ):
            # 抛出错误,提示嵌入向量长度不匹配
            raise ValueError(
                f"Model expects an added time embedding vector of length {expected_add_embed_dim}, but a vector of {passed_add_embed_dim} was created. Please make sure to disable `requires_aesthetics_score` with `pipe.register_to_config(requires_aesthetics_score=False)` to make sure `target_size` {target_size} is correctly used by the model."
            )
        # 检查期望的嵌入维度是否与实际通过的嵌入维度不相等
        elif expected_add_embed_dim != passed_add_embed_dim:
            # 抛出错误,提示模型配置不正确
            raise ValueError(
                f"Model expects an added time embedding vector of length {expected_add_embed_dim}, but a vector of {passed_add_embed_dim} was created. The model has an incorrect config. Please check `unet.config.time_embedding_type` and `text_encoder_2.config.projection_dim`."
            )

        # 将添加时间 ID 转换为张量
        add_time_ids = torch.tensor([add_time_ids], dtype=dtype)
        # 将添加负时间 ID 转换为张量
        add_neg_time_ids = torch.tensor([add_neg_time_ids], dtype=dtype)

        # 返回添加时间 ID 和添加负时间 ID
        return add_time_ids, add_neg_time_ids

    # 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion_upscale.StableDiffusionUpscalePipeline.upcast_vae 中复制
    # 定义一个函数用于提升 VAE 的类型
        def upcast_vae(self):
            # 获取 VAE 的数据类型
            dtype = self.vae.dtype
            # 将 VAE 转换为浮点32位数据类型
            self.vae.to(dtype=torch.float32)
            # 检查当前使用的处理器是否为特定版本或类型
            use_torch_2_0_or_xformers = isinstance(
                self.vae.decoder.mid_block.attentions[0].processor,
                (
                    AttnProcessor2_0,
                    XFormersAttnProcessor,
                ),
            )
            # 如果使用 xformers 或 torch_2_0,注意力模块不需要使用浮点32位,可以节省大量内存
            if use_torch_2_0_or_xformers:
                # 将后量化卷积转换为相应的数据类型
                self.vae.post_quant_conv.to(dtype)
                # 将输入卷积层转换为相应的数据类型
                self.vae.decoder.conv_in.to(dtype)
                # 将中间块转换为相应的数据类型
                self.vae.decoder.mid_block.to(dtype)
    
        # 定义属性以获取指导比例
        @property
        def guidance_scale(self):
            # 返回指导比例的值
            return self._guidance_scale
    
        # 定义属性以获取剪辑跳过的值
        @property
        def clip_skip(self):
            # 返回剪辑跳过的值
            return self._clip_skip
    
        # 定义属性以判断是否进行无分类器引导
        # 这里的 `guidance_scale` 类似于 Imagen 论文中方程 (2) 的引导权重 `w`
        # `guidance_scale = 1` 表示不进行分类器无引导。
        @property
        def do_classifier_free_guidance(self):
            # 返回是否进行无分类器引导的布尔值
            return self._guidance_scale > 1
    
        # 定义属性以获取交叉注意力的参数
        @property
        def cross_attention_kwargs(self):
            # 返回交叉注意力的参数
            return self._cross_attention_kwargs
    
        # 定义属性以获取时间步数
        @property
        def num_timesteps(self):
            # 返回时间步数的值
            return self._num_timesteps
    
        # 禁用梯度计算以节省内存
        @torch.no_grad()
        # 用于替换示例文档字符串的装饰器
        @replace_example_docstring(EXAMPLE_DOC_STRING)
    # 定义可调用的方法,用于处理输入参数并执行相关操作
        def __call__(
            self,  # 方法本身
            prompt: Union[str, List[str]] = None,  # 输入提示,支持字符串或字符串列表,默认为 None
            prompt_2: Optional[Union[str, List[str]]] = None,  # 第二个输入提示,支持字符串或字符串列表,默认为 None
            image: PipelineImageInput = None,  # 输入图像,默认为 None
            mask_image: PipelineImageInput = None,  # 输入掩码图像,默认为 None
            control_image: Union[  # 控制图像,支持单个或多个输入图像
                PipelineImageInput,
                List[PipelineImageInput],
            ] = None,  # 默认为 None
            height: Optional[int] = None,  # 输出图像的高度,默认为 None
            width: Optional[int] = None,  # 输出图像的宽度,默认为 None
            padding_mask_crop: Optional[int] = None,  # 填充掩码裁剪参数,默认为 None
            strength: float = 0.9999,  # 强度参数,默认为 0.9999
            num_inference_steps: int = 50,  # 推理步骤数量,默认为 50
            denoising_start: Optional[float] = None,  # 去噪开始的值,默认为 None
            denoising_end: Optional[float] = None,  # 去噪结束的值,默认为 None
            guidance_scale: float = 5.0,  # 引导缩放因子,默认为 5.0
            negative_prompt: Optional[Union[str, List[str]]] = None,  # 负向提示,支持字符串或字符串列表,默认为 None
            negative_prompt_2: Optional[Union[str, List[str]]] = None,  # 第二个负向提示,支持字符串或字符串列表,默认为 None
            num_images_per_prompt: Optional[int] = 1,  # 每个提示生成的图像数量,默认为 1
            eta: float = 0.0,  # 影响随机性的参数,默认为 0.0
            generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,  # 随机数生成器,默认为 None
            latents: Optional[torch.Tensor] = None,  # 潜在变量,默认为 None
            prompt_embeds: Optional[torch.Tensor] = None,  # 提示嵌入,默认为 None
            negative_prompt_embeds: Optional[torch.Tensor] = None,  # 负向提示嵌入,默认为 None
            ip_adapter_image: Optional[PipelineImageInput] = None,  # 输入适配器图像,默认为 None
            ip_adapter_image_embeds: Optional[List[torch.Tensor]] = None,  # 输入适配器图像嵌入,默认为 None
            pooled_prompt_embeds: Optional[torch.Tensor] = None,  # 池化后的提示嵌入,默认为 None
            negative_pooled_prompt_embeds: Optional[torch.Tensor] = None,  # 池化后的负向提示嵌入,默认为 None
            output_type: Optional[str] = "pil",  # 输出类型,默认为 "pil"
            return_dict: bool = True,  # 是否返回字典,默认为 True
            cross_attention_kwargs: Optional[Dict[str, Any]] = None,  # 交叉注意力参数,默认为 None
            controlnet_conditioning_scale: Union[float, List[float]] = 1.0,  # 控制网条件缩放因子,默认为 1.0
            guess_mode: bool = False,  # 猜测模式开关,默认为 False
            control_guidance_start: Union[float, List[float]] = 0.0,  # 控制引导开始值,默认为 0.0
            control_guidance_end: Union[float, List[float]] = 1.0,  # 控制引导结束值,默认为 1.0
            guidance_rescale: float = 0.0,  # 引导重新缩放因子,默认为 0.0
            original_size: Tuple[int, int] = None,  # 原始图像尺寸,默认为 None
            crops_coords_top_left: Tuple[int, int] = (0, 0),  # 裁剪的左上角坐标,默认为 (0, 0)
            target_size: Tuple[int, int] = None,  # 目标图像尺寸,默认为 None
            aesthetic_score: float = 6.0,  # 美学评分,默认为 6.0
            negative_aesthetic_score: float = 2.5,  # 负向美学评分,默认为 2.5
            clip_skip: Optional[int] = None,  # 剪切跳过参数,默认为 None
            callback_on_step_end: Optional[  # 步骤结束时的回调函数,支持多种类型
                Union[Callable[[int, int, Dict], None], PipelineCallback, MultiPipelineCallbacks]
            ] = None,  # 默认为 None
            callback_on_step_end_tensor_inputs: List[str] = ["latents"],  # 步骤结束时的张量输入,默认为 ["latents"]
            **kwargs,  # 额外的关键字参数

.\diffusers\pipelines\controlnet\pipeline_controlnet_sd_xl.py

# 版权声明,2024年HuggingFace团队所有权利
# 
# 根据Apache许可证第2.0版(“许可证”)授权;
# 除非遵守许可证,否则不得使用此文件。
# 可以在以下网址获取许可证副本:
# 
#     http://www.apache.org/licenses/LICENSE-2.0
# 
# 除非适用法律或书面协议另有约定,软件
# 根据许可证分发是基于“原样”基础,
# 不提供任何形式的明示或暗示的担保或条件。
# 请参见许可证以获取特定语言的权限和
# 限制。

# 导入inspect模块,用于检查对象的属性和方法
import inspect
# 从typing模块导入类型注释所需的各种类型
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

# 导入numpy库,通常用于数值计算
import numpy as np
# 导入PIL库中的Image模块,用于图像处理
import PIL.Image
# 导入PyTorch库
import torch
# 导入PyTorch中的函数式API,用于深度学习操作
import torch.nn.functional as F
# 从transformers库导入多个CLIP相关模型和处理器
from transformers import (
    CLIPImageProcessor,  # 图像处理器
    CLIPTextModel,  # 文本模型
    CLIPTextModelWithProjection,  # 带有投影的文本模型
    CLIPTokenizer,  # 文本分词器
    CLIPVisionModelWithProjection,  # 带有投影的视觉模型
)

# 从diffusers.utils导入检查隐形水印可用性的函数
from diffusers.utils.import_utils import is_invisible_watermark_available

# 导入多个回调和处理器类
from ...callbacks import MultiPipelineCallbacks, PipelineCallback
# 导入图像处理器相关的类
from ...image_processor import PipelineImageInput, VaeImageProcessor
# 导入加载器的多个混合类
from ...loaders import (
    FromSingleFileMixin,  # 单文件加载混合类
    IPAdapterMixin,  # IP适配器混合类
    StableDiffusionXLLoraLoaderMixin,  # 稳定扩散XL Lora加载混合类
    TextualInversionLoaderMixin,  # 文本反转加载混合类
)
# 导入多个模型类
from ...models import AutoencoderKL, ControlNetModel, ImageProjection, UNet2DConditionModel
# 导入注意力处理器相关的类
from ...models.attention_processor import (
    AttnProcessor2_0,  # 注意力处理器2.0
    XFormersAttnProcessor,  # XFormers注意力处理器
)
# 从Lora模型中导入调整文本编码器的函数
from ...models.lora import adjust_lora_scale_text_encoder
# 导入调度器相关的类
from ...schedulers import KarrasDiffusionSchedulers
# 从utils模块导入多个实用函数和常量
from ...utils import (
    USE_PEFT_BACKEND,  # 指示是否使用PEFT后端
    deprecate,  # 用于标记弃用的函数
    logging,  # 日志记录工具
    replace_example_docstring,  # 替换示例文档字符串的函数
    scale_lora_layers,  # 调整Lora层的比例
    unscale_lora_layers,  # 反调整Lora层的比例
)
# 从torch_utils导入与PyTorch相关的实用工具
from ...utils.torch_utils import is_compiled_module, is_torch_version, randn_tensor
# 从pipeline_utils导入扩散管道及其混合类
from ..pipeline_utils import DiffusionPipeline, StableDiffusionMixin
# 从stable_diffusion_xl导入管道输出类
from ..stable_diffusion_xl.pipeline_output import StableDiffusionXLPipelineOutput

# 如果隐形水印可用,则导入水印类
if is_invisible_watermark_available():
    from ..stable_diffusion_xl.watermark import StableDiffusionXLWatermarker

# 导入多控制网模型类
from .multicontrolnet import MultiControlNetModel

# 创建一个记录器,用于记录模块中的日志
logger = logging.get_logger(__name__)  # pylint: disable=invalid-name

# 示例文档字符串,可能包含使用示例
EXAMPLE_DOC_STRING = """
Examples:
    ```py
    >>> # !pip install opencv-python transformers accelerate
    >>> # 导入必要的库
    >>> from diffusers import StableDiffusionXLControlNetPipeline, ControlNetModel, AutoencoderKL
    >>> from diffusers.utils import load_image  # 导入加载图像的工具
    >>> import numpy as np  # 导入 NumPy 库用于数组操作
    >>> import torch  # 导入 PyTorch 库用于深度学习

    >>> import cv2  # 导入 OpenCV 库用于计算机视觉操作
    >>> from PIL import Image  # 导入 PIL 库用于图像处理

    >>> # 定义生成图像的提示文本
    >>> prompt = "aerial view, a futuristic research complex in a bright foggy jungle, hard lighting"
    >>> # 定义负面提示文本以避免生成低质量图像
    >>> negative_prompt = "low quality, bad quality, sketches"

    >>> # 下载一张图像
    >>> image = load_image(
    ...     "https://hf.co/datasets/hf-internal-testing/diffusers-images/resolve/main/sd_controlnet/hf-logo.png"
    ... )  # 从指定URL加载图像

    >>> # 初始化模型和管道
    >>> controlnet_conditioning_scale = 0.5  # 设置控制网的条件缩放比例,推荐用于良好的泛化
    >>> controlnet = ControlNetModel.from_pretrained(
    ...     "diffusers/controlnet-canny-sdxl-1.0", torch_dtype=torch.float16
    ... )  # 从预训练模型加载控制网模型
    >>> vae = AutoencoderKL.from_pretrained("madebyollin/sdxl-vae-fp16-fix", torch_dtype=torch.float16)  # 从预训练模型加载变分自编码器
    >>> pipe = StableDiffusionXLControlNetPipeline.from_pretrained(
    ...     "stabilityai/stable-diffusion-xl-base-1.0", controlnet=controlnet, vae=vae, torch_dtype=torch.float16
    ... )  # 从预训练模型加载稳定扩散管道并指定控制网和变分自编码器
    >>> pipe.enable_model_cpu_offload()  # 启用模型的 CPU 卸载以节省内存

    >>> # 获取 Canny 边缘检测图像
    >>> image = np.array(image)  # 将图像转换为 NumPy 数组
    >>> image = cv2.Canny(image, 100, 200)  # 使用 Canny 算法进行边缘检测
    >>> image = image[:, :, None]  # 将数组维度扩展以便于后续处理
    >>> image = np.concatenate([image, image, image], axis=2)  # 将单通道图像转换为三通道图像
    >>> canny_image = Image.fromarray(image)  # 从 NumPy 数组创建 PIL 图像

    >>> # 生成图像
    >>> image = pipe(
    ...     prompt, controlnet_conditioning_scale=controlnet_conditioning_scale, image=canny_image
    ... ).images[0]  # 使用提示和 Canny 图像生成新图像,并提取结果
    ```  

从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion 导入的 retrieve_timesteps 函数

def retrieve_timesteps(
# 调度器对象
scheduler,
# 推理步骤的数量,默认为 None
num_inference_steps: Optional[int] = None,
# 设备类型,默认为 None
device: Optional[Union[str, torch.device]] = None,
# 自定义时间步列表,默认为 None
timesteps: Optional[List[int]] = None,
# 自定义 sigma 列表,默认为 None
sigmas: Optional[List[float]] = None,
# 其他关键字参数
**kwargs,
):
"""
调用调度器的 set_timesteps 方法,并在调用后从调度器中获取时间步。处理自定义时间步。任何关键字参数将传递给 scheduler.set_timesteps

参数:
    scheduler (`SchedulerMixin`):
        获取时间步的调度器。
    num_inference_steps (`int`):
        生成样本时使用的扩散步骤数量。如果使用,则 `timesteps` 必须为 `None`。
    device (`str` 或 `torch.device`, *可选*):
        时间步要移动到的设备。如果为 `None`,则不移动时间步。
    timesteps (`List[int]`, *可选*):
        用于覆盖调度器的时间步间隔策略的自定义时间步。如果传递 `timesteps`,则 `num_inference_steps` 和 `sigmas` 必须为 `None`。
    sigmas (`List[float]`, *可选*):
        用于覆盖调度器的时间步间隔策略的自定义 sigma。如果传递 `sigmas`,则 `num_inference_steps` 和 `timesteps` 必须为 `None`。

返回:
    `Tuple[torch.Tensor, int]`: 一个元组,第一个元素是调度器的时间步计划,第二个元素是推理步骤的数量。
"""
# 检查是否同时传递了时间步和 sigma
if timesteps is not None and sigmas is not None:
    # 抛出错误,提示只能选择一个
    raise ValueError("Only one of `timesteps` or `sigmas` can be passed. Please choose one to set custom values")
# 如果传递了时间步
if timesteps is not None:
    # 检查调度器是否接受自定义时间步
    accepts_timesteps = "timesteps" in set(inspect.signature(scheduler.set_timesteps).parameters.keys())
    # 如果不接受,自定义时间步不被支持,抛出错误
    if not accepts_timesteps:
        raise ValueError(
            f"The current scheduler class {scheduler.__class__}'s `set_timesteps` does not support custom"
            f" timestep schedules. Please check whether you are using the correct scheduler."
        )
    # 调用调度器设置时间步
    scheduler.set_timesteps(timesteps=timesteps, device=device, **kwargs)
    # 从调度器获取设置后的时间步
    timesteps = scheduler.timesteps
    # 计算推理步骤的数量
    num_inference_steps = len(timesteps)
# 如果传递了 sigma
elif sigmas is not None:
    # 检查调度器是否接受自定义 sigma
    accept_sigmas = "sigmas" in set(inspect.signature(scheduler.set_timesteps).parameters.keys())
    # 如果不接受,自定义 sigma 不被支持,抛出错误
    if not accept_sigmas:
        raise ValueError(
            f"The current scheduler class {scheduler.__class__}'s `set_timesteps` does not support custom"
            f" sigmas schedules. Please check whether you are using the correct scheduler."
        )
    # 调用调度器设置 sigma
    scheduler.set_timesteps(sigmas=sigmas, device=device, **kwargs)
    # 从调度器获取设置后的时间步
    timesteps = scheduler.timesteps
    # 计算推理步骤的数量
    num_inference_steps = len(timesteps)
# 否则分支,用于设置调度器的时间步
    else:
        # 调用调度器设置推理步数,并指定设备和其他参数
        scheduler.set_timesteps(num_inference_steps, device=device, **kwargs)
        # 获取调度器中的时间步列表
        timesteps = scheduler.timesteps
    # 返回时间步列表和推理步数
    return timesteps, num_inference_steps

定义一个名为 StableDiffusionXLControlNetPipeline 的类,继承多个父类

class StableDiffusionXLControlNetPipeline(
# 继承 DiffusionPipeline 类
DiffusionPipeline,
# 继承 StableDiffusionMixin 类
StableDiffusionMixin,
# 继承 TextualInversionLoaderMixin 类
TextualInversionLoaderMixin,
# 继承 StableDiffusionXLLoraLoaderMixin 类
StableDiffusionXLLoraLoaderMixin,
# 继承 IPAdapterMixin 类
IPAdapterMixin,
# 继承 FromSingleFileMixin 类
FromSingleFileMixin,
):
# 文档字符串,描述该管道的功能和用途
r"""
使用 Stable Diffusion XL 进行文本到图像生成,并结合 ControlNet 指导。

该模型继承自 [`DiffusionPipeline`]。有关所有管道的通用方法的文档,请查看超类文档
(下载、保存、在特定设备上运行等)。

该管道还继承以下加载方法:
    - [`~loaders.TextualInversionLoaderMixin.load_textual_inversion`] 用于加载文本反转嵌入
    - [`~loaders.StableDiffusionXLLoraLoaderMixin.load_lora_weights`] 用于加载 LoRA 权重
    - [`~loaders.StableDiffusionXLLoraLoaderMixin.save_lora_weights`] 用于保存 LoRA 权重
    - [`~loaders.FromSingleFileMixin.from_single_file`] 用于加载 `.ckpt` 文件
    - [`~loaders.IPAdapterMixin.load_ip_adapter`] 用于加载 IP 适配器
# 文档字符串,说明函数参数及其类型和作用
Args:
    vae ([`AutoencoderKL`]):  # 变分自编码器模型,用于将图像编码和解码为潜在表示
        Variational Auto-Encoder (VAE) model to encode and decode images to and from latent representations.
    text_encoder ([`~transformers.CLIPTextModel`]):  # 冻结的文本编码器模型,使用 CLIP 进行文本处理
        Frozen text-encoder ([clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14)).
    text_encoder_2 ([`~transformers.CLIPTextModelWithProjection`]):  # 第二个冻结的文本编码器
        Second frozen text-encoder
        ([laion/CLIP-ViT-bigG-14-laion2B-39B-b160k](https://huggingface.co/laion/CLIP-ViT-bigG-14-laion2B-39B-b160k)).
    tokenizer ([`~transformers.CLIPTokenizer`]):  # 用于文本分词的 CLIP 分词器
        A `CLIPTokenizer` to tokenize text.
    tokenizer_2 ([`~transformers.CLIPTokenizer`]):  # 第二个用于文本分词的 CLIP 分词器
        A `CLIPTokenizer` to tokenize text.
    unet ([`UNet2DConditionModel`]):  # 用于去噪编码图像潜在表示的 UNet 模型
        A `UNet2DConditionModel` to denoise the encoded image latents.
    controlnet ([`ControlNetModel`] or `List[ControlNetModel]`):  # 提供额外条件以帮助 UNet 在去噪过程中
        Provides additional conditioning to the `unet` during the denoising process. If you set multiple
        ControlNets as a list, the outputs from each ControlNet are added together to create one combined
        additional conditioning.
    scheduler ([`SchedulerMixin`]):  # 用于与 UNet 结合使用的调度器,帮助去噪
        A scheduler to be used in combination with `unet` to denoise the encoded image latents. Can be one of
        [`DDIMScheduler`], [`LMSDiscreteScheduler`], or [`PNDMScheduler`].
    force_zeros_for_empty_prompt (`bool`, *optional*, defaults to `"True"`):  # 指定负提示嵌入是否应始终设为0
        Whether the negative prompt embeddings should always be set to 0. Also see the config of
        `stabilityai/stable-diffusion-xl-base-1-0`.
    add_watermarker (`bool`, *optional*):  # 指定是否使用水印库为输出图像添加水印
        Whether to use the [invisible_watermark](https://github.com/ShieldMnt/invisible-watermark/) library to
        watermark output images. If not defined, it defaults to `True` if the package is installed; otherwise no
        watermarker is used.
"""

# 有意不包含 controlnet,因为它会与 unet 进行迭代
model_cpu_offload_seq = "text_encoder->text_encoder_2->image_encoder->unet->vae"  # 定义模型的 CPU 卸载顺序
_optional_components = [  # 可选组件列表,包含可选使用的模型部分
    "tokenizer",  # 第一个分词器
    "tokenizer_2",  # 第二个分词器
    "text_encoder",  # 第一个文本编码器
    "text_encoder_2",  # 第二个文本编码器
    "feature_extractor",  # 特征提取器
    "image_encoder",  # 图像编码器
]
_callback_tensor_inputs = [  # 回调张量输入列表,定义输入的张量名称
    "latents",  # 潜在表示
    "prompt_embeds",  # 提示嵌入
    "negative_prompt_embeds",  # 负提示嵌入
    "add_text_embeds",  # 添加的文本嵌入
    "add_time_ids",  # 添加的时间 ID
    "negative_pooled_prompt_embeds",  # 负池化提示嵌入
    "negative_add_time_ids",  # 负添加时间 ID
]
# 初始化类的构造函数,接收多个参数
    def __init__(
        self,
        # VAE 模型,用于图像编码和解码
        vae: AutoencoderKL,
        # 文本编码器模型,用于将文本转换为特征
        text_encoder: CLIPTextModel,
        # 另一个文本编码器,带有投影层
        text_encoder_2: CLIPTextModelWithProjection,
        # 文本分词器,将文本分割为词元
        tokenizer: CLIPTokenizer,
        # 另一个文本分词器
        tokenizer_2: CLIPTokenizer,
        # UNet 模型,用于图像生成
        unet: UNet2DConditionModel,
        # 控制网络,可以是单个或多个模型
        controlnet: Union[ControlNetModel, List[ControlNetModel], Tuple[ControlNetModel], MultiControlNetModel],
        # 调度器,用于控制生成过程
        scheduler: KarrasDiffusionSchedulers,
        # 是否强制空提示的输出为零
        force_zeros_for_empty_prompt: bool = True,
        # 是否添加水印的可选参数
        add_watermarker: Optional[bool] = None,
        # 特征提取器,用于处理图像特征
        feature_extractor: CLIPImageProcessor = None,
        # 图像编码器,带有投影层
        image_encoder: CLIPVisionModelWithProjection = None,
    ):
        # 调用父类的构造函数
        super().__init__()

        # 如果 controlnet 是列表或元组,则创建多控制网络模型
        if isinstance(controlnet, (list, tuple)):
            controlnet = MultiControlNetModel(controlnet)

        # 注册各个模块到当前对象
        self.register_modules(
            vae=vae,
            text_encoder=text_encoder,
            text_encoder_2=text_encoder_2,
            tokenizer=tokenizer,
            tokenizer_2=tokenizer_2,
            unet=unet,
            controlnet=controlnet,
            scheduler=scheduler,
            feature_extractor=feature_extractor,
            image_encoder=image_encoder,
        )
        # 计算 VAE 的缩放因子
        self.vae_scale_factor = 2 ** (len(self.vae.config.block_out_channels) - 1)
        # 初始化图像处理器,转换 RGB 图像
        self.image_processor = VaeImageProcessor(vae_scale_factor=self.vae_scale_factor, do_convert_rgb=True)
        # 初始化控制图像处理器,不进行归一化
        self.control_image_processor = VaeImageProcessor(
            vae_scale_factor=self.vae_scale_factor, do_convert_rgb=True, do_normalize=False
        )
        # 如果没有提供水印参数,则使用可用性检测
        add_watermarker = add_watermarker if add_watermarker is not None else is_invisible_watermark_available()

        # 根据是否添加水印的条件初始化水印对象
        if add_watermarker:
            self.watermark = StableDiffusionXLWatermarker()
        else:
            self.watermark = None

        # 注册配置参数,处理空提示的设置
        self.register_to_config(force_zeros_for_empty_prompt=force_zeros_for_empty_prompt)

    # 从 StableDiffusionXLPipeline 复制的编码提示方法
    def encode_prompt(
        # 输入的提示字符串
        prompt: str,
        # 可选的第二个提示字符串
        prompt_2: Optional[str] = None,
        # 可选的设备参数
        device: Optional[torch.device] = None,
        # 每个提示生成的图像数量
        num_images_per_prompt: int = 1,
        # 是否使用分类器自由引导
        do_classifier_free_guidance: bool = True,
        # 可选的负面提示字符串
        negative_prompt: Optional[str] = None,
        # 可选的第二个负面提示字符串
        negative_prompt_2: Optional[str] = None,
        # 可选的提示嵌入
        prompt_embeds: Optional[torch.Tensor] = None,
        # 可选的负面提示嵌入
        negative_prompt_embeds: Optional[torch.Tensor] = None,
        # 可选的聚合提示嵌入
        pooled_prompt_embeds: Optional[torch.Tensor] = None,
        # 可选的负面聚合提示嵌入
        negative_pooled_prompt_embeds: Optional[torch.Tensor] = None,
        # 可选的 LoRA 缩放因子
        lora_scale: Optional[float] = None,
        # 可选的跳过参数,用于剪辑
    # 从 StableDiffusionPipeline 复制的编码图像方法
# 定义一个方法用于编码图像,接收图像及其他参数
    def encode_image(self, image, device, num_images_per_prompt, output_hidden_states=None):
        # 获取图像编码器参数的数据类型
        dtype = next(self.image_encoder.parameters()).dtype

        # 如果输入的 image 不是张量,则使用特征提取器处理它
        if not isinstance(image, torch.Tensor):
            # 通过特征提取器将图像转换为张量,并返回像素值
            image = self.feature_extractor(image, return_tensors="pt").pixel_values

        # 将图像移动到指定设备,并转换为指定数据类型
        image = image.to(device=device, dtype=dtype)
        
        # 如果要求输出隐藏状态
        if output_hidden_states:
            # 使用图像编码器处理图像并获取倒数第二层的隐藏状态
            image_enc_hidden_states = self.image_encoder(image, output_hidden_states=True).hidden_states[-2]
            # 将隐藏状态在第一维上重复 num_images_per_prompt 次
            image_enc_hidden_states = image_enc_hidden_states.repeat_interleave(num_images_per_prompt, dim=0)
            # 生成与图像大小相同的零张量,并获取其隐藏状态
            uncond_image_enc_hidden_states = self.image_encoder(
                torch.zeros_like(image), output_hidden_states=True
            ).hidden_states[-2]
            # 将未条件化的隐藏状态重复 num_images_per_prompt 次
            uncond_image_enc_hidden_states = uncond_image_enc_hidden_states.repeat_interleave(
                num_images_per_prompt, dim=0
            )
            # 返回编码后的图像隐藏状态和未条件化的图像隐藏状态
            return image_enc_hidden_states, uncond_image_enc_hidden_states
        else:
            # 使用图像编码器处理图像并获取图像嵌入
            image_embeds = self.image_encoder(image).image_embeds
            # 将图像嵌入在第一维上重复 num_images_per_prompt 次
            image_embeds = image_embeds.repeat_interleave(num_images_per_prompt, dim=0)
            # 创建与图像嵌入大小相同的零张量作为未条件化的图像嵌入
            uncond_image_embeds = torch.zeros_like(image_embeds)

            # 返回图像嵌入和未条件化的图像嵌入
            return image_embeds, uncond_image_embeds

    # 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_ip_adapter_image_embeds 复制的方法
    def prepare_ip_adapter_image_embeds(
        # 定义方法的参数,包括适配器图像、图像嵌入、设备、每个提示的图像数量以及是否进行无分类器自由引导
        self, ip_adapter_image, ip_adapter_image_embeds, device, num_images_per_prompt, do_classifier_free_guidance
):
    # 初始化一个空列表,用于存储图像嵌入
    image_embeds = []
    # 如果启用了分类器自由引导,则初始化一个空列表,用于存储负图像嵌入
    if do_classifier_free_guidance:
        negative_image_embeds = []
    # 检查 ip_adapter_image_embeds 是否为 None
    if ip_adapter_image_embeds is None:
        # 如果 ip_adapter_image 不是列表,则将其转换为列表
        if not isinstance(ip_adapter_image, list):
            ip_adapter_image = [ip_adapter_image]

        # 检查 ip_adapter_image 的长度是否与 IP 适配器的数量相同
        if len(ip_adapter_image) != len(self.unet.encoder_hid_proj.image_projection_layers):
            # 如果不相同,则抛出值错误
            raise ValueError(
                f"`ip_adapter_image` must have same length as the number of IP Adapters. Got {len(ip_adapter_image)} images and {len(self.unet.encoder_hid_proj.image_projection_layers)} IP Adapters."
            )

        # 遍历每个单独的 IP 适配器图像及其对应的图像投影层
        for single_ip_adapter_image, image_proj_layer in zip(
            ip_adapter_image, self.unet.encoder_hid_proj.image_projection_layers
        ):
            # 判断输出隐藏状态是否为真,取决于图像投影层的类型
            output_hidden_state = not isinstance(image_proj_layer, ImageProjection)
            # 对单个图像进行编码,获取嵌入和负嵌入
            single_image_embeds, single_negative_image_embeds = self.encode_image(
                single_ip_adapter_image, device, 1, output_hidden_state
            )

            # 将单个图像嵌入添加到图像嵌入列表中
            image_embeds.append(single_image_embeds[None, :])
            # 如果启用了分类器自由引导,则将负图像嵌入添加到负嵌入列表中
            if do_classifier_free_guidance:
                negative_image_embeds.append(single_negative_image_embeds[None, :])
    else:
        # 如果已经存在图像嵌入,则遍历这些嵌入
        for single_image_embeds in ip_adapter_image_embeds:
            # 如果启用了分类器自由引导,则分割单个嵌入为负嵌入和正嵌入
            if do_classifier_free_guidance:
                single_negative_image_embeds, single_image_embeds = single_image_embeds.chunk(2)
                negative_image_embeds.append(single_negative_image_embeds)
            # 将单个嵌入添加到图像嵌入列表中
            image_embeds.append(single_image_embeds)

    # 初始化一个空列表,用于存储 IP 适配器图像嵌入
    ip_adapter_image_embeds = []
    # 遍历图像嵌入,索引从 i 开始
    for i, single_image_embeds in enumerate(image_embeds):
        # 将单个图像嵌入复制 num_images_per_prompt 次
        single_image_embeds = torch.cat([single_image_embeds] * num_images_per_prompt, dim=0)
        # 如果启用了分类器自由引导,则处理负嵌入
        if do_classifier_free_guidance:
            single_negative_image_embeds = torch.cat([negative_image_embeds[i]] * num_images_per_prompt, dim=0)
            # 将负嵌入与正嵌入连接
            single_image_embeds = torch.cat([single_negative_image_embeds, single_image_embeds], dim=0)

        # 将单个图像嵌入移动到指定设备上
        single_image_embeds = single_image_embeds.to(device=device)
        # 将处理后的图像嵌入添加到列表中
        ip_adapter_image_embeds.append(single_image_embeds)

    # 返回处理后的 IP 适配器图像嵌入
    return ip_adapter_image_embeds

# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_extra_step_kwargs 复制
# 准备额外的步骤关键字参数,用于调度器的步骤,不同调度器的签名可能不同
def prepare_extra_step_kwargs(self, generator, eta):
    # eta (η) 仅在 DDIMScheduler 中使用,对于其他调度器将被忽略
    # eta 对应于 DDIM 论文中的 η: https://arxiv.org/abs/2010.02502
    # eta 的取值应在 [0, 1] 之间

    # 检查调度器的 step 方法是否接受 eta 参数
    accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())
    # 创建一个字典用于存储额外的步骤关键字参数
    extra_step_kwargs = {}
    # 如果接受 eta,则将其添加到字典中
    if accepts_eta:
        extra_step_kwargs["eta"] = eta

    # 检查调度器的 step 方法是否接受 generator 参数
    accepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys())
    # 如果接受 generator,则将其添加到字典中
    if accepts_generator:
        extra_step_kwargs["generator"] = generator
    # 返回包含额外参数的字典
    return extra_step_kwargs

# 检查输入参数的有效性和完整性
def check_inputs(
    self,
    prompt,  # 输入的提示词
    prompt_2,  # 第二个输入的提示词
    image,  # 输入的图像
    callback_steps,  # 回调步骤数
    negative_prompt=None,  # 可选的负面提示词
    negative_prompt_2=None,  # 第二个可选的负面提示词
    prompt_embeds=None,  # 提示词的嵌入表示
    negative_prompt_embeds=None,  # 负面提示词的嵌入表示
    pooled_prompt_embeds=None,  # 池化后的提示词嵌入表示
    ip_adapter_image=None,  # 输入适配器的图像
    ip_adapter_image_embeds=None,  # 输入适配器图像的嵌入表示
    negative_pooled_prompt_embeds=None,  # 负面池化提示词的嵌入表示
    controlnet_conditioning_scale=1.0,  # ControlNet 条件缩放因子,默认为 1.0
    control_guidance_start=0.0,  # ControlNet 指导开始的比例,默认为 0.0
    control_guidance_end=1.0,  # ControlNet 指导结束的比例,默认为 1.0
    callback_on_step_end_tensor_inputs=None,  # 步骤结束时的回调张量输入
# 从 diffusers.pipelines.controlnet.pipeline_controlnet.StableDiffusionControlNetPipeline.check_image 复制
# 检查输入图像及其相关提示的类型和大小
    def check_image(self, image, prompt, prompt_embeds):
        # 判断输入图像是否为 PIL 图片类型
        image_is_pil = isinstance(image, PIL.Image.Image)
        # 判断输入图像是否为 PyTorch 张量类型
        image_is_tensor = isinstance(image, torch.Tensor)
        # 判断输入图像是否为 NumPy 数组类型
        image_is_np = isinstance(image, np.ndarray)
        # 判断输入是否为 PIL 图片列表
        image_is_pil_list = isinstance(image, list) and isinstance(image[0], PIL.Image.Image)
        # 判断输入是否为 PyTorch 张量列表
        image_is_tensor_list = isinstance(image, list) and isinstance(image[0], torch.Tensor)
        # 判断输入是否为 NumPy 数组列表
        image_is_np_list = isinstance(image, list) and isinstance(image[0], np.ndarray)

        # 如果输入不符合任何图像类型,抛出类型错误
        if (
            not image_is_pil
            and not image_is_tensor
            and not image_is_np
            and not image_is_pil_list
            and not image_is_tensor_list
            and not image_is_np_list
        ):
            raise TypeError(
                f"image must be passed and be one of PIL image, numpy array, torch tensor, list of PIL images, list of numpy arrays or list of torch tensors, but is {type(image)}"
            )

        # 如果输入为 PIL 图片,则批处理大小为 1
        if image_is_pil:
            image_batch_size = 1
        else:
            # 否则,获取输入图像的批处理大小
            image_batch_size = len(image)

        # 如果提示不为空且为字符串,批处理大小为 1
        if prompt is not None and isinstance(prompt, str):
            prompt_batch_size = 1
        # 如果提示为列表,批处理大小为列表长度
        elif prompt is not None and isinstance(prompt, list):
            prompt_batch_size = len(prompt)
        # 如果提示嵌入不为空,获取其批处理大小
        elif prompt_embeds is not None:
            prompt_batch_size = prompt_embeds.shape[0]

        # 如果图像批处理大小与提示批处理大小不一致,抛出值错误
        if image_batch_size != 1 and image_batch_size != prompt_batch_size:
            raise ValueError(
                f"If image batch size is not 1, image batch size must be same as prompt batch size. image batch size: {image_batch_size}, prompt batch size: {prompt_batch_size}"
            )

    # 从 diffusers.pipelines.controlnet.pipeline_controlnet.StableDiffusionControlNetPipeline.prepare_image 复制
    def prepare_image(
        self,
        image,
        width,
        height,
        batch_size,
        num_images_per_prompt,
        device,
        dtype,
        do_classifier_free_guidance=False,
        guess_mode=False,
    ):
        # 使用控制图像处理器预处理图像,并转换为浮点32类型
        image = self.control_image_processor.preprocess(image, height=height, width=width).to(dtype=torch.float32)
        # 获取图像的批处理大小
        image_batch_size = image.shape[0]

        # 如果图像批处理大小为 1,则重复次数为批大小
        if image_batch_size == 1:
            repeat_by = batch_size
        else:
            # 否则,重复次数为每个提示的图像数量
            repeat_by = num_images_per_prompt

        # 按指定维度重复图像
        image = image.repeat_interleave(repeat_by, dim=0)

        # 将图像移动到指定设备并转换为指定类型
        image = image.to(device=device, dtype=dtype)

        # 如果启用分类器自由引导且未启用猜测模式,重复图像两次
        if do_classifier_free_guidance and not guess_mode:
            image = torch.cat([image] * 2)

        # 返回处理后的图像
        return image

    # 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_latents 复制
# 准备潜在变量,返回适当形状的张量
    def prepare_latents(self, batch_size, num_channels_latents, height, width, dtype, device, generator, latents=None):
        # 定义潜在变量的形状,包括批量大小和通道数
        shape = (
            batch_size,
            num_channels_latents,
            int(height) // self.vae_scale_factor,
            int(width) // self.vae_scale_factor,
        )
        # 检查生成器的数量是否与批量大小匹配
        if isinstance(generator, list) and len(generator) != batch_size:
            raise ValueError(
                f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
                f" size of {batch_size}. Make sure the batch size matches the length of the generators."
            )

        # 如果没有传入潜在变量,则随机生成
        if latents is None:
            latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
        else:
            # 如果传入潜在变量,将其移动到指定设备
            latents = latents.to(device)

        # 根据调度器要求的标准差缩放初始噪声
        latents = latents * self.scheduler.init_noise_sigma
        # 返回处理后的潜在变量
        return latents

    # 从 StableDiffusionXLPipeline 复制的函数,获取附加时间 ID
    def _get_add_time_ids(
        self, original_size, crops_coords_top_left, target_size, dtype, text_encoder_projection_dim=None
    ):
        # 创建附加时间 ID 列表,合并原始大小、裁剪坐标和目标大小
        add_time_ids = list(original_size + crops_coords_top_left + target_size)

        # 计算传递的附加嵌入维度
        passed_add_embed_dim = (
            self.unet.config.addition_time_embed_dim * len(add_time_ids) + text_encoder_projection_dim
        )
        # 获取期望的附加嵌入维度
        expected_add_embed_dim = self.unet.add_embedding.linear_1.in_features

        # 检查传递的维度与期望的维度是否匹配
        if expected_add_embed_dim != passed_add_embed_dim:
            raise ValueError(
                f"Model expects an added time embedding vector of length {expected_add_embed_dim}, but a vector of {passed_add_embed_dim} was created. The model has an incorrect config. Please check `unet.config.time_embedding_type` and `text_encoder_2.config.projection_dim`."
            )

        # 将附加时间 ID 转换为张量
        add_time_ids = torch.tensor([add_time_ids], dtype=dtype)
        # 返回附加时间 ID 张量
        return add_time_ids

    # 从 StableDiffusionUpscalePipeline 复制的函数,升维 VAE
    def upcast_vae(self):
        # 获取 VAE 的数据类型
        dtype = self.vae.dtype
        # 将 VAE 转换为浮点32位
        self.vae.to(dtype=torch.float32)
        # 检查是否使用了 Torch 2.0 或 XFormers
        use_torch_2_0_or_xformers = isinstance(
            self.vae.decoder.mid_block.attentions[0].processor,
            (
                AttnProcessor2_0,
                XFormersAttnProcessor,
            ),
        )
        # 如果使用了 XFormers 或 Torch 2.0,注意力模块不需要保持浮点32位,节省内存
        if use_torch_2_0_or_xformers:
            self.vae.post_quant_conv.to(dtype)
            self.vae.decoder.conv_in.to(dtype)
            self.vae.decoder.mid_block.to(dtype)

    # 从 LatentConsistencyModelPipeline 复制的函数,获取引导缩放嵌入
# 定义一个方法,获取具有引导尺度的嵌入向量
def get_guidance_scale_embedding(
    # 输入的张量 w
    self, w: torch.Tensor, embedding_dim: int = 512, dtype: torch.dtype = torch.float32
) -> torch.Tensor:
    """
    参考链接:https://github.com/google-research/vdm/blob/dc27b98a554f65cdc654b800da5aa1846545d41b/model_vdm.py#L298

    参数:
        w (`torch.Tensor`):
            生成具有指定引导尺度的嵌入向量,以随后丰富时间步嵌入。
        embedding_dim (`int`, *可选*, 默认值为 512):
            要生成的嵌入的维度。
        dtype (`torch.dtype`, *可选*, 默认值为 `torch.float32`):
            生成的嵌入的数据类型。

    返回:
        `torch.Tensor`: 形状为 `(len(w), embedding_dim)` 的嵌入向量。
    """
    # 确保输入张量 w 是一维的
    assert len(w.shape) == 1
    # 将 w 的值乘以 1000.0
    w = w * 1000.0

    # 计算嵌入维度的一半
    half_dim = embedding_dim // 2
    # 计算每个嵌入的基础值
    emb = torch.log(torch.tensor(10000.0)) / (half_dim - 1)
    # 生成一个指数衰减的嵌入
    emb = torch.exp(torch.arange(half_dim, dtype=dtype) * -emb)
    # 将 w 转换为指定的数据类型,并与嵌入相乘
    emb = w.to(dtype)[:, None] * emb[None, :]
    # 将正弦和余弦嵌入连接在一起
    emb = torch.cat([torch.sin(emb), torch.cos(emb)], dim=1)
    # 如果嵌入维度为奇数,则在最后填充一个零
    if embedding_dim % 2 == 1:  # zero pad
        emb = torch.nn.functional.pad(emb, (0, 1))
    # 确保输出嵌入的形状为 (w.shape[0], embedding_dim)
    assert emb.shape == (w.shape[0], embedding_dim)
    # 返回生成的嵌入
    return emb

# 定义一个属性,用于获取引导尺度
@property
def guidance_scale(self):
    return self._guidance_scale

# 定义一个属性,用于获取剪切跳过的值
@property
def clip_skip(self):
    return self._clip_skip

# 定义一个属性,判断是否进行无分类器引导
# 此处的 `guidance_scale` 定义类似于 Imagen 论文中的引导权重 `w`(公式 (2))
# `guidance_scale = 1` 对应于不进行无分类器引导。
@property
def do_classifier_free_guidance(self):
    return self._guidance_scale > 1 and self.unet.config.time_cond_proj_dim is None

# 定义一个属性,用于获取交叉注意力的参数
@property
def cross_attention_kwargs(self):
    return self._cross_attention_kwargs

# 定义一个属性,用于获取去噪结束的值
@property
def denoising_end(self):
    return self._denoising_end

# 定义一个属性,用于获取时间步的数量
@property
def num_timesteps(self):
    return self._num_timesteps

# 装饰器,禁止在这个方法内计算梯度
@torch.no_grad()
# 替换示例文档字符串
@replace_example_docstring(EXAMPLE_DOC_STRING)
# 定义可调用对象的方法,接受多个参数以执行特定功能
    def __call__(
        # 提示信息,可以是字符串或字符串列表
        self,
        prompt: Union[str, List[str]] = None,
        # 第二个提示信息,选填,可以是字符串或字符串列表
        prompt_2: Optional[Union[str, List[str]]] = None,
        # 输入图像,可以是特定类型
        image: PipelineImageInput = None,
        # 输出图像的高度,选填
        height: Optional[int] = None,
        # 输出图像的宽度,选填
        width: Optional[int] = None,
        # 推理步骤的数量,默认值为50
        num_inference_steps: int = 50,
        # 定义时间步,选填,默认为None
        timesteps: List[int] = None,
        # 噪声标准差列表,选填,默认为None
        sigmas: List[float] = None,
        # 去噪结束的时间点,选填,默认为None
        denoising_end: Optional[float] = None,
        # 指导缩放因子,默认值为5.0
        guidance_scale: float = 5.0,
        # 负向提示信息,选填,可以是字符串或字符串列表
        negative_prompt: Optional[Union[str, List[str]]] = None,
        # 第二个负向提示信息,选填,可以是字符串或字符串列表
        negative_prompt_2: Optional[Union[str, List[str]]] = None,
        # 每个提示生成的图像数量,默认为1
        num_images_per_prompt: Optional[int] = 1,
        # 影响采样过程的参数,默认为0.0
        eta: float = 0.0,
        # 随机数生成器,选填,可以是单个或列表
        generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
        # 先前生成的潜在表示,选填
        latents: Optional[torch.Tensor] = None,
        # 提示嵌入,选填
        prompt_embeds: Optional[torch.Tensor] = None,
        # 负向提示嵌入,选填
        negative_prompt_embeds: Optional[torch.Tensor] = None,
        # 聚合后的提示嵌入,选填
        pooled_prompt_embeds: Optional[torch.Tensor] = None,
        # 负向聚合后的提示嵌入,选填
        negative_pooled_prompt_embeds: Optional[torch.Tensor] = None,
        # 输入适配器图像,选填
        ip_adapter_image: Optional[PipelineImageInput] = None,
        # 输入适配器图像的嵌入,选填
        ip_adapter_image_embeds: Optional[List[torch.Tensor]] = None,
        # 输出类型,默认为"pil"
        output_type: Optional[str] = "pil",
        # 是否返回字典格式的结果,默认为True
        return_dict: bool = True,
        # 跨注意力相关的参数,选填
        cross_attention_kwargs: Optional[Dict[str, Any]] = None,
        # ControlNet条件缩放因子,默认为1.0
        controlnet_conditioning_scale: Union[float, List[float]] = 1.0,
        # 猜测模式,默认为False
        guess_mode: bool = False,
        # ControlNet指导开始时间,默认为0.0
        control_guidance_start: Union[float, List[float]] = 0.0,
        # ControlNet指导结束时间,默认为1.0
        control_guidance_end: Union[float, List[float]] = 1.0,
        # 原始图像大小,选填
        original_size: Tuple[int, int] = None,
        # 图像左上角的裁剪坐标,默认为(0, 0)
        crops_coords_top_left: Tuple[int, int] = (0, 0),
        # 目标图像大小,选填
        target_size: Tuple[int, int] = None,
        # 负向图像的原始大小,选填
        negative_original_size: Optional[Tuple[int, int]] = None,
        # 负向图像的左上角裁剪坐标,默认为(0, 0)
        negative_crops_coords_top_left: Tuple[int, int] = (0, 0),
        # 负向图像的目标大小,选填
        negative_target_size: Optional[Tuple[int, int]] = None,
        # 跳过剪辑的参数,选填
        clip_skip: Optional[int] = None,
        # 在步骤结束时调用的回调函数,选填
        callback_on_step_end: Optional[
            Union[Callable[[int, int, Dict], None], PipelineCallback, MultiPipelineCallbacks]
        ] = None,
        # 在步骤结束时的张量输入回调,默认为["latents"]
        callback_on_step_end_tensor_inputs: List[str] = ["latents"],
        # 接收额外关键字参数
        **kwargs,