diffusers-源码解析-三十八-

龙哥盟 / 2024-11-09 / 原文

diffusers 源码解析(三十八)

.\diffusers\pipelines\musicldm\__init__.py

# 导入类型检查模块,便于静态类型检查
from typing import TYPE_CHECKING

# 从上层模块导入所需的工具函数和常量
from ...utils import (
    DIFFUSERS_SLOW_IMPORT,  # 用于判断是否慢导入
    OptionalDependencyNotAvailable,  # 可选依赖不可用的异常
    _LazyModule,  # 延迟加载模块的类
    get_objects_from_module,  # 从模块中获取对象的函数
    is_torch_available,  # 检查 PyTorch 是否可用的函数
    is_transformers_available,  # 检查 Transformers 是否可用的函数
    is_transformers_version,  # 检查 Transformers 版本的函数
)

# 初始化一个空字典,用于存储假对象
_dummy_objects = {}
# 初始化一个空字典,用于存储导入结构
_import_structure = {}

# 尝试块,用于检查依赖
try:
    # 检查是否可用的 Transformers 和 PyTorch,并且版本是否满足要求
    if not (is_transformers_available() and is_torch_available() and is_transformers_version(">=", "4.27.0")):
        # 如果依赖不满足,抛出异常
        raise OptionalDependencyNotAvailable()
# 捕获可选依赖不可用的异常
except OptionalDependencyNotAvailable:
    # 从工具模块中导入假对象以避免错误
    from ...utils import dummy_torch_and_transformers_objects  # noqa F403

    # 更新假对象字典,添加从假对象模块获取的对象
    _dummy_objects.update(get_objects_from_module(dummy_torch_and_transformers_objects))
# 如果没有异常,执行以下代码
else:
    # 更新导入结构,添加 MusicLDMPipeline 相关的导入
    _import_structure["pipeline_musicldm"] = ["MusicLDMPipeline"]

# 检查是否在类型检查阶段或者慢导入模式
if TYPE_CHECKING or DIFFUSERS_SLOW_IMPORT:
    # 尝试块,用于再次检查依赖
    try:
        # 检查依赖是否可用
        if not (is_transformers_available() and is_torch_available() and is_transformers_version(">=", "4.27.0")):
            # 如果依赖不满足,抛出异常
            raise OptionalDependencyNotAvailable()

    # 捕获可选依赖不可用的异常
    except OptionalDependencyNotAvailable:
        # 导入假对象以避免错误
        from ...utils.dummy_torch_and_transformers_objects import *
    else:
        # 从 pipeline_musicldm 模块导入 MusicLDMPipeline 类
        from .pipeline_musicldm import MusicLDMPipeline

# 如果不是在类型检查阶段或慢导入模式
else:
    # 导入 sys 模块
    import sys

    # 使用延迟加载模块的方式初始化当前模块
    sys.modules[__name__] = _LazyModule(
        __name__,
        globals()["__file__"],
        _import_structure,
        module_spec=__spec__,
    )

    # 将假对象字典中的每个对象设置到当前模块
    for name, value in _dummy_objects.items():
        setattr(sys.modules[__name__], name, value)

.\diffusers\pipelines\onnx_utils.py

# coding=utf-8  # 指定源代码的编码为 UTF-8
# Copyright 2024 The HuggingFace Inc. team.  # HuggingFace Inc. 团队的版权声明
# Copyright (c) 2022, NVIDIA CORPORATION.  All rights reserved.  # NVIDIA Corporation 的版权声明
#
# Licensed under the Apache License, Version 2.0 (the "License");  # 说明该文件根据 Apache 许可证进行授权
# you may not use this file except in compliance with the License.  # 使用文件的条件
# You may obtain a copy of the License at  # 获取许可证的链接
#
#     http://www.apache.org/licenses/LICENSE-2.0  # 许可证的网址
#
# Unless required by applicable law or agreed to in writing, software  # 免责声明,表示无任何担保
# distributed under the License is distributed on an "AS IS" BASIS,  # 文件按现状提供
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  # 不提供任何明示或暗示的担保
# See the License for the specific language governing permissions and  # 指定权限和限制
# limitations under the License.  # 许可证下的限制

import os  # 导入操作系统功能模块
import shutil  # 导入高级文件操作模块
from pathlib import Path  # 导入路径操作类
from typing import Optional, Union  # 导入类型提示

import numpy as np  # 导入 NumPy 库,用于数值计算
from huggingface_hub import hf_hub_download  # 从 Hugging Face Hub 下载模型
from huggingface_hub.utils import validate_hf_hub_args  # 验证 Hugging Face Hub 参数

from ..utils import ONNX_EXTERNAL_WEIGHTS_NAME, ONNX_WEIGHTS_NAME, is_onnx_available, logging  # 导入实用工具

if is_onnx_available():  # 检查 ONNX 是否可用
    import onnxruntime as ort  # 导入 ONNX Runtime 库

logger = logging.get_logger(__name__)  # 获取当前模块的日志记录器

ORT_TO_NP_TYPE = {  # 创建一个字典,将 ONNX 类型映射到 NumPy 类型
    "tensor(bool)": np.bool_,  # 布尔类型
    "tensor(int8)": np.int8,  # 8位整数类型
    "tensor(uint8)": np.uint8,  # 无符号8位整数类型
    "tensor(int16)": np.int16,  # 16位整数类型
    "tensor(uint16)": np.uint16,  # 无符号16位整数类型
    "tensor(int32)": np.int32,  # 32位整数类型
    "tensor(uint32)": np.uint32,  # 无符号32位整数类型
    "tensor(int64)": np.int64,  # 64位整数类型
    "tensor(uint64)": np.uint64,  # 无符号64位整数类型
    "tensor(float16)": np.float16,  # 16位浮点数类型
    "tensor(float)": np.float32,  # 32位浮点数类型
    "tensor(double)": np.float64,  # 64位浮点数类型
}

class OnnxRuntimeModel:  # 定义 OnnxRuntimeModel 类
    def __init__(self, model=None, **kwargs):  # 构造函数,初始化模型和参数
        logger.info("`diffusers.OnnxRuntimeModel` is experimental and might change in the future.")  # 记录信息,说明该模型为实验性
        self.model = model  # 保存模型实例
        self.model_save_dir = kwargs.get("model_save_dir", None)  # 获取模型保存目录
        self.latest_model_name = kwargs.get("latest_model_name", ONNX_WEIGHTS_NAME)  # 获取最新模型名称

    def __call__(self, **kwargs):  # 定义调用函数,使类实例可调用
        inputs = {k: np.array(v) for k, v in kwargs.items()}  # 将输入参数转换为 NumPy 数组
        return self.model.run(None, inputs)  # 运行模型并返回结果

    @staticmethod  # 静态方法,不需要实例化
    def load_model(path: Union[str, Path], provider=None, sess_options=None):  # 加载 ONNX 模型
        """
        Loads an ONNX Inference session with an ExecutionProvider. Default provider is `CPUExecutionProvider`

        Arguments:
            path (`str` or `Path`):  # 加载模型的路径
                Directory from which to load
            provider(`str`, *optional*):  # 执行提供者,可选参数
                Onnxruntime execution provider to use for loading the model, defaults to `CPUExecutionProvider`
        """
        if provider is None:  # 检查提供者是否为空
            logger.info("No onnxruntime provider specified, using CPUExecutionProvider")  # 记录信息,使用默认提供者
            provider = "CPUExecutionProvider"  # 设置为默认提供者

        return ort.InferenceSession(path, providers=[provider], sess_options=sess_options)  # 创建并返回推理会话
    # 定义一个保存预训练模型及其配置文件的方法
        def _save_pretrained(self, save_directory: Union[str, Path], file_name: Optional[str] = None, **kwargs):
            """
            将模型及其配置文件保存到指定目录,以便可以通过
            [`~optimum.onnxruntime.modeling_ort.ORTModel.from_pretrained`] 类方法重新加载。始终保存
            latest_model_name。
    
            参数:
                save_directory (`str` 或 `Path`):
                    保存模型文件的目录。
                file_name(`str`, *可选*):
                    将默认模型文件名从 `"model.onnx"` 替换为 `file_name`。这允许使用不同的名称保存模型。
            """
            # 根据提供的文件名或默认模型名称设置模型文件名
            model_file_name = file_name if file_name is not None else ONNX_WEIGHTS_NAME
    
            # 创建源路径,指向最新模型的保存目录
            src_path = self.model_save_dir.joinpath(self.latest_model_name)
            # 创建目标路径,指向保存目录和模型文件名
            dst_path = Path(save_directory).joinpath(model_file_name)
            try:
                # 尝试复制模型文件到目标路径
                shutil.copyfile(src_path, dst_path)
            except shutil.SameFileError:
                # 如果源文件和目标文件相同,则忽略错误
                pass
    
            # 复制外部权重(适用于大于2GB的模型)
            src_path = self.model_save_dir.joinpath(ONNX_EXTERNAL_WEIGHTS_NAME)
            # 检查外部权重文件是否存在
            if src_path.exists():
                # 创建目标路径指向外部权重文件
                dst_path = Path(save_directory).joinpath(ONNX_EXTERNAL_WEIGHTS_NAME)
                try:
                    # 尝试复制外部权重文件到目标路径
                    shutil.copyfile(src_path, dst_path)
                except shutil.SameFileError:
                    # 如果源文件和目标文件相同,则忽略错误
                    pass
    
        # 定义保存预训练模型到指定目录的方法
        def save_pretrained(
            self,
            save_directory: Union[str, os.PathLike],
            **kwargs,
        ):
            """
            将模型保存到指定目录,以便可以通过
            [`~OnnxModel.from_pretrained`] 类方法重新加载。:
    
            参数:
                save_directory (`str` 或 `os.PathLike`):
                    要保存的目录。如果不存在,则会创建。
            """
            # 检查提供的路径是否是文件,如果是,则记录错误并返回
            if os.path.isfile(save_directory):
                logger.error(f"Provided path ({save_directory}) should be a directory, not a file")
                return
    
            # 创建保存目录,如果已存在则不报错
            os.makedirs(save_directory, exist_ok=True)
    
            # 保存模型权重/文件
            self._save_pretrained(save_directory, **kwargs)
    
        # 定义一个类方法,从预训练模型加载模型
        @classmethod
        @validate_hf_hub_args
        def _from_pretrained(
            cls,
            model_id: Union[str, Path],
            token: Optional[Union[bool, str, None]] = None,
            revision: Optional[Union[str, None]] = None,
            force_download: bool = False,
            cache_dir: Optional[str] = None,
            file_name: Optional[str] = None,
            provider: Optional[str] = None,
            sess_options: Optional["ort.SessionOptions"] = None,
            **kwargs,
    ):
        """
        从目录或 HF Hub 加载模型。

        参数:
            model_id (`str` 或 `Path`):
                要加载的目录
            token (`str` 或 `bool`):
                加载私有或受限库模型所需
            revision (`str`):
                具体的模型版本,可以是分支名、标签名或提交 ID
            cache_dir (`Union[str, Path]`, *可选*):
                下载的预训练模型配置应缓存的目录路径,如果不使用标准缓存。
            force_download (`bool`, *可选*, 默认值为 `False`):
                是否强制(重新)下载模型权重和配置文件,覆盖已存在的缓存版本。
            file_name(`str`):
                将默认模型文件名从 `"model.onnx"` 替换为 `file_name`。这允许从同一库或目录加载不同的模型文件。
            provider(`str`):
                ONNX 运行时提供者,例如 `CPUExecutionProvider` 或 `CUDAExecutionProvider`。
            kwargs (`Dict`, *可选*):
                初始化时将传递给模型的关键字参数
        """
        # 根据 file_name 判断模型文件名,如果为 None 则使用默认的 ONNX_WEIGHTS_NAME
        model_file_name = file_name if file_name is not None else ONNX_WEIGHTS_NAME
        # 检查 model_id 是否为目录
        if os.path.isdir(model_id):
            # 从本地目录加载模型
            model = OnnxRuntimeModel.load_model(
                # 使用给定模型文件名和提供者加载模型
                Path(model_id, model_file_name).as_posix(), provider=provider, sess_options=sess_options
            )
            # 将模型保存目录加入 kwargs
            kwargs["model_save_dir"] = Path(model_id)
        # 如果 model_id 不是目录,则从 hub 加载模型
        else:
            # 下载模型
            model_cache_path = hf_hub_download(
                # 从 HF Hub 下载模型,使用提供的参数
                repo_id=model_id,
                filename=model_file_name,
                token=token,
                revision=revision,
                cache_dir=cache_dir,
                force_download=force_download,
            )
            # 将模型缓存路径的父目录加入 kwargs
            kwargs["model_save_dir"] = Path(model_cache_path).parent
            # 将下载的最新模型名称加入 kwargs
            kwargs["latest_model_name"] = Path(model_cache_path).name
            # 从缓存路径加载模型
            model = OnnxRuntimeModel.load_model(model_cache_path, provider=provider, sess_options=sess_options)
        # 返回模型实例和关键字参数
        return cls(model=model, **kwargs)

    @classmethod
    @validate_hf_hub_args
    def from_pretrained(
        cls,
        # 要加载的模型 ID,可以是字符串或路径
        model_id: Union[str, Path],
        # 是否强制下载模型,默认值为 True
        force_download: bool = True,
        # 用于私有库的访问令牌,可选
        token: Optional[str] = None,
        # 缓存目录,可选
        cache_dir: Optional[str] = None,
        # 其他模型关键字参数
        **model_kwargs,
    # 结束函数定义
        ):
            # 初始化修订版本为 None
            revision = None
            # 如果模型 ID 以 "@" 分隔成两个部分,则分开赋值
            if len(str(model_id).split("@")) == 2:
                # 分割模型 ID 和修订版本
                model_id, revision = model_id.split("@")
    
            # 从预训练模型中加载,返回加载的模型
            return cls._from_pretrained(
                # 指定模型 ID
                model_id=model_id,
                # 指定修订版本
                revision=revision,
                # 指定缓存目录
                cache_dir=cache_dir,
                # 指定是否强制下载
                force_download=force_download,
                # 提供访问令牌
                token=token,
                # 传递额外的模型参数
                **model_kwargs,
            )

.\diffusers\pipelines\pag\pag_utils.py

# 版权所有 2024 HuggingFace 团队。所有权利保留。
#
# 根据 Apache 许可证,第 2.0 版(“许可证”)授权;
# 除非遵守许可证,否则您不得使用此文件。
# 您可以在以下网址获取许可证副本:
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# 除非根据适用法律或书面协议另有约定,软件
# 在“按原样”基础上分发,没有任何形式的保证或条件,
# 无论是明示还是暗示的。
# 有关许可证具体条款和条件,请参见许可文件。
import re  # 导入正则表达式模块,用于字符串模式匹配
from typing import Dict, List, Tuple, Union  # 从 typing 导入类型提示,用于增强代码可读性和类型检查

import torch  # 导入 PyTorch 库,用于深度学习
import torch.nn as nn  # 从 PyTorch 导入神经网络模块,用于构建模型

from ...models.attention_processor import (  # 从相对路径导入注意力处理相关类
    Attention,  # 导入注意力机制类
    AttentionProcessor,  # 导入注意力处理器类
    PAGCFGIdentitySelfAttnProcessor2_0,  # 导入特定版本的身份自注意力处理器
    PAGIdentitySelfAttnProcessor2_0,  # 导入另一个特定版本的身份自注意力处理器
)
from ...utils import logging  # 从相对路径导入日志记录工具

logger = logging.get_logger(__name__)  # 创建当前模块的日志记录器实例,pylint 禁用无效名称检查

