diffusers-源码解析-五十四-

龙哥盟 / 2024-11-09 / 原文

diffusers 源码解析(五十四)

.\diffusers\pipelines\unclip\pipeline_unclip_image_variation.py

# 版权所有 2024 Kakao Brain 和 HuggingFace 团队。保留所有权利。
#
# 根据 Apache 许可证第 2.0 版(“许可证”)授权;
# 除非遵守该许可证,否则您不得使用此文件。
# 您可以在以下网址获取许可证副本:
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律要求或书面同意,软件在许可证下分发时是按“原样”提供的,
# 不附有任何明示或暗示的担保或条件。
# 有关许可证下特定权利和限制,请参阅许可证。

import inspect  # 导入 inspect 模块以获取有关活跃对象的信息
from typing import List, Optional, Union  # 从 typing 模块导入类型提示工具

import PIL.Image  # 导入 PIL.Image 以处理图像文件
import torch  # 导入 PyTorch 库以进行张量运算
from torch.nn import functional as F  # 导入 PyTorch 的功能性 API 以进行各种神经网络操作
from transformers import (  # 从 transformers 库导入必要的模型和处理器
    CLIPImageProcessor,  # 导入 CLIP 图像处理器
    CLIPTextModelWithProjection,  # 导入 CLIP 文本模型,带有投影
    CLIPTokenizer,  # 导入 CLIP 分词器
    CLIPVisionModelWithProjection,  # 导入 CLIP 视觉模型,带有投影
)

from ...models import UNet2DConditionModel, UNet2DModel  # 从相对路径导入 UNet 模型
from ...schedulers import UnCLIPScheduler  # 从相对路径导入 UnCLIP 调度器
from ...utils import logging  # 从相对路径导入 logging 工具
from ...utils.torch_utils import randn_tensor  # 从相对路径导入随机张量生成工具
from ..pipeline_utils import DiffusionPipeline, ImagePipelineOutput  # 从相对路径导入 DiffusionPipeline 和 ImagePipelineOutput
from .text_proj import UnCLIPTextProjModel  # 从当前目录导入 UnCLIP 文本投影模型


logger = logging.get_logger(__name__)  # 获取当前模块的日志记录器实例; pylint 禁用无效名称警告


