diffusers-源码解析-二十-

龙哥盟 / 2024-11-09 / 原文

diffusers 源码解析(二十)

.\diffusers\pipelines\blip_diffusion\pipeline_blip_diffusion.py

# 版权所有 2024 Salesforce.com, inc.  # 指明版权归属
# 版权所有 2024 The HuggingFace Team. All rights reserved. # 指明另一个版权归属
# 根据 Apache License 2.0 许可协议进行授权; # 说明代码的许可协议
# 除非符合许可协议,否则不可使用此文件。 # 指出使用条件
# 可以在以下地址获取许可协议的副本: # 提供许可协议的获取方式
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律要求或书面同意,否则软件按"原样"提供。 # 指出软件不提供任何保证
# 请参见许可协议了解特定的权限和限制。 # 指出许可协议的内容
from typing import List, Optional, Union  # 从 typing 模块导入类型提示工具

import PIL.Image  # 导入 PIL 库的图像处理功能
import torch  # 导入 PyTorch 库
from transformers import CLIPTokenizer  # 从 transformers 导入 CLIPTokenizer

from ...models import AutoencoderKL, UNet2DConditionModel  # 从相对路径导入模型
from ...schedulers import PNDMScheduler  # 从相对路径导入调度器
from ...utils import (  # 从相对路径导入工具函数
    logging,  # 导入日志记录工具
    replace_example_docstring,  # 导入替换示例文档字符串的工具
)
from ...utils.torch_utils import randn_tensor  # 从工具模块导入生成随机张量的函数
from ..pipeline_utils import DiffusionPipeline, ImagePipelineOutput  # 从相对路径导入管道工具
from .blip_image_processing import BlipImageProcessor  # 从当前目录导入图像处理工具
from .modeling_blip2 import Blip2QFormerModel  # 从当前目录导入 Blip2 模型
from .modeling_ctx_clip import ContextCLIPTextModel  # 从当前目录导入 ContextCLIP 文本模型

logger = logging.get_logger(__name__)  # 获取当前模块的日志记录器实例,pylint 禁用无效名称警告

EXAMPLE_DOC_STRING = """  # 示例文档字符串,展示如何使用 BlipDiffusionPipeline
    Examples:  # 示例部分的开始
        ```py  # 开始代码块
        >>> from diffusers.pipelines import BlipDiffusionPipeline  # 导入 BlipDiffusionPipeline
        >>> from diffusers.utils import load_image  # 导入加载图像的工具
        >>> import torch  # 导入 PyTorch 库

        >>> blip_diffusion_pipe = BlipDiffusionPipeline.from_pretrained(  # 创建 BlipDiffusionPipeline 的实例
        ...     "Salesforce/blipdiffusion", torch_dtype=torch.float16  # 从预训练模型加载,设置数据类型为 float16
        ... ).to("cuda")  # 将模型转移到 GPU

        >>> cond_subject = "dog"  # 定义条件主题为“狗”
        >>> tgt_subject = "dog"  # 定义目标主题为“狗”
        >>> text_prompt_input = "swimming underwater"  # 定义文本提示输入

        >>> cond_image = load_image(  # 加载条件图像
        ...     "https://huggingface.co/datasets/ayushtues/blipdiffusion_images/resolve/main/dog.jpg"  # 图像的 URL
        ... )  # 结束加载图像的函数调用
        >>> guidance_scale = 7.5  # 设置引导尺度
        >>> num_inference_steps = 25  # 设置推理步骤数
        >>> negative_prompt = "over-exposure, under-exposure, saturated, duplicate, out of frame, lowres, cropped, worst quality, low quality, jpeg artifacts, morbid, mutilated, out of frame, ugly, bad anatomy, bad proportions, deformed, blurry, duplicate"  # 定义负面提示

        >>> output = blip_diffusion_pipe(  # 调用管道生成输出
        ...     text_prompt_input,  # 传入文本提示输入
        ...     cond_image,  # 传入条件图像
        ...     cond_subject,  # 传入条件主题
        ...     tgt_subject,  # 传入目标主题
        ...     guidance_scale=guidance_scale,  # 传入引导尺度
        ...     num_inference_steps=num_inference_steps,  # 传入推理步骤数
        ...     neg_prompt=negative_prompt,  # 传入负面提示
        ...     height=512,  # 设置输出图像高度
        ...     width=512,  # 设置输出图像宽度
        ... ).images  # 获取生成的图像
        >>> output[0].save("image.png")  # 保存生成的第一张图像为 "image.png"
        ```py  # 结束代码块
"""

