diffusers-源码解析-五十三-
diffusers 源码解析(五十三)
.\diffusers\pipelines\t2i_adapter\__init__.py
# 从 typing 模块导入 TYPE_CHECKING,用于类型检查的特殊常量
from typing import TYPE_CHECKING
# 从 utils 模块导入一系列工具函数和常量
from ...utils import (
DIFFUSERS_SLOW_IMPORT, # 慢速导入的标志
OptionalDependencyNotAvailable, # 可选依赖项不可用的异常
_LazyModule, # 懒加载模块的类
get_objects_from_module, # 从模块中获取对象的函数
is_torch_available, # 检查 PyTorch 是否可用的函数
is_transformers_available, # 检查 Transformers 是否可用的函数
)
# 初始化一个空字典,用于存储虚拟对象
_dummy_objects = {}
# 初始化一个空字典,用于存储导入结构
_import_structure = {}
# 尝试检查是否可用必要的依赖项
try:
if not (is_transformers_available() and is_torch_available()): # 如果 Transformers 和 PyTorch 不可用
raise OptionalDependencyNotAvailable() # 抛出可选依赖项不可用异常
# 捕获可选依赖项不可用的异常
except OptionalDependencyNotAvailable:
# 从 utils 模块导入虚拟对象,避免抛出错误
from ...utils import dummy_torch_and_transformers_objects # noqa F403
# 更新虚拟对象字典,获取虚拟对象
_dummy_objects.update(get_objects_from_module(dummy_torch_and_transformers_objects))
else:
# 如果依赖项可用,更新导入结构,添加 StableDiffusionAdapterPipeline
_import_structure["pipeline_stable_diffusion_adapter"] = ["StableDiffusionAdapterPipeline"]
# 更新导入结构,添加 StableDiffusionXLAdapterPipeline
_import_structure["pipeline_stable_diffusion_xl_adapter"] = ["StableDiffusionXLAdapterPipeline"]
# 如果正在进行类型检查或慢速导入
if TYPE_CHECKING or DIFFUSERS_SLOW_IMPORT:
try:
# 检查是否可用必要的依赖项
if not (is_transformers_available() and is_torch_available()): # 如果 Transformers 和 PyTorch 不可用
raise OptionalDependencyNotAvailable() # 抛出可选依赖项不可用异常
# 捕获可选依赖项不可用的异常
except OptionalDependencyNotAvailable:
# 从虚拟对象模块导入所有内容,避免抛出错误
from ...utils.dummy_torch_and_transformers_objects import * # noqa F403
else:
# 如果依赖项可用,导入 StableDiffusionAdapterPipeline
from .pipeline_stable_diffusion_adapter import StableDiffusionAdapterPipeline
# 导入 StableDiffusionXLAdapterPipeline
from .pipeline_stable_diffusion_xl_adapter import StableDiffusionXLAdapterPipeline
else:
# 如果不是类型检查或慢速导入,使用懒加载模块
import sys
# 将当前模块替换为懒加载模块
sys.modules[__name__] = _LazyModule(
__name__, # 模块名称
globals()["__file__"], # 模块文件路径
_import_structure, # 模块导入结构
module_spec=__spec__, # 模块规格
)
# 遍历虚拟对象字典,设置当前模块的属性
for name, value in _dummy_objects.items():
setattr(sys.modules[__name__], name, value) # 设置属性
.\diffusers\pipelines\text_to_video_synthesis\pipeline_output.py
# 从 dataclasses 模块导入 dataclass 装饰器
from dataclasses import dataclass
# 从 typing 模块导入 List 和 Union 类型
from typing import List, Union
# 导入 numpy 库并简写为 np
import numpy as np
# 导入 PIL 库,用于图像处理
import PIL
# 导入 PyTorch 库
import torch
# 从上级模块的 utils 导入 BaseOutput 类
from ...utils import (
BaseOutput,
)
# 定义一个数据类 TextToVideoSDPipelineOutput 继承自 BaseOutput
@dataclass
class TextToVideoSDPipelineOutput(BaseOutput):
"""
文本到视频管道的输出类。
参数:
frames (`torch.Tensor`, `np.ndarray` 或 List[List[PIL.Image.Image]]):
视频输出的列表 - 可以是长度为 `batch_size` 的嵌套列表,
每个子列表包含去噪后的
PIL 图像序列,长度为 `num_frames`。也可以是形状为
`(batch_size, num_frames, channels, height, width)` 的 NumPy 数组或 Torch 张量
"""
# 定义一个 frames 属性,可以是 Torch 张量、NumPy 数组或嵌套的 PIL 图像列表
frames: Union[torch.Tensor, np.ndarray, List[List[PIL.Image.Image]]]
.\diffusers\pipelines\text_to_video_synthesis\pipeline_text_to_video_synth.py
# Copyright 2024 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# 该文件受 Apache 2.0 许可证保护,使用需遵循该许可证
# you may not use this file except in compliance with the License.
# 只有在遵循许可证的情况下,才能使用此文件
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
# 可在此处获取许可证的副本
#
# Unless required by applicable law or agreed to in writing, software
# 除非法律要求或书面协议,软件
# distributed under the License is distributed on an "AS IS" BASIS,
# 按 "原样" 方式分发,不提供任何保证或条件
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# 不提供任何明示或暗示的保证或条件
# See the License for the specific language governing permissions and
# 查看许可证以了解特定权限和
# limitations under the License.
# 限制条件
import inspect # 导入 inspect 模块,用于获取对象的成员信息
from typing import Any, Callable, Dict, List, Optional, Union # 导入类型注释相关的类
import torch # 导入 PyTorch 库,进行张量操作和深度学习
from transformers import CLIPTextModel, CLIPTokenizer # 从 transformers 库导入 CLIP 模型和分词器
from ...loaders import StableDiffusionLoraLoaderMixin, TextualInversionLoaderMixin # 导入加载器混合类
from ...models import AutoencoderKL, UNet3DConditionModel # 导入模型类
from ...models.lora import adjust_lora_scale_text_encoder # 导入调整 LoRA 权重的函数
from ...schedulers import KarrasDiffusionSchedulers # 导入调度器类
from ...utils import ( # 导入工具函数和常量
USE_PEFT_BACKEND, # 表示是否使用 PEFT 后端的常量
deprecate, # 用于标记过时的功能
logging, # 导入日志记录模块
replace_example_docstring, # 用于替换示例文档字符串的函数
scale_lora_layers, # 用于缩放 LoRA 层的函数
unscale_lora_layers, # 用于还原 LoRA 层的函数
)
from ...utils.torch_utils import randn_tensor # 导入用于生成随机张量的函数
from ...video_processor import VideoProcessor # 导入视频处理器类
from ..pipeline_utils import DiffusionPipeline, StableDiffusionMixin # 导入扩散管道和稳定扩散混合类
from . import TextToVideoSDPipelineOutput # 导入文本到视频生成管道的输出类
logger = logging.get_logger(__name__) # 获取当前模块的日志记录器,便于记录日志信息
EXAMPLE_DOC_STRING = """ # 定义一个示例文档字符串,提供使用示例
Examples:
```py # Python 代码块标记
>>> import torch # 导入 PyTorch 库
>>> from diffusers import TextToVideoSDPipeline # 从 diffusers 库导入文本到视频管道
>>> from diffusers.utils import export_to_video # 导入视频导出工具
>>> pipe = TextToVideoSDPipeline.from_pretrained( # 从预训练模型加载管道
... "damo-vilab/text-to-video-ms-1.7b", torch_dtype=torch.float16, variant="fp16" # 指定模型名称和数据类型
... )
>>> pipe.enable_model_cpu_offload() # 启用模型的 CPU 内存卸载
>>> prompt = "Spiderman is surfing" # 定义生成视频的提示语
>>> video_frames = pipe(prompt).frames[0] # 生成视频帧
>>> video_path = export_to_video(video_frames) # 导出生成的视频帧为视频文件
>>> video_path # 输出视频文件路径
```py
"""
class TextToVideoSDPipeline( # 定义文本到视频生成管道类
DiffusionPipeline, StableDiffusionMixin, TextualInversionLoaderMixin, StableDiffusionLoraLoaderMixin # 继承多个混合类
):
r""" # 开始类的文档字符串
Pipeline for text-to-video generation. # 文档说明:用于文本到视频生成的管道
This model inherits from [`DiffusionPipeline`]. Check the superclass documentation for the generic methods
该模型继承自 `DiffusionPipeline`。查阅超类文档以获取通用方法的说明
implemented for all pipelines (downloading, saving, running on a particular device, etc.).
实现了所有管道的通用方法(下载、保存、在特定设备上运行等)
The pipeline also inherits the following loading methods:
该管道还继承了以下加载方法:
- [`~loaders.TextualInversionLoaderMixin.load_textual_inversion`] for loading textual inversion embeddings
- [`~loaders.StableDiffusionLoraLoaderMixin.load_lora_weights`] for loading LoRA weights
- [`~loaders.StableDiffusionLoraLoaderMixin.save_lora_weights`] for saving LoRA weights
# 文档字符串,描述构造函数的参数及其用途
Args:
vae ([`AutoencoderKL`]):
# 变分自编码器模型,用于将图像编码和解码为潜在表示
text_encoder ([`CLIPTextModel`]):
# 冻结的文本编码器模型,用于处理文本数据
tokenizer (`CLIPTokenizer`):
# 用于将文本标记化的 CLIP 标记器
unet ([`UNet3DConditionModel`]):
# UNet 模型,用于对编码的视频潜在空间进行去噪
scheduler ([`SchedulerMixin`]):
# 与 UNet 结合使用的调度器,用于去噪编码的图像潜在空间,可以是多种调度器之一
"""
# 定义模型的计算顺序,依次为文本编码器、UNet 和 VAE
model_cpu_offload_seq = "text_encoder->unet->vae"
# 初始化函数,接受多个模型参数
def __init__(
self,
vae: AutoencoderKL,
text_encoder: CLIPTextModel,
tokenizer: CLIPTokenizer,
unet: UNet3DConditionModel,
scheduler: KarrasDiffusionSchedulers,
):
# 调用父类的初始化方法
super().__init__()
# 注册各个模型模块
self.register_modules(
vae=vae,
text_encoder=text_encoder,
tokenizer=tokenizer,
unet=unet,
scheduler=scheduler,
)
# 计算 VAE 的缩放因子
self.vae_scale_factor = 2 ** (len(self.vae.config.block_out_channels) - 1)
# 初始化视频处理器,不进行缩放
self.video_processor = VideoProcessor(do_resize=False, vae_scale_factor=self.vae_scale_factor)
# 定义私有方法,用于编码输入的提示
def _encode_prompt(
self,
prompt,
device,
num_images_per_prompt,
do_classifier_free_guidance,
negative_prompt=None,
prompt_embeds: Optional[torch.Tensor] = None,
negative_prompt_embeds: Optional[torch.Tensor] = None,
lora_scale: Optional[float] = None,
**kwargs,
):
# 定义弃用消息,提醒用户此方法将在未来版本中删除
deprecation_message = "`_encode_prompt()` is deprecated and it will be removed in a future version. Use `encode_prompt()` instead. Also, be aware that the output format changed from a concatenated tensor to a tuple."
# 调用弃用警告函数
deprecate("_encode_prompt()", "1.0.0", deprecation_message, standard_warn=False)
# 调用编码提示的方法,获取提示的嵌入元组
prompt_embeds_tuple = self.encode_prompt(
prompt=prompt,
device=device,
num_images_per_prompt=num_images_per_prompt,
do_classifier_free_guidance=do_classifier_free_guidance,
negative_prompt=negative_prompt,
prompt_embeds=prompt_embeds,
negative_prompt_embeds=negative_prompt_embeds,
lora_scale=lora_scale,
**kwargs,
)
# 连接嵌入以用于向后兼容
prompt_embeds = torch.cat([prompt_embeds_tuple[1], prompt_embeds_tuple[0]])
# 返回最终的提示嵌入
return prompt_embeds
# 从稳定扩散管道中复制的方法,编码提示
# 编码提示信息和相关参数以生成图像
def encode_prompt(
self,
prompt, # 用户输入的提示文本
device, # 设备类型(如CPU或GPU)
num_images_per_prompt, # 每个提示生成的图像数量
do_classifier_free_guidance, # 是否使用无分类器引导
negative_prompt=None, # 可选的负面提示文本
prompt_embeds: Optional[torch.Tensor] = None, # 可选的提示嵌入
negative_prompt_embeds: Optional[torch.Tensor] = None, # 可选的负面提示嵌入
lora_scale: Optional[float] = None, # 可选的LoRA缩放因子
clip_skip: Optional[int] = None, # 可选的剪切跳过参数
# 解码潜在空间中的数据以生成图像
def decode_latents(self, latents):
# 使用缩放因子调整潜在数据
latents = 1 / self.vae.config.scaling_factor * latents
# 获取批次大小、通道数、帧数、高度和宽度
batch_size, channels, num_frames, height, width = latents.shape
# 重新排列潜在数据的维度,以适应解码器输入
latents = latents.permute(0, 2, 1, 3, 4).reshape(batch_size * num_frames, channels, height, width)
# 解码潜在数据以生成图像
image = self.vae.decode(latents).sample
# 重塑图像以形成视频格式
video = image[None, :].reshape((batch_size, num_frames, -1) + image.shape[2:]).permute(0, 2, 1, 3, 4)
# 转换数据类型为float32,以确保与bfloat16兼容
video = video.float()
# 返回生成的视频数据
return video
# 准备额外的步骤参数,以供调度器使用
def prepare_extra_step_kwargs(self, generator, eta):
# 为调度器步骤准备额外的关键字参数
# eta (η) 仅在DDIMScheduler中使用,其他调度器将忽略
# eta对应于DDIM论文中的η,取值范围应在[0, 1]之间
# 检查调度器是否接受eta参数
accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())
extra_step_kwargs = {}
# 如果接受eta,添加到额外参数中
if accepts_eta:
extra_step_kwargs["eta"] = eta
# 检查调度器是否接受generator参数
accepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys())
# 如果接受generator,添加到额外参数中
if accepts_generator:
extra_step_kwargs["generator"] = generator
# 返回准备好的额外参数
return extra_step_kwargs
# 检查输入参数的有效性
def check_inputs(
self,
prompt, # 用户输入的提示文本
height, # 生成图像的高度
width, # 生成图像的宽度
callback_steps, # 回调步骤的数量
negative_prompt=None, # 可选的负面提示文本
prompt_embeds=None, # 可选的提示嵌入
negative_prompt_embeds=None, # 可选的负面提示嵌入
callback_on_step_end_tensor_inputs=None, # 可选的回调张量输入
# 结束前面的代码块
):
# 检查高度和宽度是否能被8整除,若不能则抛出异常
if height % 8 != 0 or width % 8 != 0:
raise ValueError(f"`height` and `width` have to be divisible by 8 but are {height} and {width}.")
# 检查回调步数是否为正整数,若不是则抛出异常
if callback_steps is not None and (not isinstance(callback_steps, int) or callback_steps <= 0):
raise ValueError(
f"`callback_steps` has to be a positive integer but is {callback_steps} of type"
f" {type(callback_steps)}."
)
# 检查回调结束时的张量输入是否都在指定的输入中,若不在则抛出异常
if callback_on_step_end_tensor_inputs is not None and not all(
k in self._callback_tensor_inputs for k in callback_on_step_end_tensor_inputs
):
raise ValueError(
f"`callback_on_step_end_tensor_inputs` has to be in {self._callback_tensor_inputs}, but found {[k for k in callback_on_step_end_tensor_inputs if k not in self._callback_tensor_inputs]}"
)
# 检查是否同时提供了prompt和prompt_embeds,若是则抛出异常
if prompt is not None and prompt_embeds is not None:
raise ValueError(
f"Cannot forward both `prompt`: {prompt} and `prompt_embeds`: {prompt_embeds}. Please make sure to"
" only forward one of the two."
)
# 检查是否都没有提供prompt和prompt_embeds,若是则抛出异常
elif prompt is None and prompt_embeds is None:
raise ValueError(
"Provide either `prompt` or `prompt_embeds`. Cannot leave both `prompt` and `prompt_embeds` undefined."
)
# 检查prompt的类型是否为字符串或列表,若不是则抛出异常
elif prompt is not None and (not isinstance(prompt, str) and not isinstance(prompt, list)):
raise ValueError(f"`prompt` has to be of type `str` or `list` but is {type(prompt)}")
# 检查是否同时提供了negative_prompt和negative_prompt_embeds,若是则抛出异常
if negative_prompt is not None and negative_prompt_embeds is not None:
raise ValueError(
f"Cannot forward both `negative_prompt`: {negative_prompt} and `negative_prompt_embeds`:"
f" {negative_prompt_embeds}. Please make sure to only forward one of the two."
)
# 检查prompt_embeds和negative_prompt_embeds的形状是否一致,若不一致则抛出异常
if prompt_embeds is not None and negative_prompt_embeds is not None:
if prompt_embeds.shape != negative_prompt_embeds.shape:
raise ValueError(
"`prompt_embeds` and `negative_prompt_embeds` must have the same shape when passed directly, but"
f" got: `prompt_embeds` {prompt_embeds.shape} != `negative_prompt_embeds`"
f" {negative_prompt_embeds.shape}."
)
# 定义准备潜变量的方法,接受多个参数
def prepare_latents(
self, batch_size, num_channels_latents, num_frames, height, width, dtype, device, generator, latents=None
):
# 定义形状元组,包括批次大小、通道数、帧数、高度和宽度的缩放
shape = (
batch_size,
num_channels_latents,
num_frames,
height // self.vae_scale_factor,
width // self.vae_scale_factor,
)
# 检查生成器是否为列表且其长度与批次大小不匹配,若不匹配则抛出错误
if isinstance(generator, list) and len(generator) != batch_size:
raise ValueError(
f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
f" size of {batch_size}. Make sure the batch size matches the length of the generators."
)
# 如果潜在变量为空,则生成新的随机张量
if latents is None:
latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
else:
# 将给定的潜在变量转移到指定设备
latents = latents.to(device)
# 根据调度器要求的标准差缩放初始噪声
latents = latents * self.scheduler.init_noise_sigma
# 返回处理后的潜在变量
return latents
# 禁用梯度计算以提高性能
@torch.no_grad()
# 替换示例文档字符串
@replace_example_docstring(EXAMPLE_DOC_STRING)
def __call__(
# 定义输入参数,包括提示、图像尺寸、帧数等
self,
prompt: Union[str, List[str]] = None,
height: Optional[int] = None,
width: Optional[int] = None,
num_frames: int = 16,
num_inference_steps: int = 50,
guidance_scale: float = 9.0,
negative_prompt: Optional[Union[str, List[str]]] = None,
eta: float = 0.0,
generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
latents: Optional[torch.Tensor] = None,
prompt_embeds: Optional[torch.Tensor] = None,
negative_prompt_embeds: Optional[torch.Tensor] = None,
output_type: Optional[str] = "np",
return_dict: bool = True,
callback: Optional[Callable[[int, int, torch.Tensor], None]] = None,
callback_steps: int = 1,
cross_attention_kwargs: Optional[Dict[str, Any]] = None,
clip_skip: Optional[int] = None,
.\diffusers\pipelines\text_to_video_synthesis\pipeline_text_to_video_synth_img2img.py
# 版权声明,标明此文件的版权归 HuggingFace 团队所有
#
# 根据 Apache License 2.0 版本(“许可证”)授权;
# 除非遵守该许可证,否则您不能使用此文件。
# 您可以在以下网址获取许可证的副本:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律要求或书面同意,按照许可证分发的软件按“原样”提供,
# 不附带任何形式的明示或暗示的担保或条件。
# 有关特定语言管理权限和
# 限制的更多信息,请参见许可证。
# 导入 inspect 模块以进行对象检查和获取信息
import inspect
# 从 typing 模块导入多种类型,用于类型提示
from typing import Any, Callable, Dict, List, Optional, Union
# 导入 numpy 库,用于数值计算
import numpy as np
# 导入 torch 库,用于深度学习相关操作
import torch
# 从 transformers 库导入 CLIP 模型和标记器
from transformers import CLIPTextModel, CLIPTokenizer
# 从 loaders 模块导入所需的混合类
from ...loaders import StableDiffusionLoraLoaderMixin, TextualInversionLoaderMixin
# 从 models 模块导入自动编码器和条件 UNet 模型
from ...models import AutoencoderKL, UNet3DConditionModel
# 从 lora 模块导入调整 LORA 规模的函数
from ...models.lora import adjust_lora_scale_text_encoder
# 从 schedulers 模块导入 Karras 扩散调度器
from ...schedulers import KarrasDiffusionSchedulers
# 从 utils 模块导入多个实用工具函数和常量
from ...utils import (
USE_PEFT_BACKEND, # 用于指定是否使用 PEFT 后端
deprecate, # 用于标记已弃用的函数
logging, # 用于日志记录
replace_example_docstring, # 用于替换示例文档字符串
scale_lora_layers, # 用于缩放 LORA 层
unscale_lora_layers, # 用于反缩放 LORA 层
)
# 从 torch_utils 模块导入生成随机张量的函数
from ...utils.torch_utils import randn_tensor
# 导入视频处理器类
from ...video_processor import VideoProcessor
# 从 pipeline_utils 模块导入扩散管道和稳定扩散混合类
from ..pipeline_utils import DiffusionPipeline, StableDiffusionMixin
# 导入文本到视频的稳定扩散管道输出类
from . import TextToVideoSDPipelineOutput
# 获取当前模块的日志记录器实例
logger = logging.get_logger(__name__) # pylint: disable=invalid-name
# 示例文档字符串的模板,可能用于生成文档
EXAMPLE_DOC_STRING = """
Examples:
```py
# 导入所需的库
>>> import torch
>>> from diffusers import DiffusionPipeline, DPMSolverMultistepScheduler
>>> from diffusers.utils import export_to_video
# 从预训练模型加载扩散管道,并指定数据类型为 float16
>>> pipe = DiffusionPipeline.from_pretrained("cerspense/zeroscope_v2_576w", torch_dtype=torch.float16)
# 设置调度器为多步 DPM 解决器,使用管道当前调度器的配置
>>> pipe.scheduler = DPMSolverMultistepScheduler.from_config(pipe.scheduler.config)
# 将模型移动到 CUDA 设备以加速计算
>>> pipe.to("cuda")
# 定义生成视频的提示文本
>>> prompt = "spiderman running in the desert"
# 使用提示生成视频帧,设置推理步数、高度、宽度和帧数
>>> video_frames = pipe(prompt, num_inference_steps=40, height=320, width=576, num_frames=24).frames[0]
# 导出低分辨率视频并保存到指定路径
>>> # safe low-res video
>>> video_path = export_to_video(video_frames, output_video_path="./video_576_spiderman.mp4")
# 将文本到图像的模型移回 CPU
>>> # let's offload the text-to-image model
>>> pipe.to("cpu")
# 重新加载图像到图像的模型
>>> # and load the image-to-image model
>>> pipe = DiffusionPipeline.from_pretrained(
... "cerspense/zeroscope_v2_XL", torch_dtype=torch.float16, revision="refs/pr/15"
... )
# 设置调度器为多步 DPM 解决器,使用新的调度器配置
>>> pipe.scheduler = DPMSolverMultistepScheduler.from_config(pipe.scheduler.config)
# 启用模型的 CPU 卸载,以节省内存
>>> pipe.enable_model_cpu_offload()
# VAE 占用大量内存,确保以切片模式运行以降低内存使用
>>> # The VAE consumes A LOT of memory, let's make sure we run it in sliced mode
>>> pipe.vae.enable_slicing()
# 将视频帧上采样到更高的分辨率
>>> # now let's upscale it
>>> video = [Image.fromarray(frame).resize((1024, 576)) for frame in video_frames]
# 使用生成的提示和上采样的视频帧进行去噪处理
>>> # and denoise it
>>> video_frames = pipe(prompt, video=video, strength=0.6).frames[0]
# 导出去噪后的视频并保存到指定路径
>>> video_path = export_to_video(video_frames, output_video_path="./video_1024_spiderman.mp4")
# 返回最终视频的路径
>>> video_path
导入所需的模块和类型
"""
从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion_img2img.retrieve_latents 复制的函数
def retrieve_latents(
# 输入参数:编码器输出,类型为 torch.Tensor
encoder_output: torch.Tensor,
# 可选的随机数生成器
generator: Optional[torch.Generator] = None,
# 采样模式,默认为 "sample"
sample_mode: str = "sample"
):
# 检查编码器输出是否具有 "latent_dist" 属性,并且采样模式为 "sample"
if hasattr(encoder_output, "latent_dist") and sample_mode == "sample":
# 返回从潜在分布中采样的结果
return encoder_output.latent_dist.sample(generator)
# 检查编码器输出是否具有 "latent_dist" 属性,并且采样模式为 "argmax"
elif hasattr(encoder_output, "latent_dist") and sample_mode == "argmax":
# 返回潜在分布的众数
return encoder_output.latent_dist.mode()
# 检查编码器输出是否具有 "latents" 属性
elif hasattr(encoder_output, "latents"):
# 返回编码器输出的潜在值
return encoder_output.latents
# 如果都不满足,抛出属性错误
else:
raise AttributeError("Could not access latents of provided encoder_output")
定义 VideoToVideoSDPipeline 类,继承多个混入类
class VideoToVideoSDPipeline(
DiffusionPipeline, StableDiffusionMixin, TextualInversionLoaderMixin, StableDiffusionLoraLoaderMixin
):
r"""
用于文本引导的视频到视频生成的管道。
此模型继承自 [`DiffusionPipeline`]。请查看超类文档以了解所有管道实现的通用方法
(下载、保存、在特定设备上运行等)。
该管道还继承了以下加载方法:
- [`~loaders.TextualInversionLoaderMixin.load_textual_inversion`] 用于加载文本反演嵌入
- [`~loaders.StableDiffusionLoraLoaderMixin.load_lora_weights`] 用于加载 LoRA 权重
- [`~loaders.StableDiffusionLoraLoaderMixin.save_lora_weights`] 用于保存 LoRA 权重
参数:
vae ([`AutoencoderKL`]):
变分自编码器(VAE)模型,用于将视频编码和解码为潜在表示。
text_encoder ([`CLIPTextModel`]):
冻结的文本编码器([clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14))。
tokenizer (`CLIPTokenizer`):
[`~transformers.CLIPTokenizer`] 用于对文本进行标记化。
unet ([`UNet3DConditionModel`]):
[`UNet3DConditionModel`] 用于去噪编码的视频潜在值。
scheduler ([`SchedulerMixin`]):
与 `unet` 结合使用的调度器,用于去噪编码的图像潜在值。可以是
[`DDIMScheduler`], [`LMSDiscreteScheduler`] 或 [`PNDMScheduler`]。
"""
# 定义模型的 CPU 卸载顺序
model_cpu_offload_seq = "text_encoder->unet->vae"
# 初始化函数,设置各个组件
def __init__(
# 输入参数:变分自编码器
vae: AutoencoderKL,
# 输入参数:文本编码器
text_encoder: CLIPTextModel,
# 输入参数:标记器
tokenizer: CLIPTokenizer,
# 输入参数:去噪模型
unet: UNet3DConditionModel,
# 输入参数:调度器
scheduler: KarrasDiffusionSchedulers,
):
# 调用父类的初始化方法
super().__init__()
# 注册模型模块
self.register_modules(
vae=vae,
text_encoder=text_encoder,
tokenizer=tokenizer,
unet=unet,
scheduler=scheduler,
)
# 计算 VAE 的缩放因子
self.vae_scale_factor = 2 ** (len(self.vae.config.block_out_channels) - 1)
# 初始化视频处理器,禁用调整大小并使用 VAE 缩放因子
self.video_processor = VideoProcessor(do_resize=False, vae_scale_factor=self.vae_scale_factor)
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline._encode_prompt 复制的
def _encode_prompt(
self, # 当前实例的引用
prompt, # 要编码的提示文本
device, # 设备类型(例如 CPU 或 GPU)
num_images_per_prompt, # 每个提示生成的图像数量
do_classifier_free_guidance, # 是否进行无分类器引导
negative_prompt=None, # 可选的负面提示文本
prompt_embeds: Optional[torch.Tensor] = None, # 可选的提示嵌入
negative_prompt_embeds: Optional[torch.Tensor] = None, # 可选的负面提示嵌入
lora_scale: Optional[float] = None, # 可选的 Lora 缩放因子
**kwargs, # 其他可选参数
):
# 定义弃用消息,告知用户此函数在未来版本中将被移除
deprecation_message = "`_encode_prompt()` is deprecated and it will be removed in a future version. Use `encode_prompt()` instead. Also, be aware that the output format changed from a concatenated tensor to a tuple."
# 调用 deprecate 函数,记录弃用信息
deprecate("_encode_prompt()", "1.0.0", deprecation_message, standard_warn=False)
# 调用 encode_prompt 方法获取提示嵌入的元组
prompt_embeds_tuple = self.encode_prompt(
prompt=prompt, # 传递提示文本
device=device, # 传递设备信息
num_images_per_prompt=num_images_per_prompt, # 传递每个提示的图像数量
do_classifier_free_guidance=do_classifier_free_guidance, # 传递无分类器引导信息
negative_prompt=negative_prompt, # 传递负面提示文本
prompt_embeds=prompt_embeds, # 传递提示嵌入
negative_prompt_embeds=negative_prompt_embeds, # 传递负面提示嵌入
lora_scale=lora_scale, # 传递 Lora 缩放因子
**kwargs, # 传递其他参数
)
# 将提示嵌入元组中的两个元素连接起来以兼容旧版本
prompt_embeds = torch.cat([prompt_embeds_tuple[1], prompt_embeds_tuple[0]])
# 返回连接后的提示嵌入
return prompt_embeds
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.encode_prompt 复制的
def encode_prompt(
self, # 当前实例的引用
prompt, # 要编码的提示文本
device, # 设备类型(例如 CPU 或 GPU)
num_images_per_prompt, # 每个提示生成的图像数量
do_classifier_free_guidance, # 是否进行无分类器引导
negative_prompt=None, # 可选的负面提示文本
prompt_embeds: Optional[torch.Tensor] = None, # 可选的提示嵌入
negative_prompt_embeds: Optional[torch.Tensor] = None, # 可选的负面提示嵌入
lora_scale: Optional[float] = None, # 可选的 Lora 缩放因子
clip_skip: Optional[int] = None, # 可选的剪辑跳过参数
# 从 diffusers.pipelines.text_to_video_synthesis.pipeline_text_to_video_synth.TextToVideoSDPipeline.decode_latents 复制的
def decode_latents(self, latents): # 解码潜在变量的方法
# 按照 VAE 配置的缩放因子调整潜在变量
latents = 1 / self.vae.config.scaling_factor * latents
# 获取潜在变量的形状,分别表示批量大小、通道数、帧数、高度和宽度
batch_size, channels, num_frames, height, width = latents.shape
# 调整潜在变量的维度,以便于解码
latents = latents.permute(0, 2, 1, 3, 4).reshape(batch_size * num_frames, channels, height, width)
# 使用 VAE 解码潜在变量并获取样本
image = self.vae.decode(latents).sample
# 将图像调整为视频格式,包含批量大小和帧数
video = image[None, :].reshape((batch_size, num_frames, -1) + image.shape[2:]).permute(0, 2, 1, 3, 4)
# 始终将视频转换为 float32 格式,以确保兼容性
video = video.float()
# 返回处理后的 video
return video
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_extra_step_kwargs 复制的
# 定义准备额外参数的函数,用于调度器步骤
def prepare_extra_step_kwargs(self, generator, eta):
# 为调度器步骤准备额外的关键字参数,因不同调度器的参数签名不同
# eta (η) 仅在 DDIMScheduler 中使用,其他调度器将忽略该参数
# eta 对应于 DDIM 论文中的 η:https://arxiv.org/abs/2010.02502
# eta 应在 [0, 1] 范围内
# 检查调度器的步骤函数是否接受 eta 参数
accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())
# 创建一个空字典以存储额外参数
extra_step_kwargs = {}
# 如果调度器接受 eta,则将其添加到额外参数字典中
if accepts_eta:
extra_step_kwargs["eta"] = eta
# 检查调度器的步骤函数是否接受 generator 参数
accepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys())
# 如果调度器接受 generator,则将其添加到额外参数字典中
if accepts_generator:
extra_step_kwargs["generator"] = generator
# 返回包含额外参数的字典
return extra_step_kwargs
# 定义检查输入参数的函数
def check_inputs(
self,
prompt, # 输入提示文本
strength, # 强度参数
callback_steps, # 回调步骤数
negative_prompt=None, # 可选的负面提示文本
prompt_embeds=None, # 可选的提示嵌入
negative_prompt_embeds=None, # 可选的负面提示嵌入
callback_on_step_end_tensor_inputs=None, # 可选的步骤结束时的张量输入
):
# 检查 strength 的值是否在有效范围 [0.0, 1.0] 之间
if strength < 0 or strength > 1:
# 如果不在范围内,抛出 ValueError 异常
raise ValueError(f"The value of strength should in [0.0, 1.0] but is {strength}")
# 检查 callback_steps 是否为正整数
if callback_steps is not None and (not isinstance(callback_steps, int) or callback_steps <= 0):
# 如果不是,抛出 ValueError 异常
raise ValueError(
f"`callback_steps` has to be a positive integer but is {callback_steps} of type"
f" {type(callback_steps)}."
)
# 检查 callback_on_step_end_tensor_inputs 是否在 _callback_tensor_inputs 中
if callback_on_step_end_tensor_inputs is not None and not all(
k in self._callback_tensor_inputs for k in callback_on_step_end_tensor_inputs
):
# 如果有不在的项,抛出 ValueError 异常
raise ValueError(
f"`callback_on_step_end_tensor_inputs` has to be in {self._callback_tensor_inputs}, but found {[k for k in callback_on_step_end_tensor_inputs if k not in self._callback_tensor_inputs]}"
)
# 检查是否同时提供了 prompt 和 prompt_embeds
if prompt is not None and prompt_embeds is not None:
# 如果同时提供,抛出 ValueError 异常
raise ValueError(
f"Cannot forward both `prompt`: {prompt} and `prompt_embeds`: {prompt_embeds}. Please make sure to"
" only forward one of the two."
)
# 检查 prompt 和 prompt_embeds 是否都未定义
elif prompt is None and prompt_embeds is None:
# 如果都未定义,抛出 ValueError 异常
raise ValueError(
"Provide either `prompt` or `prompt_embeds`. Cannot leave both `prompt` and `prompt_embeds` undefined."
)
# 检查 prompt 的类型是否正确
elif prompt is not None and (not isinstance(prompt, str) and not isinstance(prompt, list)):
# 如果类型不正确,抛出 ValueError 异常
raise ValueError(f"`prompt` has to be of type `str` or `list` but is {type(prompt)}")
# 检查是否同时提供了 negative_prompt 和 negative_prompt_embeds
if negative_prompt is not None and negative_prompt_embeds is not None:
# 如果同时提供,抛出 ValueError 异常
raise ValueError(
f"Cannot forward both `negative_prompt`: {negative_prompt} and `negative_prompt_embeds`:"
f" {negative_prompt_embeds}. Please make sure to only forward one of the two."
)
# 检查 prompt_embeds 和 negative_prompt_embeds 的形状是否相同
if prompt_embeds is not None and negative_prompt_embeds is not None:
if prompt_embeds.shape != negative_prompt_embeds.shape:
# 如果形状不匹配,抛出 ValueError 异常
raise ValueError(
"`prompt_embeds` and `negative_prompt_embeds` must have the same shape when passed directly, but"
f" got: `prompt_embeds` {prompt_embeds.shape} != `negative_prompt_embeds`"
f" {negative_prompt_embeds.shape}."
)
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion_img2img 中复制的函数
# 定义获取时间步长的方法,输入为推理步数、强度和设备
def get_timesteps(self, num_inference_steps, strength, device):
# 计算初始时间步长,取最小值以确保不超过总推理步数
init_timestep = min(int(num_inference_steps * strength), num_inference_steps)
# 计算开始的时间步长,确保不小于0
t_start = max(num_inference_steps - init_timestep, 0)
# 从调度器获取时间步长,从t_start开始,按照调度器的顺序切片
timesteps = self.scheduler.timesteps[t_start * self.scheduler.order :]
# 如果调度器有设置开始索引的方法,则设置为t_start
if hasattr(self.scheduler, "set_begin_index"):
self.scheduler.set_begin_index(t_start * self.scheduler.order)
# 返回时间步长和有效推理步数
return timesteps, num_inference_steps - t_start
# 定义准备潜在表示的方法,输入为视频、时间步长、批大小、数据类型和设备
def prepare_latents(self, video, timestep, batch_size, dtype, device, generator=None):
# 将视频数据转移到指定的设备并转换数据类型
video = video.to(device=device, dtype=dtype)
# 改变视频的形状从 (b, c, f, h, w) 到 (b * f, c, w, h)
bsz, channel, frames, width, height = video.shape
video = video.permute(0, 2, 1, 3, 4).reshape(bsz * frames, channel, width, height)
# 如果视频有4个通道,则初始化潜在表示为视频本身
if video.shape[1] == 4:
init_latents = video
else:
# 检查生成器是否是列表且其长度与批大小不匹配,抛出异常
if isinstance(generator, list) and len(generator) != batch_size:
raise ValueError(
f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
f" size of {batch_size}. Make sure the batch size matches the length of the generators."
)
# 如果生成器是列表,则逐个处理视频并获取潜在表示
elif isinstance(generator, list):
init_latents = [
retrieve_latents(self.vae.encode(video[i : i + 1]), generator=generator[i])
for i in range(batch_size)
]
# 将所有潜在表示沿着第0维度连接
init_latents = torch.cat(init_latents, dim=0)
else:
# 如果生成器不是列表,直接处理整个视频获取潜在表示
init_latents = retrieve_latents(self.vae.encode(video), generator=generator)
# 对潜在表示进行缩放
init_latents = self.vae.config.scaling_factor * init_latents
# 检查批大小是否大于潜在表示的数量且不可整除,抛出异常
if batch_size > init_latents.shape[0] and batch_size % init_latents.shape[0] != 0:
raise ValueError(
f"Cannot duplicate `video` of batch size {init_latents.shape[0]} to {batch_size} text prompts."
)
else:
# 将潜在表示扩展为批大小
init_latents = torch.cat([init_latents], dim=0)
# 获取潜在表示的形状
shape = init_latents.shape
# 生成噪声张量,用于添加到潜在表示中
noise = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
# 获取潜在表示,添加噪声
init_latents = self.scheduler.add_noise(init_latents, noise, timestep)
latents = init_latents
# 重新调整潜在表示的形状以匹配原始视频的维度
latents = latents[None, :].reshape((bsz, frames, latents.shape[1]) + latents.shape[2:]).permute(0, 2, 1, 3, 4)
# 返回处理后的潜在表示
return latents
# 关闭梯度计算以节省内存
@torch.no_grad()
# 替换示例文档字符串为指定文档字符串
@replace_example_docstring(EXAMPLE_DOC_STRING)
# 定义可调用对象的方法,支持多种输入参数
def __call__(
# 输入提示,可以是单个字符串或字符串列表
self,
prompt: Union[str, List[str]] = None,
# 输入视频,可以是图像数组列表或张量
video: Union[List[np.ndarray], torch.Tensor] = None,
# 强度参数,默认值为0.6,控制某些特性
strength: float = 0.6,
# 推理步骤数量,默认为50,影响生成质量
num_inference_steps: int = 50,
# 引导尺度,默认值为15.0,控制生成的引导强度
guidance_scale: float = 15.0,
# 负面提示,可以是单个字符串或字符串列表,用于排除不想要的内容
negative_prompt: Optional[Union[str, List[str]]] = None,
# 额外参数,控制噪声,默认值为0.0
eta: float = 0.0,
# 随机生成器,可选,控制随机性
generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
# 潜在张量,可选,影响生成过程
latents: Optional[torch.Tensor] = None,
# 提示嵌入张量,可选,直接使用预处理过的提示
prompt_embeds: Optional[torch.Tensor] = None,
# 负面提示嵌入张量,可选,直接使用预处理过的负面提示
negative_prompt_embeds: Optional[torch.Tensor] = None,
# 输出类型,可选,默认为"np",指定返回格式
output_type: Optional[str] = "np",
# 是否返回字典格式,默认为True
return_dict: bool = True,
# 可选的回调函数,允许在特定步骤执行自定义逻辑
callback: Optional[Callable[[int, int, torch.Tensor], None]] = None,
# 回调执行的步骤间隔,默认为1
callback_steps: int = 1,
# 交叉注意力的额外参数,可选,用于细化生成过程
cross_attention_kwargs: Optional[Dict[str, Any]] = None,
# 可选,跳过的剪辑步骤
clip_skip: Optional[int] = None,
# `.\diffusers\pipelines\text_to_video_synthesis\pipeline_text_to_video_zero.py`
```py
# 导入所需模块和库
import copy # 复制对象的模块
import inspect # 检查对象的模块
from dataclasses import dataclass # 数据类装饰器
from typing import Callable, List, Optional, Union # 类型提示相关
import numpy as np # 数组处理库
import PIL.Image # 图像处理库
import torch # 深度学习框架
import torch.nn.functional as F # 神经网络功能模块
from torch.nn.functional import grid_sample # 网格采样函数
from transformers import CLIPImageProcessor, CLIPTextModel, CLIPTokenizer # 处理CLIP模型的类
from ...image_processor import VaeImageProcessor # VAE图像处理器
from ...loaders import StableDiffusionLoraLoaderMixin, TextualInversionLoaderMixin # 加载器混合类
from ...models import AutoencoderKL, UNet2DConditionModel # 模型类
from ...models.lora import adjust_lora_scale_text_encoder # 调整LoRA比例的函数
from ...schedulers import KarrasDiffusionSchedulers # Karras扩散调度器
from ...utils import USE_PEFT_BACKEND, BaseOutput, logging, scale_lora_layers, unscale_lora_layers # 实用工具
from ...utils.torch_utils import randn_tensor # 随机张量生成工具
from ..pipeline_utils import DiffusionPipeline, StableDiffusionMixin # 管道工具类
from ..stable_diffusion import StableDiffusionSafetyChecker # 稳定扩散安全检查器
logger = logging.get_logger(__name__) # 创建模块日志记录器,禁用pylint命名检查
def rearrange_0(tensor, f):
F, C, H, W = tensor.size() # 解构张量维度
tensor = torch.permute(torch.reshape(tensor, (F // f, f, C, H, W)), (0, 2, 1, 3, 4)) # 重排列和调整张量形状
return tensor # 返回调整后的张量
def rearrange_1(tensor):
B, C, F, H, W = tensor.size() # 解构张量维度
return torch.reshape(torch.permute(tensor, (0, 2, 1, 3, 4)), (B * F, C, H, W)) # 重排列并调整形状
def rearrange_3(tensor, f):
F, D, C = tensor.size() # 解构张量维度
return torch.reshape(tensor, (F // f, f, D, C)) # 调整张量形状
def rearrange_4(tensor):
B, F, D, C = tensor.size() # 解构张量维度
return torch.reshape(tensor, (B * F, D, C)) # 调整张量形状
class CrossFrameAttnProcessor:
"""
跨帧注意力处理器。每帧关注第一帧。
Args:
batch_size: 表示实际批大小的数字,而不是帧数。
例如,使用单个提示和num_images_per_prompt=1调用unet时,batch_size应等于
2,因为使用了无分类器引导。
"""
def __init__(self, batch_size=2):
self.batch_size = batch_size # 初始化批大小
# 定义可调用方法,处理注意力机制
def __call__(self, attn, hidden_states, encoder_hidden_states=None, attention_mask=None):
# 获取批次大小、序列长度及其维度
batch_size, sequence_length, _ = hidden_states.shape
# 准备注意力掩码,以适应批次和序列长度
attention_mask = attn.prepare_attention_mask(attention_mask, sequence_length, batch_size)
# 将隐藏状态转换为查询向量
query = attn.to_q(hidden_states)
# 判断是否进行交叉注意力
is_cross_attention = encoder_hidden_states is not None
# 如果没有编码器隐藏状态,则使用隐藏状态本身
if encoder_hidden_states is None:
encoder_hidden_states = hidden_states
# 如果需要归一化交叉注意力的隐藏状态
elif attn.norm_cross:
encoder_hidden_states = attn.norm_encoder_hidden_states(encoder_hidden_states)
# 将编码器隐藏状态转换为键和值
key = attn.to_k(encoder_hidden_states)
value = attn.to_v(encoder_hidden_states)
# 处理交叉帧注意力
if not is_cross_attention:
# 计算视频长度和第一个帧索引
video_length = key.size()[0] // self.batch_size
first_frame_index = [0] * video_length
# 重新排列键,使批次和帧在前两个维度
key = rearrange_3(key, video_length)
key = key[:, first_frame_index]
# 重新排列值,使批次和帧在前两个维度
value = rearrange_3(value, video_length)
value = value[:, first_frame_index]
# 重新排列回原始形状
key = rearrange_4(key)
value = rearrange_4(value)
# 将查询、键和值转换为批次维度
query = attn.head_to_batch_dim(query)
key = attn.head_to_batch_dim(key)
value = attn.head_to_batch_dim(value)
# 计算注意力分数
attention_probs = attn.get_attention_scores(query, key, attention_mask)
# 应用注意力分数于值,得到隐藏状态
hidden_states = torch.bmm(attention_probs, value)
# 将隐藏状态转换回头部维度
hidden_states = attn.batch_to_head_dim(hidden_states)
# 线性投影
hidden_states = attn.to_out[0](hidden_states)
# 应用 dropout
hidden_states = attn.to_out[1](hidden_states)
# 返回最终的隐藏状态
return hidden_states
# 定义一个名为 CrossFrameAttnProcessor2_0 的类
class CrossFrameAttnProcessor2_0:
"""
Cross frame attention processor with scaled_dot_product attention of Pytorch 2.0.
Args:
batch_size: The number that represents actual batch size, other than the frames.
For example, calling unet with a single prompt and num_images_per_prompt=1, batch_size should be equal to
2, due to classifier-free guidance.
"""
# 初始化方法,设置默认的批次大小为 2
def __init__(self, batch_size=2):
# 检查 F 是否具有 scaled_dot_product_attention 属性
if not hasattr(F, "scaled_dot_product_attention"):
# 如果没有,抛出 ImportError,提示需要升级到 PyTorch 2.0
raise ImportError("AttnProcessor2_0 requires PyTorch 2.0, to use it, please upgrade PyTorch to 2.0.")
# 将传入的批次大小赋值给实例变量 self.batch_size
self.batch_size = batch_size
# 定义可调用对象的方法,接受注意力、隐藏状态和可选的编码器隐藏状态、注意力掩码
def __call__(self, attn, hidden_states, encoder_hidden_states=None, attention_mask=None):
# 获取批次大小和序列长度,使用编码器隐藏状态的形状或隐藏状态的形状
batch_size, sequence_length, _ = (
hidden_states.shape if encoder_hidden_states is None else encoder_hidden_states.shape
)
# 获取隐藏状态的内部维度
inner_dim = hidden_states.shape[-1]
# 如果提供了注意力掩码
if attention_mask is not None:
# 准备注意力掩码,调整形状以匹配序列长度和批次大小
attention_mask = attn.prepare_attention_mask(attention_mask, sequence_length, batch_size)
# scaled_dot_product_attention 期望注意力掩码的形状为 (batch, heads, source_length, target_length)
attention_mask = attention_mask.view(batch_size, attn.heads, -1, attention_mask.shape[-1])
# 将隐藏状态转换为查询
query = attn.to_q(hidden_states)
# 检查是否为交叉注意力
is_cross_attention = encoder_hidden_states is not None
# 如果没有提供编码器隐藏状态,则使用隐藏状态
if encoder_hidden_states is None:
encoder_hidden_states = hidden_states
# 如果需要,对编码器隐藏状态进行归一化
elif attn.norm_cross:
encoder_hidden_states = attn.norm_encoder_hidden_states(encoder_hidden_states)
# 将编码器隐藏状态转换为键和值
key = attn.to_k(encoder_hidden_states)
value = attn.to_v(encoder_hidden_states)
# 处理交叉帧注意力
if not is_cross_attention:
# 计算视频长度并初始化第一个帧索引
video_length = max(1, key.size()[0] // self.batch_size)
first_frame_index = [0] * video_length
# 重新排列键,使批次和帧位于第1和第2维
key = rearrange_3(key, video_length)
key = key[:, first_frame_index]
# 重新排列值,使批次和帧位于第1和第2维
value = rearrange_3(value, video_length)
value = value[:, first_frame_index]
# 重新排列回原始形状
key = rearrange_4(key)
value = rearrange_4(value)
# 计算每个头的维度
head_dim = inner_dim // attn.heads
# 调整查询的形状并转置维度
query = query.view(batch_size, -1, attn.heads, head_dim).transpose(1, 2)
# 调整键的形状并转置维度
key = key.view(batch_size, -1, attn.heads, head_dim).transpose(1, 2)
# 调整值的形状并转置维度
value = value.view(batch_size, -1, attn.heads, head_dim).transpose(1, 2)
# 执行缩放点积注意力,输出形状为 (batch, num_heads, seq_len, head_dim)
# TODO: 当我们迁移到 Torch 2.1 时,添加对 attn.scale 的支持
hidden_states = F.scaled_dot_product_attention(
query, key, value, attn_mask=attention_mask, dropout_p=0.0, is_causal=False
)
# 转置输出并调整形状以合并头的维度
hidden_states = hidden_states.transpose(1, 2).reshape(batch_size, -1, attn.heads * head_dim)
# 将隐藏状态转换为查询的 dtype
hidden_states = hidden_states.to(query.dtype)
# 应用线性投影
hidden_states = attn.to_out[0](hidden_states)
# 应用 dropout
hidden_states = attn.to_out[1](hidden_states)
# 返回最终的隐藏状态
return hidden_states
# 定义一个数据类,用于表示零-shot文本到视频的输出
@dataclass
class TextToVideoPipelineOutput(BaseOutput):
r"""
输出类用于零-shot文本到视频管道。
参数:
images (`[List[PIL.Image.Image]`, `np.ndarray`]):
长度为`batch_size`的去噪PIL图像列表或形状为`(batch_size, height, width, num_channels)`的NumPy数组。
nsfw_content_detected (`[List[bool]]`):
列表,指示相应生成的图像是否包含“不安全内容”(nsfw),如果安全检查无法执行则为`None`。
"""
# 图像字段,支持PIL图像列表或NumPy数组
images: Union[List[PIL.Image.Image], np.ndarray]
# 检测到的NSFW内容列表
nsfw_content_detected: Optional[List[bool]]
# 定义一个函数,用于生成坐标网格
def coords_grid(batch, ht, wd, device):
# 从指定高度和宽度生成网格坐标
coords = torch.meshgrid(torch.arange(ht, device=device), torch.arange(wd, device=device))
# 将生成的坐标堆叠并转换为浮点数
coords = torch.stack(coords[::-1], dim=0).float()
# 将坐标扩展到指定批量大小
return coords[None].repeat(batch, 1, 1, 1)
# 定义一个函数,用于根据给定的光流变形单帧的潜在编码
def warp_single_latent(latent, reference_flow):
"""
使用给定的光流变形单帧的潜在编码
参数:
latent: 单帧的潜在编码
reference_flow: 用于变形潜在编码的光流
返回:
warped: 变形后的潜在编码
"""
# 获取参考光流的高度和宽度
_, _, H, W = reference_flow.size()
# 获取潜在编码的高度和宽度
_, _, h, w = latent.size()
# 生成坐标网格
coords0 = coords_grid(1, H, W, device=latent.device).to(latent.dtype)
# 将光流添加到坐标上
coords_t0 = coords0 + reference_flow
# 归一化坐标
coords_t0[:, 0] /= W
coords_t0[:, 1] /= H
# 将坐标缩放到[-1, 1]范围
coords_t0 = coords_t0 * 2.0 - 1.0
# 使用双线性插值调整坐标大小
coords_t0 = F.interpolate(coords_t0, size=(h, w), mode="bilinear")
# 重新排列坐标的维度
coords_t0 = torch.permute(coords_t0, (0, 2, 3, 1))
# 根据坐标样本获取变形后的潜在编码
warped = grid_sample(latent, coords_t0, mode="nearest", padding_mode="reflection")
return warped
# 定义一个函数,用于创建平移运动场
def create_motion_field(motion_field_strength_x, motion_field_strength_y, frame_ids, device, dtype):
"""
创建平移运动场
参数:
motion_field_strength_x: x轴的运动强度
motion_field_strength_y: y轴的运动强度
frame_ids: 正在处理的潜在帧的索引。
在进行分块推理时需要此信息
device: 设备
dtype: 数据类型
返回:
"""
# 获取帧的数量
seq_length = len(frame_ids)
# 创建一个全零的参考光流张量
reference_flow = torch.zeros((seq_length, 2, 512, 512), device=device, dtype=dtype)
# 遍历每一帧,生成对应的运动场
for fr_idx in range(seq_length):
reference_flow[fr_idx, 0, :, :] = motion_field_strength_x * (frame_ids[fr_idx])
reference_flow[fr_idx, 1, :, :] = motion_field_strength_y * (frame_ids[fr_idx])
return reference_flow
# 定义一个函数,用于创建运动场并相应地变形潜在编码
def create_motion_field_and_warp_latents(motion_field_strength_x, motion_field_strength_y, frame_ids, latents):
"""
创建平移运动并相应地变形潜在编码
# 参数说明
Args:
motion_field_strength_x: motion strength along x-axis # 表示x轴上的运动强度
motion_field_strength_y: motion strength along y-axis # 表示y轴上的运动强度
frame_ids: indexes of the frames the latents of which are being processed. # 当前处理的帧的索引
This is needed when we perform chunk-by-chunk inference # 进行分块推理时需要使用
latents: latent codes of frames # 帧的潜在代码
Returns:
warped_latents: warped latents # 返回经过变形处理的潜在代码
"""
# 创建运动场,结合x轴和y轴的运动强度
motion_field = create_motion_field(
motion_field_strength_x=motion_field_strength_x, # 传入x轴运动强度
motion_field_strength_y=motion_field_strength_y, # 传入y轴运动强度
frame_ids=frame_ids, # 传入帧索引
device=latents.device, # 使用潜在代码的设备信息
dtype=latents.dtype, # 使用潜在代码的数据类型
)
# 克隆潜在代码,以便进行后续的变形处理
warped_latents = latents.clone().detach()
# 遍历每个变形后的潜在代码
for i in range(len(warped_latents)):
# 对每个潜在代码进行单独的变形处理
warped_latents[i] = warp_single_latent(latents[i][None], motion_field[i][None])
# 返回变形后的潜在代码
return warped_latents
# 定义一个名为 TextToVideoZeroPipeline 的类,继承自多个父类
class TextToVideoZeroPipeline(
# 继承 DiffusionPipeline 类
DiffusionPipeline,
# 继承 StableDiffusionMixin 类
StableDiffusionMixin,
# 继承 TextualInversionLoaderMixin 类
TextualInversionLoaderMixin,
# 继承 StableDiffusionLoraLoaderMixin 类
StableDiffusionLoraLoaderMixin
):
r"""
用于使用 Stable Diffusion 进行零-shot 文本到视频生成的管道。
该模型继承自 [`DiffusionPipeline`]。请查看父类文档以获取所有管道的通用方法
(下载、保存、在特定设备上运行等)。
参数:
vae ([`AutoencoderKL`]):
变分自编码器 (VAE) 模型,用于将图像编码和解码为潜在表示。
text_encoder ([`CLIPTextModel`]):
冻结的文本编码器 ([clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14))。
tokenizer (`CLIPTokenizer`):
用于标记文本的 [`~transformers.CLIPTokenizer`]。
unet ([`UNet2DConditionModel`]):
[`UNet3DConditionModel`] 用于去噪编码的视频潜在。
scheduler ([`SchedulerMixin`]):
用于与 `unet` 结合使用的调度器,以去噪编码的图像潜在。可以是
[`DDIMScheduler`], [`LMSDiscreteScheduler`] 或 [`PNDMScheduler`] 之一。
safety_checker ([`StableDiffusionSafetyChecker`]):
分类模块,用于估计生成的图像是否可能被视为攻击性或有害。
有关模型潜在危害的更多详细信息,请参阅 [model card](https://huggingface.co/runwayml/stable-diffusion-v1-5)。
feature_extractor ([`CLIPImageProcessor`]):
用于从生成图像中提取特征的 [`CLIPImageProcessor`];作为 `safety_checker` 的输入。
"""
# 初始化方法,用于创建类的实例
def __init__(
# 变分自编码器 (VAE) 实例
vae: AutoencoderKL,
# 文本编码器实例
text_encoder: CLIPTextModel,
# 标记器实例
tokenizer: CLIPTokenizer,
# UNet 实例,用于去噪处理
unet: UNet2DConditionModel,
# 调度器实例
scheduler: KarrasDiffusionSchedulers,
# 安全检查器实例
safety_checker: StableDiffusionSafetyChecker,
# 特征提取器实例
feature_extractor: CLIPImageProcessor,
# 是否需要安全检查器的布尔值,默认值为 True
requires_safety_checker: bool = True,
):
# 调用父类的构造函数进行初始化
super().__init__()
# 注册多个模块,包括 VAE、文本编码器、分词器、UNet、调度器、安全检查器和特征提取器
self.register_modules(
vae=vae,
text_encoder=text_encoder,
tokenizer=tokenizer,
unet=unet,
scheduler=scheduler,
safety_checker=safety_checker,
feature_extractor=feature_extractor,
)
# 如果安全检查器为 None 且需要安全检查器,则发出警告
if safety_checker is None and requires_safety_checker:
logger.warning(
# 提示用户禁用安全检查器可能带来的风险和使用条款
f"You have disabled the safety checker for {self.__class__} by passing `safety_checker=None`. Ensure"
" that you abide to the conditions of the Stable Diffusion license and do not expose unfiltered"
" results in services or applications open to the public. Both the diffusers team and Hugging Face"
" strongly recommend to keep the safety filter enabled in all public facing circumstances, disabling"
" it only for use-cases that involve analyzing network behavior or auditing its results. For more"
" information, please have a look at https://github.com/huggingface/diffusers/pull/254 ."
)
# 根据 VAE 的配置计算缩放因子
self.vae_scale_factor = 2 ** (len(self.vae.config.block_out_channels) - 1)
# 创建 VAE 图像处理器,使用之前计算的缩放因子
self.image_processor = VaeImageProcessor(vae_scale_factor=self.vae_scale_factor)
def forward_loop(self, x_t0, t0, t1, generator):
"""
执行 DDPM 前向过程,从时间 t0 到 t1。这与添加具有相应方差的噪声相同。
Args:
x_t0:
时间 t0 时的潜在代码。
t0:
t0 时的时间步。
t1:
t1 时的时间戳。
generator (`torch.Generator` 或 `List[torch.Generator]`, *可选*):
一个 [`torch.Generator`](https://pytorch.org/docs/stable/generated/torch.Generator.html) 用于生成
确定性结果。
Returns:
x_t1:
应用前向过程后的 x_t0,从时间 t0 到 t1。
"""
# 生成与 x_t0 大小相同的随机噪声张量
eps = randn_tensor(x_t0.size(), generator=generator, dtype=x_t0.dtype, device=x_t0.device)
# 计算在 t0 到 t1 时间步之间的 alpha 向量的乘积
alpha_vec = torch.prod(self.scheduler.alphas[t0:t1])
# 计算 t1 时的潜在代码,结合原始潜在代码和随机噪声
x_t1 = torch.sqrt(alpha_vec) * x_t0 + torch.sqrt(1 - alpha_vec) * eps
# 返回时间 t1 的潜在代码
return x_t1
def backward_loop(
self,
latents,
timesteps,
prompt_embeds,
guidance_scale,
callback,
callback_steps,
num_warmup_steps,
extra_step_kwargs,
cross_attention_kwargs=None,
# 从 diffusers.pipelines.stable_diffusion_k_diffusion.pipeline_stable_diffusion_k_diffusion.StableDiffusionKDiffusionPipeline.check_inputs 复制
def check_inputs(
self,
prompt,
height,
width,
callback_steps,
negative_prompt=None,
prompt_embeds=None,
negative_prompt_embeds=None,
callback_on_step_end_tensor_inputs=None,
):
# 检查高度和宽度是否能被8整除,若不能则抛出错误
if height % 8 != 0 or width % 8 != 0:
raise ValueError(f"`height` and `width` have to be divisible by 8 but are {height} and {width}.")
# 检查回调步数是否为正整数,若不是则抛出错误
if callback_steps is not None and (not isinstance(callback_steps, int) or callback_steps <= 0):
raise ValueError(
f"`callback_steps` has to be a positive integer but is {callback_steps} of type"
f" {type(callback_steps)}."
)
# 检查回调结束时的张量输入是否在已定义的输入列表中
if callback_on_step_end_tensor_inputs is not None and not all(
k in self._callback_tensor_inputs for k in callback_on_step_end_tensor_inputs
):
raise ValueError(
f"`callback_on_step_end_tensor_inputs` has to be in {self._callback_tensor_inputs}, but found {[k for k in callback_on_step_end_tensor_inputs if k not in self._callback_tensor_inputs]}"
)
# 检查提示和提示嵌入是否同时存在,若同时存在则抛出错误
if prompt is not None and prompt_embeds is not None:
raise ValueError(
f"Cannot forward both `prompt`: {prompt} and `prompt_embeds`: {prompt_embeds}. Please make sure to"
" only forward one of the two."
)
# 检查提示和提示嵌入是否都未提供,若都未提供则抛出错误
elif prompt is None and prompt_embeds is None:
raise ValueError(
"Provide either `prompt` or `prompt_embeds`. Cannot leave both `prompt` and `prompt_embeds` undefined."
)
# 检查提示类型是否为字符串或列表,若不是则抛出错误
elif prompt is not None and (not isinstance(prompt, str) and not isinstance(prompt, list)):
raise ValueError(f"`prompt` has to be of type `str` or `list` but is {type(prompt)}")
# 检查负提示和负提示嵌入是否同时存在,若同时存在则抛出错误
if negative_prompt is not None and negative_prompt_embeds is not None:
raise ValueError(
f"Cannot forward both `negative_prompt`: {negative_prompt} and `negative_prompt_embeds`:"
f" {negative_prompt_embeds}. Please make sure to only forward one of the two."
)
# 检查提示嵌入和负提示嵌入是否同时存在且形状是否一致,若不一致则抛出错误
if prompt_embeds is not None and negative_prompt_embeds is not None:
if prompt_embeds.shape != negative_prompt_embeds.shape:
raise ValueError(
"`prompt_embeds` and `negative_prompt_embeds` must have the same shape when passed directly, but"
f" got: `prompt_embeds` {prompt_embeds.shape} != `negative_prompt_embeds`"
f" {negative_prompt_embeds.shape}."
)
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_latents 复制的内容
# 准备潜在变量以进行模型推理
def prepare_latents(self, batch_size, num_channels_latents, height, width, dtype, device, generator, latents=None):
# 定义潜在变量的形状,包括批量大小和通道数
shape = (
batch_size,
num_channels_latents,
int(height) // self.vae_scale_factor,
int(width) // self.vae_scale_factor,
)
# 检查生成器是否为列表且长度与批量大小匹配
if isinstance(generator, list) and len(generator) != batch_size:
raise ValueError(
f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
f" size of {batch_size}. Make sure the batch size matches the length of the generators."
)
# 如果潜在变量未提供,则生成随机潜在变量
if latents is None:
latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
else:
# 将提供的潜在变量转移到指定设备
latents = latents.to(device)
# 根据调度器所需的标准差缩放初始噪声
latents = latents * self.scheduler.init_noise_sigma
# 返回处理后的潜在变量
return latents
# 不需要计算梯度的调用方法
@torch.no_grad()
def __call__(
# 接受多个参数以生成视频
self,
prompt: Union[str, List[str]],
video_length: Optional[int] = 8,
height: Optional[int] = None,
width: Optional[int] = None,
num_inference_steps: int = 50,
guidance_scale: float = 7.5,
negative_prompt: Optional[Union[str, List[str]]] = None,
num_videos_per_prompt: Optional[int] = 1,
eta: float = 0.0,
generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
latents: Optional[torch.Tensor] = None,
motion_field_strength_x: float = 12,
motion_field_strength_y: float = 12,
output_type: Optional[str] = "tensor",
return_dict: bool = True,
callback: Optional[Callable[[int, int, torch.Tensor], None]] = None,
callback_steps: Optional[int] = 1,
t0: int = 44,
t1: int = 47,
frame_ids: Optional[List[int]] = None,
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.run_safety_checker 中复制
def run_safety_checker(self, image, device, dtype):
# 检查是否定义了安全检查器
if self.safety_checker is None:
has_nsfw_concept = None
else:
# 如果输入为张量,进行后处理转换
if torch.is_tensor(image):
feature_extractor_input = self.image_processor.postprocess(image, output_type="pil")
else:
# 如果输入为 numpy 数组,转换为 PIL 图像
feature_extractor_input = self.image_processor.numpy_to_pil(image)
# 使用特征提取器获取安全检查器输入并转移到指定设备
safety_checker_input = self.feature_extractor(feature_extractor_input, return_tensors="pt").to(device)
# 运行安全检查器,返回处理后的图像和 NSFW 概念标识
image, has_nsfw_concept = self.safety_checker(
images=image, clip_input=safety_checker_input.pixel_values.to(dtype)
)
# 返回处理后的图像及其 NSFW 概念标识
return image, has_nsfw_concept
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_extra_step_kwargs 中复制
# 准备额外的参数用于调度器步骤,因为并非所有调度器都有相同的参数签名
def prepare_extra_step_kwargs(self, generator, eta):
# eta (η) 仅在 DDIMScheduler 中使用,其他调度器将忽略该参数
# eta 对应于 DDIM 论文中的 η: https://arxiv.org/abs/2010.02502
# 其值应在 [0, 1] 之间
# 检查调度器的 step 方法是否接受 eta 参数
accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())
# 初始化额外步骤参数的字典
extra_step_kwargs = {}
# 如果接受 eta 参数,则将其添加到字典中
if accepts_eta:
extra_step_kwargs["eta"] = eta
# 检查调度器的 step 方法是否接受 generator 参数
accepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys())
# 如果接受 generator 参数,则将其添加到字典中
if accepts_generator:
extra_step_kwargs["generator"] = generator
# 返回包含额外参数的字典
return extra_step_kwargs
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.encode_prompt 复制的函数
def encode_prompt(
self,
# 要编码的提示文本
prompt,
# 要使用的设备
device,
# 每个提示生成的图像数量
num_images_per_prompt,
# 是否执行无分类器自由引导
do_classifier_free_guidance,
# 可选的负提示文本
negative_prompt=None,
# 可选的提示嵌入
prompt_embeds: Optional[torch.Tensor] = None,
# 可选的负提示嵌入
negative_prompt_embeds: Optional[torch.Tensor] = None,
# 可选的 LoRA 缩放因子
lora_scale: Optional[float] = None,
# 可选的剪切跳过参数
clip_skip: Optional[int] = None,
def decode_latents(self, latents):
# 将潜在变量缩放回原始大小
latents = 1 / self.vae.config.scaling_factor * latents
# 解码潜在变量,返回图像
image = self.vae.decode(latents, return_dict=False)[0]
# 将图像像素值归一化到 [0, 1] 范围
image = (image / 2 + 0.5).clamp(0, 1)
# 始终转换为 float32 类型,以避免显著开销,并与 bfloat16 兼容
image = image.cpu().permute(0, 2, 3, 1).float().numpy()
# 返回处理后的图像
return image
.\diffusers\pipelines\text_to_video_synthesis\pipeline_text_to_video_zero_sdxl.py
# 导入标准库和第三方库
import copy # 导入复制对象的模块
import inspect # 导入检查对象的模块
from dataclasses import dataclass # 导入数据类装饰器
from typing import Any, Callable, Dict, List, Optional, Tuple, Union # 导入类型提示
import numpy as np # 导入 NumPy 库
import PIL # 导入 PIL 库
import torch # 导入 PyTorch 库
import torch.nn.functional as F # 导入 PyTorch 的功能模块
from torch.nn.functional import grid_sample # 导入网格采样功能
# 导入变换器相关的模块
from transformers import (
CLIPImageProcessor, # 导入 CLIP 图像处理器
CLIPTextModel, # 导入 CLIP 文本模型
CLIPTextModelWithProjection, # 导入带有投影的 CLIP 文本模型
CLIPTokenizer, # 导入 CLIP 词法分析器
CLIPVisionModelWithProjection, # 导入带有投影的 CLIP 视觉模型
)
# 从相对路径导入图像处理器和加载器
from ...image_processor import VaeImageProcessor # 导入 VAE 图像处理器
from ...loaders import StableDiffusionXLLoraLoaderMixin, TextualInversionLoaderMixin # 导入稳定扩散和文本反转加载器混合类
from ...models import AutoencoderKL, UNet2DConditionModel # 导入自编码器和条件 U-Net 模型
# 导入注意力处理器相关的模块
from ...models.attention_processor import (
AttnProcessor2_0, # 导入注意力处理器版本 2.0
FusedAttnProcessor2_0, # 导入融合注意力处理器版本 2.0
XFormersAttnProcessor, # 导入 XFormers 注意力处理器
)
# 从相对路径导入 LoRA 相关功能
from ...models.lora import adjust_lora_scale_text_encoder # 导入调整 LoRA 缩放文本编码器的函数
# 导入调度器
from ...schedulers import KarrasDiffusionSchedulers # 导入 Karras 扩散调度器
# 导入实用工具
from ...utils import (
USE_PEFT_BACKEND, # 导入是否使用 PEFT 后端的标志
BaseOutput, # 导入基础输出类
is_invisible_watermark_available, # 导入检查不可见水印是否可用的函数
logging, # 导入日志模块
scale_lora_layers, # 导入缩放 LoRA 层的函数
unscale_lora_layers, # 导入反缩放 LoRA 层的函数
)
# 从相对路径导入 PyTorch 实用工具
from ...utils.torch_utils import randn_tensor # 导入随机张量生成函数
# 导入管道工具
from ..pipeline_utils import DiffusionPipeline, StableDiffusionMixin # 导入扩散管道和稳定扩散混合类
# 如果不可见水印可用,则导入水印模块
if is_invisible_watermark_available():
from ..stable_diffusion_xl.watermark import StableDiffusionXLWatermarker # 导入稳定扩散 XL 水印器
# 创建日志记录器
logger = logging.get_logger(__name__) # 根据当前模块名称创建日志记录器,pylint 检查规则
# 从 diffusers.pipelines.text_to_video_synthesis.pipeline_text_to_video_zero.rearrange_0 中复制的函数
def rearrange_0(tensor, f):
F, C, H, W = tensor.size() # 获取张量的尺寸,F:帧数, C:通道数, H:高度, W:宽度
tensor = torch.permute(torch.reshape(tensor, (F // f, f, C, H, W)), (0, 2, 1, 3, 4)) # 改变张量的形状和维度
return tensor # 返回重新排列的张量
# 从 diffusers.pipelines.text_to_video_synthesis.pipeline_text_to_video_zero.rearrange_1 中复制的函数
def rearrange_1(tensor):
B, C, F, H, W = tensor.size() # 获取张量的尺寸,B:批次大小, C:通道数, F:帧数, H:高度, W:宽度
return torch.reshape(torch.permute(tensor, (0, 2, 1, 3, 4)), (B * F, C, H, W)) # 改变维度并返回张量
# 从 diffusers.pipelines.text_to_video_synthesis.pipeline_text_to_video_zero.rearrange_3 中复制的函数
def rearrange_3(tensor, f):
F, D, C = tensor.size() # 获取张量的尺寸,F:帧数, D:特征维度, C:通道数
return torch.reshape(tensor, (F // f, f, D, C)) # 改变张量的形状并返回
# 从 diffusers.pipelines.text_to_video_synthesis.pipeline_text_to_video_zero.rearrange_4 中复制的函数
def rearrange_4(tensor):
B, F, D, C = tensor.size() # 获取张量的尺寸,B:批次大小, F:帧数, D:特征维度, C:通道数
return torch.reshape(tensor, (B * F, D, C)) # 改变张量形状并返回
# 从 diffusers.pipelines.text_to_video_synthesis.pipeline_text_to_video_zero.CrossFrameAttnProcessor 中复制的类
class CrossFrameAttnProcessor:
"""
跨帧注意力处理器。每帧关注第一帧。
参数:
batch_size: 表示实际批次大小的数字,除了帧数以外。
例如,在使用单个提示和 num_images_per_prompt=1 调用 unet 时,batch_size 应该等于
2,因分类器自由引导。
"""
def __init__(self, batch_size=2):
self.batch_size = batch_size # 初始化批次大小
# 定义可调用的前向传递方法,接受注意力机制相关参数
def __call__(self, attn, hidden_states, encoder_hidden_states=None, attention_mask=None):
# 获取批次大小、序列长度和隐藏状态的维度
batch_size, sequence_length, _ = hidden_states.shape
# 准备注意力掩码以适应批次和序列长度
attention_mask = attn.prepare_attention_mask(attention_mask, sequence_length, batch_size)
# 将隐藏状态转换为查询向量
query = attn.to_q(hidden_states)
# 判断是否为交叉注意力
is_cross_attention = encoder_hidden_states is not None
# 如果没有编码器隐藏状态,则使用隐藏状态本身
if encoder_hidden_states is None:
encoder_hidden_states = hidden_states
# 如果需要归一化交叉注意力,则对编码器隐藏状态进行归一化
elif attn.norm_cross:
encoder_hidden_states = attn.norm_encoder_hidden_states(encoder_hidden_states)
# 将编码器隐藏状态转换为键向量
key = attn.to_k(encoder_hidden_states)
# 将编码器隐藏状态转换为值向量
value = attn.to_v(encoder_hidden_states)
# 处理交叉帧注意力
if not is_cross_attention:
# 计算视频长度并初始化第一个帧索引列表
video_length = key.size()[0] // self.batch_size
first_frame_index = [0] * video_length
# 重新排列键以使批次和帧在第一和第二维度
key = rearrange_3(key, video_length)
key = key[:, first_frame_index]
# 重新排列值以使批次和帧在第一和第二维度
value = rearrange_3(value, video_length)
value = value[:, first_frame_index]
# 重新排列回原始形状
key = rearrange_4(key)
value = rearrange_4(value)
# 将查询、键和值转换为批次维度
query = attn.head_to_batch_dim(query)
key = attn.head_to_batch_dim(key)
value = attn.head_to_batch_dim(value)
# 计算注意力得分
attention_probs = attn.get_attention_scores(query, key, attention_mask)
# 使用注意力概率与值进行批次矩阵乘法
hidden_states = torch.bmm(attention_probs, value)
# 将隐藏状态转换回头部维度
hidden_states = attn.batch_to_head_dim(hidden_states)
# 对隐藏状态进行线性投影
hidden_states = attn.to_out[0](hidden_states)
# 对隐藏状态应用 dropout
hidden_states = attn.to_out[1](hidden_states)
# 返回处理后的隐藏状态
return hidden_states
# 从 diffusers.pipelines.text_to_video_synthesis.pipeline_text_to_video_zero 中复制的类
class CrossFrameAttnProcessor2_0:
"""
使用 PyTorch 2.0 的缩放点积注意力的跨帧注意力处理器。
Args:
batch_size: 表示实际批处理大小的数字,而不是帧的数量。
例如,使用单个提示和 num_images_per_prompt=1 调用 unet 时,batch_size 应该等于
2,因为无分类器引导。
"""
# 初始化方法,接受批处理大小的参数,默认为 2
def __init__(self, batch_size=2):
# 检查 PyTorch 是否具有缩放点积注意力的功能
if not hasattr(F, "scaled_dot_product_attention"):
# 如果没有,抛出导入错误,提示需要升级 PyTorch 到 2.0
raise ImportError("AttnProcessor2_0 requires PyTorch 2.0, to use it, please upgrade PyTorch to 2.0.")
# 将传入的批处理大小赋值给实例变量
self.batch_size = batch_size
# 定义调用方法,用于执行注意力机制
def __call__(self, attn, hidden_states, encoder_hidden_states=None, attention_mask=None):
# 获取批次大小和序列长度,选择使用的隐藏状态
batch_size, sequence_length, _ = (
hidden_states.shape if encoder_hidden_states is None else encoder_hidden_states.shape
)
# 获取隐藏状态的内维度
inner_dim = hidden_states.shape[-1]
# 如果存在注意力掩码
if attention_mask is not None:
# 准备注意力掩码,以适应序列长度和批次大小
attention_mask = attn.prepare_attention_mask(attention_mask, sequence_length, batch_size)
# scaled_dot_product_attention 期望注意力掩码的形状为 (batch, heads, source_length, target_length)
attention_mask = attention_mask.view(batch_size, attn.heads, -1, attention_mask.shape[-1])
# 将隐藏状态转换为查询向量
query = attn.to_q(hidden_states)
# 检查是否为交叉注意力
is_cross_attention = encoder_hidden_states is not None
# 如果没有交叉隐藏状态,则使用当前的隐藏状态
if encoder_hidden_states is None:
encoder_hidden_states = hidden_states
# 如果需要进行交叉注意力的归一化
elif attn.norm_cross:
encoder_hidden_states = attn.norm_encoder_hidden_states(encoder_hidden_states)
# 将交叉隐藏状态转换为键和值
key = attn.to_k(encoder_hidden_states)
value = attn.to_v(encoder_hidden_states)
# 处理非交叉帧注意力
if not is_cross_attention:
# 计算视频长度,确保至少为1
video_length = max(1, key.size()[0] // self.batch_size)
first_frame_index = [0] * video_length
# 重新排列键,使批次和帧位于第1和第2维
key = rearrange_3(key, video_length)
key = key[:, first_frame_index]
# 重新排列值,使批次和帧位于第1和第2维
value = rearrange_3(value, video_length)
value = value[:, first_frame_index]
# 重新排列回原始形状
key = rearrange_4(key)
value = rearrange_4(value)
# 计算每个头的维度
head_dim = inner_dim // attn.heads
# 重新排列查询、键和值的形状
query = query.view(batch_size, -1, attn.heads, head_dim).transpose(1, 2)
key = key.view(batch_size, -1, attn.heads, head_dim).transpose(1, 2)
value = value.view(batch_size, -1, attn.heads, head_dim).transpose(1, 2)
# scaled_dot_product_attention 的输出形状为 (batch, num_heads, seq_len, head_dim)
# TODO: 在升级到 Torch 2.1 时添加对 attn.scale 的支持
hidden_states = F.scaled_dot_product_attention(
query, key, value, attn_mask=attention_mask, dropout_p=0.0, is_causal=False
)
# 转置并重塑隐藏状态以合并头维度
hidden_states = hidden_states.transpose(1, 2).reshape(batch_size, -1, attn.heads * head_dim)
# 将隐藏状态转换为查询向量的数据类型
hidden_states = hidden_states.to(query.dtype)
# 线性变换
hidden_states = attn.to_out[0](hidden_states)
# 应用 dropout
hidden_states = attn.to_out[1](hidden_states)
# 返回最终的隐藏状态
return hidden_states
# 定义一个用于零-shot文本到视频管道的输出类,继承自BaseOutput
@dataclass
class TextToVideoSDXLPipelineOutput(BaseOutput):
"""
输出类用于零-shot文本到视频管道。
参数:
images (`List[PIL.Image.Image]` 或 `np.ndarray`)
长度为 `batch_size` 的去噪PIL图像列表或形状为 `(batch_size, height, width,
num_channels)` 的numpy数组。PIL图像或numpy数组呈现扩散管道的去噪图像。
"""
# 定义属性images,可以是PIL图像列表或numpy数组
images: Union[List[PIL.Image.Image], np.ndarray]
# 从diffusers.pipelines.text_to_video_synthesis.pipeline_text_to_video_zero.coords_grid复制的函数
def coords_grid(batch, ht, wd, device):
# 从给定的高度和宽度生成网格坐标,适应设备
# 适配自:https://github.com/princeton-vl/RAFT/blob/master/core/utils/utils.py
coords = torch.meshgrid(torch.arange(ht, device=device), torch.arange(wd, device=device))
# 反转坐标顺序并转换为浮点型
coords = torch.stack(coords[::-1], dim=0).float()
# 生成的坐标扩展到指定的batch维度
return coords[None].repeat(batch, 1, 1, 1)
# 从diffusers.pipelines.text_to_video_synthesis.pipeline_text_to_video_zero.warp_single_latent复制的函数
def warp_single_latent(latent, reference_flow):
"""
使用给定的流对单帧的潜在表示进行扭曲
参数:
latent: 单帧的潜在代码
reference_flow: 用于扭曲潜在表示的流
返回:
warped: 扭曲后的潜在表示
"""
# 获取参考流的尺寸,高度和宽度
_, _, H, W = reference_flow.size()
# 获取潜在表示的尺寸,高度和宽度
_, _, h, w = latent.size()
# 生成坐标网格并转换为潜在表示的dtype
coords0 = coords_grid(1, H, W, device=latent.device).to(latent.dtype)
# 将参考流加到坐标网格上
coords_t0 = coords0 + reference_flow
# 将坐标归一化到[0, 1]范围
coords_t0[:, 0] /= W
coords_t0[:, 1] /= H
# 将坐标缩放到[-1, 1]范围
coords_t0 = coords_t0 * 2.0 - 1.0
# 对坐标进行双线性插值以调整大小
coords_t0 = F.interpolate(coords_t0, size=(h, w), mode="bilinear")
# 调整坐标的维度顺序
coords_t0 = torch.permute(coords_t0, (0, 2, 3, 1))
# 使用给定坐标从潜在表示中采样扭曲的结果
warped = grid_sample(latent, coords_t0, mode="nearest", padding_mode="reflection")
return warped
# 从diffusers.pipelines.text_to_video_synthesis.pipeline_text_to_video_zero.create_motion_field复制的函数
def create_motion_field(motion_field_strength_x, motion_field_strength_y, frame_ids, device, dtype):
"""
创建平移运动场
参数:
motion_field_strength_x: x轴的运动强度
motion_field_strength_y: y轴的运动强度
frame_ids: 正在处理的潜在表示的帧索引。
当我们执行分块推理时,这一点很重要
device: 设备
dtype: 数据类型
返回:
"""
# 获取帧的序列长度
seq_length = len(frame_ids)
# 初始化参考流,大小为 (seq_length, 2, 512, 512)
reference_flow = torch.zeros((seq_length, 2, 512, 512), device=device, dtype=dtype)
# 遍历每一帧,计算对应的运动场
for fr_idx in range(seq_length):
# 在x轴上为每一帧应用运动强度
reference_flow[fr_idx, 0, :, :] = motion_field_strength_x * (frame_ids[fr_idx])
# 在y轴上为每一帧应用运动强度
reference_flow[fr_idx, 1, :, :] = motion_field_strength_y * (frame_ids[fr_idx])
# 返回生成的参考流
return reference_flow
# 从diffusers.pipelines.text_to_video_synthesis.pipeline_text_to_video_zero.create_motion_field_and_warp_latents复制的函数
def create_motion_field_and_warp_latents(motion_field_strength_x, motion_field_strength_y, frame_ids, latents):
"""
# 创建平移运动,并相应地扭曲潜在表示
"""
# 参数说明
# motion_field_strength_x: x 轴上的运动强度
# motion_field_strength_y: y 轴上的运动强度
# frame_ids: 正在处理的帧的索引。
# 在进行分块推理时需要此信息
# latents: 帧的潜在编码
# 返回值说明
# warped_latents: 扭曲后的潜在表示
"""
# 创建运动场,根据给定的运动强度和帧 ID
motion_field = create_motion_field(
# 设置 x 轴运动强度
motion_field_strength_x=motion_field_strength_x,
# 设置 y 轴运动强度
motion_field_strength_y=motion_field_strength_y,
# 传入帧 ID
frame_ids=frame_ids,
# 设定设备为潜在表示的设备
device=latents.device,
# 设定数据类型为潜在表示的数据类型
dtype=latents.dtype,
)
# 克隆潜在表示以创建扭曲后的潜在表示
warped_latents = latents.clone().detach()
# 遍历所有扭曲的潜在表示
for i in range(len(warped_latents)):
# 对每个潜在表示应用单个扭曲操作
warped_latents[i] = warp_single_latent(latents[i][None], motion_field[i][None])
# 返回扭曲后的潜在表示
return warped_latents
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.rescale_noise_cfg 复制
def rescale_noise_cfg(noise_cfg, noise_pred_text, guidance_rescale=0.0):
"""
根据 `guidance_rescale` 调整 `noise_cfg`。基于 [Common Diffusion Noise Schedules and
Sample Steps are Flawed](https://arxiv.org/pdf/2305.08891.pdf) 的研究结果。见第 3.4 节
"""
# 计算 noise_pred_text 在所有维度上的标准差,并保持维度
std_text = noise_pred_text.std(dim=list(range(1, noise_pred_text.ndim)), keepdim=True)
# 计算 noise_cfg 在所有维度上的标准差,并保持维度
std_cfg = noise_cfg.std(dim=list(range(1, noise_cfg.ndim)), keepdim=True)
# 根据标准差调整来自指导的结果(修正过度曝光)
noise_pred_rescaled = noise_cfg * (std_text / std_cfg)
# 通过 guidance_rescale 因子与来自指导的原始结果混合,以避免“单调”的图像
noise_cfg = guidance_rescale * noise_pred_rescaled + (1 - guidance_rescale) * noise_cfg
# 返回调整后的 noise_cfg
return noise_cfg
# 定义一个用于零-shot 文本到视频生成的管道类
class TextToVideoZeroSDXLPipeline(
DiffusionPipeline, # 继承自 DiffusionPipeline
StableDiffusionMixin, # 继承自 StableDiffusionMixin
StableDiffusionXLLoraLoaderMixin, # 继承自 StableDiffusionXLLoraLoaderMixin
TextualInversionLoaderMixin, # 继承自 TextualInversionLoaderMixin
):
r"""
使用 Stable Diffusion XL 进行零-shot 文本到视频生成的管道。
该模型继承自 [`DiffusionPipeline`]。查看超类文档以获取所有管道的通用方法
(下载、保存、在特定设备上运行等)。
# 参数说明
Args:
vae ([`AutoencoderKL`]):
# 变分自编码器(VAE)模型,用于将图像编码为潜在表示,并从潜在表示解码图像。
text_encoder ([`CLIPTextModel`]):
# 冻结的文本编码器。稳定扩散 XL 使用 CLIP 的文本部分,
# 具体来说是 [clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14) 变体。
text_encoder_2 ([` CLIPTextModelWithProjection`]):
# 第二个冻结文本编码器。稳定扩散 XL 使用 CLIP 的文本和池部分,
# 具体是 [laion/CLIP-ViT-bigG-14-laion2B-39B-b160k](https://huggingface.co/laion/CLIP-ViT-bigG-14-laion2B-39B-b160k) 变体。
tokenizer (`CLIPTokenizer`):
# CLIPTokenizer 类的分词器。
tokenizer_2 (`CLIPTokenizer`):
# 第二个 CLIPTokenizer 类的分词器。
unet ([`UNet2DConditionModel`]):
# 条件 U-Net 架构,用于去噪编码后的图像潜在表示。
scheduler ([`SchedulerMixin`]):
# 调度器,用于与 `unet` 结合去噪编码后的图像潜在表示。可以是
# [`DDIMScheduler`], [`LMSDiscreteScheduler`] 或 [`PNDMScheduler`] 中的任意一个。
"""
# 定义模型在 CPU 上的卸载顺序
model_cpu_offload_seq = "text_encoder->text_encoder_2->unet->vae"
# 定义可选组件列表
_optional_components = [
# 分词器
"tokenizer",
# 第二个分词器
"tokenizer_2",
# 第一个文本编码器
"text_encoder",
# 第二个文本编码器
"text_encoder_2",
# 图像编码器
"image_encoder",
# 特征提取器
"feature_extractor",
]
# 初始化方法定义
def __init__(
# 变分自编码器
self,
vae: AutoencoderKL,
# 文本编码器
text_encoder: CLIPTextModel,
# 第二个文本编码器
text_encoder_2: CLIPTextModelWithProjection,
# 第一个分词器
tokenizer: CLIPTokenizer,
# 第二个分词器
tokenizer_2: CLIPTokenizer,
# 条件 U-Net
unet: UNet2DConditionModel,
# 调度器
scheduler: KarrasDiffusionSchedulers,
# 可选的图像编码器,默认为 None
image_encoder: CLIPVisionModelWithProjection = None,
# 可选的特征提取器,默认为 None
feature_extractor: CLIPImageProcessor = None,
# 用于处理空提示的强制零值,默认为 True
force_zeros_for_empty_prompt: bool = True,
# 可选的水印标记,默认为 None
add_watermarker: Optional[bool] = None,
# 初始化父类
):
super().__init__()
# 注册多个模块,包括 VAE、文本编码器和其他组件
self.register_modules(
vae=vae,
text_encoder=text_encoder,
text_encoder_2=text_encoder_2,
tokenizer=tokenizer,
tokenizer_2=tokenizer_2,
unet=unet,
scheduler=scheduler,
image_encoder=image_encoder,
feature_extractor=feature_extractor,
)
# 将配置参数注册到对象中,强制为空提示使用零
self.register_to_config(force_zeros_for_empty_prompt=force_zeros_for_empty_prompt)
# 计算 VAE 的缩放因子,基于输出通道的数量
self.vae_scale_factor = 2 ** (len(self.vae.config.block_out_channels) - 1)
# 创建图像处理器,使用 VAE 缩放因子
self.image_processor = VaeImageProcessor(vae_scale_factor=self.vae_scale_factor)
# 设置默认的采样尺寸,从 UNet 配置中获取
self.default_sample_size = self.unet.config.sample_size
# 如果 add_watermarker 为 None,使用可用的隐形水印设置
add_watermarker = add_watermarker if add_watermarker is not None else is_invisible_watermark_available()
# 如果需要添加水印,则初始化水印器,否则设置为 None
if add_watermarker:
self.watermark = StableDiffusionXLWatermarker()
else:
self.watermark = None
# 从稳定扩散管道中复制的函数,用于准备额外的步骤参数
def prepare_extra_step_kwargs(self, generator, eta):
# 准备调度器步骤的额外参数,因为并非所有调度器具有相同的签名
# eta (η) 仅在 DDIMScheduler 中使用,对于其他调度器将被忽略
# eta 对应于 DDIM 论文中的 η: https://arxiv.org/abs/2010.02502
# 应在 [0, 1] 范围内
# 检查调度器是否接受 eta 参数
accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())
extra_step_kwargs = {}
if accepts_eta:
# 如果接受,则将 eta 添加到额外参数中
extra_step_kwargs["eta"] = eta
# 检查调度器是否接受 generator 参数
accepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys())
if accepts_generator:
# 如果接受,则将 generator 添加到额外参数中
extra_step_kwargs["generator"] = generator
# 返回准备好的额外参数
return extra_step_kwargs
# 从稳定扩散 XL 管道中复制的函数,用于提升 VAE 精度
def upcast_vae(self):
# 获取 VAE 的数据类型
dtype = self.vae.dtype
# 将 VAE 转换为 float32 类型
self.vae.to(dtype=torch.float32)
# 检查是否使用 torch 2.0 或 xformers
use_torch_2_0_or_xformers = isinstance(
self.vae.decoder.mid_block.attentions[0].processor,
(
AttnProcessor2_0,
XFormersAttnProcessor,
FusedAttnProcessor2_0,
),
)
# 如果使用了 xformers 或 torch 2.0,则不需要将注意力块放在 float32 中,以节省大量内存
if use_torch_2_0_or_xformers:
# 将后量化卷积转换为原始数据类型
self.vae.post_quant_conv.to(dtype)
# 将解码器的输入卷积转换为原始数据类型
self.vae.decoder.conv_in.to(dtype)
# 将解码器的中间块转换为原始数据类型
self.vae.decoder.mid_block.to(dtype)
# 从稳定扩散 XL 管道中复制的函数,用于获取附加时间 ID
# 定义一个私有方法,用于获取添加时间的 ID
def _get_add_time_ids(
# 输入参数:原始大小、裁剪的左上角坐标、目标大小、数据类型和文本编码器的投影维度(可选)
self, original_size, crops_coords_top_left, target_size, dtype, text_encoder_projection_dim=None
):
# 将原始大小、裁剪坐标和目标大小合并为一个列表
add_time_ids = list(original_size + crops_coords_top_left + target_size)
# 计算通过时间嵌入所需的维度
passed_add_embed_dim = (
self.unet.config.addition_time_embed_dim * len(add_time_ids) + text_encoder_projection_dim
)
# 获取模型预期的时间嵌入维度
expected_add_embed_dim = self.unet.add_embedding.linear_1.in_features
# 检查计算得到的维度与预期是否一致
if expected_add_embed_dim != passed_add_embed_dim:
# 如果不一致,抛出值错误,提示用户检查配置
raise ValueError(
f"Model expects an added time embedding vector of length {expected_add_embed_dim}, but a vector of {passed_add_embed_dim} was created. The model has an incorrect config. Please check `unet.config.time_embedding_type` and `text_encoder_2.config.projection_dim`."
)
# 将添加时间的 ID 转换为张量,并指定数据类型
add_time_ids = torch.tensor([add_time_ids], dtype=dtype)
# 返回添加时间的 ID 张量
return add_time_ids
# 从 StableDiffusionPipeline 复制的方法,用于准备潜在的张量
def prepare_latents(self, batch_size, num_channels_latents, height, width, dtype, device, generator, latents=None):
# 根据输入参数计算潜在张量的形状
shape = (
batch_size,
num_channels_latents,
int(height) // self.vae_scale_factor,
int(width) // self.vae_scale_factor,
)
# 检查生成器的长度是否与批量大小一致
if isinstance(generator, list) and len(generator) != batch_size:
# 如果不一致,抛出值错误,提示用户检查生成器长度
raise ValueError(
f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
f" size of {batch_size}. Make sure the batch size matches the length of the generators."
)
# 如果没有提供潜在张量,随机生成一个
if latents is None:
latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
else:
# 如果提供了潜在张量,将其移动到指定设备
latents = latents.to(device)
# 根据调度器所需的标准差缩放初始噪声
latents = latents * self.scheduler.init_noise_sigma
# 返回准备好的潜在张量
return latents
# 检查输入参数的方法
def check_inputs(
# 输入参数:提示、第二个提示、高度、宽度、回调步骤以及可选的负提示和嵌入
self,
prompt,
prompt_2,
height,
width,
callback_steps,
negative_prompt=None,
negative_prompt_2=None,
prompt_embeds=None,
negative_prompt_embeds=None,
pooled_prompt_embeds=None,
negative_pooled_prompt_embeds=None,
callback_on_step_end_tensor_inputs=None,
# 从 StableDiffusionXLPipeline 复制的方法,用于编码提示
# 定义一个方法用于编码提示信息
def encode_prompt(
# 输入的提示字符串
self,
prompt: str,
# 第二个提示字符串,默认为 None
prompt_2: Optional[str] = None,
# 设备类型,默认为 None
device: Optional[torch.device] = None,
# 每个提示生成的图像数量,默认为 1
num_images_per_prompt: int = 1,
# 是否使用分类器自由引导,默认为 True
do_classifier_free_guidance: bool = True,
# 负提示字符串,默认为 None
negative_prompt: Optional[str] = None,
# 第二个负提示字符串,默认为 None
negative_prompt_2: Optional[str] = None,
# 提示的张量表示,默认为 None
prompt_embeds: Optional[torch.Tensor] = None,
# 负提示的张量表示,默认为 None
negative_prompt_embeds: Optional[torch.Tensor] = None,
# 池化后的提示张量表示,默认为 None
pooled_prompt_embeds: Optional[torch.Tensor] = None,
# 池化后的负提示张量表示,默认为 None
negative_pooled_prompt_embeds: Optional[torch.Tensor] = None,
# Lora 的缩放因子,默认为 None
lora_scale: Optional[float] = None,
# 跳过的 clip 数,默认为 None
clip_skip: Optional[int] = None,
# 从 diffusers.pipelines.text_to_video_synthesis.pipeline_text_to_video_zero.TextToVideoZeroPipeline.forward_loop 复制的代码
def forward_loop(self, x_t0, t0, t1, generator):
"""
从 t0 到 t1 执行 DDPM 向前过程,即根据相应方差添加噪声。
Args:
x_t0:
t0 时刻的潜在编码。
t0:
t0 时刻的时间步。
t1:
t1 时刻的时间步。
generator (`torch.Generator` 或 `List[torch.Generator]`, *可选*):
用于生成确定性的 [`torch.Generator`](https://pytorch.org/docs/stable/generated/torch.Generator.html)。
Returns:
x_t1:
对 x_t0 应用从 t0 到 t1 的向前过程。
"""
# 生成与 x_t0 相同形状的随机噪声张量
eps = randn_tensor(x_t0.size(), generator=generator, dtype=x_t0.dtype, device=x_t0.device)
# 计算在 t0 到 t1 之间的 alpha 向量的乘积
alpha_vec = torch.prod(self.scheduler.alphas[t0:t1])
# 计算向前过程结果 x_t1
x_t1 = torch.sqrt(alpha_vec) * x_t0 + torch.sqrt(1 - alpha_vec) * eps
# 返回结果 x_t1
return x_t1
# 定义一个方法用于执行向后过程
def backward_loop(
# 潜在变量
latents,
# 时间步序列
timesteps,
# 提示的张量表示
prompt_embeds,
# 引导的缩放因子
guidance_scale,
# 回调函数
callback,
# 回调步骤
callback_steps,
# 预热步骤数量
num_warmup_steps,
# 额外步骤的参数
extra_step_kwargs,
# 添加文本嵌入
add_text_embeds,
# 添加时间 ID
add_time_ids,
# 交叉注意力的参数,默认为 None
cross_attention_kwargs=None,
# 引导重缩放因子,默认为 0.0
guidance_rescale: float = 0.0,
# 不计算梯度的装饰器
@torch.no_grad()
# 定义可调用对象的方法,允许使用特定参数生成视频
def __call__(
self,
# 输入提示,支持字符串或字符串列表
prompt: Union[str, List[str]],
# 第二个提示,支持字符串或字符串列表,默认为 None
prompt_2: Optional[Union[str, List[str]]] = None,
# 视频长度,默认为 8 秒
video_length: Optional[int] = 8,
# 视频高度,默认为 None
height: Optional[int] = None,
# 视频宽度,默认为 None
width: Optional[int] = None,
# 推理步骤数,默认为 50
num_inference_steps: int = 50,
# 去噪结束阈值,默认为 None
denoising_end: Optional[float] = None,
# 引导缩放因子,默认为 7.5
guidance_scale: float = 7.5,
# 负提示,支持字符串或字符串列表,默认为 None
negative_prompt: Optional[Union[str, List[str]]] = None,
# 第二个负提示,支持字符串或字符串列表,默认为 None
negative_prompt_2: Optional[Union[str, List[str]]] = None,
# 每个提示生成的视频数量,默认为 1
num_videos_per_prompt: Optional[int] = 1,
# 额外噪声强度,默认为 0.0
eta: float = 0.0,
# 随机数生成器,可以是单个或多个 torch.Generator,默认为 None
generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
# 帧 ID 列表,默认为 None
frame_ids: Optional[List[int]] = None,
# 提示的嵌入,默认为 None
prompt_embeds: Optional[torch.Tensor] = None,
# 负提示的嵌入,默认为 None
negative_prompt_embeds: Optional[torch.Tensor] = None,
# 聚合提示的嵌入,默认为 None
pooled_prompt_embeds: Optional[torch.Tensor] = None,
# 负聚合提示的嵌入,默认为 None
negative_pooled_prompt_embeds: Optional[torch.Tensor] = None,
# 潜在变量,默认为 None
latents: Optional[torch.Tensor] = None,
# 动作场强度 X,默认为 12
motion_field_strength_x: float = 12,
# 动作场强度 Y,默认为 12
motion_field_strength_y: float = 12,
# 输出类型,默认为 "tensor"
output_type: Optional[str] = "tensor",
# 是否返回字典,默认为 True
return_dict: bool = True,
# 回调函数,接受特定参数,默认为 None
callback: Optional[Callable[[int, int, torch.Tensor], None]] = None,
# 每多少步骤调用一次回调,默认为 1
callback_steps: int = 1,
# 跨注意力的额外参数,默认为 None
cross_attention_kwargs: Optional[Dict[str, Any]] = None,
# 引导重标定,默认为 0.0
guidance_rescale: float = 0.0,
# 原始尺寸,默认为 None
original_size: Optional[Tuple[int, int]] = None,
# 裁剪坐标的左上角,默认为 (0, 0)
crops_coords_top_left: Tuple[int, int] = (0, 0),
# 目标尺寸,默认为 None
target_size: Optional[Tuple[int, int]] = None,
# 初始时间 t0,默认为 44
t0: int = 44,
# 结束时间 t1,默认为 47
t1: int = 47,
.\diffusers\pipelines\text_to_video_synthesis\__init__.py
# 导入类型检查相关的模块
from typing import TYPE_CHECKING
# 从上层目录的 utils 模块导入所需的工具和常量
from ...utils import (
DIFFUSERS_SLOW_IMPORT, # 导入慢导入标志
OptionalDependencyNotAvailable, # 导入可选依赖未可用异常
_LazyModule, # 导入懒加载模块
get_objects_from_module, # 导入从模块获取对象的函数
is_torch_available, # 导入检查是否可用 PyTorch 的函数
is_transformers_available, # 导入检查是否可用 Transformers 的函数
)
# 初始化一个空字典用于存放虚拟对象
_dummy_objects = {}
# 初始化一个空字典用于存放导入结构
_import_structure = {}
# 尝试检查是否可用 Transformers 和 PyTorch
try:
if not (is_transformers_available() and is_torch_available()): # 如果两者不可用
raise OptionalDependencyNotAvailable() # 抛出异常
except OptionalDependencyNotAvailable: # 捕获异常
from ...utils import dummy_torch_and_transformers_objects # 导入虚拟对象模块,忽略未使用警告
# 更新虚拟对象字典
_dummy_objects.update(get_objects_from_module(dummy_torch_and_transformers_objects))
else: # 如果没有异常
# 更新导入结构字典,添加相应的管道输出
_import_structure["pipeline_output"] = ["TextToVideoSDPipelineOutput"]
# 添加文本到视频合成的管道
_import_structure["pipeline_text_to_video_synth"] = ["TextToVideoSDPipeline"]
# 添加图像到图像的视频合成管道
_import_structure["pipeline_text_to_video_synth_img2img"] = ["VideoToVideoSDPipeline"]
# 添加文本到视频零样本合成管道
_import_structure["pipeline_text_to_video_zero"] = ["TextToVideoZeroPipeline"]
# 添加文本到视频零样本 SDXL 合成管道
_import_structure["pipeline_text_to_video_zero_sdxl"] = ["TextToVideoZeroSDXLPipeline"]
# 如果在类型检查阶段或慢导入标志为真
if TYPE_CHECKING or DIFFUSERS_SLOW_IMPORT:
try:
# 再次检查是否可用 Transformers 和 PyTorch
if not (is_transformers_available() and is_torch_available()): # 如果两者不可用
raise OptionalDependencyNotAvailable() # 抛出异常
except OptionalDependencyNotAvailable: # 捕获异常
from ...utils.dummy_torch_and_transformers_objects import * # 导入虚拟对象,忽略未使用警告
else: # 如果没有异常
# 从各自的模块中导入所需的管道类
from .pipeline_output import TextToVideoSDPipelineOutput
from .pipeline_text_to_video_synth import TextToVideoSDPipeline
from .pipeline_text_to_video_synth_img2img import VideoToVideoSDPipeline
from .pipeline_text_to_video_zero import TextToVideoZeroPipeline
from .pipeline_text_to_video_zero_sdxl import TextToVideoZeroSDXLPipeline
else: # 如果不在类型检查阶段也不需要慢导入
import sys # 导入系统模块
# 将当前模块替换为懒加载模块
sys.modules[__name__] = _LazyModule(
__name__, # 模块名
globals()["__file__"], # 文件路径
_import_structure, # 导入结构
module_spec=__spec__, # 模块规范
)
# 将虚拟对象字典中的对象添加到当前模块
for name, value in _dummy_objects.items():
setattr(sys.modules[__name__], name, value)
.\diffusers\pipelines\unclip\pipeline_unclip.py
# 版权声明,指明版权归属及许可信息
# Copyright 2024 Kakao Brain and The HuggingFace Team. All rights reserved.
#
# 根据 Apache 2.0 许可协议,使用该文件需要遵循许可条款
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# 许可协议的副本可以在以下网址获取
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律规定或书面同意,否则软件按“现状”提供,不提供任何形式的保证
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# 具体的许可条款可以查看许可协议
# See the License for the specific language governing permissions and
# limitations under the License.
# 导入 inspect 模块,用于获取对象的签名和源代码
import inspect
# 从 typing 模块导入类型注解
from typing import List, Optional, Tuple, Union
# 导入 torch 库以进行张量运算
import torch
# 从 torch.nn.functional 导入常用的函数接口
from torch.nn import functional as F
# 导入 CLIP 文本模型和分词器
from transformers import CLIPTextModelWithProjection, CLIPTokenizer
# 从 CLIP 模型导入文本模型输出类
from transformers.models.clip.modeling_clip import CLIPTextModelOutput
# 从当前目录导入多个模型和调度器
from ...models import PriorTransformer, UNet2DConditionModel, UNet2DModel
from ...schedulers import UnCLIPScheduler
from ...utils import logging
from ...utils.torch_utils import randn_tensor
from ..pipeline_utils import DiffusionPipeline, ImagePipelineOutput
from .text_proj import UnCLIPTextProjModel
# 初始化日志记录器,用于记录运行信息
logger = logging.get_logger(__name__) # pylint: disable=invalid-name
# 定义一个用于文本到图像生成的管道类
class UnCLIPPipeline(DiffusionPipeline):
"""
使用 unCLIP 进行文本到图像生成的管道。
该模型继承自 [`DiffusionPipeline`]。有关通用方法的文档请参见超类文档
(下载、保存、在特定设备上运行等)。
参数:
text_encoder ([`~transformers.CLIPTextModelWithProjection`]):
冻结的文本编码器。
tokenizer ([`~transformers.CLIPTokenizer`]):
用于对文本进行分词的 `CLIPTokenizer`。
prior ([`PriorTransformer`]):
经典的 unCLIP 先验,用于从文本嵌入近似图像嵌入。
text_proj ([`UnCLIPTextProjModel`]):
用于准备和组合嵌入的实用类,嵌入会在传递给解码器之前进行处理。
decoder ([`UNet2DConditionModel`]):
将图像嵌入反转为图像的解码器。
super_res_first ([`UNet2DModel`]):
超分辨率 UNet。在超分辨率扩散过程的所有步骤中使用,除了最后一步。
super_res_last ([`UNet2DModel`]):
超分辨率 UNet。在超分辨率扩散过程的最后一步使用。
prior_scheduler ([`UnCLIPScheduler`]):
在先验去噪过程中使用的调度器(修改版 [`DDPMScheduler`]).
decoder_scheduler ([`UnCLIPScheduler`]):
在解码器去噪过程中使用的调度器(修改版 [`DDPMScheduler`]).
super_res_scheduler ([`UnCLIPScheduler`]):
在超分辨率去噪过程中使用的调度器(修改版 [`DDPMScheduler`]).
"""
# 指定在 CPU 卸载时要排除的组件
_exclude_from_cpu_offload = ["prior"]
# 定义先验字段,类型为 PriorTransformer
prior: PriorTransformer
# 定义解码器模型类型
decoder: UNet2DConditionModel
# 定义文本投影模型类型
text_proj: UnCLIPTextProjModel
# 定义文本编码器模型类型
text_encoder: CLIPTextModelWithProjection
# 定义分词器类型
tokenizer: CLIPTokenizer
# 定义超分辨率模型的第一个部分
super_res_first: UNet2DModel
# 定义超分辨率模型的最后一个部分
super_res_last: UNet2DModel
# 定义优先级调度器
prior_scheduler: UnCLIPScheduler
# 定义解码器调度器
decoder_scheduler: UnCLIPScheduler
# 定义超分辨率调度器
super_res_scheduler: UnCLIPScheduler
# 定义模型的CPU卸载顺序
model_cpu_offload_seq = "text_encoder->text_proj->decoder->super_res_first->super_res_last"
# 初始化函数,定义所需的参数
def __init__(
# 定义优先变换器参数
prior: PriorTransformer,
# 定义解码器模型参数
decoder: UNet2DConditionModel,
# 定义文本编码器参数
text_encoder: CLIPTextModelWithProjection,
# 定义分词器参数
tokenizer: CLIPTokenizer,
# 定义文本投影模型参数
text_proj: UnCLIPTextProjModel,
# 定义超分辨率模型的第一个部分参数
super_res_first: UNet2DModel,
# 定义超分辨率模型的最后一个部分参数
super_res_last: UNet2DModel,
# 定义优先级调度器参数
prior_scheduler: UnCLIPScheduler,
# 定义解码器调度器参数
decoder_scheduler: UnCLIPScheduler,
# 定义超分辨率调度器参数
super_res_scheduler: UnCLIPScheduler,
):
# 调用父类的初始化函数
super().__init__()
# 注册模块,存储所需的模型和调度器
self.register_modules(
prior=prior,
decoder=decoder,
text_encoder=text_encoder,
tokenizer=tokenizer,
text_proj=text_proj,
super_res_first=super_res_first,
super_res_last=super_res_last,
prior_scheduler=prior_scheduler,
decoder_scheduler=decoder_scheduler,
super_res_scheduler=super_res_scheduler,
)
# 准备潜在变量的函数
def prepare_latents(self, shape, dtype, device, generator, latents, scheduler):
# 如果没有给定潜在变量,生成随机潜在变量
if latents is None:
latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
else:
# 如果给定的潜在变量形状与预期不符,则抛出异常
if latents.shape != shape:
raise ValueError(f"Unexpected latents shape, got {latents.shape}, expected {shape}")
# 将潜在变量移动到指定设备
latents = latents.to(device)
# 将潜在变量乘以调度器的初始噪声标准差
latents = latents * scheduler.init_noise_sigma
# 返回处理后的潜在变量
return latents
# 编码提示的函数
def _encode_prompt(
self,
# 提示内容
prompt,
# 设备类型
device,
# 每个提示生成的图像数量
num_images_per_prompt,
# 是否执行分类器自由引导
do_classifier_free_guidance,
# 文本模型输出,可选
text_model_output: Optional[Union[CLIPTextModelOutput, Tuple]] = None,
# 文本注意力掩码,可选
text_attention_mask: Optional[torch.Tensor] = None,
@torch.no_grad()
# 调用函数,用于生成图像
def __call__(
# 提示内容,可选
prompt: Optional[Union[str, List[str]]] = None,
# 每个提示生成的图像数量,默认为1
num_images_per_prompt: int = 1,
# 优先模型的推理步骤数量,默认为25
prior_num_inference_steps: int = 25,
# 解码器的推理步骤数量,默认为25
decoder_num_inference_steps: int = 25,
# 超分辨率的推理步骤数量,默认为7
super_res_num_inference_steps: int = 7,
# 随机数生成器,可选
generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
# 优先潜在变量,可选
prior_latents: Optional[torch.Tensor] = None,
# 解码器潜在变量,可选
decoder_latents: Optional[torch.Tensor] = None,
# 超分辨率潜在变量,可选
super_res_latents: Optional[torch.Tensor] = None,
# 文本模型输出,可选
text_model_output: Optional[Union[CLIPTextModelOutput, Tuple]] = None,
# 文本注意力掩码,可选
text_attention_mask: Optional[torch.Tensor] = None,
# 优先引导尺度,默认为4.0
prior_guidance_scale: float = 4.0,
# 解码器引导尺度,默认为8.0
decoder_guidance_scale: float = 8.0,
# 输出类型,默认为"pil"
output_type: Optional[str] = "pil",
# 是否返回字典格式,默认为True
return_dict: bool = True,