class UnCLIPImageVariationPipeline(DiffusionPipeline):  # 定义 UnCLIP 图像变体生成管道类,继承自 DiffusionPipeline
    """
    使用 UnCLIP 从输入图像生成图像变体的管道。

    该模型继承自 [`DiffusionPipeline`]。有关所有管道通用方法的文档(下载、保存、在特定设备上运行等),请查看超类文档。
    # 参数说明
    Args:
        text_encoder ([`~transformers.CLIPTextModelWithProjection`]):
            # 冻结的文本编码器
            Frozen text-encoder.
        tokenizer ([`~transformers.CLIPTokenizer`]):
            # 用于对文本进行分词的 CLIPTokenizer
            A `CLIPTokenizer` to tokenize text.
        feature_extractor ([`~transformers.CLIPImageProcessor`]):
            # 从生成的图像中提取特征以作为图像编码器的输入的模型
            Model that extracts features from generated images to be used as inputs for the `image_encoder`.
        image_encoder ([`~transformers.CLIPVisionModelWithProjection`]):
            # 冻结的 CLIP 图像编码器,使用 [clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14)
            Frozen CLIP image-encoder ([clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14)).
        text_proj ([`UnCLIPTextProjModel`]):
            # 准备和组合嵌入的工具类,嵌入将传递给解码器
            Utility class to prepare and combine the embeddings before they are passed to the decoder.
        decoder ([`UNet2DConditionModel`]):
            # 将图像嵌入反转为图像的解码器
            The decoder to invert the image embedding into an image.
        super_res_first ([`UNet2DModel`]):
            # 超分辨率 UNet,用于超分辨率扩散过程的所有步骤,除了最后一步
            Super resolution UNet. Used in all but the last step of the super resolution diffusion process.
        super_res_last ([`UNet2DModel`]):
            # 超分辨率 UNet,用于超分辨率扩散过程的最后一步
            Super resolution UNet. Used in the last step of the super resolution diffusion process.
        decoder_scheduler ([`UnCLIPScheduler`]):
            # 在解码器去噪过程中使用的调度器(修改后的 [`DDPMScheduler`])
            Scheduler used in the decoder denoising process (a modified [`DDPMScheduler`]).
        super_res_scheduler ([`UnCLIPScheduler`]):
            # 在超分辨率去噪过程中使用的调度器(修改后的 [`DDPMScheduler`])
            Scheduler used in the super resolution denoising process (a modified [`DDPMScheduler`]).
    """

    # 定义解码器的类型
    decoder: UNet2DConditionModel
    # 定义文本嵌入的处理模型
    text_proj: UnCLIPTextProjModel
    # 定义文本编码器的类型
    text_encoder: CLIPTextModelWithProjection
    # 定义分词器的类型
    tokenizer: CLIPTokenizer
    # 定义特征提取器的类型
    feature_extractor: CLIPImageProcessor
    # 定义图像编码器的类型
    image_encoder: CLIPVisionModelWithProjection
    # 定义超分辨率模型(第一步)
    super_res_first: UNet2DModel
    # 定义超分辨率模型(最后一步)
    super_res_last: UNet2DModel

    # 定义解码器和超分辨率过程中的调度器
    decoder_scheduler: UnCLIPScheduler
    super_res_scheduler: UnCLIPScheduler
    # 定义模型的 CPU 卸载顺序,表示组件之间的执行顺序
    model_cpu_offload_seq = "text_encoder->image_encoder->text_proj->decoder->super_res_first->super_res_last"

    # 初始化方法
    def __init__(
        # 定义解码器的输入参数
        self,
        decoder: UNet2DConditionModel,
        # 定义文本编码器的输入参数
        text_encoder: CLIPTextModelWithProjection,
        # 定义分词器的输入参数
        tokenizer: CLIPTokenizer,
        # 定义文本嵌入处理的输入参数
        text_proj: UnCLIPTextProjModel,
        # 定义特征提取器的输入参数
        feature_extractor: CLIPImageProcessor,
        # 定义图像编码器的输入参数
        image_encoder: CLIPVisionModelWithProjection,
        # 定义超分辨率模型(第一步)的输入参数
        super_res_first: UNet2DModel,
        # 定义超分辨率模型(最后一步)的输入参数
        super_res_last: UNet2DModel,
        # 定义解码器调度器的输入参数
        decoder_scheduler: UnCLIPScheduler,
        # 定义超分辨率调度器的输入参数
        super_res_scheduler: UnCLIPScheduler,
    ):
        # 调用父类的初始化方法
        super().__init__()

        # 注册各个模块,便于管理和调用
        self.register_modules(
            decoder=decoder,
            text_encoder=text_encoder,
            tokenizer=tokenizer,
            text_proj=text_proj,
            feature_extractor=feature_extractor,
            image_encoder=image_encoder,
            super_res_first=super_res_first,
            super_res_last=super_res_last,
            decoder_scheduler=decoder_scheduler,
            super_res_scheduler=super_res_scheduler,
        )

    # 从 diffusers.pipelines.unclip.pipeline_unclip.UnCLIPPipeline.prepare_latents 复制而来
    # 准备潜在变量,生成或处理给定形状的潜在张量
        def prepare_latents(self, shape, dtype, device, generator, latents, scheduler):
            # 如果潜在张量为 None,则随机生成一个具有指定形状的张量
            if latents is None:
                latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
            else:
                # 如果给定的潜在张量形状与期望的形状不匹配,则引发错误
                if latents.shape != shape:
                    raise ValueError(f"Unexpected latents shape, got {latents.shape}, expected {shape}")
                # 将潜在张量移动到指定设备
                latents = latents.to(device)
    
            # 将潜在张量乘以调度器的初始噪声标准差
            latents = latents * scheduler.init_noise_sigma
            # 返回处理后的潜在张量
            return latents
    
        # 编码图像以生成图像嵌入
        def _encode_image(self, image, device, num_images_per_prompt, image_embeddings: Optional[torch.Tensor] = None):
            # 获取图像编码器参数的数据类型
            dtype = next(self.image_encoder.parameters()).dtype
    
            # 如果没有提供图像嵌入,则进行图像处理
            if image_embeddings is None:
                # 如果输入的图像不是张量,则使用特征提取器处理图像
                if not isinstance(image, torch.Tensor):
                    image = self.feature_extractor(images=image, return_tensors="pt").pixel_values
    
                # 将图像张量移动到指定设备,并转换为正确的数据类型
                image = image.to(device=device, dtype=dtype)
                # 通过图像编码器生成图像嵌入
                image_embeddings = self.image_encoder(image).image_embeds
    
            # 将图像嵌入按指定的提示数量进行重复
            image_embeddings = image_embeddings.repeat_interleave(num_images_per_prompt, dim=0)
    
            # 返回生成的图像嵌入
            return image_embeddings
    
        # 定义调用方法,禁用梯度计算以节省内存
        @torch.no_grad()
        def __call__(
            # 接收可选的输入图像,可以是单个图像、图像列表或张量
            image: Optional[Union[PIL.Image.Image, List[PIL.Image.Image], torch.Tensor]] = None,
            # 每个提示生成的图像数量
            num_images_per_prompt: int = 1,
            # 解码器推理步骤的数量
            decoder_num_inference_steps: int = 25,
            # 超分辨率推理步骤的数量
            super_res_num_inference_steps: int = 7,
            # 可选的随机数生成器
            generator: Optional[torch.Generator] = None,
            # 可选的解码器潜在张量
            decoder_latents: Optional[torch.Tensor] = None,
            # 可选的超分辨率潜在张量
            super_res_latents: Optional[torch.Tensor] = None,
            # 可选的图像嵌入
            image_embeddings: Optional[torch.Tensor] = None,
            # 解码器引导尺度
            decoder_guidance_scale: float = 8.0,
            # 输出类型,默认为 "pil"
            output_type: Optional[str] = "pil",
            # 是否返回字典格式的结果
            return_dict: bool = True,

.\diffusers\pipelines\unclip\text_proj.py

# 版权声明,标明版权归 Kakao Brain 和 HuggingFace Team 所有
# 
# 根据 Apache 许可证第 2.0 版("许可证")授权;
# 除非遵循该许可证,否则您不得使用此文件。
# 您可以在以下网址获得许可证副本:
# 
#     http://www.apache.org/licenses/LICENSE-2.0
# 
# 除非适用法律或书面协议另有规定,依据该许可证分发的软件是以“原样”基础分发的,
# 不提供任何明示或暗示的担保或条件。
# 有关特定权限和限制,请参见许可证。

# 导入 PyTorch 库
import torch
# 从 PyTorch 导入神经网络模块
from torch import nn

# 从配置工具模块导入 ConfigMixin 和 register_to_config
from ...configuration_utils import ConfigMixin, register_to_config
# 从模型模块导入 ModelMixin
from ...models import ModelMixin


class UnCLIPTextProjModel(ModelMixin, ConfigMixin):
    """
    CLIP 嵌入的工具类。用于将图像和文本嵌入组合成解码器可用的格式。

    更多详细信息,请参见原始论文: https://arxiv.org/abs/2204.06125 第 2.1 节
    """

    @register_to_config
    # 初始化函数,设置模型的参数
    def __init__(
        self,
        *,
        clip_extra_context_tokens: int = 4,  # 额外上下文令牌的数量,默认为 4
        clip_embeddings_dim: int = 768,       # CLIP 嵌入的维度,默认为 768
        time_embed_dim: int,                   # 时间嵌入的维度
        cross_attention_dim,                   # 交叉注意力的维度
    ):
        # 调用父类的初始化方法
        super().__init__()

        # 学习的无分类器自由引导嵌入参数,初始化为零
        self.learned_classifier_free_guidance_embeddings = nn.Parameter(torch.zeros(clip_embeddings_dim))

        # 为额外的 CLIP 时间嵌入设置线性变换
        self.embedding_proj = nn.Linear(clip_embeddings_dim, time_embed_dim)
        # 将 CLIP 图像嵌入转换为时间嵌入的线性变换
        self.clip_image_embeddings_project_to_time_embeddings = nn.Linear(clip_embeddings_dim, time_embed_dim)

        # 为编码器的隐藏状态参数
        self.clip_extra_context_tokens = clip_extra_context_tokens  # 保存额外上下文令牌的数量
        # 将 CLIP 嵌入映射到交叉注意力维度的线性变换
        self.clip_extra_context_tokens_proj = nn.Linear(
            clip_embeddings_dim, self.clip_extra_context_tokens * cross_attention_dim
        )
        # 将 CLIP 嵌入映射到编码器隐藏状态的线性变换
        self.encoder_hidden_states_proj = nn.Linear(clip_embeddings_dim, cross_attention_dim)
        # 对编码器隐藏状态进行层归一化
        self.text_encoder_hidden_states_norm = nn.LayerNorm(cross_attention_dim)
    # 定义前向传播方法,接受图像嵌入、提示嵌入、文本编码器隐藏状态和分类器自由引导标志
        def forward(self, *, image_embeddings, prompt_embeds, text_encoder_hidden_states, do_classifier_free_guidance):
            # 如果启用了分类器自由引导
            if do_classifier_free_guidance:
                # 获取图像嵌入的批次大小
                image_embeddings_batch_size = image_embeddings.shape[0]
                # 扩展学习到的分类器自由引导嵌入,以匹配批次大小
                classifier_free_guidance_embeddings = self.learned_classifier_free_guidance_embeddings.unsqueeze(0)
                classifier_free_guidance_embeddings = classifier_free_guidance_embeddings.expand(
                    image_embeddings_batch_size, -1
                )
                # 将分类器自由引导嵌入与图像嵌入拼接
                image_embeddings = torch.cat([classifier_free_guidance_embeddings, image_embeddings], dim=0)
    
            # 确保图像嵌入和提示嵌入的批次大小相等
            assert image_embeddings.shape[0] == prompt_embeds.shape[0]
    
            # 获取批次大小
            batch_size = prompt_embeds.shape[0]
    
            # 修改架构,通过投影并添加 CLIP 嵌入到现有时间步嵌入
            time_projected_prompt_embeds = self.embedding_proj(prompt_embeds)
            time_projected_image_embeddings = self.clip_image_embeddings_project_to_time_embeddings(image_embeddings)
            # 计算添加的 CLIP 时间嵌入
            additive_clip_time_embeddings = time_projected_image_embeddings + time_projected_prompt_embeds
    
            # 投影 CLIP 嵌入到四个额外的上下文标记,并与 GLIDE 文本编码器的输出序列拼接
            clip_extra_context_tokens = self.clip_extra_context_tokens_proj(image_embeddings)
            clip_extra_context_tokens = clip_extra_context_tokens.reshape(batch_size, -1, self.clip_extra_context_tokens)
            clip_extra_context_tokens = clip_extra_context_tokens.permute(0, 2, 1)
    
            # 对文本编码器隐藏状态进行投影和归一化
            text_encoder_hidden_states = self.encoder_hidden_states_proj(text_encoder_hidden_states)
            text_encoder_hidden_states = self.text_encoder_hidden_states_norm(text_encoder_hidden_states)
            # 将额外的上下文标记与文本编码器隐藏状态拼接
            text_encoder_hidden_states = torch.cat([clip_extra_context_tokens, text_encoder_hidden_states], dim=1)
    
            # 返回文本编码器隐藏状态和添加的 CLIP 时间嵌入
            return text_encoder_hidden_states, additive_clip_time_embeddings

.\diffusers\pipelines\unclip\__init__.py

# 从 typing 模块导入 TYPE_CHECKING,用于类型检查
from typing import TYPE_CHECKING

# 从 utils 模块导入多个工具函数和类
from ...utils import (
    DIFFUSERS_SLOW_IMPORT,  # 导入慢速导入标志
    OptionalDependencyNotAvailable,  # 导入可选依赖不可用异常
    _LazyModule,  # 导入延迟加载模块的工具
    is_torch_available,  # 导入检查 PyTorch 是否可用的函数
    is_transformers_available,  # 导入检查 Transformers 是否可用的函数
    is_transformers_version,  # 导入检查 Transformers 版本的函数
)

# 初始化一个空字典用于存储虚拟对象
_dummy_objects = {}
# 初始化一个空字典用于存储导入结构
_import_structure = {}

# 尝试进行依赖检查
try:
    # 检查 Transformers 和 PyTorch 是否可用,并且 Transformers 版本是否大于等于 4.25.0
    if not (is_transformers_available() and is_torch_available() and is_transformers_version(">=", "4.25.0")):
        # 如果检查不通过,抛出可选依赖不可用异常
        raise OptionalDependencyNotAvailable()
# 捕获可选依赖不可用异常
except OptionalDependencyNotAvailable:
    # 从虚拟对象模块中导入虚拟类
    from ...utils.dummy_torch_and_transformers_objects import UnCLIPImageVariationPipeline, UnCLIPPipeline

    # 更新虚拟对象字典,添加虚拟类
    _dummy_objects.update(
        {"UnCLIPImageVariationPipeline": UnCLIPImageVariationPipeline, "UnCLIPPipeline": UnCLIPPipeline}
    )
# 如果没有抛出异常
else:
    # 在导入结构中添加 UnCLIPPipeline 的路径
    _import_structure["pipeline_unclip"] = ["UnCLIPPipeline"]
    # 在导入结构中添加 UnCLIPImageVariationPipeline 的路径
    _import_structure["pipeline_unclip_image_variation"] = ["UnCLIPImageVariationPipeline"]
    # 在导入结构中添加 UnCLIPTextProjModel 的路径
    _import_structure["text_proj"] = ["UnCLIPTextProjModel"]

# 如果处于类型检查状态或慢速导入标志为真
if TYPE_CHECKING or DIFFUSERS_SLOW_IMPORT:
    # 尝试进行依赖检查
    try:
        # 检查 Transformers 和 PyTorch 是否可用,并且 Transformers 版本是否大于等于 4.25.0
        if not (is_transformers_available() and is_torch_available() and is_transformers_version(">=", "4.25.0")):
            # 如果检查不通过,抛出可选依赖不可用异常
            raise OptionalDependencyNotAvailable()
    # 捕获可选依赖不可用异常
    except OptionalDependencyNotAvailable:
        # 从虚拟对象模块中导入所有虚拟类,忽略 F403 错误
        from ...utils.dummy_torch_and_transformers_objects import *  # noqa F403
    # 如果没有抛出异常
    else:
        # 从 pipeline_unclip 模块导入 UnCLIPPipeline
        from .pipeline_unclip import UnCLIPPipeline
        # 从 pipeline_unclip_image_variation 模块导入 UnCLIPImageVariationPipeline
        from .pipeline_unclip_image_variation import UnCLIPImageVariationPipeline
        # 从 text_proj 模块导入 UnCLIPTextProjModel
        from .text_proj import UnCLIPTextProjModel

# 如果不处于类型检查状态且慢速导入标志为假
else:
    # 导入 sys 模块
    import sys

    # 使用 _LazyModule 创建延迟加载模块,替换当前模块
    sys.modules[__name__] = _LazyModule(
        __name__,
        globals()["__file__"],  # 获取当前文件的全局变量
        _import_structure,  # 导入结构
        module_spec=__spec__,  # 模块规格
    )
    # 遍历虚拟对象字典,将虚拟对象添加到当前模块中
    for name, value in _dummy_objects.items():
        setattr(sys.modules[__name__], name, value)

.\diffusers\pipelines\unidiffuser\modeling_text_decoder.py

# 从 typing 模块导入 Optional 类型
from typing import Optional

# 导入 numpy 库并命名为 np
import numpy as np
# 导入 PyTorch 库
import torch
# 从 PyTorch 中导入 nn 模块
from torch import nn
# 从 transformers 库导入 GPT2Config 和 GPT2LMHeadModel
from transformers import GPT2Config, GPT2LMHeadModel
# 从 transformers.modeling_utils 导入 ModuleUtilsMixin 类
from transformers.modeling_utils import ModuleUtilsMixin

# 从上层目录导入 ConfigMixin 和 register_to_config
from ...configuration_utils import ConfigMixin, register_to_config
# 从上层目录导入 ModelMixin
from ...models import ModelMixin


# 从 https://github.com/thu-ml/unidiffuser/blob/main/libs/caption_decoder.py 修改而来
class UniDiffuserTextDecoder(ModelMixin, ConfigMixin, ModuleUtilsMixin):
    """
    用于图像-文本 [UniDiffuser](https://arxiv.org/pdf/2303.06555.pdf) 模型的文本解码器。此模型用于
    从 UniDiffuser 图像-文本嵌入生成文本。
    """

    # 在加载时忽略的意外键
    _keys_to_ignore_on_load_unexpected = [r"h\.\d+\.attn\.bias", r"h\.\d+\.attn\.masked_bias"]

    # 注册为配置的初始化方法
    @register_to_config
    def __init__(
        # 前缀长度参数
        self,
        prefix_length: int,
        # 前缀内部维度参数
        prefix_inner_dim: int,
        # 可选的前缀隐藏维度参数
        prefix_hidden_dim: Optional[int] = None,
        # 词汇表大小,默认是 GPT2 配置参数
        vocab_size: int = 50257,  # Start of GPT2 config args
        # 位置数量参数
        n_positions: int = 1024,
        # 嵌入维度参数
        n_embd: int = 768,
        # 层数参数
        n_layer: int = 12,
        # 注意力头数量参数
        n_head: int = 12,
        # 可选的内部维度参数
        n_inner: Optional[int] = None,
        # 激活函数类型,默认是 "gelu_new"
        activation_function: str = "gelu_new",
        # 残差丢弃率参数
        resid_pdrop: float = 0.1,
        # 嵌入丢弃率参数
        embd_pdrop: float = 0.1,
        # 注意力丢弃率参数
        attn_pdrop: float = 0.1,
        # 层归一化的 epsilon 值
        layer_norm_epsilon: float = 1e-5,
        # 初始化范围参数
        initializer_range: float = 0.02,
        # 是否缩放注意力权重
        scale_attn_weights: bool = True,
        # 是否使用缓存
        use_cache: bool = True,
        # 是否按层索引的倒数缩放注意力
        scale_attn_by_inverse_layer_idx: bool = False,
        # 是否重排和上溯注意力
        reorder_and_upcast_attn: bool = False,
    ):
        # 初始化父类
        super().__init__()

        # 设置前缀长度
        self.prefix_length = prefix_length

        # 检查前缀内维度与嵌入维度是否一致,且前缀隐藏维度是否为 None
        if prefix_inner_dim != n_embd and prefix_hidden_dim is None:
            # 抛出错误,提示前缀隐藏维度不能为 None
            raise ValueError(
                f"`prefix_hidden_dim` cannot be `None` when `prefix_inner_dim`: {prefix_hidden_dim} and"
                f" `n_embd`: {n_embd} are not equal."
            )

        # 设置前缀内维度
        self.prefix_inner_dim = prefix_inner_dim
        # 设置前缀隐藏维度
        self.prefix_hidden_dim = prefix_hidden_dim

        # 创建编码前缀的线性层,如果前缀隐藏维度不为 None
        self.encode_prefix = (
            nn.Linear(self.prefix_inner_dim, self.prefix_hidden_dim)
            if self.prefix_hidden_dim is not None
            else nn.Identity()
        )
        # 创建解码前缀的线性层,如果前缀隐藏维度不为 None
        self.decode_prefix = (
            nn.Linear(self.prefix_hidden_dim, n_embd) if self.prefix_hidden_dim is not None else nn.Identity()
        )

        # 配置 GPT2 模型的参数
        gpt_config = GPT2Config(
            vocab_size=vocab_size,
            n_positions=n_positions,
            n_embd=n_embd,
            n_layer=n_layer,
            n_head=n_head,
            n_inner=n_inner,
            activation_function=activation_function,
            resid_pdrop=resid_pdrop,
            embd_pdrop=embd_pdrop,
            attn_pdrop=attn_pdrop,
            layer_norm_epsilon=layer_norm_epsilon,
            initializer_range=initializer_range,
            scale_attn_weights=scale_attn_weights,
            use_cache=use_cache,
            scale_attn_by_inverse_layer_idx=scale_attn_by_inverse_layer_idx,
            reorder_and_upcast_attn=reorder_and_upcast_attn,
        )
        # 创建 GPT2 语言模型头
        self.transformer = GPT2LMHeadModel(gpt_config)

    def forward(
        # 定义前向传播的输入参数
        input_ids: torch.Tensor,
        prefix_embeds: torch.Tensor,
        attention_mask: Optional[torch.Tensor] = None,
        labels: Optional[torch.Tensor] = None,
    ):
        """
        Args:
            input_ids (`torch.Tensor` of shape `(N, max_seq_len)`):
                用于推理的文本标记。
            prefix_embeds (`torch.Tensor` of shape `(N, prefix_length, 768)`):
                预先附加到嵌入标记的前缀嵌入。
            attention_mask (`torch.Tensor` of shape `(N, prefix_length + max_seq_len, 768)`, *optional*):
                前缀嵌入的注意力掩码。
            labels (`torch.Tensor`, *optional*):
                用于语言建模的标签。
        """
        # 获取输入标记的嵌入表示
        embedding_text = self.transformer.transformer.wte(input_ids)
        # 编码前缀嵌入
        hidden = self.encode_prefix(prefix_embeds)
        # 解码前缀嵌入
        prefix_embeds = self.decode_prefix(hidden)
        # 拼接前缀嵌入和文本嵌入
        embedding_cat = torch.cat((prefix_embeds, embedding_text), dim=1)

        # 如果标签不为 None,处理标签
        if labels is not None:
            # 获取虚拟标记,用于标签拼接
            dummy_token = self.get_dummy_token(input_ids.shape[0], input_ids.device)
            # 拼接虚拟标记和输入标记
            labels = torch.cat((dummy_token, input_ids), dim=1)
        # 使用 Transformer 进行前向传播
        out = self.transformer(inputs_embeds=embedding_cat, labels=labels, attention_mask=attention_mask)
        # 如果前缀隐藏维度不为 None,返回输出和隐藏状态
        if self.prefix_hidden_dim is not None:
            return out, hidden
        else:
            # 否则只返回输出
            return out
    # 获取一个全零的张量,形状为 (batch_size, prefix_length),用于生成虚拟的输入令牌
        def get_dummy_token(self, batch_size: int, device: torch.device) -> torch.Tensor:
            return torch.zeros(batch_size, self.prefix_length, dtype=torch.int64, device=device)
    
    # 编码给定的前缀,调用编码前缀的方法
        def encode(self, prefix):
            return self.encode_prefix(prefix)
    
    # 生成文本描述,采用无梯度计算以节省内存
        @torch.no_grad()
        def generate_captions(self, features, eos_token_id, device):
            """
            根据文本嵌入特征生成描述,返回字符串列表。
    
            参数:
                features (`torch.Tensor`,形状为 `(B, L, D)`):
                    用于生成描述的文本嵌入特征。
                eos_token_id (`int`):
                    文本解码模型的 EOS 令牌 ID。
                device:
                    进行文本生成的设备。
    
            返回:
                `List[str]`: 从解码模型生成的字符串列表。
            """
    
            # 将特征张量在第0维度分割成多个单独的特征
            features = torch.split(features, 1, dim=0)
            generated_tokens = []  # 存储生成的令牌
            generated_seq_lengths = []  # 存储生成序列的长度
            for feature in features:
                # 解码前缀特征,转换为 CLIP 特征
                feature = self.decode_prefix(feature.to(device))
                # 当前仅支持束搜索
                output_tokens, seq_lengths = self.generate_beam(
                    input_embeds=feature, device=device, eos_token_id=eos_token_id
                )
                # 添加生成的第一个令牌和序列长度到列表
                generated_tokens.append(output_tokens[0])
                generated_seq_lengths.append(seq_lengths[0])
            # 将生成的令牌和序列长度堆叠成张量
            generated_tokens = torch.stack(generated_tokens)
            generated_seq_lengths = torch.stack(generated_seq_lengths)
            # 返回生成的令牌和序列长度
            return generated_tokens, generated_seq_lengths
    
    # 生成束搜索的描述,采用无梯度计算以节省内存
        @torch.no_grad()
        def generate_beam(
            self,
            input_ids=None,  # 输入的 ID,默认为 None
            input_embeds=None,  # 输入的嵌入,默认为 None
            device=None,  # 设备,默认为 None
            beam_size: int = 5,  # 束搜索的大小,默认为 5
            entry_length: int = 67,  # 入口长度,默认为 67
            temperature: float = 1.0,  # 温度,默认为 1.0
            eos_token_id: Optional[int] = None,  # EOS 令牌 ID,默认为 None

.\diffusers\pipelines\unidiffuser\modeling_uvit.py

# 导入数学库
import math
# 从 typing 模块导入可选和联合类型
from typing import Optional, Union

# 导入 PyTorch 库
import torch
# 从 torch 模块导入神经网络相关类
from torch import nn

# 导入配置混合器和注册配置的工具
from ...configuration_utils import ConfigMixin, register_to_config
# 导入模型混合器
from ...models import ModelMixin
# 从注意力模型导入前馈网络
from ...models.attention import FeedForward
# 从注意力处理器导入注意力机制
from ...models.attention_processor import Attention
# 从嵌入模型导入时间步嵌入、时间步和获取二维正弦余弦位置嵌入的函数
from ...models.embeddings import TimestepEmbedding, Timesteps, get_2d_sincos_pos_embed
# 从建模输出导入 Transformer2DModelOutput
from ...models.modeling_outputs import Transformer2DModelOutput
# 从规范化模型导入自适应层归一化
from ...models.normalization import AdaLayerNorm
# 导入日志工具
from ...utils import logging

# 创建一个名为 __name__ 的日志记录器
logger = logging.get_logger(__name__)  # pylint: disable=invalid-name

# 定义一个不带梯度的截断正态分布初始化函数
def _no_grad_trunc_normal_(tensor, mean, std, a, b):
    # 从 PyTorch 官方库复制的函数,直到在几个正式版本中被纳入 - RW
    # 基于 https://people.sc.fsu.edu/~jburkardt/presentations/truncated_normal.pdf 的方法
    def norm_cdf(x):
        # 计算标准正态累积分布函数
        return (1.0 + math.erf(x / math.sqrt(2.0))) / 2.0

    # 检查均值是否在区间外,若是则发出警告
    if (mean < a - 2 * std) or (mean > b + 2 * std):
        logger.warning(
            "mean is more than 2 std from [a, b] in nn.init.trunc_normal_. "
            "The distribution of values may be incorrect."
        )

    # 在不计算梯度的上下文中执行以下操作
    with torch.no_grad():
        # 通过使用截断均匀分布生成值,然后使用正态分布的逆CDF转换
        # 获取上下累积分布函数值
        l = norm_cdf((a - mean) / std)
        u = norm_cdf((b - mean) / std)

        # 在区间 [l, u] 内均匀填充张量,然后转换为 [2l-1, 2u-1]。
        tensor.uniform_(2 * l - 1, 2 * u - 1)

        # 使用逆CDF变换获取截断的标准正态分布
        tensor.erfinv_()

        # 转换为指定的均值和标准差
        tensor.mul_(std * math.sqrt(2.0))
        tensor.add_(mean)

        # 限制张量值确保在适当范围内
        tensor.clamp_(min=a, max=b)
        # 返回处理后的张量
        return tensor

# 定义截断正态分布初始化的公共接口函数
def trunc_normal_(tensor, mean=0.0, std=1.0, a=-2.0, b=2.0):
    # 指定参数类型
    # type: (torch.Tensor, float, float, float, float) -> torch.Tensor
    r"""用从截断正态分布中抽取的值填充输入张量。值实际上是从正态分布 :math:`\mathcal{N}(\text{mean},
    \text{std}^2)` 中抽取的,超出 :math:`[a, b]` 的值会重新抽取,直到它们在范围内。用于生成随机值的方法在 :math:`a \leq \text{mean} \leq b` 时效果最佳。

    参数:
        tensor: n 维的 `torch.Tensor`
        mean: 正态分布的均值
        std: 正态分布的标准差
        a: 最小截断值
        b: 最大截断值
    示例:
        >>> w = torch.empty(3, 5) >>> nn.init.trunc_normal_(w)
    """
    # 调用内部函数生成截断正态分布值
    return _no_grad_trunc_normal_(tensor, mean, std, a, b)

# 定义一个图像到补丁嵌入的模块类
class PatchEmbed(nn.Module):
    """2D Image to Patch Embedding"""
    # 初始化类的构造函数
        def __init__(
            # 图像的高度,默认为224
            height=224,
            # 图像的宽度,默认为224
            width=224,
            # 每个patch的大小,默认为16
            patch_size=16,
            # 输入通道数,默认为3(RGB图像)
            in_channels=3,
            # 嵌入维度,默认为768
            embed_dim=768,
            # 是否使用层归一化,默认为False
            layer_norm=False,
            # 是否将输入展平,默认为True
            flatten=True,
            # 卷积是否使用偏置,默认为True
            bias=True,
            # 是否使用位置嵌入,默认为True
            use_pos_embed=True,
        ):
            # 调用父类的构造函数
            super().__init__()
    
            # 计算总patch的数量
            num_patches = (height // patch_size) * (width // patch_size)
            # 存储是否展平的标志
            self.flatten = flatten
            # 存储是否使用层归一化的标志
            self.layer_norm = layer_norm
    
            # 创建卷积层用于特征提取
            self.proj = nn.Conv2d(
                # 输入通道数
                in_channels, 
                # 输出嵌入维度
                embed_dim, 
                # 卷积核的大小
                kernel_size=(patch_size, patch_size), 
                # 步幅等于patch_size
                stride=patch_size, 
                # 是否使用偏置
                bias=bias
            )
            # 如果使用层归一化,则初始化层归一化对象
            if layer_norm:
                self.norm = nn.LayerNorm(embed_dim, elementwise_affine=False, eps=1e-6)
            # 否则将归一化对象设置为None
            else:
                self.norm = None
    
            # 存储是否使用位置嵌入的标志
            self.use_pos_embed = use_pos_embed
            # 如果使用位置嵌入,生成并注册位置嵌入
            if self.use_pos_embed:
                pos_embed = get_2d_sincos_pos_embed(embed_dim, int(num_patches**0.5))
                # 将位置嵌入注册为模型的缓冲区
                self.register_buffer("pos_embed", torch.from_numpy(pos_embed).float().unsqueeze(0), persistent=False)
    
        # 定义前向传播方法
        def forward(self, latent):
            # 通过卷积层处理输入数据
            latent = self.proj(latent)
            # 如果需要展平,执行展平和转置操作
            if self.flatten:
                latent = latent.flatten(2).transpose(1, 2)  # BCHW -> BNC
            # 如果使用层归一化,则应用层归一化
            if self.layer_norm:
                latent = self.norm(latent)
            # 如果使用位置嵌入,则返回加上位置嵌入的结果
            if self.use_pos_embed:
                return latent + self.pos_embed
            # 否则返回处理后的结果
            else:
                return latent
# 定义一个名为 SkipBlock 的类,继承自 nn.Module
class SkipBlock(nn.Module):
    # 初始化方法,接收一个整数参数 dim
    def __init__(self, dim: int):
        # 调用父类的初始化方法
        super().__init__()

        # 定义一个线性变换层,输入维度为 2 * dim,输出维度为 dim
        self.skip_linear = nn.Linear(2 * dim, dim)

        # 使用 torch.nn.LayerNorm 进行层归一化,处理维度为 dim 的张量
        self.norm = nn.LayerNorm(dim)

    # 前向传播方法,接收输入 x 和跳跃连接 skip
    def forward(self, x, skip):
        # 将 x 和 skip 沿最后一个维度连接,并通过线性变换层处理
        x = self.skip_linear(torch.cat([x, skip], dim=-1))
        # 对处理后的张量进行层归一化
        x = self.norm(x)

        # 返回处理后的结果
        return x


# 定义一个名为 UTransformerBlock 的类,继承自 nn.Module
# 这是对 BasicTransformerBlock 的修改,支持 pre-LayerNorm 和 post-LayerNorm 配置
class UTransformerBlock(nn.Module):
    r"""
    对 BasicTransformerBlock 的修改,支持 pre-LayerNorm 和 post-LayerNorm 配置。

    参数:
        dim (`int`): 输入和输出的通道数。
        num_attention_heads (`int`): 用于多头注意力的头数。
        attention_head_dim (`int`): 每个头的通道数。
        dropout (`float`, *可选*, 默认值为 0.0): 使用的 dropout 概率。
        cross_attention_dim (`int`, *可选*): 用于交叉注意力的 encoder_hidden_states 向量的大小。
        activation_fn (`str`, *可选*, 默认值为 `"geglu"`):
            在前馈网络中使用的激活函数。
        num_embeds_ada_norm (:obj: `int`, *可选*):
            训练期间使用的扩散步骤数。参见 `Transformer2DModel`。
        attention_bias (:obj: `bool`, *可选*, 默认值为 `False`):
            配置注意力是否包含偏置参数。
        only_cross_attention (`bool`, *可选*):
            是否仅使用交叉注意力层。在这种情况下使用两个交叉注意力层。
        double_self_attention (`bool`, *可选*):
            是否使用两个自注意力层。在这种情况下不使用交叉注意力层。
        upcast_attention (`bool`, *可选*):
            在执行注意力计算时,是否将查询和键的类型提升为 float32。
        norm_elementwise_affine (`bool`, *可选*):
            在层归一化期间是否使用可学习的逐元素仿射参数。
        norm_type (`str`, 默认值为 `"layer_norm"`):
            使用的层归一化实现类型。
        pre_layer_norm (`bool`, *可选*):
            是否在注意力和前馈操作之前执行层归一化("pre-LayerNorm"),
            而不是之后("post-LayerNorm")。注意 `BasicTransformerBlock` 使用 pre-LayerNorm,例如
            `pre_layer_norm = True`。
        final_dropout (`bool`, *可选*):
            是否在前馈网络后使用最终的 Dropout 层。
    """
    # 初始化方法,设置模型的各种参数
    def __init__(
            # 模型的维度
            self,
            dim: int,
            # 注意力头的数量
            num_attention_heads: int,
            # 每个注意力头的维度
            attention_head_dim: int,
            # dropout 概率,默认值为 0.0
            dropout=0.0,
            # 交叉注意力的维度,可选
            cross_attention_dim: Optional[int] = None,
            # 激活函数的类型,默认为 "geglu"
            activation_fn: str = "geglu",
            # 可选的自适应归一化的嵌入数量
            num_embeds_ada_norm: Optional[int] = None,
            # 是否使用注意力偏置
            attention_bias: bool = False,
            # 是否仅使用交叉注意力
            only_cross_attention: bool = False,
            # 是否双重自注意力
            double_self_attention: bool = False,
            # 是否提升注意力精度
            upcast_attention: bool = False,
            # 归一化时是否使用元素级仿射变换
            norm_elementwise_affine: bool = True,
            # 归一化的类型,默认为 "layer_norm"
            norm_type: str = "layer_norm",
            # 是否使用预层归一化
            pre_layer_norm: bool = True,
            # 是否在最终阶段使用 dropout
            final_dropout: bool = False,
    ):
        # 调用父类的初始化方法
        super().__init__()
        # 仅使用交叉注意力的标志
        self.only_cross_attention = only_cross_attention

        # 确定是否使用 AdaLayerNorm,依据 num_embeds_ada_norm 和 norm_type
        self.use_ada_layer_norm = (num_embeds_ada_norm is not None) and norm_type == "ada_norm"

        # 预先进行层归一化的标志
        self.pre_layer_norm = pre_layer_norm

        # 如果 norm_type 是 "ada_norm" 或 "ada_norm_zero",且未定义 num_embeds_ada_norm,抛出错误
        if norm_type in ("ada_norm", "ada_norm_zero") and num_embeds_ada_norm is None:
            raise ValueError(
                f"`norm_type` is set to {norm_type}, but `num_embeds_ada_norm` is not defined. Please make sure to"
                f" define `num_embeds_ada_norm` if setting `norm_type` to {norm_type}."
            )

        # 1. 自注意力层
        self.attn1 = Attention(
            # 查询向量维度
            query_dim=dim,
            # 注意力头的数量
            heads=num_attention_heads,
            # 每个注意力头的维度
            dim_head=attention_head_dim,
            # Dropout 比例
            dropout=dropout,
            # 是否使用偏置
            bias=attention_bias,
            # 交叉注意力的维度(仅在只使用交叉注意力时设定)
            cross_attention_dim=cross_attention_dim if only_cross_attention else None,
            # 是否上溯注意力计算
            upcast_attention=upcast_attention,
        )

        # 2. 交叉注意力层
        if cross_attention_dim is not None or double_self_attention:
            self.attn2 = Attention(
                # 查询向量维度
                query_dim=dim,
                # 交叉注意力的维度(在双自注意力的情况下设为 None)
                cross_attention_dim=cross_attention_dim if not double_self_attention else None,
                # 注意力头的数量
                heads=num_attention_heads,
                # 每个注意力头的维度
                dim_head=attention_head_dim,
                # Dropout 比例
                dropout=dropout,
                # 是否使用偏置
                bias=attention_bias,
                # 是否上溯注意力计算
                upcast_attention=upcast_attention,
            )  # 如果 encoder_hidden_states 为 None 则视为自注意力
        else:
            # 若不需要交叉注意力,则将其设置为 None
            self.attn2 = None

        # 根据是否使用 AdaLayerNorm 来选择层归一化的实现
        if self.use_ada_layer_norm:
            self.norm1 = AdaLayerNorm(dim, num_embeds_ada_norm)
        else:
            # 使用标准的层归一化
            self.norm1 = nn.LayerNorm(dim, elementwise_affine=norm_elementwise_affine)

        # 如果有交叉注意力维度或使用双自注意力
        if cross_attention_dim is not None or double_self_attention:
            # 目前只在自注意力中使用 AdaLayerNormZero,因为只有一个注意力块
            # 如果在第二个交叉注意力块返回的调制块数目将没有意义
            self.norm2 = (
                AdaLayerNorm(dim, num_embeds_ada_norm)
                if self.use_ada_layer_norm
                else nn.LayerNorm(dim, elementwise_affine=norm_elementwise_affine)
            )
        else:
            # 如果没有交叉注意力,则将其设置为 None
            self.norm2 = None

        # 3. 前馈层
        # 对前馈层的输出进行标准层归一化
        self.norm3 = nn.LayerNorm(dim, elementwise_affine=norm_elementwise_affine)
        # 初始化前馈神经网络
        self.ff = FeedForward(dim, dropout=dropout, activation_fn=activation_fn, final_dropout=final_dropout)

    def forward(
        # 输入的隐藏状态
        hidden_states,
        # 注意力掩码
        attention_mask=None,
        # 编码器的隐藏状态
        encoder_hidden_states=None,
        # 编码器的注意力掩码
        encoder_attention_mask=None,
        # 时间步
        timestep=None,
        # 交叉注意力的额外参数
        cross_attention_kwargs=None,
        # 类别标签
        class_labels=None,
    ):
        # 预处理层归一化
        if self.pre_layer_norm:
            # 如果使用自适应层归一化,则传递时间步
            if self.use_ada_layer_norm:
                norm_hidden_states = self.norm1(hidden_states, timestep)
            else:
                # 否则直接进行层归一化
                norm_hidden_states = self.norm1(hidden_states)
        else:
            # 如果不使用预处理层归一化,直接使用输入的隐藏状态
            norm_hidden_states = hidden_states

        # 1. 自注意力机制
        # 如果没有提供交叉注意力的参数,则使用空字典
        cross_attention_kwargs = cross_attention_kwargs if cross_attention_kwargs is not None else {}
        # 进行自注意力计算,可能会传入编码器的隐藏状态和注意力掩码
        attn_output = self.attn1(
            norm_hidden_states,
            encoder_hidden_states=encoder_hidden_states if self.only_cross_attention else None,
            attention_mask=attention_mask,
            **cross_attention_kwargs,
        )

        # 后处理层归一化
        if not self.pre_layer_norm:
            # 如果不使用预处理层归一化,进行归一化处理
            if self.use_ada_layer_norm:
                attn_output = self.norm1(attn_output, timestep)
            else:
                attn_output = self.norm1(attn_output)

        # 将自注意力的输出与输入的隐藏状态相加
        hidden_states = attn_output + hidden_states

        if self.attn2 is not None:
            # 预处理层归一化
            if self.pre_layer_norm:
                # 如果使用自适应层归一化,则传递时间步
                norm_hidden_states = (
                    self.norm2(hidden_states, timestep) if self.use_ada_layer_norm else self.norm2(hidden_states)
                )
            else:
                # 否则直接使用输入的隐藏状态
                norm_hidden_states = hidden_states
            # TODO (Birch-San): 这里应该正确准备编码器注意力掩码
            # 准备注意力掩码

            # 2. 交叉注意力机制
            attn_output = self.attn2(
                norm_hidden_states,
                encoder_hidden_states=encoder_hidden_states,
                attention_mask=encoder_attention_mask,
                **cross_attention_kwargs,
            )

            # 后处理层归一化
            if not self.pre_layer_norm:
                attn_output = self.norm2(attn_output, timestep) if self.use_ada_layer_norm else self.norm2(attn_output)

            # 将交叉注意力的输出与输入的隐藏状态相加
            hidden_states = attn_output + hidden_states

        # 3. 前馈神经网络
        # 预处理层归一化
        if self.pre_layer_norm:
            norm_hidden_states = self.norm3(hidden_states)
        else:
            # 否则直接使用输入的隐藏状态
            norm_hidden_states = hidden_states

        # 进行前馈神经网络计算
        ff_output = self.ff(norm_hidden_states)

        # 后处理层归一化
        if not self.pre_layer_norm:
            ff_output = self.norm3(ff_output)

        # 将前馈神经网络的输出与输入的隐藏状态相加
        hidden_states = ff_output + hidden_states

        # 返回最终的隐藏状态
        return hidden_states
# 类似于 UTransformerBlock,但在块的残差路径上使用 LayerNorm
# 从 diffusers.models.attention.BasicTransformerBlock 修改而来
class UniDiffuserBlock(nn.Module):
    r"""
    对 BasicTransformerBlock 的修改,支持 pre-LayerNorm 和 post-LayerNorm 配置,并将
    LayerNorm 应用在块的残差路径上。这与 [original UniDiffuser
    implementation](https://github.com/thu-ml/unidiffuser/blob/main/libs/uvit_multi_post_ln_v1.py#L104) 中的 transformer 块相匹配。

    参数:
        dim (`int`): 输入和输出的通道数。
        num_attention_heads (`int`): 用于多头注意力的头数。
        attention_head_dim (`int`): 每个头的通道数。
        dropout (`float`, *可选*, 默认为 0.0): 使用的丢弃概率。
        cross_attention_dim (`int`, *可选*): 跨注意力的 encoder_hidden_states 向量的大小。
        activation_fn (`str`, *可选*, 默认为 `"geglu"`):
            在前馈网络中使用的激活函数。
        num_embeds_ada_norm (:obj: `int`, *可选*):
            训练期间使用的扩散步骤数量。见 `Transformer2DModel`。
        attention_bias (:obj: `bool`, *可选*, 默认为 `False`):
            配置注意力是否包含偏置参数。
        only_cross_attention (`bool`, *可选*):
            是否仅使用跨注意力层。在这种情况下,使用两个跨注意力层。
        double_self_attention (`bool`, *可选*):
            是否使用两个自注意力层。在这种情况下,不使用跨注意力层。
        upcast_attention (`bool`, *可选*):
            在执行注意力计算时,是否将查询和键上升到 float() 类型。
        norm_elementwise_affine (`bool`, *可选*):
            在层归一化期间,是否使用可学习的逐元素仿射参数。
        norm_type (`str`, 默认为 `"layer_norm"`):
            使用的层归一化实现。
        pre_layer_norm (`bool`, *可选*):
            是否在注意力和前馈操作之前执行层归一化(“pre-LayerNorm”),
            而不是之后(“post-LayerNorm”)。原始 UniDiffuser 实现是 post-LayerNorm
            (`pre_layer_norm = False`)。
        final_dropout (`bool`, *可选*):
            在前馈网络之后是否使用最终的 Dropout 层。
    """
    # 初始化方法,用于设置模型的基本参数
        def __init__(
            # 模型的维度
            self,
            dim: int,
            # 注意力头的数量
            num_attention_heads: int,
            # 每个注意力头的维度
            attention_head_dim: int,
            # dropout 概率,默认值为 0.0
            dropout=0.0,
            # 可选的交叉注意力维度
            cross_attention_dim: Optional[int] = None,
            # 激活函数的类型,默认使用 "geglu"
            activation_fn: str = "geglu",
            # 可选的自适应归一化的嵌入数量
            num_embeds_ada_norm: Optional[int] = None,
            # 是否使用注意力偏置,默认值为 False
            attention_bias: bool = False,
            # 是否仅使用交叉注意力,默认值为 False
            only_cross_attention: bool = False,
            # 是否使用双重自注意力,默认值为 False
            double_self_attention: bool = False,
            # 是否上溯注意力,默认值为 False
            upcast_attention: bool = False,
            # 归一化时是否使用元素-wise 仿射变换,默认值为 True
            norm_elementwise_affine: bool = True,
            # 归一化的类型,默认使用 "layer_norm"
            norm_type: str = "layer_norm",
            # 是否在前面使用层归一化,默认值为 False
            pre_layer_norm: bool = False,
            # 最终是否使用 dropout,默认值为 True
            final_dropout: bool = True,
    ):
        # 初始化父类
        super().__init__()
        # 设置是否仅使用交叉注意力
        self.only_cross_attention = only_cross_attention

        # 判断是否使用自适应层归一化
        self.use_ada_layer_norm = (num_embeds_ada_norm is not None) and norm_type == "ada_norm"

        # 设置预层归一化
        self.pre_layer_norm = pre_layer_norm

        # 检查归一化类型和自适应嵌入数量的有效性
        if norm_type in ("ada_norm", "ada_norm_zero") and num_embeds_ada_norm is None:
            raise ValueError(
                # 抛出异常信息,提示未定义自适应嵌入数量
                f"`norm_type` is set to {norm_type}, but `num_embeds_ada_norm` is not defined. Please make sure to"
                f" define `num_embeds_ada_norm` if setting `norm_type` to {norm_type}."
            )

        # 1. 自注意力层
        self.attn1 = Attention(
            # 设置查询维度、头数、头维度和其他参数
            query_dim=dim,
            heads=num_attention_heads,
            dim_head=attention_head_dim,
            dropout=dropout,
            bias=attention_bias,
            # 根据是否仅使用交叉注意力选择交叉注意力维度
            cross_attention_dim=cross_attention_dim if only_cross_attention else None,
            upcast_attention=upcast_attention,
        )

        # 2. 交叉注意力层
        if cross_attention_dim is not None or double_self_attention:
            self.attn2 = Attention(
                # 设置查询和交叉注意力的维度及其他参数
                query_dim=dim,
                cross_attention_dim=cross_attention_dim if not double_self_attention else None,
                heads=num_attention_heads,
                dim_head=attention_head_dim,
                dropout=dropout,
                bias=attention_bias,
                upcast_attention=upcast_attention,
            )  # 如果 encoder_hidden_states 为 None,则为自注意力
        else:
            # 如果没有交叉注意力维度,设置为 None
            self.attn2 = None

        # 如果使用自适应层归一化,初始化相应的归一化层
        if self.use_ada_layer_norm:
            self.norm1 = AdaLayerNorm(dim, num_embeds_ada_norm)
        else:
            # 否则使用标准层归一化
            self.norm1 = nn.LayerNorm(dim, elementwise_affine=norm_elementwise_affine)

        # 如果有交叉注意力维度或双自注意力
        if cross_attention_dim is not None or double_self_attention:
            # 目前仅在自注意力中使用 AdaLayerNormZero
            self.norm2 = (
                # 根据是否使用自适应层归一化选择归一化层
                AdaLayerNorm(dim, num_embeds_ada_norm)
                if self.use_ada_layer_norm
                else nn.LayerNorm(dim, elementwise_affine=norm_elementwise_affine)
            )
        else:
            # 如果没有交叉注意力,则设置为 None
            self.norm2 = None

        # 3. 前馈层
        # 初始化第三层的标准层归一化
        self.norm3 = nn.LayerNorm(dim, elementwise_affine=norm_elementwise_affine)
        # 初始化前馈网络
        self.ff = FeedForward(dim, dropout=dropout, activation_fn=activation_fn, final_dropout=final_dropout)

    def forward(
        # 定义前向传播函数的输入参数
        hidden_states,
        attention_mask=None,
        encoder_hidden_states=None,
        encoder_attention_mask=None,
        timestep=None,
        cross_attention_kwargs=None,
        class_labels=None,
    ):
        # 按照 diffusers transformer block 实现,将 LayerNorm 放在残差连接上
        # 预 LayerNorm
        if self.pre_layer_norm:
            # 如果使用自适应 LayerNorm,应用它
            if self.use_ada_layer_norm:
                hidden_states = self.norm1(hidden_states, timestep)
            else:
                # 否则,直接应用 LayerNorm
                hidden_states = self.norm1(hidden_states)

        # 1. 自注意力
        # 如果 cross_attention_kwargs 为 None,则初始化为空字典
        cross_attention_kwargs = cross_attention_kwargs if cross_attention_kwargs is not None else {}
        # 执行自注意力操作,获取输出
        attn_output = self.attn1(
            hidden_states,
            # 根据条件选择 encoder_hidden_states
            encoder_hidden_states=encoder_hidden_states if self.only_cross_attention else None,
            # 应用注意力掩码
            attention_mask=attention_mask,
            **cross_attention_kwargs,
        )

        # 将自注意力输出与输入隐藏状态相加
        hidden_states = attn_output + hidden_states

        # 按照 diffusers transformer block 实现,将 LayerNorm 放在残差连接上
        # 后 LayerNorm
        if not self.pre_layer_norm:
            # 如果使用自适应 LayerNorm,应用它
            if self.use_ada_layer_norm:
                hidden_states = self.norm1(hidden_states, timestep)
            else:
                # 否则,直接应用 LayerNorm
                hidden_states = self.norm1(hidden_states)

        # 如果 attn2 存在
        if self.attn2 is not None:
            # 预 LayerNorm
            if self.pre_layer_norm:
                # 根据条件应用 norm2
                hidden_states = (
                    self.norm2(hidden_states, timestep) if self.use_ada_layer_norm else self.norm2(hidden_states)
                )
            # TODO (Birch-San): 这里应该正确准备 encoder_attention 掩码
            # 在这里准备注意力掩码

            # 2. 跨注意力
            # 执行跨注意力操作,获取输出
            attn_output = self.attn2(
                hidden_states,
                encoder_hidden_states=encoder_hidden_states,
                # 应用 encoder 的注意力掩码
                attention_mask=encoder_attention_mask,
                **cross_attention_kwargs,
            )

            # 将跨注意力输出与输入隐藏状态相加
            hidden_states = attn_output + hidden_states

            # 后 LayerNorm
            if not self.pre_layer_norm:
                hidden_states = (
                    self.norm2(hidden_states, timestep) if self.use_ada_layer_norm else self.norm2(hidden_states)
                )

        # 3. 前馈网络
        # 预 LayerNorm
        if self.pre_layer_norm:
            # 应用 norm3
            hidden_states = self.norm3(hidden_states)

        # 通过前馈网络获取输出
        ff_output = self.ff(hidden_states)

        # 将前馈网络输出与输入隐藏状态相加
        hidden_states = ff_output + hidden_states

        # 后 LayerNorm
        if not self.pre_layer_norm:
            # 应用 norm3
            hidden_states = self.norm3(hidden_states)

        # 返回最终的隐藏状态
        return hidden_states
# 从 diffusers.models.transformer_2d.Transformer2DModel 修改而来
# 修改变换块结构,使其类似 U-Net,遵循 U-ViT 的设计
# 目前仅支持补丁样式输入和 torch.nn.LayerNorm
# 相关链接: https://github.com/baofff/U-ViT
class UTransformer2DModel(ModelMixin, ConfigMixin):
    """
    基于 [U-ViT](https://github.com/baofff/U-ViT) 架构的变换器模型,适用于图像数据。
    与 [`Transformer2DModel`] 相比,此模型在变换块之间具有跳跃连接,以 "U" 形状连接,
    类似于 U-Net。仅支持连续(实际嵌入)输入,这些输入通过 [`PatchEmbed`] 层嵌入,
    然后重塑为 (b, t, d) 的形状。
    """

    @register_to_config
    # 初始化 UTransformer2DModel 的构造函数
    def __init__(
        # 注意力头的数量,默认为 16
        num_attention_heads: int = 16,
        # 每个注意力头的维度,默认为 88
        attention_head_dim: int = 88,
        # 输入通道数,默认为 None
        in_channels: Optional[int] = None,
        # 输出通道数,默认为 None
        out_channels: Optional[int] = None,
        # 变换层的数量,默认为 1
        num_layers: int = 1,
        # dropout 的比例,默认为 0.0
        dropout: float = 0.0,
        # 规范化时的组数量,默认为 32
        norm_num_groups: int = 32,
        # 跨注意力维度,默认为 None
        cross_attention_dim: Optional[int] = None,
        # 是否使用注意力偏置,默认为 False
        attention_bias: bool = False,
        # 样本大小,默认为 None
        sample_size: Optional[int] = None,
        # 向量嵌入的数量,默认为 None
        num_vector_embeds: Optional[int] = None,
        # 补丁大小,默认为 2
        patch_size: Optional[int] = 2,
        # 激活函数,默认为 "geglu"
        activation_fn: str = "geglu",
        # 自适应规范化的嵌入数量,默认为 None
        num_embeds_ada_norm: Optional[int] = None,
        # 是否使用线性投影,默认为 False
        use_linear_projection: bool = False,
        # 是否仅使用跨注意力,默认为 False
        only_cross_attention: bool = False,
        # 是否上溯注意力,默认为 False
        upcast_attention: bool = False,
        # 规范化类型,默认为 "layer_norm"
        norm_type: str = "layer_norm",
        # 块类型,默认为 "unidiffuser"
        block_type: str = "unidiffuser",
        # 是否使用预层规范化,默认为 False
        pre_layer_norm: bool = False,
        # 规范化时是否使用元素级的仿射,默认为 True
        norm_elementwise_affine: bool = True,
        # 是否使用补丁位置嵌入,默认为 False
        use_patch_pos_embed=False,
        # 前馈层的最终 dropout,默认为 False
        ff_final_dropout: bool = False,
    # 前向传播函数定义
    def forward(
        # 隐藏状态输入
        hidden_states,
        # 编码器隐藏状态,默认为 None
        encoder_hidden_states=None,
        # 时间步,默认为 None
        timestep=None,
        # 类别标签,默认为 None
        class_labels=None,
        # 跨注意力相关参数,默认为 None
        cross_attention_kwargs=None,
        # 是否返回字典格式的输出,默认为 True
        return_dict: bool = True,
        # 隐藏状态是否为嵌入,默认为 False
        hidden_states_is_embedding: bool = False,
        # 是否进行反补丁操作,默认为 True
        unpatchify: bool = True,
class UniDiffuserModel(ModelMixin, ConfigMixin):
    """
    图像-文本 [UniDiffuser](https://arxiv.org/pdf/2303.06555.pdf) 模型的变换器模型。
    这是 [`UTransformer2DModel`] 的修改版本,具有用于 VAE 嵌入潜图像、CLIP 嵌入图像和 CLIP 嵌入提示的输入和输出头(详见论文)。
    """

    @register_to_config
    # 初始化方法,用于设置类的属性和参数
        def __init__(
            # 文本的维度,默认为768
            text_dim: int = 768,
            # CLIP图像的维度,默认为512
            clip_img_dim: int = 512,
            # 文本标记的数量,默认为77
            num_text_tokens: int = 77,
            # 注意力头的数量,默认为16
            num_attention_heads: int = 16,
            # 每个注意力头的维度,默认为88
            attention_head_dim: int = 88,
            # 输入通道的数量,可选
            in_channels: Optional[int] = None,
            # 输出通道的数量,可选
            out_channels: Optional[int] = None,
            # 网络层的数量,默认为1
            num_layers: int = 1,
            # dropout比率,默认为0.0
            dropout: float = 0.0,
            # 规范化的组数量,默认为32
            norm_num_groups: int = 32,
            # 跨注意力的维度,可选
            cross_attention_dim: Optional[int] = None,
            # 注意力偏差,默认为False
            attention_bias: bool = False,
            # 采样大小,可选
            sample_size: Optional[int] = None,
            # 向量嵌入的数量,可选
            num_vector_embeds: Optional[int] = None,
            # 图像块的大小,可选
            patch_size: Optional[int] = None,
            # 激活函数,默认为"geglu"
            activation_fn: str = "geglu",
            # 自适应规范化嵌入的数量,可选
            num_embeds_ada_norm: Optional[int] = None,
            # 是否使用线性投影,默认为False
            use_linear_projection: bool = False,
            # 仅使用跨注意力,默认为False
            only_cross_attention: bool = False,
            # 是否上调注意力,默认为False
            upcast_attention: bool = False,
            # 规范化类型,默认为"layer_norm"
            norm_type: str = "layer_norm",
            # 块类型,默认为"unidiffuser"
            block_type: str = "unidiffuser",
            # 是否使用预层规范化,默认为False
            pre_layer_norm: bool = False,
            # 是否使用时间步嵌入,默认为False
            use_timestep_embedding=False,
            # 规范化的元素逐项仿射,默认为True
            norm_elementwise_affine: bool = True,
            # 是否使用块位置嵌入,默认为False
            use_patch_pos_embed=False,
            # 前馈层的最终dropout,默认为True
            ff_final_dropout: bool = True,
            # 是否使用数据类型嵌入,默认为False
            use_data_type_embedding: bool = False,
        # 装饰器,表示该方法在Torch JIT编译时会被忽略
        @torch.jit.ignore
        def no_weight_decay(self):
            # 返回不需要权重衰减的参数
            return {"pos_embed"}
    
        # 前向传播方法,定义输入和计算流程
        def forward(
            # 潜在图像嵌入的张量
            latent_image_embeds: torch.Tensor,
            # 图像嵌入的张量
            image_embeds: torch.Tensor,
            # 提示嵌入的张量
            prompt_embeds: torch.Tensor,
            # 时间步图像的张量或数值
            timestep_img: Union[torch.Tensor, float, int],
            # 时间步文本的张量或数值
            timestep_text: Union[torch.Tensor, float, int],
            # 数据类型,可选,默认为1
            data_type: Optional[Union[torch.Tensor, float, int]] = 1,
            # 编码器隐藏状态,默认为None
            encoder_hidden_states=None,
            # 跨注意力的额外参数,默认为None
            cross_attention_kwargs=None,

.\diffusers\pipelines\unidiffuser\pipeline_unidiffuser.py

# 导入 inspect 模块,用于检查对象的内部结构和属性
import inspect
# 从 dataclasses 模块导入 dataclass 装饰器,用于简化类的定义
from dataclasses import dataclass
# 从 typing 模块导入类型注释工具
from typing import Callable, List, Optional, Union

# 导入 numpy 库,用于数值计算和数组操作
import numpy as np
# 导入 PIL.Image,用于图像处理
import PIL.Image
# 导入 torch 库,用于深度学习和张量操作
import torch
# 从 transformers 库导入多个类,用于处理 CLIP 模型
from transformers import (
    CLIPImageProcessor,  # 处理图像的 CLIP 处理器
    CLIPTextModel,       # 处理文本的 CLIP 模型
    CLIPTokenizer,       # CLIP 模型的分词器
    CLIPVisionModelWithProjection,  # CLIP 视觉模型
    GPT2Tokenizer,       # GPT-2 模型的分词器
)

# 从相对路径导入 VaeImageProcessor 类,用于变分自编码器图像处理
from ...image_processor import VaeImageProcessor
# 从相对路径导入加载器混合类,用于加载不同模型
from ...loaders import StableDiffusionLoraLoaderMixin, TextualInversionLoaderMixin
# 从相对路径导入自编码器类
from ...models import AutoencoderKL
# 从相对路径导入调整 Lora 的函数,用于文本编码器
from ...models.lora import adjust_lora_scale_text_encoder
# 从相对路径导入 Karras 扩散调度器类
from ...schedulers import KarrasDiffusionSchedulers
# 从相对路径导入工具函数和常量
from ...utils import USE_PEFT_BACKEND, deprecate, logging, scale_lora_layers, unscale_lora_layers
# 从相对路径导入基础输出类
from ...utils.outputs import BaseOutput
# 从相对路径导入随机张量生成函数
from ...utils.torch_utils import randn_tensor
# 从相对路径导入扩散管道类
from ..pipeline_utils import DiffusionPipeline
# 从相对路径导入文本解码器模型类
from .modeling_text_decoder import UniDiffuserTextDecoder
# 从相对路径导入 UViT 模型类
from .modeling_uvit import UniDiffuserModel

# 创建一个日志记录器实例,用于记录该模块的日志信息
logger = logging.get_logger(__name__)  # pylint: disable=invalid-name

# 定义一个新的基类输出子类,用于联合图像-文本输出
@dataclass
class ImageTextPipelineOutput(BaseOutput):
    """
    联合图像-文本管道的输出类。

    参数:
        images (`List[PIL.Image.Image]` 或 `np.ndarray`)
            长度为 `batch_size` 的去噪 PIL 图像列表或形状为 `(batch_size, height, width,
            num_channels)` 的 NumPy 数组。
        text (`List[str]` 或 `List[List[str]]`)
            长度为 `batch_size` 的生成文本字符串列表或外层列表长度为
            `batch_size` 的字符串列表。
    """

    # 可选的图像输出,可以是 PIL 图像列表或 NumPy 数组
    images: Optional[Union[List[PIL.Image.Image], np.ndarray]]
    # 可选的文本输出,可以是字符串列表或字符串列表的列表
    text: Optional[Union[List[str], List[List[str]]]]

# 定义联合扩散管道类,继承自 DiffusionPipeline
class UniDiffuserPipeline(DiffusionPipeline):
    r"""
    用于双模态图像-文本模型的管道,支持无条件文本和图像生成、文本条件图像生成、
    图像条件文本生成以及联合图像-文本生成。

    该模型继承自 [`DiffusionPipeline`]。查看超类文档以了解所有管道实现的通用方法
    (下载、保存、在特定设备上运行等)。
    # 定义参数的文档字符串,描述每个参数的作用
        Args:
            vae ([`AutoencoderKL`]):
                # 变分自编码器模型,用于将图像编码和解码为潜在表示
                Variational Auto-Encoder (VAE) model to encode and decode images to and from latent representations. This
                is part of the UniDiffuser image representation along with the CLIP vision encoding.
            text_encoder ([`CLIPTextModel`]):
                # 冻结的文本编码器,使用特定的 CLIP 模型进行文本编码
                Frozen text-encoder ([clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14)).
            image_encoder ([`CLIPVisionModel`]):
                # CLIP 视觉模型,用于将图像编码为其表示的一部分
                A [`~transformers.CLIPVisionModel`] to encode images as part of its image representation along with the VAE
                latent representation.
            image_processor ([`CLIPImageProcessor`]):
                # CLIP 图像处理器,用于在编码之前对图像进行预处理
                [`~transformers.CLIPImageProcessor`] to preprocess an image before CLIP encoding it with `image_encoder`.
            clip_tokenizer ([`CLIPTokenizer`]):
                # CLIP 分词器,用于在文本编码之前对提示进行分词
                 A [`~transformers.CLIPTokenizer`] to tokenize the prompt before encoding it with `text_encoder`.
            text_decoder ([`UniDiffuserTextDecoder`]):
                # 冻结的文本解码器,用于从 UniDiffuser 嵌入生成文本
                Frozen text decoder. This is a GPT-style model which is used to generate text from the UniDiffuser
                embedding.
            text_tokenizer ([`GPT2Tokenizer`]):
                # GPT2 分词器,用于文本生成的解码,与文本解码器一起使用
                A [`~transformers.GPT2Tokenizer`] to decode text for text generation; used along with the `text_decoder`.
            unet ([`UniDiffuserModel`]):
                # U-ViT 模型,具有 UNNet 风格的跳跃连接,用于去噪编码的图像潜在表示
                A [U-ViT](https://github.com/baofff/U-ViT) model with UNNet-style skip connections between transformer
                layers to denoise the encoded image latents.
            scheduler ([`SchedulerMixin`]):
                # 调度器,与 UNet 一起使用以去噪编码的图像和/或文本潜在表示
                A scheduler to be used in combination with `unet` to denoise the encoded image and/or text latents. The
                original UniDiffuser paper uses the [`DPMSolverMultistepScheduler`] scheduler.
        """
    
        # TODO: 支持启用模型 CPU 离线加载的组件的子模块移动
        model_cpu_offload_seq = "text_encoder->image_encoder->unet->vae->text_decoder"
    
        # 初始化方法,接受多个模型组件作为参数
        def __init__(
            self,
            # 变分自编码器模型
            vae: AutoencoderKL,
            # 文本编码器模型
            text_encoder: CLIPTextModel,
            # 图像编码器模型
            image_encoder: CLIPVisionModelWithProjection,
            # CLIP 图像处理器
            clip_image_processor: CLIPImageProcessor,
            # CLIP 分词器
            clip_tokenizer: CLIPTokenizer,
            # 文本解码器模型
            text_decoder: UniDiffuserTextDecoder,
            # GPT2 分词器
            text_tokenizer: GPT2Tokenizer,
            # U-ViT 模型
            unet: UniDiffuserModel,
            # 调度器模型
            scheduler: KarrasDiffusionSchedulers,
    ):
        # 初始化父类
        super().__init__()

        # 检查文本编码器的隐藏层大小与文本解码器的前缀内维度是否相同
        if text_encoder.config.hidden_size != text_decoder.prefix_inner_dim:
            # 抛出值错误,提示二者不匹配
            raise ValueError(
                f"The text encoder hidden size and text decoder prefix inner dim must be the same, but"
                f" `text_encoder.config.hidden_size`: {text_encoder.config.hidden_size} and `text_decoder.prefix_inner_dim`: {text_decoder.prefix_inner_dim}"
            )

        # 注册模块,包括 VAE、文本编码器、图像编码器等
        self.register_modules(
            vae=vae,
            text_encoder=text_encoder,
            image_encoder=image_encoder,
            clip_image_processor=clip_image_processor,
            clip_tokenizer=clip_tokenizer,
            text_decoder=text_decoder,
            text_tokenizer=text_tokenizer,
            unet=unet,
            scheduler=scheduler,
        )

        # 计算 VAE 的缩放因子
        self.vae_scale_factor = 2 ** (len(self.vae.config.block_out_channels) - 1)
        # 创建 VAE 图像处理器
        self.image_processor = VaeImageProcessor(vae_scale_factor=self.vae_scale_factor)

        # 获取潜在空间的通道数
        self.num_channels_latents = vae.config.latent_channels
        # 获取文本编码器的最大序列长度
        self.text_encoder_seq_len = text_encoder.config.max_position_embeddings
        # 获取文本编码器的隐藏层大小
        self.text_encoder_hidden_size = text_encoder.config.hidden_size
        # 获取图像编码器的投影维度
        self.image_encoder_projection_dim = image_encoder.config.projection_dim
        # 获取 U-Net 的分辨率
        self.unet_resolution = unet.config.sample_size

        # 设置文本中间维度,默认为文本编码器的隐藏层大小
        self.text_intermediate_dim = self.text_encoder_hidden_size
        # 如果文本解码器的前缀隐藏维度不为 None,则使用该维度
        if self.text_decoder.prefix_hidden_dim is not None:
            self.text_intermediate_dim = self.text_decoder.prefix_hidden_dim

        # 初始化模式属性为 None
        self.mode = None

        # TODO: 处理安全检查?
        self.safety_checker = None

    # 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_extra_step_kwargs 复制
    def prepare_extra_step_kwargs(self, generator, eta):
        # 准备调度器步骤的额外参数,因为并非所有调度器都有相同的签名
        # eta(η)仅用于 DDIMScheduler,对于其他调度器将被忽略
        # eta 对应于 DDIM 论文中的 η: https://arxiv.org/abs/2010.02502
        # 应该在 [0, 1] 之间

        # 检查调度器是否接受 eta 参数
        accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())
        extra_step_kwargs = {}
        # 如果接受 eta,添加到额外参数中
        if accepts_eta:
            extra_step_kwargs["eta"] = eta

        # 检查调度器是否接受 generator 参数
        accepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys())
        # 如果接受 generator,添加到额外参数中
        if accepts_generator:
            extra_step_kwargs["generator"] = generator
        # 返回额外参数字典
        return extra_step_kwargs
    # 定义一个方法,用于根据输入推断生成任务的模式
    def _infer_mode(self, prompt, prompt_embeds, image, latents, prompt_latents, vae_latents, clip_latents):
        r"""
        从 `__call__` 的输入中推断生成任务('mode')。如果模式已手动设置,则使用设置的模式。
        """
        # 检查 prompt 或 prompt_embeds 是否可用
        prompt_available = (prompt is not None) or (prompt_embeds is not None)
        # 检查 image 是否可用
        image_available = image is not None
        # 判断输入是否可用(prompt 或 image 至少一个可用)
        input_available = prompt_available or image_available

        # 检查 prompt_latents 是否可用
        prompt_latents_available = prompt_latents is not None
        # 检查 vae_latents 是否可用
        vae_latents_available = vae_latents is not None
        # 检查 clip_latents 是否可用
        clip_latents_available = clip_latents is not None
        # 检查 latents 是否可用
        full_latents_available = latents is not None
        # 判断图像 latents 是否可用(同时有 vae_latents 和 clip_latents)
        image_latents_available = vae_latents_available and clip_latents_available
        # 判断所有单独的 latents 是否可用(有 prompt_latents 和图像 latents)
        all_indv_latents_available = prompt_latents_available and image_latents_available

        # 如果用户已设置模式,则优先使用该模式
        if self.mode is not None:
            mode = self.mode
        # 如果 prompt 可用,则设置模式为 "text2img"
        elif prompt_available:
            mode = "text2img"
        # 如果 image 可用,则设置模式为 "img2text"
        elif image_available:
            mode = "img2text"
        else:
            # 如果既没有提供 prompt 也没有提供 image,则根据 latents 的可用性推断模式
            if full_latents_available or all_indv_latents_available:
                mode = "joint"
            elif prompt_latents_available:
                mode = "text"
            elif image_latents_available:
                mode = "img"
            else:
                # 没有可用的输入或 latents
                mode = "joint"

        # 对模糊的情况给予警告
        if self.mode is None and prompt_available and image_available:
            logger.warning(
                f"You have supplied both a text prompt and image to the pipeline and mode has not been set manually,"
                f" defaulting to mode '{mode}'."
            )

        # 如果没有设置模式且没有输入可用
        if self.mode is None and not input_available:
            if vae_latents_available != clip_latents_available:
                # 只有一个 vae_latents 或 clip_latents 被提供
                logger.warning(
                    f"You have supplied exactly one of `vae_latents` and `clip_latents`, whereas either both or none"
                    f" are expected to be supplied. Defaulting to mode '{mode}'."
                )
            elif not prompt_latents_available and not vae_latents_available and not clip_latents_available:
                # 没有提供输入或 latents
                logger.warning(
                    f"No inputs or latents have been supplied, and mode has not been manually set,"
                    f" defaulting to mode '{mode}'."
                )

        # 返回推断得到的模式
        return mode

    # 从 diffusers.pipelines.pipeline_utils.StableDiffusionMixin.enable_vae_slicing 复制
    # 启用切片 VAE 解码
        def enable_vae_slicing(self):
            r""" 
            启用切片 VAE 解码。当该选项启用时,VAE 将输入张量拆分为多个切片,以
            分步计算解码。这有助于节省内存并允许更大的批量大小。
            """
            # 调用 VAE 的 enable_slicing 方法启用切片解码
            self.vae.enable_slicing()
    
        # 从 diffusers.pipelines.pipeline_utils.StableDiffusionMixin.disable_vae_slicing 复制
        def disable_vae_slicing(self):
            r""" 
            禁用切片 VAE 解码。如果之前启用了 `enable_vae_slicing`,该方法将恢复为
            一步计算解码。
            """
            # 调用 VAE 的 disable_slicing 方法禁用切片解码
            self.vae.disable_slicing()
    
        # 从 diffusers.pipelines.pipeline_utils.StableDiffusionMixin.enable_vae_tiling 复制
        def enable_vae_tiling(self):
            r""" 
            启用平铺 VAE 解码。当该选项启用时,VAE 将输入张量拆分为平铺以
            分步计算解码和编码。这有助于节省大量内存并允许处理更大图像。
            """
            # 调用 VAE 的 enable_tiling 方法启用平铺解码
            self.vae.enable_tiling()
    
        # 从 diffusers.pipelines.pipeline_utils.StableDiffusionMixin.disable_vae_tiling 复制
        def disable_vae_tiling(self):
            r""" 
            禁用平铺 VAE 解码。如果之前启用了 `enable_vae_tiling`,该方法将恢复为
            一步计算解码。
            """
            # 调用 VAE 的 disable_tiling 方法禁用平铺解码
            self.vae.disable_tiling()
    
        # 手动设置模式的函数
        def set_text_mode(self):
            r""" 
            手动将生成模式设置为无条件("边际")文本生成。
            """
            # 将模式属性设置为 "text"
            self.mode = "text"
    
        def set_image_mode(self):
            r""" 
            手动将生成模式设置为无条件("边际")图像生成。
            """
            # 将模式属性设置为 "img"
            self.mode = "img"
    
        def set_text_to_image_mode(self):
            r""" 
            手动将生成模式设置为基于文本的图像生成。
            """
            # 将模式属性设置为 "text2img"
            self.mode = "text2img"
    
        def set_image_to_text_mode(self):
            r""" 
            手动将生成模式设置为基于图像的文本生成。
            """
            # 将模式属性设置为 "img2text"
            self.mode = "img2text"
    
        def set_joint_mode(self):
            r""" 
            手动将生成模式设置为无条件联合图像-文本生成。
            """
            # 将模式属性设置为 "joint"
            self.mode = "joint"
    
        def reset_mode(self):
            r""" 
            移除手动设置的模式;调用此方法后,管道将从输入推断模式。
            """
            # 将模式属性重置为 None
            self.mode = None
    
        def _infer_batch_size(
            self,
            mode,
            prompt,
            prompt_embeds,
            image,
            num_images_per_prompt,
            num_prompts_per_image,
            latents,
            prompt_latents,
            vae_latents,
            clip_latents,
    # 定义文档字符串,说明该函数用于推断批处理大小和乘数
    ):
        r"""Infers the batch size and multiplier depending on mode and supplied arguments to `__call__`."""
        # 如果每个提示的图像数量未指定,则默认为1
        if num_images_per_prompt is None:
            num_images_per_prompt = 1
        # 如果每个图像的提示数量未指定,则默认为1
        if num_prompts_per_image is None:
            num_prompts_per_image = 1

        # 确保每个提示的图像数量为正整数
        assert num_images_per_prompt > 0, "num_images_per_prompt must be a positive integer"
        # 确保每个图像的提示数量为正整数
        assert num_prompts_per_image > 0, "num_prompts_per_image must be a positive integer"

        # 如果模式为“text2img”
        if mode in ["text2img"]:
            # 如果提供了提示且类型为字符串,则批处理大小为1
            if prompt is not None and isinstance(prompt, str):
                batch_size = 1
            # 如果提供了提示且类型为列表,则批处理大小为提示数量
            elif prompt is not None and isinstance(prompt, list):
                batch_size = len(prompt)
            else:
                # 对于“text2img”,必须提供提示或提示嵌入
                batch_size = prompt_embeds.shape[0]
            # 乘数设为每个提示的图像数量
            multiplier = num_images_per_prompt
        # 如果模式为“img2text”
        elif mode in ["img2text"]:
            # 如果图像为PIL图像,则批处理大小为1
            if isinstance(image, PIL.Image.Image):
                batch_size = 1
            else:
                # 图像必须是PIL图像或torch.Tensor类型,不支持image_embeds
                batch_size = image.shape[0]
            # 乘数设为每个图像的提示数量
            multiplier = num_prompts_per_image
        # 如果模式为“img”
        elif mode in ["img"]:
            # 如果VAE潜变量存在,则批处理大小为VAE潜变量的数量
            if vae_latents is not None:
                batch_size = vae_latents.shape[0]
            # 如果CLIP潜变量存在,则批处理大小为CLIP潜变量的数量
            elif clip_latents is not None:
                batch_size = clip_latents.shape[0]
            else:
                # 否则,默认为1
                batch_size = 1
            # 乘数设为每个提示的图像数量
            multiplier = num_images_per_prompt
        # 如果模式为“text”
        elif mode in ["text"]:
            # 如果提示潜变量存在,则批处理大小为提示潜变量的数量
            if prompt_latents is not None:
                batch_size = prompt_latents.shape[0]
            else:
                # 否则,默认为1
                batch_size = 1
            # 乘数设为每个图像的提示数量
            multiplier = num_prompts_per_image
        # 如果模式为“joint”
        elif mode in ["joint"]:
            # 如果潜变量存在,则批处理大小为潜变量的数量
            if latents is not None:
                batch_size = latents.shape[0]
            elif prompt_latents is not None:
                batch_size = prompt_latents.shape[0]
            elif vae_latents is not None:
                batch_size = vae_latents.shape[0]
            elif clip_latents is not None:
                batch_size = clip_latents.shape[0]
            else:
                # 否则,默认为1
                batch_size = 1

            # 如果每个提示的图像数量与每个图像的提示数量相等,则乘数等于该数量
            if num_images_per_prompt == num_prompts_per_image:
                multiplier = num_images_per_prompt
            else:
                # 否则,乘数为二者中的较小值,并发出警告
                multiplier = min(num_images_per_prompt, num_prompts_per_image)
                logger.warning(
                    f"You are using mode `{mode}` and `num_images_per_prompt`: {num_images_per_prompt} and"
                    f" num_prompts_per_image: {num_prompts_per_image} are not equal. Using batch size equal to"
                    f" `min(num_images_per_prompt, num_prompts_per_image) = {batch_size}."
                )
        # 返回计算得出的批处理大小和乘数
        return batch_size, multiplier

    # 从diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline._encode_prompt复制的代码
    # 定义编码提示的私有方法,接收多个参数用于处理提示信息
        def _encode_prompt(
            self,
            prompt,  # 提示文本
            device,  # 设备类型(如 CPU 或 GPU)
            num_images_per_prompt,  # 每个提示生成的图像数量
            do_classifier_free_guidance,  # 是否使用无分类器引导
            negative_prompt=None,  # 负面提示文本(可选)
            prompt_embeds: Optional[torch.Tensor] = None,  # 提示的嵌入表示(可选)
            negative_prompt_embeds: Optional[torch.Tensor] = None,  # 负面提示的嵌入表示(可选)
            lora_scale: Optional[float] = None,  # LoRA 的缩放因子(可选)
            **kwargs,  # 其他关键字参数
        ):
            # 警告信息,表明该方法已弃用,并建议使用 `encode_prompt()` 代替
            deprecation_message = "`_encode_prompt()` is deprecated and it will be removed in a future version. Use `encode_prompt()` instead. Also, be aware that the output format changed from a concatenated tensor to a tuple."
            # 调用弃用警告函数,显示该方法的弃用信息
            deprecate("_encode_prompt()", "1.0.0", deprecation_message, standard_warn=False)
    
            # 调用编码提示的方法,获取提示嵌入的元组
            prompt_embeds_tuple = self.encode_prompt(
                prompt=prompt,  # 提示文本
                device=device,  # 设备类型
                num_images_per_prompt=num_images_per_prompt,  # 图像数量
                do_classifier_free_guidance=do_classifier_free_guidance,  # 无分类器引导标志
                negative_prompt=negative_prompt,  # 负面提示
                prompt_embeds=prompt_embeds,  # 提示嵌入
                negative_prompt_embeds=negative_prompt_embeds,  # 负面提示嵌入
                lora_scale=lora_scale,  # LoRA 缩放因子
                **kwargs,  # 其他参数
            )
    
            # 将提示嵌入元组中的两个部分进行拼接,以支持向后兼容
            prompt_embeds = torch.cat([prompt_embeds_tuple[1], prompt_embeds_tuple[0]])
    
            # 返回拼接后的提示嵌入
            return prompt_embeds
    
        # 从 StableDiffusionPipeline 的 encode_prompt 方法复制,替换了 tokenizer 为 clip_tokenizer
        def encode_prompt(
            self,
            prompt,  # 提示文本
            device,  # 设备类型
            num_images_per_prompt,  # 每个提示生成的图像数量
            do_classifier_free_guidance,  # 是否使用无分类器引导
            negative_prompt=None,  # 负面提示文本(可选)
            prompt_embeds: Optional[torch.Tensor] = None,  # 提示的嵌入表示(可选)
            negative_prompt_embeds: Optional[torch.Tensor] = None,  # 负面提示的嵌入表示(可选)
            lora_scale: Optional[float] = None,  # LoRA 的缩放因子(可选)
            clip_skip: Optional[int] = None,  # 可选的跳过步骤(可选)
        # 从 StableDiffusionInstructPix2PixPipeline 的 prepare_image_latents 方法修改而来
        # 添加 num_prompts_per_image 参数,从自动编码器的瞬时分布中采样
        def encode_image_vae_latents(
            self,
            image,  # 输入图像
            batch_size,  # 批量大小
            num_prompts_per_image,  # 每个图像的提示数量
            dtype,  # 数据类型
            device,  # 设备类型
            do_classifier_free_guidance,  # 是否使用无分类器引导
            generator=None,  # 随机数生成器(可选)
    # 定义一个函数,以便对图像进行处理和编码
        ):
            # 检查输入的图像类型是否为指定的几种类型之一
            if not isinstance(image, (torch.Tensor, PIL.Image.Image, list)):
                # 抛出错误,提示图像类型不正确
                raise ValueError(
                    f"`image` has to be of type `torch.Tensor`, `PIL.Image.Image` or list but is {type(image)}"
                )
    
            # 将图像转换到指定的设备和数据类型
            image = image.to(device=device, dtype=dtype)
    
            # 计算有效批量大小
            batch_size = batch_size * num_prompts_per_image
            # 检查生成器列表的长度是否与批量大小匹配
            if isinstance(generator, list) and len(generator) != batch_size:
                # 抛出错误,提示生成器长度与请求的批量大小不符
                raise ValueError(
                    f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
                    f" size of {batch_size}. Make sure the batch size matches the length of the generators."
                )
    
            # 如果生成器是列表,则逐个编码图像并生成潜在向量
            if isinstance(generator, list):
                image_latents = [
                    self.vae.encode(image[i : i + 1]).latent_dist.sample(generator=generator[i])
                    * self.vae.config.scaling_factor
                    for i in range(batch_size)
                ]
                # 将潜在向量按维度0拼接成一个大的张量
                image_latents = torch.cat(image_latents, dim=0)
            else:
                # 否则直接编码图像并生成潜在向量
                image_latents = self.vae.encode(image).latent_dist.sample(generator=generator)
                # 按照 VAE 的缩放因子对潜在向量进行缩放
                image_latents = image_latents * self.vae.config.scaling_factor
    
            # 检查批量大小是否大于潜在向量的形状并且能够被整除
            if batch_size > image_latents.shape[0] and batch_size % image_latents.shape[0] == 0:
                # 如果条件满足,则构建弃用警告信息
                deprecation_message = (
                    f"You have passed {batch_size} text prompts (`prompt`), but only {image_latents.shape[0]} initial"
                    " images (`image`). Initial images are now duplicating to match the number of text prompts. Note"
                    " that this behavior is deprecated and will be removed in a version 1.0.0. Please make sure to update"
                    " your script to pass as many initial images as text prompts to suppress this warning."
                )
                # 记录弃用信息
                deprecate("len(prompt) != len(image)", "1.0.0", deprecation_message, standard_warn=False)
                # 计算每个提示所需的额外图像数量
                additional_image_per_prompt = batch_size // image_latents.shape[0]
                # 将潜在向量按额外图像数量进行重复拼接
                image_latents = torch.cat([image_latents] * additional_image_per_prompt, dim=0)
            # 如果批量大小大于潜在向量的形状且不能被整除,抛出错误
            elif batch_size > image_latents.shape[0] and batch_size % image_latents.shape[0] != 0:
                raise ValueError(
                    f"Cannot duplicate `image` of batch size {image_latents.shape[0]} to {batch_size} text prompts."
                )
            else:
                # 将潜在向量按维度0拼接
                image_latents = torch.cat([image_latents], dim=0)
    
            # 如果启用了无分类器自由引导,则构造无条件潜在向量
            if do_classifier_free_guidance:
                uncond_image_latents = torch.zeros_like(image_latents)
                # 拼接无条件潜在向量以形成最终的潜在向量
                image_latents = torch.cat([image_latents, image_latents, uncond_image_latents], dim=0)
    
            # 返回最终的图像潜在向量
            return image_latents
    
        # 定义编码图像 CLIP 潜在向量的函数
        def encode_image_clip_latents(
            self,
            image,
            batch_size,
            num_prompts_per_image,
            dtype,
            device,
            generator=None,
    ):
        # 将图像映射到 CLIP 嵌入。
        # 检查输入的 image 是否为有效类型:torch.Tensor、PIL.Image.Image 或 list
        if not isinstance(image, (torch.Tensor, PIL.Image.Image, list)):
            # 如果类型不匹配,抛出值错误并显示当前类型
            raise ValueError(
                f"`image` has to be of type `torch.Tensor`, `PIL.Image.Image` or list but is {type(image)}"
            )

        # 使用 clip_image_processor 预处理图像,并返回张量格式
        preprocessed_image = self.clip_image_processor.preprocess(
            image,
            return_tensors="pt",
        )
        # 将预处理后的图像移动到指定设备并设置数据类型
        preprocessed_image = preprocessed_image.to(device=device, dtype=dtype)

        # 根据提示数和每个图像的提示数计算批处理大小
        batch_size = batch_size * num_prompts_per_image
        # 如果生成器是列表,逐个处理每个预处理图像
        if isinstance(generator, list):
            image_latents = [
                # 使用 image_encoder 对每个预处理图像进行编码,获取图像嵌入
                self.image_encoder(**preprocessed_image[i : i + 1]).image_embeds for i in range(batch_size)
            ]
            # 将所有图像嵌入在第0维上拼接成一个张量
            image_latents = torch.cat(image_latents, dim=0)
        else:
            # 如果生成器不是列表,直接对预处理图像进行编码
            image_latents = self.image_encoder(**preprocessed_image).image_embeds

        # 如果批处理大小大于图像嵌入数量并且可以整除
        if batch_size > image_latents.shape[0] and batch_size % image_latents.shape[0] == 0:
            # 扩展 image_latents 以匹配批处理大小
            deprecation_message = (
                f"You have passed {batch_size} text prompts (`prompt`), but only {image_latents.shape[0]} initial"
                " images (`image`). Initial images are now duplicating to match the number of text prompts. Note"
                " that this behavior is deprecated and will be removed in a version 1.0.0. Please make sure to update"
                " your script to pass as many initial images as text prompts to suppress this warning."
            )
            # 发出弃用警告,提示用户更新代码
            deprecate("len(prompt) != len(image)", "1.0.0", deprecation_message, standard_warn=False)
            # 计算每个提示需要的额外图像数量
            additional_image_per_prompt = batch_size // image_latents.shape[0]
            # 将图像嵌入重复以匹配批处理大小
            image_latents = torch.cat([image_latents] * additional_image_per_prompt, dim=0)
        # 如果批处理大小大于图像嵌入数量但不能整除,抛出值错误
        elif batch_size > image_latents.shape[0] and batch_size % image_latents.shape[0] != 0:
            raise ValueError(
                f"Cannot duplicate `image` of batch size {image_latents.shape[0]} to {batch_size} text prompts."
            )
        else:
            # 将 image_latents 包装成一个张量
            image_latents = torch.cat([image_latents], dim=0)

        # 如果生成器是列表且其长度与批处理大小不匹配,抛出值错误
        if isinstance(generator, list) and len(generator) != batch_size:
            raise ValueError(
                f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
                f" size of {batch_size}. Make sure the batch size matches the length of the generators."
            )

        # 返回最终的图像嵌入张量
        return image_latents

    def prepare_text_latents(
        # 准备文本嵌入的函数定义,参数包括批处理大小、每个提示的图像数、序列长度、隐藏层大小、数据类型、设备、生成器和潜在变量
        self, batch_size, num_images_per_prompt, seq_len, hidden_size, dtype, device, generator, latents=None
    ):
        # 准备用于 CLIP 嵌入提示的潜在变量
        shape = (batch_size * num_images_per_prompt, seq_len, hidden_size)  # 定义潜在变量的形状
        # 检查生成器是否为列表且长度是否与批大小匹配
        if isinstance(generator, list) and len(generator) != batch_size:
            raise ValueError(
                f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
                f" size of {batch_size}. Make sure the batch size matches the length of the generators."
            )  # 抛出错误,提示生成器长度与批大小不匹配

        # 如果潜在变量为 None,则生成随机潜在变量
        if latents is None:
            latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)  # 生成随机潜在变量
        else:
            # 假设潜在变量具有形状 (B, L, D)
            latents = latents.repeat(num_images_per_prompt, 1, 1)  # 根据每个提示的图像数量重复潜在变量
            latents = latents.to(device=device, dtype=dtype)  # 将潜在变量转移到指定设备和数据类型

        # 根据调度器要求的标准差缩放初始噪声
        latents = latents * self.scheduler.init_noise_sigma  # 缩放潜在变量
        return latents  # 返回处理后的潜在变量

    # 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_latents 修改而来
    # 将 prepare_latents 重命名为 prepare_image_vae_latents,并添加 num_prompts_per_image 参数。
    def prepare_image_vae_latents(
        self,
        batch_size,
        num_prompts_per_image,
        num_channels_latents,
        height,
        width,
        dtype,
        device,
        generator,
        latents=None,
    ):
        # 定义潜在变量的形状
        shape = (
            batch_size * num_prompts_per_image,
            num_channels_latents,
            height // self.vae_scale_factor,
            width // self.vae_scale_factor,
        )
        # 检查生成器是否为列表且长度是否与批大小匹配
        if isinstance(generator, list) and len(generator) != batch_size:
            raise ValueError(
                f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
                f" size of {batch_size}. Make sure the batch size matches the length of the generators."
            )  # 抛出错误,提示生成器长度与批大小不匹配

        # 如果潜在变量为 None,则生成随机潜在变量
        if latents is None:
            latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)  # 生成随机潜在变量
        else:
            # 假设潜在变量具有形状 (B, C, H, W)
            latents = latents.repeat(num_prompts_per_image, 1, 1, 1)  # 根据每个图像的提示数量重复潜在变量
            latents = latents.to(device=device, dtype=dtype)  # 将潜在变量转移到指定设备和数据类型

        # 根据调度器要求的标准差缩放初始噪声
        latents = latents * self.scheduler.init_noise_sigma  # 缩放潜在变量
        return latents  # 返回处理后的潜在变量

    def prepare_image_clip_latents(
        self, batch_size, num_prompts_per_image, clip_img_dim, dtype, device, generator, latents=None
    ):
        # 准备 CLIP 嵌入图像的潜在表示
        shape = (batch_size * num_prompts_per_image, 1, clip_img_dim)  # 定义潜在张量的形状
        # 检查生成器列表长度是否与批量大小匹配
        if isinstance(generator, list) and len(generator) != batch_size:
            raise ValueError(
                f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
                f" size of {batch_size}. Make sure the batch size matches the length of the generators."
            )  # 抛出值错误,说明生成器列表与批量大小不匹配

        # 如果潜在张量为 None,则生成随机潜在张量
        if latents is None:
            latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)  # 生成随机张量
        else:
            # 假设潜在张量的形状为 (B, L, D)
            latents = latents.repeat(num_prompts_per_image, 1, 1)  # 按提示数量重复潜在张量
            latents = latents.to(device=device, dtype=dtype)  # 将潜在张量转移到指定设备和数据类型

        # 按调度器要求的标准差缩放初始噪声
        latents = latents * self.scheduler.init_noise_sigma  # 缩放潜在张量
        return latents  # 返回处理后的潜在张量

    def decode_text_latents(self, text_latents, device):
        # 生成输出标记列表和序列长度
        output_token_list, seq_lengths = self.text_decoder.generate_captions(
            text_latents, self.text_tokenizer.eos_token_id, device=device
        )  # 调用文本解码器生成文本输出
        output_list = output_token_list.cpu().numpy()  # 将输出转移到 CPU 并转换为 NumPy 数组
        # 解码输出标记,生成文本
        generated_text = [
            self.text_tokenizer.decode(output[: int(length)], skip_special_tokens=True)
            for output, length in zip(output_list, seq_lengths)
        ]  # 逐个解码每个输出
        return generated_text  # 返回生成的文本列表

    def _split(self, x, height, width):
        r"""
        将形状为 (B, C * H * W + clip_img_dim) 的扁平化嵌入 x 拆分为两个张量,形状为 (B, C, H, W)
        和 (B, 1, clip_img_dim)
        """
        batch_size = x.shape[0]  # 获取批量大小
        latent_height = height // self.vae_scale_factor  # 计算潜在高度
        latent_width = width // self.vae_scale_factor  # 计算潜在宽度
        img_vae_dim = self.num_channels_latents * latent_height * latent_width  # 计算 VAE 图像维度

        # 根据指定维度拆分张量
        img_vae, img_clip = x.split([img_vae_dim, self.image_encoder_projection_dim], dim=1)  # 拆分为 VAE 和 CLIP 图像

        img_vae = torch.reshape(img_vae, (batch_size, self.num_channels_latents, latent_height, latent_width))  # 重塑 VAE 图像
        img_clip = torch.reshape(img_clip, (batch_size, 1, self.image_encoder_projection_dim))  # 重塑 CLIP 图像
        return img_vae, img_clip  # 返回拆分后的两个张量

    def _combine(self, img_vae, img_clip):
        r"""
        将形状为 (B, C, H, W) 的潜在图像 img_vae 和形状为 (B, 1,
        clip_img_dim) 的 CLIP 嵌入图像 img_clip 组合成一个形状为 (B, C * H * W + clip_img_dim) 的张量。
        """
        img_vae = torch.reshape(img_vae, (img_vae.shape[0], -1))  # 将 VAE 图像重塑为一维张量
        img_clip = torch.reshape(img_clip, (img_clip.shape[0], -1))  # 将 CLIP 图像重塑为一维张量
        return torch.concat([img_vae, img_clip], dim=-1)  # 按最后一个维度连接两个张量
    # 将扁平化的嵌入 x 拆分为 (img_vae, img_clip, text)
    def _split_joint(self, x, height, width):
        r"""
        拆分形状为 (B, C * H * W + clip_img_dim + text_seq_len * text_dim) 的扁平化嵌入 x 为 (img_vae,
        img_clip, text),其中 img_vae 形状为 (B, C, H, W),img_clip 形状为 (B, 1, clip_img_dim),text 形状为
        (B, text_seq_len, text_dim)。
        """
        # 获取输入 x 的批量大小
        batch_size = x.shape[0]
        # 计算潜在空间的高度
        latent_height = height // self.vae_scale_factor
        # 计算潜在空间的宽度
        latent_width = width // self.vae_scale_factor
        # 计算 img_vae 的维度
        img_vae_dim = self.num_channels_latents * latent_height * latent_width
        # 计算 text 的维度
        text_dim = self.text_encoder_seq_len * self.text_intermediate_dim

        # 根据指定的维度拆分 x
        img_vae, img_clip, text = x.split([img_vae_dim, self.image_encoder_projection_dim, text_dim], dim=1)

        # 将 img_vae 重新塑形为 (B, C, H, W)
        img_vae = torch.reshape(img_vae, (batch_size, self.num_channels_latents, latent_height, latent_width))
        # 将 img_clip 重新塑形为 (B, 1, clip_img_dim)
        img_clip = torch.reshape(img_clip, (batch_size, 1, self.image_encoder_projection_dim))
        # 将 text 重新塑形为 (B, text_seq_len, text_intermediate_dim)
        text = torch.reshape(text, (batch_size, self.text_encoder_seq_len, self.text_intermediate_dim))
        # 返回拆分后的 img_vae、img_clip 和 text
        return img_vae, img_clip, text

    # 将 img_vae、img_clip 和 text 组合成一个单一的嵌入 x
    def _combine_joint(self, img_vae, img_clip, text):
        r"""
        将形状为 (B, C, H, W) 的潜在图像 img_vae,形状为 (B, L_img,
        clip_img_dim) 的 CLIP 嵌入图像 img_clip,以及形状为 (B, L_text, text_dim) 的文本嵌入 text
        组合成形状为 (B, C * H * W + L_img * clip_img_dim + L_text * text_dim) 的单一嵌入 x。
        """
        # 将 img_vae 重塑为 (B, C * H * W)
        img_vae = torch.reshape(img_vae, (img_vae.shape[0], -1))
        # 将 img_clip 重塑为 (B, L_img * clip_img_dim)
        img_clip = torch.reshape(img_clip, (img_clip.shape[0], -1))
        # 将 text 重塑为 (B, L_text * text_dim)
        text = torch.reshape(text, (text.shape[0], -1))
        # 将 img_vae、img_clip 和 text 沿最后一个维度连接
        return torch.concat([img_vae, img_clip, text], dim=-1)

    # 获取噪声预测的核心函数
    def _get_noise_pred(
        self,
        mode,
        latents,
        t,
        prompt_embeds,
        img_vae,
        img_clip,
        max_timestep,
        data_type,
        guidance_scale,
        generator,
        device,
        height,
        width,
    # 检查潜在变量的形状是否符合预期
    def check_latents_shape(self, latents_name, latents, expected_shape):
        # 获取潜在变量的形状
        latents_shape = latents.shape
        # 计算预期维度,包括批量维度
        expected_num_dims = len(expected_shape) + 1  # expected dimensions plus the batch dimension
        # 生成预期形状的字符串
        expected_shape_str = ", ".join(str(dim) for dim in expected_shape)
        # 检查潜在变量维度数量是否符合预期
        if len(latents_shape) != expected_num_dims:
            raise ValueError(
                f"`{latents_name}` 应具有形状 (batch_size, {expected_shape_str}),但当前形状"
                f" {latents_shape} 有 {len(latents_shape)} 维度。"
            )
        # 遍历每个维度进行逐一检查
        for i in range(1, expected_num_dims):
            # 检查每个维度是否与预期匹配
            if latents_shape[i] != expected_shape[i - 1]:
                raise ValueError(
                    f"`{latents_name}` 应具有形状 (batch_size, {expected_shape_str}),但当前形状"
                    f" {latents_shape} 在维度 {i} 有 {latents_shape[i]} != {expected_shape[i - 1]}。"
                )
    # 定义输入检查方法
        def check_inputs(
            self,  # 当前实例对象
            mode,  # 模式参数,指示当前操作的类型
            prompt,  # 提示文本,用于生成内容
            image,  # 输入图像,可能用于处理或生成
            height,  # 输出图像的高度
            width,  # 输出图像的宽度
            callback_steps,  # 回调步骤的频率
            negative_prompt=None,  # 可选的负面提示,用于限制生成内容
            prompt_embeds=None,  # 可选的提示嵌入,用于直接输入嵌入向量
            negative_prompt_embeds=None,  # 可选的负面提示嵌入
            latents=None,  # 可选的潜在变量,用于生成过程
            prompt_latents=None,  # 可选的提示潜在变量
            vae_latents=None,  # 可选的变分自编码器潜在变量
            clip_latents=None,  # 可选的 CLIP 潜在变量
        @torch.no_grad()  # 禁用梯度计算,以节省内存和加快推理速度
        def __call__(  # 定义调用方法,使对象可调用
            self,  # 当前实例对象
            prompt: Optional[Union[str, List[str]]] = None,  # 可选的提示文本,可以是字符串或字符串列表
            image: Optional[Union[torch.Tensor, PIL.Image.Image]] = None,  # 可选的输入图像,可以是张量或图像对象
            height: Optional[int] = None,  # 可选的输出图像高度
            width: Optional[int] = None,  # 可选的输出图像宽度
            data_type: Optional[int] = 1,  # 数据类型,默认值为 1
            num_inference_steps: int = 50,  # 推理步骤的数量,默认 50
            guidance_scale: float = 8.0,  # 引导尺度,控制生成内容的自由度
            negative_prompt: Optional[Union[str, List[str]]] = None,  # 可选的负面提示
            num_images_per_prompt: Optional[int] = 1,  # 每个提示生成的图像数量,默认 1
            num_prompts_per_image: Optional[int] = 1,  # 每个图像的提示数量,默认 1
            eta: float = 0.0,  # 噪声参数,控制生成过程的随机性
            generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,  # 可选的随机数生成器
            latents: Optional[torch.Tensor] = None,  # 可选的潜在变量
            prompt_latents: Optional[torch.Tensor] = None,  # 可选的提示潜在变量
            vae_latents: Optional[torch.Tensor] = None,  # 可选的变分自编码器潜在变量
            clip_latents: Optional[torch.Tensor] = None,  # 可选的 CLIP 潜在变量
            prompt_embeds: Optional[torch.Tensor] = None,  # 可选的提示嵌入
            negative_prompt_embeds: Optional[torch.Tensor] = None,  # 可选的负面提示嵌入
            output_type: Optional[str] = "pil",  # 输出类型,默认为 PIL 图像
            return_dict: bool = True,  # 是否返回字典格式的结果,默认是
            callback: Optional[Callable[[int, int, torch.Tensor], None]] = None,  # 可选的回调函数
            callback_steps: int = 1,  # 回调的步骤数,默认是 1

.\diffusers\pipelines\unidiffuser\__init__.py

# 从 typing 模块导入 TYPE_CHECKING,用于类型检查
from typing import TYPE_CHECKING

# 从上级模块导入所需的工具和常量
from ...utils import (
    DIFFUSERS_SLOW_IMPORT,  # 慢导入标志
    OptionalDependencyNotAvailable,  # 可选依赖不可用异常
    _LazyModule,  # 延迟加载模块类
    is_torch_available,  # 检查是否可用 PyTorch
    is_transformers_available,  # 检查是否可用 Transformers
)

# 初始化空字典用于存储虚拟对象
_dummy_objects = {}
# 初始化空字典用于存储导入结构
_import_structure = {}

# 尝试块,用于处理可选依赖
try:
    # 如果 Transformers 和 PyTorch 不可用,则抛出异常
    if not (is_transformers_available() and is_torch_available()):
        raise OptionalDependencyNotAvailable()
# 异常处理块
except OptionalDependencyNotAvailable:
    # 从虚拟对象模块导入必要的类
    from ...utils.dummy_torch_and_transformers_objects import (
        ImageTextPipelineOutput,  # 图像文本管道输出
        UniDiffuserPipeline,  # UniDiffuser 管道
    )

    # 更新虚拟对象字典
    _dummy_objects.update(
        {"ImageTextPipelineOutput": ImageTextPipelineOutput, "UniDiffuserPipeline": UniDiffuserPipeline}
    )
# 否则块
else:
    # 更新导入结构以包含文本解码模型
    _import_structure["modeling_text_decoder"] = ["UniDiffuserTextDecoder"]
    # 更新导入结构以包含 UVIT 模型
    _import_structure["modeling_uvit"] = ["UniDiffuserModel", "UTransformer2DModel"]
    # 更新导入结构以包含图像文本管道
    _import_structure["pipeline_unidiffuser"] = ["ImageTextPipelineOutput", "UniDiffuserPipeline"]

# 检查类型是否在检查中或是否启用慢导入
if TYPE_CHECKING or DIFFUSERS_SLOW_IMPORT:
    # 尝试块,用于处理可选依赖
    try:
        # 如果 Transformers 和 PyTorch 不可用,则抛出异常
        if not (is_transformers_available() and is_torch_available()):
            raise OptionalDependencyNotAvailable()
    # 异常处理块
    except OptionalDependencyNotAvailable:
        # 从虚拟对象模块导入必要的类
        from ...utils.dummy_torch_and_transformers_objects import (
            ImageTextPipelineOutput,  # 图像文本管道输出
            UniDiffuserPipeline,  # UniDiffuser 管道
        )
    # 否则块
    else:
        # 从文本解码模型模块导入
        from .modeling_text_decoder import UniDiffuserTextDecoder
        # 从 UVIT 模型模块导入
        from .modeling_uvit import UniDiffuserModel, UTransformer2DModel
        # 从管道模块导入图像文本管道输出和 UniDiffuser 管道
        from .pipeline_unidiffuser import ImageTextPipelineOutput, UniDiffuserPipeline

# 否则块
else:
    # 导入系统模块以进行模块操作
    import sys

    # 用于延迟加载模块的类创建模块实例
    sys.modules[__name__] = _LazyModule(
        __name__,
        globals()["__file__"],
        _import_structure,
        module_spec=__spec__,
    )

    # 将虚拟对象字典中的对象设置到当前模块
    for name, value in _dummy_objects.items():
        setattr(sys.modules[__name__], name, value)

.\diffusers\pipelines\wuerstchen\modeling_paella_vq_model.py

# Copyright (c) 2022 Dominic Rampas MIT License
# Copyright 2024 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# 从 typing 模块导入 Union 类型,用于类型注解
from typing import Union

# 导入 PyTorch 库及其神经网络模块
import torch
import torch.nn as nn

# 从配置相关的工具模块导入 ConfigMixin 和 register_to_config
from ...configuration_utils import ConfigMixin, register_to_config
# 从 VAE 模型中导入 DecoderOutput 和 VectorQuantizer
from ...models.autoencoders.vae import DecoderOutput, VectorQuantizer
# 从模型工具中导入 ModelMixin
from ...models.modeling_utils import ModelMixin
# 从 VQ 模型中导入 VQEncoderOutput
from ...models.vq_model import VQEncoderOutput
# 从加速工具中导入 apply_forward_hook
from ...utils.accelerate_utils import apply_forward_hook


class MixingResidualBlock(nn.Module):
    """
    Residual block with mixing used by Paella's VQ-VAE.
    """  
    # 定义 MixingResidualBlock 类,继承自 nn.Module

    def __init__(self, inp_channels, embed_dim):
        # 构造函数,初始化输入通道数和嵌入维度
        super().__init__()
        # depthwise
        # 对输入通道进行层归一化,设置为不使用可学习的仿射变换,防止除以零的情况
        self.norm1 = nn.LayerNorm(inp_channels, elementwise_affine=False, eps=1e-6)
        # 使用深度可分离卷积,增加卷积的有效性和计算效率
        self.depthwise = nn.Sequential(
            # 对输入进行填充以保持卷积后的尺寸
            nn.ReplicationPad2d(1), 
            # 创建深度可分离卷积层
            nn.Conv2d(inp_channels, inp_channels, kernel_size=3, groups=inp_channels)
        )

        # channelwise
        # 对输入通道进行第二次层归一化
        self.norm2 = nn.LayerNorm(inp_channels, elementwise_affine=False, eps=1e-6)
        # 定义一个全连接层的序列,用于通道混合
        self.channelwise = nn.Sequential(
            # 第一个线性层将输入通道数映射到嵌入维度
            nn.Linear(inp_channels, embed_dim), 
            # 使用 GELU 激活函数
            nn.GELU(), 
            # 第二个线性层将嵌入维度映射回输入通道数
            nn.Linear(embed_dim, inp_channels)
        )

        # 定义可学习的参数 gammas,初始化为零,允许模型在训练中更新这些值
        self.gammas = nn.Parameter(torch.zeros(6), requires_grad=True)

    def forward(self, x):
        # 定义前向传播函数,接收输入 x
        mods = self.gammas  # 获取可学习的 gammas 参数
        # 对输入进行第一层归一化和变换,并应用 gammas[0] 和 mods[1]
        x_temp = self.norm1(x.permute(0, 2, 3, 1)).permute(0, 3, 1, 2) * (1 + mods[0]) + mods[1]
        # 将经过深度卷积处理的 x_temp 加入原始输入 x,乘以 gammas[2]
        x = x + self.depthwise(x_temp) * mods[2]
        # 对当前的 x 进行第二层归一化和变换,并应用 gammas[3] 和 mods[4]
        x_temp = self.norm2(x.permute(0, 2, 3, 1)).permute(0, 3, 1, 2) * (1 + mods[3]) + mods[4]
        # 将经过通道混合处理的 x_temp 加入当前的 x,乘以 gammas[5]
        x = x + self.channelwise(x_temp.permute(0, 2, 3, 1)).permute(0, 3, 1, 2) * mods[5]
        # 返回处理后的 x
        return x


class PaellaVQModel(ModelMixin, ConfigMixin):
    r"""VQ-VAE model from Paella model.

    This model inherits from [`ModelMixin`]. Check the superclass documentation for the generic methods the library
    implements for all the model (such as downloading or saving, etc.)
    # 参数说明部分,描述构造函数的各个参数
    Parameters:
        # 输入图像的通道数,默认为3(RGB图像)
        in_channels (int, *optional*, defaults to 3): Number of channels in the input image.
        # 输出图像的通道数,默认为3(RGB图像)
        out_channels (int,  *optional*, defaults to 3): Number of channels in the output.
        # 输入图像的上下缩放因子,默认为2
        up_down_scale_factor (int, *optional*, defaults to 2): Up and Downscale factor of the input image.
        # 模型中的层数,默认为2
        levels  (int, *optional*, defaults to 2): Number of levels in the model.
        # 模型中的瓶颈块数,默认为12
        bottleneck_blocks (int, *optional*, defaults to 12): Number of bottleneck blocks in the model.
        # 模型中隐藏通道的数量,默认为384
        embed_dim (int, *optional*, defaults to 384): Number of hidden channels in the model.
        # VQ-VAE模型中的潜在通道数量,默认为4
        latent_channels (int, *optional*, defaults to 4): Number of latent channels in the VQ-VAE model.
        # VQ-VAE中的代码簿向量数量,默认为8192
        num_vq_embeddings (int, *optional*, defaults to 8192): Number of codebook vectors in the VQ-VAE.
        # 潜在空间的缩放因子,默认为0.3764
        scale_factor (float, *optional*, defaults to 0.3764): Scaling factor of the latent space.
    """

    # 初始化方法的装饰器,用于注册配置
    @register_to_config
    # 构造函数,定义模型初始化参数及其默认值
    def __init__(
        # 输入图像的通道数,默认为3
        self,
        in_channels: int = 3,
        # 输出图像的通道数,默认为3
        out_channels: int = 3,
        # 上下缩放因子,默认为2
        up_down_scale_factor: int = 2,
        # 模型的层数,默认为2
        levels: int = 2,
        # 瓶颈块的数量,默认为12
        bottleneck_blocks: int = 12,
        # 隐藏通道的数量,默认为384
        embed_dim: int = 384,
        # 潜在通道的数量,默认为4
        latent_channels: int = 4,
        # VQ-VAE中的代码簿向量数量,默认为8192
        num_vq_embeddings: int = 8192,
        # 潜在空间的缩放因子,默认为0.3764
        scale_factor: float = 0.3764,
    ):
        # 调用父类的初始化方法
        super().__init__()

        # 计算每个层级的通道数,使用倒序以便后续操作
        c_levels = [embed_dim // (2**i) for i in reversed(range(levels))]
        # 创建编码器块
        self.in_block = nn.Sequential(
            # 像素不规则拆分,改变输入的空间分辨率
            nn.PixelUnshuffle(up_down_scale_factor),
            # 1x1卷积,将输入通道数变换为第一个层级的通道数
            nn.Conv2d(in_channels * up_down_scale_factor**2, c_levels[0], kernel_size=1),
        )
        down_blocks = []  # 初始化下采样块列表
        for i in range(levels):  # 遍历每一层级
            if i > 0:  # 如果不是第一层级
                # 添加卷积层,用于下采样,改变通道数
                down_blocks.append(nn.Conv2d(c_levels[i - 1], c_levels[i], kernel_size=4, stride=2, padding=1))
            # 创建混合残差块,增加网络深度
            block = MixingResidualBlock(c_levels[i], c_levels[i] * 4)
            down_blocks.append(block)  # 添加残差块到下采样列表
        down_blocks.append(
            nn.Sequential(
                # 1x1卷积,将最后一层的通道数转变为潜在通道数
                nn.Conv2d(c_levels[-1], latent_channels, kernel_size=1, bias=False),
                # 批归一化,确保数据均值为0,方差为1
                nn.BatchNorm2d(latent_channels),  # then normalize them to have mean 0 and std 1
            )
        )
        # 将下采样块列表封装成序列
        self.down_blocks = nn.Sequential(*down_blocks)

        # 向量量化器,使用指定数量的嵌入向量
        self.vquantizer = VectorQuantizer(num_vq_embeddings, vq_embed_dim=latent_channels, legacy=False, beta=0.25)

        # 创建解码器块
        up_blocks = [nn.Sequential(nn.Conv2d(latent_channels, c_levels[-1], kernel_size=1))]  # 第一层解码
        for i in range(levels):  # 遍历每一层级
            for j in range(bottleneck_blocks if i == 0 else 1):  # 添加瓶颈块
                block = MixingResidualBlock(c_levels[levels - 1 - i], c_levels[levels - 1 - i] * 4)
                up_blocks.append(block)  # 添加混合残差块到上采样列表
            if i < levels - 1:  # 如果不是最后一层级
                up_blocks.append(
                    nn.ConvTranspose2d(
                        # 转置卷积层,用于上采样,改变通道数
                        c_levels[levels - 1 - i], c_levels[levels - 2 - i], kernel_size=4, stride=2, padding=1
                    )
                )
        # 将上采样块列表封装成序列
        self.up_blocks = nn.Sequential(*up_blocks)
        self.out_block = nn.Sequential(
            # 1x1卷积,将第一层的通道数变为输出通道数
            nn.Conv2d(c_levels[0], out_channels * up_down_scale_factor**2, kernel_size=1),
            # 像素重排,恢复到原始的空间分辨率
            nn.PixelShuffle(up_down_scale_factor),
        )

    # 应用前向钩子,定义编码过程
    @apply_forward_hook
    def encode(self, x: torch.Tensor, return_dict: bool = True) -> VQEncoderOutput:
        # 通过输入块处理输入数据
        h = self.in_block(x)
        # 通过下采样块处理数据
        h = self.down_blocks(h)

        # 如果不需要返回字典形式
        if not return_dict:
            return (h,)

        # 返回 VQ 编码输出,包含潜在表示
        return VQEncoderOutput(latents=h)

    # 应用前向钩子,定义解码过程
    @apply_forward_hook
    def decode(
        self, h: torch.Tensor, force_not_quantize: bool = True, return_dict: bool = True
    ) -> Union[DecoderOutput, torch.Tensor]:
        # 如果不强制不量化,使用向量量化器
        if not force_not_quantize:
            quant, _, _ = self.vquantizer(h)
        else:
            # 否则直接使用输入作为量化结果
            quant = h

        # 通过上采样块处理量化结果
        x = self.up_blocks(quant)
        # 通过输出块生成最终解码结果
        dec = self.out_block(x)
        # 如果不需要返回字典形式
        if not return_dict:
            return (dec,)

        # 返回解码输出,包含样本数据
        return DecoderOutput(sample=dec)
    # 定义一个前向传播的方法,接受输入样本并选择返回格式
    def forward(self, sample: torch.Tensor, return_dict: bool = True) -> Union[DecoderOutput, torch.Tensor]:
        # 文档字符串,描述参数及其用途
        r"""
        Args:
            sample (`torch.Tensor`): Input sample.
            return_dict (`bool`, *optional*, defaults to `True`):
                Whether or not to return a [`DecoderOutput`] instead of a plain tuple.
        """
        # 将输入样本赋值给变量 x
        x = sample
        # 对输入样本进行编码,提取潜在变量
        h = self.encode(x).latents
        # 对潜在变量进行解码,获取样本
        dec = self.decode(h).sample
    
        # 如果不返回字典格式
        if not return_dict:
            # 返回解码样本作为元组
            return (dec,)
    
        # 返回包含解码样本的 DecoderOutput 对象
        return DecoderOutput(sample=dec)