class PAGMixin:  # 定义一个混合类,用于实现 Pertubed Attention Guidance 功能
    r"""Mixin class for [Pertubed Attention Guidance](https://arxiv.org/abs/2403.17377v1)."""  # 文档字符串,描述该混合类的目的及其引用
    # 定义设置 PAG 注意力处理器的私有方法,接收应用的层和分类器自由引导的标志
    def _set_pag_attn_processor(self, pag_applied_layers, do_classifier_free_guidance):
        r"""
        设置 PAG 层的注意力处理器。
        """
        # 获取当前对象的 PAG 注意力处理器列表
        pag_attn_processors = self._pag_attn_processors
        # 检查是否已设置 PAG 注意力处理器,如果没有,则抛出异常
        if pag_attn_processors is None:
            raise ValueError(
                "No PAG attention processors have been set. Set the attention processors by calling `set_pag_applied_layers` and passing the relevant parameters."
            )
    
        # 根据是否使用分类器自由引导选择对应的 PAG 注意力处理器
        pag_attn_proc = pag_attn_processors[0] if do_classifier_free_guidance else pag_attn_processors[1]
    
        # 检查当前对象是否具有 unet 属性
        if hasattr(self, "unet"):
            # 如果有,设置模型为 unet
            model: nn.Module = self.unet
        else:
            # 如果没有,设置模型为 transformer
            model: nn.Module = self.transformer
    
        # 定义一个检查模块是否为自注意力模块的函数
        def is_self_attn(module: nn.Module) -> bool:
            r"""
            根据模块名称检查它是否是自注意力模块。
            """
            # 判断模块是否为 Attention 类型且不是交叉注意力模块
            return isinstance(module, Attention) and not module.is_cross_attention
    
        # 定义一个检查是否为假积分匹配的函数
        def is_fake_integral_match(layer_id, name):
            # 获取层 ID 和名称的最后部分
            layer_id = layer_id.split(".")[-1]
            name = name.split(".")[-1]
            # 检查层 ID 和名称是否都是数字并且相等
            return layer_id.isnumeric() and name.isnumeric() and layer_id == name
    
        # 遍历应用的 PAG 层
        for layer_id in pag_applied_layers:
            # 为每个 PAG 层输入,找到在 unet 模型中对应的自注意力层
            target_modules = []
    
            # 遍历模型中的所有命名模块
            for name, module in model.named_modules():
                # 确定以下简单情况:
                #   (1) 存在自注意力层
                #   (2) 模块名称是否与 PAG 层 ID 部分匹配
                #   (3) 确保如果层 ID 以数字结尾则不是假积分匹配
                #       例如,blocks.1 和 blocks.10 应该可区分,如果 layer_id="blocks.1"
                if (
                    is_self_attn(module)
                    and re.search(layer_id, name) is not None
                    and not is_fake_integral_match(layer_id, name)
                ):
                    # 记录调试信息,显示应用 PAG 到的层
                    logger.debug(f"Applying PAG to layer: {name}")
                    # 将匹配的模块添加到目标模块列表
                    target_modules.append(module)
    
            # 如果未找到任何目标模块,则抛出异常
            if len(target_modules) == 0:
                raise ValueError(f"Cannot find PAG layer to set attention processor for: {layer_id}")
    
            # 将选定的 PAG 注意力处理器分配给目标模块
            for module in target_modules:
                module.processor = pag_attn_proc
    
    # 定义获取在时间步 `t` 的扰动注意力引导的缩放因子的私有方法
    def _get_pag_scale(self, t):
        r"""
        获取时间步 `t` 的扰动注意力引导的缩放因子。
        """
    
        # 检查是否进行自适应缩放
        if self.do_pag_adaptive_scaling:
            # 计算信号缩放因子
            signal_scale = self.pag_scale - self.pag_adaptive_scale * (1000 - t)
            # 如果信号缩放小于 0,则设置为 0
            if signal_scale < 0:
                signal_scale = 0
            # 返回计算后的信号缩放因子
            return signal_scale
        else:
            # 否则直接返回预设的 PAG 缩放因子
            return self.pag_scale
    # 定义应用扰动注意力引导的函数,更新噪声预测
    def _apply_perturbed_attention_guidance(self, noise_pred, do_classifier_free_guidance, guidance_scale, t):
        r"""
        应用扰动注意力引导到噪声预测中。

        参数:
            noise_pred (torch.Tensor): 噪声预测张量。
            do_classifier_free_guidance (bool): 是否应用无分类器引导。
            guidance_scale (float): 引导项的缩放因子。
            t (int): 当前时间步。

        返回:
            torch.Tensor: 应用扰动注意力引导后更新的噪声预测张量。
        """
        # 获取当前时间步的引导缩放因子
        pag_scale = self._get_pag_scale(t)
        # 如果需要应用无分类器引导
        if do_classifier_free_guidance:
            # 将噪声预测张量分割为三个部分:无条件、文本和扰动
            noise_pred_uncond, noise_pred_text, noise_pred_perturb = noise_pred.chunk(3)
            # 更新噪声预测,结合无条件、文本和扰动信息
            noise_pred = (
                noise_pred_uncond
                + guidance_scale * (noise_pred_text - noise_pred_uncond)
                + pag_scale * (noise_pred_text - noise_pred_perturb)
            )
        else:
            # 将噪声预测张量分割为两部分:文本和扰动
            noise_pred_text, noise_pred_perturb = noise_pred.chunk(2)
            # 更新噪声预测,仅结合文本和扰动信息
            noise_pred = noise_pred_text + pag_scale * (noise_pred_text - noise_pred_perturb)
        # 返回更新后的噪声预测
        return noise_pred

    # 定义准备扰动注意力引导的函数
    def _prepare_perturbed_attention_guidance(self, cond, uncond, do_classifier_free_guidance):
        """
        为 PAG 模型准备扰动注意力引导。

        参数:
            cond (torch.Tensor): 条件输入张量。
            uncond (torch.Tensor): 无条件输入张量。
            do_classifier_free_guidance (bool): 表示是否执行无分类器引导的标志。

        返回:
            torch.Tensor: 准备好的扰动注意力引导张量。
        """

        # 将条件输入张量在维度 0 上重复两次
        cond = torch.cat([cond] * 2, dim=0)

        # 如果需要应用无分类器引导
        if do_classifier_free_guidance:
            # 将无条件输入张量与条件张量在维度 0 上连接
            cond = torch.cat([uncond, cond], dim=0)
        # 返回准备好的条件张量
        return cond

    # 定义设置应用扰动注意力引导层的函数
    def set_pag_applied_layers(
        self,
        pag_applied_layers: Union[str, List[str]],  # 指定应用的层,可以是字符串或字符串列表
        pag_attn_processors: Tuple[AttentionProcessor, AttentionProcessor] = (  # 设置默认的注意力处理器
            PAGCFGIdentitySelfAttnProcessor2_0(),  # 第一个注意力处理器的实例
            PAGIdentitySelfAttnProcessor2_0(),  # 第二个注意力处理器的实例
        ),
    ):
        r""" 
        设置自注意力层以应用PAG。 如果输入无效,则引发ValueError。
        
        参数:
            pag_applied_layers (`str` 或 `List[str]`):
                一个或多个字符串标识层名称,或用于匹配多个层的简单正则表达式,PAG将应用于这些层。预期用法有几种:
                  - 单层指定为 - "blocks.{layer_index}"
                  - 多层作为列表 - ["blocks.{layers_index_1}", "blocks.{layer_index_2}", ...]
                  - 多层作为块名称 - "mid"
                  - 多层作为正则表达式 - "blocks.({layer_index_1}|{layer_index_2})"
            pag_attn_processors:
                (`Tuple[AttentionProcessor, AttentionProcessor]`, 默认值为 `(PAGCFGIdentitySelfAttnProcessor2_0(),
                PAGIdentitySelfAttnProcessor2_0())`): 一个包含两个注意力处理器的元组。第一个注意力
                处理器用于启用分类器无关指导的PAG(条件和无条件)。第二个
                注意力处理器用于禁用CFG的PAG(仅无条件)。
        """

        # 检查实例是否具有属性"_pag_attn_processors",如果没有则将其设置为None
        if not hasattr(self, "_pag_attn_processors"):
            self._pag_attn_processors = None

        # 如果输入的pag_applied_layers不是列表,则将其转换为单元素列表
        if not isinstance(pag_applied_layers, list):
            pag_applied_layers = [pag_applied_layers]
        
        # 如果pag_attn_processors不为None,则检查其类型和长度
        if pag_attn_processors is not None:
            if not isinstance(pag_attn_processors, tuple) or len(pag_attn_processors) != 2:
                # 如果不满足条件,则引发ValueError
                raise ValueError("Expected a tuple of two attention processors")

        # 遍历pag_applied_layers中的每个元素,检查它们是否都是字符串类型
        for i in range(len(pag_applied_layers)):
            if not isinstance(pag_applied_layers[i], str):
                # 如果类型不匹配,则引发ValueError并输出类型信息
                raise ValueError(
                    f"Expected either a string or a list of string but got type {type(pag_applied_layers[i])}"
                )

        # 将有效的pag_applied_layers和pag_attn_processors存储到实例属性中
        self.pag_applied_layers = pag_applied_layers
        self._pag_attn_processors = pag_attn_processors

    @property
    def pag_scale(self) -> float:
        r"""获取扰动注意力引导的缩放因子。"""
        # 返回实例的_pag_scale属性
        return self._pag_scale

    @property
    def pag_adaptive_scale(self) -> float:
        r"""获取扰动注意力引导的自适应缩放因子。"""
        # 返回实例的_pag_adaptive_scale属性
        return self._pag_adaptive_scale

    @property
    def do_pag_adaptive_scaling(self) -> bool:
        r"""检查是否启用扰动注意力引导的自适应缩放。"""
        # 检查_pag_adaptive_scale和_pag_scale是否大于0,并且pag_applied_layers的长度大于0
        return self._pag_adaptive_scale > 0 and self._pag_scale > 0 and len(self.pag_applied_layers) > 0

    @property
    def do_perturbed_attention_guidance(self) -> bool:
        r"""检查是否启用扰动注意力引导。"""
        # 检查_pag_scale是否大于0,并且pag_applied_layers的长度大于0
        return self._pag_scale > 0 and len(self.pag_applied_layers) > 0

    @property
    # 定义一个方法,用于获取 PAG 注意力处理器
    def pag_attn_processors(self) -> Dict[str, AttentionProcessor]:
        r"""
        返回值:
            `dict` 的 PAG 注意力处理器:一个字典包含模型中使用的所有 PAG 注意力处理器
            以层的名称作为键。
        """
    
        # 检查 PAG 注意力处理器是否为 None,如果是,则返回空字典
        if self._pag_attn_processors is None:
            return {}
    
        # 创建一个集合,包含所有有效的注意力处理器类
        valid_attn_processors = {x.__class__ for x in self._pag_attn_processors}
    
        # 初始化一个字典,用于存储处理器
        processors = {}
        # 通过检查是否存在 'unet' 属性来决定使用哪个去噪模块
        # 如果存在,则使用 self.unet
        if hasattr(self, "unet"):
            denoiser_module = self.unet
        # 如果 'unet' 属性不存在,则检查 'transformer' 属性
        elif hasattr(self, "transformer"):
            denoiser_module = self.transformer
        # 如果两者都不存在,则引发错误
        else:
            raise ValueError("No denoiser module found.")
    
        # 遍历去噪模块中的注意力处理器
        for name, proc in denoiser_module.attn_processors.items():
            # 如果当前处理器类在有效处理器集合中,则将其添加到处理器字典中
            if proc.__class__ in valid_attn_processors:
                processors[name] = proc
    
        # 返回找到的处理器字典
        return processors

.\diffusers\pipelines\pag\pipeline_pag_controlnet_sd.py

# 版权声明,标识该文件的版权归 HuggingFace 团队所有
# 按照 Apache 2.0 许可证的条款进行许可
# 除非遵守许可证,否则不得使用此文件
# 可以在以下网址获取许可证副本
#     http://www.apache.org/licenses/LICENSE-2.0
# 除非适用法律或书面协议另有规定,软件以 "按现状" 基础分发
# 不提供任何形式的明示或暗示的保证或条件
# 请参阅许可证以了解特定语言下的权限和限制


# 导入 inspect 模块,用于检查对象
import inspect
# 从 typing 模块导入多种类型提示,用于类型注解
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

# 导入 numpy 库,常用于科学计算
import numpy as np
# 导入 PIL.Image,用于处理图像
import PIL.Image
# 导入 PyTorch 库
import torch
# 导入 PyTorch 的功能模块
import torch.nn.functional as F
# 从 transformers 库导入 CLIP 模型相关组件
from transformers import CLIPImageProcessor, CLIPTextModel, CLIPTokenizer, CLIPVisionModelWithProjection

# 导入自定义回调函数类
from ...callbacks import MultiPipelineCallbacks, PipelineCallback
# 导入图像处理相关类
from ...image_processor import PipelineImageInput, VaeImageProcessor
# 导入多种加载器混合类
from ...loaders import FromSingleFileMixin, IPAdapterMixin, StableDiffusionLoraLoaderMixin, TextualInversionLoaderMixin
# 导入多种模型类
from ...models import AutoencoderKL, ControlNetModel, ImageProjection, UNet2DConditionModel
# 从 LoRA 模块导入调整 LoRA 比例的函数
from ...models.lora import adjust_lora_scale_text_encoder
# 导入 Karras 扩散调度器
from ...schedulers import KarrasDiffusionSchedulers
# 从 utils 模块导入多个实用工具
from ...utils import (
    USE_PEFT_BACKEND,  # 是否使用 PEFT 后端
    logging,  # 日志记录工具
    replace_example_docstring,  # 替换示例文档字符串的函数
    scale_lora_layers,  # 缩放 LoRA 层的函数
    unscale_lora_layers,  # 反缩放 LoRA 层的函数
)
# 从 torch_utils 模块导入多种与 PyTorch 相关的工具函数
from ...utils.torch_utils import is_compiled_module, is_torch_version, randn_tensor
# 从 MultiControlNet 模块导入多控制网模型类
from ..controlnet.multicontrolnet import MultiControlNetModel
# 从 pipeline_utils 导入扩散管道和稳定扩散混合类
from ..pipeline_utils import DiffusionPipeline, StableDiffusionMixin
# 从 stable_diffusion.pipeline_output 导入稳定扩散管道输出类
from ..stable_diffusion.pipeline_output import StableDiffusionPipelineOutput
# 从 stable_diffusion.safety_checker 导入稳定扩散安全检查器类
from ..stable_diffusion.safety_checker import StableDiffusionSafetyChecker
# 从 pag_utils 导入 PAG 混合类
from .pag_utils import PAGMixin


# 创建一个日志记录器实例,用于记录该模块的日志
logger = logging.get_logger(__name__)  # pylint: disable=invalid-name