class BlipDiffusionPipeline(DiffusionPipeline):  # 定义 BlipDiffusionPipeline 类,继承自 DiffusionPipeline
    """
    Pipeline for Zero-Shot Subject Driven Generation using Blip Diffusion.  # 说明该管道用于零-shot 主题驱动生成

    This model inherits from [`DiffusionPipeline`]. Check the superclass documentation for the generic methods the  # 指出该模型继承自 DiffusionPipeline,并建议查看超类文档以获取通用方法
    # 库实现所有管道的功能(例如下载或保存,在特定设备上运行等)

    Args:
        tokenizer ([`CLIPTokenizer`]):
            文本编码器的分词器
        text_encoder ([`ContextCLIPTextModel`]):
            用于编码文本提示的文本编码器
        vae ([`AutoencoderKL`]):
            VAE 模型,用于将潜在变量映射到图像
        unet ([`UNet2DConditionModel`]):
            条件 U-Net 架构,用于去噪图像嵌入
        scheduler ([`PNDMScheduler`]):
             与 `unet` 一起使用以生成图像潜在变量的调度器
        qformer ([`Blip2QFormerModel`]):
            QFormer 模型,用于从文本和图像中获取多模态嵌入
        image_processor ([`BlipImageProcessor`]):
            图像处理器,用于图像的预处理和后处理
        ctx_begin_pos (int, `optional`, defaults to 2):
            文本编码器中上下文标记的位置
    """

    # 定义模型的 CPU 卸载顺序
    model_cpu_offload_seq = "qformer->text_encoder->unet->vae"

    def __init__(
        self,
        tokenizer: CLIPTokenizer,
        text_encoder: ContextCLIPTextModel,
        vae: AutoencoderKL,
        unet: UNet2DConditionModel,
        scheduler: PNDMScheduler,
        qformer: Blip2QFormerModel,
        image_processor: BlipImageProcessor,
        ctx_begin_pos: int = 2,
        mean: List[float] = None,
        std: List[float] = None,
    ):
        # 调用父类构造函数
        super().__init__()

        # 注册模块,包括分词器、文本编码器、VAE、U-Net、调度器、QFormer 和图像处理器
        self.register_modules(
            tokenizer=tokenizer,
            text_encoder=text_encoder,
            vae=vae,
            unet=unet,
            scheduler=scheduler,
            qformer=qformer,
            image_processor=image_processor,
        )
        # 将上下文开始位置、均值和标准差注册到配置中
        self.register_to_config(ctx_begin_pos=ctx_begin_pos, mean=mean, std=std)

    # 获取查询嵌入的方法,输入图像和源主题
    def get_query_embeddings(self, input_image, src_subject):
        # 使用 QFormer 获取图像输入和文本输入的嵌入,返回字典
        return self.qformer(image_input=input_image, text_input=src_subject, return_dict=False)

    # 从原始 Blip Diffusion 代码复制,指定目标主题并通过重复增强提示
    def _build_prompt(self, prompts, tgt_subjects, prompt_strength=1.0, prompt_reps=20):
        # 初始化一个空列表,用于存放构建的提示
        rv = []
        # 遍历每个提示和目标主题
        for prompt, tgt_subject in zip(prompts, tgt_subjects):
            # 构建包含目标主题的提示
            prompt = f"a {tgt_subject} {prompt.strip()}"
            # 一个技巧来增强提示的效果
            rv.append(", ".join([prompt] * int(prompt_strength * prompt_reps)))

        # 返回构建的提示列表
        return rv

    # 从 diffusers.pipelines.consistency_models.pipeline_consistency_models.ConsistencyModelPipeline.prepare_latents 复制的代码
    # 准备潜在变量,包含批量大小、通道数、高度和宽度等参数
        def prepare_latents(self, batch_size, num_channels, height, width, dtype, device, generator, latents=None):
            # 定义潜在变量的形状
            shape = (batch_size, num_channels, height, width)
            # 检查生成器是否为列表且长度与批量大小不匹配,抛出值错误
            if isinstance(generator, list) and len(generator) != batch_size:
                raise ValueError(
                    f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
                    f" size of {batch_size}. Make sure the batch size matches the length of the generators."
                )
    
            # 如果未提供潜在变量,则生成新的随机潜在变量
            if latents is None:
                latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
            else:
                # 将提供的潜在变量转移到指定设备和数据类型
                latents = latents.to(device=device, dtype=dtype)
    
            # 将初始噪声缩放到调度器所需的标准差
            latents = latents * self.scheduler.init_noise_sigma
            # 返回处理后的潜在变量
            return latents
    
        # 编码提示,生成文本嵌入
        def encode_prompt(self, query_embeds, prompt, device=None):
            # 如果未指定设备,则使用执行设备
            device = device or self._execution_device
    
            # 获取最大长度,考虑查询嵌入的上下文
            max_len = self.text_encoder.text_model.config.max_position_embeddings
            max_len -= self.qformer.config.num_query_tokens
    
            # 将提示进行分词处理,并调整为最大长度
            tokenized_prompt = self.tokenizer(
                prompt,
                padding="max_length",
                truncation=True,
                max_length=max_len,
                return_tensors="pt",
            ).to(device)
    
            # 获取查询嵌入的批量大小
            batch_size = query_embeds.shape[0]
            # 为每个样本设置上下文起始位置
            ctx_begin_pos = [self.config.ctx_begin_pos] * batch_size
    
            # 使用文本编码器获取文本嵌入
            text_embeddings = self.text_encoder(
                input_ids=tokenized_prompt.input_ids,
                ctx_embeddings=query_embeds,
                ctx_begin_pos=ctx_begin_pos,
            )[0]
    
            # 返回生成的文本嵌入
            return text_embeddings
    
        # 禁用梯度计算,并替换示例文档字符串
        @torch.no_grad()
        @replace_example_docstring(EXAMPLE_DOC_STRING)
        def __call__(
            # 定义调用方法的输入参数,包括提示、参考图像等
            prompt: List[str],
            reference_image: PIL.Image.Image,
            source_subject_category: List[str],
            target_subject_category: List[str],
            latents: Optional[torch.Tensor] = None,
            guidance_scale: float = 7.5,
            height: int = 512,
            width: int = 512,
            num_inference_steps: int = 50,
            generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
            neg_prompt: Optional[str] = "",
            prompt_strength: float = 1.0,
            prompt_reps: int = 20,
            output_type: Optional[str] = "pil",
            return_dict: bool = True,

.\diffusers\pipelines\blip_diffusion\__init__.py

# 从 dataclasses 模块导入 dataclass 装饰器,用于简化数据类的定义
from dataclasses import dataclass
# 从 typing 模块导入 List、Optional 和 Union 类型注解
from typing import List, Optional, Union

# 导入 numpy 库并简化为 np
import numpy as np
# 导入 PIL 库
import PIL
# 从 PIL 中导入 Image 类,用于处理图像
from PIL import Image

# 从上层模块导入可选依赖检查和可用性函数
from ...utils import OptionalDependencyNotAvailable, is_torch_available, is_transformers_available

# 尝试检查 Transformers 和 Torch 库是否可用
try:
    # 如果任一库不可用,则抛出可选依赖不可用异常
    if not (is_transformers_available() and is_torch_available()):
        raise OptionalDependencyNotAvailable()
# 捕获可选依赖不可用异常
except OptionalDependencyNotAvailable:
    # 从 dummy 模块导入 ShapEPipeline 类作为替代
    from ...utils.dummy_torch_and_transformers_objects import ShapEPipeline
# 如果两个库都可用,则导入相关模型和处理器
else:
    # 从 blip_image_processing 模块导入 BlipImageProcessor 类
    from .blip_image_processing import BlipImageProcessor
    # 从 modeling_blip2 模块导入 Blip2QFormerModel 类
    from .modeling_blip2 import Blip2QFormerModel
    # 从 modeling_ctx_clip 模块导入 ContextCLIPTextModel 类
    from .modeling_ctx_clip import ContextCLIPTextModel
    # 从 pipeline_blip_diffusion 模块导入 BlipDiffusionPipeline 类
    from .pipeline_blip_diffusion import BlipDiffusionPipeline

.\diffusers\pipelines\cogvideo\pipeline_cogvideox.py

# 版权声明,表明此文件的所有权和使用许可
# Copyright 2024 The CogVideoX team, Tsinghua University & ZhipuAI and The HuggingFace Team.
# All rights reserved.
#
# 根据 Apache 2.0 许可证许可,使用条款
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# 你可以在以下地址获得许可证副本
#     http://www.apache.org/licenses/LICENSE-2.0
#
# 除非法律要求或书面协议另有约定,否则此文件以“按原样”方式分发,不提供任何明示或暗示的担保或条件。
# See the License for the specific language governing permissions and
# limitations under the License.

# 导入用于检查函数和方法的模块
import inspect
# 导入数学库以使用数学函数
import math
# 从 typing 模块导入类型注释工具
from typing import Callable, Dict, List, Optional, Tuple, Union

# 导入 PyTorch 库
import torch
# 从 transformers 库导入 T5 编码器模型和分词器
from transformers import T5EncoderModel, T5Tokenizer

# 从相对路径导入回调相关的类
from ...callbacks import MultiPipelineCallbacks, PipelineCallback
# 从相对路径导入模型相关的类
from ...models import AutoencoderKLCogVideoX, CogVideoXTransformer3DModel
# 从相对路径导入获取 3D 旋转位置嵌入的函数
from ...models.embeddings import get_3d_rotary_pos_embed
# 从相对路径导入扩散管道工具
from ...pipelines.pipeline_utils import DiffusionPipeline
# 从相对路径导入调度器
from ...schedulers import CogVideoXDDIMScheduler, CogVideoXDPMScheduler
# 从相对路径导入日志工具和替换示例文档字符串的函数
from ...utils import logging, replace_example_docstring
# 从相对路径导入生成随机张量的工具
from ...utils.torch_utils import randn_tensor
# 从相对路径导入视频处理器
from ...video_processor import VideoProcessor
# 从当前包导入管道输出相关的类
from .pipeline_output import CogVideoXPipelineOutput

# 创建一个日志记录器,用于记录当前模块的信息
logger = logging.get_logger(__name__)  # pylint: disable=invalid-name

# 示例文档字符串,展示如何使用该模块的功能
EXAMPLE_DOC_STRING = """
    Examples:
        ```python
        >>> import torch
        >>> from diffusers import CogVideoXPipeline
        >>> from diffusers.utils import export_to_video

        >>> # 模型: "THUDM/CogVideoX-2b" 或 "THUDM/CogVideoX-5b"
        >>> pipe = CogVideoXPipeline.from_pretrained("THUDM/CogVideoX-2b", torch_dtype=torch.float16).to("cuda")
        >>> prompt = (
        ...     "一只穿着小红外套和小帽子的熊猫,坐在宁静的竹林中的木凳上。"
        ...     "熊猫的毛茸茸的爪子拨动着微型木吉他,演奏出柔和的旋律。附近,几只其他的熊猫好奇地聚集,"
        ...     "有些还在节奏中鼓掌。阳光透过高大的竹子,洒下柔和的光辉,"
        ...     "照亮了这个场景。熊猫的脸上流露出专注和快乐,随着音乐的演奏而展现。"
        ...     "背景中有一条小溪流和生机勃勃的绿叶,增强了这个独特音乐表演的宁静和魔幻气氛。"
        ... )
        >>> video = pipe(prompt=prompt, guidance_scale=6, num_inference_steps=50).frames[0]
        >>> export_to_video(video, "output.mp4", fps=8)
        ```py
"""

# 定义一个函数,用于计算调整大小和裁剪区域,以适应网格
# 该函数类似于 diffusers.pipelines.hunyuandit.pipeline_hunyuandit.get_resize_crop_region_for_grid
def get_resize_crop_region_for_grid(src, tgt_width, tgt_height):
    # 目标宽度赋值给变量 tw
    tw = tgt_width
    # 目标高度赋值给变量 th
    th = tgt_height
    # 从源图像的尺寸中提取高度和宽度
    h, w = src
    # 计算源图像的高宽比
    r = h / w
    # 检查缩放比例 r 是否大于给定阈值 th 和 tw 的比值
        if r > (th / tw):
            # 如果是,则设定新的高度为 th
            resize_height = th
            # 计算对应的宽度,保持宽高比
            resize_width = int(round(th / h * w))
        else:
            # 否则,设定新的宽度为 tw
            resize_width = tw
            # 计算对应的高度,保持宽高比
            resize_height = int(round(tw / w * h))
    
        # 计算裁剪的上边缘位置,以居中显示
        crop_top = int(round((th - resize_height) / 2.0))
        # 计算裁剪的左边缘位置,以居中显示
        crop_left = int(round((tw - resize_width) / 2.0))
    
        # 返回裁剪区域的坐标,包含左上角和右下角
        return (crop_top, crop_left), (crop_top + resize_height, crop_left + resize_width)
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion 中复制
def retrieve_timesteps(
    # 调度器对象,用于获取时间步
    scheduler,
    # 用于生成样本的推理步骤数(可选)
    num_inference_steps: Optional[int] = None,
    # 指定设备(可选),可以是字符串或 torch.device
    device: Optional[Union[str, torch.device]] = None,
    # 自定义时间步列表(可选)
    timesteps: Optional[List[int]] = None,
    # 自定义 sigma 列表(可选)
    sigmas: Optional[List[float]] = None,
    # 额外的关键字参数,传递给调度器的 set_timesteps 方法
    **kwargs,
):
    """
    调用调度器的 `set_timesteps` 方法并从调度器中检索时间步。处理自定义时间步。
    任何关键字参数都将传递给 `scheduler.set_timesteps`。

    参数:
        scheduler (`SchedulerMixin`):
            用于获取时间步的调度器。
        num_inference_steps (`int`):
            在生成样本时使用的扩散步骤数。如果使用,`timesteps` 必须为 `None`。
        device (`str` 或 `torch.device`, *可选*):
            时间步移动到的设备。如果为 `None`,时间步不会被移动。
        timesteps (`List[int]`, *可选*):
            用于覆盖调度器的时间步间隔策略的自定义时间步。如果传递 `timesteps`,则 `num_inference_steps` 和 `sigmas` 必须为 `None`。
        sigmas (`List[float]`, *可选*):
            用于覆盖调度器的时间步间隔策略的自定义 sigma。如果传递 `sigmas`,则 `num_inference_steps` 和 `timesteps` 必须为 `None`。

    返回:
        `Tuple[torch.Tensor, int]`: 一个元组,其中第一个元素是来自调度器的时间步安排,第二个元素是推理步骤的数量。
    """
    # 检查是否同时传递了时间步和 sigma,若是则抛出错误
    if timesteps is not None and sigmas is not None:
        raise ValueError("Only one of `timesteps` or `sigmas` can be passed. Please choose one to set custom values")
    # 如果传递了自定义时间步
    if timesteps is not None:
        # 检查调度器的 set_timesteps 方法是否接受时间步参数
        accepts_timesteps = "timesteps" in set(inspect.signature(scheduler.set_timesteps).parameters.keys())
        # 如果不接受,则抛出错误
        if not accepts_timesteps:
            raise ValueError(
                f"The current scheduler class {scheduler.__class__}'s `set_timesteps` does not support custom"
                f" timestep schedules. Please check whether you are using the correct scheduler."
            )
        # 调用调度器的 set_timesteps 方法,传递自定义时间步
        scheduler.set_timesteps(timesteps=timesteps, device=device, **kwargs)
        # 从调度器获取时间步
        timesteps = scheduler.timesteps
        # 计算推理步骤的数量
        num_inference_steps = len(timesteps)
    # 如果传递了自定义 sigma
    elif sigmas is not None:
        # 检查调度器的 set_timesteps 方法是否接受 sigma 参数
        accept_sigmas = "sigmas" in set(inspect.signature(scheduler.set_timesteps).parameters.keys())
        # 如果不接受,则抛出错误
        if not accept_sigmas:
            raise ValueError(
                f"The current scheduler class {scheduler.__class__}'s `set_timesteps` does not support custom"
                f" sigmas schedules. Please check whether you are using the correct scheduler."
            )
        # 调用调度器的 set_timesteps 方法,传递自定义 sigma
        scheduler.set_timesteps(sigmas=sigmas, device=device, **kwargs)
        # 从调度器获取时间步
        timesteps = scheduler.timesteps
        # 计算推理步骤的数量
        num_inference_steps = len(timesteps)
    # 否则,设置推理步骤数以及相关设备和额外参数
        else:
            scheduler.set_timesteps(num_inference_steps, device=device, **kwargs)
            # 获取调度器中的时间步
            timesteps = scheduler.timesteps
        # 返回时间步和推理步骤数
        return timesteps, num_inference_steps
# 定义一个名为 CogVideoXPipeline 的类,继承自 DiffusionPipeline 类
class CogVideoXPipeline(DiffusionPipeline):
    r"""
    使用 CogVideoX 进行文本到视频生成的管道。

    此模型继承自 [`DiffusionPipeline`]。有关库为所有管道实现的通用方法(例如下载或保存,运行在特定设备等),请查看超类文档。

    参数:
        vae ([`AutoencoderKL`]):
            变分自编码器 (VAE) 模型,用于将视频编码和解码为潜在表示。
        text_encoder ([`T5EncoderModel`]):
            冻结的文本编码器。CogVideoX 使用
            [T5](https://huggingface.co/docs/transformers/model_doc/t5#transformers.T5EncoderModel);具体使用
            [t5-v1_1-xxl](https://huggingface.co/PixArt-alpha/PixArt-alpha/tree/main/t5-v1_1-xxl) 变体。
        tokenizer (`T5Tokenizer`):
            类
            [T5Tokenizer](https://huggingface.co/docs/transformers/model_doc/t5#transformers.T5Tokenizer) 的标记器。
        transformer ([`CogVideoXTransformer3DModel`]):
            一个文本条件的 `CogVideoXTransformer3DModel` 用于去噪编码的视频潜在。
        scheduler ([`SchedulerMixin`]):
            与 `transformer` 结合使用的调度器,用于去噪编码的视频潜在。
    """

    # 定义可选组件的列表,初始化为空
    _optional_components = []
    # 定义模型 CPU 卸载顺序
    model_cpu_offload_seq = "text_encoder->transformer->vae"

    # 定义回调张量输入的列表
    _callback_tensor_inputs = [
        "latents",  # 潜在张量输入
        "prompt_embeds",  # 提示嵌入张量输入
        "negative_prompt_embeds",  # 负提示嵌入张量输入
    ]

    # 初始化函数,接受多个参数以构建管道
    def __init__(
        self,
        tokenizer: T5Tokenizer,  # T5 标记器实例
        text_encoder: T5EncoderModel,  # T5 文本编码器实例
        vae: AutoencoderKLCogVideoX,  # 变分自编码器实例
        transformer: CogVideoXTransformer3DModel,  # CogVideoX 3D 转换器实例
        scheduler: Union[CogVideoXDDIMScheduler, CogVideoXDPMScheduler],  # 调度器实例,支持多种类型
    ):
        # 调用超类的初始化函数
        super().__init__()

        # 注册模块,整合各个组件
        self.register_modules(
            tokenizer=tokenizer, text_encoder=text_encoder, vae=vae, transformer=transformer, scheduler=scheduler
        )
        # 根据 VAE 的配置计算空间缩放因子
        self.vae_scale_factor_spatial = (
            2 ** (len(self.vae.config.block_out_channels) - 1) if hasattr(self, "vae") and self.vae is not None else 8
        )
        # 根据 VAE 的配置计算时间缩放因子
        self.vae_scale_factor_temporal = (
            self.vae.config.temporal_compression_ratio if hasattr(self, "vae") and self.vae is not None else 4
        )

        # 初始化视频处理器,使用空间缩放因子
        self.video_processor = VideoProcessor(vae_scale_factor=self.vae_scale_factor_spatial)

    # 定义获取 T5 提示嵌入的函数
    def _get_t5_prompt_embeds(
        self,
        prompt: Union[str, List[str]] = None,  # 输入的提示,可以是字符串或字符串列表
        num_videos_per_prompt: int = 1,  # 每个提示生成的视频数量,默认为 1
        max_sequence_length: int = 226,  # 最大序列长度,默认为 226
        device: Optional[torch.device] = None,  # 设备类型,默认为 None
        dtype: Optional[torch.dtype] = None,  # 数据类型,默认为 None
    # 处理输入参数,优先使用已设置的设备
        ):
            device = device or self._execution_device
            # 使用已定义的 dtype,默认取文本编码器的 dtype
            dtype = dtype or self.text_encoder.dtype
    
            # 如果输入 prompt 是字符串,则将其转换为列表
            prompt = [prompt] if isinstance(prompt, str) else prompt
            # 获取 prompt 的批大小
            batch_size = len(prompt)
    
            # 使用 tokenizer 处理 prompt,并返回张量格式的输入
            text_inputs = self.tokenizer(
                prompt,
                padding="max_length",  # 填充到最大长度
                max_length=max_sequence_length,  # 最大序列长度
                truncation=True,  # 超出部分截断
                add_special_tokens=True,  # 添加特殊标记
                return_tensors="pt",  # 返回 PyTorch 张量
            )
            # 提取输入 ID
            text_input_ids = text_inputs.input_ids
            # 获取未截断的输入 ID
            untruncated_ids = self.tokenizer(prompt, padding="longest", return_tensors="pt").input_ids
    
            # 检查是否需要警告用户输入被截断
            if untruncated_ids.shape[-1] >= text_input_ids.shape[-1] and not torch.equal(text_input_ids, untruncated_ids):
                # 解码被移除的文本并记录警告
                removed_text = self.tokenizer.batch_decode(untruncated_ids[:, max_sequence_length - 1 : -1])
                logger.warning(
                    "The following part of your input was truncated because `max_sequence_length` is set to "
                    f" {max_sequence_length} tokens: {removed_text}"
                )
    
            # 获取文本输入的嵌入表示
            prompt_embeds = self.text_encoder(text_input_ids.to(device))[0]
            # 转换嵌入的 dtype 和 device
            prompt_embeds = prompt_embeds.to(dtype=dtype, device=device)
    
            # 为每个生成的提示重复文本嵌入,使用适合 MPS 的方法
            _, seq_len, _ = prompt_embeds.shape
            # 重复嵌入以匹配视频生成数量
            prompt_embeds = prompt_embeds.repeat(1, num_videos_per_prompt, 1)
            # 调整嵌入的形状以符合批处理
            prompt_embeds = prompt_embeds.view(batch_size * num_videos_per_prompt, seq_len, -1)
    
            # 返回最终的文本嵌入
            return prompt_embeds
    
        # 定义编码提示的函数
        def encode_prompt(
            self,
            # 输入的提示,可以是字符串或字符串列表
            prompt: Union[str, List[str]],
            # 可选的负提示
            negative_prompt: Optional[Union[str, List[str]]] = None,
            # 控制分类器自由引导的开关
            do_classifier_free_guidance: bool = True,
            # 每个提示生成的视频数量
            num_videos_per_prompt: int = 1,
            # 可选的提示嵌入
            prompt_embeds: Optional[torch.Tensor] = None,
            # 可选的负提示嵌入
            negative_prompt_embeds: Optional[torch.Tensor] = None,
            # 最大序列长度
            max_sequence_length: int = 226,
            # 可选的设备
            device: Optional[torch.device] = None,
            # 可选的数据类型
            dtype: Optional[torch.dtype] = None,
        # 准备潜在变量的函数
        def prepare_latents(
            self, batch_size, num_channels_latents, num_frames, height, width, dtype, device, generator, latents=None
    ):
        # 定义形状元组,包含批次大小、帧数、通道数、高度和宽度
        shape = (
            batch_size,
            (num_frames - 1) // self.vae_scale_factor_temporal + 1,  # 计算处理后的帧数
            num_channels_latents,  # 潜在通道数
            height // self.vae_scale_factor_spatial,  # 根据空间缩放因子调整高度
            width // self.vae_scale_factor_spatial,  # 根据空间缩放因子调整宽度
        )
        # 检查生成器是否是列表,且长度与批次大小不匹配
        if isinstance(generator, list) and len(generator) != batch_size:
            # 抛出值错误,说明生成器列表长度与批次大小不匹配
            raise ValueError(
                f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
                f" size of {batch_size}. Make sure the batch size matches the length of the generators."
            )

        # 如果潜在变量为 None,则生成随机潜在变量
        if latents is None:
            # 使用给定形状生成随机张量,指定生成器、设备和数据类型
            latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
        else:
            # 如果潜在变量不为 None,则将其移动到指定设备
            latents = latents.to(device)

        # 按调度器所需的标准差缩放初始噪声
        latents = latents * self.scheduler.init_noise_sigma
        # 返回处理后的潜在变量
        return latents

    # 解码潜在变量,返回解码后的帧
    def decode_latents(self, latents: torch.Tensor) -> torch.Tensor:
        # 重新排列潜在变量的维度,以适应解码器的输入格式
        latents = latents.permute(0, 2, 1, 3, 4)  # [batch_size, num_channels, num_frames, height, width]
        # 根据缩放因子调整潜在变量的值
        latents = 1 / self.vae.config.scaling_factor * latents

        # 解码潜在变量并获取样本帧
        frames = self.vae.decode(latents).sample
        # 返回解码后的帧
        return frames

    # 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_extra_step_kwargs 复制
    def prepare_extra_step_kwargs(self, generator, eta):
        # 为调度器步骤准备额外的关键字参数,因为并非所有调度器的签名相同
        # eta (η) 仅在 DDIMScheduler 中使用,对于其他调度器将被忽略
        # eta 对应于 DDIM 论文中的 η:https://arxiv.org/abs/2010.02502
        # 应在 [0, 1] 范围内

        # 检查调度器的步骤方法是否接受 eta 参数
        accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())
        # 初始化额外步骤参数字典
        extra_step_kwargs = {}
        # 如果接受 eta,则将其添加到额外参数中
        if accepts_eta:
            extra_step_kwargs["eta"] = eta

        # 检查调度器的步骤方法是否接受 generator 参数
        accepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys())
        # 如果接受 generator,则将其添加到额外参数中
        if accepts_generator:
            extra_step_kwargs["generator"] = generator
        # 返回准备好的额外参数
        return extra_step_kwargs

    # 从 diffusers.pipelines.latte.pipeline_latte.LattePipeline.check_inputs 复制
    def check_inputs(
        self,
        prompt,  # 输入的提示文本
        height,  # 生成图像的高度
        width,   # 生成图像的宽度
        negative_prompt,  # 负提示文本,用于引导生成
        callback_on_step_end_tensor_inputs,  # 每步结束时的回调,用于处理张量输入
        prompt_embeds=None,  # 可选的提示嵌入
        negative_prompt_embeds=None,  # 可选的负提示嵌入
    ):
        # 检查高度和宽度是否能被8整除,若不能则抛出错误
        if height % 8 != 0 or width % 8 != 0:
            raise ValueError(f"`height` and `width` have to be divisible by 8 but are {height} and {width}.")

        # 检查回调输入是否不为空且是否都在已注册的回调输入中
        if callback_on_step_end_tensor_inputs is not None and not all(
            k in self._callback_tensor_inputs for k in callback_on_step_end_tensor_inputs
        ):
            raise ValueError(
                f"`callback_on_step_end_tensor_inputs` has to be in {self._callback_tensor_inputs}, but found {[k for k in callback_on_step_end_tensor_inputs if k not in self._callback_tensor_inputs]}"
            )
        # 检查是否同时提供了提示和提示嵌入,若是则抛出错误
        if prompt is not None and prompt_embeds is not None:
            raise ValueError(
                f"Cannot forward both `prompt`: {prompt} and `prompt_embeds`: {prompt_embeds}. Please make sure to"
                " only forward one of the two."
            )
        # 检查是否同时未提供提示和提示嵌入,若是则抛出错误
        elif prompt is None and prompt_embeds is None:
            raise ValueError(
                "Provide either `prompt` or `prompt_embeds`. Cannot leave both `prompt` and `prompt_embeds` undefined."
            )
        # 检查提示的类型是否为字符串或列表,若不是则抛出错误
        elif prompt is not None and (not isinstance(prompt, str) and not isinstance(prompt, list)):
            raise ValueError(f"`prompt` has to be of type `str` or `list` but is {type(prompt)}")

        # 检查是否同时提供了提示和负提示嵌入,若是则抛出错误
        if prompt is not None and negative_prompt_embeds is not None:
            raise ValueError(
                f"Cannot forward both `prompt`: {prompt} and `negative_prompt_embeds`:"
                f" {negative_prompt_embeds}. Please make sure to only forward one of the two."
            )

        # 检查是否同时提供了负提示和负提示嵌入,若是则抛出错误
        if negative_prompt is not None and negative_prompt_embeds is not None:
            raise ValueError(
                f"Cannot forward both `negative_prompt`: {negative_prompt} and `negative_prompt_embeds`:"
                f" {negative_prompt_embeds}. Please make sure to only forward one of the two."
            )

        # 检查提示嵌入和负提示嵌入是否都不为空,且它们的形状是否相同,若不同则抛出错误
        if prompt_embeds is not None and negative_prompt_embeds is not None:
            if prompt_embeds.shape != negative_prompt_embeds.shape:
                raise ValueError(
                    "`prompt_embeds` and `negative_prompt_embeds` must have the same shape when passed directly, but"
                    f" got: `prompt_embeds` {prompt_embeds.shape} != `negative_prompt_embeds`"
                    f" {negative_prompt_embeds.shape}."
                )

    def fuse_qkv_projections(self) -> None:
        # 启用融合 QKV 投影
        r"""Enables fused QKV projections."""
        self.fusing_transformer = True
        # 调用变换器进行 QKV 投影的融合
        self.transformer.fuse_qkv_projections()

    def unfuse_qkv_projections(self) -> None:
        # 禁用 QKV 投影融合(如果已启用)
        r"""Disable QKV projection fusion if enabled."""
        # 如果没有启用融合,则记录警告信息
        if not self.fusing_transformer:
            logger.warning("The Transformer was not initially fused for QKV projections. Doing nothing.")
        else:
            # 调用变换器进行 QKV 投影的取消融合
            self.transformer.unfuse_qkv_projections()
            # 更新状态为未融合
            self.fusing_transformer = False
    # 准备旋转位置嵌入的函数
        def _prepare_rotary_positional_embeddings(
            self,
            height: int,  # 输入的高度
            width: int,   # 输入的宽度
            num_frames: int,  # 输入的帧数
            device: torch.device,  # 计算设备(如CPU或GPU)
        ) -> Tuple[torch.Tensor, torch.Tensor]:  # 返回两个张量的元组
            # 根据 VAE 缩放因子和变换器的补丁大小计算网格高度
            grid_height = height // (self.vae_scale_factor_spatial * self.transformer.config.patch_size)
            # 根据 VAE 缩放因子和变换器的补丁大小计算网格宽度
            grid_width = width // (self.vae_scale_factor_spatial * self.transformer.config.patch_size)
            # 计算基础宽度大小
            base_size_width = 720 // (self.vae_scale_factor_spatial * self.transformer.config.patch_size)
            # 计算基础高度大小
            base_size_height = 480 // (self.vae_scale_factor_spatial * self.transformer.config.patch_size)
    
            # 获取网格的裁剪区域坐标
            grid_crops_coords = get_resize_crop_region_for_grid(
                (grid_height, grid_width), base_size_width, base_size_height
            )
            # 获取三维旋转位置嵌入的余弦和正弦频率
            freqs_cos, freqs_sin = get_3d_rotary_pos_embed(
                embed_dim=self.transformer.config.attention_head_dim,  # 嵌入维度
                crops_coords=grid_crops_coords,  # 裁剪坐标
                grid_size=(grid_height, grid_width),  # 网格大小
                temporal_size=num_frames,  # 时间维度大小
                use_real=True,  # 是否使用实数
            )
    
            # 将余弦频率移动到指定设备
            freqs_cos = freqs_cos.to(device=device)
            # 将正弦频率移动到指定设备
            freqs_sin = freqs_sin.to(device=device)
            # 返回余弦和正弦频率
            return freqs_cos, freqs_sin
    
        # 获取指导缩放比例的属性
        @property
        def guidance_scale(self):
            return self._guidance_scale
    
        # 获取时间步数的属性
        @property
        def num_timesteps(self):
            return self._num_timesteps
    
        # 获取中断状态的属性
        @property
        def interrupt(self):
            return self._interrupt
    
        # 关闭梯度计算并替换文档字符串
        @torch.no_grad()
        @replace_example_docstring(EXAMPLE_DOC_STRING)
        # 定义调用方法
        def __call__(
            self,
            prompt: Optional[Union[str, List[str]]] = None,  # 输入提示
            negative_prompt: Optional[Union[str, List[str]]] = None,  # 负面提示
            height: int = 480,  # 默认高度
            width: int = 720,  # 默认宽度
            num_frames: int = 49,  # 默认帧数
            num_inference_steps: int = 50,  # 默认推理步骤
            timesteps: Optional[List[int]] = None,  # 可选的时间步
            guidance_scale: float = 6,  # 默认指导缩放比例
            use_dynamic_cfg: bool = False,  # 是否使用动态配置
            num_videos_per_prompt: int = 1,  # 每个提示生成的视频数量
            eta: float = 0.0,  # 控制噪声的参数
            generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,  # 随机数生成器
            latents: Optional[torch.FloatTensor] = None,  # 可选的潜变量
            prompt_embeds: Optional[torch.FloatTensor] = None,  # 提示嵌入
            negative_prompt_embeds: Optional[torch.FloatTensor] = None,  # 负面提示嵌入
            output_type: str = "pil",  # 输出类型
            return_dict: bool = True,  # 是否返回字典格式
            callback_on_step_end: Optional[  # 步骤结束时的回调
                Union[Callable[[int, int, Dict], None], PipelineCallback, MultiPipelineCallbacks]
            ] = None,
            callback_on_step_end_tensor_inputs: List[str] = ["latents"],  # 步骤结束时的张量输入
            max_sequence_length: int = 226,  # 最大序列长度

.\diffusers\pipelines\cogvideo\pipeline_cogvideox_image2video.py

# 版权声明,说明文件的版权归属及使用许可
# Copyright 2024 The CogVideoX team, Tsinghua University & ZhipuAI and The HuggingFace Team.
# All rights reserved.
#
# 授权条款,说明使用该文件的条件
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# 允许用户在遵守许可的情况下使用该文件
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# 除非另有协议,否则软件以“原样”方式分发,不提供任何明示或暗示的保证
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# 查看许可证以了解权限和限制
# See the License for the specific language governing permissions and
# limitations under the License.

# 导入所需的模块
import inspect  # 用于获取对象的活跃信息
import math  # 提供数学函数
from typing import Callable, Dict, List, Optional, Tuple, Union  # 类型提示的支持

# 导入图像处理库
import PIL  # 图像处理库
import torch  # 深度学习框架
from transformers import T5EncoderModel, T5Tokenizer  # 导入 T5 模型和分词器

# 导入自定义回调和处理器
from ...callbacks import MultiPipelineCallbacks, PipelineCallback  # 回调相关
from ...image_processor import PipelineImageInput  # 图像输入处理器
from ...models import AutoencoderKLCogVideoX, CogVideoXTransformer3DModel  # 模型定义
from ...models.embeddings import get_3d_rotary_pos_embed  # 获取 3D 旋转位置嵌入
from ...pipelines.pipeline_utils import DiffusionPipeline  # 扩散管道
from ...schedulers import CogVideoXDDIMScheduler, CogVideoXDPMScheduler  # 调度器
from ...utils import (
    logging,  # 日志工具
    replace_example_docstring,  # 替换示例文档字符串
)
from ...utils.torch_utils import randn_tensor  # 随机张量生成工具
from ...video_processor import VideoProcessor  # 视频处理器
from .pipeline_output import CogVideoXPipelineOutput  # 管道输出定义


# 创建日志记录器
logger = logging.get_logger(__name__)  # pylint: disable=invalid-name


# 示例文档字符串,提供使用示例
EXAMPLE_DOC_STRING = """
    Examples:
        ```py
        >>> import torch
        >>> from diffusers import CogVideoXImageToVideoPipeline
        >>> from diffusers.utils import export_to_video, load_image

        >>> pipe = CogVideoXImageToVideoPipeline.from_pretrained("THUDM/CogVideoX-5b-I2V", torch_dtype=torch.bfloat16)  # 从预训练模型创建管道
        >>> pipe.to("cuda")  # 将管道移动到 GPU

        >>> prompt = "An astronaut hatching from an egg, on the surface of the moon, the darkness and depth of space realised in the background. High quality, ultrarealistic detail and breath-taking movie-like camera shot."  # 定义生成视频的提示
        >>> image = load_image(  # 加载输入图像
        ...     "https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/diffusers/astronaut.jpg"
        ... )
        >>> video = pipe(image, prompt, use_dynamic_cfg=True)  # 生成视频
        >>> export_to_video(video.frames[0], "output.mp4", fps=8)  # 导出生成的视频
        ```py
"""


# 定义调整图像大小和裁剪区域的函数
# 类似于 diffusers.pipelines.hunyuandit.pipeline_hunyuandit.get_resize_crop_region_for_grid
def get_resize_crop_region_for_grid(src, tgt_width, tgt_height):
    tw = tgt_width  # 目标宽度
    th = tgt_height  # 目标高度
    h, w = src  # 源图像的高度和宽度
    r = h / w  # 计算源图像的纵横比
    # 根据纵横比决定调整后的高度和宽度
    if r > (th / tw):  
        resize_height = th  # 设置调整后的高度为目标高度
        resize_width = int(round(th / h * w))  # 根据比例计算调整后的宽度
    else:
        resize_width = tw  # 设置调整后的宽度为目标宽度
        resize_height = int(round(tw / w * h))  # 根据比例计算调整后的高度

    # 计算裁剪区域的起始位置
    crop_top = int(round((th - resize_height) / 2.0))  # 上边裁剪位置
    crop_left = int(round((tw - resize_width) / 2.0))  # 左边裁剪位置

    # 返回裁剪区域的起始和结束坐标
    return (crop_top, crop_left), (crop_top + resize_height, crop_left + resize_width)
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion 复制而来
def retrieve_timesteps(
    # 调度器对象
    scheduler,
    # 推理步骤的数量(可选)
    num_inference_steps: Optional[int] = None,
    # 设备信息(可选)
    device: Optional[Union[str, torch.device]] = None,
    # 自定义时间步(可选)
    timesteps: Optional[List[int]] = None,
    # 自定义 sigma 值(可选)
    sigmas: Optional[List[float]] = None,
    # 其他关键字参数
    **kwargs,
):
    """
    调用调度器的 `set_timesteps` 方法并在调用后从调度器检索时间步。处理自定义时间步。任何关键字参数将传递给 `scheduler.set_timesteps`。

    参数:
        scheduler (`SchedulerMixin`):
            用于获取时间步的调度器。
        num_inference_steps (`int`):
            生成样本时使用的扩散步骤数量。如果使用,则 `timesteps` 必须为 `None`。
        device (`str` 或 `torch.device`, *可选*):
            时间步应移动到的设备。如果为 `None`,则不移动时间步。
        timesteps (`List[int]`, *可选*):
            自定义时间步,用于覆盖调度器的时间步间隔策略。如果传递了 `timesteps`,则 `num_inference_steps` 和 `sigmas` 必须为 `None`。
        sigmas (`List[float]`, *可选*):
            自定义 sigma,用于覆盖调度器的时间步间隔策略。如果传递了 `sigmas`,则 `num_inference_steps` 和 `timesteps` 必须为 `None`。

    返回:
        `Tuple[torch.Tensor, int]`: 一个元组,第一个元素是调度器的时间步计划,第二个元素是推理步骤的数量。
    """
    # 检查是否同时传递了自定义时间步和 sigma
    if timesteps is not None and sigmas is not None:
        raise ValueError("只能传递 `timesteps` 或 `sigmas` 中的一个。请选择一个设置自定义值")
    # 如果传递了自定义时间步
    if timesteps is not None:
        # 检查调度器是否接受时间步参数
        accepts_timesteps = "timesteps" in set(inspect.signature(scheduler.set_timesteps).parameters.keys())
        # 如果不支持,则抛出错误
        if not accepts_timesteps:
            raise ValueError(
                f"当前调度器类 {scheduler.__class__} 的 `set_timesteps` 不支持自定义"
                f" 时间步计划。请检查您是否使用了正确的调度器。"
            )
        # 调用调度器的 `set_timesteps` 方法设置自定义时间步
        scheduler.set_timesteps(timesteps=timesteps, device=device, **kwargs)
        # 从调度器获取设置的时间步
        timesteps = scheduler.timesteps
        # 计算推理步骤的数量
        num_inference_steps = len(timesteps)
    # 如果传递了自定义 sigma
    elif sigmas is not None:
        # 检查调度器是否接受 sigma 参数
        accept_sigmas = "sigmas" in set(inspect.signature(scheduler.set_timesteps).parameters.keys())
        # 如果不支持,则抛出错误
        if not accept_sigmas:
            raise ValueError(
                f"当前调度器类 {scheduler.__class__} 的 `set_timesteps` 不支持自定义"
                f" sigma 计划。请检查您是否使用了正确的调度器。"
            )
        # 调用调度器的 `set_timesteps` 方法设置自定义 sigma
        scheduler.set_timesteps(sigmas=sigmas, device=device, **kwargs)
        # 从调度器获取设置的时间步
        timesteps = scheduler.timesteps
        # 计算推理步骤的数量
        num_inference_steps = len(timesteps)
    # 如果不是前一个条件的情况,执行以下代码
        else:
            # 设置推理步骤数,并指定设备和其他关键字参数
            scheduler.set_timesteps(num_inference_steps, device=device, **kwargs)
            # 获取当前调度器的时间步长
            timesteps = scheduler.timesteps
        # 返回时间步长和推理步骤数
        return timesteps, num_inference_steps
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion_img2img.retrieve_latents 复制的代码
def retrieve_latents(
    # 输入的编码器输出,类型为 torch.Tensor
    encoder_output: torch.Tensor, 
    # 可选的随机数生成器,用于采样
    generator: Optional[torch.Generator] = None, 
    # 采样模式,默认为 "sample"
    sample_mode: str = "sample"
):
    # 检查 encoder_output 是否有 latent_dist 属性且模式为 "sample"
    if hasattr(encoder_output, "latent_dist") and sample_mode == "sample":
        # 从 latent_dist 中采样并返回结果
        return encoder_output.latent_dist.sample(generator)
    # 检查 encoder_output 是否有 latent_dist 属性且模式为 "argmax"
    elif hasattr(encoder_output, "latent_dist") and sample_mode == "argmax":
        # 返回 latent_dist 的模式
        return encoder_output.latent_dist.mode()
    # 检查 encoder_output 是否有 latents 属性
    elif hasattr(encoder_output, "latents"):
        # 返回 encoder_output 中的 latents
        return encoder_output.latents
    # 如果以上条件都不满足,抛出 AttributeError
    else:
        raise AttributeError("Could not access latents of provided encoder_output")


class CogVideoXImageToVideoPipeline(DiffusionPipeline):
    r"""
    使用 CogVideoX 的图像到视频生成的管道。

    该模型继承自 [`DiffusionPipeline`]。请查看父类文档以获取库实现的通用方法
    (例如下载或保存,运行在特定设备等)。

    参数:
        vae ([`AutoencoderKL`]):
            变分自编码器(VAE)模型,用于将视频编码和解码为潜在表示。
        text_encoder ([`T5EncoderModel`]):
            冻结的文本编码器。CogVideoX 使用
            [T5](https://huggingface.co/docs/transformers/model_doc/t5#transformers.T5EncoderModel);特别是
            [t5-v1_1-xxl](https://huggingface.co/PixArt-alpha/PixArt-alpha/tree/main/t5-v1_1-xxl) 变体。
        tokenizer (`T5Tokenizer`):
            类的分词器
            [T5Tokenizer](https://huggingface.co/docs/transformers/model_doc/t5#transformers.T5Tokenizer)。
        transformer ([`CogVideoXTransformer3DModel`]):
            一个文本条件的 `CogVideoXTransformer3DModel`,用于去噪编码的视频潜在。
        scheduler ([`SchedulerMixin`]):
            一个调度器,结合 `transformer` 用于去噪编码的视频潜在。
    """

    # 可选组件列表,初始化为空
    _optional_components = []
    # 指定 CPU 卸载顺序
    model_cpu_offload_seq = "text_encoder->transformer->vae"

    # 需要回调的张量输入列表
    _callback_tensor_inputs = [
        # 潜在张量
        "latents",
        # 提示嵌入
        "prompt_embeds",
        # 负面提示嵌入
        "negative_prompt_embeds",
    ]

    def __init__(
        # 初始化方法的参数:分词器
        self,
        tokenizer: T5Tokenizer,
        # 文本编码器
        text_encoder: T5EncoderModel,
        # VAE 模型
        vae: AutoencoderKLCogVideoX,
        # 变换模型
        transformer: CogVideoXTransformer3DModel,
        # 调度器
        scheduler: Union[CogVideoXDDIMScheduler, CogVideoXDPMScheduler],
    ):
        # 调用父类的构造函数以初始化基类部分
        super().__init__()

        # 注册各个模块,传入相关参数
        self.register_modules(
            tokenizer=tokenizer,  # 注册分词器
            text_encoder=text_encoder,  # 注册文本编码器
            vae=vae,  # 注册变分自编码器
            transformer=transformer,  # 注册变换器
            scheduler=scheduler,  # 注册调度器
        )
        # 计算空间缩放因子,默认值为8
        self.vae_scale_factor_spatial = (
            2 ** (len(self.vae.config.block_out_channels) - 1) if hasattr(self, "vae") and self.vae is not None else 8
        )
        # 获取时间压缩比,如果 VAE 存在则使用其配置
        self.vae_scale_factor_temporal = (
            self.vae.config.temporal_compression_ratio if hasattr(self, "vae") and self.vae is not None else 4
        )

        # 创建视频处理器,使用空间缩放因子
        self.video_processor = VideoProcessor(vae_scale_factor=self.vae_scale_factor_spatial)

    # 从 diffusers.pipelines.cogvideo.pipeline_cogvideox.CogVideoXPipeline._get_t5_prompt_embeds 复制而来
    def _get_t5_prompt_embeds(
        self,
        prompt: Union[str, List[str]] = None,  # 输入提示,支持单个字符串或字符串列表
        num_videos_per_prompt: int = 1,  # 每个提示生成的视频数量
        max_sequence_length: int = 226,  # 最大序列长度
        device: Optional[torch.device] = None,  # 设备类型,默认为 None
        dtype: Optional[torch.dtype] = None,  # 数据类型,默认为 None
    ):
        # 如果未指定设备,则使用执行设备
        device = device or self._execution_device
        # 如果未指定数据类型,则使用文本编码器的数据类型
        dtype = dtype or self.text_encoder.dtype

        # 如果提示是字符串,则将其转为列表
        prompt = [prompt] if isinstance(prompt, str) else prompt
        # 获取批处理大小
        batch_size = len(prompt)

        # 对提示进行编码,返回张量,填充到最大长度
        text_inputs = self.tokenizer(
            prompt,
            padding="max_length",  # 填充到最大长度
            max_length=max_sequence_length,  # 最大长度限制
            truncation=True,  # 允许截断
            add_special_tokens=True,  # 添加特殊标记
            return_tensors="pt",  # 返回 PyTorch 张量
        )
        # 获取编码后的输入 ID
        text_input_ids = text_inputs.input_ids
        # 获取未截断的输入 ID
        untruncated_ids = self.tokenizer(prompt, padding="longest", return_tensors="pt").input_ids

        # 如果未截断的 ID 长度大于等于文本输入 ID 长度且两者不相等,则进行警告
        if untruncated_ids.shape[-1] >= text_input_ids.shape[-1] and not torch.equal(text_input_ids, untruncated_ids):
            # 解码被截断的文本部分并记录警告
            removed_text = self.tokenizer.batch_decode(untruncated_ids[:, max_sequence_length - 1 : -1])
            logger.warning(
                "The following part of your input was truncated because `max_sequence_length` is set to "
                f" {max_sequence_length} tokens: {removed_text}"  # 输出截断的提示文本
            )

        # 获取提示的嵌入表示
        prompt_embeds = self.text_encoder(text_input_ids.to(device))[0]
        # 转换嵌入为指定的数据类型和设备
        prompt_embeds = prompt_embeds.to(dtype=dtype, device=device)

        # 复制文本嵌入以生成每个提示的视频,使用适合 MPS 的方法
        _, seq_len, _ = prompt_embeds.shape  # 获取嵌入的形状
        prompt_embeds = prompt_embeds.repeat(1, num_videos_per_prompt, 1)  # 重复嵌入以适应视频数量
        prompt_embeds = prompt_embeds.view(batch_size * num_videos_per_prompt, seq_len, -1)  # 变形为合适的形状

        # 返回处理后的提示嵌入
        return prompt_embeds

    # 从 diffusers.pipelines.cogvideo.pipeline_cogvideox.CogVideoXPipeline.encode_prompt 复制而来
    # 定义一个用于编码提示信息的函数,接受多种参数
        def encode_prompt(
            self,
            prompt: Union[str, List[str]],  # 输入的提示,可以是字符串或字符串列表
            negative_prompt: Optional[Union[str, List[str]]] = None,  # 可选的负提示,类似格式
            do_classifier_free_guidance: bool = True,  # 是否启用无分类器引导
            num_videos_per_prompt: int = 1,  # 每个提示生成的视频数量
            prompt_embeds: Optional[torch.Tensor] = None,  # 可选的提示嵌入
            negative_prompt_embeds: Optional[torch.Tensor] = None,  # 可选的负提示嵌入
            max_sequence_length: int = 226,  # 最大序列长度
            device: Optional[torch.device] = None,  # 指定的设备类型
            dtype: Optional[torch.dtype] = None,  # 指定的数据类型
        def prepare_latents(
            self,
            image: torch.Tensor,  # 输入图像的张量
            batch_size: int = 1,  # 每批次的样本数量
            num_channels_latents: int = 16,  # 潜在变量的通道数
            num_frames: int = 13,  # 视频的帧数
            height: int = 60,  # 图像的高度
            width: int = 90,  # 图像的宽度
            dtype: Optional[torch.dtype] = None,  # 指定的数据类型
            device: Optional[torch.device] = None,  # 指定的设备类型
            generator: Optional[torch.Generator] = None,  # 随机数生成器
            latents: Optional[torch.Tensor] = None,  # 可选的潜在变量张量
        ):
            # 计算有效的帧数,以适应 VAE 的时间缩放因子
            num_frames = (num_frames - 1) // self.vae_scale_factor_temporal + 1
            # 定义张量的形状,包括批次、帧数和空间维度
            shape = (
                batch_size,
                num_frames,
                num_channels_latents,
                height // self.vae_scale_factor_spatial,
                width // self.vae_scale_factor_spatial,
            )
    
            # 检查生成器列表的长度是否与批次大小匹配
            if isinstance(generator, list) and len(generator) != batch_size:
                raise ValueError(
                    f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
                    f" size of {batch_size}. Make sure the batch size matches the length of the generators."
                )
    
            # 在图像张量中插入一个维度,以适配后续处理
            image = image.unsqueeze(2)  # [B, C, F, H, W]
    
            # 如果生成器是列表,逐个处理每个图像
            if isinstance(generator, list):
                image_latents = [
                    retrieve_latents(self.vae.encode(image[i].unsqueeze(0)), generator[i]) for i in range(batch_size)
                ]
            else:
                # 使用单一生成器处理所有图像
                image_latents = [retrieve_latents(self.vae.encode(img.unsqueeze(0)), generator) for img in image]
    
            # 合并图像潜在变量,并调整维度
            image_latents = torch.cat(image_latents, dim=0).to(dtype).permute(0, 2, 1, 3, 4)  # [B, F, C, H, W]
            # 按比例缩放图像潜在变量
            image_latents = self.vae.config.scaling_factor * image_latents
    
            # 定义潜在变量的填充形状
            padding_shape = (
                batch_size,
                num_frames - 1,
                num_channels_latents,
                height // self.vae_scale_factor_spatial,
                width // self.vae_scale_factor_spatial,
            )
            # 创建填充张量
            latent_padding = torch.zeros(padding_shape, device=device, dtype=dtype)
            # 将填充与图像潜在变量合并
            image_latents = torch.cat([image_latents, latent_padding], dim=1)
    
            # 如果没有提供潜在变量,则生成随机潜在变量
            if latents is None:
                latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
            else:
                # 将提供的潜在变量移动到指定设备
                latents = latents.to(device)
    
            # 按照调度器要求的标准差缩放初始噪声
            latents = latents * self.scheduler.init_noise_sigma
            # 返回潜在变量和图像潜在变量
            return latents, image_latents
    
        # 从 diffusers.pipelines.cogvideo.pipeline_cogvideox.CogVideoXPipeline.decode_latents 复制的代码
    # 解码潜在变量并返回张量格式的帧
        def decode_latents(self, latents: torch.Tensor) -> torch.Tensor:
            # 重新排列潜在变量的维度为 [batch_size, num_channels, num_frames, height, width]
            latents = latents.permute(0, 2, 1, 3, 4)  # [batch_size, num_channels, num_frames, height, width]
            # 将潜在变量缩放为 VAE 配置中的因子
            latents = 1 / self.vae.config.scaling_factor * latents
    
            # 解码潜在变量并获取采样帧
            frames = self.vae.decode(latents).sample
            # 返回解码得到的帧
            return frames
    
        # 从 diffusers.pipelines.animatediff.pipeline_animatediff_video2video 导入的方法
        def get_timesteps(self, num_inference_steps, timesteps, strength, device):
            # 根据 init_timestep 获取原始时间步
            init_timestep = min(int(num_inference_steps * strength), num_inference_steps)
    
            # 计算时间步的起始位置,确保不小于零
            t_start = max(num_inference_steps - init_timestep, 0)
            # 根据调度器的顺序获取相关时间步
            timesteps = timesteps[t_start * self.scheduler.order :]
    
            # 返回过滤后的时间步和剩余的推理步骤数
            return timesteps, num_inference_steps - t_start
    
        # 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion 导入的方法
        def prepare_extra_step_kwargs(self, generator, eta):
            # 准备额外的调度器步骤参数,因为不同调度器的参数不尽相同
            # eta(η)仅在 DDIMScheduler 中使用,对于其他调度器将被忽略
            # eta 对应 DDIM 论文中的 η,范围应在 [0, 1] 之间
    
            # 检查调度器是否接受 eta 参数
            accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())
            # 初始化额外参数字典
            extra_step_kwargs = {}
            # 如果调度器接受 eta,则将其添加到额外参数中
            if accepts_eta:
                extra_step_kwargs["eta"] = eta
    
            # 检查调度器是否接受 generator 参数
            accepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys())
            # 如果调度器接受 generator,则将其添加到额外参数中
            if accepts_generator:
                extra_step_kwargs["generator"] = generator
            # 返回准备好的额外参数
            return extra_step_kwargs
    
        # 检查输入参数的有效性
        def check_inputs(
            self,
            image,
            prompt,
            height,
            width,
            negative_prompt,
            callback_on_step_end_tensor_inputs,
            video=None,
            latents=None,
            prompt_embeds=None,
            negative_prompt_embeds=None,
    ):
        # 检查 image 是否是合法类型:torch.Tensor、PIL.Image.Image 或 list
        if (
            not isinstance(image, torch.Tensor)  # 如果 image 不是 torch.Tensor
            and not isinstance(image, PIL.Image.Image)  # 并且不是 PIL.Image.Image
            and not isinstance(image, list)  # 并且不是 list
        ):
            # 抛出类型错误,提示 image 的类型不正确
            raise ValueError(
                "`image` has to be of type `torch.Tensor` or `PIL.Image.Image` or `List[PIL.Image.Image]` but is"
                f" {type(image)}"  # 显示当前 image 的类型
            )

        # 检查 height 和 width 是否能被 8 整除
        if height % 8 != 0 or width % 8 != 0:
            # 抛出值错误,提示 height 和 width 不符合要求
            raise ValueError(f"`height` and `width` have to be divisible by 8 but are {height} and {width}.")

        # 检查回调输入是否存在,并且是否全在 _callback_tensor_inputs 中
        if callback_on_step_end_tensor_inputs is not None and not all(
            k in self._callback_tensor_inputs for k in callback_on_step_end_tensor_inputs  # 确保每个 k 都在 _callback_tensor_inputs 中
        ):
            # 抛出值错误,提示回调输入不符合要求
            raise ValueError(
                f"`callback_on_step_end_tensor_inputs` has to be in {self._callback_tensor_inputs}, but found {[k for k in callback_on_step_end_tensor_inputs if k not in self._callback_tensor_inputs]}"
            )
        
        # 检查 prompt 和 prompt_embeds 是否同时存在
        if prompt is not None and prompt_embeds is not None:
            # 抛出值错误,提示不能同时提供 prompt 和 prompt_embeds
            raise ValueError(
                f"Cannot forward both `prompt`: {prompt} and `prompt_embeds`: {prompt_embeds}. Please make sure to"
                " only forward one of the two."
            )
        # 检查 prompt 和 prompt_embeds 是否都为 None
        elif prompt is None and prompt_embeds is None:
            # 抛出值错误,提示至少提供一个
            raise ValueError(
                "Provide either `prompt` or `prompt_embeds`. Cannot leave both `prompt` and `prompt_embeds` undefined."
            )
        # 检查 prompt 是否为合法类型
        elif prompt is not None and (not isinstance(prompt, str) and not isinstance(prompt, list)):
            # 抛出值错误,提示 prompt 的类型不正确
            raise ValueError(f"`prompt` has to be of type `str` or `list` but is {type(prompt)}")

        # 检查 prompt 和 negative_prompt_embeds 是否同时存在
        if prompt is not None and negative_prompt_embeds is not None:
            # 抛出值错误,提示不能同时提供 prompt 和 negative_prompt_embeds
            raise ValueError(
                f"Cannot forward both `prompt`: {prompt} and `negative_prompt_embeds`:"
                f" {negative_prompt_embeds}. Please make sure to only forward one of the two."
            )

        # 检查 negative_prompt 和 negative_prompt_embeds 是否同时存在
        if negative_prompt is not None and negative_prompt_embeds is not None:
            # 抛出值错误,提示不能同时提供 negative_prompt 和 negative_prompt_embeds
            raise ValueError(
                f"Cannot forward both `negative_prompt`: {negative_prompt} and `negative_prompt_embeds`:"
                f" {negative_prompt_embeds}. Please make sure to only forward one of the two."
            )

        # 检查 prompt_embeds 和 negative_prompt_embeds 是否都存在
        if prompt_embeds is not None and negative_prompt_embeds is not None:
            # 检查它们的形状是否一致
            if prompt_embeds.shape != negative_prompt_embeds.shape:
                # 抛出值错误,提示它们的形状不匹配
                raise ValueError(
                    "`prompt_embeds` and `negative_prompt_embeds` must have the same shape when passed directly, but"
                    f" got: `prompt_embeds` {prompt_embeds.shape} != `negative_prompt_embeds`"
                    f" {negative_prompt_embeds.shape}."
                )

        # 检查 video 和 latents 是否同时存在
        if video is not None and latents is not None:
            # 抛出值错误,提示只能提供一个
            raise ValueError("Only one of `video` or `latents` should be provided")

    # 从 diffusers.pipelines.cogvideo.pipeline_cogvideox.CogVideoXPipeline.fuse_qkv_projections 复制而来
    # 定义一个启用融合 QKV 投影的方法,不返回任何值
    def fuse_qkv_projections(self) -> None:
        # 方法的文档字符串,描述其功能
        r"""Enables fused QKV projections."""
        # 设置属性 fusing_transformer 为 True,表示启用融合
        self.fusing_transformer = True
        # 调用 transformer 对象的方法,进行 QKV 投影融合
        self.transformer.fuse_qkv_projections()

    # 从 diffusers.pipelines.cogvideo.pipeline_cogvideox.CogVideoXPipeline.unfuse_qkv_projections 复制的方法
    # 定义一个禁用 QKV 投影融合的方法,不返回任何值
    def unfuse_qkv_projections(self) -> None:
        # 方法的文档字符串,描述其功能
        r"""Disable QKV projection fusion if enabled."""
        # 检查属性 fusing_transformer 是否为 False
        if not self.fusing_transformer:
            # 如果没有融合,记录警告日志
            logger.warning("The Transformer was not initially fused for QKV projections. Doing nothing.")
        else:
            # 调用 transformer 对象的方法,解除 QKV 投影的融合
            self.transformer.unfuse_qkv_projections()
            # 设置属性 fusing_transformer 为 False,表示禁用融合
            self.fusing_transformer = False

    # 从 diffusers.pipelines.cogvideo.pipeline_cogvideox.CogVideoXPipeline._prepare_rotary_positional_embeddings 复制的方法
    # 定义一个准备旋转位置嵌入的方法,返回两个张量
    def _prepare_rotary_positional_embeddings(
        self,
        height: int,
        width: int,
        num_frames: int,
        device: torch.device,
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        # 根据输入高度和宽度,计算网格的高度
        grid_height = height // (self.vae_scale_factor_spatial * self.transformer.config.patch_size)
        # 根据输入高度和宽度,计算网格的宽度
        grid_width = width // (self.vae_scale_factor_spatial * self.transformer.config.patch_size)
        # 计算基础宽度,固定为 720
        base_size_width = 720 // (self.vae_scale_factor_spatial * self.transformer.config.patch_size)
        # 计算基础高度,固定为 480
        base_size_height = 480 // (self.vae_scale_factor_spatial * self.transformer.config.patch_size)

        # 获取网格裁剪区域的坐标
        grid_crops_coords = get_resize_crop_region_for_grid(
            (grid_height, grid_width), base_size_width, base_size_height
        )
        # 获取旋转位置嵌入的余弦和正弦频率
        freqs_cos, freqs_sin = get_3d_rotary_pos_embed(
            embed_dim=self.transformer.config.attention_head_dim,
            crops_coords=grid_crops_coords,
            grid_size=(grid_height, grid_width),
            temporal_size=num_frames,
        )

        # 将余弦频率张量转移到指定设备
        freqs_cos = freqs_cos.to(device=device)
        # 将正弦频率张量转移到指定设备
        freqs_sin = freqs_sin.to(device=device)
        # 返回余弦和正弦频率张量
        return freqs_cos, freqs_sin

    # 定义一个属性,获取指导尺度的值
    @property
    def guidance_scale(self):
        # 返回私有属性 _guidance_scale 的值
        return self._guidance_scale

    # 定义一个属性,获取时间步数的值
    @property
    def num_timesteps(self):
        # 返回私有属性 _num_timesteps 的值
        return self._num_timesteps

    # 定义一个属性,获取中断状态的值
    @property
    def interrupt(self):
        # 返回私有属性 _interrupt 的值
        return self._interrupt

    # 采用无梯度上下文装饰器,避免计算梯度
    @torch.no_grad()
    # 替换示例文档字符串的装饰器
    @replace_example_docstring(EXAMPLE_DOC_STRING)
    # 定义可调用方法,允许实例像函数一样被调用
        def __call__(
            self,
            image: PipelineImageInput,  # 输入图像,类型为PipelineImageInput
            prompt: Optional[Union[str, List[str]]] = None,  # 提示文本,可以是字符串或字符串列表
            negative_prompt: Optional[Union[str, List[str]]] = None,  # 负面提示文本,可以是字符串或字符串列表
            height: int = 480,  # 输出图像的高度,默认为480
            width: int = 720,  # 输出图像的宽度,默认为720
            num_frames: int = 49,  # 生成的视频帧数,默认为49
            num_inference_steps: int = 50,  # 推理步骤的数量,默认为50
            timesteps: Optional[List[int]] = None,  # 可选的时间步列表
            guidance_scale: float = 6,  # 引导尺度,影响生成图像的质量,默认为6
            use_dynamic_cfg: bool = False,  # 是否使用动态配置,默认为False
            num_videos_per_prompt: int = 1,  # 每个提示生成的视频数量,默认为1
            eta: float = 0.0,  # 影响采样过程的参数,默认为0.0
            generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,  # 可选的随机数生成器
            latents: Optional[torch.FloatTensor] = None,  # 可选的潜在变量,类型为浮点张量
            prompt_embeds: Optional[torch.FloatTensor] = None,  # 可选的提示嵌入,类型为浮点张量
            negative_prompt_embeds: Optional[torch.FloatTensor] = None,  # 可选的负面提示嵌入,类型为浮点张量
            output_type: str = "pil",  # 输出类型,默认为"PIL"格式
            return_dict: bool = True,  # 是否返回字典格式的结果,默认为True
            callback_on_step_end: Optional[  # 在步骤结束时调用的可选回调函数
                Union[Callable[[int, int, Dict], None], PipelineCallback, MultiPipelineCallbacks]
            ] = None,
            callback_on_step_end_tensor_inputs: List[str] = ["latents"],  # 在步骤结束时的张量输入列表,默认为["latents"]
            max_sequence_length: int = 226,  # 最大序列长度,默认为226

.\diffusers\pipelines\cogvideo\pipeline_cogvideox_video2video.py

# 版权声明,指明版权归 CogVideoX 团队、清华大学、ZhipuAI 和 HuggingFace 团队所有
# 所有权利保留
#
# 根据 Apache License 2.0(“许可证”)授权;
# 除非遵守许可证,否则您不得使用此文件
# 您可以在以下网址获取许可证副本:
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律或书面协议另有约定,否则根据许可证分发的软件是以“原样”基础分发,
# 不提供任何形式的担保或条件,无论是明示或暗示的
# 有关许可证的特定权限和限制,请参见许可证

import inspect  # 导入 inspect 模块,用于获取对象的信息
import math  # 导入 math 模块,提供数学函数
from typing import Callable, Dict, List, Optional, Tuple, Union  # 导入类型提示相关的类

import torch  # 导入 PyTorch 库,进行深度学习
from PIL import Image  # 从 PIL 库导入 Image,用于图像处理
from transformers import T5EncoderModel, T5Tokenizer  # 导入 T5 模型及其分词器

from ...callbacks import MultiPipelineCallbacks, PipelineCallback  # 导入回调相关类
from ...models import AutoencoderKLCogVideoX, CogVideoXTransformer3DModel  # 导入模型类
from ...models.embeddings import get_3d_rotary_pos_embed  # 导入获取 3D 旋转位置嵌入的函数
from ...pipelines.pipeline_utils import DiffusionPipeline  # 导入扩散管道类
from ...schedulers import CogVideoXDDIMScheduler, CogVideoXDPMScheduler  # 导入调度器类
from ...utils import (  # 导入工具模块中的函数
    logging,  # 导入日志记录模块
    replace_example_docstring,  # 导入替换示例文档字符串的函数
)
from ...utils.torch_utils import randn_tensor  # 从工具模块导入生成随机张量的函数
from ...video_processor import VideoProcessor  # 导入视频处理器类
from .pipeline_output import CogVideoXPipelineOutput  # 导入管道输出类

logger = logging.get_logger(__name__)  # 获取当前模块的日志记录器

EXAMPLE_DOC_STRING = """  # 示例文档字符串,展示用法
    Examples:  # 示例部分
        ```python  # Python 代码块开始
        >>> import torch  # 导入 PyTorch 库
        >>> from diffusers import CogVideoXDPMScheduler, CogVideoXVideoToVideoPipeline  # 导入特定模块
        >>> from diffusers.utils import export_to_video, load_video  # 导入工具函数

        >>> # 模型:可以选择 "THUDM/CogVideoX-2b" 或 "THUDM/CogVideoX-5b"
        >>> pipe = CogVideoXVideoToVideoPipeline.from_pretrained("THUDM/CogVideoX-5b", torch_dtype=torch.bfloat16)  # 加载预训练管道
        >>> pipe.to("cuda")  # 将管道移动到 GPU
        >>> pipe.scheduler = CogVideoXDPMScheduler.from_config(pipe.scheduler.config)  # 配置调度器

        >>> input_video = load_video(  # 加载输入视频
        ...     "https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/diffusers/hiker.mp4"  # 视频链接
        ... )
        >>> prompt = (  # 定义生成视频的提示
        ...     "An astronaut stands triumphantly at the peak of a towering mountain. Panorama of rugged peaks and "
        ...     "valleys. Very futuristic vibe and animated aesthetic. Highlights of purple and golden colors in "
        ...     "the scene. The sky is looks like an animated/cartoonish dream of galaxies, nebulae, stars, planets, "
        ...     "moons, but the remainder of the scene is mostly realistic."
        ... )

        >>> video = pipe(  # 调用管道生成视频
        ...     video=input_video, prompt=prompt, strength=0.8, guidance_scale=6, num_inference_steps=50  # 传入参数
        ... ).frames[0]  # 获取生成的视频帧
        >>> export_to_video(video, "output.mp4", fps=8)  # 导出生成的视频
        ```py  # Python 代码块结束
"""
# 根据源图像的大小和目标宽高计算缩放裁剪区域
def get_resize_crop_region_for_grid(src, tgt_width, tgt_height):
    # 设置目标宽度和高度
    tw = tgt_width
    th = tgt_height
    # 解构源图像的高度和宽度
    h, w = src
    # 计算源图像的宽高比
    r = h / w
    # 判断源图像的宽高比与目标宽高比的关系
    if r > (th / tw):
        # 如果源图像更高,则以目标高度缩放
        resize_height = th
        # 计算相应的宽度
        resize_width = int(round(th / h * w))
    else:
        # 否则以目标宽度缩放
        resize_width = tw
        # 计算相应的高度
        resize_height = int(round(tw / w * h))

    # 计算裁剪区域的顶部坐标
    crop_top = int(round((th - resize_height) / 2.0))
    # 计算裁剪区域的左侧坐标
    crop_left = int(round((tw - resize_width) / 2.0))

    # 返回裁剪区域的坐标和调整后的尺寸
    return (crop_top, crop_left), (crop_top + resize_height, crop_left + resize_width)


# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.retrieve_timesteps 复制的函数
def retrieve_timesteps(
    scheduler,
    num_inference_steps: Optional[int] = None,
    device: Optional[Union[str, torch.device]] = None,
    timesteps: Optional[List[int]] = None,
    sigmas: Optional[List[float]] = None,
    **kwargs,
):
    """
    调用调度器的 `set_timesteps` 方法并在调用后从调度器检索时间步。处理自定义时间步。任何额外参数将传递给 `scheduler.set_timesteps`。

    参数:
        scheduler (`SchedulerMixin`):
            用于获取时间步的调度器。
        num_inference_steps (`int`):
            生成样本时使用的扩散步骤数量。如果使用,则 `timesteps` 必须为 `None`。
        device (`str` 或 `torch.device`, *可选*):
            时间步应移动到的设备。如果为 `None`,则不移动时间步。
        timesteps (`List[int]`, *可选*):
            用于覆盖调度器的时间步间隔策略的自定义时间步。如果传递 `timesteps`,则 `num_inference_steps` 和 `sigmas` 必须为 `None`。
        sigmas (`List[float]`, *可选*):
            用于覆盖调度器的时间步间隔策略的自定义 sigma。如果传递 `sigmas`,则 `num_inference_steps` 和 `timesteps` 必须为 `None`。

    返回:
        `Tuple[torch.Tensor, int]`: 一个元组,第一个元素是调度器的时间步计划,第二个元素是推理步骤的数量。
    """
    # 检查是否同时传递了时间步和 sigma
    if timesteps is not None and sigmas is not None:
        raise ValueError("只能传递 `timesteps` 或 `sigmas` 之一。请选择一个设置自定义值")
    # 如果传递了时间步
    if timesteps is not None:
        # 检查调度器的 set_timesteps 方法是否接受时间步
        accepts_timesteps = "timesteps" in set(inspect.signature(scheduler.set_timesteps).parameters.keys())
        if not accepts_timesteps:
            raise ValueError(
                f"当前调度器类 {scheduler.__class__} 的 `set_timesteps` 不支持自定义"
                f" 时间步计划。请检查您是否使用了正确的调度器。"
            )
        # 调用调度器的 set_timesteps 方法
        scheduler.set_timesteps(timesteps=timesteps, device=device, **kwargs)
        # 获取调度器中的时间步
        timesteps = scheduler.timesteps
        # 计算推理步骤的数量
        num_inference_steps = len(timesteps)
    # 检查 sigmas 是否不为 None,即是否提供了自定义 sigma 值
    elif sigmas is not None:
        # 检查当前调度器的 set_timesteps 方法是否接受 sigmas 参数
        accept_sigmas = "sigmas" in set(inspect.signature(scheduler.set_timesteps).parameters.keys())
        # 如果不接受 sigmas,抛出值错误异常,并提示用户
        if not accept_sigmas:
            raise ValueError(
                f"The current scheduler class {scheduler.__class__}'s `set_timesteps` does not support custom"
                f" sigmas schedules. Please check whether you are using the correct scheduler."
            )
        # 设置调度器的时间步长,使用提供的 sigmas、设备和其他参数
        scheduler.set_timesteps(sigmas=sigmas, device=device, **kwargs)
        # 获取当前调度器的时间步长
        timesteps = scheduler.timesteps
        # 计算推理步骤的数量,即时间步长的长度
        num_inference_steps = len(timesteps)
    else:
        # 如果没有提供 sigmas,使用推理步骤的数量设置调度器的时间步长
        scheduler.set_timesteps(num_inference_steps, device=device, **kwargs)
        # 获取当前调度器的时间步长
        timesteps = scheduler.timesteps
    # 返回时间步长和推理步骤的数量
    return timesteps, num_inference_steps
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion_img2img 模块复制的函数
def retrieve_latents(
    encoder_output: torch.Tensor,  # 输入参数,编码器输出,类型为 torch.Tensor
    generator: Optional[torch.Generator] = None,  # 可选的随机数生成器,用于采样
    sample_mode: str = "sample"  # 采样模式,默认为 "sample"
):
    # 检查 encoder_output 是否具有 latent_dist 属性且采样模式为 "sample"
    if hasattr(encoder_output, "latent_dist") and sample_mode == "sample":
        # 从 latent_dist 中采样,并返回样本
        return encoder_output.latent_dist.sample(generator)
    # 检查 encoder_output 是否具有 latent_dist 属性且采样模式为 "argmax"
    elif hasattr(encoder_output, "latent_dist") and sample_mode == "argmax":
        # 返回 latent_dist 的众数(最可能的值)
        return encoder_output.latent_dist.mode()
    # 检查 encoder_output 是否具有 latents 属性
    elif hasattr(encoder_output, "latents"):
        # 返回 latents 属性
        return encoder_output.latents
    # 如果都不满足,则引发 AttributeError
    else:
        raise AttributeError("Could not access latents of provided encoder_output")


class CogVideoXVideoToVideoPipeline(DiffusionPipeline):
    r""" 
    使用 CogVideoX 的视频到视频生成管道。

    此模型继承自 [`DiffusionPipeline`]。请查看超类文档,以获取库实现的所有管道的通用方法 
    (例如下载或保存,运行在特定设备等)。

    Args:
        vae ([`AutoencoderKL`]): 
            用于将视频编码和解码到潜在表示的变分自编码器(VAE)模型。
        text_encoder ([`T5EncoderModel`]): 
            冻结的文本编码器。CogVideoX 使用 
            [T5](https://huggingface.co/docs/transformers/model_doc/t5#transformers.T5EncoderModel);特别是 
            [t5-v1_1-xxl](https://huggingface.co/PixArt-alpha/PixArt-alpha/tree/main/t5-v1_1-xxl) 变体。
        tokenizer (`T5Tokenizer`): 
            类的标记器 
            [T5Tokenizer](https://huggingface.co/docs/transformers/model_doc/t5#transformers.T5Tokenizer)。
        transformer ([`CogVideoXTransformer3DModel`]): 
            一个文本条件的 `CogVideoXTransformer3DModel`,用于去噪编码的视频潜在表示。
        scheduler ([`SchedulerMixin`]): 
            用于与 `transformer` 结合使用的调度器,以去噪编码的视频潜在表示。
    """

    _optional_components = []  # 可选组件的列表,当前为空
    model_cpu_offload_seq = "text_encoder->transformer->vae"  # 模型的 CPU 卸载顺序

    _callback_tensor_inputs = [  # 用于回调的张量输入列表
        "latents",  # 潜在表示
        "prompt_embeds",  # 提示嵌入
        "negative_prompt_embeds",  # 负面提示嵌入
    ]

    def __init__(
        self,
        tokenizer: T5Tokenizer,  # 标记器实例
        text_encoder: T5EncoderModel,  # 文本编码器实例
        vae: AutoencoderKLCogVideoX,  # VAE 实例
        transformer: CogVideoXTransformer3DModel,  # 转换器实例
        scheduler: Union[CogVideoXDDIMScheduler, CogVideoXDPMScheduler],  # 调度器实例,可为两种类型之一
    # 初始化父类
        ):
            super().__init__()
    
            # 注册所需模块,包括tokenizer、text_encoder、vae、transformer和scheduler
            self.register_modules(
                tokenizer=tokenizer, text_encoder=text_encoder, vae=vae, transformer=transformer, scheduler=scheduler
            )
            # 计算空间的vae缩放因子,如果vae存在则根据块的输出通道数计算,否则默认为8
            self.vae_scale_factor_spatial = (
                2 ** (len(self.vae.config.block_out_channels) - 1) if hasattr(self, "vae") and self.vae is not None else 8
            )
            # 计算时间的vae缩放因子,如果vae存在则使用其时间压缩比,否则默认为4
            self.vae_scale_factor_temporal = (
                self.vae.config.temporal_compression_ratio if hasattr(self, "vae") and self.vae is not None else 4
            )
    
            # 初始化视频处理器,使用空间的vae缩放因子
            self.video_processor = VideoProcessor(vae_scale_factor=self.vae_scale_factor_spatial)
    
        # 从diffusers.pipelines.cogvideo.pipeline_cogvideox.CogVideoXPipeline._get_t5_prompt_embeds复制的方法
        def _get_t5_prompt_embeds(
            self,
            prompt: Union[str, List[str]] = None,
            num_videos_per_prompt: int = 1,
            max_sequence_length: int = 226,
            device: Optional[torch.device] = None,
            dtype: Optional[torch.dtype] = None,
        ):
            # 设置执行设备,若未指定则使用默认执行设备
            device = device or self._execution_device
            # 设置数据类型,若未指定则使用text_encoder的数据类型
            dtype = dtype or self.text_encoder.dtype
    
            # 将输入的prompt转换为列表格式
            prompt = [prompt] if isinstance(prompt, str) else prompt
            # 计算批次大小
            batch_size = len(prompt)
    
            # 使用tokenizer处理文本输入,返回张量格式,并进行填充、截断和添加特殊标记
            text_inputs = self.tokenizer(
                prompt,
                padding="max_length",
                max_length=max_sequence_length,
                truncation=True,
                add_special_tokens=True,
                return_tensors="pt",
            )
            # 获取处理后的输入ID
            text_input_ids = text_inputs.input_ids
            # 获取未截断的ID
            untruncated_ids = self.tokenizer(prompt, padding="longest", return_tensors="pt").input_ids
    
            # 检查未截断ID是否大于等于处理后的ID,并且两者不相等
            if untruncated_ids.shape[-1] >= text_input_ids.shape[-1] and not torch.equal(text_input_ids, untruncated_ids):
                # 解码被截断的文本,并发出警告
                removed_text = self.tokenizer.batch_decode(untruncated_ids[:, max_sequence_length - 1 : -1])
                logger.warning(
                    "The following part of your input was truncated because `max_sequence_length` is set to "
                    f" {max_sequence_length} tokens: {removed_text}"
                )
    
            # 通过text_encoder生成prompt的嵌入表示,并将其移动到指定设备
            prompt_embeds = self.text_encoder(text_input_ids.to(device))[0]
            # 转换嵌入的dtype和设备
            prompt_embeds = prompt_embeds.to(dtype=dtype, device=device)
    
            # 为每个生成的prompt重复文本嵌入,使用适合MPS的方法
            _, seq_len, _ = prompt_embeds.shape
            prompt_embeds = prompt_embeds.repeat(1, num_videos_per_prompt, 1)
            # 重新调整嵌入的形状,以适应批次大小和生成数量
            prompt_embeds = prompt_embeds.view(batch_size * num_videos_per_prompt, seq_len, -1)
    
            # 返回最终的文本嵌入
            return prompt_embeds
    
        # 从diffusers.pipelines.cogvideo.pipeline_cogvideox.CogVideoXPipeline.encode_prompt复制的方法
    # 定义编码提示的函数,接受多种参数以设置提示信息和生成参数
        def encode_prompt(
            self,
            # 提示内容,可以是字符串或字符串列表
            prompt: Union[str, List[str]],
            # 负提示内容,可选
            negative_prompt: Optional[Union[str, List[str]]] = None,
            # 是否进行分类器自由引导
            do_classifier_free_guidance: bool = True,
            # 每个提示生成的视频数量
            num_videos_per_prompt: int = 1,
            # 提示的嵌入向量,可选
            prompt_embeds: Optional[torch.Tensor] = None,
            # 负提示的嵌入向量,可选
            negative_prompt_embeds: Optional[torch.Tensor] = None,
            # 最大序列长度
            max_sequence_length: int = 226,
            # 设备类型,可选
            device: Optional[torch.device] = None,
            # 数据类型,可选
            dtype: Optional[torch.dtype] = None,
        # 定义准备潜在变量的函数,接受视频和其他参数
        def prepare_latents(
            self,
            # 输入视频,可选
            video: Optional[torch.Tensor] = None,
            # 批次大小
            batch_size: int = 1,
            # 潜在通道数量
            num_channels_latents: int = 16,
            # 视频高度
            height: int = 60,
            # 视频宽度
            width: int = 90,
            # 数据类型,可选
            dtype: Optional[torch.dtype] = None,
            # 设备类型,可选
            device: Optional[torch.device] = None,
            # 随机数生成器,可选
            generator: Optional[torch.Generator] = None,
            # 现有潜在变量,可选
            latents: Optional[torch.Tensor] = None,
            # 时间步长,可选
            timestep: Optional[torch.Tensor] = None,
        ):
            # 计算视频帧数,如果潜在变量未提供则根据视频尺寸计算
            num_frames = (video.size(2) - 1) // self.vae_scale_factor_temporal + 1 if latents is None else latents.size(1)
    
            # 设置潜在变量的形状
            shape = (
                batch_size,
                num_frames,
                num_channels_latents,
                height // self.vae_scale_factor_spatial,
                width // self.vae_scale_factor_spatial,
            )
    
            # 检查生成器列表的长度是否与批次大小匹配
            if isinstance(generator, list) and len(generator) != batch_size:
                raise ValueError(
                    f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
                    f" size of {batch_size}. Make sure the batch size matches the length of the generators."
                )
    
            # 如果未提供潜在变量
            if latents is None:
                # 如果生成器是列表,则检查长度
                if isinstance(generator, list):
                    if len(generator) != batch_size:
                        raise ValueError(
                            f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
                            f" size of {batch_size}. Make sure the batch size matches the length of the generators."
                        )
    
                    # 为每个视频初始化潜在变量
                    init_latents = [
                        retrieve_latents(self.vae.encode(video[i].unsqueeze(0)), generator[i]) for i in range(batch_size)
                    ]
                else:
                    # 单一生成器情况下为视频初始化潜在变量
                    init_latents = [retrieve_latents(self.vae.encode(vid.unsqueeze(0)), generator) for vid in video]
    
                # 将初始潜在变量连接并转移到目标数据类型,调整维度顺序
                init_latents = torch.cat(init_latents, dim=0).to(dtype).permute(0, 2, 1, 3, 4)  # [B, F, C, H, W]
                # 通过配置的缩放因子调整潜在变量
                init_latents = self.vae.config.scaling_factor * init_latents
    
                # 生成随机噪声
                noise = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
                # 将噪声添加到初始潜在变量中
                latents = self.scheduler.add_noise(init_latents, noise, timestep)
            else:
                # 如果潜在变量已提供,则将其转移到目标设备
                latents = latents.to(device)
    
            # 根据调度器要求缩放初始噪声
            latents = latents * self.scheduler.init_noise_sigma
            # 返回准备好的潜在变量
            return latents
    # 从 diffusers.pipelines.cogvideo.pipeline_cogvideox.CogVideoXPipeline.decode_latents 拷贝而来
    def decode_latents(self, latents: torch.Tensor) -> torch.Tensor:
        # 将输入的张量进行维度变换,排列为 [batch_size, num_channels, num_frames, height, width]
        latents = latents.permute(0, 2, 1, 3, 4)  # [batch_size, num_channels, num_frames, height, width]
        # 使用 VAE 的缩放因子对 latents 进行缩放
        latents = 1 / self.vae.config.scaling_factor * latents

        # 解码 latents,生成相应的帧并返回
        frames = self.vae.decode(latents).sample
        # 返回解码后的帧
        return frames

    # 从 diffusers.pipelines.animatediff.pipeline_animatediff_video2video.AnimateDiffVideoToVideoPipeline.get_timesteps 拷贝而来
    def get_timesteps(self, num_inference_steps, timesteps, strength, device):
        # 根据初始时间步计算原始时间步
        init_timestep = min(int(num_inference_steps * strength), num_inference_steps)

        # 计算开始时间步,确保不小于0
        t_start = max(num_inference_steps - init_timestep, 0)
        # 从时间步数组中截取相关部分
        timesteps = timesteps[t_start * self.scheduler.order :]

        # 返回调整后的时间步和剩余的推理步骤
        return timesteps, num_inference_steps - t_start

    # 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_extra_step_kwargs 拷贝而来
    def prepare_extra_step_kwargs(self, generator, eta):
        # 为调度器步骤准备额外的参数,因为并非所有调度器具有相同的参数签名
        # eta (η) 仅在 DDIMScheduler 中使用,其他调度器将忽略该参数
        # eta 对应于 DDIM 论文中的 η,参考链接:https://arxiv.org/abs/2010.02502
        # eta 应该在 [0, 1] 之间

        # 检查调度器步骤是否接受 eta 参数
        accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())
        extra_step_kwargs = {}
        # 如果接受 eta,则将其添加到额外参数字典中
        if accepts_eta:
            extra_step_kwargs["eta"] = eta

        # 检查调度器步骤是否接受 generator 参数
        accepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys())
        # 如果接受 generator,则将其添加到额外参数字典中
        if accepts_generator:
            extra_step_kwargs["generator"] = generator
        # 返回准备好的额外参数字典
        return extra_step_kwargs

    def check_inputs(
        self,
        prompt,
        height,
        width,
        strength,
        negative_prompt,
        callback_on_step_end_tensor_inputs,
        video=None,
        latents=None,
        prompt_embeds=None,
        negative_prompt_embeds=None,
    ):
        # 检查高度和宽度是否能被8整除,如果不能则抛出错误
        if height % 8 != 0 or width % 8 != 0:
            raise ValueError(f"`height` and `width` have to be divisible by 8 but are {height} and {width}.")

        # 检查strength的值是否在0到1之间,如果不在范围内则抛出错误
        if strength < 0 or strength > 1:
            raise ValueError(f"The value of strength should in [0.0, 1.0] but is {strength}")

        # 检查callback_on_step_end_tensor_inputs是否不为None且是否所有元素都在_callback_tensor_inputs中
        if callback_on_step_end_tensor_inputs is not None and not all(
            k in self._callback_tensor_inputs for k in callback_on_step_end_tensor_inputs
        ):
            # 抛出错误,如果callback_on_step_end_tensor_inputs中的某些元素不在_callback_tensor_inputs中
            raise ValueError(
                f"`callback_on_step_end_tensor_inputs` has to be in {self._callback_tensor_inputs}, but found {[k for k in callback_on_step_end_tensor_inputs if k not in self._callback_tensor_inputs]}"
            )
        
        # 检查prompt和prompt_embeds是否同时不为None
        if prompt is not None and prompt_embeds is not None:
            # 抛出错误,提示不能同时提供prompt和prompt_embeds
            raise ValueError(
                f"Cannot forward both `prompt`: {prompt} and `prompt_embeds`: {prompt_embeds}. Please make sure to"
                " only forward one of the two."
            )
        elif prompt is None and prompt_embeds is None:
            # 抛出错误,提示必须提供prompt或prompt_embeds其中之一
            raise ValueError(
                "Provide either `prompt` or `prompt_embeds`. Cannot leave both `prompt` and `prompt_embeds` undefined."
            )
        elif prompt is not None and (not isinstance(prompt, str) and not isinstance(prompt, list)):
            # 抛出错误,提示prompt的类型必须是str或list
            raise ValueError(f"`prompt` has to be of type `str` or `list` but is {type(prompt)}")

        # 检查prompt和negative_prompt_embeds是否同时不为None
        if prompt is not None and negative_prompt_embeds is not None:
            # 抛出错误,提示不能同时提供prompt和negative_prompt_embeds
            raise ValueError(
                f"Cannot forward both `prompt`: {prompt} and `negative_prompt_embeds`:"
                f" {negative_prompt_embeds}. Please make sure to only forward one of the two."
            )

        # 检查negative_prompt和negative_prompt_embeds是否同时不为None
        if negative_prompt is not None and negative_prompt_embeds is not None:
            # 抛出错误,提示不能同时提供negative_prompt和negative_prompt_embeds
            raise ValueError(
                f"Cannot forward both `negative_prompt`: {negative_prompt} and `negative_prompt_embeds`:"
                f" {negative_prompt_embeds}. Please make sure to only forward one of the two."
            )

        # 检查prompt_embeds和negative_prompt_embeds是否都不为None
        if prompt_embeds is not None and negative_prompt_embeds is not None:
            # 检查两个embeds的形状是否相同,如果不同则抛出错误
            if prompt_embeds.shape != negative_prompt_embeds.shape:
                raise ValueError(
                    "`prompt_embeds` and `negative_prompt_embeds` must have the same shape when passed directly, but"
                    f" got: `prompt_embeds` {prompt_embeds.shape} != `negative_prompt_embeds`"
                    f" {negative_prompt_embeds.shape}."
                )

        # 检查video和latents是否同时不为None
        if video is not None and latents is not None:
            # 抛出错误,提示只能提供video或latents其中之一
            raise ValueError("Only one of `video` or `latents` should be provided")

    # 从diffusers.pipelines.cogvideo.pipeline_cogvideox.CogVideoXPipeline复制的方法
    def fuse_qkv_projections(self) -> None:
        # 文档字符串,说明该方法启用融合的QKV投影
        r"""Enables fused QKV projections."""
        # 设置fusing_transformer属性为True,表示启用融合
        self.fusing_transformer = True
        # 调用transformer对象的fuse_qkv_projections方法
        self.transformer.fuse_qkv_projections()
    # 从 diffusers.pipelines.cogvideo.pipeline_cogvideox.CogVideoXPipeline.unfuse_qkv_projections 复制的代码
    def unfuse_qkv_projections(self) -> None:
        r"""禁用 QKV 投影融合(如果已启用)。"""
        # 检查是否启用了投影融合
        if not self.fusing_transformer:
            # 如果没有启用,记录警告信息
            logger.warning("The Transformer was not initially fused for QKV projections. Doing nothing.")
        else:
            # 如果启用了,执行解除 QKV 投影融合操作
            self.transformer.unfuse_qkv_projections()
            # 将融合标志设置为 False
            self.fusing_transformer = False

    # 从 diffusers.pipelines.cogvideo.pipeline_cogvideox.CogVideoXPipeline._prepare_rotary_positional_embeddings 复制的代码
    def _prepare_rotary_positional_embeddings(
        self,
        height: int,  # 输入的高度
        width: int,   # 输入的宽度
        num_frames: int,  # 输入的帧数
        device: torch.device,  # 指定的设备(CPU 或 GPU)
    ) -> Tuple[torch.Tensor, torch.Tensor]:  # 返回的类型为一对张量
        # 计算网格高度,基于输入高度和其他参数
        grid_height = height // (self.vae_scale_factor_spatial * self.transformer.config.patch_size)
        # 计算网格宽度,基于输入宽度和其他参数
        grid_width = width // (self.vae_scale_factor_spatial * self.transformer.config.patch_size)
        # 计算基础宽度大小
        base_size_width = 720 // (self.vae_scale_factor_spatial * self.transformer.config.patch_size)
        # 计算基础高度大小
        base_size_height = 480 // (self.vae_scale_factor_spatial * self.transformer.config.patch_size)

        # 获取用于网格的裁剪区域坐标
        grid_crops_coords = get_resize_crop_region_for_grid(
            (grid_height, grid_width), base_size_width, base_size_height
        )
        # 生成三维旋转位置嵌入的余弦和正弦频率
        freqs_cos, freqs_sin = get_3d_rotary_pos_embed(
            embed_dim=self.transformer.config.attention_head_dim,  # 嵌入维度
            crops_coords=grid_crops_coords,  # 裁剪坐标
            grid_size=(grid_height, grid_width),  # 网格大小
            temporal_size=num_frames,  # 时间序列大小
        )

        # 将余弦频率张量移动到指定设备
        freqs_cos = freqs_cos.to(device=device)
        # 将正弦频率张量移动到指定设备
        freqs_sin = freqs_sin.to(device=device)
        # 返回余弦和正弦频率张量
        return freqs_cos, freqs_sin

    @property
    def guidance_scale(self):
        # 返回指导尺度的值
        return self._guidance_scale

    @property
    def num_timesteps(self):
        # 返回时间步数的值
        return self._num_timesteps

    @property
    def interrupt(self):
        # 返回中断标志的值
        return self._interrupt

    @torch.no_grad()  # 在不计算梯度的上下文中运行
    @replace_example_docstring(EXAMPLE_DOC_STRING)  # 替换示例文档字符串
    # 定义可调用的类方法,允许传入多个参数以处理视频生成
    def __call__(
            # 视频图像列表,默认为 None
            self,
            video: List[Image.Image] = None,
            # 生成视频的提示文本,可以是字符串或字符串列表,默认为 None
            prompt: Optional[Union[str, List[str]]] = None,
            # 生成视频的负面提示文本,可以是字符串或字符串列表,默认为 None
            negative_prompt: Optional[Union[str, List[str]]] = None,
            # 输出视频的高度,默认为 480 像素
            height: int = 480,
            # 输出视频的宽度,默认为 720 像素
            width: int = 720,
            # 进行推断的步骤数量,默认为 50 步
            num_inference_steps: int = 50,
            # 选定的时间步列表,默认为 None
            timesteps: Optional[List[int]] = None,
            # 控制强度的浮点数,默认为 0.8
            strength: float = 0.8,
            # 引导缩放比例,默认为 6
            guidance_scale: float = 6,
            # 是否使用动态配置的布尔值,默认为 False
            use_dynamic_cfg: bool = False,
            # 每个提示生成视频的数量,默认为 1
            num_videos_per_prompt: int = 1,
            # eta 参数,默认为 0.0
            eta: float = 0.0,
            # 随机数生成器,可以是 torch.Generator 或其列表,默认为 None
            generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
            # 可选的潜在张量,默认为 None
            latents: Optional[torch.FloatTensor] = None,
            # 可选的提示嵌入,默认为 None
            prompt_embeds: Optional[torch.FloatTensor] = None,
            # 可选的负面提示嵌入,默认为 None
            negative_prompt_embeds: Optional[torch.FloatTensor] = None,
            # 输出类型,默认为 "pil"
            output_type: str = "pil",
            # 是否返回字典格式,默认为 True
            return_dict: bool = True,
            # 步骤结束时调用的回调函数,可以是单一或多个回调,默认为 None
            callback_on_step_end: Optional[
                Union[Callable[[int, int, Dict], None], PipelineCallback, MultiPipelineCallbacks]
            ] = None,
            # 用于步骤结束回调的张量输入列表,默认为 ["latents"]
            callback_on_step_end_tensor_inputs: List[str] = ["latents"],
            # 最大序列长度,默认为 226
            max_sequence_length: int = 226,

.\diffusers\pipelines\cogvideo\pipeline_output.py

# 从 dataclasses 模块导入 dataclass 装饰器
from dataclasses import dataclass

# 导入 PyTorch 库
import torch

# 从 diffusers.utils 导入 BaseOutput 基类
from diffusers.utils import BaseOutput


# 定义 CogVideoXPipelineOutput 类,继承自 BaseOutput
@dataclass
class CogVideoXPipelineOutput(BaseOutput):
    r"""
    CogVideo 管道的输出类。

    参数:
        frames (`torch.Tensor`, `np.ndarray`, 或 List[List[PIL.Image.Image]]):
            视频输出的列表 - 可以是长度为 `batch_size` 的嵌套列表,每个子列表包含
            去噪的 PIL 图像序列,长度为 `num_frames`。也可以是形状为
            `(batch_size, num_frames, channels, height, width)` 的 NumPy 数组或 Torch 张量。
    """

    # 定义输出的帧,类型为 torch.Tensor
    frames: torch.Tensor

.\diffusers\pipelines\cogvideo\__init__.py

# 从 typing 模块导入 TYPE_CHECKING,用于类型检查
from typing import TYPE_CHECKING

# 从相对路径的 utils 模块导入所需的工具函数和常量
from ...utils import (
    DIFFUSERS_SLOW_IMPORT,  # 用于标识慢速导入的标志
    OptionalDependencyNotAvailable,  # 用于处理可选依赖项未安装的异常
    _LazyModule,  # 用于创建懒加载模块
    get_objects_from_module,  # 从模块中获取对象的函数
    is_torch_available,  # 检查 PyTorch 是否可用的函数
    is_transformers_available,  # 检查 Transformers 是否可用的函数
)

# 创建一个空字典,用于存放虚拟对象
_dummy_objects = {}
# 创建一个空字典,用于存放模块的导入结构
_import_structure = {}

# 尝试检查依赖项的可用性
try:
    # 如果 Transformers 或 PyTorch 不可用,抛出异常
    if not (is_transformers_available() and is_torch_available()):
        raise OptionalDependencyNotAvailable()
# 捕获可选依赖项未可用的异常
except OptionalDependencyNotAvailable:
    # 从 utils 模块导入虚拟对象,避免导入失败
    from ...utils import dummy_torch_and_transformers_objects  # noqa F403

    # 更新虚拟对象字典,填充虚拟对象
    _dummy_objects.update(get_objects_from_module(dummy_torch_and_transformers_objects))
# 如果依赖项可用,更新导入结构
else:
    _import_structure["pipeline_cogvideox"] = ["CogVideoXPipeline"]  # 添加 CogVideoXPipeline
    _import_structure["pipeline_cogvideox_image2video"] = ["CogVideoXImageToVideoPipeline"]  # 添加图像转视频管道
    _import_structure["pipeline_cogvideox_video2video"] = ["CogVideoXVideoToVideoPipeline"]  # 添加视频转视频管道

# 根据类型检查或慢速导入的标志进行判断
if TYPE_CHECKING or DIFFUSERS_SLOW_IMPORT:
    # 尝试检查依赖项的可用性
    try:
        # 如果 Transformers 或 PyTorch 不可用,抛出异常
        if not (is_transformers_available() and is_torch_available()):
            raise OptionalDependencyNotAvailable()

    # 捕获可选依赖项未可用的异常
    except OptionalDependencyNotAvailable:
        # 从虚拟对象模块导入所有对象
        from ...utils.dummy_torch_and_transformers_objects import *
    else:
        # 导入实际的管道类
        from .pipeline_cogvideox import CogVideoXPipeline
        from .pipeline_cogvideox_image2video import CogVideoXImageToVideoPipeline
        from .pipeline_cogvideox_video2video import CogVideoXVideoToVideoPipeline

# 否则处理懒加载模块
else:
    import sys

    # 用 _LazyModule 创建当前模块的懒加载实例
    sys.modules[__name__] = _LazyModule(
        __name__,
        globals()["__file__"],
        _import_structure,  # 传递导入结构
        module_spec=__spec__,  # 传递模块规格
    )

    # 将虚拟对象添加到当前模块
    for name, value in _dummy_objects.items():
        setattr(sys.modules[__name__], name, value)

.\diffusers\pipelines\consistency_models\pipeline_consistency_models.py

# 版权所有 2024 HuggingFace 团队。保留所有权利。
#
# 根据 Apache 许可证第 2.0 版(“许可证”)授权;
# 除非遵循许可证,否则您不得使用此文件。
# 您可以在以下网址获得许可证的副本:
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律或书面协议另有规定,否则根据许可证分发的软件是“按原样”提供的,
# 不附带任何形式的保证或条件,无论是明示还是暗示。
# 请参阅许可证以获取有关权限和限制的具体信息。

from typing import Callable, List, Optional, Union  # 导入类型注解,用于函数签名和变量类型标注

import torch  # 导入 PyTorch 库,供后续深度学习模型使用

from ...models import UNet2DModel  # 从模型模块导入 UNet2DModel 类
from ...schedulers import CMStochasticIterativeScheduler  # 从调度模块导入 CMStochasticIterativeScheduler 类
from ...utils import (  # 从工具模块导入多个工具函数和类
    logging,
    replace_example_docstring,
)
from ...utils.torch_utils import randn_tensor  # 从 PyTorch 工具模块导入 randn_tensor 函数
from ..pipeline_utils import DiffusionPipeline, ImagePipelineOutput  # 从管道工具模块导入 DiffusionPipeline 和 ImagePipelineOutput 类

logger = logging.get_logger(__name__)  # 创建日志记录器,记录当前模块的日志信息

EXAMPLE_DOC_STRING = """  # 示例文档字符串,提供用法示例
    Examples:
        ```py
        >>> import torch  # 导入 PyTorch 库

        >>> from diffusers import ConsistencyModelPipeline  # 从 diffusers 导入 ConsistencyModelPipeline 类

        >>> device = "cuda"  # 设置设备为 CUDA(GPU)
        >>> # 加载 cd_imagenet64_l2 检查点。
        >>> model_id_or_path = "openai/diffusers-cd_imagenet64_l2"  # 指定模型 ID 或路径
        >>> pipe = ConsistencyModelPipeline.from_pretrained(model_id_or_path, torch_dtype=torch.float16)  # 从预训练模型加载管道
        >>> pipe.to(device)  # 将管道移到指定设备上

        >>> # 单步采样
        >>> image = pipe(num_inference_steps=1).images[0]  # 使用单步推理生成图像
        >>> image.save("cd_imagenet64_l2_onestep_sample.png")  # 保存生成的图像

        >>> # 单步采样,类条件图像生成
        >>> # ImageNet-64 类标签 145 对应于国王企鹅
        >>> image = pipe(num_inference_steps=1, class_labels=145).images[0]  # 生成特定类的图像
        >>> image.save("cd_imagenet64_l2_onestep_sample_penguin.png")  # 保存生成的图像

        >>> # 多步采样,类条件图像生成
        >>> # 可以显式指定时间步,以下时间步来自原始 GitHub 仓库:
        >>> # https://github.com/openai/consistency_models/blob/main/scripts/launch.sh#L77
        >>> image = pipe(num_inference_steps=None, timesteps=[22, 0], class_labels=145).images[0]  # 生成特定类的多步图像
        >>> image.save("cd_imagenet64_l2_multistep_sample_penguin.png")  # 保存生成的图像
        ```py
"""

class ConsistencyModelPipeline(DiffusionPipeline):  # 定义 ConsistencyModelPipeline 类,继承自 DiffusionPipeline
    r"""  # 类的文档字符串,描述其功能
    Pipeline for unconditional or class-conditional image generation.  # 描述此管道用于无条件或类条件图像生成

    This model inherits from [`DiffusionPipeline`]. Check the superclass documentation for the generic methods  # 说明此模型继承自 DiffusionPipeline,并建议查看超类文档以了解通用方法
    implemented for all pipelines (downloading, saving, running on a particular device, etc.).  # 说明所实现的方法,包括下载、保存和在特定设备上运行等
    # 函数参数说明
    Args:
        unet ([`UNet2DModel`]):  # 传入一个 UNet2DModel 对象,用于对编码后的图像潜变量去噪。
            A `UNet2DModel` to denoise the encoded image latents.
        scheduler ([`SchedulerMixin`]):  # 传入一个调度器,结合 unet 用于去噪,当前仅与 CMStochasticIterativeScheduler 兼容。
            A scheduler to be used in combination with `unet` to denoise the encoded image latents. Currently only
            compatible with [`CMStochasticIterativeScheduler`].
    """

    # 定义模型的 CPU 卸载顺序
    model_cpu_offload_seq = "unet"

    # 构造函数,初始化类的实例
    def __init__(self, unet: UNet2DModel, scheduler: CMStochasticIterativeScheduler) -> None:
        # 调用父类的构造函数
        super().__init__()

        # 注册 unet 和 scheduler 模块
        self.register_modules(
            unet=unet,
            scheduler=scheduler,
        )

        # 初始化安全检查器为 None
        self.safety_checker = None

    # 准备潜变量的函数
    def prepare_latents(self, batch_size, num_channels, height, width, dtype, device, generator, latents=None):
        # 定义潜变量的形状
        shape = (batch_size, num_channels, height, width)
        # 检查生成器列表的长度是否与批量大小匹配
        if isinstance(generator, list) and len(generator) != batch_size:
            raise ValueError(
                f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
                f" size of {batch_size}. Make sure the batch size matches the length of the generators."
            )

        # 如果没有传入潜变量,则生成随机潜变量
        if latents is None:
            latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
        else:
            # 将潜变量移动到指定设备并转换数据类型
            latents = latents.to(device=device, dtype=dtype)

        # 根据调度器所需的标准差缩放初始噪声
        latents = latents * self.scheduler.init_noise_sigma
        # 返回处理后的潜变量
        return latents

    # 后处理图像的函数,遵循 diffusers.VaeImageProcessor.postprocess
    def postprocess_image(self, sample: torch.Tensor, output_type: str = "pil"):
        # 检查输出类型是否合法
        if output_type not in ["pt", "np", "pil"]:
            raise ValueError(
                f"output_type={output_type} is not supported. Make sure to choose one of ['pt', 'np', or 'pil']"
            )

        # 等同于 diffusers.VaeImageProcessor.denormalize
        sample = (sample / 2 + 0.5).clamp(0, 1)  # 将样本值归一化到 [0, 1] 范围内
        if output_type == "pt":  # 如果输出类型为 pt,直接返回样本
            return sample

        # 等同于 diffusers.VaeImageProcessor.pt_to_numpy
        sample = sample.cpu().permute(0, 2, 3, 1).numpy()  # 转换为 NumPy 数组
        if output_type == "np":  # 如果输出类型为 np,返回样本
            return sample

        # 如果输出类型必须为 'pil'
        sample = self.numpy_to_pil(sample)  # 将 NumPy 数组转换为 PIL 图像
        return sample  # 返回最终的图像
    # 准备类别标签,根据给定的批大小和设备,将类别标签转换为张量
    def prepare_class_labels(self, batch_size, device, class_labels=None):
        # 检查 UNet 配置中类别嵌入的数量是否不为 None
        if self.unet.config.num_class_embeds is not None:
            # 如果 class_labels 是一个列表,将其转换为整型张量
            if isinstance(class_labels, list):
                class_labels = torch.tensor(class_labels, dtype=torch.int)
            # 如果 class_labels 是一个整数,确保批大小为 1,并将其转换为张量
            elif isinstance(class_labels, int):
                assert batch_size == 1, "Batch size must be 1 if classes is an int"
                class_labels = torch.tensor([class_labels], dtype=torch.int)
            # 如果 class_labels 为 None,随机生成 batch_size 个类别标签
            elif class_labels is None:
                # 随机生成 batch_size 类别标签
                # TODO: 应该在这里使用生成器吗?randn_tensor 的整数等价物未在 ...utils 中公开
                class_labels = torch.randint(0, self.unet.config.num_class_embeds, size=(batch_size,))
            # 将类别标签移动到指定的设备上
            class_labels = class_labels.to(device)
        else:
            # 如果没有类别嵌入,类别标签设为 None
            class_labels = None
        # 返回处理后的类别标签
        return class_labels

    # 检查输入参数的有效性
    def check_inputs(self, num_inference_steps, timesteps, latents, batch_size, img_size, callback_steps):
        # 确保提供了 num_inference_steps 或 timesteps 其中之一
        if num_inference_steps is None and timesteps is None:
            raise ValueError("Exactly one of `num_inference_steps` or `timesteps` must be supplied.")

        # 如果同时提供了 num_inference_steps 和 timesteps,发出警告
        if num_inference_steps is not None and timesteps is not None:
            logger.warning(
                f"Both `num_inference_steps`: {num_inference_steps} and `timesteps`: {timesteps} are supplied;"
                " `timesteps` will be used over `num_inference_steps`."
            )

        # 如果 latents 不为 None,检查其形状是否符合预期
        if latents is not None:
            expected_shape = (batch_size, 3, img_size, img_size)
            # 如果 latents 的形状不符合预期,则抛出错误
            if latents.shape != expected_shape:
                raise ValueError(f"The shape of latents is {latents.shape} but is expected to be {expected_shape}.")

        # 检查 callback_steps 是否为正整数
        if (callback_steps is None) or (
            callback_steps is not None and (not isinstance(callback_steps, int) or callback_steps <= 0)
        ):
            raise ValueError(
                f"`callback_steps` has to be a positive integer but is {callback_steps} of type"
                f" {type(callback_steps)}."
            )

    # 装饰器,禁止梯度计算,提供调用示例文档字符串
    @torch.no_grad()
    @replace_example_docstring(EXAMPLE_DOC_STRING)
    def __call__(
        # 定义默认参数和类型注解,初始化调用方法
        batch_size: int = 1,
        class_labels: Optional[Union[torch.Tensor, List[int], int]] = None,
        num_inference_steps: int = 1,
        timesteps: List[int] = None,
        generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
        latents: Optional[torch.Tensor] = None,
        output_type: Optional[str] = "pil",
        return_dict: bool = True,
        callback: Optional[Callable[[int, int, torch.Tensor], None]] = None,
        callback_steps: int = 1,

.\diffusers\pipelines\consistency_models\__init__.py

# 从 typing 模块导入 TYPE_CHECKING,用于类型检查
from typing import TYPE_CHECKING

# 从上级模块导入相关工具
from ...utils import (
    # 导入常量 DIFFUSERS_SLOW_IMPORT
    DIFFUSERS_SLOW_IMPORT,
    # 导入延迟加载模块的工具类
    _LazyModule,
)

# 定义要导入的模块结构,包含 'pipeline_consistency_models' 模块及其内容
_import_structure = {
    "pipeline_consistency_models": ["ConsistencyModelPipeline"],
}

# 判断是否处于类型检查阶段或需要慢速导入
if TYPE_CHECKING or DIFFUSERS_SLOW_IMPORT:
    # 从 'pipeline_consistency_models' 模块导入 ConsistencyModelPipeline 类
    from .pipeline_consistency_models import ConsistencyModelPipeline

# 否则执行以下代码
else:
    import sys

    # 使用 _LazyModule 创建一个延迟加载模块,并将其赋值给当前模块名
    sys.modules[__name__] = _LazyModule(
        # 当前模块名
        __name__,
        # 当前模块文件路径
        globals()["__file__"],
        # 导入结构
        _import_structure,
        # 模块规范
        module_spec=__spec__,
    )

.\diffusers\pipelines\controlnet\multicontrolnet.py

# 导入操作系统模块
import os
# 从 typing 模块导入类型注解
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

# 导入 PyTorch 库
import torch
# 从 torch 模块导入神经网络相关功能
from torch import nn

# 从上级目录导入 ControlNetModel 和 ControlNetOutput
from ...models.controlnet import ControlNetModel, ControlNetOutput
# 从上级目录导入 ModelMixin 类
from ...models.modeling_utils import ModelMixin
# 从上级目录导入 logging 工具
from ...utils import logging

# 创建一个日志记录器,使用当前模块的名称
logger = logging.get_logger(__name__)

# 定义 MultiControlNetModel 类,继承自 ModelMixin
class MultiControlNetModel(ModelMixin):
    r"""
    多个 `ControlNetModel` 的包装类,用于 Multi-ControlNet

    该模块是多个 `ControlNetModel` 实例的包装器。`forward()` API 设计为与 `ControlNetModel` 兼容。

    参数:
        controlnets (`List[ControlNetModel]`):
            在去噪过程中为 unet 提供额外的条件。必须将多个 `ControlNetModel` 作为列表设置。
    """

    # 初始化方法,接收一个 ControlNetModel 的列表或元组
    def __init__(self, controlnets: Union[List[ControlNetModel], Tuple[ControlNetModel]]):
        # 调用父类的初始化方法
        super().__init__()
        # 将控制网模型保存到模块列表中
        self.nets = nn.ModuleList(controlnets)

    # 前向传播方法,处理输入数据
    def forward(
        self,
        sample: torch.Tensor,  # 输入样本
        timestep: Union[torch.Tensor, float, int],  # 当前时间步
        encoder_hidden_states: torch.Tensor,  # 编码器的隐藏状态
        controlnet_cond: List[torch.tensor],  # 控制网络的条件
        conditioning_scale: List[float],  # 条件缩放因子
        class_labels: Optional[torch.Tensor] = None,  # 可选的类标签
        timestep_cond: Optional[torch.Tensor] = None,  # 可选的时间步条件
        attention_mask: Optional[torch.Tensor] = None,  # 可选的注意力掩码
        added_cond_kwargs: Optional[Dict[str, torch.Tensor]] = None,  # 可选的附加条件参数
        cross_attention_kwargs: Optional[Dict[str, Any]] = None,  # 可选的交叉注意力参数
        guess_mode: bool = False,  # 是否使用猜测模式
        return_dict: bool = True,  # 是否返回字典格式的输出
    ) -> Union[ControlNetOutput, Tuple]:  # 返回类型可以是 ControlNetOutput 或元组
        # 遍历每个控制网络条件和缩放因子
        for i, (image, scale, controlnet) in enumerate(zip(controlnet_cond, conditioning_scale, self.nets)):
            # 调用控制网络进行前向传播,获取下采样和中间样本
            down_samples, mid_sample = controlnet(
                sample=sample,  # 输入样本
                timestep=timestep,  # 当前时间步
                encoder_hidden_states=encoder_hidden_states,  # 编码器隐藏状态
                controlnet_cond=image,  # 控制网络条件
                conditioning_scale=scale,  # 条件缩放
                class_labels=class_labels,  # 类标签
                timestep_cond=timestep_cond,  # 时间步条件
                attention_mask=attention_mask,  # 注意力掩码
                added_cond_kwargs=added_cond_kwargs,  # 附加条件参数
                cross_attention_kwargs=cross_attention_kwargs,  # 交叉注意力参数
                guess_mode=guess_mode,  # 猜测模式
                return_dict=return_dict,  # 返回格式
            )

            # 合并样本
            if i == 0:  # 如果是第一个控制网络
                # 初始化下采样和中间样本
                down_block_res_samples, mid_block_res_sample = down_samples, mid_sample
            else:  # 如果不是第一个控制网络
                # 将当前下采样样本与之前的样本合并
                down_block_res_samples = [
                    samples_prev + samples_curr  # 累加下采样样本
                    for samples_prev, samples_curr in zip(down_block_res_samples, down_samples)
                ]
                # 累加中间样本
                mid_block_res_sample += mid_sample

        # 返回合并后的下采样样本和中间样本
        return down_block_res_samples, mid_block_res_sample
    # 定义一个方法,用于将模型及其配置文件保存到指定目录
    def save_pretrained(
        self,  # 代表类实例
        save_directory: Union[str, os.PathLike],  # 保存目录,可以是字符串或路径类型
        is_main_process: bool = True,  # 指示当前进程是否为主进程,默认为 True
        save_function: Callable = None,  # 自定义保存函数,默认为 None
        safe_serialization: bool = True,  # 是否使用安全序列化方式,默认为 True
        variant: Optional[str] = None,  # 可选参数,指定保存权重的格式
    ):
        """
        保存模型及其配置文件到指定目录,以便可以使用
        `[`~pipelines.controlnet.MultiControlNetModel.from_pretrained`]` 类方法重新加载。

        参数:
            save_directory (`str` 或 `os.PathLike`):
                要保存的目录,如果不存在则会创建。
            is_main_process (`bool`, *可选*, 默认为 `True`):
                调用此方法的进程是否为主进程,适用于分布式训练,避免竞争条件。
            save_function (`Callable`):
                用于保存状态字典的函数,适用于分布式训练。
            safe_serialization (`bool`, *可选*, 默认为 `True`):
                是否使用 `safetensors` 保存模型,或使用传统的 PyTorch 方法。
            variant (`str`, *可选*):
                如果指定,权重将以 pytorch_model.<variant>.bin 格式保存。
        """
        # 遍历网络模型列表,并获取索引
        for idx, controlnet in enumerate(self.nets):
            # 确定后缀名,如果是第一个模型则无后缀
            suffix = "" if idx == 0 else f"_{idx}"
            # 调用每个控制网络的保存方法,传入相关参数
            controlnet.save_pretrained(
                save_directory + suffix,  # 结合目录和后缀形成完整的保存路径
                is_main_process=is_main_process,  # 传递主进程标识
                save_function=save_function,  # 传递保存函数
                safe_serialization=safe_serialization,  # 传递序列化方式
                variant=variant,  # 传递权重格式
            )