diffusers-源码解析-五十三-

龙哥盟 / 2024-11-09 / 原文

diffusers 源码解析(五十三)

.\diffusers\pipelines\t2i_adapter\__init__.py

# 从 typing 模块导入 TYPE_CHECKING,用于类型检查的特殊常量
from typing import TYPE_CHECKING

# 从 utils 模块导入一系列工具函数和常量
from ...utils import (
    DIFFUSERS_SLOW_IMPORT,  # 慢速导入的标志
    OptionalDependencyNotAvailable,  # 可选依赖项不可用的异常
    _LazyModule,  # 懒加载模块的类
    get_objects_from_module,  # 从模块中获取对象的函数
    is_torch_available,  # 检查 PyTorch 是否可用的函数
    is_transformers_available,  # 检查 Transformers 是否可用的函数
)

# 初始化一个空字典,用于存储虚拟对象
_dummy_objects = {}
# 初始化一个空字典,用于存储导入结构
_import_structure = {}

# 尝试检查是否可用必要的依赖项
try:
    if not (is_transformers_available() and is_torch_available()):  # 如果 Transformers 和 PyTorch 不可用
        raise OptionalDependencyNotAvailable()  # 抛出可选依赖项不可用异常
# 捕获可选依赖项不可用的异常
except OptionalDependencyNotAvailable:
    # 从 utils 模块导入虚拟对象,避免抛出错误
    from ...utils import dummy_torch_and_transformers_objects  # noqa F403

    # 更新虚拟对象字典,获取虚拟对象
    _dummy_objects.update(get_objects_from_module(dummy_torch_and_transformers_objects))
else:
    # 如果依赖项可用,更新导入结构,添加 StableDiffusionAdapterPipeline
    _import_structure["pipeline_stable_diffusion_adapter"] = ["StableDiffusionAdapterPipeline"]
    # 更新导入结构,添加 StableDiffusionXLAdapterPipeline
    _import_structure["pipeline_stable_diffusion_xl_adapter"] = ["StableDiffusionXLAdapterPipeline"]

# 如果正在进行类型检查或慢速导入
if TYPE_CHECKING or DIFFUSERS_SLOW_IMPORT:
    try:
        # 检查是否可用必要的依赖项
        if not (is_transformers_available() and is_torch_available()):  # 如果 Transformers 和 PyTorch 不可用
            raise OptionalDependencyNotAvailable()  # 抛出可选依赖项不可用异常
    # 捕获可选依赖项不可用的异常
    except OptionalDependencyNotAvailable:
        # 从虚拟对象模块导入所有内容,避免抛出错误
        from ...utils.dummy_torch_and_transformers_objects import *  # noqa F403
    else:
        # 如果依赖项可用,导入 StableDiffusionAdapterPipeline
        from .pipeline_stable_diffusion_adapter import StableDiffusionAdapterPipeline
        # 导入 StableDiffusionXLAdapterPipeline
        from .pipeline_stable_diffusion_xl_adapter import StableDiffusionXLAdapterPipeline
else:
    # 如果不是类型检查或慢速导入,使用懒加载模块
    import sys

    # 将当前模块替换为懒加载模块
    sys.modules[__name__] = _LazyModule(
        __name__,  # 模块名称
        globals()["__file__"],  # 模块文件路径
        _import_structure,  # 模块导入结构
        module_spec=__spec__,  # 模块规格
    )
    # 遍历虚拟对象字典,设置当前模块的属性
    for name, value in _dummy_objects.items():
        setattr(sys.modules[__name__], name, value)  # 设置属性

.\diffusers\pipelines\text_to_video_synthesis\pipeline_output.py

# 从 dataclasses 模块导入 dataclass 装饰器
from dataclasses import dataclass
# 从 typing 模块导入 List 和 Union 类型
from typing import List, Union

# 导入 numpy 库并简写为 np
import numpy as np
# 导入 PIL 库,用于图像处理
import PIL
# 导入 PyTorch 库
import torch

# 从上级模块的 utils 导入 BaseOutput 类
from ...utils import (
    BaseOutput,
)

# 定义一个数据类 TextToVideoSDPipelineOutput 继承自 BaseOutput
@dataclass
class TextToVideoSDPipelineOutput(BaseOutput):
    """
     文本到视频管道的输出类。

    参数:
         frames (`torch.Tensor`, `np.ndarray` 或 List[List[PIL.Image.Image]]):
             视频输出的列表 - 可以是长度为 `batch_size` 的嵌套列表,
             每个子列表包含去噪后的
             PIL 图像序列,长度为 `num_frames`。也可以是形状为
    `(batch_size, num_frames, channels, height, width)` 的 NumPy 数组或 Torch 张量
    """

    # 定义一个 frames 属性,可以是 Torch 张量、NumPy 数组或嵌套的 PIL 图像列表
    frames: Union[torch.Tensor, np.ndarray, List[List[PIL.Image.Image]]]

.\diffusers\pipelines\text_to_video_synthesis\pipeline_text_to_video_synth.py

# Copyright 2024 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# 该文件受 Apache 2.0 许可证保护,使用需遵循该许可证
# you may not use this file except in compliance with the License.
# 只有在遵循许可证的情况下,才能使用此文件
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
# 可在此处获取许可证的副本
#
# Unless required by applicable law or agreed to in writing, software
# 除非法律要求或书面协议,软件
# distributed under the License is distributed on an "AS IS" BASIS,
# 按 "原样" 方式分发,不提供任何保证或条件
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# 不提供任何明示或暗示的保证或条件
# See the License for the specific language governing permissions and
# 查看许可证以了解特定权限和
# limitations under the License.
# 限制条件

import inspect  # 导入 inspect 模块,用于获取对象的成员信息
from typing import Any, Callable, Dict, List, Optional, Union  # 导入类型注释相关的类

import torch  # 导入 PyTorch 库,进行张量操作和深度学习
from transformers import CLIPTextModel, CLIPTokenizer  # 从 transformers 库导入 CLIP 模型和分词器

from ...loaders import StableDiffusionLoraLoaderMixin, TextualInversionLoaderMixin  # 导入加载器混合类
from ...models import AutoencoderKL, UNet3DConditionModel  # 导入模型类
from ...models.lora import adjust_lora_scale_text_encoder  # 导入调整 LoRA 权重的函数
from ...schedulers import KarrasDiffusionSchedulers  # 导入调度器类
from ...utils import (  # 导入工具函数和常量
    USE_PEFT_BACKEND,  # 表示是否使用 PEFT 后端的常量
    deprecate,  # 用于标记过时的功能
    logging,  # 导入日志记录模块
    replace_example_docstring,  # 用于替换示例文档字符串的函数
    scale_lora_layers,  # 用于缩放 LoRA 层的函数
    unscale_lora_layers,  # 用于还原 LoRA 层的函数
)
from ...utils.torch_utils import randn_tensor  # 导入用于生成随机张量的函数
from ...video_processor import VideoProcessor  # 导入视频处理器类
from ..pipeline_utils import DiffusionPipeline, StableDiffusionMixin  # 导入扩散管道和稳定扩散混合类
from . import TextToVideoSDPipelineOutput  # 导入文本到视频生成管道的输出类


logger = logging.get_logger(__name__)  # 获取当前模块的日志记录器,便于记录日志信息

EXAMPLE_DOC_STRING = """  # 定义一个示例文档字符串,提供使用示例
    Examples:
        ```py  # Python 代码块标记
        >>> import torch  # 导入 PyTorch 库
        >>> from diffusers import TextToVideoSDPipeline  # 从 diffusers 库导入文本到视频管道
        >>> from diffusers.utils import export_to_video  # 导入视频导出工具

        >>> pipe = TextToVideoSDPipeline.from_pretrained(  # 从预训练模型加载管道
        ...     "damo-vilab/text-to-video-ms-1.7b", torch_dtype=torch.float16, variant="fp16"  # 指定模型名称和数据类型
        ... )
        >>> pipe.enable_model_cpu_offload()  # 启用模型的 CPU 内存卸载

        >>> prompt = "Spiderman is surfing"  # 定义生成视频的提示语
        >>> video_frames = pipe(prompt).frames[0]  # 生成视频帧
        >>> video_path = export_to_video(video_frames)  # 导出生成的视频帧为视频文件
        >>> video_path  # 输出视频文件路径
        ```py
"""