# 示例文档字符串的模板,用于描述示例代码的结构和用途
EXAMPLE_DOC_STRING = """
# 示例代码
    Examples:
        ```py
        >>> # !pip install opencv-python transformers accelerate  # 安装必要的库
        >>> from diffusers import AutoPipelineForText2Image, ControlNetModel, UniPCMultistepScheduler  # 导入用于图像生成的库
        >>> from diffusers.utils import load_image  # 导入加载图像的工具
        >>> import numpy as np  # 导入NumPy用于数组操作
        >>> import torch  # 导入PyTorch用于深度学习

        >>> import cv2  # 导入OpenCV用于计算机视觉
        >>> from PIL import Image  # 导入PIL用于图像处理

        >>> # 下载一张图片
        >>> image = load_image(  # 使用指定的URL下载图像
        ...     "https://hf.co/datasets/hf-internal-testing/diffusers-images/resolve/main/sd_controlnet/hf-logo.png"  # 图像的URL
        ... )
        >>> image = np.array(image)  # 将下载的图像转换为NumPy数组

        >>> # 获取Canny边缘图像
        >>> image = cv2.Canny(image, 100, 200)  # 使用Canny算子检测边缘
        >>> image = image[:, :, None]  # 添加一个新的维度以适配后续操作
        >>> image = np.concatenate([image, image, image], axis=2)  # 将边缘图像复制到三个通道,转换为RGB格式
        >>> canny_image = Image.fromarray(image)  # 将NumPy数组转换回PIL图像

        >>> # 加载控制网和稳定扩散模型v1-5
        >>> controlnet = ControlNetModel.from_pretrained("lllyasviel/sd-controlnet-canny", torch_dtype=torch.float16)  # 加载预训练的控制网络
        >>> pipe = AutoPipelineForText2Image.from_pretrained(  # 创建图像生成管道
        ...     "runwayml/stable-diffusion-v1-5", controlnet=controlnet, torch_dtype=torch.float16, enable_pag=True  # 加载稳定扩散模型并启用分页
        ... )

        >>> # 通过更快的调度器和内存优化加速扩散过程
        >>> # 如果未安装xformers,可以删除以下行
        >>> pipe.enable_xformers_memory_efficient_attention()  # 启用xformers以提高内存效率

        >>> pipe.enable_model_cpu_offload()  # 启用模型CPU卸载以节省GPU内存

        >>> # 生成图像
        >>> generator = torch.manual_seed(0)  # 设置随机种子以确保生成图像的可重现性
        >>> image = pipe(  # 调用管道生成图像
        ...     "aerial view, a futuristic research complex in a bright foggy jungle, hard lighting",  # 输入生成图像的描述
        ...     guidance_scale=7.5,  # 设置引导比例以控制生成图像的质量
        ...     generator=generator,  # 传入随机生成器
        ...     image=canny_image,  # 使用之前处理的Canny图像作为输入
        ...     pag_scale=10,  # 设置分页比例
        ... ).images[0]  # 获取生成的第一张图像
        ```

从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion 复制而来

def retrieve_timesteps(
# 调度器对象
scheduler,
# 推理步骤的数量(可选)
num_inference_steps: Optional[int] = None,
# 设备类型(可选)
device: Optional[Union[str, torch.device]] = None,
# 自定义时间步(可选)
timesteps: Optional[List[int]] = None,
# 自定义 sigma 值(可选)
sigmas: Optional[List[float]] = None,
# 其他关键字参数
**kwargs,
):
"""
调用调度器的 set_timesteps 方法,并在调用后从调度器中检索时间步。处理自定义时间步。任何关键字参数将传递给 scheduler.set_timesteps

参数:
    scheduler (`SchedulerMixin`):
        用于获取时间步的调度器。
    num_inference_steps (`int`):
        生成样本时使用的扩散步骤数量。如果使用,则 `timesteps` 必须为 `None`。
    device (`str` 或 `torch.device`,*可选*):
        要将时间步移动到的设备。如果为 `None`,则时间步不会移动。
    timesteps (`List[int]`,*可选*):
        用于覆盖调度器的时间步间距策略的自定义时间步。如果传递 `timesteps`,`num_inference_steps` 和 `sigmas` 必须为 `None`。
    sigmas (`List[float]`,*可选*):
        用于覆盖调度器的时间步间距策略的自定义 sigma 值。如果传递 `sigmas`,`num_inference_steps` 和 `timesteps` 必须为 `None`。

返回:
    `Tuple[torch.Tensor, int]`: 一个元组,第一个元素是来自调度器的时间步调度,第二个元素是推理步骤的数量。
"""
# 检查是否同时传递了时间步和 sigma
if timesteps is not None and sigmas is not None:
    # 抛出错误,提示只能传递一个
    raise ValueError("Only one of `timesteps` or `sigmas` can be passed. Please choose one to set custom values")
# 检查是否传递了时间步
if timesteps is not None:
    # 检查调度器的 `set_timesteps` 方法是否接受时间步参数
    accepts_timesteps = "timesteps" in set(inspect.signature(scheduler.set_timesteps).parameters.keys())
    # 如果不支持,抛出错误
    if not accepts_timesteps:
        raise ValueError(
            f"The current scheduler class {scheduler.__class__}'s `set_timesteps` does not support custom"
            f" timestep schedules. Please check whether you are using the correct scheduler."
        )
    # 调用调度器的 `set_timesteps` 方法,传递自定义时间步
    scheduler.set_timesteps(timesteps=timesteps, device=device, **kwargs)
    # 从调度器中获取时间步
    timesteps = scheduler.timesteps
    # 计算推理步骤数量
    num_inference_steps = len(timesteps)
# 检查是否传递了 sigma
elif sigmas is not None:
    # 检查调度器的 `set_timesteps` 方法是否接受 sigma 参数
    accept_sigmas = "sigmas" in set(inspect.signature(scheduler.set_timesteps).parameters.keys())
    # 如果不支持,抛出错误
    if not accept_sigmas:
        raise ValueError(
            f"The current scheduler class {scheduler.__class__}'s `set_timesteps` does not support custom"
            f" sigmas schedules. Please check whether you are using the correct scheduler."
        )
    # 调用调度器的 `set_timesteps` 方法,传递自定义 sigma
    scheduler.set_timesteps(sigmas=sigmas, device=device, **kwargs)
    # 从调度器中获取时间步
    timesteps = scheduler.timesteps
    # 计算推理步骤数量
    num_inference_steps = len(timesteps)
# 如果不满足之前的条件,则执行以下操作
    else:
        # 设置调度器的时间步数,指定推理步骤数量和设备,传入额外参数
        scheduler.set_timesteps(num_inference_steps, device=device, **kwargs)
        # 获取当前调度器的时间步列表
        timesteps = scheduler.timesteps
    # 返回时间步列表和推理步骤数量
    return timesteps, num_inference_steps

定义一个名为 StableDiffusionControlNetPAGPipeline 的类,继承自多个基类

class StableDiffusionControlNetPAGPipeline(
# 继承自 DiffusionPipeline 类
DiffusionPipeline,
# 继承自 StableDiffusionMixin 类
StableDiffusionMixin,
# 继承自 TextualInversionLoaderMixin 类
TextualInversionLoaderMixin,
# 继承自 StableDiffusionLoraLoaderMixin 类
StableDiffusionLoraLoaderMixin,
# 继承自 IPAdapterMixin 类
IPAdapterMixin,
# 继承自 FromSingleFileMixin 类
FromSingleFileMixin,
# 继承自 PAGMixin 类
PAGMixin,
):
r"""
用于文本到图像生成的管道,使用 Stable Diffusion 和 ControlNet 指导。

此模型继承自 [`DiffusionPipeline`]。请查阅超类文档以获取所有管道的通用方法
(下载、保存、在特定设备上运行等)。

此管道还继承以下加载方法:
    - [`~loaders.TextualInversionLoaderMixin.load_textual_inversion`] 用于加载文本反转嵌入
    - [`~loaders.StableDiffusionLoraLoaderMixin.load_lora_weights`] 用于加载 LoRA 权重
    - [`~loaders.StableDiffusionLoraLoaderMixin.save_lora_weights`] 用于保存 LoRA 权重
    - [`~loaders.FromSingleFileMixin.from_single_file`] 用于加载 `.ckpt` 文件
    - [`~loaders.IPAdapterMixin.load_ip_adapter`] 用于加载 IP 适配器

