diffusers-源码解析-四十六-

龙哥盟 / 2024-11-09 / 原文

diffusers 源码解析(四十六)

.\diffusers\pipelines\stable_diffusion\pipeline_stable_diffusion_depth2img.py

# 版权信息,声明该文件的所有权归 HuggingFace 团队所有
# 许可信息,指明该文件遵循 Apache License 2.0
# 说明在使用该文件时需遵循该许可证的条款
# 可通过下面的链接获取许可证
#     http://www.apache.org/licenses/LICENSE-2.0
# 除非适用法律要求或书面同意,否则该软件按“原样”提供,不附带任何明示或暗示的担保或条件
# 查看许可证以获取特定语言管理权限和限制的详细信息

import contextlib  # 导入上下文管理库,用于处理上下文
import inspect  # 导入检查库,用于获取对象的信息
from typing import Any, Callable, Dict, List, Optional, Union  # 导入类型提示相关工具

import numpy as np  # 导入 NumPy 库,用于数组和数值计算
import PIL.Image  # 导入 PIL 库中的 Image 模块,用于图像处理
import torch  # 导入 PyTorch 库,用于深度学习计算
from packaging import version  # 导入版本控制工具,用于处理版本信息
from transformers import CLIPTextModel, CLIPTokenizer, DPTForDepthEstimation, DPTImageProcessor  # 导入 Transformers 库中的模型和处理器

from ...configuration_utils import FrozenDict  # 从配置工具中导入 FrozenDict,用于不可变字典
from ...image_processor import PipelineImageInput, VaeImageProcessor  # 导入图像处理相关工具
from ...loaders import StableDiffusionLoraLoaderMixin, TextualInversionLoaderMixin  # 导入加载器工具,用于模型加载
from ...models import AutoencoderKL, UNet2DConditionModel  # 导入模型类
from ...models.lora import adjust_lora_scale_text_encoder  # 导入 LoRA 调整工具
from ...schedulers import KarrasDiffusionSchedulers  # 导入调度器工具
from ...utils import PIL_INTERPOLATION, USE_PEFT_BACKEND, deprecate, logging, scale_lora_layers, unscale_lora_layers  # 导入实用工具
from ...utils.torch_utils import randn_tensor  # 从 PyTorch 工具导入随机张量生成函数
from ..pipeline_utils import DiffusionPipeline, ImagePipelineOutput  # 导入扩散管道和图像输出工具

logger = logging.get_logger(__name__)  # 获取当前模块的日志记录器

# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion_img2img.retrieve_latents 复制的函数
def retrieve_latents(
    encoder_output: torch.Tensor, generator: Optional[torch.Generator] = None, sample_mode: str = "sample"
):
    # 检查 encoder_output 是否具有 latent_dist 属性且采样模式为 "sample"
    if hasattr(encoder_output, "latent_dist") and sample_mode == "sample":
        return encoder_output.latent_dist.sample(generator)  # 从潜在分布中采样
    # 检查 encoder_output 是否具有 latent_dist 属性且采样模式为 "argmax"
    elif hasattr(encoder_output, "latent_dist") and sample_mode == "argmax":
        return encoder_output.latent_dist.mode()  # 返回潜在分布的众数
    # 检查 encoder_output 是否具有 latents 属性
    elif hasattr(encoder_output, "latents"):
        return encoder_output.latents  # 返回潜在表示
    else:
        raise AttributeError("Could not access latents of provided encoder_output")  # 如果没有访问到潜在表示,抛出异常

# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion_img2img.preprocess 复制的函数
def preprocess(image):
    # 设置弃用提示信息
    deprecation_message = "The preprocess method is deprecated and will be removed in diffusers 1.0.0. Please use VaeImageProcessor.preprocess(...) instead"
    # 调用弃用函数记录弃用信息
    deprecate("preprocess", "1.0.0", deprecation_message, standard_warn=False)
    # 检查输入是否为 PyTorch 张量
    if isinstance(image, torch.Tensor):
        return image  # 如果是张量,则直接返回
    # 检查输入是否为 PIL 图像
    elif isinstance(image, PIL.Image.Image):
        image = [image]  # 如果是图像,则将其封装为列表
    # 检查 image 列表的第一个元素是否为 PIL 的图像对象
    if isinstance(image[0], PIL.Image.Image):
        # 获取图像的宽度和高度
        w, h = image[0].size
        # 将宽度和高度调整为 8 的整数倍
        w, h = (x - x % 8 for x in (w, h))  # resize to integer multiple of 8

        # 将每个图像调整为新的宽高,并转换为 NumPy 数组,增加一个新的维度
        image = [np.array(i.resize((w, h), resample=PIL_INTERPOLATION["lanczos"]))[None, :] for i in image]
        # 将所有图像沿第 0 维连接成一个大数组
        image = np.concatenate(image, axis=0)
        # 将数组转换为浮点数并归一化到 [0, 1] 范围
        image = np.array(image).astype(np.float32) / 255.0
        # 调整数组的维度顺序,从 (N, H, W, C) 转换为 (N, C, H, W)
        image = image.transpose(0, 3, 1, 2)
        # 将图像数据的值范围从 [0, 1] 变换到 [-1, 1]
        image = 2.0 * image - 1.0
        # 将 NumPy 数组转换为 PyTorch 张量
        image = torch.from_numpy(image)
    # 检查 image 列表的第一个元素是否为 PyTorch 张量
    elif isinstance(image[0], torch.Tensor):
        # 沿第 0 维连接所有 PyTorch 张量
        image = torch.cat(image, dim=0)
    # 返回处理后的图像
    return image