class TextToVideoSDPipeline(  # 定义文本到视频生成管道类
    DiffusionPipeline, StableDiffusionMixin, TextualInversionLoaderMixin, StableDiffusionLoraLoaderMixin  # 继承多个混合类
):
    r"""  # 开始类的文档字符串
    Pipeline for text-to-video generation.  # 文档说明:用于文本到视频生成的管道

    This model inherits from [`DiffusionPipeline`]. Check the superclass documentation for the generic methods
    该模型继承自 `DiffusionPipeline`。查阅超类文档以获取通用方法的说明
    implemented for all pipelines (downloading, saving, running on a particular device, etc.).
    实现了所有管道的通用方法(下载、保存、在特定设备上运行等)

    The pipeline also inherits the following loading methods:
    该管道还继承了以下加载方法:
        - [`~loaders.TextualInversionLoaderMixin.load_textual_inversion`] for loading textual inversion embeddings
        - [`~loaders.StableDiffusionLoraLoaderMixin.load_lora_weights`] for loading LoRA weights
        - [`~loaders.StableDiffusionLoraLoaderMixin.save_lora_weights`] for saving LoRA weights
    # 文档字符串,描述构造函数的参数及其用途
    Args:
        vae ([`AutoencoderKL`]):
            # 变分自编码器模型,用于将图像编码和解码为潜在表示
        text_encoder ([`CLIPTextModel`]):
            # 冻结的文本编码器模型,用于处理文本数据
        tokenizer (`CLIPTokenizer`):
            # 用于将文本标记化的 CLIP 标记器
        unet ([`UNet3DConditionModel`]):
            # UNet 模型,用于对编码的视频潜在空间进行去噪
        scheduler ([`SchedulerMixin`]):
            # 与 UNet 结合使用的调度器,用于去噪编码的图像潜在空间,可以是多种调度器之一
    """

    # 定义模型的计算顺序,依次为文本编码器、UNet 和 VAE
    model_cpu_offload_seq = "text_encoder->unet->vae"

    # 初始化函数,接受多个模型参数
    def __init__(
        self,
        vae: AutoencoderKL,
        text_encoder: CLIPTextModel,
        tokenizer: CLIPTokenizer,
        unet: UNet3DConditionModel,
        scheduler: KarrasDiffusionSchedulers,
    ):
        # 调用父类的初始化方法
        super().__init__()

        # 注册各个模型模块
        self.register_modules(
            vae=vae,
            text_encoder=text_encoder,
            tokenizer=tokenizer,
            unet=unet,
            scheduler=scheduler,
        )
        # 计算 VAE 的缩放因子
        self.vae_scale_factor = 2 ** (len(self.vae.config.block_out_channels) - 1)
        # 初始化视频处理器,不进行缩放
        self.video_processor = VideoProcessor(do_resize=False, vae_scale_factor=self.vae_scale_factor)

    # 定义私有方法,用于编码输入的提示
    def _encode_prompt(
        self,
        prompt,
        device,
        num_images_per_prompt,
        do_classifier_free_guidance,
        negative_prompt=None,
        prompt_embeds: Optional[torch.Tensor] = None,
        negative_prompt_embeds: Optional[torch.Tensor] = None,
        lora_scale: Optional[float] = None,
        **kwargs,
    ):
        # 定义弃用消息,提醒用户此方法将在未来版本中删除
        deprecation_message = "`_encode_prompt()` is deprecated and it will be removed in a future version. Use `encode_prompt()` instead. Also, be aware that the output format changed from a concatenated tensor to a tuple."
        # 调用弃用警告函数
        deprecate("_encode_prompt()", "1.0.0", deprecation_message, standard_warn=False)

        # 调用编码提示的方法,获取提示的嵌入元组
        prompt_embeds_tuple = self.encode_prompt(
            prompt=prompt,
            device=device,
            num_images_per_prompt=num_images_per_prompt,
            do_classifier_free_guidance=do_classifier_free_guidance,
            negative_prompt=negative_prompt,
            prompt_embeds=prompt_embeds,
            negative_prompt_embeds=negative_prompt_embeds,
            lora_scale=lora_scale,
            **kwargs,
        )

        # 连接嵌入以用于向后兼容
        prompt_embeds = torch.cat([prompt_embeds_tuple[1], prompt_embeds_tuple[0]])

        # 返回最终的提示嵌入
        return prompt_embeds

    # 从稳定扩散管道中复制的方法,编码提示
    # 编码提示信息和相关参数以生成图像
    def encode_prompt(
            self,
            prompt,  # 用户输入的提示文本
            device,  # 设备类型(如CPU或GPU)
            num_images_per_prompt,  # 每个提示生成的图像数量
            do_classifier_free_guidance,  # 是否使用无分类器引导
            negative_prompt=None,  # 可选的负面提示文本
            prompt_embeds: Optional[torch.Tensor] = None,  # 可选的提示嵌入
            negative_prompt_embeds: Optional[torch.Tensor] = None,  # 可选的负面提示嵌入
            lora_scale: Optional[float] = None,  # 可选的LoRA缩放因子
            clip_skip: Optional[int] = None,  # 可选的剪切跳过参数
        # 解码潜在空间中的数据以生成图像
        def decode_latents(self, latents):
            # 使用缩放因子调整潜在数据
            latents = 1 / self.vae.config.scaling_factor * latents
    
            # 获取批次大小、通道数、帧数、高度和宽度
            batch_size, channels, num_frames, height, width = latents.shape
            # 重新排列潜在数据的维度,以适应解码器输入
            latents = latents.permute(0, 2, 1, 3, 4).reshape(batch_size * num_frames, channels, height, width)
    
            # 解码潜在数据以生成图像
            image = self.vae.decode(latents).sample
            # 重塑图像以形成视频格式
            video = image[None, :].reshape((batch_size, num_frames, -1) + image.shape[2:]).permute(0, 2, 1, 3, 4)
            # 转换数据类型为float32,以确保与bfloat16兼容
            video = video.float()
            # 返回生成的视频数据
            return video
    
        # 准备额外的步骤参数,以供调度器使用
        def prepare_extra_step_kwargs(self, generator, eta):
            # 为调度器步骤准备额外的关键字参数
            # eta (η) 仅在DDIMScheduler中使用,其他调度器将忽略
            # eta对应于DDIM论文中的η,取值范围应在[0, 1]之间
    
            # 检查调度器是否接受eta参数
            accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())
            extra_step_kwargs = {}
            # 如果接受eta,添加到额外参数中
            if accepts_eta:
                extra_step_kwargs["eta"] = eta
    
            # 检查调度器是否接受generator参数
            accepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys())
            # 如果接受generator,添加到额外参数中
            if accepts_generator:
                extra_step_kwargs["generator"] = generator
            # 返回准备好的额外参数
            return extra_step_kwargs
    
        # 检查输入参数的有效性
        def check_inputs(
            self,
            prompt,  # 用户输入的提示文本
            height,  # 生成图像的高度
            width,  # 生成图像的宽度
            callback_steps,  # 回调步骤的数量
            negative_prompt=None,  # 可选的负面提示文本
            prompt_embeds=None,  # 可选的提示嵌入
            negative_prompt_embeds=None,  # 可选的负面提示嵌入
            callback_on_step_end_tensor_inputs=None,  # 可选的回调张量输入
    # 结束前面的代码块
        ):
            # 检查高度和宽度是否能被8整除,若不能则抛出异常
            if height % 8 != 0 or width % 8 != 0:
                raise ValueError(f"`height` and `width` have to be divisible by 8 but are {height} and {width}.")
    
            # 检查回调步数是否为正整数,若不是则抛出异常
            if callback_steps is not None and (not isinstance(callback_steps, int) or callback_steps <= 0):
                raise ValueError(
                    f"`callback_steps` has to be a positive integer but is {callback_steps} of type"
                    f" {type(callback_steps)}."
                )
            # 检查回调结束时的张量输入是否都在指定的输入中,若不在则抛出异常
            if callback_on_step_end_tensor_inputs is not None and not all(
                k in self._callback_tensor_inputs for k in callback_on_step_end_tensor_inputs
            ):
                raise ValueError(
                    f"`callback_on_step_end_tensor_inputs` has to be in {self._callback_tensor_inputs}, but found {[k for k in callback_on_step_end_tensor_inputs if k not in self._callback_tensor_inputs]}"
                )
    
            # 检查是否同时提供了prompt和prompt_embeds,若是则抛出异常
            if prompt is not None and prompt_embeds is not None:
                raise ValueError(
                    f"Cannot forward both `prompt`: {prompt} and `prompt_embeds`: {prompt_embeds}. Please make sure to"
                    " only forward one of the two."
                )
            # 检查是否都没有提供prompt和prompt_embeds,若是则抛出异常
            elif prompt is None and prompt_embeds is None:
                raise ValueError(
                    "Provide either `prompt` or `prompt_embeds`. Cannot leave both `prompt` and `prompt_embeds` undefined."
                )
            # 检查prompt的类型是否为字符串或列表,若不是则抛出异常
            elif prompt is not None and (not isinstance(prompt, str) and not isinstance(prompt, list)):
                raise ValueError(f"`prompt` has to be of type `str` or `list` but is {type(prompt)}")
    
            # 检查是否同时提供了negative_prompt和negative_prompt_embeds,若是则抛出异常
            if negative_prompt is not None and negative_prompt_embeds is not None:
                raise ValueError(
                    f"Cannot forward both `negative_prompt`: {negative_prompt} and `negative_prompt_embeds`:"
                    f" {negative_prompt_embeds}. Please make sure to only forward one of the two."
                )
    
            # 检查prompt_embeds和negative_prompt_embeds的形状是否一致,若不一致则抛出异常
            if prompt_embeds is not None and negative_prompt_embeds is not None:
                if prompt_embeds.shape != negative_prompt_embeds.shape:
                    raise ValueError(
                        "`prompt_embeds` and `negative_prompt_embeds` must have the same shape when passed directly, but"
                        f" got: `prompt_embeds` {prompt_embeds.shape} != `negative_prompt_embeds`"
                        f" {negative_prompt_embeds.shape}."
                    )
    
        # 定义准备潜变量的方法,接受多个参数
        def prepare_latents(
            self, batch_size, num_channels_latents, num_frames, height, width, dtype, device, generator, latents=None
    ):
        # 定义形状元组,包括批次大小、通道数、帧数、高度和宽度的缩放
        shape = (
            batch_size,
            num_channels_latents,
            num_frames,
            height // self.vae_scale_factor,
            width // self.vae_scale_factor,
        )
        # 检查生成器是否为列表且其长度与批次大小不匹配,若不匹配则抛出错误
        if isinstance(generator, list) and len(generator) != batch_size:
            raise ValueError(
                f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
                f" size of {batch_size}. Make sure the batch size matches the length of the generators."
            )

        # 如果潜在变量为空,则生成新的随机张量
        if latents is None:
            latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
        else:
            # 将给定的潜在变量转移到指定设备
            latents = latents.to(device)

        # 根据调度器要求的标准差缩放初始噪声
        latents = latents * self.scheduler.init_noise_sigma
        # 返回处理后的潜在变量
        return latents

    # 禁用梯度计算以提高性能
    @torch.no_grad()
    # 替换示例文档字符串
    @replace_example_docstring(EXAMPLE_DOC_STRING)
    def __call__(
        # 定义输入参数,包括提示、图像尺寸、帧数等
        self,
        prompt: Union[str, List[str]] = None,
        height: Optional[int] = None,
        width: Optional[int] = None,
        num_frames: int = 16,
        num_inference_steps: int = 50,
        guidance_scale: float = 9.0,
        negative_prompt: Optional[Union[str, List[str]]] = None,
        eta: float = 0.0,
        generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
        latents: Optional[torch.Tensor] = None,
        prompt_embeds: Optional[torch.Tensor] = None,
        negative_prompt_embeds: Optional[torch.Tensor] = None,
        output_type: Optional[str] = "np",
        return_dict: bool = True,
        callback: Optional[Callable[[int, int, torch.Tensor], None]] = None,
        callback_steps: int = 1,
        cross_attention_kwargs: Optional[Dict[str, Any]] = None,
        clip_skip: Optional[int] = None,

.\diffusers\pipelines\text_to_video_synthesis\pipeline_text_to_video_synth_img2img.py

# 版权声明,标明此文件的版权归 HuggingFace 团队所有
# 
# 根据 Apache License 2.0 版本(“许可证”)授权;
# 除非遵守该许可证,否则您不能使用此文件。
# 您可以在以下网址获取许可证的副本:
# 
#     http://www.apache.org/licenses/LICENSE-2.0
# 
# 除非适用法律要求或书面同意,按照许可证分发的软件按“原样”提供,
# 不附带任何形式的明示或暗示的担保或条件。
# 有关特定语言管理权限和
# 限制的更多信息,请参见许可证。

# 导入 inspect 模块以进行对象检查和获取信息
import inspect
# 从 typing 模块导入多种类型,用于类型提示
from typing import Any, Callable, Dict, List, Optional, Union

# 导入 numpy 库,用于数值计算
import numpy as np
# 导入 torch 库,用于深度学习相关操作
import torch
# 从 transformers 库导入 CLIP 模型和标记器
from transformers import CLIPTextModel, CLIPTokenizer

# 从 loaders 模块导入所需的混合类
from ...loaders import StableDiffusionLoraLoaderMixin, TextualInversionLoaderMixin
# 从 models 模块导入自动编码器和条件 UNet 模型
from ...models import AutoencoderKL, UNet3DConditionModel
# 从 lora 模块导入调整 LORA 规模的函数
from ...models.lora import adjust_lora_scale_text_encoder
# 从 schedulers 模块导入 Karras 扩散调度器
from ...schedulers import KarrasDiffusionSchedulers
# 从 utils 模块导入多个实用工具函数和常量
from ...utils import (
    USE_PEFT_BACKEND,  # 用于指定是否使用 PEFT 后端
    deprecate,  # 用于标记已弃用的函数
    logging,  # 用于日志记录
    replace_example_docstring,  # 用于替换示例文档字符串
    scale_lora_layers,  # 用于缩放 LORA 层
    unscale_lora_layers,  # 用于反缩放 LORA 层
)
# 从 torch_utils 模块导入生成随机张量的函数
from ...utils.torch_utils import randn_tensor
# 导入视频处理器类
from ...video_processor import VideoProcessor
# 从 pipeline_utils 模块导入扩散管道和稳定扩散混合类
from ..pipeline_utils import DiffusionPipeline, StableDiffusionMixin
# 导入文本到视频的稳定扩散管道输出类
from . import TextToVideoSDPipelineOutput

# 获取当前模块的日志记录器实例
logger = logging.get_logger(__name__)  # pylint: disable=invalid-name

# 示例文档字符串的模板,可能用于生成文档
EXAMPLE_DOC_STRING = """
Examples:
    ```py
    # 导入所需的库
    >>> import torch
    >>> from diffusers import DiffusionPipeline, DPMSolverMultistepScheduler
    >>> from diffusers.utils import export_to_video

    # 从预训练模型加载扩散管道,并指定数据类型为 float16
    >>> pipe = DiffusionPipeline.from_pretrained("cerspense/zeroscope_v2_576w", torch_dtype=torch.float16)
    # 设置调度器为多步 DPM 解决器,使用管道当前调度器的配置
    >>> pipe.scheduler = DPMSolverMultistepScheduler.from_config(pipe.scheduler.config)
    # 将模型移动到 CUDA 设备以加速计算
    >>> pipe.to("cuda")

    # 定义生成视频的提示文本
    >>> prompt = "spiderman running in the desert"
    # 使用提示生成视频帧,设置推理步数、高度、宽度和帧数
    >>> video_frames = pipe(prompt, num_inference_steps=40, height=320, width=576, num_frames=24).frames[0]
    # 导出低分辨率视频并保存到指定路径
    >>> # safe low-res video
    >>> video_path = export_to_video(video_frames, output_video_path="./video_576_spiderman.mp4")

    # 将文本到图像的模型移回 CPU
    >>> # let's offload the text-to-image model
    >>> pipe.to("cpu")

    # 重新加载图像到图像的模型
    >>> # and load the image-to-image model
    >>> pipe = DiffusionPipeline.from_pretrained(
    ...     "cerspense/zeroscope_v2_XL", torch_dtype=torch.float16, revision="refs/pr/15"
    ... )
    # 设置调度器为多步 DPM 解决器,使用新的调度器配置
    >>> pipe.scheduler = DPMSolverMultistepScheduler.from_config(pipe.scheduler.config)
    # 启用模型的 CPU 卸载,以节省内存
    >>> pipe.enable_model_cpu_offload()

    # VAE 占用大量内存,确保以切片模式运行以降低内存使用
    >>> # The VAE consumes A LOT of memory, let's make sure we run it in sliced mode
    >>> pipe.vae.enable_slicing()

    # 将视频帧上采样到更高的分辨率
    >>> # now let's upscale it
    >>> video = [Image.fromarray(frame).resize((1024, 576)) for frame in video_frames]

    # 使用生成的提示和上采样的视频帧进行去噪处理
    >>> # and denoise it
    >>> video_frames = pipe(prompt, video=video, strength=0.6).frames[0]
    # 导出去噪后的视频并保存到指定路径
    >>> video_path = export_to_video(video_frames, output_video_path="./video_1024_spiderman.mp4")
    # 返回最终视频的路径
    >>> video_path

导入所需的模块和类型

"""

从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion_img2img.retrieve_latents 复制的函数

def retrieve_latents(
# 输入参数:编码器输出,类型为 torch.Tensor
encoder_output: torch.Tensor,
# 可选的随机数生成器
generator: Optional[torch.Generator] = None,
# 采样模式,默认为 "sample"
sample_mode: str = "sample"
):
# 检查编码器输出是否具有 "latent_dist" 属性,并且采样模式为 "sample"
if hasattr(encoder_output, "latent_dist") and sample_mode == "sample":
# 返回从潜在分布中采样的结果
return encoder_output.latent_dist.sample(generator)
# 检查编码器输出是否具有 "latent_dist" 属性,并且采样模式为 "argmax"
elif hasattr(encoder_output, "latent_dist") and sample_mode == "argmax":
# 返回潜在分布的众数
return encoder_output.latent_dist.mode()
# 检查编码器输出是否具有 "latents" 属性
elif hasattr(encoder_output, "latents"):
# 返回编码器输出的潜在值
return encoder_output.latents
# 如果都不满足,抛出属性错误
else:
raise AttributeError("Could not access latents of provided encoder_output")

定义 VideoToVideoSDPipeline 类,继承多个混入类

class VideoToVideoSDPipeline(
DiffusionPipeline, StableDiffusionMixin, TextualInversionLoaderMixin, StableDiffusionLoraLoaderMixin
):
r"""
用于文本引导的视频到视频生成的管道。

此模型继承自 [`DiffusionPipeline`]。请查看超类文档以了解所有管道实现的通用方法
(下载、保存、在特定设备上运行等)。

该管道还继承了以下加载方法:
    - [`~loaders.TextualInversionLoaderMixin.load_textual_inversion`] 用于加载文本反演嵌入
    - [`~loaders.StableDiffusionLoraLoaderMixin.load_lora_weights`] 用于加载 LoRA 权重
    - [`~loaders.StableDiffusionLoraLoaderMixin.save_lora_weights`] 用于保存 LoRA 权重

参数:
    vae ([`AutoencoderKL`]):
        变分自编码器(VAE)模型,用于将视频编码和解码为潜在表示。
    text_encoder ([`CLIPTextModel`]):
        冻结的文本编码器([clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14))。
    tokenizer (`CLIPTokenizer`):
        [`~transformers.CLIPTokenizer`] 用于对文本进行标记化。
    unet ([`UNet3DConditionModel`]):
        [`UNet3DConditionModel`] 用于去噪编码的视频潜在值。
    scheduler ([`SchedulerMixin`]):
        与 `unet` 结合使用的调度器,用于去噪编码的图像潜在值。可以是
        [`DDIMScheduler`], [`LMSDiscreteScheduler`] 或 [`PNDMScheduler`]。
"""

# 定义模型的 CPU 卸载顺序
model_cpu_offload_seq = "text_encoder->unet->vae"

# 初始化函数,设置各个组件
def __init__(
    # 输入参数:变分自编码器
    vae: AutoencoderKL,
    # 输入参数:文本编码器
    text_encoder: CLIPTextModel,
    # 输入参数:标记器
    tokenizer: CLIPTokenizer,
    # 输入参数:去噪模型
    unet: UNet3DConditionModel,
    # 输入参数:调度器
    scheduler: KarrasDiffusionSchedulers,
):
    # 调用父类的初始化方法
    super().__init__()

    # 注册模型模块
    self.register_modules(
        vae=vae,
        text_encoder=text_encoder,
        tokenizer=tokenizer,
        unet=unet,
        scheduler=scheduler,
    )
    # 计算 VAE 的缩放因子
    self.vae_scale_factor = 2 ** (len(self.vae.config.block_out_channels) - 1)
    # 初始化视频处理器,禁用调整大小并使用 VAE 缩放因子
    self.video_processor = VideoProcessor(do_resize=False, vae_scale_factor=self.vae_scale_factor)
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline._encode_prompt 复制的
def _encode_prompt(
    self,  # 当前实例的引用
    prompt,  # 要编码的提示文本
    device,  # 设备类型(例如 CPU 或 GPU)
    num_images_per_prompt,  # 每个提示生成的图像数量
    do_classifier_free_guidance,  # 是否进行无分类器引导
    negative_prompt=None,  # 可选的负面提示文本
    prompt_embeds: Optional[torch.Tensor] = None,  # 可选的提示嵌入
    negative_prompt_embeds: Optional[torch.Tensor] = None,  # 可选的负面提示嵌入
    lora_scale: Optional[float] = None,  # 可选的 Lora 缩放因子
    **kwargs,  # 其他可选参数
):
    # 定义弃用消息,告知用户此函数在未来版本中将被移除
    deprecation_message = "`_encode_prompt()` is deprecated and it will be removed in a future version. Use `encode_prompt()` instead. Also, be aware that the output format changed from a concatenated tensor to a tuple."
    # 调用 deprecate 函数,记录弃用信息
    deprecate("_encode_prompt()", "1.0.0", deprecation_message, standard_warn=False)

    # 调用 encode_prompt 方法获取提示嵌入的元组
    prompt_embeds_tuple = self.encode_prompt(
        prompt=prompt,  # 传递提示文本
        device=device,  # 传递设备信息
        num_images_per_prompt=num_images_per_prompt,  # 传递每个提示的图像数量
        do_classifier_free_guidance=do_classifier_free_guidance,  # 传递无分类器引导信息
        negative_prompt=negative_prompt,  # 传递负面提示文本
        prompt_embeds=prompt_embeds,  # 传递提示嵌入
        negative_prompt_embeds=negative_prompt_embeds,  # 传递负面提示嵌入
        lora_scale=lora_scale,  # 传递 Lora 缩放因子
        **kwargs,  # 传递其他参数
    )

    # 将提示嵌入元组中的两个元素连接起来以兼容旧版本
    prompt_embeds = torch.cat([prompt_embeds_tuple[1], prompt_embeds_tuple[0]])

    # 返回连接后的提示嵌入
    return prompt_embeds

# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.encode_prompt 复制的
def encode_prompt(
    self,  # 当前实例的引用
    prompt,  # 要编码的提示文本
    device,  # 设备类型(例如 CPU 或 GPU)
    num_images_per_prompt,  # 每个提示生成的图像数量
    do_classifier_free_guidance,  # 是否进行无分类器引导
    negative_prompt=None,  # 可选的负面提示文本
    prompt_embeds: Optional[torch.Tensor] = None,  # 可选的提示嵌入
    negative_prompt_embeds: Optional[torch.Tensor] = None,  # 可选的负面提示嵌入
    lora_scale: Optional[float] = None,  # 可选的 Lora 缩放因子
    clip_skip: Optional[int] = None,  # 可选的剪辑跳过参数
# 从 diffusers.pipelines.text_to_video_synthesis.pipeline_text_to_video_synth.TextToVideoSDPipeline.decode_latents 复制的
def decode_latents(self, latents):  # 解码潜在变量的方法
    # 按照 VAE 配置的缩放因子调整潜在变量
    latents = 1 / self.vae.config.scaling_factor * latents

    # 获取潜在变量的形状,分别表示批量大小、通道数、帧数、高度和宽度
    batch_size, channels, num_frames, height, width = latents.shape
    # 调整潜在变量的维度,以便于解码
    latents = latents.permute(0, 2, 1, 3, 4).reshape(batch_size * num_frames, channels, height, width)

    # 使用 VAE 解码潜在变量并获取样本
    image = self.vae.decode(latents).sample
    # 将图像调整为视频格式,包含批量大小和帧数
    video = image[None, :].reshape((batch_size, num_frames, -1) + image.shape[2:]).permute(0, 2, 1, 3, 4)
    # 始终将视频转换为 float32 格式,以确保兼容性
    video = video.float()
    # 返回处理后的 video
    return video

# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_extra_step_kwargs 复制的
# 定义准备额外参数的函数,用于调度器步骤
    def prepare_extra_step_kwargs(self, generator, eta):
        # 为调度器步骤准备额外的关键字参数,因不同调度器的参数签名不同
        # eta (η) 仅在 DDIMScheduler 中使用,其他调度器将忽略该参数
        # eta 对应于 DDIM 论文中的 η:https://arxiv.org/abs/2010.02502
        # eta 应在 [0, 1] 范围内

        # 检查调度器的步骤函数是否接受 eta 参数
        accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())
        # 创建一个空字典以存储额外参数
        extra_step_kwargs = {}
        # 如果调度器接受 eta,则将其添加到额外参数字典中
        if accepts_eta:
            extra_step_kwargs["eta"] = eta

        # 检查调度器的步骤函数是否接受 generator 参数
        accepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys())
        # 如果调度器接受 generator,则将其添加到额外参数字典中
        if accepts_generator:
            extra_step_kwargs["generator"] = generator
        # 返回包含额外参数的字典
        return extra_step_kwargs

    # 定义检查输入参数的函数
    def check_inputs(
        self,
        prompt,  # 输入提示文本
        strength,  # 强度参数
        callback_steps,  # 回调步骤数
        negative_prompt=None,  # 可选的负面提示文本
        prompt_embeds=None,  # 可选的提示嵌入
        negative_prompt_embeds=None,  # 可选的负面提示嵌入
        callback_on_step_end_tensor_inputs=None,  # 可选的步骤结束时的张量输入
):
    # 检查 strength 的值是否在有效范围 [0.0, 1.0] 之间
    if strength < 0 or strength > 1:
        # 如果不在范围内,抛出 ValueError 异常
        raise ValueError(f"The value of strength should in [0.0, 1.0] but is {strength}")

    # 检查 callback_steps 是否为正整数
    if callback_steps is not None and (not isinstance(callback_steps, int) or callback_steps <= 0):
        # 如果不是,抛出 ValueError 异常
        raise ValueError(
            f"`callback_steps` has to be a positive integer but is {callback_steps} of type"
            f" {type(callback_steps)}."
        )

    # 检查 callback_on_step_end_tensor_inputs 是否在 _callback_tensor_inputs 中
    if callback_on_step_end_tensor_inputs is not None and not all(
        k in self._callback_tensor_inputs for k in callback_on_step_end_tensor_inputs
    ):
        # 如果有不在的项,抛出 ValueError 异常
        raise ValueError(
            f"`callback_on_step_end_tensor_inputs` has to be in {self._callback_tensor_inputs}, but found {[k for k in callback_on_step_end_tensor_inputs if k not in self._callback_tensor_inputs]}"
        )
    # 检查是否同时提供了 prompt 和 prompt_embeds
    if prompt is not None and prompt_embeds is not None:
        # 如果同时提供,抛出 ValueError 异常
        raise ValueError(
            f"Cannot forward both `prompt`: {prompt} and `prompt_embeds`: {prompt_embeds}. Please make sure to"
            " only forward one of the two."
        )
    # 检查 prompt 和 prompt_embeds 是否都未定义
    elif prompt is None and prompt_embeds is None:
        # 如果都未定义,抛出 ValueError 异常
        raise ValueError(
            "Provide either `prompt` or `prompt_embeds`. Cannot leave both `prompt` and `prompt_embeds` undefined."
        )
    # 检查 prompt 的类型是否正确
    elif prompt is not None and (not isinstance(prompt, str) and not isinstance(prompt, list)):
        # 如果类型不正确,抛出 ValueError 异常
        raise ValueError(f"`prompt` has to be of type `str` or `list` but is {type(prompt)}")

    # 检查是否同时提供了 negative_prompt 和 negative_prompt_embeds
    if negative_prompt is not None and negative_prompt_embeds is not None:
        # 如果同时提供,抛出 ValueError 异常
        raise ValueError(
            f"Cannot forward both `negative_prompt`: {negative_prompt} and `negative_prompt_embeds`:"
            f" {negative_prompt_embeds}. Please make sure to only forward one of the two."
        )

    # 检查 prompt_embeds 和 negative_prompt_embeds 的形状是否相同
    if prompt_embeds is not None and negative_prompt_embeds is not None:
        if prompt_embeds.shape != negative_prompt_embeds.shape:
            # 如果形状不匹配,抛出 ValueError 异常
            raise ValueError(
                "`prompt_embeds` and `negative_prompt_embeds` must have the same shape when passed directly, but"
                f" got: `prompt_embeds` {prompt_embeds.shape} != `negative_prompt_embeds`"
                f" {negative_prompt_embeds.shape}."
            )

# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion_img2img 中复制的函数
# 定义获取时间步长的方法,输入为推理步数、强度和设备
def get_timesteps(self, num_inference_steps, strength, device):
    # 计算初始时间步长,取最小值以确保不超过总推理步数
    init_timestep = min(int(num_inference_steps * strength), num_inference_steps)

    # 计算开始的时间步长,确保不小于0
    t_start = max(num_inference_steps - init_timestep, 0)
    # 从调度器获取时间步长,从t_start开始,按照调度器的顺序切片
    timesteps = self.scheduler.timesteps[t_start * self.scheduler.order :]
    # 如果调度器有设置开始索引的方法,则设置为t_start
    if hasattr(self.scheduler, "set_begin_index"):
        self.scheduler.set_begin_index(t_start * self.scheduler.order)

    # 返回时间步长和有效推理步数
    return timesteps, num_inference_steps - t_start

# 定义准备潜在表示的方法,输入为视频、时间步长、批大小、数据类型和设备
def prepare_latents(self, video, timestep, batch_size, dtype, device, generator=None):
    # 将视频数据转移到指定的设备并转换数据类型
    video = video.to(device=device, dtype=dtype)

    # 改变视频的形状从 (b, c, f, h, w) 到 (b * f, c, w, h)
    bsz, channel, frames, width, height = video.shape
    video = video.permute(0, 2, 1, 3, 4).reshape(bsz * frames, channel, width, height)

    # 如果视频有4个通道,则初始化潜在表示为视频本身
    if video.shape[1] == 4:
        init_latents = video
    else:
        # 检查生成器是否是列表且其长度与批大小不匹配,抛出异常
        if isinstance(generator, list) and len(generator) != batch_size:
            raise ValueError(
                f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
                f" size of {batch_size}. Make sure the batch size matches the length of the generators."
            )
        # 如果生成器是列表,则逐个处理视频并获取潜在表示
        elif isinstance(generator, list):
            init_latents = [
                retrieve_latents(self.vae.encode(video[i : i + 1]), generator=generator[i])
                for i in range(batch_size)
            ]
            # 将所有潜在表示沿着第0维度连接
            init_latents = torch.cat(init_latents, dim=0)
        else:
            # 如果生成器不是列表,直接处理整个视频获取潜在表示
            init_latents = retrieve_latents(self.vae.encode(video), generator=generator)

        # 对潜在表示进行缩放
        init_latents = self.vae.config.scaling_factor * init_latents

    # 检查批大小是否大于潜在表示的数量且不可整除,抛出异常
    if batch_size > init_latents.shape[0] and batch_size % init_latents.shape[0] != 0:
        raise ValueError(
            f"Cannot duplicate `video` of batch size {init_latents.shape[0]} to {batch_size} text prompts."
        )
    else:
        # 将潜在表示扩展为批大小
        init_latents = torch.cat([init_latents], dim=0)

    # 获取潜在表示的形状
    shape = init_latents.shape
    # 生成噪声张量,用于添加到潜在表示中
    noise = randn_tensor(shape, generator=generator, device=device, dtype=dtype)

    # 获取潜在表示,添加噪声
    init_latents = self.scheduler.add_noise(init_latents, noise, timestep)
    latents = init_latents

    # 重新调整潜在表示的形状以匹配原始视频的维度
    latents = latents[None, :].reshape((bsz, frames, latents.shape[1]) + latents.shape[2:]).permute(0, 2, 1, 3, 4)

    # 返回处理后的潜在表示
    return latents