参数:
    vae ([`AutoencoderKL`] ):
        用于对图像进行编码和解码的变分自编码器 (VAE) 模型。
    text_encoder ([`~transformers.CLIPTextModel`] ):
        冻结的文本编码器 ([clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14))。
    tokenizer ([`~transformers.CLIPTokenizer`] ):
        用于文本分词的 `CLIPTokenizer`。
    unet ([`UNet2DConditionModel`] ):
        用于去噪编码图像潜变量的 `UNet2DConditionModel`。
    controlnet ([`ControlNetModel`] 或 `List[ControlNetModel`] ):
        在去噪过程中为 `unet` 提供额外的条件。如果将多个 ControlNet 设置为列表,则每个 ControlNet 的输出将相加以创建一个组合的额外条件。
    scheduler ([`SchedulerMixin`] ):
        用于与 `unet` 一起去噪编码图像潜变量的调度器。可以是 [`DDIMScheduler`]、[`LMSDiscreteScheduler`] 或 [`PNDMScheduler`] 之一。
    safety_checker ([`StableDiffusionSafetyChecker`] ):
        分类模块,用于评估生成的图像是否可能被认为是冒犯性或有害的。
        请参考 [model card](https://huggingface.co/runwayml/stable-diffusion-v1-5) 获取有关模型潜在危害的更多信息。
    feature_extractor ([`~transformers.CLIPImageProcessor`] ):
        用于从生成的图像中提取特征的 `CLIPImageProcessor`;用作 `safety_checker` 的输入。
"""

# 定义 CPU 卸载顺序,确定各组件在 CPU 卸载时的顺序
model_cpu_offload_seq = "text_encoder->image_encoder->unet->vae"
# 定义可选组件列表,包括安全检查器、特征提取器和图像编码器
_optional_components = ["safety_checker", "feature_extractor", "image_encoder"]
# 定义不参与 CPU 卸载的组件,安全检查器不在其中
_exclude_from_cpu_offload = ["safety_checker"]
# 定义回调张量输入,包含潜变量和提示嵌入等
_callback_tensor_inputs = ["latents", "prompt_embeds", "negative_prompt_embeds"]
# 初始化方法,设置模型的基本参数和组件
    def __init__(
        self,
        vae: AutoencoderKL,  # 变分自编码器,用于图像生成
        text_encoder: CLIPTextModel,  # 文本编码器,将文本转为向量表示
        tokenizer: CLIPTokenizer,  # 分词器,将文本转换为token
        unet: UNet2DConditionModel,  # UNet模型,用于图像处理
        controlnet: Union[ControlNetModel, List[ControlNetModel], Tuple[ControlNetModel], MultiControlNetModel],  # 控制网络,用于条件控制
        scheduler: KarrasDiffusionSchedulers,  # 调度器,控制生成过程的时间步
        safety_checker: StableDiffusionSafetyChecker,  # 安全检查器,确保生成内容符合安全标准
        feature_extractor: CLIPImageProcessor,  # 特征提取器,从图像中提取特征
        image_encoder: CLIPVisionModelWithProjection = None,  # 可选的图像编码器,用于处理图像
        requires_safety_checker: bool = True,  # 是否需要安全检查器的标志
        pag_applied_layers: Union[str, List[str]] = "mid",  # 应用的层的名称,可以是单个或多个层
    ):
        super().__init__()  # 调用父类的初始化方法

        # 检查是否禁用安全检查器,并发出警告
        if safety_checker is None and requires_safety_checker:
            logger.warning(
                f"You have disabled the safety checker for {self.__class__} by passing `safety_checker=None`. Ensure"
                " that you abide to the conditions of the Stable Diffusion license and do not expose unfiltered"
                " results in services or applications open to the public. Both the diffusers team and Hugging Face"
                " strongly recommend to keep the safety filter enabled in all public facing circumstances, disabling"
                " it only for use-cases that involve analyzing network behavior or auditing its results. For more"
                " information, please have a look at https://github.com/huggingface/diffusers/pull/254 ."
            )

        # 如果安全检查器存在但特征提取器为None,抛出错误
        if safety_checker is not None and feature_extractor is None:
            raise ValueError(
                "Make sure to define a feature extractor when loading {self.__class__} if you want to use the safety"
                " checker. If you do not want to use the safety checker, you can pass `'safety_checker=None'` instead."
            )

        # 如果控制网络是列表或元组,则转换为MultiControlNetModel
        if isinstance(controlnet, (list, tuple)):
            controlnet = MultiControlNetModel(controlnet)

        # 注册各个模块,便于管理
        self.register_modules(
            vae=vae,
            text_encoder=text_encoder,
            tokenizer=tokenizer,
            unet=unet,
            controlnet=controlnet,
            scheduler=scheduler,
            safety_checker=safety_checker,
            feature_extractor=feature_extractor,
            image_encoder=image_encoder,
        )
        # 计算VAE缩放因子,基于模型配置
        self.vae_scale_factor = 2 ** (len(self.vae.config.block_out_channels) - 1)
        # 初始化图像处理器,设置为RGB转换
        self.image_processor = VaeImageProcessor(vae_scale_factor=self.vae_scale_factor, do_convert_rgb=True)
        # 初始化控制图像处理器,禁用归一化
        self.control_image_processor = VaeImageProcessor(
            vae_scale_factor=self.vae_scale_factor, do_convert_rgb=True, do_normalize=False
        )
        # 注册配置,记录是否需要安全检查器
        self.register_to_config(requires_safety_checker=requires_safety_checker)

        # 设置应用的层
        self.set_pag_applied_layers(pag_applied_layers)

    # 从StableDiffusionPipeline类中复制的编码提示方法
# 定义编码提示的函数,接受多个参数以处理图像和提示
    def encode_prompt(
        self,
        prompt,  # 输入的提示文本
        device,  # 目标设备,例如CPU或GPU
        num_images_per_prompt,  # 每个提示生成的图像数量
        do_classifier_free_guidance,  # 是否执行无分类器引导
        negative_prompt=None,  # 可选的负面提示文本
        prompt_embeds: Optional[torch.Tensor] = None,  # 提示的嵌入表示,可选
        negative_prompt_embeds: Optional[torch.Tensor] = None,  # 负面提示的嵌入表示,可选
        lora_scale: Optional[float] = None,  # LoRA缩放因子,可选
        clip_skip: Optional[int] = None,  # 可选的剪切跳过参数
    # 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.encode_image 复制的函数
    def encode_image(self, image, device, num_images_per_prompt, output_hidden_states=None):  # 定义编码图像的函数
        dtype = next(self.image_encoder.parameters()).dtype  # 获取图像编码器参数的数据类型

        # 检查输入是否为张量,如果不是则通过特征提取器处理
        if not isinstance(image, torch.Tensor):
            image = self.feature_extractor(image, return_tensors="pt").pixel_values

        # 将图像移动到指定设备并设置数据类型
        image = image.to(device=device, dtype=dtype)
        # 如果需要输出隐藏状态,则进行以下处理
        if output_hidden_states:
            # 编码图像并获取倒数第二个隐藏状态
            image_enc_hidden_states = self.image_encoder(image, output_hidden_states=True).hidden_states[-2]
            # 重复隐藏状态以匹配图像数量
            image_enc_hidden_states = image_enc_hidden_states.repeat_interleave(num_images_per_prompt, dim=0)
            # 生成与图像大小相同的零张量,并编码以获取无条件的隐藏状态
            uncond_image_enc_hidden_states = self.image_encoder(
                torch.zeros_like(image), output_hidden_states=True
            ).hidden_states[-2]
            # 重复无条件隐藏状态以匹配图像数量
            uncond_image_enc_hidden_states = uncond_image_enc_hidden_states.repeat_interleave(
                num_images_per_prompt, dim=0
            )
            # 返回编码后的隐藏状态
            return image_enc_hidden_states, uncond_image_enc_hidden_states
        else:
            # 如果不需要输出隐藏状态,则直接编码图像
            image_embeds = self.image_encoder(image).image_embeds
            # 重复图像嵌入以匹配图像数量
            image_embeds = image_embeds.repeat_interleave(num_images_per_prompt, dim=0)
            # 创建与图像嵌入大小相同的零张量作为无条件嵌入
            uncond_image_embeds = torch.zeros_like(image_embeds)

            # 返回编码后的图像嵌入和无条件嵌入
            return image_embeds, uncond_image_embeds

    # 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_ip_adapter_image_embeds 复制的函数
    def prepare_ip_adapter_image_embeds(
        self, ip_adapter_image, ip_adapter_image_embeds, device, num_images_per_prompt, do_classifier_free_guidance  # 准备图像适配器的图像嵌入
# 开始处理图像嵌入
    ):
        # 初始化图像嵌入列表
        image_embeds = []
        # 如果启用无分类器自由引导,则初始化负图像嵌入列表
        if do_classifier_free_guidance:
            negative_image_embeds = []
        # 检查 ip_adapter_image_embeds 是否为 None
        if ip_adapter_image_embeds is None:
            # 确保 ip_adapter_image 是一个列表
            if not isinstance(ip_adapter_image, list):
                ip_adapter_image = [ip_adapter_image]

            # 检查 ip_adapter_image 的长度是否与 IP 适配器数量相同
            if len(ip_adapter_image) != len(self.unet.encoder_hid_proj.image_projection_layers):
                raise ValueError(
                    # 抛出值错误,提示图像数量与适配器数量不符
                    f"`ip_adapter_image` must have same length as the number of IP Adapters. Got {len(ip_adapter_image)} images and {len(self.unet.encoder_hid_proj.image_projection_layers)} IP Adapters."
                )

            # 遍历每个单独的适配器图像及其对应的图像投影层
            for single_ip_adapter_image, image_proj_layer in zip(
                ip_adapter_image, self.unet.encoder_hid_proj.image_projection_layers
            ):
                # 判断输出是否为隐藏状态
                output_hidden_state = not isinstance(image_proj_layer, ImageProjection)
                # 编码图像,返回单个图像嵌入及其负嵌入
                single_image_embeds, single_negative_image_embeds = self.encode_image(
                    single_ip_adapter_image, device, 1, output_hidden_state
                )

                # 将单个图像嵌入添加到列表中
                image_embeds.append(single_image_embeds[None, :])
                # 如果启用无分类器自由引导,则添加负图像嵌入
                if do_classifier_free_guidance:
                    negative_image_embeds.append(single_negative_image_embeds[None, :])
        else:
            # 遍历已存在的图像嵌入
            for single_image_embeds in ip_adapter_image_embeds:
                # 如果启用无分类器自由引导,则分离负图像嵌入
                if do_classifier_free_guidance:
                    single_negative_image_embeds, single_image_embeds = single_image_embeds.chunk(2)
                    negative_image_embeds.append(single_negative_image_embeds)
                # 添加单个图像嵌入到列表中
                image_embeds.append(single_image_embeds)

        # 初始化最终的图像嵌入列表
        ip_adapter_image_embeds = []
        # 遍历每个图像嵌入,进行重复操作
        for i, single_image_embeds in enumerate(image_embeds):
            # 将单个图像嵌入重复指定次数
            single_image_embeds = torch.cat([single_image_embeds] * num_images_per_prompt, dim=0)
            # 如果启用无分类器自由引导,则处理负图像嵌入
            if do_classifier_free_guidance:
                single_negative_image_embeds = torch.cat([negative_image_embeds[i]] * num_images_per_prompt, dim=0)
                single_image_embeds = torch.cat([single_negative_image_embeds, single_image_embeds], dim=0)

            # 将图像嵌入移动到指定设备
            single_image_embeds = single_image_embeds.to(device=device)
            # 将最终的图像嵌入添加到列表中
            ip_adapter_image_embeds.append(single_image_embeds)

        # 返回所有的图像嵌入
        return ip_adapter_image_embeds

    # 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.run_safety_checker 复制
# 定义运行安全检查器的方法,接受图像、设备和数据类型作为参数
def run_safety_checker(self, image, device, dtype):
    # 如果安全检查器未初始化,则设置 nsfw 概念为 None
    if self.safety_checker is None:
        has_nsfw_concept = None
    else:
        # 检查输入图像是否为 PyTorch 张量
        if torch.is_tensor(image):
            # 将张量图像后处理为 PIL 格式以供特征提取器使用
            feature_extractor_input = self.image_processor.postprocess(image, output_type="pil")
        else:
            # 将 NumPy 数组图像转换为 PIL 格式
            feature_extractor_input = self.image_processor.numpy_to_pil(image)
        # 使用特征提取器处理图像并返回 PyTorch 张量,移动到指定设备
        safety_checker_input = self.feature_extractor(feature_extractor_input, return_tensors="pt").to(device)
        # 运行安全检查器,获取处理后的图像和 nsfw 概念标志
        image, has_nsfw_concept = self.safety_checker(
            images=image, clip_input=safety_checker_input.pixel_values.to(dtype)
        )
    # 返回处理后的图像和 nsfw 概念标志
    return image, has_nsfw_concept

# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline 复制的方法,用于准备额外的步骤参数
def prepare_extra_step_kwargs(self, generator, eta):
    # 为调度器步骤准备额外的参数,因为并非所有调度器具有相同的参数签名
    # eta(η)仅在 DDIMScheduler 中使用,其他调度器将忽略它
    # eta 对应于 DDIM 论文中的 η:https://arxiv.org/abs/2010.02502
    # 应该在 [0, 1] 之间

    # 检查调度器的步骤是否接受 eta 参数
    accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())
    # 初始化一个字典以存储额外的步骤参数
    extra_step_kwargs = {}
    # 如果接受 eta 参数,则将其添加到字典中
    if accepts_eta:
        extra_step_kwargs["eta"] = eta

    # 检查调度器的步骤是否接受 generator 参数
    accepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys())
    # 如果接受 generator 参数,则将其添加到字典中
    if accepts_generator:
        extra_step_kwargs["generator"] = generator
    # 返回准备好的额外步骤参数字典
    return extra_step_kwargs

# 定义检查输入的方法,接受多个参数以验证输入的有效性
def check_inputs(
    self,
    prompt,
    image,
    negative_prompt=None,
    prompt_embeds=None,
    negative_prompt_embeds=None,
    ip_adapter_image=None,
    ip_adapter_image_embeds=None,
    controlnet_conditioning_scale=1.0,
    control_guidance_start=0.0,
    control_guidance_end=1.0,
    callback_on_step_end_tensor_inputs=None,
# 从 diffusers.pipelines.controlnet.pipeline_controlnet.StableDiffusionControlNetPipeline 复制的方法,检查图像输入
# 检查输入图像及其提示是否符合要求
def check_image(self, image, prompt, prompt_embeds):
    # 判断输入是否为 PIL 图像类型
    image_is_pil = isinstance(image, PIL.Image.Image)
    # 判断输入是否为 PyTorch 张量类型
    image_is_tensor = isinstance(image, torch.Tensor)
    # 判断输入是否为 NumPy 数组类型
    image_is_np = isinstance(image, np.ndarray)
    # 判断输入是否为 PIL 图像的列表
    image_is_pil_list = isinstance(image, list) and isinstance(image[0], PIL.Image.Image)
    # 判断输入是否为 PyTorch 张量的列表
    image_is_tensor_list = isinstance(image, list) and isinstance(image[0], torch.Tensor)
    # 判断输入是否为 NumPy 数组的列表
    image_is_np_list = isinstance(image, list) and isinstance(image[0], np.ndarray)

    # 如果输入不属于任何已知的图像类型,则抛出类型错误
    if (
        not image_is_pil
        and not image_is_tensor
        and not image_is_np
        and not image_is_pil_list
        and not image_is_tensor_list
        and not image_is_np_list
    ):
        raise TypeError(
            f"image must be passed and be one of PIL image, numpy array, torch tensor, list of PIL images, list of numpy arrays or list of torch tensors, but is {type(image)}"
        )

    # 如果输入为 PIL 图像,则图像批量大小为 1
    if image_is_pil:
        image_batch_size = 1
    else:
        # 否则,图像批量大小为输入列表的长度
        image_batch_size = len(image)

    # 如果提示不为空且为字符串,则提示批量大小为 1
    if prompt is not None and isinstance(prompt, str):
        prompt_batch_size = 1
    # 如果提示为列表,则提示批量大小为列表的长度
    elif prompt is not None and isinstance(prompt, list):
        prompt_batch_size = len(prompt)
    # 如果提示嵌入不为空,则提示批量大小为嵌入的第一维长度
    elif prompt_embeds is not None:
        prompt_batch_size = prompt_embeds.shape[0]

    # 如果图像批量大小不为 1,且与提示批量大小不一致,则抛出值错误
    if image_batch_size != 1 and image_batch_size != prompt_batch_size:
        raise ValueError(
            f"If image batch size is not 1, image batch size must be same as prompt batch size. image batch size: {image_batch_size}, prompt batch size: {prompt_batch_size}"
        )

# 从 diffusers.pipelines.controlnet.pipeline_controlnet.StableDiffusionControlNetPipeline.prepare_image 复制
def prepare_image(
    self,
    image,
    width,
    height,
    batch_size,
    num_images_per_prompt,
    device,
    dtype,
    do_classifier_free_guidance=False,
    guess_mode=False,
):
    # 预处理图像,并调整为指定的高度和宽度,转换为浮点类型
    image = self.control_image_processor.preprocess(image, height=height, width=width).to(dtype=torch.float32)
    # 获取图像的批量大小
    image_batch_size = image.shape[0]

    # 如果图像批量大小为 1,则根据批量大小重复图像
    if image_batch_size == 1:
        repeat_by = batch_size
    else:
        # 否则,图像批量大小应与提示批量大小相同
        repeat_by = num_images_per_prompt

    # 根据重复因子重复图像
    image = image.repeat_interleave(repeat_by, dim=0)

    # 将图像移动到指定设备并转换为指定数据类型
    image = image.to(device=device, dtype=dtype)

    # 如果进行无分类器自由引导且不处于猜测模式,则将图像重复连接
    if do_classifier_free_guidance and not guess_mode:
        image = torch.cat([image] * 2)

    # 返回处理后的图像
    return image

# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_latents 复制
# 准备潜在变量,接收批量大小、通道数、图像高度、宽度、数据类型、设备、生成器及潜在变量(可选)
def prepare_latents(self, batch_size, num_channels_latents, height, width, dtype, device, generator, latents=None):
    # 根据输入参数计算潜在变量的形状
    shape = (
        batch_size,
        num_channels_latents,
        int(height) // self.vae_scale_factor,  # 根据 VAE 缩放因子调整高度
        int(width) // self.vae_scale_factor,    # 根据 VAE 缩放因子调整宽度
    )
    # 检查生成器是否为列表且长度与批量大小不匹配,若是则抛出异常
    if isinstance(generator, list) and len(generator) != batch_size:
        raise ValueError(
            f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
            f" size of {batch_size}. Make sure the batch size matches the length of the generators."
        )

    # 如果没有提供潜在变量,则生成随机的潜在变量
    if latents is None:
        latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
    else:
        # 将提供的潜在变量转移到指定设备
        latents = latents.to(device)

    # 将初始噪声按调度器要求的标准差进行缩放
    latents = latents * self.scheduler.init_noise_sigma
    # 返回处理后的潜在变量
    return latents

# 从 latent_consistency_models.pipeline_latent_consistency_text2img 模型中复制的方法,获取引导尺度嵌入
def get_guidance_scale_embedding(
    self, w: torch.Tensor, embedding_dim: int = 512, dtype: torch.dtype = torch.float32
) -> torch.Tensor:
    """
    查看指定链接中的实现细节

    参数:
        w (`torch.Tensor`):
            生成具有指定引导尺度的嵌入向量,以随后丰富时间步嵌入。
        embedding_dim (`int`, *可选*, 默认为 512):
            生成嵌入的维度。
        dtype (`torch.dtype`, *可选*, 默认为 `torch.float32`):
            生成的嵌入的数据类型。

    返回:
        `torch.Tensor`: 形状为 `(len(w), embedding_dim)` 的嵌入向量。
    """
    # 确保输入向量 w 为一维
    assert len(w.shape) == 1
    # 将输入向量 w 放大 1000 倍
    w = w * 1000.0

    # 计算嵌入的半维度
    half_dim = embedding_dim // 2
    # 计算嵌入的频率
    emb = torch.log(torch.tensor(10000.0)) / (half_dim - 1)
    # 生成频率嵌入
    emb = torch.exp(torch.arange(half_dim, dtype=dtype) * -emb)
    # 将 w 转换为指定 dtype 并计算最终的嵌入
    emb = w.to(dtype)[:, None] * emb[None, :]
    # 将正弦和余弦值连接起来,形成完整的嵌入
    emb = torch.cat([torch.sin(emb), torch.cos(emb)], dim=1)
    # 如果嵌入维度为奇数,则在最后填充一个零
    if embedding_dim % 2 == 1:  # zero pad
        emb = torch.nn.functional.pad(emb, (0, 1))
    # 确保嵌入的形状符合预期
    assert emb.shape == (w.shape[0], embedding_dim)
    # 返回最终的嵌入
    return emb

# 返回当前的引导尺度
@property
def guidance_scale(self):
    return self._guidance_scale

# 返回当前的剪裁跳过值
@property
def clip_skip(self):
    return self._clip_skip

# 这里 `guidance_scale` 的定义类似于 Imagen 论文中的引导权重 `w`,当 `guidance_scale = 1`
# 表示没有进行分类器自由引导。
@property
# 定义一个方法,用于判断是否使用无分类器自由引导
    def do_classifier_free_guidance(self):
        # 返回判断:如果引导比例大于1且时间条件投影维度为None
        return self._guidance_scale > 1 and self.unet.config.time_cond_proj_dim is None

    # 定义一个属性,返回交叉注意力的关键字参数
    @property
    def cross_attention_kwargs(self):
        return self._cross_attention_kwargs

    # 定义一个属性,返回时间步数
    @property
    def num_timesteps(self):
        return self._num_timesteps

    # 装饰器:无梯度计算
    @torch.no_grad()
    # 替换示例文档字符串
    @replace_example_docstring(EXAMPLE_DOC_STRING)
    # 定义一个可调用的方法,接受多个参数
    def __call__(
        # 提示文本,支持字符串或字符串列表
        prompt: Union[str, List[str]] = None,
        # 输入图像
        image: PipelineImageInput = None,
        # 输出图像的高度
        height: Optional[int] = None,
        # 输出图像的宽度
        width: Optional[int] = None,
        # 推理步骤数
        num_inference_steps: int = 50,
        # 时间步数列表
        timesteps: List[int] = None,
        # sigma值列表
        sigmas: List[float] = None,
        # 引导比例
        guidance_scale: float = 7.5,
        # 负提示文本,支持字符串或字符串列表
        negative_prompt: Optional[Union[str, List[str]]] = None,
        # 每个提示生成的图像数量
        num_images_per_prompt: Optional[int] = 1,
        # eta值
        eta: float = 0.0,
        # 随机数生成器
        generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
        # 潜在表示
        latents: Optional[torch.Tensor] = None,
        # 提示嵌入
        prompt_embeds: Optional[torch.Tensor] = None,
        # 负提示嵌入
        negative_prompt_embeds: Optional[torch.Tensor] = None,
        # 输入适配器图像
        ip_adapter_image: Optional[PipelineImageInput] = None,
        # 输入适配器图像嵌入列表
        ip_adapter_image_embeds: Optional[List[torch.Tensor]] = None,
        # 输出类型,默认为"PIL"
        output_type: Optional[str] = "pil",
        # 是否返回字典格式
        return_dict: bool = True,
        # 交叉注意力的关键字参数
        cross_attention_kwargs: Optional[Dict[str, Any]] = None,
        # 控制网条件比例
        controlnet_conditioning_scale: Union[float, List[float]] = 1.0,
        # 是否开启猜测模式
        guess_mode: bool = False,
        # 控制引导开始值
        control_guidance_start: Union[float, List[float]] = 0.0,
        # 控制引导结束值
        control_guidance_end: Union[float, List[float]] = 1.0,
        # 跳过剪辑的数量
        clip_skip: Optional[int] = None,
        # 每步结束时的回调
        callback_on_step_end: Optional[
            Union[Callable[[int, int, Dict], None], PipelineCallback, MultiPipelineCallbacks]
        ] = None,
        # 每步结束时的张量输入回调
        callback_on_step_end_tensor_inputs: List[str] = ["latents"],
        # pag_scale值
        pag_scale: float = 3.0,
        # pag自适应比例
        pag_adaptive_scale: float = 0.0,

# `.\diffusers\pipelines\pag\pipeline_pag_controlnet_sd_xl.py`

```py
# 版权声明,说明此代码由 HuggingFace 团队版权所有
# Copyright 2024 The HuggingFace Team. All rights reserved.
#
# 根据 Apache 许可证第 2.0 版(“许可证”)进行许可;
# 您不得在未遵守许可证的情况下使用此文件。
# 您可以在以下网址获取许可证副本:
#     http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律或书面协议另有约定,
# 否则根据许可证分发的软件按“原样”提供,
# 不提供任何形式的明示或暗示的担保或条件。
# 请参阅许可证,以获取有关许可和
# 限制的特定语言。

# 导入 inspect 模块,用于获取对象的信息
import inspect
# 从 typing 模块导入类型提示所需的多个类型
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

# 导入 numpy 库,通常用于数组和数值计算
import numpy as np
# 导入 PIL 库中的 Image 模块,用于图像处理
import PIL.Image
# 导入 PyTorch 库
import torch
# 从 PyTorch 中导入功能性模块
import torch.nn.functional as F
# 从 transformers 库中导入 CLIP 相关的处理器和模型
from transformers import (
    CLIPImageProcessor,  # CLIP 图像处理器
    CLIPTextModel,  # CLIP 文本模型
    CLIPTextModelWithProjection,  # 带投影的 CLIP 文本模型
    CLIPTokenizer,  # CLIP 分词器
    CLIPVisionModelWithProjection,  # 带投影的 CLIP 视觉模型
)

# 从 diffusers.utils 中导入一个工具函数,检查水印是否可用
from diffusers.utils.import_utils import is_invisible_watermark_available

# 从当前目录的回调模块中导入回调相关类
from ...callbacks import MultiPipelineCallbacks, PipelineCallback
# 从当前目录的图像处理模块中导入图像输入和 VAE 图像处理器
from ...image_processor import PipelineImageInput, VaeImageProcessor
# 从当前目录的加载器模块中导入多个加载器类
from ...loaders import (
    FromSingleFileMixin,  # 单文件加载混合器
    IPAdapterMixin,  # IP 适配器混合器
    StableDiffusionXLLoraLoaderMixin,  # 稳定扩散 XL Lora 加载器混合器
    TextualInversionLoaderMixin,  # 文本反转加载器混合器
)
# 从当前目录的模型模块中导入多种模型类
from ...models import AutoencoderKL, ControlNetModel, ImageProjection, UNet2DConditionModel
# 从当前目录的注意力处理器模块中导入多个注意力处理器
from ...models.attention_processor import (
    AttnProcessor2_0,  # 版本 2.0 的注意力处理器
    XFormersAttnProcessor,  # XFormers 注意力处理器
)
# 从当前目录的 Lora 模型中导入 Lora 相关函数
from ...models.lora import adjust_lora_scale_text_encoder
# 从当前目录的调度器模块中导入 Karras 扩散调度器
from ...schedulers import KarrasDiffusionSchedulers
# 从当前目录的工具模块中导入多个工具函数和常量
from ...utils import (
    USE_PEFT_BACKEND,  # 使用 PEFT 后端的标志
    logging,  # 日志记录模块
    replace_example_docstring,  # 替换示例文档字符串的函数
    scale_lora_layers,  # 缩放 Lora 层的函数
    unscale_lora_layers,  # 取消缩放 Lora 层的函数
)
# 从当前目录的 torch_utils 模块中导入 PyTorch 相关的工具函数
from ...utils.torch_utils import is_compiled_module, is_torch_version, randn_tensor
# 从当前目录的管道工具模块中导入扩散管道和稳定扩散混合器
from ..pipeline_utils import DiffusionPipeline, StableDiffusionMixin
# 从稳定扩散 XL 的管道输出模块中导入输出类
from ..stable_diffusion_xl.pipeline_output import StableDiffusionXLPipelineOutput
# 从当前目录的 pag_utils 模块中导入 PAG 相关混合器
from .pag_utils import PAGMixin

# 如果不可见水印可用,则导入相应的水印处理器
if is_invisible_watermark_available():
    from ..stable_diffusion_xl.watermark import StableDiffusionXLWatermarker

# 从控制网模块中导入多控制网模型
from ..controlnet.multicontrolnet import MultiControlNetModel

# 创建日志记录器,用于记录模块中的日志信息
logger = logging.get_logger(__name__)  # pylint: disable=invalid-name

# 定义示例文档字符串,用于说明代码示例的格式
EXAMPLE_DOC_STRING = """
# 示例代码
    Examples:
        ```py
        >>> # 安装所需库
        >>> # !pip install opencv-python transformers accelerate
        >>> # 导入所需的库
        >>> from diffusers import AutoPipelineForText2Image, ControlNetModel, AutoencoderKL
        >>> from diffusers.utils import load_image
        >>> import numpy as np
        >>> import torch

        >>> import cv2
        >>> from PIL import Image

        >>> # 设置生成图像的提示信息
        >>> prompt = "aerial view, a futuristic research complex in a bright foggy jungle, hard lighting"
        >>> # 设置负面提示信息
        >>> negative_prompt = "low quality, bad quality, sketches"

        >>> # 下载一张图像
        >>> image = load_image(
        ...     "https://hf.co/datasets/hf-internal-testing/diffusers-images/resolve/main/sd_controlnet/hf-logo.png"
        ... )

        >>> # 初始化模型和管道
        >>> controlnet_conditioning_scale = 0.5  # 推荐用于良好的泛化
        >>> # 从预训练模型加载 ControlNet
        >>> controlnet = ControlNetModel.from_pretrained(
        ...     "diffusers/controlnet-canny-sdxl-1.0", torch_dtype=torch.float16
        ... )
        >>> # 从预训练模型加载自动编码器
        >>> vae = AutoencoderKL.from_pretrained("madebyollin/sdxl-vae-fp16-fix", torch_dtype=torch.float16)
        >>> # 从预训练模型加载文本到图像的自动管道
        >>> pipe = AutoPipelineForText2Image.from_pretrained(
        ...     "stabilityai/stable-diffusion-xl-base-1.0",
        ...     controlnet=controlnet,
        ...     vae=vae,
        ...     torch_dtype=torch.float16,
        ...     enable_pag=True,
        ... )
        >>> # 启用模型的 CPU 内存卸载
        >>> pipe.enable_model_cpu_offload()

        >>> # 获取 Canny 边缘检测图像
        >>> image = np.array(image)
        >>> # 应用 Canny 边缘检测
        >>> image = cv2.Canny(image, 100, 200)
        >>> # 增加维度以适应图像格式
        >>> image = image[:, :, None]
        >>> # 将单通道图像扩展为三通道
        >>> image = np.concatenate([image, image, image], axis=2)
        >>> # 从数组创建图像对象
        >>> canny_image = Image.fromarray(image)

        >>> # 生成图像
        >>> image = pipe(
        ...     prompt, controlnet_conditioning_scale=controlnet_conditioning_scale, image=canny_image, pag_scale=0.3
        ... ).images[0]

从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion 导入的函数

def retrieve_timesteps(
# 调度器对象,用于获取时间步
scheduler,
# 用于生成样本的推理步骤数量,可选
num_inference_steps: Optional[int] = None,
# 指定时间步移动的设备,可选
device: Optional[Union[str, torch.device]] = None,
# 自定义时间步,可选
timesteps: Optional[List[int]] = None,
# 自定义 sigma 值,可选
sigmas: Optional[List[float]] = None,
# 额外的关键字参数
**kwargs,
):
"""
调用调度器的 set_timesteps 方法并在调用后从调度器获取时间步。处理自定义时间步。
任何关键字参数将被传递给 scheduler.set_timesteps

参数:
    scheduler (`SchedulerMixin`):
        要获取时间步的调度器。
    num_inference_steps (`int`):
        用于生成样本的扩散步骤数量。如果使用,则 `timesteps` 必须为 `None`。
    device (`str` 或 `torch.device`, *可选*):
        要移动时间步的设备。如果为 `None`,则不移动时间步。
    timesteps (`List[int]`, *可选*):
        自定义时间步,用于覆盖调度器的时间步间隔策略。如果传递 `timesteps`,则 `num_inference_steps` 和 `sigmas` 必须为 `None`。
    sigmas (`List[float]`, *可选*):
        自定义 sigma 值,用于覆盖调度器的时间步间隔策略。如果传递 `sigmas`,则 `num_inference_steps` 和 `timesteps` 必须为 `None`。

返回:
    `Tuple[torch.Tensor, int]`: 一个元组,其中第一个元素是调度器的时间步计划,第二个元素是推理步骤的数量。
"""
# 检查是否同时传入了时间步和 sigma 值
if timesteps is not None and sigmas is not None:
    raise ValueError("Only one of `timesteps` or `sigmas` can be passed. Please choose one to set custom values")
# 如果传入了时间步
if timesteps is not None:
    # 检查调度器的 `set_timesteps` 方法是否接受时间步参数
    accepts_timesteps = "timesteps" in set(inspect.signature(scheduler.set_timesteps).parameters.keys())
    # 如果不支持时间步,则抛出错误
    if not accepts_timesteps:
        raise ValueError(
            f"The current scheduler class {scheduler.__class__}'s `set_timesteps` does not support custom"
            f" timestep schedules. Please check whether you are using the correct scheduler."
        )
    # 调用调度器的 `set_timesteps` 方法设置时间步
    scheduler.set_timesteps(timesteps=timesteps, device=device, **kwargs)
    # 从调度器获取设置后的时间步
    timesteps = scheduler.timesteps
    # 计算推理步骤的数量
    num_inference_steps = len(timesteps)
# 如果传入了 sigma 值
elif sigmas is not None:
    # 检查调度器的 `set_timesteps` 方法是否接受 sigma 参数
    accept_sigmas = "sigmas" in set(inspect.signature(scheduler.set_timesteps).parameters.keys())
    # 如果不支持 sigma,则抛出错误
    if not accept_sigmas:
        raise ValueError(
            f"The current scheduler class {scheduler.__class__}'s `set_timesteps` does not support custom"
            f" sigmas schedules. Please check whether you are using the correct scheduler."
        )
    # 调用调度器的 `set_timesteps` 方法设置 sigma 值
    scheduler.set_timesteps(sigmas=sigmas, device=device, **kwargs)
    # 从调度器获取设置后的时间步
    timesteps = scheduler.timesteps
    # 计算推理步骤的数量
    num_inference_steps = len(timesteps)
# 如果不是第一个条件的情况
    else:
        # 设置推理步骤的时间步,指定设备和其他参数
        scheduler.set_timesteps(num_inference_steps, device=device, **kwargs)
        # 获取调度器中的时间步
        timesteps = scheduler.timesteps
    # 返回时间步和推理步骤的数量
    return timesteps, num_inference_steps

定义一个名为 StableDiffusionXLControlNetPAGPipeline 的类,继承多个混合类

class StableDiffusionXLControlNetPAGPipeline(
# 继承自 DiffusionPipeline 类,提供扩散管道的基本功能
DiffusionPipeline,
# 继承自 StableDiffusionMixin 类,增加稳定扩散相关功能
StableDiffusionMixin,
# 继承自 TextualInversionLoaderMixin 类,用于加载文本反演嵌入
TextualInversionLoaderMixin,
# 继承自 StableDiffusionXLLoraLoaderMixin 类,用于加载和保存 LoRA 权重
StableDiffusionXLLoraLoaderMixin,
# 继承自 IPAdapterMixin 类,用于加载 IP 适配器
IPAdapterMixin,
# 继承自 FromSingleFileMixin 类,用于从单个文件加载模型
FromSingleFileMixin,
# 继承自 PAGMixin 类,提供与 PAG 相关的功能
PAGMixin,
):
# 文档字符串,描述此类的用途
r"""
使用 Stable Diffusion XL 和 ControlNet 引导进行文本到图像生成的管道。

此模型继承自 [`DiffusionPipeline`]。请查看超类文档以获取所有管道的通用方法
(下载、保存、在特定设备上运行等)。

该管道还继承了以下加载方法:
    - [`~loaders.TextualInversionLoaderMixin.load_textual_inversion`] 用于加载文本反演嵌入
    - [`~loaders.StableDiffusionXLLoraLoaderMixin.load_lora_weights`] 用于加载 LoRA 权重
    - [`~loaders.StableDiffusionXLLoraLoaderMixin.save_lora_weights`] 用于保存 LoRA 权重
    - [`~loaders.FromSingleFileMixin.from_single_file`] 用于加载 `.ckpt` 文件
    - [`~loaders.IPAdapterMixin.load_ip_adapter`] 用于加载 IP 适配器
"""
# 函数参数说明
Args:
    # 变分自编码器模型,用于编码和解码图像与潜在表示之间的转换
    vae ([`AutoencoderKL`]):
        Variational Auto-Encoder (VAE) model to encode and decode images to and from latent representations.
    # 冻结的文本编码器,用于处理文本输入
    text_encoder ([`~transformers.CLIPTextModel`]):
        Frozen text-encoder ([clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14)).
    # 第二个冻结文本编码器,提供更多文本处理能力
    text_encoder_2 ([`~transformers.CLIPTextModelWithProjection`]):
        Second frozen text-encoder
        ([laion/CLIP-ViT-bigG-14-laion2B-39B-b160k](https://huggingface.co/laion/CLIP-ViT-bigG-14-laion2B-39B-b160k)).
    # 用于将文本标记化的 CLIPTokenizer
    tokenizer ([`~transformers.CLIPTokenizer`]):
        A `CLIPTokenizer` to tokenize text.
    # 另一个用于文本标记化的 CLIPTokenizer
    tokenizer_2 ([`~transformers.CLIPTokenizer`]):
        A `CLIPTokenizer` to tokenize text.
    # 用于去噪编码后图像潜在表示的 UNet 模型
    unet ([`UNet2DConditionModel`]):
        A `UNet2DConditionModel` to denoise the encoded image latents.
    # 提供额外条件给 UNet 的 ControlNet 模型,可以是单个或多个
    controlnet ([`ControlNetModel`] or `List[ControlNetModel]`):
        Provides additional conditioning to the `unet` during the denoising process. If you set multiple
        ControlNets as a list, the outputs from each ControlNet are added together to create one combined
        additional conditioning.
    # 与 UNet 配合使用的调度器,用于去噪图像潜在表示
    scheduler ([`SchedulerMixin`]):
        A scheduler to be used in combination with `unet` to denoise the encoded image latents. Can be one of
        [`DDIMScheduler`], [`LMSDiscreteScheduler`], or [`PNDMScheduler`].
    # 是否始终将负面提示嵌入设置为0的布尔值,默认为真
    force_zeros_for_empty_prompt (`bool`, *optional*, defaults to `"True"`):
        Whether the negative prompt embeddings should always be set to 0. Also see the config of
        `stabilityai/stable-diffusion-xl-base-1-0`.
    # 是否使用隐形水印库在输出图像上添加水印的布尔值
    add_watermarker (`bool`, *optional*):
        Whether to use the [invisible_watermark](https://github.com/ShieldMnt/invisible-watermark/) library to
        watermark output images. If not defined, it defaults to `True` if the package is installed; otherwise no
        watermarker is used.
"""

# 有意不包括 controlnet,因为它与 unet 迭代
model_cpu_offload_seq = "text_encoder->text_encoder_2->image_encoder->unet->vae"
# 可选组件列表,包括不同的编码器和标记器
_optional_components = [
    "tokenizer",
    "tokenizer_2",
    "text_encoder",
    "text_encoder_2",
    "feature_extractor",
    "image_encoder",
]
# 回调张量输入的列表,包含潜在向量和嵌入
_callback_tensor_inputs = [
    "latents",
    "prompt_embeds",
    "negative_prompt_embeds",
    "add_text_embeds",
    "add_time_ids",
    "negative_pooled_prompt_embeds",
    "negative_add_time_ids",
]
# 初始化方法,定义对象的基本属性和参数
    def __init__(
        self,
        vae: AutoencoderKL,  # 自动编码器模型
        text_encoder: CLIPTextModel,  # 文本编码器模型
        text_encoder_2: CLIPTextModelWithProjection,  # 第二个文本编码器,带投影功能
        tokenizer: CLIPTokenizer,  # 用于文本分词的工具
        tokenizer_2: CLIPTokenizer,  # 第二个分词工具
        unet: UNet2DConditionModel,  # UNet模型,用于生成任务
        controlnet: Union[ControlNetModel, List[ControlNetModel], Tuple[ControlNetModel], MultiControlNetModel],  # 控制网络模型,可以是单个或多个模型
        scheduler: KarrasDiffusionSchedulers,  # 用于调度的扩散调度器
        force_zeros_for_empty_prompt: bool = True,  # 控制是否对空提示强制零
        add_watermarker: Optional[bool] = None,  # 可选参数,用于添加水印
        feature_extractor: CLIPImageProcessor = None,  # 可选的图像特征提取器
        image_encoder: CLIPVisionModelWithProjection = None,  # 可选的图像编码器,带投影功能
        pag_applied_layers: Union[str, List[str]] = "mid",  # 应用的层,默认为“mid”
    ):
        super().__init__()  # 调用父类的初始化方法

        # 如果 controlnet 是列表或元组,将其转换为 MultiControlNetModel
        if isinstance(controlnet, (list, tuple)):
            controlnet = MultiControlNetModel(controlnet)

        # 注册模型组件到当前对象
        self.register_modules(
            vae=vae,
            text_encoder=text_encoder,
            text_encoder_2=text_encoder_2,
            tokenizer=tokenizer,
            tokenizer_2=tokenizer_2,
            unet=unet,
            controlnet=controlnet,
            scheduler=scheduler,
            feature_extractor=feature_extractor,
            image_encoder=image_encoder,
        )
        # 计算 VAE 的缩放因子
        self.vae_scale_factor = 2 ** (len(self.vae.config.block_out_channels) - 1)
        # 初始化图像处理器,用于 VAE
        self.image_processor = VaeImageProcessor(vae_scale_factor=self.vae_scale_factor, do_convert_rgb=True)
        # 初始化控制图像处理器,用于 VAE
        self.control_image_processor = VaeImageProcessor(
            vae_scale_factor=self.vae_scale_factor, do_convert_rgb=True, do_normalize=False
        )
        # 确定是否添加水印,如果未提供则根据可用性设置
        add_watermarker = add_watermarker if add_watermarker is not None else is_invisible_watermark_available()

        # 根据参数决定是否创建水印对象
        if add_watermarker:
            self.watermark = StableDiffusionXLWatermarker()  # 创建水印对象
        else:
            self.watermark = None  # 不创建水印对象

        # 将配置注册到当前对象
        self.register_to_config(force_zeros_for_empty_prompt=force_zeros_for_empty_prompt)
        # 设置应用的层
        self.set_pag_applied_layers(pag_applied_layers)

    # 从外部库复制的编码提示方法
    def encode_prompt(
        self,
        prompt: str,  # 主提示字符串
        prompt_2: Optional[str] = None,  # 可选的第二个提示字符串
        device: Optional[torch.device] = None,  # 可选的设备参数
        num_images_per_prompt: int = 1,  # 每个提示生成的图像数量
        do_classifier_free_guidance: bool = True,  # 是否使用无分类器引导
        negative_prompt: Optional[str] = None,  # 可选的负提示字符串
        negative_prompt_2: Optional[str] = None,  # 可选的第二个负提示字符串
        prompt_embeds: Optional[torch.Tensor] = None,  # 可选的提示嵌入张量
        negative_prompt_embeds: Optional[torch.Tensor] = None,  # 可选的负提示嵌入张量
        pooled_prompt_embeds: Optional[torch.Tensor] = None,  # 可选的池化提示嵌入张量
        negative_pooled_prompt_embeds: Optional[torch.Tensor] = None,  # 可选的池化负提示嵌入张量
        lora_scale: Optional[float] = None,  # 可选的 Lora 缩放因子
        clip_skip: Optional[int] = None,  # 可选的跳过剪辑层参数
    # 从外部库复制的编码图像方法
# 定义编码图像的函数,输入包括图像、设备、每个提示的图像数量和可选的隐藏状态输出
    def encode_image(self, image, device, num_images_per_prompt, output_hidden_states=None):
        # 获取图像编码器参数的数据类型
        dtype = next(self.image_encoder.parameters()).dtype

        # 如果输入图像不是张量,则使用特征提取器将其转换为张量
        if not isinstance(image, torch.Tensor):
            image = self.feature_extractor(image, return_tensors="pt").pixel_values

        # 将图像移动到指定设备并转换为所需的数据类型
        image = image.to(device=device, dtype=dtype)
        # 如果需要输出隐藏状态,则获取编码后的隐藏状态
        if output_hidden_states:
            # 通过图像编码器编码图像并获取倒数第二层的隐藏状态
            image_enc_hidden_states = self.image_encoder(image, output_hidden_states=True).hidden_states[-2]
            # 重复隐藏状态以匹配每个提示的图像数量
            image_enc_hidden_states = image_enc_hidden_states.repeat_interleave(num_images_per_prompt, dim=0)
            # 对于未条件的图像,通过零张量编码并获取隐藏状态
            uncond_image_enc_hidden_states = self.image_encoder(
                torch.zeros_like(image), output_hidden_states=True
            ).hidden_states[-2]
            # 重复未条件隐藏状态以匹配每个提示的图像数量
            uncond_image_enc_hidden_states = uncond_image_enc_hidden_states.repeat_interleave(
                num_images_per_prompt, dim=0
            )
            # 返回编码后的图像和未条件的图像隐藏状态
            return image_enc_hidden_states, uncond_image_enc_hidden_states
        else:
            # 如果不需要输出隐藏状态,则获取编码后的图像嵌入
            image_embeds = self.image_encoder(image).image_embeds
            # 重复图像嵌入以匹配每个提示的图像数量
            image_embeds = image_embeds.repeat_interleave(num_images_per_prompt, dim=0)
            # 创建与图像嵌入相同形状的零张量作为未条件的图像嵌入
            uncond_image_embeds = torch.zeros_like(image_embeds)

            # 返回编码后的图像嵌入和未条件的图像嵌入
            return image_embeds, uncond_image_embeds

    # 从 StableDiffusionPipeline 类中复制的函数,用于准备 IP 适配器的图像嵌入
    def prepare_ip_adapter_image_embeds(
        self, ip_adapter_image, ip_adapter_image_embeds, device, num_images_per_prompt, do_classifier_free_guidance
):
    # 初始化一个空列表,用于存储图像嵌入
    image_embeds = []
    # 如果启用分类器自由引导,则初始化一个空列表,用于存储负图像嵌入
    if do_classifier_free_guidance:
        negative_image_embeds = []
    # 如果输入适配器的图像嵌入为空
    if ip_adapter_image_embeds is None:
        # 如果输入适配器图像不是列表,则将其转换为列表
        if not isinstance(ip_adapter_image, list):
            ip_adapter_image = [ip_adapter_image]

        # 检查输入适配器图像的长度是否与 IP 适配器的层数匹配
        if len(ip_adapter_image) != len(self.unet.encoder_hid_proj.image_projection_layers):
            # 如果长度不匹配,抛出值错误
            raise ValueError(
                f"`ip_adapter_image` must have same length as the number of IP Adapters. Got {len(ip_adapter_image)} images and {len(self.unet.encoder_hid_proj.image_projection_layers)} IP Adapters."
            )

        # 遍历输入适配器图像和对应的图像投影层
        for single_ip_adapter_image, image_proj_layer in zip(
            ip_adapter_image, self.unet.encoder_hid_proj.image_projection_layers
        ):
            # 判断当前图像投影层是否是 ImageProjection 类型,输出隐藏状态的标志
            output_hidden_state = not isinstance(image_proj_layer, ImageProjection)
            # 对单个输入适配器图像进行编码,获取嵌入
            single_image_embeds, single_negative_image_embeds = self.encode_image(
                single_ip_adapter_image, device, 1, output_hidden_state
            )

            # 将单个图像嵌入添加到图像嵌入列表中,并扩展维度
            image_embeds.append(single_image_embeds[None, :])
            # 如果启用分类器自由引导,添加负图像嵌入
            if do_classifier_free_guidance:
                negative_image_embeds.append(single_negative_image_embeds[None, :])
    else:
        # 如果输入适配器的图像嵌入不为空,遍历这些嵌入
        for single_image_embeds in ip_adapter_image_embeds:
            # 如果启用分类器自由引导,将嵌入拆分为负嵌入和正嵌入
            if do_classifier_free_guidance:
                single_negative_image_embeds, single_image_embeds = single_image_embeds.chunk(2)
                # 添加负图像嵌入
                negative_image_embeds.append(single_negative_image_embeds)
            # 添加正图像嵌入
            image_embeds.append(single_image_embeds)

    # 初始化一个空列表,用于存储最终的适配器图像嵌入
    ip_adapter_image_embeds = []
    # 遍历每个图像嵌入及其索引
    for i, single_image_embeds in enumerate(image_embeds):
        # 将单个图像嵌入扩展到 num_images_per_prompt 的数量
        single_image_embeds = torch.cat([single_image_embeds] * num_images_per_prompt, dim=0)
        # 如果启用分类器自由引导,将负图像嵌入扩展
        if do_classifier_free_guidance:
            single_negative_image_embeds = torch.cat([negative_image_embeds[i]] * num_images_per_prompt, dim=0)
            # 将负图像嵌入和正图像嵌入合并
            single_image_embeds = torch.cat([single_negative_image_embeds, single_image_embeds], dim=0)

        # 将图像嵌入移动到指定设备
        single_image_embeds = single_image_embeds.to(device=device)
        # 将处理后的图像嵌入添加到最终列表中
        ip_adapter_image_embeds.append(single_image_embeds)

    # 返回适配器图像嵌入的列表
    return ip_adapter_image_embeds

# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_extra_step_kwargs 复制
# 准备额外的参数以供调度器步骤使用,因为并非所有调度器具有相同的参数签名
def prepare_extra_step_kwargs(self, generator, eta):
    # 检查调度器的步骤函数是否接受 eta 参数
    accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())
    # 初始化额外参数字典
    extra_step_kwargs = {}
    # 如果接受 eta,则将其添加到字典中
    if accepts_eta:
        extra_step_kwargs["eta"] = eta

    # 检查调度器的步骤函数是否接受 generator 参数
    accepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys())
    # 如果接受 generator,则将其添加到字典中
    if accepts_generator:
        extra_step_kwargs["generator"] = generator
    # 返回额外参数字典
    return extra_step_kwargs

# 从 diffusers.pipelines.controlnet.pipeline_controlnet_sd_xl.StableDiffusionXLControlNetPipeline.check_inputs 复制
def check_inputs(
    self,
    prompt,
    prompt_2,
    image,
    callback_steps,
    negative_prompt=None,
    negative_prompt_2=None,
    prompt_embeds=None,
    negative_prompt_embeds=None,
    pooled_prompt_embeds=None,
    ip_adapter_image=None,
    ip_adapter_image_embeds=None,
    negative_pooled_prompt_embeds=None,
    controlnet_conditioning_scale=1.0,
    control_guidance_start=0.0,
    control_guidance_end=1.0,
    callback_on_step_end_tensor_inputs=None,
    # 从 diffusers.pipelines.controlnet.pipeline_controlnet.StableDiffusionControlNetPipeline.check_image 复制
# 检查图像的类型和尺寸,确保其与提示的批量大小一致
    def check_image(self, image, prompt, prompt_embeds):
        # 判断图像是否为 PIL 图片类型
        image_is_pil = isinstance(image, PIL.Image.Image)
        # 判断图像是否为 Torch 张量类型
        image_is_tensor = isinstance(image, torch.Tensor)
        # 判断图像是否为 NumPy 数组类型
        image_is_np = isinstance(image, np.ndarray)
        # 判断图像是否为 PIL 图片的列表
        image_is_pil_list = isinstance(image, list) and isinstance(image[0], PIL.Image.Image)
        # 判断图像是否为 Torch 张量的列表
        image_is_tensor_list = isinstance(image, list) and isinstance(image[0], torch.Tensor)
        # 判断图像是否为 NumPy 数组的列表
        image_is_np_list = isinstance(image, list) and isinstance(image[0], np.ndarray)

        # 如果图像不是以上任何一种类型,则抛出类型错误
        if (
            not image_is_pil
            and not image_is_tensor
            and not image_is_np
            and not image_is_pil_list
            and not image_is_tensor_list
            and not image_is_np_list
        ):
            raise TypeError(
                f"image must be passed and be one of PIL image, numpy array, torch tensor, list of PIL images, list of numpy arrays or list of torch tensors, but is {type(image)}"
            )

        # 如果图像是 PIL 图片,设置批量大小为 1
        if image_is_pil:
            image_batch_size = 1
        else:
            # 否则,批量大小为图像的长度
            image_batch_size = len(image)

        # 如果提示不为空且为字符串,设置提示的批量大小为 1
        if prompt is not None and isinstance(prompt, str):
            prompt_batch_size = 1
        # 如果提示为列表,设置提示的批量大小为列表的长度
        elif prompt is not None and isinstance(prompt, list):
            prompt_batch_size = len(prompt)
        # 如果提示嵌入不为空,设置批量大小为提示嵌入的形状的第一维
        elif prompt_embeds is not None:
            prompt_batch_size = prompt_embeds.shape[0]

        # 如果图像批量大小不为 1 且与提示批量大小不相等,抛出值错误
        if image_batch_size != 1 and image_batch_size != prompt_batch_size:
            raise ValueError(
                f"If image batch size is not 1, image batch size must be same as prompt batch size. image batch size: {image_batch_size}, prompt batch size: {prompt_batch_size}"
            )

    # 处理图像以适应模型输入的准备过程
    def prepare_image(
        self,
        image,
        width,
        height,
        batch_size,
        num_images_per_prompt,
        device,
        dtype,
        do_classifier_free_guidance=False,
        guess_mode=False,
    ):
        # 预处理图像,调整大小并转换为指定的数据类型
        image = self.control_image_processor.preprocess(image, height=height, width=width).to(dtype=torch.float32)
        # 获取处理后图像的批量大小
        image_batch_size = image.shape[0]

        # 如果图像批量大小为 1,重复次数为 batch_size
        if image_batch_size == 1:
            repeat_by = batch_size
        else:
            # 否则,重复次数为每个提示的图像数量
            repeat_by = num_images_per_prompt

        # 按照重复次数扩展图像维度
        image = image.repeat_interleave(repeat_by, dim=0)

        # 将图像转移到指定的设备和数据类型
        image = image.to(device=device, dtype=dtype)

        # 如果启用分类器自由引导且不在猜测模式下,将图像重复两次
        if do_classifier_free_guidance and not guess_mode:
            image = torch.cat([image] * 2)

        # 返回处理后的图像
        return image

    # 复制自 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_latents
# 准备潜在向量,定义形状和其他参数
    def prepare_latents(self, batch_size, num_channels_latents, height, width, dtype, device, generator, latents=None):
        # 定义潜在向量的形状,考虑批量大小和维度缩放
        shape = (
            batch_size,
            num_channels_latents,
            int(height) // self.vae_scale_factor,
            int(width) // self.vae_scale_factor,
        )
        # 检查生成器列表长度是否与批量大小匹配
        if isinstance(generator, list) and len(generator) != batch_size:
            raise ValueError(
                f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
                f" size of {batch_size}. Make sure the batch size matches the length of the generators."
            )

        # 如果没有提供潜在向量,则生成随机潜在向量
        if latents is None:
            latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
        else:
            # 将现有潜在向量转移到指定设备
            latents = latents.to(device)

        # 根据调度器要求的标准差缩放初始噪声
        latents = latents * self.scheduler.init_noise_sigma
        # 返回处理后的潜在向量
        return latents

    # 从 diffusers.pipelines.stable_diffusion_xl 复制的方法,获取添加的时间ID
    def _get_add_time_ids(
        self, original_size, crops_coords_top_left, target_size, dtype, text_encoder_projection_dim=None
    ):
        # 创建包含原始大小、裁剪坐标和目标大小的时间ID列表
        add_time_ids = list(original_size + crops_coords_top_left + target_size)

        # 计算通过添加时间嵌入维度的总维度
        passed_add_embed_dim = (
            self.unet.config.addition_time_embed_dim * len(add_time_ids) + text_encoder_projection_dim
        )
        # 获取模型期望的添加时间嵌入维度
        expected_add_embed_dim = self.unet.add_embedding.linear_1.in_features

        # 检查通过的维度与期望维度是否匹配
        if expected_add_embed_dim != passed_add_embed_dim:
            raise ValueError(
                f"Model expects an added time embedding vector of length {expected_add_embed_dim}, but a vector of {passed_add_embed_dim} was created. The model has an incorrect config. Please check `unet.config.time_embedding_type` and `text_encoder_2.config.projection_dim`."
            )

        # 将添加时间ID转换为指定类型的张量
        add_time_ids = torch.tensor([add_time_ids], dtype=dtype)
        # 返回添加时间ID的张量
        return add_time_ids

    # 从 diffusers.pipelines.latent_consistency_models 复制的方法,提升 VAE 精度
    def upcast_vae(self):
        # 获取 VAE 的数据类型
        dtype = self.vae.dtype
        # 将 VAE 转换为 float32 类型
        self.vae.to(dtype=torch.float32)
        # 检查是否使用 Torch 2.0 或 XFormers 处理器
        use_torch_2_0_or_xformers = isinstance(
            self.vae.decoder.mid_block.attentions[0].processor,
            (
                AttnProcessor2_0,
                XFormersAttnProcessor,
            ),
        )
        # 如果使用了 XFormers 或 Torch 2.0,则无需将注意力块保持为 float32,可以节省大量内存
        if use_torch_2_0_or_xformers:
            # 将各个部分转换为之前保存的 dtype
            self.vae.post_quant_conv.to(dtype)
            self.vae.decoder.conv_in.to(dtype)
            self.vae.decoder.mid_block.to(dtype)

    # 从 diffusers.pipelines.latent_consistency_models 复制的方法,获取指导缩放嵌入
# 定义获取指导尺度嵌入的函数,接受输入张量 w,嵌入维度和数据类型
def get_guidance_scale_embedding(
        self, w: torch.Tensor, embedding_dim: int = 512, dtype: torch.dtype = torch.float32
    ) -> torch.Tensor:
        # 文档字符串,提供函数的链接和参数说明
        """
        See https://github.com/google-research/vdm/blob/dc27b98a554f65cdc654b800da5aa1846545d41b/model_vdm.py#L298

        Args:
            w (`torch.Tensor`):
                Generate embedding vectors with a specified guidance scale to subsequently enrich timestep embeddings.
            embedding_dim (`int`, *optional*, defaults to 512):
                Dimension of the embeddings to generate.
            dtype (`torch.dtype`, *optional*, defaults to `torch.float32`):
                Data type of the generated embeddings.

        Returns:
            `torch.Tensor`: Embedding vectors with shape `(len(w), embedding_dim)`.
        """
        # 确保输入张量 w 是一维的
        assert len(w.shape) == 1
        # 将 w 乘以 1000.0,以调整指导尺度
        w = w * 1000.0

        # 计算嵌入维度的一半
        half_dim = embedding_dim // 2
        # 计算指数衰减的常数
        emb = torch.log(torch.tensor(10000.0)) / (half_dim - 1)
        # 生成衰减的嵌入向量
        emb = torch.exp(torch.arange(half_dim, dtype=dtype) * -emb)
        # 根据输入张量和嵌入向量生成最终的嵌入
        emb = w.to(dtype)[:, None] * emb[None, :]
        # 将正弦和余弦值拼接成最终的嵌入
        emb = torch.cat([torch.sin(emb), torch.cos(emb)], dim=1)
        # 如果嵌入维度为奇数,则进行零填充
        if embedding_dim % 2 == 1:  # zero pad
            emb = torch.nn.functional.pad(emb, (0, 1))
        # 确保输出的嵌入形状符合预期
        assert emb.shape == (w.shape[0], embedding_dim)
        # 返回生成的嵌入张量
        return emb

    # 定义指导尺度的属性,返回内部变量
    @property
    def guidance_scale(self):
        return self._guidance_scale

    # 定义跳过剪辑的属性,返回内部变量
    @property
    def clip_skip(self):
        return self._clip_skip

    # 定义是否进行无分类器指导的属性,基于指导尺度的值
    @property
    def do_classifier_free_guidance(self):
        return self._guidance_scale > 1 and self.unet.config.time_cond_proj_dim is None

    # 定义交叉注意力的参数属性,返回内部变量
    @property
    def cross_attention_kwargs(self):
        return self._cross_attention_kwargs

    # 定义去噪结束的属性,返回内部变量
    @property
    def denoising_end(self):
        return self._denoising_end

    # 定义时间步数的属性,返回内部变量
    @property
    def num_timesteps(self):
        return self._num_timesteps

    # 禁用梯度计算装饰器,确保在不计算梯度的情况下运行
    @torch.no_grad()
    # 替换示例文档字符串的装饰器
    @replace_example_docstring(EXAMPLE_DOC_STRING)
# 定义可调用方法,允许对象被当作函数使用
    def __call__(
        # 提示文本,可以是字符串或字符串列表
        self,
        prompt: Union[str, List[str]] = None,
        # 第二个提示文本,默认为 None
        prompt_2: Optional[Union[str, List[str]]] = None,
        # 输入图像,类型为 PipelineImageInput
        image: PipelineImageInput = None,
        # 输出图像高度,默认为 None
        height: Optional[int] = None,
        # 输出图像宽度,默认为 None
        width: Optional[int] = None,
        # 推理步骤数,默认为 50
        num_inference_steps: int = 50,
        # 时间步列表,默认为 None
        timesteps: List[int] = None,
        # σ 值列表,默认为 None
        sigmas: List[float] = None,
        # 去噪结束值,默认为 None
        denoising_end: Optional[float] = None,
        # 指导比例,默认为 5.0
        guidance_scale: float = 5.0,
        # 负提示文本,默认为 None
        negative_prompt: Optional[Union[str, List[str]]] = None,
        # 第二个负提示文本,默认为 None
        negative_prompt_2: Optional[Union[str, List[str]]] = None,
        # 每个提示生成的图像数量,默认为 1
        num_images_per_prompt: Optional[int] = 1,
        # η 值,默认为 0.0
        eta: float = 0.0,
        # 随机数生成器,默认为 None
        generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
        # 潜在表示,默认为 None
        latents: Optional[torch.Tensor] = None,
        # 提示嵌入,默认为 None
        prompt_embeds: Optional[torch.Tensor] = None,
        # 负提示嵌入,默认为 None
        negative_prompt_embeds: Optional[torch.Tensor] = None,
        # 池化的提示嵌入,默认为 None
        pooled_prompt_embeds: Optional[torch.Tensor] = None,
        # 负池化提示嵌入,默认为 None
        negative_pooled_prompt_embeds: Optional[torch.Tensor] = None,
        # 输入适配器图像,默认为 None
        ip_adapter_image: Optional[PipelineImageInput] = None,
        # 输入适配器图像嵌入列表,默认为 None
        ip_adapter_image_embeds: Optional[List[torch.Tensor]] = None,
        # 输出类型,默认为 "pil"
        output_type: Optional[str] = "pil",
        # 是否返回字典,默认为 True
        return_dict: bool = True,
        # 交叉注意力的参数,默认为 None
        cross_attention_kwargs: Optional[Dict[str, Any]] = None,
        # ControlNet 条件缩放,默认为 1.0
        controlnet_conditioning_scale: Union[float, List[float]] = 1.0,
        # 控制指导开始值,默认为 0.0
        control_guidance_start: Union[float, List[float]] = 0.0,
        # 控制指导结束值,默认为 1.0
        control_guidance_end: Union[float, List[float]] = 1.0,
        # 原始图像大小,默认为 None
        original_size: Tuple[int, int] = None,
        # 裁剪坐标的左上角,默认为 (0, 0)
        crops_coords_top_left: Tuple[int, int] = (0, 0),
        # 目标图像大小,默认为 None
        target_size: Tuple[int, int] = None,
        # 负原始图像大小,默认为 None
        negative_original_size: Optional[Tuple[int, int]] = None,
        # 负裁剪坐标的左上角,默认为 (0, 0)
        negative_crops_coords_top_left: Tuple[int, int] = (0, 0),
        # 负目标图像大小,默认为 None
        negative_target_size: Optional[Tuple[int, int]] = None,
        # 剪切跳过的数量,默认为 None
        clip_skip: Optional[int] = None,
        # 步骤结束时的回调函数,默认为 None
        callback_on_step_end: Optional[
            Union[Callable[[int, int, Dict], None], PipelineCallback, MultiPipelineCallbacks]
        ] = None,
        # 结束时张量输入的回调,默认为 ["latents"]
        callback_on_step_end_tensor_inputs: List[str] = ["latents"],
        # PAG 缩放,默认为 3.0
        pag_scale: float = 3.0,
        # 自适应 PAG 缩放,默认为 0.0
        pag_adaptive_scale: float = 0.0,

# `.\diffusers\pipelines\pag\pipeline_pag_hunyuandit.py`

```py
# 版权声明,注明代码作者及版权信息
# Copyright 2024 HunyuanDiT Authors and The HuggingFace Team. All rights reserved.
#
# 根据 Apache License, Version 2.0 授权该文件的使用
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# 说明如何获取授权副本
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# 说明在适用法律情况下的免责条款
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# 参见许可证关于权限和限制的具体说明
# See the License for the specific language governing permissions and
# limitations under the License.

# 导入 inspect 模块以检查对象的签名和文档
import inspect
# 从 typing 模块导入类型提示
from typing import Callable, Dict, List, Optional, Tuple, Union

# 导入 numpy 用于数值计算
import numpy as np
# 导入 torch 用于深度学习操作
import torch
# 导入 transformers 中的相关模型和工具
from transformers import BertModel, BertTokenizer, CLIPImageProcessor, MT5Tokenizer, T5EncoderModel

# 从 diffusers 导入 StableDiffusionPipelineOutput,用于稳定扩散输出
from diffusers.pipelines.stable_diffusion import StableDiffusionPipelineOutput

# 导入回调函数相关的模块
from ...callbacks import MultiPipelineCallbacks, PipelineCallback
# 导入图像处理器
from ...image_processor import VaeImageProcessor
# 导入模型定义
from ...models import AutoencoderKL, HunyuanDiT2DModel
# 导入注意力处理器
from ...models.attention_processor import PAGCFGHunyuanAttnProcessor2_0, PAGHunyuanAttnProcessor2_0
# 导入嵌入相关功能
from ...models.embeddings import get_2d_rotary_pos_embed
# 导入安全检查器
from ...pipelines.stable_diffusion.safety_checker import StableDiffusionSafetyChecker
# 导入调度器
from ...schedulers import DDPMScheduler
# 导入实用工具
from ...utils import (
    is_torch_xla_available,
    logging,
    replace_example_docstring,
)
# 从工具模块导入随机生成张量的功能
from ...utils.torch_utils import randn_tensor
# 导入扩散管道工具
from ..pipeline_utils import DiffusionPipeline
# 导入 PAGMixin 模块
from .pag_utils import PAGMixin

# 检查是否可用 torch_xla 库,用于分布式训练
if is_torch_xla_available():
    import torch_xla.core.xla_model as xm  # 导入 XLA 相关功能

    XLA_AVAILABLE = True  # 设置 XLA 可用标志为 True
else:
    XLA_AVAILABLE = False  # 设置 XLA 可用标志为 False

# 设置日志记录器,获取当前模块的日志记录器
logger = logging.get_logger(__name__)  # pylint: disable=invalid-name

# 定义示例文档字符串,展示如何使用管道
EXAMPLE_DOC_STRING = """
    Examples:
        ```python
        >>> import torch
        >>> from diffusers import AutoPipelineForText2Image

        >>> pipe = AutoPipelineForText2Image.from_pretrained(
        ...     "Tencent-Hunyuan/HunyuanDiT-v1.2-Diffusers",
        ...     torch_dtype=torch.float16,
        ...     enable_pag=True,
        ...     pag_applied_layers=[14],
        ... ).to("cuda")

        >>> # prompt = "an astronaut riding a horse"
        >>> prompt = "一个宇航员在骑马"
        >>> image = pipe(prompt, guidance_scale=4, pag_scale=3).images[0]
        ```py
"""

# 定义标准比例,包含多种常见的宽高比
STANDARD_RATIO = np.array(
    [
        1.0,  # 1:1
        4.0 / 3.0,  # 4:3
        3.0 / 4.0,  # 3:4
        16.0 / 9.0,  # 16:9
        9.0 / 16.0,  # 9:16
    ]
)
# 定义标准形状,包含不同宽高比下的尺寸组合
STANDARD_SHAPE = [
    [(1024, 1024), (1280, 1280)],  # 1:1
    [(1024, 768), (1152, 864), (1280, 960)],  # 4:3
    [(768, 1024), (864, 1152), (960, 1280)],  # 3:4
    [(1280, 768)],  # 16:9
    [(768, 1280)],  # 9:16
]
# 计算每种形状的面积,存储在标准面积列表中
STANDARD_AREA = [np.array([w * h for w, h in shapes]) for shapes in STANDARD_SHAPE]
# 定义支持的形状,包含常见的图像尺寸
SUPPORTED_SHAPE = [
    (1024, 1024),
    (1280, 1280),  # 1:1
    (1024, 768),
    (1152, 864),
    (1280, 960),  # 4:3
    (768, 1024),
    # 定义一个宽度为 864,高度为 1152 的尺寸元组
    (864, 1152),
    # 定义一个宽度为 960,高度为 1280 的尺寸元组,比例为 3:4
    (960, 1280),  # 3:4
    # 定义一个宽度为 1280,高度为 768 的尺寸元组,比例为 16:9
    (1280, 768),  # 16:9
    # 定义一个宽度为 768,高度为 1280 的尺寸元组,比例为 9:16
    (768, 1280),  # 9:16
]


# 将目标宽度和高度映射到标准形状
def map_to_standard_shapes(target_width, target_height):
    # 计算目标的宽高比
    target_ratio = target_width / target_height
    # 找到与目标宽高比最接近的标准宽高比的索引
    closest_ratio_idx = np.argmin(np.abs(STANDARD_RATIO - target_ratio))
    # 找到与目标面积最接近的标准面积的索引
    closest_area_idx = np.argmin(np.abs(STANDARD_AREA[closest_ratio_idx] - target_width * target_height))
    # 根据索引获取对应的标准宽度和高度
    width, height = STANDARD_SHAPE[closest_ratio_idx][closest_area_idx]
    # 返回标准宽度和高度
    return width, height


# 根据源图像尺寸和目标尺寸计算裁剪区域
def get_resize_crop_region_for_grid(src, tgt_size):
    # 目标尺寸的高度和宽度
    th = tw = tgt_size
    # 源图像的高度和宽度
    h, w = src

    # 计算源图像的宽高比
    r = h / w

    # 根据宽高比决定如何调整尺寸
    # 如果高度大于宽度,按高度调整
    if r > 1:
        resize_height = th  # 设置调整后的高度为目标高度
        resize_width = int(round(th / h * w))  # 根据比例计算调整后的宽度
    else:
        resize_width = tw  # 设置调整后的宽度为目标宽度
        resize_height = int(round(tw / w * h))  # 根据比例计算调整后的高度

    # 计算裁剪区域的上边和左边
    crop_top = int(round((th - resize_height) / 2.0))  # 计算裁剪区域的上边界
    crop_left = int(round((tw - resize_width) / 2.0))  # 计算裁剪区域的左边界

    # 返回裁剪区域的坐标
    return (crop_top, crop_left), (crop_top + resize_height, crop_left + resize_width)


# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.rescale_noise_cfg 复制而来
def rescale_noise_cfg(noise_cfg, noise_pred_text, guidance_rescale=0.0):
    """
    根据 guidance_rescale 调整 `noise_cfg` 的尺度。基于文献 [Common Diffusion Noise Schedules and
    Sample Steps are Flawed](https://arxiv.org/pdf/2305.08891.pdf) 的研究。见第 3.4 节。
    """
    # 计算噪声预测文本的标准差
    std_text = noise_pred_text.std(dim=list(range(1, noise_pred_text.ndim)), keepdim=True)
    # 计算噪声配置的标准差
    std_cfg = noise_cfg.std(dim=list(range(1, noise_cfg.ndim)), keepdim=True)
    # 根据文本标准差和配置标准差调整噪声预测结果(修复过度曝光)
    noise_pred_rescaled = noise_cfg * (std_text / std_cfg)
    # 将调整后的噪声与原始噪声按指导比例混合,以避免图像“平淡”
    noise_cfg = guidance_rescale * noise_pred_rescaled + (1 - guidance_rescale) * noise_cfg
    # 返回调整后的噪声配置
    return noise_cfg


class HunyuanDiTPAGPipeline(DiffusionPipeline, PAGMixin):
    r"""
    使用 HunyuanDiT 和 [Perturbed Attention
    Guidance](https://huggingface.co/docs/diffusers/en/using-diffusers/pag) 的中英图像生成管道。

    此模型继承自 [`DiffusionPipeline`]。有关库为所有管道实现的通用方法(例如下载或保存、在特定设备上运行等),请查看父类文档。

    HunyuanDiT 使用两个文本编码器:[mT5](https://huggingface.co/google/mt5-base) 和 [双语 CLIP](由我们自行微调)。
    # 参数说明
        Args:
            vae ([`AutoencoderKL`]):
                变分自编码器 (VAE) 模型,用于将图像编码和解码为潜在表示。我们使用
                `sdxl-vae-fp16-fix`。
            text_encoder (Optional[`~transformers.BertModel`, `~transformers.CLIPTextModel`]):
                冻结的文本编码器 ([clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14))。
                HunyuanDiT 使用经过微调的 [双语 CLIP]。
            tokenizer (Optional[`~transformers.BertTokenizer`, `~transformers.CLIPTokenizer`]):
                用于分词的 `BertTokenizer` 或 `CLIPTokenizer`。
            transformer ([`HunyuanDiT2DModel`]):
                腾讯 Hunyuan 设计的 HunyuanDiT 模型。
            text_encoder_2 (`T5EncoderModel`):
                mT5 嵌入器,具体为 't5-v1_1-xxl'。
            tokenizer_2 (`MT5Tokenizer`):
                mT5 嵌入器的分词器。
            scheduler ([`DDPMScheduler`]):
                与 HunyuanDiT 结合使用的调度器,用于对编码的图像潜在进行去噪。
        """
    
        # 模型的 CPU 卸载顺序
        model_cpu_offload_seq = "text_encoder->text_encoder_2->transformer->vae"
        # 可选组件列表
        _optional_components = [
            "safety_checker",
            "feature_extractor",
            "text_encoder_2",
            "tokenizer_2",
            "text_encoder",
            "tokenizer",
        ]
        # 从 CPU 卸载中排除的组件
        _exclude_from_cpu_offload = ["safety_checker"]
        # 回调张量输入列表
        _callback_tensor_inputs = [
            "latents",
            "prompt_embeds",
            "negative_prompt_embeds",
            "prompt_embeds_2",
            "negative_prompt_embeds_2",
        ]
    
        # 初始化函数定义
        def __init__(
            self,
            vae: AutoencoderKL,  # 传入的 VAE 模型
            text_encoder: BertModel,  # 文本编码器
            tokenizer: BertTokenizer,  # 文本分词器
            transformer: HunyuanDiT2DModel,  # HunyuanDiT 模型
            scheduler: DDPMScheduler,  # 调度器
            safety_checker: Optional[StableDiffusionSafetyChecker] = None,  # 可选的安全检查器
            feature_extractor: Optional[CLIPImageProcessor] = None,  # 可选的特征提取器
            requires_safety_checker: bool = True,  # 是否需要安全检查器的标志
            text_encoder_2: Optional[T5EncoderModel] = None,  # 可选的第二文本编码器
            tokenizer_2: Optional[MT5Tokenizer] = None,  # 可选的第二分词器
            pag_applied_layers: Union[str, List[str]] = "blocks.1",  # 应用层的字符串或列表
    # 初始化父类构造函数
        ):
            super().__init__()
    
            # 注册各个模块到当前对象中
            self.register_modules(
                vae=vae,  # 注册变分自编码器
                text_encoder=text_encoder,  # 注册文本编码器
                tokenizer=tokenizer,  # 注册分词器
                tokenizer_2=tokenizer_2,  # 注册第二个分词器
                transformer=transformer,  # 注册变换器
                scheduler=scheduler,  # 注册调度器
                safety_checker=safety_checker,  # 注册安全检查器
                feature_extractor=feature_extractor,  # 注册特征提取器
                text_encoder_2=text_encoder_2,  # 注册第二个文本编码器
            )
    
            # 如果安全检查器为 None 且需要安全检查器,则发出警告
            if safety_checker is None and requires_safety_checker:
                logger.warning(
                    f"You have disabled the safety checker for {self.__class__} by passing `safety_checker=None`. Ensure"
                    " that you abide to the conditions of the Stable Diffusion license and do not expose unfiltered"
                    " results in services or applications open to the public. Both the diffusers team and Hugging Face"
                    " strongly recommend to keep the safety filter enabled in all public facing circumstances, disabling"
                    " it only for use-cases that involve analyzing network behavior or auditing its results. For more"
                    " information, please have a look at https://github.com/huggingface/diffusers/pull/254 ."
                )
    
            # 如果安全检查器不为 None 但特征提取器为 None,则引发错误
            if safety_checker is not None and feature_extractor is None:
                raise ValueError(
                    "Make sure to define a feature extractor when loading {self.__class__} if you want to use the safety"
                    " checker. If you do not want to use the safety checker, you can pass `'safety_checker=None'` instead."
                )
    
            # 设置 VAE 的缩放因子,若 VAE 存在则取其配置中的通道数
            self.vae_scale_factor = (
                2 ** (len(self.vae.config.block_out_channels) - 1) if hasattr(self, "vae") and self.vae is not None else 8
            )
            # 创建图像处理器实例
            self.image_processor = VaeImageProcessor(vae_scale_factor=self.vae_scale_factor)
            # 将是否需要安全检查器的配置注册到当前对象
            self.register_to_config(requires_safety_checker=requires_safety_checker)
            # 设置默认的样本大小,若变换器存在则使用其配置中的样本大小
            self.default_sample_size = (
                self.transformer.config.sample_size
                if hasattr(self, "transformer") and self.transformer is not None
                else 128
            )
    
            # 设置应用的层及其注意力处理器
            self.set_pag_applied_layers(
                pag_applied_layers, pag_attn_processors=(PAGCFGHunyuanAttnProcessor2_0(), PAGHunyuanAttnProcessor2_0())
            )
    
        # 从 diffusers.pipelines.hunyuandit.pipeline_hunyuandit.HunyuanDiTPipeline 复制的 encode_prompt 方法
        def encode_prompt(
            self,
            prompt: str,  # 要编码的提示文本
            device: torch.device = None,  # 设备类型(如 CPU 或 GPU)
            dtype: torch.dtype = None,  # 数据类型
            num_images_per_prompt: int = 1,  # 每个提示生成的图像数量
            do_classifier_free_guidance: bool = True,  # 是否执行无分类器引导
            negative_prompt: Optional[str] = None,  # 负面提示文本
            prompt_embeds: Optional[torch.Tensor] = None,  # 提示文本的嵌入表示
            negative_prompt_embeds: Optional[torch.Tensor] = None,  # 负面提示的嵌入表示
            prompt_attention_mask: Optional[torch.Tensor] = None,  # 提示的注意力掩码
            negative_prompt_attention_mask: Optional[torch.Tensor] = None,  # 负面提示的注意力掩码
            max_sequence_length: Optional[int] = None,  # 最大序列长度
            text_encoder_index: int = 0,  # 文本编码器的索引
    # 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.run_safety_checker 复制的代码
    def run_safety_checker(self, image, device, dtype):
        # 检查安全检查器是否存在
        if self.safety_checker is None:
            # 如果不存在,设置无 NSFW 概念标志为 None
            has_nsfw_concept = None
        else:
            # 如果输入是张量格式
            if torch.is_tensor(image):
                # 将图像处理后转为 PIL 格式
                feature_extractor_input = self.image_processor.postprocess(image, output_type="pil")
            else:
                # 如果输入不是张量,则将其转为 PIL 格式
                feature_extractor_input = self.image_processor.numpy_to_pil(image)
            # 使用特征提取器处理图像并将其移动到指定设备
            safety_checker_input = self.feature_extractor(feature_extractor_input, return_tensors="pt").to(device)
            # 使用安全检查器处理图像并获取是否包含 NSFW 概念的标志
            image, has_nsfw_concept = self.safety_checker(
                images=image, clip_input=safety_checker_input.pixel_values.to(dtype)
            )
        # 返回处理后的图像及 NSFW 概念标志
        return image, has_nsfw_concept

    # 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_extra_step_kwargs 复制的代码
    def prepare_extra_step_kwargs(self, generator, eta):
        # 为调度器步骤准备额外的参数,因为并非所有调度器都有相同的参数签名
        # eta(η)仅在 DDIMScheduler 中使用,对于其他调度器将被忽略。
        # eta 对应于 DDIM 论文中的 η:https://arxiv.org/abs/2010.02502
        # 值应在 [0, 1] 范围内

        # 检查调度器的步骤是否接受 eta 参数
        accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())
        # 初始化额外参数字典
        extra_step_kwargs = {}
        if accepts_eta:
            # 如果接受 eta,添加到额外参数字典中
            extra_step_kwargs["eta"] = eta

        # 检查调度器的步骤是否接受 generator 参数
        accepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys())
        if accepts_generator:
            # 如果接受 generator,添加到额外参数字典中
            extra_step_kwargs["generator"] = generator
        # 返回准备好的额外参数字典
        return extra_step_kwargs

    # 从 diffusers.pipelines.hunyuandit.pipeline_hunyuandit.HunyuanDiTPipeline.check_inputs 复制的代码
    def check_inputs(
        self,
        prompt,
        height,
        width,
        negative_prompt=None,
        prompt_embeds=None,
        negative_prompt_embeds=None,
        prompt_attention_mask=None,
        negative_prompt_attention_mask=None,
        prompt_embeds_2=None,
        negative_prompt_embeds_2=None,
        prompt_attention_mask_2=None,
        negative_prompt_attention_mask_2=None,
        callback_on_step_end_tensor_inputs=None,
    # 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_latents 复制的代码
    # 准备潜在变量,用于模型的生成过程
    def prepare_latents(self, batch_size, num_channels_latents, height, width, dtype, device, generator, latents=None):
        # 定义潜在变量的形状,基于输入参数
        shape = (
            batch_size,
            num_channels_latents,
            int(height) // self.vae_scale_factor,
            int(width) // self.vae_scale_factor,
        )
        # 检查生成器列表的长度是否与批处理大小匹配
        if isinstance(generator, list) and len(generator) != batch_size:
            raise ValueError(
                f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
                f" size of {batch_size}. Make sure the batch size matches the length of the generators."
            )
    
        # 如果没有提供潜在变量,则生成随机潜在变量
        if latents is None:
            latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
        else:
            # 将现有潜在变量转移到指定设备
            latents = latents.to(device)
    
        # 根据调度器要求的标准差缩放初始噪声
        latents = latents * self.scheduler.init_noise_sigma
        # 返回处理后的潜在变量
        return latents
    
    # 返回引导缩放的属性值
    @property
    def guidance_scale(self):
        return self._guidance_scale
    
    # 返回引导重缩放的属性值
    @property
    def guidance_rescale(self):
        return self._guidance_rescale
    
    # 根据Imagen论文定义的分类器自由引导的标志
    @property
    def do_classifier_free_guidance(self):
        return self._guidance_scale > 1
    
    # 返回时间步数的属性值
    @property
    def num_timesteps(self):
        return self._num_timesteps
    
    # 返回中断状态的属性值
    @property
    def interrupt(self):
        return self._interrupt
    
    # 关闭梯度计算,优化性能
    @torch.no_grad()
    # 替换示例文档字符串的装饰器
    @replace_example_docstring(EXAMPLE_DOC_STRING)
    # 定义一个可调用的方法,接收多个参数以生成图像
        def __call__(
            self,
            # 提示文本,可以是字符串或字符串列表
            prompt: Union[str, List[str]] = None,
            # 图像的高度,默认为 None
            height: Optional[int] = None,
            # 图像的宽度,默认为 None
            width: Optional[int] = None,
            # 推理步骤的数量,默认为 50
            num_inference_steps: Optional[int] = 50,
            # 指导缩放比例,默认为 5.0
            guidance_scale: Optional[float] = 5.0,
            # 负提示文本,可以是字符串或字符串列表
            negative_prompt: Optional[Union[str, List[str]]] = None,
            # 每个提示生成的图像数量,默认为 1
            num_images_per_prompt: Optional[int] = 1,
            # 额外的随机性控制,默认为 0.0
            eta: Optional[float] = 0.0,
            # 随机数生成器,可以是单个或多个
            generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
            # 潜在变量,可以是一个张量
            latents: Optional[torch.Tensor] = None,
            # 提示嵌入,可以是一个张量
            prompt_embeds: Optional[torch.Tensor] = None,
            # 第二组提示嵌入,可以是一个张量
            prompt_embeds_2: Optional[torch.Tensor] = None,
            # 负提示嵌入,可以是一个张量
            negative_prompt_embeds: Optional[torch.Tensor] = None,
            # 第二组负提示嵌入,可以是一个张量
            negative_prompt_embeds_2: Optional[torch.Tensor] = None,
            # 提示的注意力掩码,可以是一个张量
            prompt_attention_mask: Optional[torch.Tensor] = None,
            # 第二组提示的注意力掩码,可以是一个张量
            prompt_attention_mask_2: Optional[torch.Tensor] = None,
            # 负提示的注意力掩码,可以是一个张量
            negative_prompt_attention_mask: Optional[torch.Tensor] = None,
            # 第二组负提示的注意力掩码,可以是一个张量
            negative_prompt_attention_mask_2: Optional[torch.Tensor] = None,
            # 输出类型,默认为 "pil"
            output_type: Optional[str] = "pil",
            # 是否返回字典格式的结果,默认为 True
            return_dict: bool = True,
            # 步骤结束时的回调函数
            callback_on_step_end: Optional[
                Union[Callable[[int, int, Dict], None], PipelineCallback, MultiPipelineCallbacks]
            ] = None,
            # 回调时输入的张量名称列表,默认为 ["latents"]
            callback_on_step_end_tensor_inputs: List[str] = ["latents"],
            # 指导重标定值,默认为 0.0
            guidance_rescale: float = 0.0,
            # 原始图像大小,默认为 (1024, 1024)
            original_size: Optional[Tuple[int, int]] = (1024, 1024),
            # 目标图像大小,默认为 None
            target_size: Optional[Tuple[int, int]] = None,
            # 裁剪区域的左上角坐标,默认为 (0, 0)
            crops_coords_top_left: Tuple[int, int] = (0, 0),
            # 是否使用分辨率分箱,默认为 True
            use_resolution_binning: bool = True,
            # 页面缩放比例,默认为 3.0
            pag_scale: float = 3.0,
            # 自适应页面缩放比例,默认为 0.0
            pag_adaptive_scale: float = 0.0,