# 定义一个名为 StableDiffusionDepth2ImgPipeline 的类,继承多个混合类
class StableDiffusionDepth2ImgPipeline(DiffusionPipeline, TextualInversionLoaderMixin, StableDiffusionLoraLoaderMixin):
    r"""
    使用稳定扩散进行基于深度的图像生成的管道,支持文本引导。

    该模型继承自 [`DiffusionPipeline`],可查看超类文档以了解所有管道实现的通用方法
    (下载、保存、在特定设备上运行等)。

    此管道还继承以下加载方法:
        - [`~loaders.TextualInversionLoaderMixin.load_textual_inversion`] 用于加载文本反转嵌入
        - [`~loaders.StableDiffusionLoraLoaderMixin.load_lora_weights`] 用于加载 LoRA 权重
        - [`~loaders.StableDiffusionLoraLoaderMixin.save_lora_weights`] 用于保存 LoRA 权重

    参数:
        vae ([`AutoencoderKL`]):
            用于将图像编码和解码为潜在表示的变分自编码器 (VAE) 模型。
        text_encoder ([`~transformers.CLIPTextModel`]):
            冻结的文本编码器 ([clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14))。
        tokenizer ([`~transformers.CLIPTokenizer`]):
            用于对文本进行标记化的 `CLIPTokenizer`。
        unet ([`UNet2DConditionModel`]):
            用于去噪编码图像潜在表示的 `UNet2DConditionModel`。
        scheduler ([`SchedulerMixin`]):
            用于与 `unet` 结合使用的调度器,以去噪编码图像潜在表示。可以是
            [`DDIMScheduler`], [`LMSDiscreteScheduler`] 或 [`PNDMScheduler`]。
    """

    # 定义模型 CPU 卸载顺序
    model_cpu_offload_seq = "text_encoder->unet->vae"
    # 定义需要作为回调的张量输入
    _callback_tensor_inputs = ["latents", "prompt_embeds", "negative_prompt_embeds", "depth_mask"]

    # 初始化方法,接受多个参数以配置管道
    def __init__(
        self,
        vae: AutoencoderKL,  # 变分自编码器模型
        text_encoder: CLIPTextModel,  # 文本编码器
        tokenizer: CLIPTokenizer,  # 文本标记化工具
        unet: UNet2DConditionModel,  # 去噪模型
        scheduler: KarrasDiffusionSchedulers,  # 调度器
        depth_estimator: DPTForDepthEstimation,  # 深度估计模型
        feature_extractor: DPTImageProcessor,  # 特征提取器
    # 定义构造函数
        ):
            # 调用父类的构造函数
            super().__init__()
    
            # 检查 unet 配置是否有 diffusers 版本属性,并判断其是否小于 0.9.0
            is_unet_version_less_0_9_0 = hasattr(unet.config, "_diffusers_version") and version.parse(
                version.parse(unet.config._diffusers_version).base_version
            ) < version.parse("0.9.0.dev0")
            # 检查 unet 配置的样本大小是否小于 64
            is_unet_sample_size_less_64 = hasattr(unet.config, "sample_size") and unet.config.sample_size < 64
            # 如果 unet 版本小于 0.9.0 且样本大小小于 64,则给出弃用警告
            if is_unet_version_less_0_9_0 and is_unet_sample_size_less_64:
                # 创建弃用消息,提示用户更新配置文件
                deprecation_message = (
                    "The configuration file of the unet has set the default `sample_size` to smaller than"
                    " 64 which seems highly unlikely .If you're checkpoint is a fine-tuned version of any of the"
                    " following: \n- CompVis/stable-diffusion-v1-4 \n- CompVis/stable-diffusion-v1-3 \n-"
                    " CompVis/stable-diffusion-v1-2 \n- CompVis/stable-diffusion-v1-1 \n- runwayml/stable-diffusion-v1-5"
                    " \n- runwayml/stable-diffusion-inpainting \n you should change 'sample_size' to 64 in the"
                    " configuration file. Please make sure to update the config accordingly as leaving `sample_size=32`"
                    " in the config might lead to incorrect results in future versions. If you have downloaded this"
                    " checkpoint from the Hugging Face Hub, it would be very nice if you could open a Pull request for"
                    " the `unet/config.json` file"
                )
                # 调用弃用函数,记录样本大小小于 64 的警告
                deprecate("sample_size<64", "1.0.0", deprecation_message, standard_warn=False)
                # 创建新的配置字典,修改样本大小为 64
                new_config = dict(unet.config)
                new_config["sample_size"] = 64
                # 更新 unet 的内部字典
                unet._internal_dict = FrozenDict(new_config)
    
            # 注册各个模块
            self.register_modules(
                vae=vae,
                text_encoder=text_encoder,
                tokenizer=tokenizer,
                unet=unet,
                scheduler=scheduler,
                depth_estimator=depth_estimator,
                feature_extractor=feature_extractor,
            )
            # 计算 VAE 的缩放因子
            self.vae_scale_factor = 2 ** (len(self.vae.config.block_out_channels) - 1)
            # 创建 VAE 图像处理器实例
            self.image_processor = VaeImageProcessor(vae_scale_factor=self.vae_scale_factor)
    
        # 从 StableDiffusionPipeline 的 _encode_prompt 方法复制
        def _encode_prompt(
            self,
            prompt,
            device,
            num_images_per_prompt,
            do_classifier_free_guidance,
            negative_prompt=None,
            # 可选参数,用于嵌入提示和负面提示
            prompt_embeds: Optional[torch.Tensor] = None,
            negative_prompt_embeds: Optional[torch.Tensor] = None,
            lora_scale: Optional[float] = None,
            # 接收额外的关键字参数
            **kwargs,
    # 结束括号,表示函数参数列表的结束
        ):
            # 警告信息,说明 `_encode_prompt()` 已被弃用,未来版本中将移除,建议使用 `encode_prompt()`
            deprecation_message = "`_encode_prompt()` is deprecated and it will be removed in a future version. Use `encode_prompt()` instead. Also, be aware that the output format changed from a concatenated tensor to a tuple."
            # 调用 deprecate 函数记录弃用信息,指定版本和自定义警告选项
            deprecate("_encode_prompt()", "1.0.0", deprecation_message, standard_warn=False)
    
            # 调用 encode_prompt 方法生成提示嵌入元组,传入多个参数
            prompt_embeds_tuple = self.encode_prompt(
                prompt=prompt,  # 输入提示
                device=device,  # 设备信息
                num_images_per_prompt=num_images_per_prompt,  # 每个提示的图像数量
                do_classifier_free_guidance=do_classifier_free_guidance,  # 是否进行无分类器引导
                negative_prompt=negative_prompt,  # 负提示内容
                prompt_embeds=prompt_embeds,  # 提示嵌入
                negative_prompt_embeds=negative_prompt_embeds,  # 负提示嵌入
                lora_scale=lora_scale,  # Lora 缩放因子
                **kwargs,  # 其他关键字参数
            )
    
            # 连接嵌入元组中的两个部分,以支持向后兼容
            prompt_embeds = torch.cat([prompt_embeds_tuple[1], prompt_embeds_tuple[0]])
    
            # 返回组合后的提示嵌入
            return prompt_embeds
    
        # 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.encode_prompt 复制的函数
        def encode_prompt(
            self,
            prompt,  # 输入提示
            device,  # 设备信息
            num_images_per_prompt,  # 每个提示的图像数量
            do_classifier_free_guidance,  # 是否进行无分类器引导
            negative_prompt=None,  # 负提示内容,默认为 None
            prompt_embeds: Optional[torch.Tensor] = None,  # 可选的提示嵌入
            negative_prompt_embeds: Optional[torch.Tensor] = None,  # 可选的负提示嵌入
            lora_scale: Optional[float] = None,  # 可选的 Lora 缩放因子
            clip_skip: Optional[int] = None,  # 可选的剪裁跳过参数
        # 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.run_safety_checker 复制的函数
        def run_safety_checker(self, image, device, dtype):  # 安全检查器函数,检查输入图像的安全性
            # 检查安全检查器是否存在,如果不存在则将 nsfw 概念标记为 None
            if self.safety_checker is None:
                has_nsfw_concept = None
            else:
                # 如果图像是张量,使用图像处理器进行后处理并转换为 PIL 格式
                if torch.is_tensor(image):
                    feature_extractor_input = self.image_processor.postprocess(image, output_type="pil")
                else:
                    # 如果不是张量,则将 NumPy 数组转换为 PIL 图像
                    feature_extractor_input = self.image_processor.numpy_to_pil(image)
                # 使用特征提取器处理图像输入,并将其转换为指定设备的张量
                safety_checker_input = self.feature_extractor(feature_extractor_input, return_tensors="pt").to(device)
                # 调用安全检查器,检查图像并返回处理后的图像和 nsfw 概念标记
                image, has_nsfw_concept = self.safety_checker(
                    images=image, clip_input=safety_checker_input.pixel_values.to(dtype)
                )
            # 返回检查后的图像和 nsfw 概念标记
            return image, has_nsfw_concept
    
        # 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.decode_latents 复制的函数
    # 解码潜在变量的函数
    def decode_latents(self, latents):
        # 定义一个弃用警告信息,提示用户该方法将在未来版本中移除
        deprecation_message = "The decode_latents method is deprecated and will be removed in 1.0.0. Please use VaeImageProcessor.postprocess(...) instead"
        # 调用弃用函数,发出警告,指明该方法弃用的版本
        deprecate("decode_latents", "1.0.0", deprecation_message, standard_warn=False)

        # 根据 VAE 的缩放因子调整潜在变量
        latents = 1 / self.vae.config.scaling_factor * latents
        # 解码潜在变量,获取解码后的图像,返回的结果是一个元组,取第一个元素
        image = self.vae.decode(latents, return_dict=False)[0]
        # 对图像进行归一化处理,将值范围调整到 [0, 1]
        image = (image / 2 + 0.5).clamp(0, 1)
        # 将图像数据转为 float32 类型,便于与 bfloat16 兼容,且不会造成显著开销
        image = image.cpu().permute(0, 2, 3, 1).float().numpy()
        # 返回处理后的图像数据
        return image

    # 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_extra_step_kwargs 复制的函数
    def prepare_extra_step_kwargs(self, generator, eta):
        # 为调度器步骤准备额外的参数,因为并非所有调度器的参数签名相同
        # eta (η) 仅用于 DDIMScheduler,其他调度器将忽略此参数
        # eta 对应于 DDIM 论文中的 η:https://arxiv.org/abs/2010.02502
        # eta 的取值应在 [0, 1] 之间

        # 检查调度器的 step 方法是否接受 eta 参数
        accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())
        # 创建一个字典以存放额外的步骤参数
        extra_step_kwargs = {}
        # 如果接受 eta,则将其添加到额外参数字典中
        if accepts_eta:
            extra_step_kwargs["eta"] = eta

        # 检查调度器的 step 方法是否接受 generator 参数
        accepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys())
        # 如果接受 generator,则将其添加到额外参数字典中
        if accepts_generator:
            extra_step_kwargs["generator"] = generator
        # 返回包含额外参数的字典
        return extra_step_kwargs

    # 检查输入参数的函数
    def check_inputs(
        self,
        prompt,  # 输入的提示文本
        strength,  # 强度参数
        callback_steps,  # 回调步骤
        negative_prompt=None,  # 可选的负面提示文本
        prompt_embeds=None,  # 可选的提示嵌入
        negative_prompt_embeds=None,  # 可选的负面提示嵌入
        callback_on_step_end_tensor_inputs=None,  # 可选的回调输入
    ):
        # 检查 strength 是否在有效范围内 [0.0, 1.0]
        if strength < 0 or strength > 1:
            # 如果不在范围内,抛出值错误
            raise ValueError(f"The value of strength should in [0.0, 1.0] but is {strength}")

        # 检查 callback_steps 是否为正整数
        if callback_steps is not None and (not isinstance(callback_steps, int) or callback_steps <= 0):
            # 如果不是正整数,抛出值错误
            raise ValueError(
                f"`callback_steps` has to be a positive integer but is {callback_steps} of type"
                f" {type(callback_steps)}."
            )

        # 检查 callback_on_step_end_tensor_inputs 是否在允许的回调张量输入中
        if callback_on_step_end_tensor_inputs is not None and not all(
            k in self._callback_tensor_inputs for k in callback_on_step_end_tensor_inputs
        ):
            # 如果有不在允许输入中的项,抛出值错误
            raise ValueError(
                f"`callback_on_step_end_tensor_inputs` has to be in {self._callback_tensor_inputs}, but found {[k for k in callback_on_step_end_tensor_inputs if k not in self._callback_tensor_inputs]}"
            )
        
        # 检查同时提供 prompt 和 prompt_embeds 是否有效
        if prompt is not None and prompt_embeds is not None:
            # 抛出值错误,提醒只能提供其中一个
            raise ValueError(
                f"Cannot forward both `prompt`: {prompt} and `prompt_embeds`: {prompt_embeds}. Please make sure to"
                " only forward one of the two."
            )
        elif prompt is None and prompt_embeds is None:
            # 抛出值错误,提醒必须提供其中一个
            raise ValueError(
                "Provide either `prompt` or `prompt_embeds`. Cannot leave both `prompt` and `prompt_embeds` undefined."
            )
        elif prompt is not None and (not isinstance(prompt, str) and not isinstance(prompt, list)):
            # 检查 prompt 的类型是否有效
            raise ValueError(f"`prompt` has to be of type `str` or `list` but is {type(prompt)}")

        # 检查同时提供 negative_prompt 和 negative_prompt_embeds 是否有效
        if negative_prompt is not None and negative_prompt_embeds is not None:
            # 抛出值错误,提醒只能提供其中一个
            raise ValueError(
                f"Cannot forward both `negative_prompt`: {negative_prompt} and `negative_prompt_embeds`:"
                f" {negative_prompt_embeds}. Please make sure to only forward one of the two."
            )

        # 检查 prompt_embeds 和 negative_prompt_embeds 的形状是否一致
        if prompt_embeds is not None and negative_prompt_embeds is not None:
            if prompt_embeds.shape != negative_prompt_embeds.shape:
                # 抛出值错误,提醒两者形状必须一致
                raise ValueError(
                    "`prompt_embeds` and `negative_prompt_embeds` must have the same shape when passed directly, but"
                    f" got: `prompt_embeds` {prompt_embeds.shape} != `negative_prompt_embeds`"
                    f" {negative_prompt_embeds.shape}."
                )

    # 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion_img2img.StableDiffusionImg2ImgPipeline.get_timesteps 复制的代码
    # 获取时间步,进行推理步骤的处理
    def get_timesteps(self, num_inference_steps, strength, device):
        # 计算初始时间步,取 num_inference_steps 与 strength 的乘积和 num_inference_steps 的最小值
        init_timestep = min(int(num_inference_steps * strength), num_inference_steps)
    
        # 计算开始时间步,确保不小于0
        t_start = max(num_inference_steps - init_timestep, 0)
        # 从调度器中获取相应时间步的切片
        timesteps = self.scheduler.timesteps[t_start * self.scheduler.order :]
        # 如果调度器有设置开始索引的方法,则调用它
        if hasattr(self.scheduler, "set_begin_index"):
            self.scheduler.set_begin_index(t_start * self.scheduler.order)
    
        # 返回时间步和剩余的推理步骤数
        return timesteps, num_inference_steps - t_start
    
        # 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion_img2img.StableDiffusionImg2ImgPipeline.prepare_latents 中复制的
    # 准备深度图,处理输入图像和深度图,适应批量大小及其他参数
    def prepare_depth_map(self, image, depth_map, batch_size, do_classifier_free_guidance, dtype, device):
        # 如果输入的图像是单个 PIL 图像,则将其转换为列表
        if isinstance(image, PIL.Image.Image):
            image = [image]
        else:
            # 如果输入是多个图像,则将其转换为列表
            image = list(image)

        # 检查图像的类型并获取宽度和高度
        if isinstance(image[0], PIL.Image.Image):
            width, height = image[0].size  # 从 PIL 图像中获取宽高
        elif isinstance(image[0], np.ndarray):
            width, height = image[0].shape[:-1]  # 从 numpy 数组中获取宽高
        else:
            height, width = image[0].shape[-2:]  # 从其他格式中获取宽高

        # 如果没有提供深度图,则计算深度图
        if depth_map is None:
            # 使用特征提取器提取图像的像素值,并将其转换为张量
            pixel_values = self.feature_extractor(images=image, return_tensors="pt").pixel_values
            # 将像素值移动到指定的设备并转换为指定的数据类型
            pixel_values = pixel_values.to(device=device, dtype=dtype)
            # DPT-Hybrid 模型使用批量归一化层,不支持 fp16,因此使用自动混合精度
            if torch.backends.mps.is_available():
                autocast_ctx = contextlib.nullcontext()  # 创建一个空上下文
                logger.warning(
                    "The DPT-Hybrid model uses batch-norm layers which are not compatible with fp16, but autocast is not yet supported on MPS."
                )  # 记录警告
            else:
                # 在支持的设备上创建自动混合精度上下文
                autocast_ctx = torch.autocast(device.type, dtype=dtype)

            with autocast_ctx:  # 进入自动混合精度上下文
                # 使用深度估计器计算深度图
                depth_map = self.depth_estimator(pixel_values).predicted_depth
        else:
            # 如果提供了深度图,则将其移动到指定的设备和数据类型
            depth_map = depth_map.to(device=device, dtype=dtype)

        # 调整深度图的大小以适应 VAE 的缩放因子
        depth_map = torch.nn.functional.interpolate(
            depth_map.unsqueeze(1),  # 增加一个维度以适应插值操作
            size=(height // self.vae_scale_factor, width // self.vae_scale_factor),  # 目标大小
            mode="bicubic",  # 使用双三次插值
            align_corners=False,  # 不对齐角点
        )

        # 计算深度图的最小值和最大值
        depth_min = torch.amin(depth_map, dim=[1, 2, 3], keepdim=True)  # 获取深度图的最小值
        depth_max = torch.amax(depth_map, dim=[1, 2, 3], keepdim=True)  # 获取深度图的最大值
        # 将深度图归一化到 [-1, 1] 的范围
        depth_map = 2.0 * (depth_map - depth_min) / (depth_max - depth_min) - 1.0
        # 将深度图转换为指定的数据类型
        depth_map = depth_map.to(dtype)

        # 如果深度图的批量大小小于给定的批量大小,则重复深度图以匹配批量大小
        if depth_map.shape[0] < batch_size:
            repeat_by = batch_size // depth_map.shape[0]  # 计算重复次数
            depth_map = depth_map.repeat(repeat_by, 1, 1, 1)  # 重复深度图

        # 根据是否使用无分类器引导来调整深度图
        depth_map = torch.cat([depth_map] * 2) if do_classifier_free_guidance else depth_map
        # 返回处理后的深度图
        return depth_map

    # 返回指导缩放因子
    @property
    def guidance_scale(self):
        return self._guidance_scale

    # 返回剪辑跳过的参数
    @property
    def clip_skip(self):
        return self._clip_skip

    # 这里的 `guidance_scale` 定义类似于 Imagen 论文中公式 (2) 的指导权重 `w`
    # `guidance_scale = 1` 表示不使用无分类器引导
    @property
    def do_classifier_free_guidance(self):
        return self._guidance_scale > 1  # 判断是否使用无分类器引导

    # 返回交叉注意力的参数
    @property
    def cross_attention_kwargs(self):
        return self._cross_attention_kwargs

    # 返回时间步数
    @property
    def num_timesteps(self):
        return self._num_timesteps
    # 使用装饰器禁用梯度计算,以节省内存和计算资源
        @torch.no_grad()
        # 定义可调用方法,接受多个参数以生成图像
        def __call__(
            # 输入提示,字符串或字符串列表,决定生成内容
            self,
            prompt: Union[str, List[str]] = None,
            # 输入图像,类型为 PipelineImageInput,用于图像生成
            image: PipelineImageInput = None,
            # 深度图,类型为可选的 torch.Tensor,用于提供深度信息
            depth_map: Optional[torch.Tensor] = None,
            # 强度参数,决定生成的图像变化程度,默认为 0.8
            strength: float = 0.8,
            # 推理步骤数,决定生成过程的迭代次数,默认为 50
            num_inference_steps: Optional[int] = 50,
            # 指导比例,用于调整生成图像与提示的一致性,默认为 7.5
            guidance_scale: Optional[float] = 7.5,
            # 负向提示,字符串或字符串列表,提供生成限制条件
            negative_prompt: Optional[Union[str, List[str]]] = None,
            # 每个提示生成的图像数量,默认为 1
            num_images_per_prompt: Optional[int] = 1,
            # 随机性参数,控制生成过程中的随机性,默认为 0.0
            eta: Optional[float] = 0.0,
            # 生成器,用于控制随机数生成的可选参数
            generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
            # 提示的嵌入,类型为可选的 torch.Tensor,提供编码后的提示信息
            prompt_embeds: Optional[torch.Tensor] = None,
            # 负向提示的嵌入,类型为可选的 torch.Tensor,提供编码后的负向提示信息
            negative_prompt_embeds: Optional[torch.Tensor] = None,
            # 输出类型,默认为 "pil",指示生成结果的格式
            output_type: Optional[str] = "pil",
            # 返回字典标志,决定是否以字典形式返回结果,默认为 True
            return_dict: bool = True,
            # 交叉注意力的额外参数,可选字典类型
            cross_attention_kwargs: Optional[Dict[str, Any]] = None,
            # 跳过的剪辑层数,可选整数,控制模型层的使用
            clip_skip: Optional[int] = None,
            # 每一步结束时的回调函数,接受步数、总步数和字典作为参数
            callback_on_step_end: Optional[Callable[[int, int, Dict], None]] = None,
            # 在回调函数中包含的张量输入名称,默认为 ["latents"]
            callback_on_step_end_tensor_inputs: List[str] = ["latents"],
            # 额外的关键字参数,允许用户自定义输入
            **kwargs,

.\diffusers\pipelines\stable_diffusion\pipeline_stable_diffusion_image_variation.py

# 版权声明,标识文件的版权所有者和相关条款
# Copyright 2024 The HuggingFace Team. All rights reserved.
#
# 根据 Apache 许可证第 2.0 版(“许可证”)进行许可;
# 除非遵循许可证,否则您不得使用此文件。
# 您可以在以下网址获取许可证副本:
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律或书面协议另有约定,软件按“原样”提供,
# 不附带任何明示或暗示的担保或条件。
# 请参阅许可证以了解适用的权限和限制。
#
# 导入 inspect 模块,用于检查函数签名和源代码
import inspect
# 从 typing 模块导入类型提示所需的类
from typing import Callable, List, Optional, Union

# 导入 PIL.Image 模块,用于处理图像
import PIL.Image
# 导入 PyTorch 库
import torch
# 导入 version 模块用于处理版本信息
from packaging import version
# 导入 CLIP 相关的图像处理器和模型
from transformers import CLIPImageProcessor, CLIPVisionModelWithProjection

# 从相对路径导入 FrozenDict 配置类
from ...configuration_utils import FrozenDict
# 从相对路径导入图像处理器
from ...image_processor import VaeImageProcessor
# 从相对路径导入自动编码器和 UNet 模型
from ...models import AutoencoderKL, UNet2DConditionModel
# 从相对路径导入 Karras 扩散调度器
from ...schedulers import KarrasDiffusionSchedulers
# 从相对路径导入工具函数
from ...utils import deprecate, logging
# 从工具模块导入随机张量生成函数
from ...utils.torch_utils import randn_tensor
# 从管道工具模块导入扩散管道和稳定扩散混合类
from ..pipeline_utils import DiffusionPipeline, StableDiffusionMixin
# 从当前目录导入稳定扩散管道输出类
from . import StableDiffusionPipelineOutput
# 从当前目录导入安全检查器
from .safety_checker import StableDiffusionSafetyChecker

# 创建日志记录器,便于记录调试信息和警告
logger = logging.get_logger(__name__)  # pylint: disable=invalid-name

# 定义一个类,用于生成图像变体,继承自扩散管道和稳定扩散混合类
class StableDiffusionImageVariationPipeline(DiffusionPipeline, StableDiffusionMixin):
    r"""
    管道用于从输入图像生成图像变体,使用稳定扩散模型。

    该模型继承自 [`DiffusionPipeline`]。请查看超类文档以了解所有管道的通用方法
    (下载、保存、在特定设备上运行等)。
    # 函数参数说明
    Args:
        vae ([`AutoencoderKL`]):  # 变分自编码器(VAE)模型,用于将图像编码为潜在表示,并从中解码图像。
            Variational Auto-Encoder (VAE) model to encode and decode images to and from latent representations.
        image_encoder ([`~transformers.CLIPVisionModelWithProjection`] ):  # 冻结的 CLIP 图像编码器,具体为 clip-vit-large-patch14。
            Frozen CLIP image-encoder ([clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14)).
        text_encoder ([`~transformers.CLIPTextModel`]):  # 冻结的文本编码器,具体为 clip-vit-large-patch14。
            Frozen text-encoder ([clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14)).
        tokenizer ([`~transformers.CLIPTokenizer`]):  # 用于对文本进行分词的 CLIP 分词器。
            A `CLIPTokenizer` to tokenize text.
        unet ([`UNet2DConditionModel`]):  # 用于去噪已编码图像潜在表示的 UNet 模型。
            A `UNet2DConditionModel` to denoise the encoded image latents.
        scheduler ([`SchedulerMixin`]):  # 用于与 UNet 结合使用以去噪已编码图像潜在表示的调度器,可以是 DDIMScheduler、LMSDiscreteScheduler 或 PNDMScheduler。
            A scheduler to be used in combination with `unet` to denoise the encoded image latents. Can be one of
            [`DDIMScheduler`], [`LMSDiscreteScheduler`], or [`PNDMScheduler`].
        safety_checker ([`StableDiffusionSafetyChecker`]):  # 分类模块,用于评估生成的图像是否可能被认为是冒犯性或有害的。
            Classification module that estimates whether generated images could be considered offensive or harmful.
            Please refer to the [model card](https://huggingface.co/runwayml/stable-diffusion-v1-5) for more details
            about a model's potential harms.
        feature_extractor ([`~transformers.CLIPImageProcessor`]):  # CLIP 图像处理器,用于从生成的图像中提取特征;作为安全检查器的输入。
            A `CLIPImageProcessor` to extract features from generated images; used as inputs to the `safety_checker`.
    """

    # TODO: feature_extractor 是必需的,以便编码图像(如果它们是 PIL 格式),
    # 如果管道没有 feature_extractor,我们应该给出描述性消息。
    _optional_components = ["safety_checker"]  # 可选组件列表,包含安全检查器。
    model_cpu_offload_seq = "image_encoder->unet->vae"  # 模型在 CPU 卸载时的顺序。
    _exclude_from_cpu_offload = ["safety_checker"]  # 在 CPU 卸载时排除的组件,安全检查器不会被卸载。

    def __init__(  # 初始化方法,定义类的构造函数。
        self,
        vae: AutoencoderKL,  # 传入变分自编码器实例。
        image_encoder: CLIPVisionModelWithProjection,  # 传入图像编码器实例。
        unet: UNet2DConditionModel,  # 传入 UNet 实例。
        scheduler: KarrasDiffusionSchedulers,  # 传入调度器实例。
        safety_checker: StableDiffusionSafetyChecker,  # 传入安全检查器实例。
        feature_extractor: CLIPImageProcessor,  # 传入图像处理器实例。
        requires_safety_checker: bool = True,  # 是否需要安全检查器的标志,默认值为 True。
    # 定义一个私有方法用于编码图像,接收多个参数
        def _encode_image(self, image, device, num_images_per_prompt, do_classifier_free_guidance):
            # 获取图像编码器参数的数据类型
            dtype = next(self.image_encoder.parameters()).dtype
    
            # 检查输入是否为张量,如果不是,则使用特征提取器处理图像
            if not isinstance(image, torch.Tensor):
                image = self.feature_extractor(images=image, return_tensors="pt").pixel_values
    
            # 将图像转移到指定设备并转换为所需数据类型
            image = image.to(device=device, dtype=dtype)
            # 通过图像编码器生成图像嵌入
            image_embeddings = self.image_encoder(image).image_embeds
            # 增加一个维度以便于后续处理
            image_embeddings = image_embeddings.unsqueeze(1)
    
            # 针对每个提示生成图像嵌入的副本,使用适合 MPS 的方法
            bs_embed, seq_len, _ = image_embeddings.shape
            # 重复图像嵌入以匹配每个提示生成的图像数量
            image_embeddings = image_embeddings.repeat(1, num_images_per_prompt, 1)
            # 重新调整图像嵌入的形状
            image_embeddings = image_embeddings.view(bs_embed * num_images_per_prompt, seq_len, -1)
    
            # 如果需要无分类器引导,则创建零向量的负提示嵌入
            if do_classifier_free_guidance:
                negative_prompt_embeds = torch.zeros_like(image_embeddings)
    
                # 对于无分类器引导,我们需要进行两次前向传递
                # 这里将无条件和文本嵌入拼接到一个批次中,以避免两次前向传递
                image_embeddings = torch.cat([negative_prompt_embeds, image_embeddings])
    
            # 返回最终的图像嵌入
            return image_embeddings
    
        # 从 StableDiffusionPipeline 复制的方法,用于运行安全检查器
        def run_safety_checker(self, image, device, dtype):
            # 如果安全检查器未定义,则将标记设为 None
            if self.safety_checker is None:
                has_nsfw_concept = None
            else:
                # 检查图像是否为张量,如果是则处理为 PIL 格式
                if torch.is_tensor(image):
                    feature_extractor_input = self.image_processor.postprocess(image, output_type="pil")
                else:
                    feature_extractor_input = self.image_processor.numpy_to_pil(image)
                # 使用特征提取器处理图像并转移到指定设备
                safety_checker_input = self.feature_extractor(feature_extractor_input, return_tensors="pt").to(device)
                # 运行安全检查器,返回处理后的图像和 NSFW 概念标记
                image, has_nsfw_concept = self.safety_checker(
                    images=image, clip_input=safety_checker_input.pixel_values.to(dtype)
                )
            # 返回处理后的图像及 NSFW 概念标记
            return image, has_nsfw_concept
    
        # 从 StableDiffusionPipeline 复制的方法,用于解码潜在变量
        def decode_latents(self, latents):
            # 显示弃用提示,告知用户该方法将在未来版本中移除
            deprecation_message = "The decode_latents method is deprecated and will be removed in 1.0.0. Please use VaeImageProcessor.postprocess(...) instead"
            deprecate("decode_latents", "1.0.0", deprecation_message, standard_warn=False)
    
            # 按照配置的缩放因子调整潜在变量
            latents = 1 / self.vae.config.scaling_factor * latents
            # 解码潜在变量以生成图像
            image = self.vae.decode(latents, return_dict=False)[0]
            # 归一化图像数据并限制其值在 0 到 1 之间
            image = (image / 2 + 0.5).clamp(0, 1)
            # 将图像转换为 float32 格式,以确保兼容性并避免显著开销
            image = image.cpu().permute(0, 2, 3, 1).float().numpy()
            # 返回解码后的图像
            return image
    
        # 从 StableDiffusionPipeline 复制的方法,用于准备额外步骤的关键字参数
    # 准备额外参数以便于调度器步骤,因不同调度器的签名可能不同
    def prepare_extra_step_kwargs(self, generator, eta):
        # 检查调度器步骤是否接受 eta 参数,eta 仅在 DDIMScheduler 中使用
        # eta 对应于 DDIM 论文中的 η: https://arxiv.org/abs/2010.02502
        # eta 应该在 [0, 1] 范围内
    
        # 判断调度器步骤是否接受 eta 参数
        accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())
        # 初始化额外参数字典
        extra_step_kwargs = {}
        # 如果调度器接受 eta,添加到额外参数中
        if accepts_eta:
            extra_step_kwargs["eta"] = eta
    
        # 检查调度器是否接受 generator 参数
        accepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys())
        # 如果调度器接受 generator,添加到额外参数中
        if accepts_generator:
            extra_step_kwargs["generator"] = generator
        # 返回额外参数字典
        return extra_step_kwargs
    
    # 检查输入的有效性,包括图像、高度、宽度和回调步数
    def check_inputs(self, image, height, width, callback_steps):
        # 确保图像类型为 torch.Tensor 或 PIL.Image.Image 或图像列表
        if (
            not isinstance(image, torch.Tensor)
            and not isinstance(image, PIL.Image.Image)
            and not isinstance(image, list)
        ):
            raise ValueError(
                "`image` has to be of type `torch.Tensor` or `PIL.Image.Image` or `List[PIL.Image.Image]` but is"
                f" {type(image)}"
            )
    
        # 确保高度和宽度都是8的倍数
        if height % 8 != 0 or width % 8 != 0:
            raise ValueError(f"`height` and `width` have to be divisible by 8 but are {height} and {width}.")
    
        # 确保回调步骤是正整数
        if (callback_steps is None) or (
            callback_steps is not None and (not isinstance(callback_steps, int) or callback_steps <= 0)
        ):
            raise ValueError(
                f"`callback_steps` has to be a positive integer but is {callback_steps} of type"
                f" {type(callback_steps)}."
            )
    
        # 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_latents 复制
        def prepare_latents(self, batch_size, num_channels_latents, height, width, dtype, device, generator, latents=None):
            # 定义潜在变量的形状
            shape = (
                batch_size,
                num_channels_latents,
                int(height) // self.vae_scale_factor,
                int(width) // self.vae_scale_factor,
            )
            # 如果传入的生成器列表长度与批量大小不匹配,抛出异常
            if isinstance(generator, list) and len(generator) != batch_size:
                raise ValueError(
                    f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
                    f" size of {batch_size}. Make sure the batch size matches the length of the generators."
                )
    
            # 如果没有提供潜在变量,生成随机潜在变量
            if latents is None:
                latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
            else:
                # 如果提供了潜在变量,将其移动到指定设备
                latents = latents.to(device)
    
            # 根据调度器要求的标准差缩放初始噪声
            latents = latents * self.scheduler.init_noise_sigma
            # 返回处理后的潜在变量
            return latents
    
        # 禁用梯度计算,以节省内存
        @torch.no_grad()
    # 定义一个可调用的方法,用于处理图像输入
        def __call__(
            self,
            # 输入图像,可以是单个 PIL 图片、图片列表或 PyTorch 张量
            image: Union[PIL.Image.Image, List[PIL.Image.Image], torch.Tensor],
            # 目标高度,可选参数
            height: Optional[int] = None,
            # 目标宽度,可选参数
            width: Optional[int] = None,
            # 推理步骤的数量,默认为 50
            num_inference_steps: int = 50,
            # 引导缩放因子,默认为 7.5
            guidance_scale: float = 7.5,
            # 每个提示生成的图像数量,默认为 1
            num_images_per_prompt: Optional[int] = 1,
            # 噪声控制参数,默认为 0.0
            eta: float = 0.0,
            # 随机数生成器,默认为 None,可以是单个生成器或生成器列表
            generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
            # 预定义的潜在张量,默认为 None
            latents: Optional[torch.Tensor] = None,
            # 输出类型,默认为 "pil"
            output_type: Optional[str] = "pil",
            # 是否返回字典格式的结果,默认为 True
            return_dict: bool = True,
            # 可选的回调函数,接收步骤、图像索引和张量
            callback: Optional[Callable[[int, int, torch.Tensor], None]] = None,
            # 回调函数调用的步数,默认为 1
            callback_steps: int = 1,

.\diffusers\pipelines\stable_diffusion\pipeline_stable_diffusion_img2img.py

# 版权所有 2024 HuggingFace 团队。保留所有权利。
#
# 根据 Apache 许可证,版本 2.0(“许可证”)授权;
# 除非遵循许可证,否则您不得使用此文件。
# 您可以在以下位置获取许可证副本:
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律或书面同意,否则根据许可证分发的软件是按“原样”提供的,
# 不提供任何明示或暗示的担保或条件。
# 有关许可证下权限和限制的具体语言,请参阅许可证。

# 导入 inspect 模块,用于获取对象的签名和其他信息
import inspect
# 从 typing 模块导入类型注解,方便类型提示
from typing import Any, Callable, Dict, List, Optional, Union

# 导入 numpy 库,用于数值计算
import numpy as np
# 导入 PIL 库,用于图像处理
import PIL.Image
# 导入 PyTorch 库,用于深度学习
import torch
# 导入版本管理工具,用于版本比较
from packaging import version
# 从 transformers 库导入 CLIP 相关模型和处理器
from transformers import CLIPImageProcessor, CLIPTextModel, CLIPTokenizer, CLIPVisionModelWithProjection

# 导入回调函数相关的类
from ...callbacks import MultiPipelineCallbacks, PipelineCallback
# 导入配置相关的工具类
from ...configuration_utils import FrozenDict
# 导入图像处理相关的输入类
from ...image_processor import PipelineImageInput, VaeImageProcessor
# 导入多种加载器混合类
from ...loaders import FromSingleFileMixin, IPAdapterMixin, StableDiffusionLoraLoaderMixin, TextualInversionLoaderMixin
# 导入模型相关的类
from ...models import AutoencoderKL, ImageProjection, UNet2DConditionModel
# 导入 Lora 调整函数
from ...models.lora import adjust_lora_scale_text_encoder
# 导入调度器
from ...schedulers import KarrasDiffusionSchedulers
# 导入实用工具函数
from ...utils import (
    PIL_INTERPOLATION,  # PIL 图像插值方法
    USE_PEFT_BACKEND,   # 是否使用 PEFT 后端
    deprecate,          # 用于标记弃用功能
    logging,            # 日志记录工具
    replace_example_docstring,  # 替换示例文档字符串的函数
    scale_lora_layers,  # 调整 Lora 层的比例
    unscale_lora_layers,  # 取消 Lora 层的比例
)
# 导入随机张量生成工具
from ...utils.torch_utils import randn_tensor
# 导入管道相关工具
from ..pipeline_utils import DiffusionPipeline, StableDiffusionMixin
# 导入输出类
from . import StableDiffusionPipelineOutput
# 导入安全检查器
from .safety_checker import StableDiffusionSafetyChecker

# 创建日志记录器实例,用于记录日志
logger = logging.get_logger(__name__)  # pylint: disable=invalid-name

# 示例文档字符串,展示用法示例
EXAMPLE_DOC_STRING = """
    Examples:
        ```py
        >>> import requests  # 导入 requests 库,用于发送 HTTP 请求
        >>> import torch  # 导入 PyTorch 库
        >>> from PIL import Image  # 从 PIL 导入图像处理类
        >>> from io import BytesIO  # 从 io 导入字节流处理类

        >>> from diffusers import StableDiffusionImg2ImgPipeline  # 导入图像到图像的稳定扩散管道

        >>> device = "cuda"  # 指定使用的设备为 GPU
        >>> model_id_or_path = "runwayml/stable-diffusion-v1-5"  # 指定模型 ID 或路径
        >>> pipe = StableDiffusionImg2ImgPipeline.from_pretrained(model_id_or_path, torch_dtype=torch.float16)  # 从预训练模型创建管道
        >>> pipe = pipe.to(device)  # 将管道转移到指定设备

        >>> url = "https://raw.githubusercontent.com/CompVis/stable-diffusion/main/assets/stable-samples/img2img/sketch-mountains-input.jpg"  # 图像 URL

        >>> response = requests.get(url)  # 发送 GET 请求以获取图像
        >>> init_image = Image.open(BytesIO(response.content)).convert("RGB")  # 打开图像并转换为 RGB 格式
        >>> init_image = init_image.resize((768, 512))  # 调整图像大小

        >>> prompt = "A fantasy landscape, trending on artstation"  # 设置生成图像的提示文本

        >>> images = pipe(prompt=prompt, image=init_image, strength=0.75, guidance_scale=7.5).images  # 生成图像
        >>> images[0].save("fantasy_landscape.png")  # 保存生成的图像
        ```py
"""

# 定义一个函数以检索潜在变量
def retrieve_latents(
    encoder_output: torch.Tensor,  # 输入的编码器输出张量
    generator: Optional[torch.Generator] = None,  # 可选的随机数生成器
    sample_mode: str = "sample"  # 采样模式,默认为“sample”
):
    # 检查 encoder_output 是否有 "latent_dist" 属性,并且采样模式为 "sample"
        if hasattr(encoder_output, "latent_dist") and sample_mode == "sample":
            # 从 latent_dist 中采样并返回结果
            return encoder_output.latent_dist.sample(generator)
        # 检查 encoder_output 是否有 "latent_dist" 属性,并且采样模式为 "argmax"
        elif hasattr(encoder_output, "latent_dist") and sample_mode == "argmax":
            # 返回 latent_dist 的众数作为结果
            return encoder_output.latent_dist.mode()
        # 检查 encoder_output 是否有 "latents" 属性
        elif hasattr(encoder_output, "latents"):
            # 直接返回 latents 属性的值
            return encoder_output.latents
        # 如果没有找到任何相关属性,抛出属性错误
        else:
            raise AttributeError("Could not access latents of provided encoder_output")
# 定义预处理图像的函数
def preprocess(image):
    # 定义弃用警告信息,说明该方法在未来的版本中将被移除
    deprecation_message = "The preprocess method is deprecated and will be removed in diffusers 1.0.0. Please use VaeImageProcessor.preprocess(...) instead"
    # 调用弃用函数,传入方法名、版本号、警告信息及标准警告参数
    deprecate("preprocess", "1.0.0", deprecation_message, standard_warn=False)
    # 检查输入是否为 PyTorch 张量
    if isinstance(image, torch.Tensor):
        # 如果是张量,直接返回
        return image
    # 检查输入是否为 PIL 图像
    elif isinstance(image, PIL.Image.Image):
        # 将单个图像放入列表中
        image = [image]

    # 如果列表中的第一个元素是 PIL 图像
    if isinstance(image[0], PIL.Image.Image):
        # 获取图像的宽和高
        w, h = image[0].size
        # 将宽高调整为 8 的整数倍
        w, h = (x - x % 8 for x in (w, h))  # resize to integer multiple of 8

        # 对每个图像进行调整大小,并转为 numpy 数组,增加维度
        image = [np.array(i.resize((w, h), resample=PIL_INTERPOLATION["lanczos"]))[None, :] for i in image]
        # 将所有图像在第一个维度上拼接
        image = np.concatenate(image, axis=0)
        # 将数组转换为浮点型并归一化到 [0, 1] 区间
        image = np.array(image).astype(np.float32) / 255.0
        # 调整维度顺序为 (批量, 通道, 高, 宽)
        image = image.transpose(0, 3, 1, 2)
        # 将值映射到 [-1, 1] 区间
        image = 2.0 * image - 1.0
        # 将 numpy 数组转换为 PyTorch 张量
        image = torch.from_numpy(image)
    # 如果列表中的第一个元素是 PyTorch 张量
    elif isinstance(image[0], torch.Tensor):
        # 在第一个维度上拼接所有张量
        image = torch.cat(image, dim=0)
    # 返回处理后的图像
    return image


# 定义从调度器获取时间步的函数
def retrieve_timesteps(
    scheduler,
    num_inference_steps: Optional[int] = None,
    device: Optional[Union[str, torch.device]] = None,
    timesteps: Optional[List[int]] = None,
    sigmas: Optional[List[float]] = None,
    **kwargs,
):
    """
    调用调度器的 `set_timesteps` 方法,并在调用后从调度器检索时间步。处理自定义时间步。任何关键字参数将传递给 `scheduler.set_timesteps`。

    参数:
        scheduler (`SchedulerMixin`):
            从中获取时间步的调度器。
        num_inference_steps (`int`):
            生成样本时使用的扩散步骤数量。如果使用,则 `timesteps` 必须为 `None`。
        device (`str` 或 `torch.device`, *可选*):
            时间步要移动到的设备。如果为 `None`,则不移动时间步。
        timesteps (`List[int]`, *可选*):
            自定义时间步,用于覆盖调度器的时间步间隔策略。如果传入 `timesteps`,则 `num_inference_steps` 和 `sigmas` 必须为 `None`。
        sigmas (`List[float]`, *可选*):
            自定义 sigma,用于覆盖调度器的时间步间隔策略。如果传入 `sigmas`,则 `num_inference_steps` 和 `timesteps` 必须为 `None`。

    返回:
        `Tuple[torch.Tensor, int]`: 一个元组,第一个元素是调度器的时间步调度,第二个元素是推理步骤的数量。
    """
    # 检查是否同时传入了时间步和 sigma
    if timesteps is not None and sigmas is not None:
        # 如果同时传入,抛出错误
        raise ValueError("Only one of `timesteps` or `sigmas` can be passed. Please choose one to set custom values")
    # 检查 timesteps 是否为 None,确定是否需要使用自定义时间步
    if timesteps is not None:
        # 检查调度器的 set_timesteps 方法是否接受 timesteps 参数
        accepts_timesteps = "timesteps" in set(inspect.signature(scheduler.set_timesteps).parameters.keys())
        # 如果不接受 timesteps,则抛出 ValueError 异常
        if not accepts_timesteps:
            raise ValueError(
                f"The current scheduler class {scheduler.__class__}'s `set_timesteps` does not support custom"
                f" timestep schedules. Please check whether you are using the correct scheduler."
            )
        # 调用调度器的 set_timesteps 方法,设置自定义时间步
        scheduler.set_timesteps(timesteps=timesteps, device=device, **kwargs)
        # 获取调度器中的时间步
        timesteps = scheduler.timesteps
        # 计算推理步骤的数量
        num_inference_steps = len(timesteps)
    # 检查 sigmas 是否为 None,确定是否需要使用自定义 sigmas
    elif sigmas is not None:
        # 检查调度器的 set_timesteps 方法是否接受 sigmas 参数
        accept_sigmas = "sigmas" in set(inspect.signature(scheduler.set_timesteps).parameters.keys())
        # 如果不接受 sigmas,则抛出 ValueError 异常
        if not accept_sigmas:
            raise ValueError(
                f"The current scheduler class {scheduler.__class__}'s `set_timesteps` does not support custom"
                f" sigmas schedules. Please check whether you are using the correct scheduler."
            )
        # 调用调度器的 set_timesteps 方法,设置自定义 sigmas
        scheduler.set_timesteps(sigmas=sigmas, device=device, **kwargs)
        # 获取调度器中的时间步
        timesteps = scheduler.timesteps
        # 计算推理步骤的数量
        num_inference_steps = len(timesteps)
    # 如果 timesteps 和 sigmas 都为 None
    else:
        # 调用调度器的 set_timesteps 方法,使用推理步骤的数量
        scheduler.set_timesteps(num_inference_steps, device=device, **kwargs)
        # 获取调度器中的时间步
        timesteps = scheduler.timesteps
    # 返回时间步和推理步骤的数量
    return timesteps, num_inference_steps
# 定义一个名为 StableDiffusionImg2ImgPipeline 的类,继承多个混合类以实现功能
class StableDiffusionImg2ImgPipeline(
    # 继承自 DiffusionPipeline 类
    DiffusionPipeline,
    # 继承自 StableDiffusionMixin 类
    StableDiffusionMixin,
    # 继承自 TextualInversionLoaderMixin 类
    TextualInversionLoaderMixin,
    # 继承自 IPAdapterMixin 类
    IPAdapterMixin,
    # 继承自 StableDiffusionLoraLoaderMixin 类
    StableDiffusionLoraLoaderMixin,
    # 继承自 FromSingleFileMixin 类
    FromSingleFileMixin,
):
    # 文档字符串,描述该管道的功能和参数
    r"""
    Pipeline for text-guided image-to-image generation using Stable Diffusion.

    # 说明该模型继承自 DiffusionPipeline,提供通用方法的文档
    This model inherits from [`DiffusionPipeline`]. Check the superclass documentation for the generic methods
    implemented for all pipelines (downloading, saving, running on a particular device, etc.).

    # 说明该管道还继承了多个加载方法
    The pipeline also inherits the following loading methods:
        - [`~loaders.TextualInversionLoaderMixin.load_textual_inversion`] for loading textual inversion embeddings
        - [`~loaders.StableDiffusionLoraLoaderMixin.load_lora_weights`] for loading LoRA weights
        - [`~loaders.StableDiffusionLoraLoaderMixin.save_lora_weights`] for saving LoRA weights
        - [`~loaders.FromSingleFileMixin.from_single_file`] for loading `.ckpt` files
        - [`~loaders.IPAdapterMixin.load_ip_adapter`] for loading IP Adapters

    # 定义该类的参数及其功能
    Args:
        vae ([`AutoencoderKL`]):
            Variational Auto-Encoder (VAE) model to encode and decode images to and from latent representations.
        text_encoder ([`~transformers.CLIPTextModel`]):
            Frozen text-encoder ([clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14)).
        tokenizer ([`~transformers.CLIPTokenizer`]):
            A `CLIPTokenizer` to tokenize text.
        unet ([`UNet2DConditionModel`]):
            A `UNet2DConditionModel` to denoise the encoded image latents.
        scheduler ([`SchedulerMixin`]):
            A scheduler to be used in combination with `unet` to denoise the encoded image latents. Can be one of
            [`DDIMScheduler`], [`LMSDiscreteScheduler`], or [`PNDMScheduler`].
        safety_checker ([`StableDiffusionSafetyChecker`]):
            Classification module that estimates whether generated images could be considered offensive or harmful.
            Please refer to the [model card](https://huggingface.co/runwayml/stable-diffusion-v1-5) for more details
            about a model's potential harms.
        feature_extractor ([`~transformers.CLIPImageProcessor`]):
            A `CLIPImageProcessor` to extract features from generated images; used as inputs to the `safety_checker`.
    """

    # 定义一个字符串,指定 CPU 卸载顺序
    model_cpu_offload_seq = "text_encoder->image_encoder->unet->vae"
    # 定义一个可选组件列表
    _optional_components = ["safety_checker", "feature_extractor", "image_encoder"]
    # 定义一个不参与 CPU 卸载的组件列表
    _exclude_from_cpu_offload = ["safety_checker"]
    # 定义一个用于回调的张量输入列表
    _callback_tensor_inputs = ["latents", "prompt_embeds", "negative_prompt_embeds"]
    # 初始化类的构造函数,接收多个参数
        def __init__(
            self,
            # 变分自编码器模型
            vae: AutoencoderKL,
            # 文本编码器模型
            text_encoder: CLIPTextModel,
            # 词汇表处理器
            tokenizer: CLIPTokenizer,
            # 2D 条件生成模型
            unet: UNet2DConditionModel,
            # Karras 扩散调度器
            scheduler: KarrasDiffusionSchedulers,
            # 稳定扩散安全检查器
            safety_checker: StableDiffusionSafetyChecker,
            # 图像处理器
            feature_extractor: CLIPImageProcessor,
            # 可选的图像编码器模型
            image_encoder: CLIPVisionModelWithProjection = None,
            # 是否需要安全检查器,默认值为 True
            requires_safety_checker: bool = True,
        # 从 StableDiffusionPipeline 的 _encode_prompt 方法复制
        def _encode_prompt(
            self,
            # 输入的提示文本
            prompt,
            # 设备类型
            device,
            # 每个提示生成的图像数量
            num_images_per_prompt,
            # 是否使用分类器自由引导
            do_classifier_free_guidance,
            # 可选的负面提示文本
            negative_prompt=None,
            # 可选的提示嵌入
            prompt_embeds: Optional[torch.Tensor] = None,
            # 可选的负面提示嵌入
            negative_prompt_embeds: Optional[torch.Tensor] = None,
            # 可选的 LoRA 缩放因子
            lora_scale: Optional[float] = None,
            # 其他关键字参数
            **kwargs,
        ):
            # 过时警告信息,提醒用户方法即将被删除
            deprecation_message = "`_encode_prompt()` is deprecated and it will be removed in a future version. Use `encode_prompt()` instead. Also, be aware that the output format changed from a concatenated tensor to a tuple."
            # 调用过时警告函数
            deprecate("_encode_prompt()", "1.0.0", deprecation_message, standard_warn=False)
    
            # 调用 encode_prompt 方法,获取提示嵌入元组
            prompt_embeds_tuple = self.encode_prompt(
                # 输入的提示文本
                prompt=prompt,
                # 设备类型
                device=device,
                # 每个提示生成的图像数量
                num_images_per_prompt=num_images_per_prompt,
                # 是否使用分类器自由引导
                do_classifier_free_guidance=do_classifier_free_guidance,
                # 可选的负面提示文本
                negative_prompt=negative_prompt,
                # 可选的提示嵌入
                prompt_embeds=prompt_embeds,
                # 可选的负面提示嵌入
                negative_prompt_embeds=negative_prompt_embeds,
                # 可选的 LoRA 缩放因子
                lora_scale=lora_scale,
                # 其他关键字参数
                **kwargs,
            )
    
            # 将提示嵌入元组中的两个部分连接以便向后兼容
            prompt_embeds = torch.cat([prompt_embeds_tuple[1], prompt_embeds_tuple[0]])
    
            # 返回连接后的提示嵌入
            return prompt_embeds
    
        # 从 StableDiffusionPipeline 的 encode_prompt 方法复制
        def encode_prompt(
            self,
            # 输入的提示文本
            prompt,
            # 设备类型
            device,
            # 每个提示生成的图像数量
            num_images_per_prompt,
            # 是否使用分类器自由引导
            do_classifier_free_guidance,
            # 可选的负面提示文本
            negative_prompt=None,
            # 可选的提示嵌入
            prompt_embeds: Optional[torch.Tensor] = None,
            # 可选的负面提示嵌入
            negative_prompt_embeds: Optional[torch.Tensor] = None,
            # 可选的 LoRA 缩放因子
            lora_scale: Optional[float] = None,
            # 可选的跳过的 CLIP 层数
            clip_skip: Optional[int] = None,
        # 从 StableDiffusionPipeline 的 encode_image 方法复制
    # 定义编码图像的函数,接受图像、设备、每个提示的图像数量和可选的隐藏状态输出
        def encode_image(self, image, device, num_images_per_prompt, output_hidden_states=None):
            # 获取图像编码器参数的数据类型
            dtype = next(self.image_encoder.parameters()).dtype
    
            # 检查输入的图像是否为张量类型
            if not isinstance(image, torch.Tensor):
                # 使用特征提取器将图像转换为张量,并返回像素值
                image = self.feature_extractor(image, return_tensors="pt").pixel_values
    
            # 将图像移动到指定设备,并转换为指定数据类型
            image = image.to(device=device, dtype=dtype)
            # 如果需要输出隐藏状态
            if output_hidden_states:
                # 获取图像编码器的隐藏状态的倒数第二层
                image_enc_hidden_states = self.image_encoder(image, output_hidden_states=True).hidden_states[-2]
                # 将隐藏状态按提示数量进行重复
                image_enc_hidden_states = image_enc_hidden_states.repeat_interleave(num_images_per_prompt, dim=0)
                # 生成与输入图像形状相同的全零张量,并获取其隐藏状态
                uncond_image_enc_hidden_states = self.image_encoder(
                    torch.zeros_like(image), output_hidden_states=True
                ).hidden_states[-2]
                # 将无条件隐藏状态按提示数量进行重复
                uncond_image_enc_hidden_states = uncond_image_enc_hidden_states.repeat_interleave(
                    num_images_per_prompt, dim=0
                )
                # 返回图像和无条件的隐藏状态
                return image_enc_hidden_states, uncond_image_enc_hidden_states
            else:
                # 直接获取图像编码器的图像嵌入
                image_embeds = self.image_encoder(image).image_embeds
                # 将图像嵌入按提示数量进行重复
                image_embeds = image_embeds.repeat_interleave(num_images_per_prompt, dim=0)
                # 创建与图像嵌入形状相同的全零张量作为无条件嵌入
                uncond_image_embeds = torch.zeros_like(image_embeds)
    
                # 返回图像嵌入和无条件嵌入
                return image_embeds, uncond_image_embeds
    
        # 从稳定扩散管道复制的函数,用于准备适配器图像嵌入
        def prepare_ip_adapter_image_embeds(
            # 定义函数参数:适配器图像、适配器图像嵌入、设备、每个提示的图像数量和是否使用分类器自由引导
            self, ip_adapter_image, ip_adapter_image_embeds, device, num_images_per_prompt, do_classifier_free_guidance
    ):
        # 初始化一个空列表,用于存储图像嵌入
        image_embeds = []
        # 如果启用无分类器自由引导,初始化一个空列表存储负图像嵌入
        if do_classifier_free_guidance:
            negative_image_embeds = []
        # 如果输入适配器的图像嵌入为空
        if ip_adapter_image_embeds is None:
            # 检查输入适配器的图像是否为列表类型,如果不是,则转换为列表
            if not isinstance(ip_adapter_image, list):
                ip_adapter_image = [ip_adapter_image]

            # 检查输入适配器的图像数量与 IP 适配器数量是否匹配
            if len(ip_adapter_image) != len(self.unet.encoder_hid_proj.image_projection_layers):
                raise ValueError(
                    # 如果不匹配,抛出值错误并给出相关信息
                    f"`ip_adapter_image` must have same length as the number of IP Adapters. Got {len(ip_adapter_image)} images and {len(self.unet.encoder_hid_proj.image_projection_layers)} IP Adapters."
                )

            # 遍历每个输入适配器图像和对应的图像投影层
            for single_ip_adapter_image, image_proj_layer in zip(
                ip_adapter_image, self.unet.encoder_hid_proj.image_projection_layers
            ):
                # 判断输出隐藏状态是否为图像投影层的实例
                output_hidden_state = not isinstance(image_proj_layer, ImageProjection)
                # 编码单个图像,获取其嵌入和负嵌入
                single_image_embeds, single_negative_image_embeds = self.encode_image(
                    single_ip_adapter_image, device, 1, output_hidden_state
                )

                # 将单个图像嵌入添加到列表中,增加一个维度
                image_embeds.append(single_image_embeds[None, :])
                # 如果启用无分类器自由引导,将负嵌入添加到列表中
                if do_classifier_free_guidance:
                    negative_image_embeds.append(single_negative_image_embeds[None, :])
        else:
            # 如果输入适配器图像嵌入不为空,遍历每个嵌入
            for single_image_embeds in ip_adapter_image_embeds:
                # 如果启用无分类器自由引导,拆分负嵌入和图像嵌入
                if do_classifier_free_guidance:
                    single_negative_image_embeds, single_image_embeds = single_image_embeds.chunk(2)
                    # 将负嵌入添加到列表中
                    negative_image_embeds.append(single_negative_image_embeds)
                # 将图像嵌入添加到列表中
                image_embeds.append(single_image_embeds)

        # 初始化一个空列表,用于存储适配器图像嵌入
        ip_adapter_image_embeds = []
        # 遍历图像嵌入及其索引
        for i, single_image_embeds in enumerate(image_embeds):
            # 将单个图像嵌入重复 num_images_per_prompt 次,并在维度 0 上连接
            single_image_embeds = torch.cat([single_image_embeds] * num_images_per_prompt, dim=0)
            # 如果启用无分类器自由引导,处理负嵌入
            if do_classifier_free_guidance:
                single_negative_image_embeds = torch.cat([negative_image_embeds[i]] * num_images_per_prompt, dim=0)
                # 将负嵌入和正嵌入连接在一起
                single_image_embeds = torch.cat([single_negative_image_embeds, single_image_embeds], dim=0)

            # 将图像嵌入移动到指定设备上
            single_image_embeds = single_image_embeds.to(device=device)
            # 将处理后的图像嵌入添加到列表中
            ip_adapter_image_embeds.append(single_image_embeds)

        # 返回适配器图像嵌入列表
        return ip_adapter_image_embeds

    # 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.run_safety_checker 复制
    # 定义一个方法来运行安全检查器,接受图像、设备和数据类型作为参数
    def run_safety_checker(self, image, device, dtype):
        # 如果安全检查器未定义,则没有不安全内容的概念
        if self.safety_checker is None:
            has_nsfw_concept = None
        else:
            # 如果输入图像是一个张量
            if torch.is_tensor(image):
                # 将图像处理为 PIL 格式以供特征提取
                feature_extractor_input = self.image_processor.postprocess(image, output_type="pil")
            else:
                # 如果图像不是张量,则将其从 NumPy 格式转换为 PIL 格式
                feature_extractor_input = self.image_processor.numpy_to_pil(image)
            # 提取特征,并将其转移到指定设备
            safety_checker_input = self.feature_extractor(feature_extractor_input, return_tensors="pt").to(device)
            # 运行安全检查器,返回处理后的图像和是否存在不安全内容的概念
            image, has_nsfw_concept = self.safety_checker(
                images=image, clip_input=safety_checker_input.pixel_values.to(dtype)
            )
        # 返回处理后的图像和不安全内容的概念
        return image, has_nsfw_concept

    # 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.decode_latents 复制
    # 定义一个方法来解码潜在向量
    def decode_latents(self, latents):
        # 定义弃用警告信息
        deprecation_message = "The decode_latents method is deprecated and will be removed in 1.0.0. Please use VaeImageProcessor.postprocess(...) instead"
        # 发出弃用警告
        deprecate("decode_latents", "1.0.0", deprecation_message, standard_warn=False)

        # 根据 VAE 配置的缩放因子调整潜在向量
        latents = 1 / self.vae.config.scaling_factor * latents
        # 解码潜在向量,返回字典中的第一个元素(图像)
        image = self.vae.decode(latents, return_dict=False)[0]
        # 将图像的像素值归一化到 [0, 1] 范围内
        image = (image / 2 + 0.5).clamp(0, 1)
        # 将图像转换为 float32 格式以兼容 bfloat16
        image = image.cpu().permute(0, 2, 3, 1).float().numpy()
        # 返回处理后的图像
        return image

    # 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_extra_step_kwargs 复制
    # 定义一个方法来准备额外的调度器步骤关键字参数
    def prepare_extra_step_kwargs(self, generator, eta):
        # 为调度器步骤准备额外的关键字参数,因为并非所有调度器的签名相同
        # eta(η)仅在 DDIMScheduler 中使用,其他调度器将忽略该参数。
        # eta 对应于 DDIM 论文中的 η,范围应在 [0, 1] 之间

        # 检查调度器步骤是否接受 eta 参数
        accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())
        # 初始化额外的步骤关键字参数字典
        extra_step_kwargs = {}
        if accepts_eta:
            # 如果接受 eta,则将其添加到字典中
            extra_step_kwargs["eta"] = eta

        # 检查调度器是否接受 generator 参数
        accepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys())
        if accepts_generator:
            # 如果接受 generator,则将其添加到字典中
            extra_step_kwargs["generator"] = generator
        # 返回准备好的额外步骤关键字参数
        return extra_step_kwargs

    # 定义一个方法来检查输入参数的有效性
    def check_inputs(
        self,
        prompt,
        strength,
        callback_steps,
        negative_prompt=None,
        prompt_embeds=None,
        negative_prompt_embeds=None,
        ip_adapter_image=None,
        ip_adapter_image_embeds=None,
        callback_on_step_end_tensor_inputs=None,
    # 定义获取时间步的方法,参数包括推理步骤数量、强度和设备类型
    def get_timesteps(self, num_inference_steps, strength, device):
        # 计算初始时间步,取num_inference_steps和num_inference_steps * strength的最小值
        init_timestep = min(int(num_inference_steps * strength), num_inference_steps)

        # 计算时间步开始的位置,确保不小于0
        t_start = max(num_inference_steps - init_timestep, 0)
        # 从调度器中获取时间步,从t_start开始到结束
        timesteps = self.scheduler.timesteps[t_start * self.scheduler.order :]
        # 如果调度器有设置开始索引的方法,则调用它
        if hasattr(self.scheduler, "set_begin_index"):
            self.scheduler.set_begin_index(t_start * self.scheduler.order)

        # 返回时间步和剩余推理步骤数量
        return timesteps, num_inference_steps - t_start

    # 从指定的文本到图像管道中复制的方法,用于获取引导比例嵌入
    def get_guidance_scale_embedding(
        self, w: torch.Tensor, embedding_dim: int = 512, dtype: torch.dtype = torch.float32
    ) -> torch.Tensor:
        """
        参考文献链接,获取引导比例嵌入。

        参数:
            w (`torch.Tensor`):
                使用指定的引导比例生成嵌入向量,以丰富时间步嵌入。
            embedding_dim (`int`, *可选*, 默认为512):
                生成的嵌入的维度。
            dtype (`torch.dtype`, *可选*, 默认为 `torch.float32`):
                生成的嵌入的数据类型。

        返回:
            `torch.Tensor`: 形状为`(len(w), embedding_dim)`的嵌入向量。
        """
        # 确保输入张量的形状是一维的
        assert len(w.shape) == 1
        # 将w乘以1000.0以调整比例
        w = w * 1000.0

        # 计算嵌入的半维度
        half_dim = embedding_dim // 2
        # 计算用于嵌入的缩放因子
        emb = torch.log(torch.tensor(10000.0)) / (half_dim - 1)
        # 生成指数衰减的嵌入
        emb = torch.exp(torch.arange(half_dim, dtype=dtype) * -emb)
        # 扩展w并计算最终的嵌入
        emb = w.to(dtype)[:, None] * emb[None, :]
        # 将正弦和余弦嵌入连接在一起
        emb = torch.cat([torch.sin(emb), torch.cos(emb)], dim=1)
        # 如果嵌入维度为奇数,进行零填充
        if embedding_dim % 2 == 1:  # zero pad
            emb = torch.nn.functional.pad(emb, (0, 1))
        # 确保嵌入的形状与预期相符
        assert emb.shape == (w.shape[0], embedding_dim)
        # 返回生成的嵌入
        return emb

    # 定义引导比例属性,返回私有变量
    @property
    def guidance_scale(self):
        return self._guidance_scale

    # 定义跳过剪辑属性,返回私有变量
    @property
    def clip_skip(self):
        return self._clip_skip

    # 定义是否进行无分类器引导的属性,基于引导比例和UNet配置
    @property
    def do_classifier_free_guidance(self):
        return self._guidance_scale > 1 and self.unet.config.time_cond_proj_dim is None

    # 定义交叉注意力参数的属性,返回私有变量
    @property
    def cross_attention_kwargs(self):
        return self._cross_attention_kwargs

    # 定义时间步数属性,返回私有变量
    @property
    def num_timesteps(self):
        return self._num_timesteps

    # 定义中断属性,返回私有变量
    @property
    def interrupt(self):
        return self._interrupt

    # 该方法不计算梯度,并替换示例文档字符串
    @torch.no_grad()
    @replace_example_docstring(EXAMPLE_DOC_STRING)
    # 定义一个可调用的类方法,用于处理生成图像的请求
        def __call__(
            # 提示文本,可以是单个字符串或字符串列表
            self,
            prompt: Union[str, List[str]] = None,
            # 输入的图像,通常用于处理图像生成
            image: PipelineImageInput = None,
            # 生成强度的参数,默认值为0.8
            strength: float = 0.8,
            # 推理步骤的数量,默认值为50
            num_inference_steps: Optional[int] = 50,
            # 采样的时间步,通常用于控制生成过程
            timesteps: List[int] = None,
            # 噪声水平的列表,用于控制生成图像的随机性
            sigmas: List[float] = None,
            # 指导比例,控制生成图像的多样性,默认值为7.5
            guidance_scale: Optional[float] = 7.5,
            # 负面提示文本,可以是单个字符串或字符串列表,用于避免某些特征
            negative_prompt: Optional[Union[str, List[str]]] = None,
            # 每个提示生成的图像数量,默认值为1
            num_images_per_prompt: Optional[int] = 1,
            # 生成过程中使用的超参数,默认值为0.0
            eta: Optional[float] = 0.0,
            # 随机数生成器,可以是单个生成器或生成器列表
            generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
            # 预先计算的提示嵌入,用于加速生成
            prompt_embeds: Optional[torch.Tensor] = None,
            # 预先计算的负面提示嵌入
            negative_prompt_embeds: Optional[torch.Tensor] = None,
            # 用于图像适配器的输入图像
            ip_adapter_image: Optional[PipelineImageInput] = None,
            # 图像适配器的输入图像嵌入列表
            ip_adapter_image_embeds: Optional[List[torch.Tensor]] = None,
            # 输出类型,默认值为"pil",表示返回PIL图像
            output_type: Optional[str] = "pil",
            # 是否返回字典格式的结果,默认值为True
            return_dict: bool = True,
            # 交叉注意力的额外参数字典
            cross_attention_kwargs: Optional[Dict[str, Any]] = None,
            # 跳过的clip层数,用于调整模型的特征提取
            clip_skip: int = None,
            # 结束步骤时的回调函数,可以是单个函数或多个回调的组合
            callback_on_step_end: Optional[
                Union[Callable[[int, int, Dict], None], PipelineCallback, MultiPipelineCallbacks]
            ] = None,
            # 结束步骤时的张量输入的列表,默认值为["latents"]
            callback_on_step_end_tensor_inputs: List[str] = ["latents"],
            # 其他额外的参数
            **kwargs,

.\diffusers\pipelines\stable_diffusion\pipeline_stable_diffusion_inpaint.py

# 版权信息,声明该代码的所有权和许可信息
# Copyright 2024 The HuggingFace Team. All rights reserved.
#
# 根据 Apache 许可证第 2.0 版(“许可证”)许可;
# 除非符合许可证,否则您不得使用此文件。
# 您可以在以下网址获取许可证副本:
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用的法律要求或书面同意,否则根据许可证分发的软件是按“原样”基础分发的,
# 不附有任何形式的明示或暗示的担保或条件。
# 有关许可证的具体条款和权限限制,请参阅许可证。

# 导入 inspect 模块,用于获取对象的签名和信息
import inspect
# 导入类型相关的类,用于类型注解
from typing import Any, Callable, Dict, List, Optional, Union

# 导入图像处理库 PIL
import PIL.Image
# 导入 PyTorch 库
import torch
# 导入版本管理工具
from packaging import version
# 导入 Hugging Face Transformers 中的相关类
from transformers import CLIPImageProcessor, CLIPTextModel, CLIPTokenizer, CLIPVisionModelWithProjection

# 导入其他模块和类,涉及回调、配置、图像处理等
from ...callbacks import MultiPipelineCallbacks, PipelineCallback
from ...configuration_utils import FrozenDict
from ...image_processor import PipelineImageInput, VaeImageProcessor
from ...loaders import FromSingleFileMixin, IPAdapterMixin, StableDiffusionLoraLoaderMixin, TextualInversionLoaderMixin
from ...models import AsymmetricAutoencoderKL, AutoencoderKL, ImageProjection, UNet2DConditionModel
from ...models.lora import adjust_lora_scale_text_encoder
from ...schedulers import KarrasDiffusionSchedulers
from ...utils import USE_PEFT_BACKEND, deprecate, logging, scale_lora_layers, unscale_lora_layers
from ...utils.torch_utils import randn_tensor
from ..pipeline_utils import DiffusionPipeline, StableDiffusionMixin
from . import StableDiffusionPipelineOutput
from .safety_checker import StableDiffusionSafetyChecker

# 初始化日志记录器,以当前模块的名称为标识
logger = logging.get_logger(__name__)  # pylint: disable=invalid-name

# 定义函数,从编码器输出中检索潜在变量
# Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion_img2img.retrieve_latents
def retrieve_latents(
    encoder_output: torch.Tensor,  # 输入的编码器输出,类型为张量
    generator: Optional[torch.Generator] = None,  # 可选的随机数生成器
    sample_mode: str = "sample"  # 采样模式,默认为 "sample"
):
    # 如果编码器输出有潜在分布且采样模式为 "sample"
    if hasattr(encoder_output, "latent_dist") and sample_mode == "sample":
        # 从潜在分布中采样并返回结果
        return encoder_output.latent_dist.sample(generator)
    # 如果编码器输出有潜在分布且采样模式为 "argmax"
    elif hasattr(encoder_output, "latent_dist") and sample_mode == "argmax":
        # 返回潜在分布的众数
        return encoder_output.latent_dist.mode()
    # 如果编码器输出有潜在变量
    elif hasattr(encoder_output, "latents"):
        # 直接返回潜在变量
        return encoder_output.latents
    # 如果以上条件都不满足,抛出属性错误
    else:
        raise AttributeError("Could not access latents of provided encoder_output")

# 定义函数,从调度器中检索时间步
# Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.retrieve_timesteps
def retrieve_timesteps(
    scheduler,  # 调度器对象
    num_inference_steps: Optional[int] = None,  # 可选的推理步骤数量
    device: Optional[Union[str, torch.device]] = None,  # 可选的设备信息
    timesteps: Optional[List[int]] = None,  # 可选的时间步列表
    sigmas: Optional[List[float]] = None,  # 可选的 sigma 值列表
    **kwargs,  # 其他可选参数
):
    """
    调用调度器的 `set_timesteps` 方法,并在调用后从调度器中检索时间步。
    处理自定义时间步。任何其他参数都将传递给 `scheduler.set_timesteps`。
    # 定义函数参数的文档字符串
    Args:
        scheduler (`SchedulerMixin`):
            # 调度器,用于获取时间步
            The scheduler to get timesteps from.
        num_inference_steps (`int`):
            # 生成样本时使用的扩散步骤数量,如果使用此参数,则 `timesteps` 必须为 `None`
            The number of diffusion steps used when generating samples with a pre-trained model. If used, `timesteps`
            must be `None`.
        device (`str` or `torch.device`, *optional*):
            # 要将时间步移动到的设备,如果为 `None`,则时间步不移动
            The device to which the timesteps should be moved to. If `None`, the timesteps are not moved.
        timesteps (`List[int]`, *optional*):
            # 自定义时间步,用于覆盖调度器的时间步间隔策略,如果提供了 `timesteps`,则 `num_inference_steps` 和 `sigmas` 必须为 `None`
            Custom timesteps used to override the timestep spacing strategy of the scheduler. If `timesteps` is passed,
            `num_inference_steps` and `sigmas` must be `None`.
        sigmas (`List[float]`, *optional*):
            # 自定义 sigma 值,用于覆盖调度器的时间步间隔策略,如果提供了 `sigmas`,则 `num_inference_steps` 和 `timesteps` 必须为 `None`
            Custom sigmas used to override the timestep spacing strategy of the scheduler. If `sigmas` is passed,
            `num_inference_steps` and `timesteps` must be `None`.

    Returns:
        # 返回一个元组,第一个元素是来自调度器的时间步调度,第二个元素是推理步骤的数量
        `Tuple[torch.Tensor, int]`: A tuple where the first element is the timestep schedule from the scheduler and the
        second element is the number of inference steps.
    """
    # 检查是否同时传入了 `timesteps` 和 `sigmas`
    if timesteps is not None and sigmas is not None:
        # 如果同时存在,则引发错误
        raise ValueError("Only one of `timesteps` or `sigmas` can be passed. Please choose one to set custom values")
    
    # 检查是否传入了 `timesteps`
    if timesteps is not None:
        # 检查调度器是否接受自定义时间步
        accepts_timesteps = "timesteps" in set(inspect.signature(scheduler.set_timesteps).parameters.keys())
        # 如果不支持,则引发错误
        if not accepts_timesteps:
            raise ValueError(
                f"The current scheduler class {scheduler.__class__}'s `set_timesteps` does not support custom"
                f" timestep schedules. Please check whether you are using the correct scheduler."
            )
        # 设置调度器的时间步
        scheduler.set_timesteps(timesteps=timesteps, device=device, **kwargs)
        # 获取调度器的时间步
        timesteps = scheduler.timesteps
        # 计算推理步骤的数量
        num_inference_steps = len(timesteps)
    
    # 检查是否传入了 `sigmas`
    elif sigmas is not None:
        # 检查调度器是否接受自定义 sigma 值
        accept_sigmas = "sigmas" in set(inspect.signature(scheduler.set_timesteps).parameters.keys())
        # 如果不支持,则引发错误
        if not accept_sigmas:
            raise ValueError(
                f"The current scheduler class {scheduler.__class__}'s `set_timesteps` does not support custom"
                f" sigmas schedules. Please check whether you are using the correct scheduler."
            )
        # 设置调度器的 sigma 值
        scheduler.set_timesteps(sigmas=sigmas, device=device, **kwargs)
        # 获取调度器的时间步
        timesteps = scheduler.timesteps
        # 计算推理步骤的数量
        num_inference_steps = len(timesteps)
    
    # 如果都没有传入,则使用默认推理步骤设置时间步
    else:
        scheduler.set_timesteps(num_inference_steps, device=device, **kwargs)
        # 获取调度器的时间步
        timesteps = scheduler.timesteps
    
    # 返回时间步和推理步骤数量
    return timesteps, num_inference_steps
# 定义一个用于文本引导图像修复的管道类,继承自多个基类
class StableDiffusionInpaintPipeline(
    # 继承自 DiffusionPipeline,提供通用的管道功能
    DiffusionPipeline,
    # 继承自 StableDiffusionMixin,增加稳定扩散特性
    StableDiffusionMixin,
    # 继承自 TextualInversionLoaderMixin,支持文本反转加载
    TextualInversionLoaderMixin,
    # 继承自 IPAdapterMixin,支持 IP 适配器加载
    IPAdapterMixin,
    # 继承自 StableDiffusionLoraLoaderMixin,支持 LoRA 权重加载
    StableDiffusionLoraLoaderMixin,
    # 继承自 FromSingleFileMixin,支持从单个文件加载
    FromSingleFileMixin,
):
    # 文档字符串,描述管道的功能和参数
    r"""
    Pipeline for text-guided image inpainting using Stable Diffusion.

    This model inherits from [`DiffusionPipeline`]. Check the superclass documentation for the generic methods
    implemented for all pipelines (downloading, saving, running on a particular device, etc.).

    The pipeline also inherits the following loading methods:
        - [`~loaders.TextualInversionLoaderMixin.load_textual_inversion`] for loading textual inversion embeddings
        - [`~loaders.StableDiffusionLoraLoaderMixin.load_lora_weights`] for loading LoRA weights
        - [`~loaders.StableDiffusionLoraLoaderMixin.save_lora_weights`] for saving LoRA weights
        - [`~loaders.IPAdapterMixin.load_ip_adapter`] for loading IP Adapters
        - [`~loaders.FromSingleFileMixin.from_single_file`] for loading `.ckpt` files

    Args:
        vae ([`AutoencoderKL`, `AsymmetricAutoencoderKL`]):
            Variational Auto-Encoder (VAE) Model to encode and decode images to and from latent representations.
        text_encoder ([`CLIPTextModel`]):
            Frozen text-encoder ([clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14)).
        tokenizer ([`~transformers.CLIPTokenizer`]):
            A `CLIPTokenizer` to tokenize text.
        unet ([`UNet2DConditionModel`]):
            A `UNet2DConditionModel` to denoise the encoded image latents.
        scheduler ([`SchedulerMixin`]):
            A scheduler to be used in combination with `unet` to denoise the encoded image latents. Can be one of
            [`DDIMScheduler`], [`LMSDiscreteScheduler`], or [`PNDMScheduler`].
        safety_checker ([`StableDiffusionSafetyChecker`]):
            Classification module that estimates whether generated images could be considered offensive or harmful.
            Please refer to the [model card](https://huggingface.co/runwayml/stable-diffusion-v1-5) for more details
            about a model's potential harms.
        feature_extractor ([`~transformers.CLIPImageProcessor`]):
            A `CLIPImageProcessor` to extract features from generated images; used as inputs to the `safety_checker`.
    """

    # 定义模型的 CPU 卸载顺序,指定组件的处理顺序
    model_cpu_offload_seq = "text_encoder->image_encoder->unet->vae"
    # 可选组件列表,包含安全检查器和特征提取器
    _optional_components = ["safety_checker", "feature_extractor", "image_encoder"]
    # 排除 CPU 卸载的组件,安全检查器不会被卸载
    _exclude_from_cpu_offload = ["safety_checker"]
    # 定义需要回调的张量输入列表,包含潜在变量和提示嵌入
    _callback_tensor_inputs = ["latents", "prompt_embeds", "negative_prompt_embeds", "mask", "masked_image_latents"]
    # 初始化方法,用于创建类的实例
        def __init__(
            self,
            vae: Union[AutoencoderKL, AsymmetricAutoencoderKL],  # VAE模型,支持两种类型
            text_encoder: CLIPTextModel,  # 文本编码器,处理文本输入
            tokenizer: CLIPTokenizer,  # 分词器,将文本转换为标记
            unet: UNet2DConditionModel,  # UNet模型,处理生成任务
            scheduler: KarrasDiffusionSchedulers,  # 调度器,用于调整生成过程
            safety_checker: StableDiffusionSafetyChecker,  # 安全检查器,确保生成内容的安全性
            feature_extractor: CLIPImageProcessor,  # 特征提取器,处理图像输入
            image_encoder: CLIPVisionModelWithProjection = None,  # 可选图像编码器,用于图像的额外处理
            requires_safety_checker: bool = True,  # 是否需要安全检查器的标志
        # 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline._encode_prompt 复制
        def _encode_prompt(
            self,
            prompt,  # 输入的提示文本
            device,  # 设备信息(CPU或GPU)
            num_images_per_prompt,  # 每个提示生成的图像数量
            do_classifier_free_guidance,  # 是否执行无分类器引导
            negative_prompt=None,  # 可选的负面提示文本
            prompt_embeds: Optional[torch.Tensor] = None,  # 可选的提示嵌入
            negative_prompt_embeds: Optional[torch.Tensor] = None,  # 可选的负面提示嵌入
            lora_scale: Optional[float] = None,  # 可选的LORA缩放因子
            **kwargs,  # 其他可选参数
        ):
            # 弃用消息,提示用户该方法将来会被移除
            deprecation_message = "`_encode_prompt()` is deprecated and it will be removed in a future version. Use `encode_prompt()` instead. Also, be aware that the output format changed from a concatenated tensor to a tuple."
            # 发出弃用警告
            deprecate("_encode_prompt()", "1.0.0", deprecation_message, standard_warn=False)
    
            # 调用 encode_prompt 方法并获取嵌入元组
            prompt_embeds_tuple = self.encode_prompt(
                prompt=prompt,  # 提示文本
                device=device,  # 设备信息
                num_images_per_prompt=num_images_per_prompt,  # 每个提示生成的图像数量
                do_classifier_free_guidance=do_classifier_free_guidance,  # 无分类器引导标志
                negative_prompt=negative_prompt,  # 负面提示文本
                prompt_embeds=prompt_embeds,  # 提示嵌入
                negative_prompt_embeds=negative_prompt_embeds,  # 负面提示嵌入
                lora_scale=lora_scale,  # LORA缩放因子
                **kwargs,  # 其他参数
            )
    
            # 将嵌入元组的内容拼接以兼容旧版
            prompt_embeds = torch.cat([prompt_embeds_tuple[1], prompt_embeds_tuple[0]])
    
            # 返回拼接后的提示嵌入
            return prompt_embeds
    
        # 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.encode_prompt 复制
        def encode_prompt(
            self,
            prompt,  # 输入的提示文本
            device,  # 设备信息(CPU或GPU)
            num_images_per_prompt,  # 每个提示生成的图像数量
            do_classifier_free_guidance,  # 是否执行无分类器引导
            negative_prompt=None,  # 可选的负面提示文本
            prompt_embeds: Optional[torch.Tensor] = None,  # 可选的提示嵌入
            negative_prompt_embeds: Optional[torch.Tensor] = None,  # 可选的负面提示嵌入
            lora_scale: Optional[float] = None,  # 可选的LORA缩放因子
            clip_skip: Optional[int] = None,  # 可选的剪切参数
        # 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.encode_image 复制
    # 定义一个方法用于编码图像,接收图像、设备、每个提示的图像数量和可选的隐藏状态输出
        def encode_image(self, image, device, num_images_per_prompt, output_hidden_states=None):
            # 获取图像编码器参数的数据类型
            dtype = next(self.image_encoder.parameters()).dtype
    
            # 如果输入的图像不是张量类型,则使用特征提取器处理图像并返回张量格式
            if not isinstance(image, torch.Tensor):
                image = self.feature_extractor(image, return_tensors="pt").pixel_values
    
            # 将图像转移到指定设备,并转换为正确的数据类型
            image = image.to(device=device, dtype=dtype)
            # 如果需要输出隐藏状态
            if output_hidden_states:
                # 编码图像并获取倒数第二层的隐藏状态
                image_enc_hidden_states = self.image_encoder(image, output_hidden_states=True).hidden_states[-2]
                # 将隐藏状态在第0维上重复,数量为每个提示的图像数量
                image_enc_hidden_states = image_enc_hidden_states.repeat_interleave(num_images_per_prompt, dim=0)
                # 对一个全零的图像进行编码以获取无条件图像的隐藏状态
                uncond_image_enc_hidden_states = self.image_encoder(
                    torch.zeros_like(image), output_hidden_states=True
                ).hidden_states[-2]
                # 同样在第0维上重复无条件图像的隐藏状态
                uncond_image_enc_hidden_states = uncond_image_enc_hidden_states.repeat_interleave(
                    num_images_per_prompt, dim=0
                )
                # 返回有条件和无条件的隐藏状态
                return image_enc_hidden_states, uncond_image_enc_hidden_states
            else:
                # 如果不需要输出隐藏状态,直接编码图像并获取图像嵌入
                image_embeds = self.image_encoder(image).image_embeds
                # 在第0维上重复图像嵌入,数量为每个提示的图像数量
                image_embeds = image_embeds.repeat_interleave(num_images_per_prompt, dim=0)
                # 创建与图像嵌入相同形状的全零张量作为无条件图像嵌入
                uncond_image_embeds = torch.zeros_like(image_embeds)
    
                # 返回有条件和无条件的图像嵌入
                return image_embeds, uncond_image_embeds
    
        # 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline 中复制的方法,用于准备图像嵌入
        def prepare_ip_adapter_image_embeds(
            # 接收输入适配器图像、适配器图像嵌入、设备、每个提示的图像数量和分类器自由引导的开关
            self, ip_adapter_image, ip_adapter_image_embeds, device, num_images_per_prompt, do_classifier_free_guidance
    # 定义函数体的结束
    ):
        # 初始化一个空列表以存储图像嵌入
        image_embeds = []
        # 如果启用无分类器自由引导,则初始化负图像嵌入列表
        if do_classifier_free_guidance:
            negative_image_embeds = []
        # 如果输入适配器图像嵌入为空
        if ip_adapter_image_embeds is None:
            # 检查输入适配器图像是否为列表,如果不是则转换为列表
            if not isinstance(ip_adapter_image, list):
                ip_adapter_image = [ip_adapter_image]

            # 检查输入适配器图像与 IP 适配器数量是否匹配
            if len(ip_adapter_image) != len(self.unet.encoder_hid_proj.image_projection_layers):
                # 如果不匹配,则抛出值错误
                raise ValueError(
                    f"`ip_adapter_image` must have same length as the number of IP Adapters. Got {len(ip_adapter_image)} images and {len(self.unet.encoder_hid_proj.image_projection_layers)} IP Adapters."
                )

            # 遍历输入适配器图像与图像投影层的组合
            for single_ip_adapter_image, image_proj_layer in zip(
                ip_adapter_image, self.unet.encoder_hid_proj.image_projection_layers
            ):
                # 判断输出隐藏状态是否为 True
                output_hidden_state = not isinstance(image_proj_layer, ImageProjection)
                # 编码单个图像以获取图像嵌入和负图像嵌入
                single_image_embeds, single_negative_image_embeds = self.encode_image(
                    single_ip_adapter_image, device, 1, output_hidden_state
                )

                # 将图像嵌入添加到列表中
                image_embeds.append(single_image_embeds[None, :])
                # 如果启用无分类器自由引导,则添加负图像嵌入
                if do_classifier_free_guidance:
                    negative_image_embeds.append(single_negative_image_embeds[None, :])
        else:
            # 遍历现有的输入适配器图像嵌入
            for single_image_embeds in ip_adapter_image_embeds:
                # 如果启用无分类器自由引导,则分割嵌入
                if do_classifier_free_guidance:
                    single_negative_image_embeds, single_image_embeds = single_image_embeds.chunk(2)
                    # 将负图像嵌入添加到列表中
                    negative_image_embeds.append(single_negative_image_embeds)
                # 将图像嵌入添加到列表中
                image_embeds.append(single_image_embeds)

        # 初始化一个空列表以存储 IP 适配器图像嵌入
        ip_adapter_image_embeds = []
        # 遍历图像嵌入和它们的索引
        for i, single_image_embeds in enumerate(image_embeds):
            # 将单个图像嵌入按数量扩展
            single_image_embeds = torch.cat([single_image_embeds] * num_images_per_prompt, dim=0)
            # 如果启用无分类器自由引导,则扩展负图像嵌入
            if do_classifier_free_guidance:
                single_negative_image_embeds = torch.cat([negative_image_embeds[i]] * num_images_per_prompt, dim=0)
                # 将负图像嵌入与正图像嵌入合并
                single_image_embeds = torch.cat([single_negative_image_embeds, single_image_embeds], dim=0)

            # 将图像嵌入移动到指定设备
            single_image_embeds = single_image_embeds.to(device=device)
            # 将处理后的图像嵌入添加到列表中
            ip_adapter_image_embeds.append(single_image_embeds)

        # 返回 IP 适配器图像嵌入列表
        return ip_adapter_image_embeds

    # 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.run_safety_checker 复制的代码
    # 运行安全检查器以确保图像符合安全标准
        def run_safety_checker(self, image, device, dtype):
            # 检查安全检查器是否存在
            if self.safety_checker is None:
                # 如果没有安全检查器,设置NSFW概念为None
                has_nsfw_concept = None
            else:
                # 如果输入是张量,则处理为PIL格式
                if torch.is_tensor(image):
                    feature_extractor_input = self.image_processor.postprocess(image, output_type="pil")
                else:
                    # 如果输入是numpy数组,将其转换为PIL格式
                    feature_extractor_input = self.image_processor.numpy_to_pil(image)
                # 提取特征并将其传输到指定设备
                safety_checker_input = self.feature_extractor(feature_extractor_input, return_tensors="pt").to(device)
                # 运行安全检查器,并返回处理后的图像和NSFW概念
                image, has_nsfw_concept = self.safety_checker(
                    images=image, clip_input=safety_checker_input.pixel_values.to(dtype)
                )
            # 返回处理后的图像和NSFW概念
            return image, has_nsfw_concept
    
        # 从StableDiffusionPipeline复制的函数,准备额外的调度器步骤参数
        def prepare_extra_step_kwargs(self, generator, eta):
            # 为调度器步骤准备额外的关键字参数,因为并非所有调度器具有相同的签名
            # eta仅在DDIM调度器中使用,其他调度器会忽略
            # eta对应于DDIM论文中的η,应在[0, 1]之间
    
            # 检查调度器是否接受eta参数
            accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())
            # 创建额外步骤参数的字典
            extra_step_kwargs = {}
            if accepts_eta:
                # 如果接受eta,则将其添加到字典中
                extra_step_kwargs["eta"] = eta
    
            # 检查调度器是否接受generator参数
            accepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys())
            if accepts_generator:
                # 如果接受generator,则将其添加到字典中
                extra_step_kwargs["generator"] = generator
            # 返回额外步骤参数字典
            return extra_step_kwargs
    
        # 检查输入参数的有效性和完整性
        def check_inputs(
            self,
            prompt,
            image,
            mask_image,
            height,
            width,
            strength,
            callback_steps,
            output_type,
            negative_prompt=None,
            prompt_embeds=None,
            negative_prompt_embeds=None,
            ip_adapter_image=None,
            ip_adapter_image_embeds=None,
            callback_on_step_end_tensor_inputs=None,
            padding_mask_crop=None,
        # 准备潜在变量的函数
        def prepare_latents(
            self,
            batch_size,
            num_channels_latents,
            height,
            width,
            dtype,
            device,
            generator,
            latents=None,
            image=None,
            timestep=None,
            is_strength_max=True,
            return_noise=False,
            return_image_latents=False,
    ):
        # 定义形状,包含批次大小、通道数、调整后的高度和宽度
        shape = (
            batch_size,
            num_channels_latents,
            int(height) // self.vae_scale_factor,
            int(width) // self.vae_scale_factor,
        )
        # 检查生成器列表的长度是否与批次大小匹配
        if isinstance(generator, list) and len(generator) != batch_size:
            raise ValueError(
                f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
                f" size of {batch_size}. Make sure the batch size matches the length of the generators."
            )

        # 验证图像和时间步是否提供,且强度不为最大值
        if (image is None or timestep is None) and not is_strength_max:
            raise ValueError(
                "Since strength < 1. initial latents are to be initialised as a combination of Image + Noise."
                "However, either the image or the noise timestep has not been provided."
            )

        # 根据条件处理图像潜在变量
        if return_image_latents or (latents is None and not is_strength_max):
            # 将图像移动到指定设备和数据类型
            image = image.to(device=device, dtype=dtype)

            # 如果图像有4个通道,则直接使用图像潜在变量
            if image.shape[1] == 4:
                image_latents = image
            else:
                # 使用 VAE 编码图像以获取潜在变量
                image_latents = self._encode_vae_image(image=image, generator=generator)
            # 根据批次大小重复潜在变量
            image_latents = image_latents.repeat(batch_size // image_latents.shape[0], 1, 1, 1)

        # 如果潜在变量为空,则生成噪声
        if latents is None:
            noise = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
            # 根据强度初始化潜在变量
            latents = noise if is_strength_max else self.scheduler.add_noise(image_latents, noise, timestep)
            # 如果强度为最大值,则按调度器的初始 sigma 缩放潜在变量
            latents = latents * self.scheduler.init_noise_sigma if is_strength_max else latents
        else:
            # 将现有潜在变量转换到设备上
            noise = latents.to(device)
            # 按调度器的初始 sigma 缩放潜在变量
            latents = noise * self.scheduler.init_noise_sigma

        # 输出结果,包括潜在变量
        outputs = (latents,)

        # 如果需要返回噪声,则添加噪声到输出
        if return_noise:
            outputs += (noise,)

        # 如果需要返回图像潜在变量,则添加到输出
        if return_image_latents:
            outputs += (image_latents,)

        # 返回最终输出
        return outputs

    # 编码 VAE 图像以获取潜在变量
    def _encode_vae_image(self, image: torch.Tensor, generator: torch.Generator):
        # 检查生成器是否为列表
        if isinstance(generator, list):
            # 为每个图像批次编码潜在变量并检索
            image_latents = [
                retrieve_latents(self.vae.encode(image[i : i + 1]), generator=generator[i])
                for i in range(image.shape[0])
            ]
            # 将所有潜在变量合并为一个张量
            image_latents = torch.cat(image_latents, dim=0)
        else:
            # 使用单个生成器编码图像并检索潜在变量
            image_latents = retrieve_latents(self.vae.encode(image), generator=generator)

        # 按配置的缩放因子缩放潜在变量
        image_latents = self.vae.config.scaling_factor * image_latents

        # 返回缩放后的潜在变量
        return image_latents

    # 准备掩膜潜在变量的方法
    def prepare_mask_latents(
        self, mask, masked_image, batch_size, height, width, dtype, device, generator, do_classifier_free_guidance
    ):
        # 将掩码调整为与潜在特征图的形状相同,以便将掩码与潜在特征图拼接
        # 在转换数据类型之前进行此操作,以避免在使用 cpu_offload 和半精度时出现问题
        mask = torch.nn.functional.interpolate(
            # 使用插值方法调整掩码的大小,目标尺寸为根据 VAE 缩放因子调整后的高度和宽度
            mask, size=(height // self.vae_scale_factor, width // self.vae_scale_factor)
        )
        # 将掩码移动到指定设备,并转换为指定数据类型
        mask = mask.to(device=device, dtype=dtype)

        # 将掩码图像移动到指定设备,并转换为指定数据类型
        masked_image = masked_image.to(device=device, dtype=dtype)

        # 检查掩码图像的通道数是否为4
        if masked_image.shape[1] == 4:
            # 如果是4通道,则将其直接赋值给潜在特征图
            masked_image_latents = masked_image
        else:
            # 否则,使用 VAE 编码掩码图像以获取潜在特征图
            masked_image_latents = self._encode_vae_image(masked_image, generator=generator)

        # 针对每个提示生成,重复掩码和潜在特征图以适应批量大小,使用对 MPS 友好的方法
        if mask.shape[0] < batch_size:
            # 如果掩码的数量小于批量大小,则检查批量大小是否能被掩码数量整除
            if not batch_size % mask.shape[0] == 0:
                # 如果不能整除,抛出值错误
                raise ValueError(
                    "传入的掩码与所需的批量大小不匹配。掩码应复制到"
                    f" 总批量大小 {batch_size},但传入了 {mask.shape[0]} 个掩码。请确保传入的掩码数量"
                    " 能被所请求的总批量大小整除。"
                )
            # 通过重复掩码,调整其数量以匹配批量大小
            mask = mask.repeat(batch_size // mask.shape[0], 1, 1, 1)
        # 同样检查潜在特征图的数量
        if masked_image_latents.shape[0] < batch_size:
            # 检查批量大小是否能被潜在特征图数量整除
            if not batch_size % masked_image_latents.shape[0] == 0:
                # 如果不能整除,抛出值错误
                raise ValueError(
                    "传入的图像与所需的批量大小不匹配。图像应复制到"
                    f" 总批量大小 {batch_size},但传入了 {masked_image_latents.shape[0]} 个图像。"
                    " 请确保传入的图像数量能被所请求的总批量大小整除。"
                )
            # 通过重复潜在特征图,调整其数量以匹配批量大小
            masked_image_latents = masked_image_latents.repeat(batch_size // masked_image_latents.shape[0], 1, 1, 1)

        # 如果启用分类器自由引导,则重复掩码以进行拼接
        mask = torch.cat([mask] * 2) if do_classifier_free_guidance else mask
        # 如果启用分类器自由引导,则重复潜在特征图以进行拼接
        masked_image_latents = (
            torch.cat([masked_image_latents] * 2) if do_classifier_free_guidance else masked_image_latents
        )

        # 确保潜在特征图的设备与潜在模型输入一致,以避免设备错误
        masked_image_latents = masked_image_latents.to(device=device, dtype=dtype)
        # 返回掩码和潜在特征图
        return mask, masked_image_latents

    # 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion_img2img.StableDiffusionImg2ImgPipeline.get_timesteps 复制
    # 定义获取时间步长的函数,接受推理步数、强度和设备作为参数
    def get_timesteps(self, num_inference_steps, strength, device):
        # 根据给定的推理步数和强度计算初始时间步
        init_timestep = min(int(num_inference_steps * strength), num_inference_steps)

        # 计算时间步开始的索引,确保不小于零
        t_start = max(num_inference_steps - init_timestep, 0)
        # 从调度器中获取时间步长,从 t_start 开始
        timesteps = self.scheduler.timesteps[t_start * self.scheduler.order :]
        # 如果调度器具有设置开始索引的方法,则调用该方法
        if hasattr(self.scheduler, "set_begin_index"):
            self.scheduler.set_begin_index(t_start * self.scheduler.order)

        # 返回时间步长和剩余的推理步数
        return timesteps, num_inference_steps - t_start

    # 从 diffusers.pipelines.latent_consistency_models.pipeline_latent_consistency_text2img 中复制的函数
    def get_guidance_scale_embedding(
        self, w: torch.Tensor, embedding_dim: int = 512, dtype: torch.dtype = torch.float32
    ) -> torch.Tensor:
        """
        参考链接: https://github.com/google-research/vdm/blob/dc27b98a554f65cdc654b800da5aa1846545d41b/model_vdm.py#L298

        参数:
            w (`torch.Tensor`):
                生成带有指定引导尺度的嵌入向量,以后丰富时间步嵌入。
            embedding_dim (`int`, *可选*, 默认为 512):
                要生成的嵌入维度。
            dtype (`torch.dtype`, *可选*, 默认为 `torch.float32`):
                生成的嵌入的数据类型。

        返回:
            `torch.Tensor`: 嵌入向量,形状为 `(len(w), embedding_dim)`。
        """
        # 确保输入的张量 w 是一维的
        assert len(w.shape) == 1
        # 将 w 扩大 1000 倍
        w = w * 1000.0

        # 计算嵌入维度的一半
        half_dim = embedding_dim // 2
        # 计算嵌入的基数
        emb = torch.log(torch.tensor(10000.0)) / (half_dim - 1)
        # 计算指数衰减的嵌入值
        emb = torch.exp(torch.arange(half_dim, dtype=dtype) * -emb)
        # 将 w 转换为目标数据类型,并与嵌入值相乘
        emb = w.to(dtype)[:, None] * emb[None, :]
        # 将正弦和余弦值连接成最终嵌入
        emb = torch.cat([torch.sin(emb), torch.cos(emb)], dim=1)
        # 如果嵌入维度是奇数,则在最后填充一个零
        if embedding_dim % 2 == 1:  # zero pad
            emb = torch.nn.functional.pad(emb, (0, 1))
        # 确保最终的嵌入形状是正确的
        assert emb.shape == (w.shape[0], embedding_dim)
        # 返回计算得到的嵌入
        return emb

    # 获取引导尺度的属性
    @property
    def guidance_scale(self):
        return self._guidance_scale

    # 获取剪辑跳过的属性
    @property
    def clip_skip(self):
        return self._clip_skip

    # 判断是否进行无分类器引导的属性
    # 这里的 `guidance_scale` 类似于公式 (2) 中的引导权重 `w`
    # 参见 Imagen 论文: https://arxiv.org/pdf/2205.11487.pdf 。`guidance_scale = 1`
    # 表示不进行分类器自由引导。
    @property
    def do_classifier_free_guidance(self):
        return self._guidance_scale > 1 and self.unet.config.time_cond_proj_dim is None

    # 获取交叉注意力的关键字参数
    @property
    def cross_attention_kwargs(self):
        return self._cross_attention_kwargs

    # 获取时间步数的属性
    @property
    def num_timesteps(self):
        return self._num_timesteps

    # 获取中断状态的属性
    @property
    def interrupt(self):
        return self._interrupt

    # 在计算梯度时不追踪
    @torch.no_grad()
    # 定义一个可调用的类方法,允许传入多个参数
        def __call__(
            # 提示信息,可以是字符串或字符串列表
            self,
            prompt: Union[str, List[str]] = None,
            # 输入图像,用于处理的管道图像
            image: PipelineImageInput = None,
            # 用于掩蔽的图像
            mask_image: PipelineImageInput = None,
            # 掩蔽图像的潜在表示,Tensor 类型
            masked_image_latents: torch.Tensor = None,
            # 输出图像的高度,默认为 None
            height: Optional[int] = None,
            # 输出图像的宽度,默认为 None
            width: Optional[int] = None,
            # 填充掩码裁剪的大小,默认为 None
            padding_mask_crop: Optional[int] = None,
            # 强度参数,默认为 1.0
            strength: float = 1.0,
            # 推理步骤的数量,默认为 50
            num_inference_steps: int = 50,
            # 预定义的时间步列表,默认为 None
            timesteps: List[int] = None,
            # sigma 值列表,默认为 None
            sigmas: List[float] = None,
            # 指导缩放因子,默认为 7.5
            guidance_scale: float = 7.5,
            # 负提示信息,可以是字符串或字符串列表,默认为 None
            negative_prompt: Optional[Union[str, List[str]]] = None,
            # 每个提示生成的图像数量,默认为 1
            num_images_per_prompt: Optional[int] = 1,
            # eta 参数,默认为 0.0
            eta: float = 0.0,
            # 随机数生成器,可以是单个或列表,默认为 None
            generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
            # 潜在表示,默认为 None
            latents: Optional[torch.Tensor] = None,
            # 提示的嵌入表示,默认为 None
            prompt_embeds: Optional[torch.Tensor] = None,
            # 负提示的嵌入表示,默认为 None
            negative_prompt_embeds: Optional[torch.Tensor] = None,
            # 输入适配器图像,默认为 None
            ip_adapter_image: Optional[PipelineImageInput] = None,
            # 输入适配器图像的嵌入列表,默认为 None
            ip_adapter_image_embeds: Optional[List[torch.Tensor]] = None,
            # 输出类型,默认为 "pil"
            output_type: Optional[str] = "pil",
            # 是否返回字典格式的结果,默认为 True
            return_dict: bool = True,
            # 交叉注意力的参数字典,默认为 None
            cross_attention_kwargs: Optional[Dict[str, Any]] = None,
            # 跳过的剪辑步骤数,默认为 None
            clip_skip: int = None,
            # 在每个步骤结束时的回调函数,默认为 None
            callback_on_step_end: Optional[
                Union[Callable[[int, int, Dict], None], PipelineCallback, MultiPipelineCallbacks]
            ] = None,
            # 回调时输入的张量名称列表,默认为 ["latents"]
            callback_on_step_end_tensor_inputs: List[str] = ["latents"],
            # 额外的关键字参数
            **kwargs,

.\diffusers\pipelines\stable_diffusion\pipeline_stable_diffusion_instruct_pix2pix.py

# 版权声明,包含版权信息和许可证使用条款
# Copyright 2024 The InstructPix2Pix Authors and The HuggingFace Team. All rights reserved.
#
# 根据 Apache 许可证第 2.0 版授权
# 您不得在不遵守许可证的情况下使用此文件。
# 您可以在以下网址获取许可证的副本:
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律或书面协议另有约定,软件在许可证下分发
# 是基于“按现状”原则,没有任何明示或暗示的担保或条件。
# 查看许可证以获取特定的权限和限制。

import inspect  # 导入 inspect 模块,用于获取有关活跃对象的信息
from typing import Any, Callable, Dict, List, Optional, Union  # 导入类型提示相关的类

import numpy as np  # 导入 numpy,常用的数值计算库
import PIL.Image  # 导入 PIL.Image,处理图像的库
import torch  # 导入 PyTorch,深度学习框架
from transformers import CLIPImageProcessor, CLIPTextModel, CLIPTokenizer, CLIPVisionModelWithProjection  # 导入 transformers 库中的 CLIP 相关类

from ...callbacks import MultiPipelineCallbacks, PipelineCallback  # 导入回调类
from ...image_processor import PipelineImageInput, VaeImageProcessor  # 导入图像处理相关类
from ...loaders import IPAdapterMixin, StableDiffusionLoraLoaderMixin, TextualInversionLoaderMixin  # 导入加载器混合类
from ...models import AutoencoderKL, ImageProjection, UNet2DConditionModel  # 导入模型类
from ...schedulers import KarrasDiffusionSchedulers  # 导入调度器类
from ...utils import PIL_INTERPOLATION, deprecate, logging  # 导入实用工具类
from ...utils.torch_utils import randn_tensor  # 导入随机张量生成工具
from ..pipeline_utils import DiffusionPipeline, StableDiffusionMixin  # 导入管道工具类
from . import StableDiffusionPipelineOutput  # 导入管道输出类
from .safety_checker import StableDiffusionSafetyChecker  # 导入安全检查器类

logger = logging.get_logger(__name__)  # 创建一个日志记录器,用于记录信息,禁用 pylint 名称无效警告

# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion_img2img.preprocess 复制的函数
def preprocess(image):
    # 设置弃用警告消息,指示预处理方法已弃用
    deprecation_message = "The preprocess method is deprecated and will be removed in diffusers 1.0.0. Please use VaeImageProcessor.preprocess(...) instead"
    # 调用弃用函数,显示警告
    deprecate("preprocess", "1.0.0", deprecation_message, standard_warn=False)
    # 如果输入是张量,则直接返回
    if isinstance(image, torch.Tensor):
        return image
    # 如果输入是 PIL 图像,则将其封装在列表中
    elif isinstance(image, PIL.Image.Image):
        image = [image]

    # 如果列表中的第一个元素是 PIL 图像
    if isinstance(image[0], PIL.Image.Image):
        w, h = image[0].size  # 获取图像的宽和高
        # 将宽高调整为 8 的整数倍
        w, h = (x - x % 8 for x in (w, h))  # resize to integer multiple of 8

        # 将图像调整为新的宽高,并转换为 NumPy 数组,增加一个维度
        image = [np.array(i.resize((w, h), resample=PIL_INTERPOLATION["lanczos"]))[None, :] for i in image]
        # 将所有图像沿第0维合并
        image = np.concatenate(image, axis=0)
        # 将数组转换为 float32 类型并归一化到 [0, 1] 范围
        image = np.array(image).astype(np.float32) / 255.0
        # 调整数组维度顺序,从 (N, H, W, C) 转为 (N, C, H, W)
        image = image.transpose(0, 3, 1, 2)
        # 将像素值范围调整到 [-1, 1]
        image = 2.0 * image - 1.0
        # 将 NumPy 数组转换为 PyTorch 张量
        image = torch.from_numpy(image)
    # 如果输入是张量列表
    elif isinstance(image[0], torch.Tensor):
        # 将多个张量沿第0维合并
        image = torch.cat(image, dim=0)
    # 返回处理后的图像
    return image


# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion_img2img.retrieve_latents 复制的函数
def retrieve_latents(
    encoder_output: torch.Tensor, generator: Optional[torch.Generator] = None, sample_mode: str = "sample"
):
    # 检查 encoder_output 是否具有 "latent_dist" 属性,并且采样模式为 "sample"
    if hasattr(encoder_output, "latent_dist") and sample_mode == "sample":
        # 从 latent_dist 中进行采样,使用指定的生成器
        return encoder_output.latent_dist.sample(generator)
    # 检查 encoder_output 是否具有 "latent_dist" 属性,并且采样模式为 "argmax"
    elif hasattr(encoder_output, "latent_dist") and sample_mode == "argmax":
        # 返回 latent_dist 的众数
        return encoder_output.latent_dist.mode()
    # 检查 encoder_output 是否具有 "latents" 属性
    elif hasattr(encoder_output, "latents"):
        # 返回 latents 属性的值
        return encoder_output.latents
    # 如果以上条件都不满足,则抛出 AttributeError
    else:
        raise AttributeError("Could not access latents of provided encoder_output")
# 定义一个用于像素级图像编辑的管道类,继承多个混合类以实现功能
class StableDiffusionInstructPix2PixPipeline(
    # 继承 DiffusionPipeline 类以获得基础功能
    DiffusionPipeline,
    # 继承 StableDiffusionMixin 以获取稳定扩散相关功能
    StableDiffusionMixin,
    # 继承 TextualInversionLoaderMixin 以支持文本反转加载
    TextualInversionLoaderMixin,
    # 继承 StableDiffusionLoraLoaderMixin 以支持 LoRA 权重加载和保存
    StableDiffusionLoraLoaderMixin,
    # 继承 IPAdapterMixin 以支持 IP 适配器加载
    IPAdapterMixin,
):
    r"""
    管道用于通过遵循文本指令进行像素级图像编辑(基于稳定扩散)。

    该模型从 [`DiffusionPipeline`] 继承。有关所有管道实现的通用方法(下载、保存、在特定设备上运行等)的文档,请查看超类文档。

    此管道还继承以下加载方法:
        - [`~loaders.TextualInversionLoaderMixin.load_textual_inversion`] 用于加载文本反转嵌入
        - [`~loaders.StableDiffusionLoraLoaderMixin.load_lora_weights`] 用于加载 LoRA 权重
        - [`~loaders.StableDiffusionLoraLoaderMixin.save_lora_weights`] 用于保存 LoRA 权重
        - [`~loaders.IPAdapterMixin.load_ip_adapter`] 用于加载 IP 适配器

    参数:
        vae ([`AutoencoderKL`]):
            用于将图像编码和解码为潜在表示的变分自编码器(VAE)模型。
        text_encoder ([`~transformers.CLIPTextModel`]):
            冻结的文本编码器([clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14))。
        tokenizer ([`~transformers.CLIPTokenizer`]):
            用于对文本进行分词的 `CLIPTokenizer`。
        unet ([`UNet2DConditionModel`]):
            用于去噪编码图像潜在表示的 `UNet2DConditionModel`。
        scheduler ([`SchedulerMixin`]):
            与 `unet` 结合使用的调度器,用于去噪编码图像潜在表示。可以是
            [`DDIMScheduler`], [`LMSDiscreteScheduler`] 或 [`PNDMScheduler`] 之一。
        safety_checker ([`StableDiffusionSafetyChecker`]):
            分类模块,估计生成的图像是否可能被认为是冒犯性或有害的。
            有关模型潜在危害的更多详细信息,请参阅 [模型卡](https://huggingface.co/runwayml/stable-diffusion-v1-5)。
        feature_extractor ([`~transformers.CLIPImageProcessor`]):
            用于从生成的图像中提取特征的 `CLIPImageProcessor`;作为输入用于 `safety_checker`。
    """

    # 定义模型在 CPU 上的卸载顺序
    model_cpu_offload_seq = "text_encoder->unet->vae"
    # 定义可选组件列表
    _optional_components = ["safety_checker", "feature_extractor", "image_encoder"]
    # 定义从 CPU 卸载时排除的组件
    _exclude_from_cpu_offload = ["safety_checker"]
    # 定义回调张量输入
    _callback_tensor_inputs = ["latents", "prompt_embeds", "image_latents"]

    # 构造函数,初始化管道所需的各个组件
    def __init__(
        # 初始化变分自编码器(VAE)
        self,
        vae: AutoencoderKL,
        # 初始化文本编码器
        text_encoder: CLIPTextModel,
        # 初始化分词器
        tokenizer: CLIPTokenizer,
        # 初始化去噪模型
        unet: UNet2DConditionModel,
        # 初始化调度器
        scheduler: KarrasDiffusionSchedulers,
        # 初始化安全检查器
        safety_checker: StableDiffusionSafetyChecker,
        # 初始化特征提取器
        feature_extractor: CLIPImageProcessor,
        # 可选图像编码器
        image_encoder: Optional[CLIPVisionModelWithProjection] = None,
        # 是否需要安全检查器的标志
        requires_safety_checker: bool = True,
    # 初始化父类
        ):
            super().__init__()
    
            # 检查安全检查器是否为 None,并且需要安全检查器时发出警告
            if safety_checker is None and requires_safety_checker:
                logger.warning(
                    # 警告信息,提醒用户禁用安全检查器的风险和使用条款
                    f"You have disabled the safety checker for {self.__class__} by passing `safety_checker=None`. Ensure"
                    " that you abide to the conditions of the Stable Diffusion license and do not expose unfiltered"
                    " results in services or applications open to the public. Both the diffusers team and Hugging Face"
                    " strongly recommend to keep the safety filter enabled in all public facing circumstances, disabling"
                    " it only for use-cases that involve analyzing network behavior or auditing its results. For more"
                    " information, please have a look at https://github.com/huggingface/diffusers/pull/254 ."
                )
    
            # 检查安全检查器不为 None 时,特征提取器必须定义
            if safety_checker is not None and feature_extractor is None:
                raise ValueError(
                    # 抛出异常,提示用户需要定义特征提取器
                    "Make sure to define a feature extractor when loading {self.__class__} if you want to use the safety"
                    " checker. If you do not want to use the safety checker, you can pass `'safety_checker=None'` instead."
                )
    
            # 注册各个模块,以便后续使用
            self.register_modules(
                vae=vae,
                text_encoder=text_encoder,
                tokenizer=tokenizer,
                unet=unet,
                scheduler=scheduler,
                safety_checker=safety_checker,
                feature_extractor=feature_extractor,
                image_encoder=image_encoder,
            )
            # 计算 VAE 的缩放因子
            self.vae_scale_factor = 2 ** (len(self.vae.config.block_out_channels) - 1)
            # 创建图像处理器
            self.image_processor = VaeImageProcessor(vae_scale_factor=self.vae_scale_factor)
            # 将配置注册到类中,指明是否需要安全检查器
            self.register_to_config(requires_safety_checker=requires_safety_checker)
    
        # 装饰器,指示该函数不需要计算梯度
        @torch.no_grad()
        def __call__(
            # 输入参数,包括提示文本、图像、推理步骤等
            prompt: Union[str, List[str]] = None,
            image: PipelineImageInput = None,
            num_inference_steps: int = 100,
            guidance_scale: float = 7.5,
            image_guidance_scale: float = 1.5,
            negative_prompt: Optional[Union[str, List[str]]] = None,
            num_images_per_prompt: Optional[int] = 1,
            eta: float = 0.0,
            generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
            latents: Optional[torch.Tensor] = None,
            prompt_embeds: Optional[torch.Tensor] = None,
            negative_prompt_embeds: Optional[torch.Tensor] = None,
            ip_adapter_image: Optional[PipelineImageInput] = None,
            ip_adapter_image_embeds: Optional[List[torch.Tensor]] = None,
            output_type: Optional[str] = "pil",
            return_dict: bool = True,
            # 回调函数定义,处理步骤结束时的操作
            callback_on_step_end: Optional[
                Union[Callable[[int, int, Dict], None], PipelineCallback, MultiPipelineCallbacks]
            ] = None,
            # 定义步骤结束时的张量输入
            callback_on_step_end_tensor_inputs: List[str] = ["latents"],
            # 交叉注意力的额外参数
            cross_attention_kwargs: Optional[Dict[str, Any]] = None,
            # 接收额外参数
            **kwargs,
    # 定义一个编码提示的私有方法
        def _encode_prompt(
            self,  # 方法的第一个参数,表示调用该方法时传入的提示文本
            prompt,  # 提示文本
            device,  # 设备类型(如 CPU 或 GPU)
            num_images_per_prompt,  # 每个提示生成的图像数量
            do_classifier_free_guidance,  # 是否使用无分类器的引导
            negative_prompt=None,  # 可选的负提示文本
            prompt_embeds: Optional[torch.Tensor] = None,  # 可选的提示嵌入,类型为 Torch 张量
            negative_prompt_embeds: Optional[torch.Tensor] = None,  # 可选的负提示嵌入,类型为 Torch 张量
        # 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.encode_image 复制而来
        def encode_image(self, image, device, num_images_per_prompt, output_hidden_states=None):  # 定义一个编码图像的方法
            dtype = next(self.image_encoder.parameters()).dtype  # 获取图像编码器参数的数据类型
    
            if not isinstance(image, torch.Tensor):  # 如果图像不是 Torch 张量
                image = self.feature_extractor(image, return_tensors="pt").pixel_values  # 使用特征提取器将图像转换为张量
    
            image = image.to(device=device, dtype=dtype)  # 将图像移动到指定设备并转换为相应的数据类型
            if output_hidden_states:  # 如果需要输出隐藏状态
                image_enc_hidden_states = self.image_encoder(image, output_hidden_states=True).hidden_states[-2]  # 编码图像并获取倒数第二层的隐藏状态
                image_enc_hidden_states = image_enc_hidden_states.repeat_interleave(num_images_per_prompt, dim=0)  # 根据每个提示的图像数量重复隐藏状态
                uncond_image_enc_hidden_states = self.image_encoder(  # 编码零图像以获取无条件隐藏状态
                    torch.zeros_like(image), output_hidden_states=True
                ).hidden_states[-2]  # 获取倒数第二层的隐藏状态
                uncond_image_enc_hidden_states = uncond_image_enc_hidden_states.repeat_interleave(  # 根据每个提示的图像数量重复无条件隐藏状态
                    num_images_per_prompt, dim=0
                )
                return image_enc_hidden_states, uncond_image_enc_hidden_states  # 返回编码后的隐藏状态
            else:  # 如果不需要输出隐藏状态
                image_embeds = self.image_encoder(image).image_embeds  # 编码图像并获取图像嵌入
                image_embeds = image_embeds.repeat_interleave(num_images_per_prompt, dim=0)  # 根据每个提示的图像数量重复图像嵌入
                uncond_image_embeds = torch.zeros_like(image_embeds)  # 创建与图像嵌入相同形状的零张量作为无条件嵌入
    
                return image_embeds, uncond_image_embeds  # 返回编码后的图像嵌入和无条件嵌入
    
        # 定义一个准备图像嵌入的适配器方法
        def prepare_ip_adapter_image_embeds(
            self, ip_adapter_image, ip_adapter_image_embeds, device, num_images_per_prompt, do_classifier_free_guidance
    ):
        # 检查 ip_adapter_image_embeds 是否为 None
        if ip_adapter_image_embeds is None:
            # 如果 ip_adapter_image 不是列表,则将其转换为列表
            if not isinstance(ip_adapter_image, list):
                ip_adapter_image = [ip_adapter_image]

            # 确保 ip_adapter_image 的长度与 IP Adapters 的数量相同
            if len(ip_adapter_image) != len(self.unet.encoder_hid_proj.image_projection_layers):
                raise ValueError(
                    # 抛出错误提示,指出图像数量与 IP Adapters 数量不匹配
                    f"`ip_adapter_image` must have same length as the number of IP Adapters. Got {len(ip_adapter_image)} images and {len(self.unet.encoder_hid_proj.image_projection_layers)} IP Adapters."
                )

            # 初始化图像嵌入列表
            image_embeds = []
            # 遍历每个单独的图像和对应的投影层
            for single_ip_adapter_image, image_proj_layer in zip(
                ip_adapter_image, self.unet.encoder_hid_proj.image_projection_layers
            ):
                # 确定是否需要输出隐藏状态
                output_hidden_state = not isinstance(image_proj_layer, ImageProjection)
                # 编码图像,获取正向和负向图像嵌入
                single_image_embeds, single_negative_image_embeds = self.encode_image(
                    single_ip_adapter_image, device, 1, output_hidden_state
                )
                # 将正向图像嵌入重复 num_images_per_prompt 次
                single_image_embeds = torch.stack([single_image_embeds] * num_images_per_prompt, dim=0)
                # 将负向图像嵌入重复 num_images_per_prompt 次
                single_negative_image_embeds = torch.stack(
                    [single_negative_image_embeds] * num_images_per_prompt, dim=0
                )

                # 如果启用了分类器自由引导
                if do_classifier_free_guidance:
                    # 将正向和负向图像嵌入拼接在一起
                    single_image_embeds = torch.cat(
                        [single_image_embeds, single_negative_image_embeds, single_negative_image_embeds]
                    )
                    # 将图像嵌入转移到指定设备
                    single_image_embeds = single_image_embeds.to(device)

                # 将单个图像嵌入添加到图像嵌入列表中
                image_embeds.append(single_image_embeds)
        else:
            # 定义重复维度
            repeat_dims = [1]
            # 初始化图像嵌入列表
            image_embeds = []
            # 遍历已有的图像嵌入
            for single_image_embeds in ip_adapter_image_embeds:
                # 如果启用了分类器自由引导
                if do_classifier_free_guidance:
                    # 将图像嵌入分割为正向和负向嵌入
                    (
                        single_image_embeds,
                        single_negative_image_embeds,
                        single_negative_image_embeds,
                    ) = single_image_embeds.chunk(3)
                    # 重复正向图像嵌入
                    single_image_embeds = single_image_embeds.repeat(
                        num_images_per_prompt, *(repeat_dims * len(single_image_embeds.shape[1:]))
                    )
                    # 重复负向图像嵌入
                    single_negative_image_embeds = single_negative_image_embeds.repeat(
                        num_images_per_prompt, *(repeat_dims * len(single_negative_image_embeds.shape[1:]))
                    )
                    # 将正向和负向图像嵌入拼接在一起
                    single_image_embeds = torch.cat(
                        [single_image_embeds, single_negative_image_embeds, single_negative_image_embeds]
                    )
                else:
                    # 重复单个图像嵌入
                    single_image_embeds = single_image_embeds.repeat(
                        num_images_per_prompt, *(repeat_dims * len(single_image_embeds.shape[1:]))
                    )
                # 将单个图像嵌入添加到图像嵌入列表中
                image_embeds.append(single_image_embeds)

        # 返回最终的图像嵌入列表
        return image_embeds
    # 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.run_safety_checker 复制
    def run_safety_checker(self, image, device, dtype):
        # 如果安全检查器未定义,设置 nsfw 概念为 None
        if self.safety_checker is None:
            has_nsfw_concept = None
        else:
            # 如果输入为张量,使用图像处理器将其后处理为 PIL 格式
            if torch.is_tensor(image):
                feature_extractor_input = self.image_processor.postprocess(image, output_type="pil")
            else:
                # 如果输入为 NumPy 数组,转换为 PIL 格式
                feature_extractor_input = self.image_processor.numpy_to_pil(image)
            # 使用特征提取器处理输入,返回张量并移动到指定设备
            safety_checker_input = self.feature_extractor(feature_extractor_input, return_tensors="pt").to(device)
            # 调用安全检查器,返回处理后的图像和 nsfw 概念判断结果
            image, has_nsfw_concept = self.safety_checker(
                images=image, clip_input=safety_checker_input.pixel_values.to(dtype)
            )
        # 返回处理后的图像和 nsfw 概念判断结果
        return image, has_nsfw_concept
    
    # 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_extra_step_kwargs 复制
    def prepare_extra_step_kwargs(self, generator, eta):
        # 为调度器步骤准备额外的关键字参数,因为并非所有调度器都有相同的签名
        # eta (η) 仅在 DDIMScheduler 中使用,其他调度器会忽略该参数。
        # eta 对应于 DDIM 论文中的 η: https://arxiv.org/abs/2010.02502
        # 值应在 [0, 1] 之间
    
        # 检查调度器的步骤是否接受 eta 参数
        accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())
        extra_step_kwargs = {}
        # 如果接受 eta,将其添加到额外参数中
        if accepts_eta:
            extra_step_kwargs["eta"] = eta
    
        # 检查调度器的步骤是否接受 generator 参数
        accepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys())
        # 如果接受 generator,将其添加到额外参数中
        if accepts_generator:
            extra_step_kwargs["generator"] = generator
        # 返回准备好的额外参数
        return extra_step_kwargs
    
    # 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.decode_latents 复制
    def decode_latents(self, latents):
        # 警告信息,表示该方法已弃用,将在 1.0.0 中移除,建议使用 VaeImageProcessor.postprocess(...)
        deprecation_message = "The decode_latents method is deprecated and will be removed in 1.0.0. Please use VaeImageProcessor.postprocess(...) instead"
        deprecate("decode_latents", "1.0.0", deprecation_message, standard_warn=False)
    
        # 按比例缩放 latents
        latents = 1 / self.vae.config.scaling_factor * latents
        # 解码 latents,获取图像
        image = self.vae.decode(latents, return_dict=False)[0]
        # 将图像像素值规范化到 [0, 1] 范围
        image = (image / 2 + 0.5).clamp(0, 1)
        # 始终转换为 float32,确保与 bfloat16 兼容且不会造成显著开销
        image = image.cpu().permute(0, 2, 3, 1).float().numpy()
        # 返回处理后的图像
        return image
    
    # 定义检查输入参数的方法
    def check_inputs(
        self,
        prompt,
        callback_steps,
        negative_prompt=None,
        prompt_embeds=None,
        negative_prompt_embeds=None,
        ip_adapter_image=None,
        ip_adapter_image_embeds=None,
        callback_on_step_end_tensor_inputs=None,
    ):
        # 检查 callback_steps 是否为正整数,如果不是则引发 ValueError
        if callback_steps is not None and (not isinstance(callback_steps, int) or callback_steps <= 0):
            raise ValueError(
                # 报告 callback_steps 不是正整数的错误信息
                f"`callback_steps` has to be a positive integer but is {callback_steps} of type"
                f" {type(callback_steps)}."
            )

        # 检查 callback_on_step_end_tensor_inputs 是否为 None,且是否包含在 _callback_tensor_inputs 中
        if callback_on_step_end_tensor_inputs is not None and not all(
            k in self._callback_tensor_inputs for k in callback_on_step_end_tensor_inputs
        ):
            raise ValueError(
                # 报告 callback_on_step_end_tensor_inputs 中的某些元素不在 _callback_tensor_inputs 的错误信息
                f"`callback_on_step_end_tensor_inputs` has to be in {self._callback_tensor_inputs}, but found {[k for k in callback_on_step_end_tensor_inputs if k not in self._callback_tensor_inputs]}"
            )

        # 检查 prompt 和 prompt_embeds 是否同时存在
        if prompt is not None and prompt_embeds is not None:
            raise ValueError(
                # 报告同时提供 prompt 和 prompt_embeds 的错误信息
                f"Cannot forward both `prompt`: {prompt} and `prompt_embeds`: {prompt_embeds}. Please make sure to"
                " only forward one of the two."
            )
        # 检查 prompt 和 prompt_embeds 是否都为 None
        elif prompt is None and prompt_embeds is None:
            raise ValueError(
                # 报告需要提供 prompt 或 prompt_embeds 之一的错误信息
                "Provide either `prompt` or `prompt_embeds`. Cannot leave both `prompt` and `prompt_embeds` undefined."
            )
        # 检查 prompt 的类型是否为 str 或 list
        elif prompt is not None and (not isinstance(prompt, str) and not isinstance(prompt, list)):
            raise ValueError(f"`prompt` has to be of type `str` or `list` but is {type(prompt)}")

        # 检查 negative_prompt 和 negative_prompt_embeds 是否同时存在
        if negative_prompt is not None and negative_prompt_embeds is not None:
            raise ValueError(
                # 报告同时提供 negative_prompt 和 negative_prompt_embeds 的错误信息
                f"Cannot forward both `negative_prompt`: {negative_prompt} and `negative_prompt_embeds`:"
                f" {negative_prompt_embeds}. Please make sure to only forward one of the two."
            )

        # 检查 prompt_embeds 和 negative_prompt_embeds 是否同时存在且形状一致
        if prompt_embeds is not None and negative_prompt_embeds is not None:
            if prompt_embeds.shape != negative_prompt_embeds.shape:
                raise ValueError(
                    # 报告 prompt_embeds 和 negative_prompt_embeds 形状不一致的错误信息
                    "`prompt_embeds` and `negative_prompt_embeds` must have the same shape when passed directly, but"
                    f" got: `prompt_embeds` {prompt_embeds.shape} != `negative_prompt_embeds`"
                    f" {negative_prompt_embeds.shape}."
                )

        # 检查 ip_adapter_image 和 ip_adapter_image_embeds 是否同时存在
        if ip_adapter_image is not None and ip_adapter_image_embeds is not None:
            raise ValueError(
                # 报告需要提供 ip_adapter_image 或 ip_adapter_image_embeds 之一的错误信息
                "Provide either `ip_adapter_image` or `ip_adapter_image_embeds`. Cannot leave both `ip_adapter_image` and `ip_adapter_image_embeds` defined."
            )

        # 检查 ip_adapter_image_embeds 是否存在且类型为 list
        if ip_adapter_image_embeds is not None:
            if not isinstance(ip_adapter_image_embeds, list):
                raise ValueError(
                    # 报告 ip_adapter_image_embeds 不是 list 类型的错误信息
                    f"`ip_adapter_image_embeds` has to be of type `list` but is {type(ip_adapter_image_embeds)}"
                )
            # 检查 ip_adapter_image_embeds 中第一个元素的维度是否为 3D 或 4D
            elif ip_adapter_image_embeds[0].ndim not in [3, 4]:
                raise ValueError(
                    # 报告 ip_adapter_image_embeds 中的张量维度不正确的错误信息
                    f"`ip_adapter_image_embeds` has to be a list of 3D or 4D tensors but is {ip_adapter_image_embeds[0].ndim}D"
                )
    # 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_latents 复制的代码
    def prepare_latents(self, batch_size, num_channels_latents, height, width, dtype, device, generator, latents=None):
        # 定义形状,包含批大小、通道数和缩放后的高度和宽度
        shape = (
            batch_size,
            num_channels_latents,
            int(height) // self.vae_scale_factor,  # 按 VAE 缩放因子调整高度
            int(width) // self.vae_scale_factor,    # 按 VAE 缩放因子调整宽度
        )
        # 检查生成器是否为列表且长度与批大小不匹配
        if isinstance(generator, list) and len(generator) != batch_size:
            # 抛出值错误,提示生成器长度与批大小不匹配
            raise ValueError(
                f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
                f" size of {batch_size}. Make sure the batch size matches the length of the generators."
            )

        # 如果没有传入潜在变量
        if latents is None:
            # 生成随机张量作为潜在变量,使用指定的生成器、设备和数据类型
            latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
        else:
            # 如果已传入潜在变量,将其转移到指定的设备
            latents = latents.to(device)

        # 按调度器所需的标准差缩放初始噪声
        latents = latents * self.scheduler.init_noise_sigma
        # 返回处理后的潜在变量
        return latents

    def prepare_image_latents(
        # 准备图像潜在变量的方法,接受图像及相关参数
        self, image, batch_size, num_images_per_prompt, dtype, device, do_classifier_free_guidance, generator=None
    ):
        # 检查输入的图像类型是否为 torch.Tensor、PIL.Image.Image 或列表
        if not isinstance(image, (torch.Tensor, PIL.Image.Image, list)):
            # 如果类型不匹配,则抛出错误并显示当前类型
            raise ValueError(
                f"`image` has to be of type `torch.Tensor`, `PIL.Image.Image` or list but is {type(image)}"
            )

        # 将图像转换到指定的设备和数据类型
        image = image.to(device=device, dtype=dtype)

        # 根据提示数量调整批处理大小
        batch_size = batch_size * num_images_per_prompt

        # 如果图像有4个通道,则直接使用它
        if image.shape[1] == 4:
            image_latents = image
        else:
            # 编码图像并以 "argmax" 模式检索潜在表示
            image_latents = retrieve_latents(self.vae.encode(image), sample_mode="argmax")

        # 如果批处理大小大于潜在表示的数量且可以整除
        if batch_size > image_latents.shape[0] and batch_size % image_latents.shape[0] == 0:
            # 生成警告消息
            deprecation_message = (
                f"You have passed {batch_size} text prompts (`prompt`), but only {image_latents.shape[0]} initial"
                " images (`image`). Initial images are now duplicating to match the number of text prompts. Note"
                " that this behavior is deprecated and will be removed in a version 1.0.0. Please make sure to update"
                " your script to pass as many initial images as text prompts to suppress this warning."
            )
            # 发出弃用警告
            deprecate("len(prompt) != len(image)", "1.0.0", deprecation_message, standard_warn=False)
            # 计算每个提示需要额外的图像数量
            additional_image_per_prompt = batch_size // image_latents.shape[0]
            # 扩展潜在表示以匹配批处理大小
            image_latents = torch.cat([image_latents] * additional_image_per_prompt, dim=0)
        # 如果批处理大小大于潜在表示数量但不能整除
        elif batch_size > image_latents.shape[0] and batch_size % image_latents.shape[0] != 0:
            # 抛出错误,表示无法复制图像以匹配文本提示
            raise ValueError(
                f"Cannot duplicate `image` of batch size {image_latents.shape[0]} to {batch_size} text prompts."
            )
        else:
            # 将潜在表示展开为一个批次
            image_latents = torch.cat([image_latents], dim=0)

        # 如果启用分类器自由引导
        if do_classifier_free_guidance:
            # 创建与潜在表示形状相同的零张量
            uncond_image_latents = torch.zeros_like(image_latents)
            # 将潜在表示和未条件潜在表示合并
            image_latents = torch.cat([image_latents, image_latents, uncond_image_latents], dim=0)

        # 返回处理后的潜在表示
        return image_latents

    # 定义属性:引导比例
    @property
    def guidance_scale(self):
        return self._guidance_scale

    # 定义属性:图像引导比例
    @property
    def image_guidance_scale(self):
        return self._image_guidance_scale

    # 定义属性:时间步数
    @property
    def num_timesteps(self):
        return self._num_timesteps

    # 此处的 `guidance_scale` 定义类似于 Imagen 论文中方程 (2) 的引导权重 `w`
    # `guidance_scale = 1` 表示不使用分类器自由引导。
    @property
    def do_classifier_free_guidance(self):
        # 根据引导比例和图像引导比例决定是否使用分类器自由引导
        return self.guidance_scale > 1.0 and self.image_guidance_scale >= 1.0