# 关闭梯度计算以节省内存
@torch.no_grad()
# 替换示例文档字符串为指定文档字符串
@replace_example_docstring(EXAMPLE_DOC_STRING)
# 定义可调用对象的方法,支持多种输入参数
    def __call__(
        # 输入提示,可以是单个字符串或字符串列表
        self,
        prompt: Union[str, List[str]] = None,
        # 输入视频,可以是图像数组列表或张量
        video: Union[List[np.ndarray], torch.Tensor] = None,
        # 强度参数,默认值为0.6,控制某些特性
        strength: float = 0.6,
        # 推理步骤数量,默认为50,影响生成质量
        num_inference_steps: int = 50,
        # 引导尺度,默认值为15.0,控制生成的引导强度
        guidance_scale: float = 15.0,
        # 负面提示,可以是单个字符串或字符串列表,用于排除不想要的内容
        negative_prompt: Optional[Union[str, List[str]]] = None,
        # 额外参数,控制噪声,默认值为0.0
        eta: float = 0.0,
        # 随机生成器,可选,控制随机性
        generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
        # 潜在张量,可选,影响生成过程
        latents: Optional[torch.Tensor] = None,
        # 提示嵌入张量,可选,直接使用预处理过的提示
        prompt_embeds: Optional[torch.Tensor] = None,
        # 负面提示嵌入张量,可选,直接使用预处理过的负面提示
        negative_prompt_embeds: Optional[torch.Tensor] = None,
        # 输出类型,可选,默认为"np",指定返回格式
        output_type: Optional[str] = "np",
        # 是否返回字典格式,默认为True
        return_dict: bool = True,
        # 可选的回调函数,允许在特定步骤执行自定义逻辑
        callback: Optional[Callable[[int, int, torch.Tensor], None]] = None,
        # 回调执行的步骤间隔,默认为1
        callback_steps: int = 1,
        # 交叉注意力的额外参数,可选,用于细化生成过程
        cross_attention_kwargs: Optional[Dict[str, Any]] = None,
        # 可选,跳过的剪辑步骤
        clip_skip: Optional[int] = None,

# `.\diffusers\pipelines\text_to_video_synthesis\pipeline_text_to_video_zero.py`

```py
# 导入所需模块和库
import copy  # 复制对象的模块
import inspect  # 检查对象的模块
from dataclasses import dataclass  # 数据类装饰器
from typing import Callable, List, Optional, Union  # 类型提示相关

import numpy as np  # 数组处理库
import PIL.Image  # 图像处理库
import torch  # 深度学习框架
import torch.nn.functional as F  # 神经网络功能模块
from torch.nn.functional import grid_sample  # 网格采样函数
from transformers import CLIPImageProcessor, CLIPTextModel, CLIPTokenizer  # 处理CLIP模型的类

from ...image_processor import VaeImageProcessor  # VAE图像处理器
from ...loaders import StableDiffusionLoraLoaderMixin, TextualInversionLoaderMixin  # 加载器混合类
from ...models import AutoencoderKL, UNet2DConditionModel  # 模型类
from ...models.lora import adjust_lora_scale_text_encoder  # 调整LoRA比例的函数
from ...schedulers import KarrasDiffusionSchedulers  # Karras扩散调度器
from ...utils import USE_PEFT_BACKEND, BaseOutput, logging, scale_lora_layers, unscale_lora_layers  # 实用工具
from ...utils.torch_utils import randn_tensor  # 随机张量生成工具
from ..pipeline_utils import DiffusionPipeline, StableDiffusionMixin  # 管道工具类
from ..stable_diffusion import StableDiffusionSafetyChecker  # 稳定扩散安全检查器

logger = logging.get_logger(__name__)  # 创建模块日志记录器,禁用pylint命名检查


def rearrange_0(tensor, f):
    F, C, H, W = tensor.size()  # 解构张量维度
    tensor = torch.permute(torch.reshape(tensor, (F // f, f, C, H, W)), (0, 2, 1, 3, 4))  # 重排列和调整张量形状
    return tensor  # 返回调整后的张量


def rearrange_1(tensor):
    B, C, F, H, W = tensor.size()  # 解构张量维度
    return torch.reshape(torch.permute(tensor, (0, 2, 1, 3, 4)), (B * F, C, H, W))  # 重排列并调整形状


def rearrange_3(tensor, f):
    F, D, C = tensor.size()  # 解构张量维度
    return torch.reshape(tensor, (F // f, f, D, C))  # 调整张量形状


def rearrange_4(tensor):
    B, F, D, C = tensor.size()  # 解构张量维度
    return torch.reshape(tensor, (B * F, D, C))  # 调整张量形状


class CrossFrameAttnProcessor:
    """
    跨帧注意力处理器。每帧关注第一帧。

    Args:
        batch_size: 表示实际批大小的数字,而不是帧数。
            例如,使用单个提示和num_images_per_prompt=1调用unet时,batch_size应等于
            2,因为使用了无分类器引导。
    """

    def __init__(self, batch_size=2):
        self.batch_size = batch_size  # 初始化批大小
    # 定义可调用方法,处理注意力机制
    def __call__(self, attn, hidden_states, encoder_hidden_states=None, attention_mask=None):
        # 获取批次大小、序列长度及其维度
        batch_size, sequence_length, _ = hidden_states.shape
        # 准备注意力掩码,以适应批次和序列长度
        attention_mask = attn.prepare_attention_mask(attention_mask, sequence_length, batch_size)
        # 将隐藏状态转换为查询向量
        query = attn.to_q(hidden_states)
    
        # 判断是否进行交叉注意力
        is_cross_attention = encoder_hidden_states is not None
        # 如果没有编码器隐藏状态,则使用隐藏状态本身
        if encoder_hidden_states is None:
            encoder_hidden_states = hidden_states
        # 如果需要归一化交叉注意力的隐藏状态
        elif attn.norm_cross:
            encoder_hidden_states = attn.norm_encoder_hidden_states(encoder_hidden_states)
    
        # 将编码器隐藏状态转换为键和值
        key = attn.to_k(encoder_hidden_states)
        value = attn.to_v(encoder_hidden_states)
    
        # 处理交叉帧注意力
        if not is_cross_attention:
            # 计算视频长度和第一个帧索引
            video_length = key.size()[0] // self.batch_size
            first_frame_index = [0] * video_length
    
            # 重新排列键,使批次和帧在前两个维度
            key = rearrange_3(key, video_length)
            key = key[:, first_frame_index]
            # 重新排列值,使批次和帧在前两个维度
            value = rearrange_3(value, video_length)
            value = value[:, first_frame_index]
    
            # 重新排列回原始形状
            key = rearrange_4(key)
            value = rearrange_4(value)
    
        # 将查询、键和值转换为批次维度
        query = attn.head_to_batch_dim(query)
        key = attn.head_to_batch_dim(key)
        value = attn.head_to_batch_dim(value)
    
        # 计算注意力分数
        attention_probs = attn.get_attention_scores(query, key, attention_mask)
        # 应用注意力分数于值,得到隐藏状态
        hidden_states = torch.bmm(attention_probs, value)
        # 将隐藏状态转换回头部维度
        hidden_states = attn.batch_to_head_dim(hidden_states)
    
        # 线性投影
        hidden_states = attn.to_out[0](hidden_states)
        # 应用 dropout
        hidden_states = attn.to_out[1](hidden_states)
    
        # 返回最终的隐藏状态
        return hidden_states
# 定义一个名为 CrossFrameAttnProcessor2_0 的类
class CrossFrameAttnProcessor2_0:
    """
    Cross frame attention processor with scaled_dot_product attention of Pytorch 2.0.

    Args:
        batch_size: The number that represents actual batch size, other than the frames.
            For example, calling unet with a single prompt and num_images_per_prompt=1, batch_size should be equal to
            2, due to classifier-free guidance.
    """

    # 初始化方法,设置默认的批次大小为 2
    def __init__(self, batch_size=2):
        # 检查 F 是否具有 scaled_dot_product_attention 属性
        if not hasattr(F, "scaled_dot_product_attention"):
            # 如果没有,抛出 ImportError,提示需要升级到 PyTorch 2.0
            raise ImportError("AttnProcessor2_0 requires PyTorch 2.0, to use it, please upgrade PyTorch to 2.0.")
        # 将传入的批次大小赋值给实例变量 self.batch_size
        self.batch_size = batch_size
    # 定义可调用对象的方法,接受注意力、隐藏状态和可选的编码器隐藏状态、注意力掩码
        def __call__(self, attn, hidden_states, encoder_hidden_states=None, attention_mask=None):
            # 获取批次大小和序列长度,使用编码器隐藏状态的形状或隐藏状态的形状
            batch_size, sequence_length, _ = (
                hidden_states.shape if encoder_hidden_states is None else encoder_hidden_states.shape
            )
            # 获取隐藏状态的内部维度
            inner_dim = hidden_states.shape[-1]
    
            # 如果提供了注意力掩码
            if attention_mask is not None:
                # 准备注意力掩码,调整形状以匹配序列长度和批次大小
                attention_mask = attn.prepare_attention_mask(attention_mask, sequence_length, batch_size)
                # scaled_dot_product_attention 期望注意力掩码的形状为 (batch, heads, source_length, target_length)
                attention_mask = attention_mask.view(batch_size, attn.heads, -1, attention_mask.shape[-1])
    
            # 将隐藏状态转换为查询
            query = attn.to_q(hidden_states)
    
            # 检查是否为交叉注意力
            is_cross_attention = encoder_hidden_states is not None
            # 如果没有提供编码器隐藏状态,则使用隐藏状态
            if encoder_hidden_states is None:
                encoder_hidden_states = hidden_states
            # 如果需要,对编码器隐藏状态进行归一化
            elif attn.norm_cross:
                encoder_hidden_states = attn.norm_encoder_hidden_states(encoder_hidden_states)
    
            # 将编码器隐藏状态转换为键和值
            key = attn.to_k(encoder_hidden_states)
            value = attn.to_v(encoder_hidden_states)
    
            # 处理交叉帧注意力
            if not is_cross_attention:
                # 计算视频长度并初始化第一个帧索引
                video_length = max(1, key.size()[0] // self.batch_size)
                first_frame_index = [0] * video_length
    
                # 重新排列键,使批次和帧位于第1和第2维
                key = rearrange_3(key, video_length)
                key = key[:, first_frame_index]
                # 重新排列值,使批次和帧位于第1和第2维
                value = rearrange_3(value, video_length)
                value = value[:, first_frame_index]
    
                # 重新排列回原始形状
                key = rearrange_4(key)
                value = rearrange_4(value)
    
            # 计算每个头的维度
            head_dim = inner_dim // attn.heads
            # 调整查询的形状并转置维度
            query = query.view(batch_size, -1, attn.heads, head_dim).transpose(1, 2)
            # 调整键的形状并转置维度
            key = key.view(batch_size, -1, attn.heads, head_dim).transpose(1, 2)
            # 调整值的形状并转置维度
            value = value.view(batch_size, -1, attn.heads, head_dim).transpose(1, 2)
    
            # 执行缩放点积注意力,输出形状为 (batch, num_heads, seq_len, head_dim)
            # TODO: 当我们迁移到 Torch 2.1 时,添加对 attn.scale 的支持
            hidden_states = F.scaled_dot_product_attention(
                query, key, value, attn_mask=attention_mask, dropout_p=0.0, is_causal=False
            )
    
            # 转置输出并调整形状以合并头的维度
            hidden_states = hidden_states.transpose(1, 2).reshape(batch_size, -1, attn.heads * head_dim)
            # 将隐藏状态转换为查询的 dtype
            hidden_states = hidden_states.to(query.dtype)
    
            # 应用线性投影
            hidden_states = attn.to_out[0](hidden_states)
            # 应用 dropout
            hidden_states = attn.to_out[1](hidden_states)
            # 返回最终的隐藏状态
            return hidden_states
# 定义一个数据类,用于表示零-shot文本到视频的输出
@dataclass
class TextToVideoPipelineOutput(BaseOutput):
    r"""
    输出类用于零-shot文本到视频管道。

    参数:
        images (`[List[PIL.Image.Image]`, `np.ndarray`]):
            长度为`batch_size`的去噪PIL图像列表或形状为`(batch_size, height, width, num_channels)`的NumPy数组。
        nsfw_content_detected (`[List[bool]]`):
            列表,指示相应生成的图像是否包含“不安全内容”(nsfw),如果安全检查无法执行则为`None`。
    """

    # 图像字段,支持PIL图像列表或NumPy数组
    images: Union[List[PIL.Image.Image], np.ndarray]
    # 检测到的NSFW内容列表
    nsfw_content_detected: Optional[List[bool]]


# 定义一个函数,用于生成坐标网格
def coords_grid(batch, ht, wd, device):
    # 从指定高度和宽度生成网格坐标
    coords = torch.meshgrid(torch.arange(ht, device=device), torch.arange(wd, device=device))
    # 将生成的坐标堆叠并转换为浮点数
    coords = torch.stack(coords[::-1], dim=0).float()
    # 将坐标扩展到指定批量大小
    return coords[None].repeat(batch, 1, 1, 1)


# 定义一个函数,用于根据给定的光流变形单帧的潜在编码
def warp_single_latent(latent, reference_flow):
    """
    使用给定的光流变形单帧的潜在编码

    参数:
        latent: 单帧的潜在编码
        reference_flow: 用于变形潜在编码的光流

    返回:
        warped: 变形后的潜在编码
    """
    # 获取参考光流的高度和宽度
    _, _, H, W = reference_flow.size()
    # 获取潜在编码的高度和宽度
    _, _, h, w = latent.size()
    # 生成坐标网格
    coords0 = coords_grid(1, H, W, device=latent.device).to(latent.dtype)

    # 将光流添加到坐标上
    coords_t0 = coords0 + reference_flow
    # 归一化坐标
    coords_t0[:, 0] /= W
    coords_t0[:, 1] /= H

    # 将坐标缩放到[-1, 1]范围
    coords_t0 = coords_t0 * 2.0 - 1.0
    # 使用双线性插值调整坐标大小
    coords_t0 = F.interpolate(coords_t0, size=(h, w), mode="bilinear")
    # 重新排列坐标的维度
    coords_t0 = torch.permute(coords_t0, (0, 2, 3, 1))

    # 根据坐标样本获取变形后的潜在编码
    warped = grid_sample(latent, coords_t0, mode="nearest", padding_mode="reflection")
    return warped


# 定义一个函数,用于创建平移运动场
def create_motion_field(motion_field_strength_x, motion_field_strength_y, frame_ids, device, dtype):
    """
    创建平移运动场

    参数:
        motion_field_strength_x: x轴的运动强度
        motion_field_strength_y: y轴的运动强度
        frame_ids: 正在处理的潜在帧的索引。
            在进行分块推理时需要此信息
        device: 设备
        dtype: 数据类型

    返回:

    """
    # 获取帧的数量
    seq_length = len(frame_ids)
    # 创建一个全零的参考光流张量
    reference_flow = torch.zeros((seq_length, 2, 512, 512), device=device, dtype=dtype)
    # 遍历每一帧,生成对应的运动场
    for fr_idx in range(seq_length):
        reference_flow[fr_idx, 0, :, :] = motion_field_strength_x * (frame_ids[fr_idx])
        reference_flow[fr_idx, 1, :, :] = motion_field_strength_y * (frame_ids[fr_idx])
    return reference_flow


# 定义一个函数,用于创建运动场并相应地变形潜在编码
def create_motion_field_and_warp_latents(motion_field_strength_x, motion_field_strength_y, frame_ids, latents):
    """
    创建平移运动并相应地变形潜在编码
    # 参数说明
    Args:
        motion_field_strength_x: motion strength along x-axis  # 表示x轴上的运动强度
        motion_field_strength_y: motion strength along y-axis  # 表示y轴上的运动强度
        frame_ids: indexes of the frames the latents of which are being processed.  # 当前处理的帧的索引
            This is needed when we perform chunk-by-chunk inference  # 进行分块推理时需要使用
        latents: latent codes of frames  # 帧的潜在代码

    Returns:
        warped_latents: warped latents  # 返回经过变形处理的潜在代码
    """
    # 创建运动场,结合x轴和y轴的运动强度
    motion_field = create_motion_field(
        motion_field_strength_x=motion_field_strength_x,  # 传入x轴运动强度
        motion_field_strength_y=motion_field_strength_y,  # 传入y轴运动强度
        frame_ids=frame_ids,  # 传入帧索引
        device=latents.device,  # 使用潜在代码的设备信息
        dtype=latents.dtype,  # 使用潜在代码的数据类型
    )
    # 克隆潜在代码,以便进行后续的变形处理
    warped_latents = latents.clone().detach()
    # 遍历每个变形后的潜在代码
    for i in range(len(warped_latents)):
        # 对每个潜在代码进行单独的变形处理
        warped_latents[i] = warp_single_latent(latents[i][None], motion_field[i][None])
    # 返回变形后的潜在代码
    return warped_latents
# 定义一个名为 TextToVideoZeroPipeline 的类,继承自多个父类
class TextToVideoZeroPipeline(
    # 继承 DiffusionPipeline 类
    DiffusionPipeline, 
    # 继承 StableDiffusionMixin 类
    StableDiffusionMixin, 
    # 继承 TextualInversionLoaderMixin 类
    TextualInversionLoaderMixin, 
    # 继承 StableDiffusionLoraLoaderMixin 类
    StableDiffusionLoraLoaderMixin
):
    r"""
    用于使用 Stable Diffusion 进行零-shot 文本到视频生成的管道。

    该模型继承自 [`DiffusionPipeline`]。请查看父类文档以获取所有管道的通用方法
    (下载、保存、在特定设备上运行等)。

    参数:
        vae ([`AutoencoderKL`]):
            变分自编码器 (VAE) 模型,用于将图像编码和解码为潜在表示。
        text_encoder ([`CLIPTextModel`]):
            冻结的文本编码器 ([clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14))。
        tokenizer (`CLIPTokenizer`):
            用于标记文本的 [`~transformers.CLIPTokenizer`]。
        unet ([`UNet2DConditionModel`]):
            [`UNet3DConditionModel`] 用于去噪编码的视频潜在。
        scheduler ([`SchedulerMixin`]):
            用于与 `unet` 结合使用的调度器,以去噪编码的图像潜在。可以是
            [`DDIMScheduler`], [`LMSDiscreteScheduler`] 或 [`PNDMScheduler`] 之一。
        safety_checker ([`StableDiffusionSafetyChecker`]):
            分类模块,用于估计生成的图像是否可能被视为攻击性或有害。
            有关模型潜在危害的更多详细信息,请参阅 [model card](https://huggingface.co/runwayml/stable-diffusion-v1-5)。
        feature_extractor ([`CLIPImageProcessor`]):
            用于从生成图像中提取特征的 [`CLIPImageProcessor`];作为 `safety_checker` 的输入。
    """

    # 初始化方法,用于创建类的实例
    def __init__(
        # 变分自编码器 (VAE) 实例
        vae: AutoencoderKL,
        # 文本编码器实例
        text_encoder: CLIPTextModel,
        # 标记器实例
        tokenizer: CLIPTokenizer,
        # UNet 实例,用于去噪处理
        unet: UNet2DConditionModel,
        # 调度器实例
        scheduler: KarrasDiffusionSchedulers,
        # 安全检查器实例
        safety_checker: StableDiffusionSafetyChecker,
        # 特征提取器实例
        feature_extractor: CLIPImageProcessor,
        # 是否需要安全检查器的布尔值,默认值为 True
        requires_safety_checker: bool = True,
    ):
        # 调用父类的构造函数进行初始化
        super().__init__()
        # 注册多个模块,包括 VAE、文本编码器、分词器、UNet、调度器、安全检查器和特征提取器
        self.register_modules(
            vae=vae,
            text_encoder=text_encoder,
            tokenizer=tokenizer,
            unet=unet,
            scheduler=scheduler,
            safety_checker=safety_checker,
            feature_extractor=feature_extractor,
        )
        # 如果安全检查器为 None 且需要安全检查器,则发出警告
        if safety_checker is None and requires_safety_checker:
            logger.warning(
                # 提示用户禁用安全检查器可能带来的风险和使用条款
                f"You have disabled the safety checker for {self.__class__} by passing `safety_checker=None`. Ensure"
                " that you abide to the conditions of the Stable Diffusion license and do not expose unfiltered"
                " results in services or applications open to the public. Both the diffusers team and Hugging Face"
                " strongly recommend to keep the safety filter enabled in all public facing circumstances, disabling"
                " it only for use-cases that involve analyzing network behavior or auditing its results. For more"
                " information, please have a look at https://github.com/huggingface/diffusers/pull/254 ."
            )
        # 根据 VAE 的配置计算缩放因子
        self.vae_scale_factor = 2 ** (len(self.vae.config.block_out_channels) - 1)
        # 创建 VAE 图像处理器,使用之前计算的缩放因子
        self.image_processor = VaeImageProcessor(vae_scale_factor=self.vae_scale_factor)

    def forward_loop(self, x_t0, t0, t1, generator):
        """
        执行 DDPM 前向过程,从时间 t0 到 t1。这与添加具有相应方差的噪声相同。

        Args:
            x_t0:
                时间 t0 时的潜在代码。
            t0:
                t0 时的时间步。
            t1:
                t1 时的时间戳。
            generator (`torch.Generator` 或 `List[torch.Generator]`, *可选*):
                一个 [`torch.Generator`](https://pytorch.org/docs/stable/generated/torch.Generator.html) 用于生成
                确定性结果。

        Returns:
            x_t1:
                应用前向过程后的 x_t0,从时间 t0 到 t1。
        """
        # 生成与 x_t0 大小相同的随机噪声张量
        eps = randn_tensor(x_t0.size(), generator=generator, dtype=x_t0.dtype, device=x_t0.device)
        # 计算在 t0 到 t1 时间步之间的 alpha 向量的乘积
        alpha_vec = torch.prod(self.scheduler.alphas[t0:t1])
        # 计算 t1 时的潜在代码,结合原始潜在代码和随机噪声
        x_t1 = torch.sqrt(alpha_vec) * x_t0 + torch.sqrt(1 - alpha_vec) * eps
        # 返回时间 t1 的潜在代码
        return x_t1

    def backward_loop(
        self,
        latents,
        timesteps,
        prompt_embeds,
        guidance_scale,
        callback,
        callback_steps,
        num_warmup_steps,
        extra_step_kwargs,
        cross_attention_kwargs=None,
    # 从 diffusers.pipelines.stable_diffusion_k_diffusion.pipeline_stable_diffusion_k_diffusion.StableDiffusionKDiffusionPipeline.check_inputs 复制
    def check_inputs(
        self,
        prompt,
        height,
        width,
        callback_steps,
        negative_prompt=None,
        prompt_embeds=None,
        negative_prompt_embeds=None,
        callback_on_step_end_tensor_inputs=None,
    ):
        # 检查高度和宽度是否能被8整除,若不能则抛出错误
        if height % 8 != 0 or width % 8 != 0:
            raise ValueError(f"`height` and `width` have to be divisible by 8 but are {height} and {width}.")

        # 检查回调步数是否为正整数,若不是则抛出错误
        if callback_steps is not None and (not isinstance(callback_steps, int) or callback_steps <= 0):
            raise ValueError(
                f"`callback_steps` has to be a positive integer but is {callback_steps} of type"
                f" {type(callback_steps)}."
            )
        
        # 检查回调结束时的张量输入是否在已定义的输入列表中
        if callback_on_step_end_tensor_inputs is not None and not all(
            k in self._callback_tensor_inputs for k in callback_on_step_end_tensor_inputs
        ):
            raise ValueError(
                f"`callback_on_step_end_tensor_inputs` has to be in {self._callback_tensor_inputs}, but found {[k for k in callback_on_step_end_tensor_inputs if k not in self._callback_tensor_inputs]}"
            )

        # 检查提示和提示嵌入是否同时存在,若同时存在则抛出错误
        if prompt is not None and prompt_embeds is not None:
            raise ValueError(
                f"Cannot forward both `prompt`: {prompt} and `prompt_embeds`: {prompt_embeds}. Please make sure to"
                " only forward one of the two."
            )
        # 检查提示和提示嵌入是否都未提供,若都未提供则抛出错误
        elif prompt is None and prompt_embeds is None:
            raise ValueError(
                "Provide either `prompt` or `prompt_embeds`. Cannot leave both `prompt` and `prompt_embeds` undefined."
            )
        # 检查提示类型是否为字符串或列表,若不是则抛出错误
        elif prompt is not None and (not isinstance(prompt, str) and not isinstance(prompt, list)):
            raise ValueError(f"`prompt` has to be of type `str` or `list` but is {type(prompt)}")

        # 检查负提示和负提示嵌入是否同时存在,若同时存在则抛出错误
        if negative_prompt is not None and negative_prompt_embeds is not None:
            raise ValueError(
                f"Cannot forward both `negative_prompt`: {negative_prompt} and `negative_prompt_embeds`:"
                f" {negative_prompt_embeds}. Please make sure to only forward one of the two."
            )

        # 检查提示嵌入和负提示嵌入是否同时存在且形状是否一致,若不一致则抛出错误
        if prompt_embeds is not None and negative_prompt_embeds is not None:
            if prompt_embeds.shape != negative_prompt_embeds.shape:
                raise ValueError(
                    "`prompt_embeds` and `negative_prompt_embeds` must have the same shape when passed directly, but"
                    f" got: `prompt_embeds` {prompt_embeds.shape} != `negative_prompt_embeds`"
                    f" {negative_prompt_embeds.shape}."
                )

    # 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_latents 复制的内容
    # 准备潜在变量以进行模型推理
        def prepare_latents(self, batch_size, num_channels_latents, height, width, dtype, device, generator, latents=None):
            # 定义潜在变量的形状,包括批量大小和通道数
            shape = (
                batch_size,
                num_channels_latents,
                int(height) // self.vae_scale_factor,
                int(width) // self.vae_scale_factor,
            )
            # 检查生成器是否为列表且长度与批量大小匹配
            if isinstance(generator, list) and len(generator) != batch_size:
                raise ValueError(
                    f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
                    f" size of {batch_size}. Make sure the batch size matches the length of the generators."
                )
            # 如果潜在变量未提供,则生成随机潜在变量
            if latents is None:
                latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
            else:
                # 将提供的潜在变量转移到指定设备
                latents = latents.to(device)
            # 根据调度器所需的标准差缩放初始噪声
            latents = latents * self.scheduler.init_noise_sigma
            # 返回处理后的潜在变量
            return latents
    
        # 不需要计算梯度的调用方法
        @torch.no_grad()
        def __call__(
            # 接受多个参数以生成视频
            self,
            prompt: Union[str, List[str]],
            video_length: Optional[int] = 8,
            height: Optional[int] = None,
            width: Optional[int] = None,
            num_inference_steps: int = 50,
            guidance_scale: float = 7.5,
            negative_prompt: Optional[Union[str, List[str]]] = None,
            num_videos_per_prompt: Optional[int] = 1,
            eta: float = 0.0,
            generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
            latents: Optional[torch.Tensor] = None,
            motion_field_strength_x: float = 12,
            motion_field_strength_y: float = 12,
            output_type: Optional[str] = "tensor",
            return_dict: bool = True,
            callback: Optional[Callable[[int, int, torch.Tensor], None]] = None,
            callback_steps: Optional[int] = 1,
            t0: int = 44,
            t1: int = 47,
            frame_ids: Optional[List[int]] = None,
        # 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.run_safety_checker 中复制
        def run_safety_checker(self, image, device, dtype):
            # 检查是否定义了安全检查器
            if self.safety_checker is None:
                has_nsfw_concept = None
            else:
                # 如果输入为张量,进行后处理转换
                if torch.is_tensor(image):
                    feature_extractor_input = self.image_processor.postprocess(image, output_type="pil")
                else:
                    # 如果输入为 numpy 数组,转换为 PIL 图像
                    feature_extractor_input = self.image_processor.numpy_to_pil(image)
                # 使用特征提取器获取安全检查器输入并转移到指定设备
                safety_checker_input = self.feature_extractor(feature_extractor_input, return_tensors="pt").to(device)
                # 运行安全检查器,返回处理后的图像和 NSFW 概念标识
                image, has_nsfw_concept = self.safety_checker(
                    images=image, clip_input=safety_checker_input.pixel_values.to(dtype)
                )
            # 返回处理后的图像及其 NSFW 概念标识
            return image, has_nsfw_concept
    
        # 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_extra_step_kwargs 中复制
    # 准备额外的参数用于调度器步骤,因为并非所有调度器都有相同的参数签名
    def prepare_extra_step_kwargs(self, generator, eta):
        # eta (η) 仅在 DDIMScheduler 中使用,其他调度器将忽略该参数
        # eta 对应于 DDIM 论文中的 η: https://arxiv.org/abs/2010.02502
        # 其值应在 [0, 1] 之间

        # 检查调度器的 step 方法是否接受 eta 参数
        accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())
        # 初始化额外步骤参数的字典
        extra_step_kwargs = {}
        # 如果接受 eta 参数,则将其添加到字典中
        if accepts_eta:
            extra_step_kwargs["eta"] = eta

        # 检查调度器的 step 方法是否接受 generator 参数
        accepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys())
        # 如果接受 generator 参数,则将其添加到字典中
        if accepts_generator:
            extra_step_kwargs["generator"] = generator
        # 返回包含额外参数的字典
        return extra_step_kwargs

    # 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.encode_prompt 复制的函数
    def encode_prompt(
        self,
        # 要编码的提示文本
        prompt,
        # 要使用的设备
        device,
        # 每个提示生成的图像数量
        num_images_per_prompt,
        # 是否执行无分类器自由引导
        do_classifier_free_guidance,
        # 可选的负提示文本
        negative_prompt=None,
        # 可选的提示嵌入
        prompt_embeds: Optional[torch.Tensor] = None,
        # 可选的负提示嵌入
        negative_prompt_embeds: Optional[torch.Tensor] = None,
        # 可选的 LoRA 缩放因子
        lora_scale: Optional[float] = None,
        # 可选的剪切跳过参数
        clip_skip: Optional[int] = None,
    def decode_latents(self, latents):
        # 将潜在变量缩放回原始大小
        latents = 1 / self.vae.config.scaling_factor * latents
        # 解码潜在变量,返回图像
        image = self.vae.decode(latents, return_dict=False)[0]
        # 将图像像素值归一化到 [0, 1] 范围
        image = (image / 2 + 0.5).clamp(0, 1)
        # 始终转换为 float32 类型,以避免显著开销,并与 bfloat16 兼容
        image = image.cpu().permute(0, 2, 3, 1).float().numpy()
        # 返回处理后的图像
        return image

.\diffusers\pipelines\text_to_video_synthesis\pipeline_text_to_video_zero_sdxl.py

# 导入标准库和第三方库
import copy  # 导入复制对象的模块
import inspect  # 导入检查对象的模块
from dataclasses import dataclass  # 导入数据类装饰器
from typing import Any, Callable, Dict, List, Optional, Tuple, Union  # 导入类型提示

import numpy as np  # 导入 NumPy 库
import PIL  # 导入 PIL 库
import torch  # 导入 PyTorch 库
import torch.nn.functional as F  # 导入 PyTorch 的功能模块
from torch.nn.functional import grid_sample  # 导入网格采样功能

# 导入变换器相关的模块
from transformers import (
    CLIPImageProcessor,  # 导入 CLIP 图像处理器
    CLIPTextModel,  # 导入 CLIP 文本模型
    CLIPTextModelWithProjection,  # 导入带有投影的 CLIP 文本模型
    CLIPTokenizer,  # 导入 CLIP 词法分析器
    CLIPVisionModelWithProjection,  # 导入带有投影的 CLIP 视觉模型
)

# 从相对路径导入图像处理器和加载器
from ...image_processor import VaeImageProcessor  # 导入 VAE 图像处理器
from ...loaders import StableDiffusionXLLoraLoaderMixin, TextualInversionLoaderMixin  # 导入稳定扩散和文本反转加载器混合类
from ...models import AutoencoderKL, UNet2DConditionModel  # 导入自编码器和条件 U-Net 模型

# 导入注意力处理器相关的模块
from ...models.attention_processor import (
    AttnProcessor2_0,  # 导入注意力处理器版本 2.0
    FusedAttnProcessor2_0,  # 导入融合注意力处理器版本 2.0
    XFormersAttnProcessor,  # 导入 XFormers 注意力处理器
)

# 从相对路径导入 LoRA 相关功能
from ...models.lora import adjust_lora_scale_text_encoder  # 导入调整 LoRA 缩放文本编码器的函数

# 导入调度器
from ...schedulers import KarrasDiffusionSchedulers  # 导入 Karras 扩散调度器

# 导入实用工具
from ...utils import (
    USE_PEFT_BACKEND,  # 导入是否使用 PEFT 后端的标志
    BaseOutput,  # 导入基础输出类
    is_invisible_watermark_available,  # 导入检查不可见水印是否可用的函数
    logging,  # 导入日志模块
    scale_lora_layers,  # 导入缩放 LoRA 层的函数
    unscale_lora_layers,  # 导入反缩放 LoRA 层的函数
)

# 从相对路径导入 PyTorch 实用工具
from ...utils.torch_utils import randn_tensor  # 导入随机张量生成函数

# 导入管道工具
from ..pipeline_utils import DiffusionPipeline, StableDiffusionMixin  # 导入扩散管道和稳定扩散混合类

# 如果不可见水印可用,则导入水印模块
if is_invisible_watermark_available():
    from ..stable_diffusion_xl.watermark import StableDiffusionXLWatermarker  # 导入稳定扩散 XL 水印器

# 创建日志记录器
logger = logging.get_logger(__name__)  # 根据当前模块名称创建日志记录器,pylint 检查规则

# 从 diffusers.pipelines.text_to_video_synthesis.pipeline_text_to_video_zero.rearrange_0 中复制的函数
def rearrange_0(tensor, f):
    F, C, H, W = tensor.size()  # 获取张量的尺寸,F:帧数, C:通道数, H:高度, W:宽度
    tensor = torch.permute(torch.reshape(tensor, (F // f, f, C, H, W)), (0, 2, 1, 3, 4))  # 改变张量的形状和维度
    return tensor  # 返回重新排列的张量

# 从 diffusers.pipelines.text_to_video_synthesis.pipeline_text_to_video_zero.rearrange_1 中复制的函数
def rearrange_1(tensor):
    B, C, F, H, W = tensor.size()  # 获取张量的尺寸,B:批次大小, C:通道数, F:帧数, H:高度, W:宽度
    return torch.reshape(torch.permute(tensor, (0, 2, 1, 3, 4)), (B * F, C, H, W))  # 改变维度并返回张量

# 从 diffusers.pipelines.text_to_video_synthesis.pipeline_text_to_video_zero.rearrange_3 中复制的函数
def rearrange_3(tensor, f):
    F, D, C = tensor.size()  # 获取张量的尺寸,F:帧数, D:特征维度, C:通道数
    return torch.reshape(tensor, (F // f, f, D, C))  # 改变张量的形状并返回

# 从 diffusers.pipelines.text_to_video_synthesis.pipeline_text_to_video_zero.rearrange_4 中复制的函数
def rearrange_4(tensor):
    B, F, D, C = tensor.size()  # 获取张量的尺寸,B:批次大小, F:帧数, D:特征维度, C:通道数
    return torch.reshape(tensor, (B * F, D, C))  # 改变张量形状并返回

# 从 diffusers.pipelines.text_to_video_synthesis.pipeline_text_to_video_zero.CrossFrameAttnProcessor 中复制的类
class CrossFrameAttnProcessor:
    """
    跨帧注意力处理器。每帧关注第一帧。

    参数:
        batch_size: 表示实际批次大小的数字,除了帧数以外。
            例如,在使用单个提示和 num_images_per_prompt=1 调用 unet 时,batch_size 应该等于
            2,因分类器自由引导。
    """

    def __init__(self, batch_size=2):
        self.batch_size = batch_size  # 初始化批次大小
    # 定义可调用的前向传递方法,接受注意力机制相关参数
        def __call__(self, attn, hidden_states, encoder_hidden_states=None, attention_mask=None):
            # 获取批次大小、序列长度和隐藏状态的维度
            batch_size, sequence_length, _ = hidden_states.shape
            # 准备注意力掩码以适应批次和序列长度
            attention_mask = attn.prepare_attention_mask(attention_mask, sequence_length, batch_size)
            # 将隐藏状态转换为查询向量
            query = attn.to_q(hidden_states)
    
            # 判断是否为交叉注意力
            is_cross_attention = encoder_hidden_states is not None
            # 如果没有编码器隐藏状态,则使用隐藏状态本身
            if encoder_hidden_states is None:
                encoder_hidden_states = hidden_states
            # 如果需要归一化交叉注意力,则对编码器隐藏状态进行归一化
            elif attn.norm_cross:
                encoder_hidden_states = attn.norm_encoder_hidden_states(encoder_hidden_states)
    
            # 将编码器隐藏状态转换为键向量
            key = attn.to_k(encoder_hidden_states)
            # 将编码器隐藏状态转换为值向量
            value = attn.to_v(encoder_hidden_states)
    
            # 处理交叉帧注意力
            if not is_cross_attention:
                # 计算视频长度并初始化第一个帧索引列表
                video_length = key.size()[0] // self.batch_size
                first_frame_index = [0] * video_length
    
                # 重新排列键以使批次和帧在第一和第二维度
                key = rearrange_3(key, video_length)
                key = key[:, first_frame_index]
                # 重新排列值以使批次和帧在第一和第二维度
                value = rearrange_3(value, video_length)
                value = value[:, first_frame_index]
    
                # 重新排列回原始形状
                key = rearrange_4(key)
                value = rearrange_4(value)
    
            # 将查询、键和值转换为批次维度
            query = attn.head_to_batch_dim(query)
            key = attn.head_to_batch_dim(key)
            value = attn.head_to_batch_dim(value)
    
            # 计算注意力得分
            attention_probs = attn.get_attention_scores(query, key, attention_mask)
            # 使用注意力概率与值进行批次矩阵乘法
            hidden_states = torch.bmm(attention_probs, value)
            # 将隐藏状态转换回头部维度
            hidden_states = attn.batch_to_head_dim(hidden_states)
    
            # 对隐藏状态进行线性投影
            hidden_states = attn.to_out[0](hidden_states)
            # 对隐藏状态应用 dropout
            hidden_states = attn.to_out[1](hidden_states)
    
            # 返回处理后的隐藏状态
            return hidden_states
# 从 diffusers.pipelines.text_to_video_synthesis.pipeline_text_to_video_zero 中复制的类
class CrossFrameAttnProcessor2_0:
    """
    使用 PyTorch 2.0 的缩放点积注意力的跨帧注意力处理器。

    Args:
        batch_size: 表示实际批处理大小的数字,而不是帧的数量。
            例如,使用单个提示和 num_images_per_prompt=1 调用 unet 时,batch_size 应该等于
            2,因为无分类器引导。
    """

    # 初始化方法,接受批处理大小的参数,默认为 2
    def __init__(self, batch_size=2):
        # 检查 PyTorch 是否具有缩放点积注意力的功能
        if not hasattr(F, "scaled_dot_product_attention"):
            # 如果没有,抛出导入错误,提示需要升级 PyTorch 到 2.0
            raise ImportError("AttnProcessor2_0 requires PyTorch 2.0, to use it, please upgrade PyTorch to 2.0.")
        # 将传入的批处理大小赋值给实例变量
        self.batch_size = batch_size
    # 定义调用方法,用于执行注意力机制
    def __call__(self, attn, hidden_states, encoder_hidden_states=None, attention_mask=None):
        # 获取批次大小和序列长度,选择使用的隐藏状态
        batch_size, sequence_length, _ = (
            hidden_states.shape if encoder_hidden_states is None else encoder_hidden_states.shape
        )
        # 获取隐藏状态的内维度
        inner_dim = hidden_states.shape[-1]

        # 如果存在注意力掩码
        if attention_mask is not None:
            # 准备注意力掩码,以适应序列长度和批次大小
            attention_mask = attn.prepare_attention_mask(attention_mask, sequence_length, batch_size)
            # scaled_dot_product_attention 期望注意力掩码的形状为 (batch, heads, source_length, target_length)
            attention_mask = attention_mask.view(batch_size, attn.heads, -1, attention_mask.shape[-1])

        # 将隐藏状态转换为查询向量
        query = attn.to_q(hidden_states)

        # 检查是否为交叉注意力
        is_cross_attention = encoder_hidden_states is not None
        # 如果没有交叉隐藏状态,则使用当前的隐藏状态
        if encoder_hidden_states is None:
            encoder_hidden_states = hidden_states
        # 如果需要进行交叉注意力的归一化
        elif attn.norm_cross:
            encoder_hidden_states = attn.norm_encoder_hidden_states(encoder_hidden_states)

        # 将交叉隐藏状态转换为键和值
        key = attn.to_k(encoder_hidden_states)
        value = attn.to_v(encoder_hidden_states)

        # 处理非交叉帧注意力
        if not is_cross_attention:
            # 计算视频长度,确保至少为1
            video_length = max(1, key.size()[0] // self.batch_size)
            first_frame_index = [0] * video_length

            # 重新排列键,使批次和帧位于第1和第2维
            key = rearrange_3(key, video_length)
            key = key[:, first_frame_index]
            # 重新排列值,使批次和帧位于第1和第2维
            value = rearrange_3(value, video_length)
            value = value[:, first_frame_index]

            # 重新排列回原始形状
            key = rearrange_4(key)
            value = rearrange_4(value)

        # 计算每个头的维度
        head_dim = inner_dim // attn.heads
        # 重新排列查询、键和值的形状
        query = query.view(batch_size, -1, attn.heads, head_dim).transpose(1, 2)
        key = key.view(batch_size, -1, attn.heads, head_dim).transpose(1, 2)
        value = value.view(batch_size, -1, attn.heads, head_dim).transpose(1, 2)

        # scaled_dot_product_attention 的输出形状为 (batch, num_heads, seq_len, head_dim)
        # TODO: 在升级到 Torch 2.1 时添加对 attn.scale 的支持
        hidden_states = F.scaled_dot_product_attention(
            query, key, value, attn_mask=attention_mask, dropout_p=0.0, is_causal=False
        )

        # 转置并重塑隐藏状态以合并头维度
        hidden_states = hidden_states.transpose(1, 2).reshape(batch_size, -1, attn.heads * head_dim)
        # 将隐藏状态转换为查询向量的数据类型
        hidden_states = hidden_states.to(query.dtype)

        # 线性变换
        hidden_states = attn.to_out[0](hidden_states)
        # 应用 dropout
        hidden_states = attn.to_out[1](hidden_states)
        # 返回最终的隐藏状态
        return hidden_states
# 定义一个用于零-shot文本到视频管道的输出类,继承自BaseOutput
@dataclass
class TextToVideoSDXLPipelineOutput(BaseOutput):
    """
    输出类用于零-shot文本到视频管道。

    参数:
        images (`List[PIL.Image.Image]` 或 `np.ndarray`)
            长度为 `batch_size` 的去噪PIL图像列表或形状为 `(batch_size, height, width,
            num_channels)` 的numpy数组。PIL图像或numpy数组呈现扩散管道的去噪图像。
    """

    # 定义属性images,可以是PIL图像列表或numpy数组
    images: Union[List[PIL.Image.Image], np.ndarray]


# 从diffusers.pipelines.text_to_video_synthesis.pipeline_text_to_video_zero.coords_grid复制的函数
def coords_grid(batch, ht, wd, device):
    # 从给定的高度和宽度生成网格坐标,适应设备
    # 适配自:https://github.com/princeton-vl/RAFT/blob/master/core/utils/utils.py
    coords = torch.meshgrid(torch.arange(ht, device=device), torch.arange(wd, device=device))
    # 反转坐标顺序并转换为浮点型
    coords = torch.stack(coords[::-1], dim=0).float()
    # 生成的坐标扩展到指定的batch维度
    return coords[None].repeat(batch, 1, 1, 1)


# 从diffusers.pipelines.text_to_video_synthesis.pipeline_text_to_video_zero.warp_single_latent复制的函数
def warp_single_latent(latent, reference_flow):
    """
    使用给定的流对单帧的潜在表示进行扭曲

    参数:
        latent: 单帧的潜在代码
        reference_flow: 用于扭曲潜在表示的流

    返回:
        warped: 扭曲后的潜在表示
    """
    # 获取参考流的尺寸,高度和宽度
    _, _, H, W = reference_flow.size()
    # 获取潜在表示的尺寸,高度和宽度
    _, _, h, w = latent.size()
    # 生成坐标网格并转换为潜在表示的dtype
    coords0 = coords_grid(1, H, W, device=latent.device).to(latent.dtype)

    # 将参考流加到坐标网格上
    coords_t0 = coords0 + reference_flow
    # 将坐标归一化到[0, 1]范围
    coords_t0[:, 0] /= W
    coords_t0[:, 1] /= H

    # 将坐标缩放到[-1, 1]范围
    coords_t0 = coords_t0 * 2.0 - 1.0
    # 对坐标进行双线性插值以调整大小
    coords_t0 = F.interpolate(coords_t0, size=(h, w), mode="bilinear")
    # 调整坐标的维度顺序
    coords_t0 = torch.permute(coords_t0, (0, 2, 3, 1))

    # 使用给定坐标从潜在表示中采样扭曲的结果
    warped = grid_sample(latent, coords_t0, mode="nearest", padding_mode="reflection")
    return warped


# 从diffusers.pipelines.text_to_video_synthesis.pipeline_text_to_video_zero.create_motion_field复制的函数
def create_motion_field(motion_field_strength_x, motion_field_strength_y, frame_ids, device, dtype):
    """
    创建平移运动场

    参数:
        motion_field_strength_x: x轴的运动强度
        motion_field_strength_y: y轴的运动强度
        frame_ids: 正在处理的潜在表示的帧索引。
            当我们执行分块推理时,这一点很重要
        device: 设备
        dtype: 数据类型

    返回:

    """
    # 获取帧的序列长度
    seq_length = len(frame_ids)
    # 初始化参考流,大小为 (seq_length, 2, 512, 512)
    reference_flow = torch.zeros((seq_length, 2, 512, 512), device=device, dtype=dtype)
    # 遍历每一帧,计算对应的运动场
    for fr_idx in range(seq_length):
        # 在x轴上为每一帧应用运动强度
        reference_flow[fr_idx, 0, :, :] = motion_field_strength_x * (frame_ids[fr_idx])
        # 在y轴上为每一帧应用运动强度
        reference_flow[fr_idx, 1, :, :] = motion_field_strength_y * (frame_ids[fr_idx])
    # 返回生成的参考流
    return reference_flow


# 从diffusers.pipelines.text_to_video_synthesis.pipeline_text_to_video_zero.create_motion_field_and_warp_latents复制的函数
def create_motion_field_and_warp_latents(motion_field_strength_x, motion_field_strength_y, frame_ids, latents):
    """
    # 创建平移运动,并相应地扭曲潜在表示
        """
        # 参数说明
        # motion_field_strength_x: x 轴上的运动强度
        # motion_field_strength_y: y 轴上的运动强度
        # frame_ids: 正在处理的帧的索引。
        #          在进行分块推理时需要此信息
        # latents: 帧的潜在编码
        # 返回值说明
        # warped_latents: 扭曲后的潜在表示
        """
        # 创建运动场,根据给定的运动强度和帧 ID
        motion_field = create_motion_field(
            # 设置 x 轴运动强度
            motion_field_strength_x=motion_field_strength_x,
            # 设置 y 轴运动强度
            motion_field_strength_y=motion_field_strength_y,
            # 传入帧 ID
            frame_ids=frame_ids,
            # 设定设备为潜在表示的设备
            device=latents.device,
            # 设定数据类型为潜在表示的数据类型
            dtype=latents.dtype,
        )
        # 克隆潜在表示以创建扭曲后的潜在表示
        warped_latents = latents.clone().detach()
        # 遍历所有扭曲的潜在表示
        for i in range(len(warped_latents)):
            # 对每个潜在表示应用单个扭曲操作
            warped_latents[i] = warp_single_latent(latents[i][None], motion_field[i][None])
        # 返回扭曲后的潜在表示
        return warped_latents
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.rescale_noise_cfg 复制
def rescale_noise_cfg(noise_cfg, noise_pred_text, guidance_rescale=0.0):
    """
    根据 `guidance_rescale` 调整 `noise_cfg`。基于 [Common Diffusion Noise Schedules and
    Sample Steps are Flawed](https://arxiv.org/pdf/2305.08891.pdf) 的研究结果。见第 3.4 节
    """
    # 计算 noise_pred_text 在所有维度上的标准差,并保持维度
    std_text = noise_pred_text.std(dim=list(range(1, noise_pred_text.ndim)), keepdim=True)
    # 计算 noise_cfg 在所有维度上的标准差,并保持维度
    std_cfg = noise_cfg.std(dim=list(range(1, noise_cfg.ndim)), keepdim=True)
    # 根据标准差调整来自指导的结果(修正过度曝光)
    noise_pred_rescaled = noise_cfg * (std_text / std_cfg)
    # 通过 guidance_rescale 因子与来自指导的原始结果混合,以避免“单调”的图像
    noise_cfg = guidance_rescale * noise_pred_rescaled + (1 - guidance_rescale) * noise_cfg
    # 返回调整后的 noise_cfg
    return noise_cfg


# 定义一个用于零-shot 文本到视频生成的管道类
class TextToVideoZeroSDXLPipeline(
    DiffusionPipeline,  # 继承自 DiffusionPipeline
    StableDiffusionMixin,  # 继承自 StableDiffusionMixin
    StableDiffusionXLLoraLoaderMixin,  # 继承自 StableDiffusionXLLoraLoaderMixin
    TextualInversionLoaderMixin,  # 继承自 TextualInversionLoaderMixin
):
    r"""
    使用 Stable Diffusion XL 进行零-shot 文本到视频生成的管道。

    该模型继承自 [`DiffusionPipeline`]。查看超类文档以获取所有管道的通用方法
    (下载、保存、在特定设备上运行等)。
    # 参数说明
    Args:
        vae ([`AutoencoderKL`]):
            # 变分自编码器(VAE)模型,用于将图像编码为潜在表示,并从潜在表示解码图像。
        text_encoder ([`CLIPTextModel`]):
            # 冻结的文本编码器。稳定扩散 XL 使用 CLIP 的文本部分,
            # 具体来说是 [clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14) 变体。
        text_encoder_2 ([` CLIPTextModelWithProjection`]):
            # 第二个冻结文本编码器。稳定扩散 XL 使用 CLIP 的文本和池部分,
            # 具体是 [laion/CLIP-ViT-bigG-14-laion2B-39B-b160k](https://huggingface.co/laion/CLIP-ViT-bigG-14-laion2B-39B-b160k) 变体。
        tokenizer (`CLIPTokenizer`):
            # CLIPTokenizer 类的分词器。
        tokenizer_2 (`CLIPTokenizer`):
            # 第二个 CLIPTokenizer 类的分词器。
        unet ([`UNet2DConditionModel`]): 
            # 条件 U-Net 架构,用于去噪编码后的图像潜在表示。
        scheduler ([`SchedulerMixin`]):
            # 调度器,用于与 `unet` 结合去噪编码后的图像潜在表示。可以是
            # [`DDIMScheduler`], [`LMSDiscreteScheduler`] 或 [`PNDMScheduler`] 中的任意一个。
    """

    # 定义模型在 CPU 上的卸载顺序
    model_cpu_offload_seq = "text_encoder->text_encoder_2->unet->vae"
    # 定义可选组件列表
    _optional_components = [
        # 分词器
        "tokenizer",
        # 第二个分词器
        "tokenizer_2",
        # 第一个文本编码器
        "text_encoder",
        # 第二个文本编码器
        "text_encoder_2",
        # 图像编码器
        "image_encoder",
        # 特征提取器
        "feature_extractor",
    ]

    # 初始化方法定义
    def __init__(
        # 变分自编码器
        self,
        vae: AutoencoderKL,
        # 文本编码器
        text_encoder: CLIPTextModel,
        # 第二个文本编码器
        text_encoder_2: CLIPTextModelWithProjection,
        # 第一个分词器
        tokenizer: CLIPTokenizer,
        # 第二个分词器
        tokenizer_2: CLIPTokenizer,
        # 条件 U-Net
        unet: UNet2DConditionModel,
        # 调度器
        scheduler: KarrasDiffusionSchedulers,
        # 可选的图像编码器,默认为 None
        image_encoder: CLIPVisionModelWithProjection = None,
        # 可选的特征提取器,默认为 None
        feature_extractor: CLIPImageProcessor = None,
        # 用于处理空提示的强制零值,默认为 True
        force_zeros_for_empty_prompt: bool = True,
        # 可选的水印标记,默认为 None
        add_watermarker: Optional[bool] = None,
    # 初始化父类
    ):
        super().__init__()
        # 注册多个模块,包括 VAE、文本编码器和其他组件
        self.register_modules(
            vae=vae,
            text_encoder=text_encoder,
            text_encoder_2=text_encoder_2,
            tokenizer=tokenizer,
            tokenizer_2=tokenizer_2,
            unet=unet,
            scheduler=scheduler,
            image_encoder=image_encoder,
            feature_extractor=feature_extractor,
        )
        # 将配置参数注册到对象中,强制为空提示使用零
        self.register_to_config(force_zeros_for_empty_prompt=force_zeros_for_empty_prompt)
        # 计算 VAE 的缩放因子,基于输出通道的数量
        self.vae_scale_factor = 2 ** (len(self.vae.config.block_out_channels) - 1)
        # 创建图像处理器,使用 VAE 缩放因子
        self.image_processor = VaeImageProcessor(vae_scale_factor=self.vae_scale_factor)

        # 设置默认的采样尺寸,从 UNet 配置中获取
        self.default_sample_size = self.unet.config.sample_size

        # 如果 add_watermarker 为 None,使用可用的隐形水印设置
        add_watermarker = add_watermarker if add_watermarker is not None else is_invisible_watermark_available()

        # 如果需要添加水印,则初始化水印器,否则设置为 None
        if add_watermarker:
            self.watermark = StableDiffusionXLWatermarker()
        else:
            self.watermark = None

    # 从稳定扩散管道中复制的函数,用于准备额外的步骤参数
    def prepare_extra_step_kwargs(self, generator, eta):
        # 准备调度器步骤的额外参数,因为并非所有调度器具有相同的签名
        # eta (η) 仅在 DDIMScheduler 中使用,对于其他调度器将被忽略
        # eta 对应于 DDIM 论文中的 η: https://arxiv.org/abs/2010.02502
        # 应在 [0, 1] 范围内

        # 检查调度器是否接受 eta 参数
        accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())
        extra_step_kwargs = {}
        if accepts_eta:
            # 如果接受,则将 eta 添加到额外参数中
            extra_step_kwargs["eta"] = eta

        # 检查调度器是否接受 generator 参数
        accepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys())
        if accepts_generator:
            # 如果接受,则将 generator 添加到额外参数中
            extra_step_kwargs["generator"] = generator
        # 返回准备好的额外参数
        return extra_step_kwargs

    # 从稳定扩散 XL 管道中复制的函数,用于提升 VAE 精度
    def upcast_vae(self):
        # 获取 VAE 的数据类型
        dtype = self.vae.dtype
        # 将 VAE 转换为 float32 类型
        self.vae.to(dtype=torch.float32)
        # 检查是否使用 torch 2.0 或 xformers
        use_torch_2_0_or_xformers = isinstance(
            self.vae.decoder.mid_block.attentions[0].processor,
            (
                AttnProcessor2_0,
                XFormersAttnProcessor,
                FusedAttnProcessor2_0,
            ),
        )
        # 如果使用了 xformers 或 torch 2.0,则不需要将注意力块放在 float32 中,以节省大量内存
        if use_torch_2_0_or_xformers:
            # 将后量化卷积转换为原始数据类型
            self.vae.post_quant_conv.to(dtype)
            # 将解码器的输入卷积转换为原始数据类型
            self.vae.decoder.conv_in.to(dtype)
            # 将解码器的中间块转换为原始数据类型
            self.vae.decoder.mid_block.to(dtype)

    # 从稳定扩散 XL 管道中复制的函数,用于获取附加时间 ID
    # 定义一个私有方法,用于获取添加时间的 ID
    def _get_add_time_ids(
        # 输入参数:原始大小、裁剪的左上角坐标、目标大小、数据类型和文本编码器的投影维度(可选)
        self, original_size, crops_coords_top_left, target_size, dtype, text_encoder_projection_dim=None
    ):
        # 将原始大小、裁剪坐标和目标大小合并为一个列表
        add_time_ids = list(original_size + crops_coords_top_left + target_size)

        # 计算通过时间嵌入所需的维度
        passed_add_embed_dim = (
            self.unet.config.addition_time_embed_dim * len(add_time_ids) + text_encoder_projection_dim
        )
        # 获取模型预期的时间嵌入维度
        expected_add_embed_dim = self.unet.add_embedding.linear_1.in_features

        # 检查计算得到的维度与预期是否一致
        if expected_add_embed_dim != passed_add_embed_dim:
            # 如果不一致,抛出值错误,提示用户检查配置
            raise ValueError(
                f"Model expects an added time embedding vector of length {expected_add_embed_dim}, but a vector of {passed_add_embed_dim} was created. The model has an incorrect config. Please check `unet.config.time_embedding_type` and `text_encoder_2.config.projection_dim`."
            )

        # 将添加时间的 ID 转换为张量,并指定数据类型
        add_time_ids = torch.tensor([add_time_ids], dtype=dtype)
        # 返回添加时间的 ID 张量
        return add_time_ids

    # 从 StableDiffusionPipeline 复制的方法,用于准备潜在的张量
    def prepare_latents(self, batch_size, num_channels_latents, height, width, dtype, device, generator, latents=None):
        # 根据输入参数计算潜在张量的形状
        shape = (
            batch_size,
            num_channels_latents,
            int(height) // self.vae_scale_factor,
            int(width) // self.vae_scale_factor,
        )
        # 检查生成器的长度是否与批量大小一致
        if isinstance(generator, list) and len(generator) != batch_size:
            # 如果不一致,抛出值错误,提示用户检查生成器长度
            raise ValueError(
                f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
                f" size of {batch_size}. Make sure the batch size matches the length of the generators."
            )

        # 如果没有提供潜在张量,随机生成一个
        if latents is None:
            latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
        else:
            # 如果提供了潜在张量,将其移动到指定设备
            latents = latents.to(device)

        # 根据调度器所需的标准差缩放初始噪声
        latents = latents * self.scheduler.init_noise_sigma
        # 返回准备好的潜在张量
        return latents

    # 检查输入参数的方法
    def check_inputs(
        # 输入参数:提示、第二个提示、高度、宽度、回调步骤以及可选的负提示和嵌入
        self,
        prompt,
        prompt_2,
        height,
        width,
        callback_steps,
        negative_prompt=None,
        negative_prompt_2=None,
        prompt_embeds=None,
        negative_prompt_embeds=None,
        pooled_prompt_embeds=None,
        negative_pooled_prompt_embeds=None,
        callback_on_step_end_tensor_inputs=None,
    # 从 StableDiffusionXLPipeline 复制的方法,用于编码提示
    # 定义一个方法用于编码提示信息
        def encode_prompt(
            # 输入的提示字符串
            self,
            prompt: str,
            # 第二个提示字符串,默认为 None
            prompt_2: Optional[str] = None,
            # 设备类型,默认为 None
            device: Optional[torch.device] = None,
            # 每个提示生成的图像数量,默认为 1
            num_images_per_prompt: int = 1,
            # 是否使用分类器自由引导,默认为 True
            do_classifier_free_guidance: bool = True,
            # 负提示字符串,默认为 None
            negative_prompt: Optional[str] = None,
            # 第二个负提示字符串,默认为 None
            negative_prompt_2: Optional[str] = None,
            # 提示的张量表示,默认为 None
            prompt_embeds: Optional[torch.Tensor] = None,
            # 负提示的张量表示,默认为 None
            negative_prompt_embeds: Optional[torch.Tensor] = None,
            # 池化后的提示张量表示,默认为 None
            pooled_prompt_embeds: Optional[torch.Tensor] = None,
            # 池化后的负提示张量表示,默认为 None
            negative_pooled_prompt_embeds: Optional[torch.Tensor] = None,
            # Lora 的缩放因子,默认为 None
            lora_scale: Optional[float] = None,
            # 跳过的 clip 数,默认为 None
            clip_skip: Optional[int] = None,
        # 从 diffusers.pipelines.text_to_video_synthesis.pipeline_text_to_video_zero.TextToVideoZeroPipeline.forward_loop 复制的代码
        def forward_loop(self, x_t0, t0, t1, generator):
            """
            从 t0 到 t1 执行 DDPM 向前过程,即根据相应方差添加噪声。
    
            Args:
                x_t0:
                    t0 时刻的潜在编码。
                t0:
                    t0 时刻的时间步。
                t1:
                    t1 时刻的时间步。
                generator (`torch.Generator` 或 `List[torch.Generator]`, *可选*):
                    用于生成确定性的 [`torch.Generator`](https://pytorch.org/docs/stable/generated/torch.Generator.html)。
    
            Returns:
                x_t1:
                    对 x_t0 应用从 t0 到 t1 的向前过程。
            """
            # 生成与 x_t0 相同形状的随机噪声张量
            eps = randn_tensor(x_t0.size(), generator=generator, dtype=x_t0.dtype, device=x_t0.device)
            # 计算在 t0 到 t1 之间的 alpha 向量的乘积
            alpha_vec = torch.prod(self.scheduler.alphas[t0:t1])
            # 计算向前过程结果 x_t1
            x_t1 = torch.sqrt(alpha_vec) * x_t0 + torch.sqrt(1 - alpha_vec) * eps
            # 返回结果 x_t1
            return x_t1
    
        # 定义一个方法用于执行向后过程
        def backward_loop(
            # 潜在变量
            latents,
            # 时间步序列
            timesteps,
            # 提示的张量表示
            prompt_embeds,
            # 引导的缩放因子
            guidance_scale,
            # 回调函数
            callback,
            # 回调步骤
            callback_steps,
            # 预热步骤数量
            num_warmup_steps,
            # 额外步骤的参数
            extra_step_kwargs,
            # 添加文本嵌入
            add_text_embeds,
            # 添加时间 ID
            add_time_ids,
            # 交叉注意力的参数,默认为 None
            cross_attention_kwargs=None,
            # 引导重缩放因子,默认为 0.0
            guidance_rescale: float = 0.0,
        # 不计算梯度的装饰器
        @torch.no_grad()
    # 定义可调用对象的方法,允许使用特定参数生成视频
    def __call__(
            self,
            # 输入提示,支持字符串或字符串列表
            prompt: Union[str, List[str]],
            # 第二个提示,支持字符串或字符串列表,默认为 None
            prompt_2: Optional[Union[str, List[str]]] = None,
            # 视频长度,默认为 8 秒
            video_length: Optional[int] = 8,
            # 视频高度,默认为 None
            height: Optional[int] = None,
            # 视频宽度,默认为 None
            width: Optional[int] = None,
            # 推理步骤数,默认为 50
            num_inference_steps: int = 50,
            # 去噪结束阈值,默认为 None
            denoising_end: Optional[float] = None,
            # 引导缩放因子,默认为 7.5
            guidance_scale: float = 7.5,
            # 负提示,支持字符串或字符串列表,默认为 None
            negative_prompt: Optional[Union[str, List[str]]] = None,
            # 第二个负提示,支持字符串或字符串列表,默认为 None
            negative_prompt_2: Optional[Union[str, List[str]]] = None,
            # 每个提示生成的视频数量,默认为 1
            num_videos_per_prompt: Optional[int] = 1,
            # 额外噪声强度,默认为 0.0
            eta: float = 0.0,
            # 随机数生成器,可以是单个或多个 torch.Generator,默认为 None
            generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
            # 帧 ID 列表,默认为 None
            frame_ids: Optional[List[int]] = None,
            # 提示的嵌入,默认为 None
            prompt_embeds: Optional[torch.Tensor] = None,
            # 负提示的嵌入,默认为 None
            negative_prompt_embeds: Optional[torch.Tensor] = None,
            # 聚合提示的嵌入,默认为 None
            pooled_prompt_embeds: Optional[torch.Tensor] = None,
            # 负聚合提示的嵌入,默认为 None
            negative_pooled_prompt_embeds: Optional[torch.Tensor] = None,
            # 潜在变量,默认为 None
            latents: Optional[torch.Tensor] = None,
            # 动作场强度 X,默认为 12
            motion_field_strength_x: float = 12,
            # 动作场强度 Y,默认为 12
            motion_field_strength_y: float = 12,
            # 输出类型,默认为 "tensor"
            output_type: Optional[str] = "tensor",
            # 是否返回字典,默认为 True
            return_dict: bool = True,
            # 回调函数,接受特定参数,默认为 None
            callback: Optional[Callable[[int, int, torch.Tensor], None]] = None,
            # 每多少步骤调用一次回调,默认为 1
            callback_steps: int = 1,
            # 跨注意力的额外参数,默认为 None
            cross_attention_kwargs: Optional[Dict[str, Any]] = None,
            # 引导重标定,默认为 0.0
            guidance_rescale: float = 0.0,
            # 原始尺寸,默认为 None
            original_size: Optional[Tuple[int, int]] = None,
            # 裁剪坐标的左上角,默认为 (0, 0)
            crops_coords_top_left: Tuple[int, int] = (0, 0),
            # 目标尺寸,默认为 None
            target_size: Optional[Tuple[int, int]] = None,
            # 初始时间 t0,默认为 44
            t0: int = 44,
            # 结束时间 t1,默认为 47
            t1: int = 47,

.\diffusers\pipelines\text_to_video_synthesis\__init__.py

# 导入类型检查相关的模块
from typing import TYPE_CHECKING

# 从上层目录的 utils 模块导入所需的工具和常量
from ...utils import (
    DIFFUSERS_SLOW_IMPORT,  # 导入慢导入标志
    OptionalDependencyNotAvailable,  # 导入可选依赖未可用异常
    _LazyModule,  # 导入懒加载模块
    get_objects_from_module,  # 导入从模块获取对象的函数
    is_torch_available,  # 导入检查是否可用 PyTorch 的函数
    is_transformers_available,  # 导入检查是否可用 Transformers 的函数
)

# 初始化一个空字典用于存放虚拟对象
_dummy_objects = {}
# 初始化一个空字典用于存放导入结构
_import_structure = {}

# 尝试检查是否可用 Transformers 和 PyTorch
try:
    if not (is_transformers_available() and is_torch_available()):  # 如果两者不可用
        raise OptionalDependencyNotAvailable()  # 抛出异常
except OptionalDependencyNotAvailable:  # 捕获异常
    from ...utils import dummy_torch_and_transformers_objects  # 导入虚拟对象模块,忽略未使用警告

    # 更新虚拟对象字典
    _dummy_objects.update(get_objects_from_module(dummy_torch_and_transformers_objects))
else:  # 如果没有异常
    # 更新导入结构字典,添加相应的管道输出
    _import_structure["pipeline_output"] = ["TextToVideoSDPipelineOutput"]
    # 添加文本到视频合成的管道
    _import_structure["pipeline_text_to_video_synth"] = ["TextToVideoSDPipeline"]
    # 添加图像到图像的视频合成管道
    _import_structure["pipeline_text_to_video_synth_img2img"] = ["VideoToVideoSDPipeline"]
    # 添加文本到视频零样本合成管道
    _import_structure["pipeline_text_to_video_zero"] = ["TextToVideoZeroPipeline"]
    # 添加文本到视频零样本 SDXL 合成管道
    _import_structure["pipeline_text_to_video_zero_sdxl"] = ["TextToVideoZeroSDXLPipeline"]

# 如果在类型检查阶段或慢导入标志为真
if TYPE_CHECKING or DIFFUSERS_SLOW_IMPORT:
    try:
        # 再次检查是否可用 Transformers 和 PyTorch
        if not (is_transformers_available() and is_torch_available()):  # 如果两者不可用
            raise OptionalDependencyNotAvailable()  # 抛出异常
    except OptionalDependencyNotAvailable:  # 捕获异常
        from ...utils.dummy_torch_and_transformers_objects import *  # 导入虚拟对象,忽略未使用警告
    else:  # 如果没有异常
        # 从各自的模块中导入所需的管道类
        from .pipeline_output import TextToVideoSDPipelineOutput
        from .pipeline_text_to_video_synth import TextToVideoSDPipeline
        from .pipeline_text_to_video_synth_img2img import VideoToVideoSDPipeline
        from .pipeline_text_to_video_zero import TextToVideoZeroPipeline
        from .pipeline_text_to_video_zero_sdxl import TextToVideoZeroSDXLPipeline

else:  # 如果不在类型检查阶段也不需要慢导入
    import sys  # 导入系统模块

    # 将当前模块替换为懒加载模块
    sys.modules[__name__] = _LazyModule(
        __name__,  # 模块名
        globals()["__file__"],  # 文件路径
        _import_structure,  # 导入结构
        module_spec=__spec__,  # 模块规范
    )
    # 将虚拟对象字典中的对象添加到当前模块
    for name, value in _dummy_objects.items():
        setattr(sys.modules[__name__], name, value)

.\diffusers\pipelines\unclip\pipeline_unclip.py

# 版权声明,指明版权归属及许可信息
# Copyright 2024 Kakao Brain and The HuggingFace Team. All rights reserved.
#
# 根据 Apache 2.0 许可协议,使用该文件需要遵循许可条款
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# 许可协议的副本可以在以下网址获取
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律规定或书面同意,否则软件按“现状”提供,不提供任何形式的保证
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# 具体的许可条款可以查看许可协议
# See the License for the specific language governing permissions and
# limitations under the License.

# 导入 inspect 模块,用于获取对象的签名和源代码
import inspect
# 从 typing 模块导入类型注解
from typing import List, Optional, Tuple, Union

# 导入 torch 库以进行张量运算
import torch
# 从 torch.nn.functional 导入常用的函数接口
from torch.nn import functional as F
# 导入 CLIP 文本模型和分词器
from transformers import CLIPTextModelWithProjection, CLIPTokenizer
# 从 CLIP 模型导入文本模型输出类
from transformers.models.clip.modeling_clip import CLIPTextModelOutput

# 从当前目录导入多个模型和调度器
from ...models import PriorTransformer, UNet2DConditionModel, UNet2DModel
from ...schedulers import UnCLIPScheduler
from ...utils import logging
from ...utils.torch_utils import randn_tensor
from ..pipeline_utils import DiffusionPipeline, ImagePipelineOutput
from .text_proj import UnCLIPTextProjModel

# 初始化日志记录器,用于记录运行信息
logger = logging.get_logger(__name__)  # pylint: disable=invalid-name

# 定义一个用于文本到图像生成的管道类
class UnCLIPPipeline(DiffusionPipeline):
    """
    使用 unCLIP 进行文本到图像生成的管道。

    该模型继承自 [`DiffusionPipeline`]。有关通用方法的文档请参见超类文档
    (下载、保存、在特定设备上运行等)。

    参数:
        text_encoder ([`~transformers.CLIPTextModelWithProjection`]):
            冻结的文本编码器。
        tokenizer ([`~transformers.CLIPTokenizer`]):
            用于对文本进行分词的 `CLIPTokenizer`。
        prior ([`PriorTransformer`]):
            经典的 unCLIP 先验,用于从文本嵌入近似图像嵌入。
        text_proj ([`UnCLIPTextProjModel`]):
            用于准备和组合嵌入的实用类,嵌入会在传递给解码器之前进行处理。
        decoder ([`UNet2DConditionModel`]):
            将图像嵌入反转为图像的解码器。
        super_res_first ([`UNet2DModel`]):
            超分辨率 UNet。在超分辨率扩散过程的所有步骤中使用,除了最后一步。
        super_res_last ([`UNet2DModel`]):
            超分辨率 UNet。在超分辨率扩散过程的最后一步使用。
        prior_scheduler ([`UnCLIPScheduler`]):
            在先验去噪过程中使用的调度器(修改版 [`DDPMScheduler`]).
        decoder_scheduler ([`UnCLIPScheduler`]):
            在解码器去噪过程中使用的调度器(修改版 [`DDPMScheduler`]).
        super_res_scheduler ([`UnCLIPScheduler`]):
            在超分辨率去噪过程中使用的调度器(修改版 [`DDPMScheduler`]).
    """

    # 指定在 CPU 卸载时要排除的组件
    _exclude_from_cpu_offload = ["prior"]

    # 定义先验字段,类型为 PriorTransformer
    prior: PriorTransformer
    # 定义解码器模型类型
    decoder: UNet2DConditionModel
    # 定义文本投影模型类型
    text_proj: UnCLIPTextProjModel
    # 定义文本编码器模型类型
    text_encoder: CLIPTextModelWithProjection
    # 定义分词器类型
    tokenizer: CLIPTokenizer
    # 定义超分辨率模型的第一个部分
    super_res_first: UNet2DModel
    # 定义超分辨率模型的最后一个部分
    super_res_last: UNet2DModel

    # 定义优先级调度器
    prior_scheduler: UnCLIPScheduler
    # 定义解码器调度器
    decoder_scheduler: UnCLIPScheduler
    # 定义超分辨率调度器
    super_res_scheduler: UnCLIPScheduler

    # 定义模型的CPU卸载顺序
    model_cpu_offload_seq = "text_encoder->text_proj->decoder->super_res_first->super_res_last"

    # 初始化函数,定义所需的参数
    def __init__(
        # 定义优先变换器参数
        prior: PriorTransformer,
        # 定义解码器模型参数
        decoder: UNet2DConditionModel,
        # 定义文本编码器参数
        text_encoder: CLIPTextModelWithProjection,
        # 定义分词器参数
        tokenizer: CLIPTokenizer,
        # 定义文本投影模型参数
        text_proj: UnCLIPTextProjModel,
        # 定义超分辨率模型的第一个部分参数
        super_res_first: UNet2DModel,
        # 定义超分辨率模型的最后一个部分参数
        super_res_last: UNet2DModel,
        # 定义优先级调度器参数
        prior_scheduler: UnCLIPScheduler,
        # 定义解码器调度器参数
        decoder_scheduler: UnCLIPScheduler,
        # 定义超分辨率调度器参数
        super_res_scheduler: UnCLIPScheduler,
    ):
        # 调用父类的初始化函数
        super().__init__()

        # 注册模块,存储所需的模型和调度器
        self.register_modules(
            prior=prior,
            decoder=decoder,
            text_encoder=text_encoder,
            tokenizer=tokenizer,
            text_proj=text_proj,
            super_res_first=super_res_first,
            super_res_last=super_res_last,
            prior_scheduler=prior_scheduler,
            decoder_scheduler=decoder_scheduler,
            super_res_scheduler=super_res_scheduler,
        )

    # 准备潜在变量的函数
    def prepare_latents(self, shape, dtype, device, generator, latents, scheduler):
        # 如果没有给定潜在变量,生成随机潜在变量
        if latents is None:
            latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
        else:
            # 如果给定的潜在变量形状与预期不符,则抛出异常
            if latents.shape != shape:
                raise ValueError(f"Unexpected latents shape, got {latents.shape}, expected {shape}")
            # 将潜在变量移动到指定设备
            latents = latents.to(device)

        # 将潜在变量乘以调度器的初始噪声标准差
        latents = latents * scheduler.init_noise_sigma
        # 返回处理后的潜在变量
        return latents

    # 编码提示的函数
    def _encode_prompt(
        self,
        # 提示内容
        prompt,
        # 设备类型
        device,
        # 每个提示生成的图像数量
        num_images_per_prompt,
        # 是否执行分类器自由引导
        do_classifier_free_guidance,
        # 文本模型输出,可选
        text_model_output: Optional[Union[CLIPTextModelOutput, Tuple]] = None,
        # 文本注意力掩码,可选
        text_attention_mask: Optional[torch.Tensor] = None,
    @torch.no_grad()
    # 调用函数,用于生成图像
    def __call__(
        # 提示内容,可选
        prompt: Optional[Union[str, List[str]]] = None,
        # 每个提示生成的图像数量,默认为1
        num_images_per_prompt: int = 1,
        # 优先模型的推理步骤数量,默认为25
        prior_num_inference_steps: int = 25,
        # 解码器的推理步骤数量,默认为25
        decoder_num_inference_steps: int = 25,
        # 超分辨率的推理步骤数量,默认为7
        super_res_num_inference_steps: int = 7,
        # 随机数生成器,可选
        generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
        # 优先潜在变量,可选
        prior_latents: Optional[torch.Tensor] = None,
        # 解码器潜在变量,可选
        decoder_latents: Optional[torch.Tensor] = None,
        # 超分辨率潜在变量,可选
        super_res_latents: Optional[torch.Tensor] = None,
        # 文本模型输出,可选
        text_model_output: Optional[Union[CLIPTextModelOutput, Tuple]] = None,
        # 文本注意力掩码,可选
        text_attention_mask: Optional[torch.Tensor] = None,
        # 优先引导尺度,默认为4.0
        prior_guidance_scale: float = 4.0,
        # 解码器引导尺度,默认为8.0
        decoder_guidance_scale: float = 8.0,
        # 输出类型,默认为"pil"
        output_type: Optional[str] = "pil",
        # 是否返回字典格式,默认为True
        return_dict: bool = True,