diffusers 源码解析(二十)
.\diffusers\pipelines\blip_diffusion\pipeline_blip_diffusion.py
# 版权所有 2024 Salesforce.com, inc. # 指明版权归属
# 版权所有 2024 The HuggingFace Team. All rights reserved. # 指明另一个版权归属
# 根据 Apache License 2.0 许可协议进行授权; # 说明代码的许可协议
# 除非符合许可协议,否则不可使用此文件。 # 指出使用条件
# 可以在以下地址获取许可协议的副本: # 提供许可协议的获取方式
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律要求或书面同意,否则软件按"原样"提供。 # 指出软件不提供任何保证
# 请参见许可协议了解特定的权限和限制。 # 指出许可协议的内容
from typing import List, Optional, Union # 从 typing 模块导入类型提示工具
import PIL.Image # 导入 PIL 库的图像处理功能
import torch # 导入 PyTorch 库
from transformers import CLIPTokenizer # 从 transformers 导入 CLIPTokenizer
from ...models import AutoencoderKL, UNet2DConditionModel # 从相对路径导入模型
from ...schedulers import PNDMScheduler # 从相对路径导入调度器
from ...utils import ( # 从相对路径导入工具函数
logging, # 导入日志记录工具
replace_example_docstring, # 导入替换示例文档字符串的工具
)
from ...utils.torch_utils import randn_tensor # 从工具模块导入生成随机张量的函数
from ..pipeline_utils import DiffusionPipeline, ImagePipelineOutput # 从相对路径导入管道工具
from .blip_image_processing import BlipImageProcessor # 从当前目录导入图像处理工具
from .modeling_blip2 import Blip2QFormerModel # 从当前目录导入 Blip2 模型
from .modeling_ctx_clip import ContextCLIPTextModel # 从当前目录导入 ContextCLIP 文本模型
logger = logging.get_logger(__name__) # 获取当前模块的日志记录器实例,pylint 禁用无效名称警告
EXAMPLE_DOC_STRING = """ # 示例文档字符串,展示如何使用 BlipDiffusionPipeline
Examples: # 示例部分的开始
```py # 开始代码块
>>> from diffusers.pipelines import BlipDiffusionPipeline # 导入 BlipDiffusionPipeline
>>> from diffusers.utils import load_image # 导入加载图像的工具
>>> import torch # 导入 PyTorch 库
>>> blip_diffusion_pipe = BlipDiffusionPipeline.from_pretrained( # 创建 BlipDiffusionPipeline 的实例
... "Salesforce/blipdiffusion", torch_dtype=torch.float16 # 从预训练模型加载,设置数据类型为 float16
... ).to("cuda") # 将模型转移到 GPU
>>> cond_subject = "dog" # 定义条件主题为“狗”
>>> tgt_subject = "dog" # 定义目标主题为“狗”
>>> text_prompt_input = "swimming underwater" # 定义文本提示输入
>>> cond_image = load_image( # 加载条件图像
... "https://huggingface.co/datasets/ayushtues/blipdiffusion_images/resolve/main/dog.jpg" # 图像的 URL
... ) # 结束加载图像的函数调用
>>> guidance_scale = 7.5 # 设置引导尺度
>>> num_inference_steps = 25 # 设置推理步骤数
>>> negative_prompt = "over-exposure, under-exposure, saturated, duplicate, out of frame, lowres, cropped, worst quality, low quality, jpeg artifacts, morbid, mutilated, out of frame, ugly, bad anatomy, bad proportions, deformed, blurry, duplicate" # 定义负面提示
>>> output = blip_diffusion_pipe( # 调用管道生成输出
... text_prompt_input, # 传入文本提示输入
... cond_image, # 传入条件图像
... cond_subject, # 传入条件主题
... tgt_subject, # 传入目标主题
... guidance_scale=guidance_scale, # 传入引导尺度
... num_inference_steps=num_inference_steps, # 传入推理步骤数
... neg_prompt=negative_prompt, # 传入负面提示
... height=512, # 设置输出图像高度
... width=512, # 设置输出图像宽度
... ).images # 获取生成的图像
>>> output[0].save("image.png") # 保存生成的第一张图像为 "image.png"
```py # 结束代码块
"""
class BlipDiffusionPipeline(DiffusionPipeline): # 定义 BlipDiffusionPipeline 类,继承自 DiffusionPipeline
"""
Pipeline for Zero-Shot Subject Driven Generation using Blip Diffusion. # 说明该管道用于零-shot 主题驱动生成
This model inherits from [`DiffusionPipeline`]. Check the superclass documentation for the generic methods the # 指出该模型继承自 DiffusionPipeline,并建议查看超类文档以获取通用方法
# 库实现所有管道的功能(例如下载或保存,在特定设备上运行等)
Args:
tokenizer ([`CLIPTokenizer`]):
文本编码器的分词器
text_encoder ([`ContextCLIPTextModel`]):
用于编码文本提示的文本编码器
vae ([`AutoencoderKL`]):
VAE 模型,用于将潜在变量映射到图像
unet ([`UNet2DConditionModel`]):
条件 U-Net 架构,用于去噪图像嵌入
scheduler ([`PNDMScheduler`]):
与 `unet` 一起使用以生成图像潜在变量的调度器
qformer ([`Blip2QFormerModel`]):
QFormer 模型,用于从文本和图像中获取多模态嵌入
image_processor ([`BlipImageProcessor`]):
图像处理器,用于图像的预处理和后处理
ctx_begin_pos (int, `optional`, defaults to 2):
文本编码器中上下文标记的位置
"""
# 定义模型的 CPU 卸载顺序
model_cpu_offload_seq = "qformer->text_encoder->unet->vae"
def __init__(
self,
tokenizer: CLIPTokenizer,
text_encoder: ContextCLIPTextModel,
vae: AutoencoderKL,
unet: UNet2DConditionModel,
scheduler: PNDMScheduler,
qformer: Blip2QFormerModel,
image_processor: BlipImageProcessor,
ctx_begin_pos: int = 2,
mean: List[float] = None,
std: List[float] = None,
):
# 调用父类构造函数
super().__init__()
# 注册模块,包括分词器、文本编码器、VAE、U-Net、调度器、QFormer 和图像处理器
self.register_modules(
tokenizer=tokenizer,
text_encoder=text_encoder,
vae=vae,
unet=unet,
scheduler=scheduler,
qformer=qformer,
image_processor=image_processor,
)
# 将上下文开始位置、均值和标准差注册到配置中
self.register_to_config(ctx_begin_pos=ctx_begin_pos, mean=mean, std=std)
# 获取查询嵌入的方法,输入图像和源主题
def get_query_embeddings(self, input_image, src_subject):
# 使用 QFormer 获取图像输入和文本输入的嵌入,返回字典
return self.qformer(image_input=input_image, text_input=src_subject, return_dict=False)
# 从原始 Blip Diffusion 代码复制,指定目标主题并通过重复增强提示
def _build_prompt(self, prompts, tgt_subjects, prompt_strength=1.0, prompt_reps=20):
# 初始化一个空列表,用于存放构建的提示
rv = []
# 遍历每个提示和目标主题
for prompt, tgt_subject in zip(prompts, tgt_subjects):
# 构建包含目标主题的提示
prompt = f"a {tgt_subject} {prompt.strip()}"
# 一个技巧来增强提示的效果
rv.append(", ".join([prompt] * int(prompt_strength * prompt_reps)))
# 返回构建的提示列表
return rv
# 从 diffusers.pipelines.consistency_models.pipeline_consistency_models.ConsistencyModelPipeline.prepare_latents 复制的代码
# 准备潜在变量,包含批量大小、通道数、高度和宽度等参数
def prepare_latents(self, batch_size, num_channels, height, width, dtype, device, generator, latents=None):
# 定义潜在变量的形状
shape = (batch_size, num_channels, height, width)
# 检查生成器是否为列表且长度与批量大小不匹配,抛出值错误
if isinstance(generator, list) and len(generator) != batch_size:
raise ValueError(
f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
f" size of {batch_size}. Make sure the batch size matches the length of the generators."
)
# 如果未提供潜在变量,则生成新的随机潜在变量
if latents is None:
latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
else:
# 将提供的潜在变量转移到指定设备和数据类型
latents = latents.to(device=device, dtype=dtype)
# 将初始噪声缩放到调度器所需的标准差
latents = latents * self.scheduler.init_noise_sigma
# 返回处理后的潜在变量
return latents
# 编码提示,生成文本嵌入
def encode_prompt(self, query_embeds, prompt, device=None):
# 如果未指定设备,则使用执行设备
device = device or self._execution_device
# 获取最大长度,考虑查询嵌入的上下文
max_len = self.text_encoder.text_model.config.max_position_embeddings
max_len -= self.qformer.config.num_query_tokens
# 将提示进行分词处理,并调整为最大长度
tokenized_prompt = self.tokenizer(
prompt,
padding="max_length",
truncation=True,
max_length=max_len,
return_tensors="pt",
).to(device)
# 获取查询嵌入的批量大小
batch_size = query_embeds.shape[0]
# 为每个样本设置上下文起始位置
ctx_begin_pos = [self.config.ctx_begin_pos] * batch_size
# 使用文本编码器获取文本嵌入
text_embeddings = self.text_encoder(
input_ids=tokenized_prompt.input_ids,
ctx_embeddings=query_embeds,
ctx_begin_pos=ctx_begin_pos,
)[0]
# 返回生成的文本嵌入
return text_embeddings
# 禁用梯度计算,并替换示例文档字符串
@torch.no_grad()
@replace_example_docstring(EXAMPLE_DOC_STRING)
def __call__(
# 定义调用方法的输入参数,包括提示、参考图像等
prompt: List[str],
reference_image: PIL.Image.Image,
source_subject_category: List[str],
target_subject_category: List[str],
latents: Optional[torch.Tensor] = None,
guidance_scale: float = 7.5,
height: int = 512,
width: int = 512,
num_inference_steps: int = 50,
generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
neg_prompt: Optional[str] = "",
prompt_strength: float = 1.0,
prompt_reps: int = 20,
output_type: Optional[str] = "pil",
return_dict: bool = True,
.\diffusers\pipelines\blip_diffusion\__init__.py
# 从 dataclasses 模块导入 dataclass 装饰器,用于简化数据类的定义
from dataclasses import dataclass
# 从 typing 模块导入 List、Optional 和 Union 类型注解
from typing import List, Optional, Union
# 导入 numpy 库并简化为 np
import numpy as np
# 导入 PIL 库
import PIL
# 从 PIL 中导入 Image 类,用于处理图像
from PIL import Image
# 从上层模块导入可选依赖检查和可用性函数
from ...utils import OptionalDependencyNotAvailable, is_torch_available, is_transformers_available
# 尝试检查 Transformers 和 Torch 库是否可用
try:
# 如果任一库不可用,则抛出可选依赖不可用异常
if not (is_transformers_available() and is_torch_available()):
raise OptionalDependencyNotAvailable()
# 捕获可选依赖不可用异常
except OptionalDependencyNotAvailable:
# 从 dummy 模块导入 ShapEPipeline 类作为替代
from ...utils.dummy_torch_and_transformers_objects import ShapEPipeline
# 如果两个库都可用,则导入相关模型和处理器
else:
# 从 blip_image_processing 模块导入 BlipImageProcessor 类
from .blip_image_processing import BlipImageProcessor
# 从 modeling_blip2 模块导入 Blip2QFormerModel 类
from .modeling_blip2 import Blip2QFormerModel
# 从 modeling_ctx_clip 模块导入 ContextCLIPTextModel 类
from .modeling_ctx_clip import ContextCLIPTextModel
# 从 pipeline_blip_diffusion 模块导入 BlipDiffusionPipeline 类
from .pipeline_blip_diffusion import BlipDiffusionPipeline
.\diffusers\pipelines\cogvideo\pipeline_cogvideox.py
# 版权声明,表明此文件的所有权和使用许可
# Copyright 2024 The CogVideoX team, Tsinghua University & ZhipuAI and The HuggingFace Team.
# All rights reserved.
#
# 根据 Apache 2.0 许可证许可,使用条款
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# 你可以在以下地址获得许可证副本
# http://www.apache.org/licenses/LICENSE-2.0
#
# 除非法律要求或书面协议另有约定,否则此文件以“按原样”方式分发,不提供任何明示或暗示的担保或条件。
# See the License for the specific language governing permissions and
# limitations under the License.
# 导入用于检查函数和方法的模块
import inspect
# 导入数学库以使用数学函数
import math
# 从 typing 模块导入类型注释工具
from typing import Callable, Dict, List, Optional, Tuple, Union
# 导入 PyTorch 库
import torch
# 从 transformers 库导入 T5 编码器模型和分词器
from transformers import T5EncoderModel, T5Tokenizer
# 从相对路径导入回调相关的类
from ...callbacks import MultiPipelineCallbacks, PipelineCallback
# 从相对路径导入模型相关的类
from ...models import AutoencoderKLCogVideoX, CogVideoXTransformer3DModel
# 从相对路径导入获取 3D 旋转位置嵌入的函数
from ...models.embeddings import get_3d_rotary_pos_embed
# 从相对路径导入扩散管道工具
from ...pipelines.pipeline_utils import DiffusionPipeline
# 从相对路径导入调度器
from ...schedulers import CogVideoXDDIMScheduler, CogVideoXDPMScheduler
# 从相对路径导入日志工具和替换示例文档字符串的函数
from ...utils import logging, replace_example_docstring
# 从相对路径导入生成随机张量的工具
from ...utils.torch_utils import randn_tensor
# 从相对路径导入视频处理器
from ...video_processor import VideoProcessor
# 从当前包导入管道输出相关的类
from .pipeline_output import CogVideoXPipelineOutput
# 创建一个日志记录器,用于记录当前模块的信息
logger = logging.get_logger(__name__) # pylint: disable=invalid-name
# 示例文档字符串,展示如何使用该模块的功能
EXAMPLE_DOC_STRING = """
Examples:
```python
>>> import torch
>>> from diffusers import CogVideoXPipeline
>>> from diffusers.utils import export_to_video
>>> # 模型: "THUDM/CogVideoX-2b" 或 "THUDM/CogVideoX-5b"
>>> pipe = CogVideoXPipeline.from_pretrained("THUDM/CogVideoX-2b", torch_dtype=torch.float16).to("cuda")
>>> prompt = (
... "一只穿着小红外套和小帽子的熊猫,坐在宁静的竹林中的木凳上。"
... "熊猫的毛茸茸的爪子拨动着微型木吉他,演奏出柔和的旋律。附近,几只其他的熊猫好奇地聚集,"
... "有些还在节奏中鼓掌。阳光透过高大的竹子,洒下柔和的光辉,"
... "照亮了这个场景。熊猫的脸上流露出专注和快乐,随着音乐的演奏而展现。"
... "背景中有一条小溪流和生机勃勃的绿叶,增强了这个独特音乐表演的宁静和魔幻气氛。"
... )
>>> video = pipe(prompt=prompt, guidance_scale=6, num_inference_steps=50).frames[0]
>>> export_to_video(video, "output.mp4", fps=8)
```py
"""
# 定义一个函数,用于计算调整大小和裁剪区域,以适应网格
# 该函数类似于 diffusers.pipelines.hunyuandit.pipeline_hunyuandit.get_resize_crop_region_for_grid
def get_resize_crop_region_for_grid(src, tgt_width, tgt_height):
# 目标宽度赋值给变量 tw
tw = tgt_width
# 目标高度赋值给变量 th
th = tgt_height
# 从源图像的尺寸中提取高度和宽度
h, w = src
# 计算源图像的高宽比
r = h / w
# 检查缩放比例 r 是否大于给定阈值 th 和 tw 的比值
if r > (th / tw):
# 如果是,则设定新的高度为 th
resize_height = th
# 计算对应的宽度,保持宽高比
resize_width = int(round(th / h * w))
else:
# 否则,设定新的宽度为 tw
resize_width = tw
# 计算对应的高度,保持宽高比
resize_height = int(round(tw / w * h))
# 计算裁剪的上边缘位置,以居中显示
crop_top = int(round((th - resize_height) / 2.0))
# 计算裁剪的左边缘位置,以居中显示
crop_left = int(round((tw - resize_width) / 2.0))
# 返回裁剪区域的坐标,包含左上角和右下角
return (crop_top, crop_left), (crop_top + resize_height, crop_left + resize_width)
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion 中复制
def retrieve_timesteps(
# 调度器对象,用于获取时间步
scheduler,
# 用于生成样本的推理步骤数(可选)
num_inference_steps: Optional[int] = None,
# 指定设备(可选),可以是字符串或 torch.device
device: Optional[Union[str, torch.device]] = None,
# 自定义时间步列表(可选)
timesteps: Optional[List[int]] = None,
# 自定义 sigma 列表(可选)
sigmas: Optional[List[float]] = None,
# 额外的关键字参数,传递给调度器的 set_timesteps 方法
**kwargs,
):
"""
调用调度器的 `set_timesteps` 方法并从调度器中检索时间步。处理自定义时间步。
任何关键字参数都将传递给 `scheduler.set_timesteps`。
参数:
scheduler (`SchedulerMixin`):
用于获取时间步的调度器。
num_inference_steps (`int`):
在生成样本时使用的扩散步骤数。如果使用,`timesteps` 必须为 `None`。
device (`str` 或 `torch.device`, *可选*):
时间步移动到的设备。如果为 `None`,时间步不会被移动。
timesteps (`List[int]`, *可选*):
用于覆盖调度器的时间步间隔策略的自定义时间步。如果传递 `timesteps`,则 `num_inference_steps` 和 `sigmas` 必须为 `None`。
sigmas (`List[float]`, *可选*):
用于覆盖调度器的时间步间隔策略的自定义 sigma。如果传递 `sigmas`,则 `num_inference_steps` 和 `timesteps` 必须为 `None`。
返回:
`Tuple[torch.Tensor, int]`: 一个元组,其中第一个元素是来自调度器的时间步安排,第二个元素是推理步骤的数量。
"""
# 检查是否同时传递了时间步和 sigma,若是则抛出错误
if timesteps is not None and sigmas is not None:
raise ValueError("Only one of `timesteps` or `sigmas` can be passed. Please choose one to set custom values")
# 如果传递了自定义时间步
if timesteps is not None:
# 检查调度器的 set_timesteps 方法是否接受时间步参数
accepts_timesteps = "timesteps" in set(inspect.signature(scheduler.set_timesteps).parameters.keys())
# 如果不接受,则抛出错误
if not accepts_timesteps:
raise ValueError(
f"The current scheduler class {scheduler.__class__}'s `set_timesteps` does not support custom"
f" timestep schedules. Please check whether you are using the correct scheduler."
)
# 调用调度器的 set_timesteps 方法,传递自定义时间步
scheduler.set_timesteps(timesteps=timesteps, device=device, **kwargs)
# 从调度器获取时间步
timesteps = scheduler.timesteps
# 计算推理步骤的数量
num_inference_steps = len(timesteps)
# 如果传递了自定义 sigma
elif sigmas is not None:
# 检查调度器的 set_timesteps 方法是否接受 sigma 参数
accept_sigmas = "sigmas" in set(inspect.signature(scheduler.set_timesteps).parameters.keys())
# 如果不接受,则抛出错误
if not accept_sigmas:
raise ValueError(
f"The current scheduler class {scheduler.__class__}'s `set_timesteps` does not support custom"
f" sigmas schedules. Please check whether you are using the correct scheduler."
)
# 调用调度器的 set_timesteps 方法,传递自定义 sigma
scheduler.set_timesteps(sigmas=sigmas, device=device, **kwargs)
# 从调度器获取时间步
timesteps = scheduler.timesteps
# 计算推理步骤的数量
num_inference_steps = len(timesteps)
# 否则,设置推理步骤数以及相关设备和额外参数
else:
scheduler.set_timesteps(num_inference_steps, device=device, **kwargs)
# 获取调度器中的时间步
timesteps = scheduler.timesteps
# 返回时间步和推理步骤数
return timesteps, num_inference_steps
# 定义一个名为 CogVideoXPipeline 的类,继承自 DiffusionPipeline 类
class CogVideoXPipeline(DiffusionPipeline):
r"""
使用 CogVideoX 进行文本到视频生成的管道。
此模型继承自 [`DiffusionPipeline`]。有关库为所有管道实现的通用方法(例如下载或保存,运行在特定设备等),请查看超类文档。
参数:
vae ([`AutoencoderKL`]):
变分自编码器 (VAE) 模型,用于将视频编码和解码为潜在表示。
text_encoder ([`T5EncoderModel`]):
冻结的文本编码器。CogVideoX 使用
[T5](https://huggingface.co/docs/transformers/model_doc/t5#transformers.T5EncoderModel);具体使用
[t5-v1_1-xxl](https://huggingface.co/PixArt-alpha/PixArt-alpha/tree/main/t5-v1_1-xxl) 变体。
tokenizer (`T5Tokenizer`):
类
[T5Tokenizer](https://huggingface.co/docs/transformers/model_doc/t5#transformers.T5Tokenizer) 的标记器。
transformer ([`CogVideoXTransformer3DModel`]):
一个文本条件的 `CogVideoXTransformer3DModel` 用于去噪编码的视频潜在。
scheduler ([`SchedulerMixin`]):
与 `transformer` 结合使用的调度器,用于去噪编码的视频潜在。
"""
# 定义可选组件的列表,初始化为空
_optional_components = []
# 定义模型 CPU 卸载顺序
model_cpu_offload_seq = "text_encoder->transformer->vae"
# 定义回调张量输入的列表
_callback_tensor_inputs = [
"latents", # 潜在张量输入
"prompt_embeds", # 提示嵌入张量输入
"negative_prompt_embeds", # 负提示嵌入张量输入
]
# 初始化函数,接受多个参数以构建管道
def __init__(
self,
tokenizer: T5Tokenizer, # T5 标记器实例
text_encoder: T5EncoderModel, # T5 文本编码器实例
vae: AutoencoderKLCogVideoX, # 变分自编码器实例
transformer: CogVideoXTransformer3DModel, # CogVideoX 3D 转换器实例
scheduler: Union[CogVideoXDDIMScheduler, CogVideoXDPMScheduler], # 调度器实例,支持多种类型
):
# 调用超类的初始化函数
super().__init__()
# 注册模块,整合各个组件
self.register_modules(
tokenizer=tokenizer, text_encoder=text_encoder, vae=vae, transformer=transformer, scheduler=scheduler
)
# 根据 VAE 的配置计算空间缩放因子
self.vae_scale_factor_spatial = (
2 ** (len(self.vae.config.block_out_channels) - 1) if hasattr(self, "vae") and self.vae is not None else 8
)
# 根据 VAE 的配置计算时间缩放因子
self.vae_scale_factor_temporal = (
self.vae.config.temporal_compression_ratio if hasattr(self, "vae") and self.vae is not None else 4
)
# 初始化视频处理器,使用空间缩放因子
self.video_processor = VideoProcessor(vae_scale_factor=self.vae_scale_factor_spatial)
# 定义获取 T5 提示嵌入的函数
def _get_t5_prompt_embeds(
self,
prompt: Union[str, List[str]] = None, # 输入的提示,可以是字符串或字符串列表
num_videos_per_prompt: int = 1, # 每个提示生成的视频数量,默认为 1
max_sequence_length: int = 226, # 最大序列长度,默认为 226
device: Optional[torch.device] = None, # 设备类型,默认为 None
dtype: Optional[torch.dtype] = None, # 数据类型,默认为 None
# 处理输入参数,优先使用已设置的设备
):
device = device or self._execution_device
# 使用已定义的 dtype,默认取文本编码器的 dtype
dtype = dtype or self.text_encoder.dtype
# 如果输入 prompt 是字符串,则将其转换为列表
prompt = [prompt] if isinstance(prompt, str) else prompt
# 获取 prompt 的批大小
batch_size = len(prompt)
# 使用 tokenizer 处理 prompt,并返回张量格式的输入
text_inputs = self.tokenizer(
prompt,
padding="max_length", # 填充到最大长度
max_length=max_sequence_length, # 最大序列长度
truncation=True, # 超出部分截断
add_special_tokens=True, # 添加特殊标记
return_tensors="pt", # 返回 PyTorch 张量
)
# 提取输入 ID
text_input_ids = text_inputs.input_ids
# 获取未截断的输入 ID
untruncated_ids = self.tokenizer(prompt, padding="longest", return_tensors="pt").input_ids
# 检查是否需要警告用户输入被截断
if untruncated_ids.shape[-1] >= text_input_ids.shape[-1] and not torch.equal(text_input_ids, untruncated_ids):
# 解码被移除的文本并记录警告
removed_text = self.tokenizer.batch_decode(untruncated_ids[:, max_sequence_length - 1 : -1])
logger.warning(
"The following part of your input was truncated because `max_sequence_length` is set to "
f" {max_sequence_length} tokens: {removed_text}"
)
# 获取文本输入的嵌入表示
prompt_embeds = self.text_encoder(text_input_ids.to(device))[0]
# 转换嵌入的 dtype 和 device
prompt_embeds = prompt_embeds.to(dtype=dtype, device=device)
# 为每个生成的提示重复文本嵌入,使用适合 MPS 的方法
_, seq_len, _ = prompt_embeds.shape
# 重复嵌入以匹配视频生成数量
prompt_embeds = prompt_embeds.repeat(1, num_videos_per_prompt, 1)
# 调整嵌入的形状以符合批处理
prompt_embeds = prompt_embeds.view(batch_size * num_videos_per_prompt, seq_len, -1)
# 返回最终的文本嵌入
return prompt_embeds
# 定义编码提示的函数
def encode_prompt(
self,
# 输入的提示,可以是字符串或字符串列表
prompt: Union[str, List[str]],
# 可选的负提示
negative_prompt: Optional[Union[str, List[str]]] = None,
# 控制分类器自由引导的开关
do_classifier_free_guidance: bool = True,
# 每个提示生成的视频数量
num_videos_per_prompt: int = 1,
# 可选的提示嵌入
prompt_embeds: Optional[torch.Tensor] = None,
# 可选的负提示嵌入
negative_prompt_embeds: Optional[torch.Tensor] = None,
# 最大序列长度
max_sequence_length: int = 226,
# 可选的设备
device: Optional[torch.device] = None,
# 可选的数据类型
dtype: Optional[torch.dtype] = None,
# 准备潜在变量的函数
def prepare_latents(
self, batch_size, num_channels_latents, num_frames, height, width, dtype, device, generator, latents=None
):
# 定义形状元组,包含批次大小、帧数、通道数、高度和宽度
shape = (
batch_size,
(num_frames - 1) // self.vae_scale_factor_temporal + 1, # 计算处理后的帧数
num_channels_latents, # 潜在通道数
height // self.vae_scale_factor_spatial, # 根据空间缩放因子调整高度
width // self.vae_scale_factor_spatial, # 根据空间缩放因子调整宽度
)
# 检查生成器是否是列表,且长度与批次大小不匹配
if isinstance(generator, list) and len(generator) != batch_size:
# 抛出值错误,说明生成器列表长度与批次大小不匹配
raise ValueError(
f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
f" size of {batch_size}. Make sure the batch size matches the length of the generators."
)
# 如果潜在变量为 None,则生成随机潜在变量
if latents is None:
# 使用给定形状生成随机张量,指定生成器、设备和数据类型
latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
else:
# 如果潜在变量不为 None,则将其移动到指定设备
latents = latents.to(device)
# 按调度器所需的标准差缩放初始噪声
latents = latents * self.scheduler.init_noise_sigma
# 返回处理后的潜在变量
return latents
# 解码潜在变量,返回解码后的帧
def decode_latents(self, latents: torch.Tensor) -> torch.Tensor:
# 重新排列潜在变量的维度,以适应解码器的输入格式
latents = latents.permute(0, 2, 1, 3, 4) # [batch_size, num_channels, num_frames, height, width]
# 根据缩放因子调整潜在变量的值
latents = 1 / self.vae.config.scaling_factor * latents
# 解码潜在变量并获取样本帧
frames = self.vae.decode(latents).sample
# 返回解码后的帧
return frames
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_extra_step_kwargs 复制
def prepare_extra_step_kwargs(self, generator, eta):
# 为调度器步骤准备额外的关键字参数,因为并非所有调度器的签名相同
# eta (η) 仅在 DDIMScheduler 中使用,对于其他调度器将被忽略
# eta 对应于 DDIM 论文中的 η:https://arxiv.org/abs/2010.02502
# 应在 [0, 1] 范围内
# 检查调度器的步骤方法是否接受 eta 参数
accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())
# 初始化额外步骤参数字典
extra_step_kwargs = {}
# 如果接受 eta,则将其添加到额外参数中
if accepts_eta:
extra_step_kwargs["eta"] = eta
# 检查调度器的步骤方法是否接受 generator 参数
accepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys())
# 如果接受 generator,则将其添加到额外参数中
if accepts_generator:
extra_step_kwargs["generator"] = generator
# 返回准备好的额外参数
return extra_step_kwargs
# 从 diffusers.pipelines.latte.pipeline_latte.LattePipeline.check_inputs 复制
def check_inputs(
self,
prompt, # 输入的提示文本
height, # 生成图像的高度
width, # 生成图像的宽度
negative_prompt, # 负提示文本,用于引导生成
callback_on_step_end_tensor_inputs, # 每步结束时的回调,用于处理张量输入
prompt_embeds=None, # 可选的提示嵌入
negative_prompt_embeds=None, # 可选的负提示嵌入
):
# 检查高度和宽度是否能被8整除,若不能则抛出错误
if height % 8 != 0 or width % 8 != 0:
raise ValueError(f"`height` and `width` have to be divisible by 8 but are {height} and {width}.")
# 检查回调输入是否不为空且是否都在已注册的回调输入中
if callback_on_step_end_tensor_inputs is not None and not all(
k in self._callback_tensor_inputs for k in callback_on_step_end_tensor_inputs
):
raise ValueError(
f"`callback_on_step_end_tensor_inputs` has to be in {self._callback_tensor_inputs}, but found {[k for k in callback_on_step_end_tensor_inputs if k not in self._callback_tensor_inputs]}"
)
# 检查是否同时提供了提示和提示嵌入,若是则抛出错误
if prompt is not None and prompt_embeds is not None:
raise ValueError(
f"Cannot forward both `prompt`: {prompt} and `prompt_embeds`: {prompt_embeds}. Please make sure to"
" only forward one of the two."
)
# 检查是否同时未提供提示和提示嵌入,若是则抛出错误
elif prompt is None and prompt_embeds is None:
raise ValueError(
"Provide either `prompt` or `prompt_embeds`. Cannot leave both `prompt` and `prompt_embeds` undefined."
)
# 检查提示的类型是否为字符串或列表,若不是则抛出错误
elif prompt is not None and (not isinstance(prompt, str) and not isinstance(prompt, list)):
raise ValueError(f"`prompt` has to be of type `str` or `list` but is {type(prompt)}")
# 检查是否同时提供了提示和负提示嵌入,若是则抛出错误
if prompt is not None and negative_prompt_embeds is not None:
raise ValueError(
f"Cannot forward both `prompt`: {prompt} and `negative_prompt_embeds`:"
f" {negative_prompt_embeds}. Please make sure to only forward one of the two."
)
# 检查是否同时提供了负提示和负提示嵌入,若是则抛出错误
if negative_prompt is not None and negative_prompt_embeds is not None:
raise ValueError(
f"Cannot forward both `negative_prompt`: {negative_prompt} and `negative_prompt_embeds`:"
f" {negative_prompt_embeds}. Please make sure to only forward one of the two."
)
# 检查提示嵌入和负提示嵌入是否都不为空,且它们的形状是否相同,若不同则抛出错误
if prompt_embeds is not None and negative_prompt_embeds is not None:
if prompt_embeds.shape != negative_prompt_embeds.shape:
raise ValueError(
"`prompt_embeds` and `negative_prompt_embeds` must have the same shape when passed directly, but"
f" got: `prompt_embeds` {prompt_embeds.shape} != `negative_prompt_embeds`"
f" {negative_prompt_embeds.shape}."
)
def fuse_qkv_projections(self) -> None:
# 启用融合 QKV 投影
r"""Enables fused QKV projections."""
self.fusing_transformer = True
# 调用变换器进行 QKV 投影的融合
self.transformer.fuse_qkv_projections()
def unfuse_qkv_projections(self) -> None:
# 禁用 QKV 投影融合(如果已启用)
r"""Disable QKV projection fusion if enabled."""
# 如果没有启用融合,则记录警告信息
if not self.fusing_transformer:
logger.warning("The Transformer was not initially fused for QKV projections. Doing nothing.")
else:
# 调用变换器进行 QKV 投影的取消融合
self.transformer.unfuse_qkv_projections()
# 更新状态为未融合
self.fusing_transformer = False
# 准备旋转位置嵌入的函数
def _prepare_rotary_positional_embeddings(
self,
height: int, # 输入的高度
width: int, # 输入的宽度
num_frames: int, # 输入的帧数
device: torch.device, # 计算设备(如CPU或GPU)
) -> Tuple[torch.Tensor, torch.Tensor]: # 返回两个张量的元组
# 根据 VAE 缩放因子和变换器的补丁大小计算网格高度
grid_height = height // (self.vae_scale_factor_spatial * self.transformer.config.patch_size)
# 根据 VAE 缩放因子和变换器的补丁大小计算网格宽度
grid_width = width // (self.vae_scale_factor_spatial * self.transformer.config.patch_size)
# 计算基础宽度大小
base_size_width = 720 // (self.vae_scale_factor_spatial * self.transformer.config.patch_size)
# 计算基础高度大小
base_size_height = 480 // (self.vae_scale_factor_spatial * self.transformer.config.patch_size)
# 获取网格的裁剪区域坐标
grid_crops_coords = get_resize_crop_region_for_grid(
(grid_height, grid_width), base_size_width, base_size_height
)
# 获取三维旋转位置嵌入的余弦和正弦频率
freqs_cos, freqs_sin = get_3d_rotary_pos_embed(
embed_dim=self.transformer.config.attention_head_dim, # 嵌入维度
crops_coords=grid_crops_coords, # 裁剪坐标
grid_size=(grid_height, grid_width), # 网格大小
temporal_size=num_frames, # 时间维度大小
use_real=True, # 是否使用实数
)
# 将余弦频率移动到指定设备
freqs_cos = freqs_cos.to(device=device)
# 将正弦频率移动到指定设备
freqs_sin = freqs_sin.to(device=device)
# 返回余弦和正弦频率
return freqs_cos, freqs_sin
# 获取指导缩放比例的属性
@property
def guidance_scale(self):
return self._guidance_scale
# 获取时间步数的属性
@property
def num_timesteps(self):
return self._num_timesteps
# 获取中断状态的属性
@property
def interrupt(self):
return self._interrupt
# 关闭梯度计算并替换文档字符串
@torch.no_grad()
@replace_example_docstring(EXAMPLE_DOC_STRING)
# 定义调用方法
def __call__(
self,
prompt: Optional[Union[str, List[str]]] = None, # 输入提示
negative_prompt: Optional[Union[str, List[str]]] = None, # 负面提示
height: int = 480, # 默认高度
width: int = 720, # 默认宽度
num_frames: int = 49, # 默认帧数
num_inference_steps: int = 50, # 默认推理步骤
timesteps: Optional[List[int]] = None, # 可选的时间步
guidance_scale: float = 6, # 默认指导缩放比例
use_dynamic_cfg: bool = False, # 是否使用动态配置
num_videos_per_prompt: int = 1, # 每个提示生成的视频数量
eta: float = 0.0, # 控制噪声的参数
generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None, # 随机数生成器
latents: Optional[torch.FloatTensor] = None, # 可选的潜变量
prompt_embeds: Optional[torch.FloatTensor] = None, # 提示嵌入
negative_prompt_embeds: Optional[torch.FloatTensor] = None, # 负面提示嵌入
output_type: str = "pil", # 输出类型
return_dict: bool = True, # 是否返回字典格式
callback_on_step_end: Optional[ # 步骤结束时的回调
Union[Callable[[int, int, Dict], None], PipelineCallback, MultiPipelineCallbacks]
] = None,
callback_on_step_end_tensor_inputs: List[str] = ["latents"], # 步骤结束时的张量输入
max_sequence_length: int = 226, # 最大序列长度
.\diffusers\pipelines\cogvideo\pipeline_cogvideox_image2video.py
# 版权声明,说明文件的版权归属及使用许可
# Copyright 2024 The CogVideoX team, Tsinghua University & ZhipuAI and The HuggingFace Team.
# All rights reserved.
#
# 授权条款,说明使用该文件的条件
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# 允许用户在遵守许可的情况下使用该文件
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# 除非另有协议,否则软件以“原样”方式分发,不提供任何明示或暗示的保证
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# 查看许可证以了解权限和限制
# See the License for the specific language governing permissions and
# limitations under the License.
# 导入所需的模块
import inspect # 用于获取对象的活跃信息
import math # 提供数学函数
from typing import Callable, Dict, List, Optional, Tuple, Union # 类型提示的支持
# 导入图像处理库
import PIL # 图像处理库
import torch # 深度学习框架
from transformers import T5EncoderModel, T5Tokenizer # 导入 T5 模型和分词器
# 导入自定义回调和处理器
from ...callbacks import MultiPipelineCallbacks, PipelineCallback # 回调相关
from ...image_processor import PipelineImageInput # 图像输入处理器
from ...models import AutoencoderKLCogVideoX, CogVideoXTransformer3DModel # 模型定义
from ...models.embeddings import get_3d_rotary_pos_embed # 获取 3D 旋转位置嵌入
from ...pipelines.pipeline_utils import DiffusionPipeline # 扩散管道
from ...schedulers import CogVideoXDDIMScheduler, CogVideoXDPMScheduler # 调度器
from ...utils import (
logging, # 日志工具
replace_example_docstring, # 替换示例文档字符串
)
from ...utils.torch_utils import randn_tensor # 随机张量生成工具
from ...video_processor import VideoProcessor # 视频处理器
from .pipeline_output import CogVideoXPipelineOutput # 管道输出定义
# 创建日志记录器
logger = logging.get_logger(__name__) # pylint: disable=invalid-name
# 示例文档字符串,提供使用示例
EXAMPLE_DOC_STRING = """
Examples:
```py
>>> import torch
>>> from diffusers import CogVideoXImageToVideoPipeline
>>> from diffusers.utils import export_to_video, load_image
>>> pipe = CogVideoXImageToVideoPipeline.from_pretrained("THUDM/CogVideoX-5b-I2V", torch_dtype=torch.bfloat16) # 从预训练模型创建管道
>>> pipe.to("cuda") # 将管道移动到 GPU
>>> prompt = "An astronaut hatching from an egg, on the surface of the moon, the darkness and depth of space realised in the background. High quality, ultrarealistic detail and breath-taking movie-like camera shot." # 定义生成视频的提示
>>> image = load_image( # 加载输入图像
... "https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/diffusers/astronaut.jpg"
... )
>>> video = pipe(image, prompt, use_dynamic_cfg=True) # 生成视频
>>> export_to_video(video.frames[0], "output.mp4", fps=8) # 导出生成的视频
```py
"""
# 定义调整图像大小和裁剪区域的函数
# 类似于 diffusers.pipelines.hunyuandit.pipeline_hunyuandit.get_resize_crop_region_for_grid
def get_resize_crop_region_for_grid(src, tgt_width, tgt_height):
tw = tgt_width # 目标宽度
th = tgt_height # 目标高度
h, w = src # 源图像的高度和宽度
r = h / w # 计算源图像的纵横比
# 根据纵横比决定调整后的高度和宽度
if r > (th / tw):
resize_height = th # 设置调整后的高度为目标高度
resize_width = int(round(th / h * w)) # 根据比例计算调整后的宽度
else:
resize_width = tw # 设置调整后的宽度为目标宽度
resize_height = int(round(tw / w * h)) # 根据比例计算调整后的高度
# 计算裁剪区域的起始位置
crop_top = int(round((th - resize_height) / 2.0)) # 上边裁剪位置
crop_left = int(round((tw - resize_width) / 2.0)) # 左边裁剪位置
# 返回裁剪区域的起始和结束坐标
return (crop_top, crop_left), (crop_top + resize_height, crop_left + resize_width)
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion 复制而来
def retrieve_timesteps(
# 调度器对象
scheduler,
# 推理步骤的数量(可选)
num_inference_steps: Optional[int] = None,
# 设备信息(可选)
device: Optional[Union[str, torch.device]] = None,
# 自定义时间步(可选)
timesteps: Optional[List[int]] = None,
# 自定义 sigma 值(可选)
sigmas: Optional[List[float]] = None,
# 其他关键字参数
**kwargs,
):
"""
调用调度器的 `set_timesteps` 方法并在调用后从调度器检索时间步。处理自定义时间步。任何关键字参数将传递给 `scheduler.set_timesteps`。
参数:
scheduler (`SchedulerMixin`):
用于获取时间步的调度器。
num_inference_steps (`int`):
生成样本时使用的扩散步骤数量。如果使用,则 `timesteps` 必须为 `None`。
device (`str` 或 `torch.device`, *可选*):
时间步应移动到的设备。如果为 `None`,则不移动时间步。
timesteps (`List[int]`, *可选*):
自定义时间步,用于覆盖调度器的时间步间隔策略。如果传递了 `timesteps`,则 `num_inference_steps` 和 `sigmas` 必须为 `None`。
sigmas (`List[float]`, *可选*):
自定义 sigma,用于覆盖调度器的时间步间隔策略。如果传递了 `sigmas`,则 `num_inference_steps` 和 `timesteps` 必须为 `None`。
返回:
`Tuple[torch.Tensor, int]`: 一个元组,第一个元素是调度器的时间步计划,第二个元素是推理步骤的数量。
"""
# 检查是否同时传递了自定义时间步和 sigma
if timesteps is not None and sigmas is not None:
raise ValueError("只能传递 `timesteps` 或 `sigmas` 中的一个。请选择一个设置自定义值")
# 如果传递了自定义时间步
if timesteps is not None:
# 检查调度器是否接受时间步参数
accepts_timesteps = "timesteps" in set(inspect.signature(scheduler.set_timesteps).parameters.keys())
# 如果不支持,则抛出错误
if not accepts_timesteps:
raise ValueError(
f"当前调度器类 {scheduler.__class__} 的 `set_timesteps` 不支持自定义"
f" 时间步计划。请检查您是否使用了正确的调度器。"
)
# 调用调度器的 `set_timesteps` 方法设置自定义时间步
scheduler.set_timesteps(timesteps=timesteps, device=device, **kwargs)
# 从调度器获取设置的时间步
timesteps = scheduler.timesteps
# 计算推理步骤的数量
num_inference_steps = len(timesteps)
# 如果传递了自定义 sigma
elif sigmas is not None:
# 检查调度器是否接受 sigma 参数
accept_sigmas = "sigmas" in set(inspect.signature(scheduler.set_timesteps).parameters.keys())
# 如果不支持,则抛出错误
if not accept_sigmas:
raise ValueError(
f"当前调度器类 {scheduler.__class__} 的 `set_timesteps` 不支持自定义"
f" sigma 计划。请检查您是否使用了正确的调度器。"
)
# 调用调度器的 `set_timesteps` 方法设置自定义 sigma
scheduler.set_timesteps(sigmas=sigmas, device=device, **kwargs)
# 从调度器获取设置的时间步
timesteps = scheduler.timesteps
# 计算推理步骤的数量
num_inference_steps = len(timesteps)
# 如果不是前一个条件的情况,执行以下代码
else:
# 设置推理步骤数,并指定设备和其他关键字参数
scheduler.set_timesteps(num_inference_steps, device=device, **kwargs)
# 获取当前调度器的时间步长
timesteps = scheduler.timesteps
# 返回时间步长和推理步骤数
return timesteps, num_inference_steps
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion_img2img.retrieve_latents 复制的代码
def retrieve_latents(
# 输入的编码器输出,类型为 torch.Tensor
encoder_output: torch.Tensor,
# 可选的随机数生成器,用于采样
generator: Optional[torch.Generator] = None,
# 采样模式,默认为 "sample"
sample_mode: str = "sample"
):
# 检查 encoder_output 是否有 latent_dist 属性且模式为 "sample"
if hasattr(encoder_output, "latent_dist") and sample_mode == "sample":
# 从 latent_dist 中采样并返回结果
return encoder_output.latent_dist.sample(generator)
# 检查 encoder_output 是否有 latent_dist 属性且模式为 "argmax"
elif hasattr(encoder_output, "latent_dist") and sample_mode == "argmax":
# 返回 latent_dist 的模式
return encoder_output.latent_dist.mode()
# 检查 encoder_output 是否有 latents 属性
elif hasattr(encoder_output, "latents"):
# 返回 encoder_output 中的 latents
return encoder_output.latents
# 如果以上条件都不满足,抛出 AttributeError
else:
raise AttributeError("Could not access latents of provided encoder_output")
class CogVideoXImageToVideoPipeline(DiffusionPipeline):
r"""
使用 CogVideoX 的图像到视频生成的管道。
该模型继承自 [`DiffusionPipeline`]。请查看父类文档以获取库实现的通用方法
(例如下载或保存,运行在特定设备等)。
参数:
vae ([`AutoencoderKL`]):
变分自编码器(VAE)模型,用于将视频编码和解码为潜在表示。
text_encoder ([`T5EncoderModel`]):
冻结的文本编码器。CogVideoX 使用
[T5](https://huggingface.co/docs/transformers/model_doc/t5#transformers.T5EncoderModel);特别是
[t5-v1_1-xxl](https://huggingface.co/PixArt-alpha/PixArt-alpha/tree/main/t5-v1_1-xxl) 变体。
tokenizer (`T5Tokenizer`):
类的分词器
[T5Tokenizer](https://huggingface.co/docs/transformers/model_doc/t5#transformers.T5Tokenizer)。
transformer ([`CogVideoXTransformer3DModel`]):
一个文本条件的 `CogVideoXTransformer3DModel`,用于去噪编码的视频潜在。
scheduler ([`SchedulerMixin`]):
一个调度器,结合 `transformer` 用于去噪编码的视频潜在。
"""
# 可选组件列表,初始化为空
_optional_components = []
# 指定 CPU 卸载顺序
model_cpu_offload_seq = "text_encoder->transformer->vae"
# 需要回调的张量输入列表
_callback_tensor_inputs = [
# 潜在张量
"latents",
# 提示嵌入
"prompt_embeds",
# 负面提示嵌入
"negative_prompt_embeds",
]
def __init__(
# 初始化方法的参数:分词器
self,
tokenizer: T5Tokenizer,
# 文本编码器
text_encoder: T5EncoderModel,
# VAE 模型
vae: AutoencoderKLCogVideoX,
# 变换模型
transformer: CogVideoXTransformer3DModel,
# 调度器
scheduler: Union[CogVideoXDDIMScheduler, CogVideoXDPMScheduler],
):
# 调用父类的构造函数以初始化基类部分
super().__init__()
# 注册各个模块,传入相关参数
self.register_modules(
tokenizer=tokenizer, # 注册分词器
text_encoder=text_encoder, # 注册文本编码器
vae=vae, # 注册变分自编码器
transformer=transformer, # 注册变换器
scheduler=scheduler, # 注册调度器
)
# 计算空间缩放因子,默认值为8
self.vae_scale_factor_spatial = (
2 ** (len(self.vae.config.block_out_channels) - 1) if hasattr(self, "vae") and self.vae is not None else 8
)
# 获取时间压缩比,如果 VAE 存在则使用其配置
self.vae_scale_factor_temporal = (
self.vae.config.temporal_compression_ratio if hasattr(self, "vae") and self.vae is not None else 4
)
# 创建视频处理器,使用空间缩放因子
self.video_processor = VideoProcessor(vae_scale_factor=self.vae_scale_factor_spatial)
# 从 diffusers.pipelines.cogvideo.pipeline_cogvideox.CogVideoXPipeline._get_t5_prompt_embeds 复制而来
def _get_t5_prompt_embeds(
self,
prompt: Union[str, List[str]] = None, # 输入提示,支持单个字符串或字符串列表
num_videos_per_prompt: int = 1, # 每个提示生成的视频数量
max_sequence_length: int = 226, # 最大序列长度
device: Optional[torch.device] = None, # 设备类型,默认为 None
dtype: Optional[torch.dtype] = None, # 数据类型,默认为 None
):
# 如果未指定设备,则使用执行设备
device = device or self._execution_device
# 如果未指定数据类型,则使用文本编码器的数据类型
dtype = dtype or self.text_encoder.dtype
# 如果提示是字符串,则将其转为列表
prompt = [prompt] if isinstance(prompt, str) else prompt
# 获取批处理大小
batch_size = len(prompt)
# 对提示进行编码,返回张量,填充到最大长度
text_inputs = self.tokenizer(
prompt,
padding="max_length", # 填充到最大长度
max_length=max_sequence_length, # 最大长度限制
truncation=True, # 允许截断
add_special_tokens=True, # 添加特殊标记
return_tensors="pt", # 返回 PyTorch 张量
)
# 获取编码后的输入 ID
text_input_ids = text_inputs.input_ids
# 获取未截断的输入 ID
untruncated_ids = self.tokenizer(prompt, padding="longest", return_tensors="pt").input_ids
# 如果未截断的 ID 长度大于等于文本输入 ID 长度且两者不相等,则进行警告
if untruncated_ids.shape[-1] >= text_input_ids.shape[-1] and not torch.equal(text_input_ids, untruncated_ids):
# 解码被截断的文本部分并记录警告
removed_text = self.tokenizer.batch_decode(untruncated_ids[:, max_sequence_length - 1 : -1])
logger.warning(
"The following part of your input was truncated because `max_sequence_length` is set to "
f" {max_sequence_length} tokens: {removed_text}" # 输出截断的提示文本
)
# 获取提示的嵌入表示
prompt_embeds = self.text_encoder(text_input_ids.to(device))[0]
# 转换嵌入为指定的数据类型和设备
prompt_embeds = prompt_embeds.to(dtype=dtype, device=device)
# 复制文本嵌入以生成每个提示的视频,使用适合 MPS 的方法
_, seq_len, _ = prompt_embeds.shape # 获取嵌入的形状
prompt_embeds = prompt_embeds.repeat(1, num_videos_per_prompt, 1) # 重复嵌入以适应视频数量
prompt_embeds = prompt_embeds.view(batch_size * num_videos_per_prompt, seq_len, -1) # 变形为合适的形状
# 返回处理后的提示嵌入
return prompt_embeds
# 从 diffusers.pipelines.cogvideo.pipeline_cogvideox.CogVideoXPipeline.encode_prompt 复制而来
# 定义一个用于编码提示信息的函数,接受多种参数
def encode_prompt(
self,
prompt: Union[str, List[str]], # 输入的提示,可以是字符串或字符串列表
negative_prompt: Optional[Union[str, List[str]]] = None, # 可选的负提示,类似格式
do_classifier_free_guidance: bool = True, # 是否启用无分类器引导
num_videos_per_prompt: int = 1, # 每个提示生成的视频数量
prompt_embeds: Optional[torch.Tensor] = None, # 可选的提示嵌入
negative_prompt_embeds: Optional[torch.Tensor] = None, # 可选的负提示嵌入
max_sequence_length: int = 226, # 最大序列长度
device: Optional[torch.device] = None, # 指定的设备类型
dtype: Optional[torch.dtype] = None, # 指定的数据类型
def prepare_latents(
self,
image: torch.Tensor, # 输入图像的张量
batch_size: int = 1, # 每批次的样本数量
num_channels_latents: int = 16, # 潜在变量的通道数
num_frames: int = 13, # 视频的帧数
height: int = 60, # 图像的高度
width: int = 90, # 图像的宽度
dtype: Optional[torch.dtype] = None, # 指定的数据类型
device: Optional[torch.device] = None, # 指定的设备类型
generator: Optional[torch.Generator] = None, # 随机数生成器
latents: Optional[torch.Tensor] = None, # 可选的潜在变量张量
):
# 计算有效的帧数,以适应 VAE 的时间缩放因子
num_frames = (num_frames - 1) // self.vae_scale_factor_temporal + 1
# 定义张量的形状,包括批次、帧数和空间维度
shape = (
batch_size,
num_frames,
num_channels_latents,
height // self.vae_scale_factor_spatial,
width // self.vae_scale_factor_spatial,
)
# 检查生成器列表的长度是否与批次大小匹配
if isinstance(generator, list) and len(generator) != batch_size:
raise ValueError(
f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
f" size of {batch_size}. Make sure the batch size matches the length of the generators."
)
# 在图像张量中插入一个维度,以适配后续处理
image = image.unsqueeze(2) # [B, C, F, H, W]
# 如果生成器是列表,逐个处理每个图像
if isinstance(generator, list):
image_latents = [
retrieve_latents(self.vae.encode(image[i].unsqueeze(0)), generator[i]) for i in range(batch_size)
]
else:
# 使用单一生成器处理所有图像
image_latents = [retrieve_latents(self.vae.encode(img.unsqueeze(0)), generator) for img in image]
# 合并图像潜在变量,并调整维度
image_latents = torch.cat(image_latents, dim=0).to(dtype).permute(0, 2, 1, 3, 4) # [B, F, C, H, W]
# 按比例缩放图像潜在变量
image_latents = self.vae.config.scaling_factor * image_latents
# 定义潜在变量的填充形状
padding_shape = (
batch_size,
num_frames - 1,
num_channels_latents,
height // self.vae_scale_factor_spatial,
width // self.vae_scale_factor_spatial,
)
# 创建填充张量
latent_padding = torch.zeros(padding_shape, device=device, dtype=dtype)
# 将填充与图像潜在变量合并
image_latents = torch.cat([image_latents, latent_padding], dim=1)
# 如果没有提供潜在变量,则生成随机潜在变量
if latents is None:
latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
else:
# 将提供的潜在变量移动到指定设备
latents = latents.to(device)
# 按照调度器要求的标准差缩放初始噪声
latents = latents * self.scheduler.init_noise_sigma
# 返回潜在变量和图像潜在变量
return latents, image_latents
# 从 diffusers.pipelines.cogvideo.pipeline_cogvideox.CogVideoXPipeline.decode_latents 复制的代码
# 解码潜在变量并返回张量格式的帧
def decode_latents(self, latents: torch.Tensor) -> torch.Tensor:
# 重新排列潜在变量的维度为 [batch_size, num_channels, num_frames, height, width]
latents = latents.permute(0, 2, 1, 3, 4) # [batch_size, num_channels, num_frames, height, width]
# 将潜在变量缩放为 VAE 配置中的因子
latents = 1 / self.vae.config.scaling_factor * latents
# 解码潜在变量并获取采样帧
frames = self.vae.decode(latents).sample
# 返回解码得到的帧
return frames
# 从 diffusers.pipelines.animatediff.pipeline_animatediff_video2video 导入的方法
def get_timesteps(self, num_inference_steps, timesteps, strength, device):
# 根据 init_timestep 获取原始时间步
init_timestep = min(int(num_inference_steps * strength), num_inference_steps)
# 计算时间步的起始位置,确保不小于零
t_start = max(num_inference_steps - init_timestep, 0)
# 根据调度器的顺序获取相关时间步
timesteps = timesteps[t_start * self.scheduler.order :]
# 返回过滤后的时间步和剩余的推理步骤数
return timesteps, num_inference_steps - t_start
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion 导入的方法
def prepare_extra_step_kwargs(self, generator, eta):
# 准备额外的调度器步骤参数,因为不同调度器的参数不尽相同
# eta(η)仅在 DDIMScheduler 中使用,对于其他调度器将被忽略
# eta 对应 DDIM 论文中的 η,范围应在 [0, 1] 之间
# 检查调度器是否接受 eta 参数
accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())
# 初始化额外参数字典
extra_step_kwargs = {}
# 如果调度器接受 eta,则将其添加到额外参数中
if accepts_eta:
extra_step_kwargs["eta"] = eta
# 检查调度器是否接受 generator 参数
accepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys())
# 如果调度器接受 generator,则将其添加到额外参数中
if accepts_generator:
extra_step_kwargs["generator"] = generator
# 返回准备好的额外参数
return extra_step_kwargs
# 检查输入参数的有效性
def check_inputs(
self,
image,
prompt,
height,
width,
negative_prompt,
callback_on_step_end_tensor_inputs,
video=None,
latents=None,
prompt_embeds=None,
negative_prompt_embeds=None,
):
# 检查 image 是否是合法类型:torch.Tensor、PIL.Image.Image 或 list
if (
not isinstance(image, torch.Tensor) # 如果 image 不是 torch.Tensor
and not isinstance(image, PIL.Image.Image) # 并且不是 PIL.Image.Image
and not isinstance(image, list) # 并且不是 list
):
# 抛出类型错误,提示 image 的类型不正确
raise ValueError(
"`image` has to be of type `torch.Tensor` or `PIL.Image.Image` or `List[PIL.Image.Image]` but is"
f" {type(image)}" # 显示当前 image 的类型
)
# 检查 height 和 width 是否能被 8 整除
if height % 8 != 0 or width % 8 != 0:
# 抛出值错误,提示 height 和 width 不符合要求
raise ValueError(f"`height` and `width` have to be divisible by 8 but are {height} and {width}.")
# 检查回调输入是否存在,并且是否全在 _callback_tensor_inputs 中
if callback_on_step_end_tensor_inputs is not None and not all(
k in self._callback_tensor_inputs for k in callback_on_step_end_tensor_inputs # 确保每个 k 都在 _callback_tensor_inputs 中
):
# 抛出值错误,提示回调输入不符合要求
raise ValueError(
f"`callback_on_step_end_tensor_inputs` has to be in {self._callback_tensor_inputs}, but found {[k for k in callback_on_step_end_tensor_inputs if k not in self._callback_tensor_inputs]}"
)
# 检查 prompt 和 prompt_embeds 是否同时存在
if prompt is not None and prompt_embeds is not None:
# 抛出值错误,提示不能同时提供 prompt 和 prompt_embeds
raise ValueError(
f"Cannot forward both `prompt`: {prompt} and `prompt_embeds`: {prompt_embeds}. Please make sure to"
" only forward one of the two."
)
# 检查 prompt 和 prompt_embeds 是否都为 None
elif prompt is None and prompt_embeds is None:
# 抛出值错误,提示至少提供一个
raise ValueError(
"Provide either `prompt` or `prompt_embeds`. Cannot leave both `prompt` and `prompt_embeds` undefined."
)
# 检查 prompt 是否为合法类型
elif prompt is not None and (not isinstance(prompt, str) and not isinstance(prompt, list)):
# 抛出值错误,提示 prompt 的类型不正确
raise ValueError(f"`prompt` has to be of type `str` or `list` but is {type(prompt)}")
# 检查 prompt 和 negative_prompt_embeds 是否同时存在
if prompt is not None and negative_prompt_embeds is not None:
# 抛出值错误,提示不能同时提供 prompt 和 negative_prompt_embeds
raise ValueError(
f"Cannot forward both `prompt`: {prompt} and `negative_prompt_embeds`:"
f" {negative_prompt_embeds}. Please make sure to only forward one of the two."
)
# 检查 negative_prompt 和 negative_prompt_embeds 是否同时存在
if negative_prompt is not None and negative_prompt_embeds is not None:
# 抛出值错误,提示不能同时提供 negative_prompt 和 negative_prompt_embeds
raise ValueError(
f"Cannot forward both `negative_prompt`: {negative_prompt} and `negative_prompt_embeds`:"
f" {negative_prompt_embeds}. Please make sure to only forward one of the two."
)
# 检查 prompt_embeds 和 negative_prompt_embeds 是否都存在
if prompt_embeds is not None and negative_prompt_embeds is not None:
# 检查它们的形状是否一致
if prompt_embeds.shape != negative_prompt_embeds.shape:
# 抛出值错误,提示它们的形状不匹配
raise ValueError(
"`prompt_embeds` and `negative_prompt_embeds` must have the same shape when passed directly, but"
f" got: `prompt_embeds` {prompt_embeds.shape} != `negative_prompt_embeds`"
f" {negative_prompt_embeds.shape}."
)
# 检查 video 和 latents 是否同时存在
if video is not None and latents is not None:
# 抛出值错误,提示只能提供一个
raise ValueError("Only one of `video` or `latents` should be provided")
# 从 diffusers.pipelines.cogvideo.pipeline_cogvideox.CogVideoXPipeline.fuse_qkv_projections 复制而来
# 定义一个启用融合 QKV 投影的方法,不返回任何值
def fuse_qkv_projections(self) -> None:
# 方法的文档字符串,描述其功能
r"""Enables fused QKV projections."""
# 设置属性 fusing_transformer 为 True,表示启用融合
self.fusing_transformer = True
# 调用 transformer 对象的方法,进行 QKV 投影融合
self.transformer.fuse_qkv_projections()
# 从 diffusers.pipelines.cogvideo.pipeline_cogvideox.CogVideoXPipeline.unfuse_qkv_projections 复制的方法
# 定义一个禁用 QKV 投影融合的方法,不返回任何值
def unfuse_qkv_projections(self) -> None:
# 方法的文档字符串,描述其功能
r"""Disable QKV projection fusion if enabled."""
# 检查属性 fusing_transformer 是否为 False
if not self.fusing_transformer:
# 如果没有融合,记录警告日志
logger.warning("The Transformer was not initially fused for QKV projections. Doing nothing.")
else:
# 调用 transformer 对象的方法,解除 QKV 投影的融合
self.transformer.unfuse_qkv_projections()
# 设置属性 fusing_transformer 为 False,表示禁用融合
self.fusing_transformer = False
# 从 diffusers.pipelines.cogvideo.pipeline_cogvideox.CogVideoXPipeline._prepare_rotary_positional_embeddings 复制的方法
# 定义一个准备旋转位置嵌入的方法,返回两个张量
def _prepare_rotary_positional_embeddings(
self,
height: int,
width: int,
num_frames: int,
device: torch.device,
) -> Tuple[torch.Tensor, torch.Tensor]:
# 根据输入高度和宽度,计算网格的高度
grid_height = height // (self.vae_scale_factor_spatial * self.transformer.config.patch_size)
# 根据输入高度和宽度,计算网格的宽度
grid_width = width // (self.vae_scale_factor_spatial * self.transformer.config.patch_size)
# 计算基础宽度,固定为 720
base_size_width = 720 // (self.vae_scale_factor_spatial * self.transformer.config.patch_size)
# 计算基础高度,固定为 480
base_size_height = 480 // (self.vae_scale_factor_spatial * self.transformer.config.patch_size)
# 获取网格裁剪区域的坐标
grid_crops_coords = get_resize_crop_region_for_grid(
(grid_height, grid_width), base_size_width, base_size_height
)
# 获取旋转位置嵌入的余弦和正弦频率
freqs_cos, freqs_sin = get_3d_rotary_pos_embed(
embed_dim=self.transformer.config.attention_head_dim,
crops_coords=grid_crops_coords,
grid_size=(grid_height, grid_width),
temporal_size=num_frames,
)
# 将余弦频率张量转移到指定设备
freqs_cos = freqs_cos.to(device=device)
# 将正弦频率张量转移到指定设备
freqs_sin = freqs_sin.to(device=device)
# 返回余弦和正弦频率张量
return freqs_cos, freqs_sin
# 定义一个属性,获取指导尺度的值
@property
def guidance_scale(self):
# 返回私有属性 _guidance_scale 的值
return self._guidance_scale
# 定义一个属性,获取时间步数的值
@property
def num_timesteps(self):
# 返回私有属性 _num_timesteps 的值
return self._num_timesteps
# 定义一个属性,获取中断状态的值
@property
def interrupt(self):
# 返回私有属性 _interrupt 的值
return self._interrupt
# 采用无梯度上下文装饰器,避免计算梯度
@torch.no_grad()
# 替换示例文档字符串的装饰器
@replace_example_docstring(EXAMPLE_DOC_STRING)
# 定义可调用方法,允许实例像函数一样被调用
def __call__(
self,
image: PipelineImageInput, # 输入图像,类型为PipelineImageInput
prompt: Optional[Union[str, List[str]]] = None, # 提示文本,可以是字符串或字符串列表
negative_prompt: Optional[Union[str, List[str]]] = None, # 负面提示文本,可以是字符串或字符串列表
height: int = 480, # 输出图像的高度,默认为480
width: int = 720, # 输出图像的宽度,默认为720
num_frames: int = 49, # 生成的视频帧数,默认为49
num_inference_steps: int = 50, # 推理步骤的数量,默认为50
timesteps: Optional[List[int]] = None, # 可选的时间步列表
guidance_scale: float = 6, # 引导尺度,影响生成图像的质量,默认为6
use_dynamic_cfg: bool = False, # 是否使用动态配置,默认为False
num_videos_per_prompt: int = 1, # 每个提示生成的视频数量,默认为1
eta: float = 0.0, # 影响采样过程的参数,默认为0.0
generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None, # 可选的随机数生成器
latents: Optional[torch.FloatTensor] = None, # 可选的潜在变量,类型为浮点张量
prompt_embeds: Optional[torch.FloatTensor] = None, # 可选的提示嵌入,类型为浮点张量
negative_prompt_embeds: Optional[torch.FloatTensor] = None, # 可选的负面提示嵌入,类型为浮点张量
output_type: str = "pil", # 输出类型,默认为"PIL"格式
return_dict: bool = True, # 是否返回字典格式的结果,默认为True
callback_on_step_end: Optional[ # 在步骤结束时调用的可选回调函数
Union[Callable[[int, int, Dict], None], PipelineCallback, MultiPipelineCallbacks]
] = None,
callback_on_step_end_tensor_inputs: List[str] = ["latents"], # 在步骤结束时的张量输入列表,默认为["latents"]
max_sequence_length: int = 226, # 最大序列长度,默认为226
.\diffusers\pipelines\cogvideo\pipeline_cogvideox_video2video.py
# 版权声明,指明版权归 CogVideoX 团队、清华大学、ZhipuAI 和 HuggingFace 团队所有
# 所有权利保留
#
# 根据 Apache License 2.0(“许可证”)授权;
# 除非遵守许可证,否则您不得使用此文件
# 您可以在以下网址获取许可证副本:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律或书面协议另有约定,否则根据许可证分发的软件是以“原样”基础分发,
# 不提供任何形式的担保或条件,无论是明示或暗示的
# 有关许可证的特定权限和限制,请参见许可证
import inspect # 导入 inspect 模块,用于获取对象的信息
import math # 导入 math 模块,提供数学函数
from typing import Callable, Dict, List, Optional, Tuple, Union # 导入类型提示相关的类
import torch # 导入 PyTorch 库,进行深度学习
from PIL import Image # 从 PIL 库导入 Image,用于图像处理
from transformers import T5EncoderModel, T5Tokenizer # 导入 T5 模型及其分词器
from ...callbacks import MultiPipelineCallbacks, PipelineCallback # 导入回调相关类
from ...models import AutoencoderKLCogVideoX, CogVideoXTransformer3DModel # 导入模型类
from ...models.embeddings import get_3d_rotary_pos_embed # 导入获取 3D 旋转位置嵌入的函数
from ...pipelines.pipeline_utils import DiffusionPipeline # 导入扩散管道类
from ...schedulers import CogVideoXDDIMScheduler, CogVideoXDPMScheduler # 导入调度器类
from ...utils import ( # 导入工具模块中的函数
logging, # 导入日志记录模块
replace_example_docstring, # 导入替换示例文档字符串的函数
)
from ...utils.torch_utils import randn_tensor # 从工具模块导入生成随机张量的函数
from ...video_processor import VideoProcessor # 导入视频处理器类
from .pipeline_output import CogVideoXPipelineOutput # 导入管道输出类
logger = logging.get_logger(__name__) # 获取当前模块的日志记录器
EXAMPLE_DOC_STRING = """ # 示例文档字符串,展示用法
Examples: # 示例部分
```python # Python 代码块开始
>>> import torch # 导入 PyTorch 库
>>> from diffusers import CogVideoXDPMScheduler, CogVideoXVideoToVideoPipeline # 导入特定模块
>>> from diffusers.utils import export_to_video, load_video # 导入工具函数
>>> # 模型:可以选择 "THUDM/CogVideoX-2b" 或 "THUDM/CogVideoX-5b"
>>> pipe = CogVideoXVideoToVideoPipeline.from_pretrained("THUDM/CogVideoX-5b", torch_dtype=torch.bfloat16) # 加载预训练管道
>>> pipe.to("cuda") # 将管道移动到 GPU
>>> pipe.scheduler = CogVideoXDPMScheduler.from_config(pipe.scheduler.config) # 配置调度器
>>> input_video = load_video( # 加载输入视频
... "https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/diffusers/hiker.mp4" # 视频链接
... )
>>> prompt = ( # 定义生成视频的提示
... "An astronaut stands triumphantly at the peak of a towering mountain. Panorama of rugged peaks and "
... "valleys. Very futuristic vibe and animated aesthetic. Highlights of purple and golden colors in "
... "the scene. The sky is looks like an animated/cartoonish dream of galaxies, nebulae, stars, planets, "
... "moons, but the remainder of the scene is mostly realistic."
... )
>>> video = pipe( # 调用管道生成视频
... video=input_video, prompt=prompt, strength=0.8, guidance_scale=6, num_inference_steps=50 # 传入参数
... ).frames[0] # 获取生成的视频帧
>>> export_to_video(video, "output.mp4", fps=8) # 导出生成的视频
```py # Python 代码块结束
"""
# 根据源图像的大小和目标宽高计算缩放裁剪区域
def get_resize_crop_region_for_grid(src, tgt_width, tgt_height):
# 设置目标宽度和高度
tw = tgt_width
th = tgt_height
# 解构源图像的高度和宽度
h, w = src
# 计算源图像的宽高比
r = h / w
# 判断源图像的宽高比与目标宽高比的关系
if r > (th / tw):
# 如果源图像更高,则以目标高度缩放
resize_height = th
# 计算相应的宽度
resize_width = int(round(th / h * w))
else:
# 否则以目标宽度缩放
resize_width = tw
# 计算相应的高度
resize_height = int(round(tw / w * h))
# 计算裁剪区域的顶部坐标
crop_top = int(round((th - resize_height) / 2.0))
# 计算裁剪区域的左侧坐标
crop_left = int(round((tw - resize_width) / 2.0))
# 返回裁剪区域的坐标和调整后的尺寸
return (crop_top, crop_left), (crop_top + resize_height, crop_left + resize_width)
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.retrieve_timesteps 复制的函数
def retrieve_timesteps(
scheduler,
num_inference_steps: Optional[int] = None,
device: Optional[Union[str, torch.device]] = None,
timesteps: Optional[List[int]] = None,
sigmas: Optional[List[float]] = None,
**kwargs,
):
"""
调用调度器的 `set_timesteps` 方法并在调用后从调度器检索时间步。处理自定义时间步。任何额外参数将传递给 `scheduler.set_timesteps`。
参数:
scheduler (`SchedulerMixin`):
用于获取时间步的调度器。
num_inference_steps (`int`):
生成样本时使用的扩散步骤数量。如果使用,则 `timesteps` 必须为 `None`。
device (`str` 或 `torch.device`, *可选*):
时间步应移动到的设备。如果为 `None`,则不移动时间步。
timesteps (`List[int]`, *可选*):
用于覆盖调度器的时间步间隔策略的自定义时间步。如果传递 `timesteps`,则 `num_inference_steps` 和 `sigmas` 必须为 `None`。
sigmas (`List[float]`, *可选*):
用于覆盖调度器的时间步间隔策略的自定义 sigma。如果传递 `sigmas`,则 `num_inference_steps` 和 `timesteps` 必须为 `None`。
返回:
`Tuple[torch.Tensor, int]`: 一个元组,第一个元素是调度器的时间步计划,第二个元素是推理步骤的数量。
"""
# 检查是否同时传递了时间步和 sigma
if timesteps is not None and sigmas is not None:
raise ValueError("只能传递 `timesteps` 或 `sigmas` 之一。请选择一个设置自定义值")
# 如果传递了时间步
if timesteps is not None:
# 检查调度器的 set_timesteps 方法是否接受时间步
accepts_timesteps = "timesteps" in set(inspect.signature(scheduler.set_timesteps).parameters.keys())
if not accepts_timesteps:
raise ValueError(
f"当前调度器类 {scheduler.__class__} 的 `set_timesteps` 不支持自定义"
f" 时间步计划。请检查您是否使用了正确的调度器。"
)
# 调用调度器的 set_timesteps 方法
scheduler.set_timesteps(timesteps=timesteps, device=device, **kwargs)
# 获取调度器中的时间步
timesteps = scheduler.timesteps
# 计算推理步骤的数量
num_inference_steps = len(timesteps)
# 检查 sigmas 是否不为 None,即是否提供了自定义 sigma 值
elif sigmas is not None:
# 检查当前调度器的 set_timesteps 方法是否接受 sigmas 参数
accept_sigmas = "sigmas" in set(inspect.signature(scheduler.set_timesteps).parameters.keys())
# 如果不接受 sigmas,抛出值错误异常,并提示用户
if not accept_sigmas:
raise ValueError(
f"The current scheduler class {scheduler.__class__}'s `set_timesteps` does not support custom"
f" sigmas schedules. Please check whether you are using the correct scheduler."
)
# 设置调度器的时间步长,使用提供的 sigmas、设备和其他参数
scheduler.set_timesteps(sigmas=sigmas, device=device, **kwargs)
# 获取当前调度器的时间步长
timesteps = scheduler.timesteps
# 计算推理步骤的数量,即时间步长的长度
num_inference_steps = len(timesteps)
else:
# 如果没有提供 sigmas,使用推理步骤的数量设置调度器的时间步长
scheduler.set_timesteps(num_inference_steps, device=device, **kwargs)
# 获取当前调度器的时间步长
timesteps = scheduler.timesteps
# 返回时间步长和推理步骤的数量
return timesteps, num_inference_steps
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion_img2img 模块复制的函数
def retrieve_latents(
encoder_output: torch.Tensor, # 输入参数,编码器输出,类型为 torch.Tensor
generator: Optional[torch.Generator] = None, # 可选的随机数生成器,用于采样
sample_mode: str = "sample" # 采样模式,默认为 "sample"
):
# 检查 encoder_output 是否具有 latent_dist 属性且采样模式为 "sample"
if hasattr(encoder_output, "latent_dist") and sample_mode == "sample":
# 从 latent_dist 中采样,并返回样本
return encoder_output.latent_dist.sample(generator)
# 检查 encoder_output 是否具有 latent_dist 属性且采样模式为 "argmax"
elif hasattr(encoder_output, "latent_dist") and sample_mode == "argmax":
# 返回 latent_dist 的众数(最可能的值)
return encoder_output.latent_dist.mode()
# 检查 encoder_output 是否具有 latents 属性
elif hasattr(encoder_output, "latents"):
# 返回 latents 属性
return encoder_output.latents
# 如果都不满足,则引发 AttributeError
else:
raise AttributeError("Could not access latents of provided encoder_output")
class CogVideoXVideoToVideoPipeline(DiffusionPipeline):
r"""
使用 CogVideoX 的视频到视频生成管道。
此模型继承自 [`DiffusionPipeline`]。请查看超类文档,以获取库实现的所有管道的通用方法
(例如下载或保存,运行在特定设备等)。
Args:
vae ([`AutoencoderKL`]):
用于将视频编码和解码到潜在表示的变分自编码器(VAE)模型。
text_encoder ([`T5EncoderModel`]):
冻结的文本编码器。CogVideoX 使用
[T5](https://huggingface.co/docs/transformers/model_doc/t5#transformers.T5EncoderModel);特别是
[t5-v1_1-xxl](https://huggingface.co/PixArt-alpha/PixArt-alpha/tree/main/t5-v1_1-xxl) 变体。
tokenizer (`T5Tokenizer`):
类的标记器
[T5Tokenizer](https://huggingface.co/docs/transformers/model_doc/t5#transformers.T5Tokenizer)。
transformer ([`CogVideoXTransformer3DModel`]):
一个文本条件的 `CogVideoXTransformer3DModel`,用于去噪编码的视频潜在表示。
scheduler ([`SchedulerMixin`]):
用于与 `transformer` 结合使用的调度器,以去噪编码的视频潜在表示。
"""
_optional_components = [] # 可选组件的列表,当前为空
model_cpu_offload_seq = "text_encoder->transformer->vae" # 模型的 CPU 卸载顺序
_callback_tensor_inputs = [ # 用于回调的张量输入列表
"latents", # 潜在表示
"prompt_embeds", # 提示嵌入
"negative_prompt_embeds", # 负面提示嵌入
]
def __init__(
self,
tokenizer: T5Tokenizer, # 标记器实例
text_encoder: T5EncoderModel, # 文本编码器实例
vae: AutoencoderKLCogVideoX, # VAE 实例
transformer: CogVideoXTransformer3DModel, # 转换器实例
scheduler: Union[CogVideoXDDIMScheduler, CogVideoXDPMScheduler], # 调度器实例,可为两种类型之一
# 初始化父类
):
super().__init__()
# 注册所需模块,包括tokenizer、text_encoder、vae、transformer和scheduler
self.register_modules(
tokenizer=tokenizer, text_encoder=text_encoder, vae=vae, transformer=transformer, scheduler=scheduler
)
# 计算空间的vae缩放因子,如果vae存在则根据块的输出通道数计算,否则默认为8
self.vae_scale_factor_spatial = (
2 ** (len(self.vae.config.block_out_channels) - 1) if hasattr(self, "vae") and self.vae is not None else 8
)
# 计算时间的vae缩放因子,如果vae存在则使用其时间压缩比,否则默认为4
self.vae_scale_factor_temporal = (
self.vae.config.temporal_compression_ratio if hasattr(self, "vae") and self.vae is not None else 4
)
# 初始化视频处理器,使用空间的vae缩放因子
self.video_processor = VideoProcessor(vae_scale_factor=self.vae_scale_factor_spatial)
# 从diffusers.pipelines.cogvideo.pipeline_cogvideox.CogVideoXPipeline._get_t5_prompt_embeds复制的方法
def _get_t5_prompt_embeds(
self,
prompt: Union[str, List[str]] = None,
num_videos_per_prompt: int = 1,
max_sequence_length: int = 226,
device: Optional[torch.device] = None,
dtype: Optional[torch.dtype] = None,
):
# 设置执行设备,若未指定则使用默认执行设备
device = device or self._execution_device
# 设置数据类型,若未指定则使用text_encoder的数据类型
dtype = dtype or self.text_encoder.dtype
# 将输入的prompt转换为列表格式
prompt = [prompt] if isinstance(prompt, str) else prompt
# 计算批次大小
batch_size = len(prompt)
# 使用tokenizer处理文本输入,返回张量格式,并进行填充、截断和添加特殊标记
text_inputs = self.tokenizer(
prompt,
padding="max_length",
max_length=max_sequence_length,
truncation=True,
add_special_tokens=True,
return_tensors="pt",
)
# 获取处理后的输入ID
text_input_ids = text_inputs.input_ids
# 获取未截断的ID
untruncated_ids = self.tokenizer(prompt, padding="longest", return_tensors="pt").input_ids
# 检查未截断ID是否大于等于处理后的ID,并且两者不相等
if untruncated_ids.shape[-1] >= text_input_ids.shape[-1] and not torch.equal(text_input_ids, untruncated_ids):
# 解码被截断的文本,并发出警告
removed_text = self.tokenizer.batch_decode(untruncated_ids[:, max_sequence_length - 1 : -1])
logger.warning(
"The following part of your input was truncated because `max_sequence_length` is set to "
f" {max_sequence_length} tokens: {removed_text}"
)
# 通过text_encoder生成prompt的嵌入表示,并将其移动到指定设备
prompt_embeds = self.text_encoder(text_input_ids.to(device))[0]
# 转换嵌入的dtype和设备
prompt_embeds = prompt_embeds.to(dtype=dtype, device=device)
# 为每个生成的prompt重复文本嵌入,使用适合MPS的方法
_, seq_len, _ = prompt_embeds.shape
prompt_embeds = prompt_embeds.repeat(1, num_videos_per_prompt, 1)
# 重新调整嵌入的形状,以适应批次大小和生成数量
prompt_embeds = prompt_embeds.view(batch_size * num_videos_per_prompt, seq_len, -1)
# 返回最终的文本嵌入
return prompt_embeds
# 从diffusers.pipelines.cogvideo.pipeline_cogvideox.CogVideoXPipeline.encode_prompt复制的方法
# 定义编码提示的函数,接受多种参数以设置提示信息和生成参数
def encode_prompt(
self,
# 提示内容,可以是字符串或字符串列表
prompt: Union[str, List[str]],
# 负提示内容,可选
negative_prompt: Optional[Union[str, List[str]]] = None,
# 是否进行分类器自由引导
do_classifier_free_guidance: bool = True,
# 每个提示生成的视频数量
num_videos_per_prompt: int = 1,
# 提示的嵌入向量,可选
prompt_embeds: Optional[torch.Tensor] = None,
# 负提示的嵌入向量,可选
negative_prompt_embeds: Optional[torch.Tensor] = None,
# 最大序列长度
max_sequence_length: int = 226,
# 设备类型,可选
device: Optional[torch.device] = None,
# 数据类型,可选
dtype: Optional[torch.dtype] = None,
# 定义准备潜在变量的函数,接受视频和其他参数
def prepare_latents(
self,
# 输入视频,可选
video: Optional[torch.Tensor] = None,
# 批次大小
batch_size: int = 1,
# 潜在通道数量
num_channels_latents: int = 16,
# 视频高度
height: int = 60,
# 视频宽度
width: int = 90,
# 数据类型,可选
dtype: Optional[torch.dtype] = None,
# 设备类型,可选
device: Optional[torch.device] = None,
# 随机数生成器,可选
generator: Optional[torch.Generator] = None,
# 现有潜在变量,可选
latents: Optional[torch.Tensor] = None,
# 时间步长,可选
timestep: Optional[torch.Tensor] = None,
):
# 计算视频帧数,如果潜在变量未提供则根据视频尺寸计算
num_frames = (video.size(2) - 1) // self.vae_scale_factor_temporal + 1 if latents is None else latents.size(1)
# 设置潜在变量的形状
shape = (
batch_size,
num_frames,
num_channels_latents,
height // self.vae_scale_factor_spatial,
width // self.vae_scale_factor_spatial,
)
# 检查生成器列表的长度是否与批次大小匹配
if isinstance(generator, list) and len(generator) != batch_size:
raise ValueError(
f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
f" size of {batch_size}. Make sure the batch size matches the length of the generators."
)
# 如果未提供潜在变量
if latents is None:
# 如果生成器是列表,则检查长度
if isinstance(generator, list):
if len(generator) != batch_size:
raise ValueError(
f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
f" size of {batch_size}. Make sure the batch size matches the length of the generators."
)
# 为每个视频初始化潜在变量
init_latents = [
retrieve_latents(self.vae.encode(video[i].unsqueeze(0)), generator[i]) for i in range(batch_size)
]
else:
# 单一生成器情况下为视频初始化潜在变量
init_latents = [retrieve_latents(self.vae.encode(vid.unsqueeze(0)), generator) for vid in video]
# 将初始潜在变量连接并转移到目标数据类型,调整维度顺序
init_latents = torch.cat(init_latents, dim=0).to(dtype).permute(0, 2, 1, 3, 4) # [B, F, C, H, W]
# 通过配置的缩放因子调整潜在变量
init_latents = self.vae.config.scaling_factor * init_latents
# 生成随机噪声
noise = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
# 将噪声添加到初始潜在变量中
latents = self.scheduler.add_noise(init_latents, noise, timestep)
else:
# 如果潜在变量已提供,则将其转移到目标设备
latents = latents.to(device)
# 根据调度器要求缩放初始噪声
latents = latents * self.scheduler.init_noise_sigma
# 返回准备好的潜在变量
return latents
# 从 diffusers.pipelines.cogvideo.pipeline_cogvideox.CogVideoXPipeline.decode_latents 拷贝而来
def decode_latents(self, latents: torch.Tensor) -> torch.Tensor:
# 将输入的张量进行维度变换,排列为 [batch_size, num_channels, num_frames, height, width]
latents = latents.permute(0, 2, 1, 3, 4) # [batch_size, num_channels, num_frames, height, width]
# 使用 VAE 的缩放因子对 latents 进行缩放
latents = 1 / self.vae.config.scaling_factor * latents
# 解码 latents,生成相应的帧并返回
frames = self.vae.decode(latents).sample
# 返回解码后的帧
return frames
# 从 diffusers.pipelines.animatediff.pipeline_animatediff_video2video.AnimateDiffVideoToVideoPipeline.get_timesteps 拷贝而来
def get_timesteps(self, num_inference_steps, timesteps, strength, device):
# 根据初始时间步计算原始时间步
init_timestep = min(int(num_inference_steps * strength), num_inference_steps)
# 计算开始时间步,确保不小于0
t_start = max(num_inference_steps - init_timestep, 0)
# 从时间步数组中截取相关部分
timesteps = timesteps[t_start * self.scheduler.order :]
# 返回调整后的时间步和剩余的推理步骤
return timesteps, num_inference_steps - t_start
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_extra_step_kwargs 拷贝而来
def prepare_extra_step_kwargs(self, generator, eta):
# 为调度器步骤准备额外的参数,因为并非所有调度器具有相同的参数签名
# eta (η) 仅在 DDIMScheduler 中使用,其他调度器将忽略该参数
# eta 对应于 DDIM 论文中的 η,参考链接:https://arxiv.org/abs/2010.02502
# eta 应该在 [0, 1] 之间
# 检查调度器步骤是否接受 eta 参数
accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())
extra_step_kwargs = {}
# 如果接受 eta,则将其添加到额外参数字典中
if accepts_eta:
extra_step_kwargs["eta"] = eta
# 检查调度器步骤是否接受 generator 参数
accepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys())
# 如果接受 generator,则将其添加到额外参数字典中
if accepts_generator:
extra_step_kwargs["generator"] = generator
# 返回准备好的额外参数字典
return extra_step_kwargs
def check_inputs(
self,
prompt,
height,
width,
strength,
negative_prompt,
callback_on_step_end_tensor_inputs,
video=None,
latents=None,
prompt_embeds=None,
negative_prompt_embeds=None,
):
# 检查高度和宽度是否能被8整除,如果不能则抛出错误
if height % 8 != 0 or width % 8 != 0:
raise ValueError(f"`height` and `width` have to be divisible by 8 but are {height} and {width}.")
# 检查strength的值是否在0到1之间,如果不在范围内则抛出错误
if strength < 0 or strength > 1:
raise ValueError(f"The value of strength should in [0.0, 1.0] but is {strength}")
# 检查callback_on_step_end_tensor_inputs是否不为None且是否所有元素都在_callback_tensor_inputs中
if callback_on_step_end_tensor_inputs is not None and not all(
k in self._callback_tensor_inputs for k in callback_on_step_end_tensor_inputs
):
# 抛出错误,如果callback_on_step_end_tensor_inputs中的某些元素不在_callback_tensor_inputs中
raise ValueError(
f"`callback_on_step_end_tensor_inputs` has to be in {self._callback_tensor_inputs}, but found {[k for k in callback_on_step_end_tensor_inputs if k not in self._callback_tensor_inputs]}"
)
# 检查prompt和prompt_embeds是否同时不为None
if prompt is not None and prompt_embeds is not None:
# 抛出错误,提示不能同时提供prompt和prompt_embeds
raise ValueError(
f"Cannot forward both `prompt`: {prompt} and `prompt_embeds`: {prompt_embeds}. Please make sure to"
" only forward one of the two."
)
elif prompt is None and prompt_embeds is None:
# 抛出错误,提示必须提供prompt或prompt_embeds其中之一
raise ValueError(
"Provide either `prompt` or `prompt_embeds`. Cannot leave both `prompt` and `prompt_embeds` undefined."
)
elif prompt is not None and (not isinstance(prompt, str) and not isinstance(prompt, list)):
# 抛出错误,提示prompt的类型必须是str或list
raise ValueError(f"`prompt` has to be of type `str` or `list` but is {type(prompt)}")
# 检查prompt和negative_prompt_embeds是否同时不为None
if prompt is not None and negative_prompt_embeds is not None:
# 抛出错误,提示不能同时提供prompt和negative_prompt_embeds
raise ValueError(
f"Cannot forward both `prompt`: {prompt} and `negative_prompt_embeds`:"
f" {negative_prompt_embeds}. Please make sure to only forward one of the two."
)
# 检查negative_prompt和negative_prompt_embeds是否同时不为None
if negative_prompt is not None and negative_prompt_embeds is not None:
# 抛出错误,提示不能同时提供negative_prompt和negative_prompt_embeds
raise ValueError(
f"Cannot forward both `negative_prompt`: {negative_prompt} and `negative_prompt_embeds`:"
f" {negative_prompt_embeds}. Please make sure to only forward one of the two."
)
# 检查prompt_embeds和negative_prompt_embeds是否都不为None
if prompt_embeds is not None and negative_prompt_embeds is not None:
# 检查两个embeds的形状是否相同,如果不同则抛出错误
if prompt_embeds.shape != negative_prompt_embeds.shape:
raise ValueError(
"`prompt_embeds` and `negative_prompt_embeds` must have the same shape when passed directly, but"
f" got: `prompt_embeds` {prompt_embeds.shape} != `negative_prompt_embeds`"
f" {negative_prompt_embeds.shape}."
)
# 检查video和latents是否同时不为None
if video is not None and latents is not None:
# 抛出错误,提示只能提供video或latents其中之一
raise ValueError("Only one of `video` or `latents` should be provided")
# 从diffusers.pipelines.cogvideo.pipeline_cogvideox.CogVideoXPipeline复制的方法
def fuse_qkv_projections(self) -> None:
# 文档字符串,说明该方法启用融合的QKV投影
r"""Enables fused QKV projections."""
# 设置fusing_transformer属性为True,表示启用融合
self.fusing_transformer = True
# 调用transformer对象的fuse_qkv_projections方法
self.transformer.fuse_qkv_projections()
# 从 diffusers.pipelines.cogvideo.pipeline_cogvideox.CogVideoXPipeline.unfuse_qkv_projections 复制的代码
def unfuse_qkv_projections(self) -> None:
r"""禁用 QKV 投影融合(如果已启用)。"""
# 检查是否启用了投影融合
if not self.fusing_transformer:
# 如果没有启用,记录警告信息
logger.warning("The Transformer was not initially fused for QKV projections. Doing nothing.")
else:
# 如果启用了,执行解除 QKV 投影融合操作
self.transformer.unfuse_qkv_projections()
# 将融合标志设置为 False
self.fusing_transformer = False
# 从 diffusers.pipelines.cogvideo.pipeline_cogvideox.CogVideoXPipeline._prepare_rotary_positional_embeddings 复制的代码
def _prepare_rotary_positional_embeddings(
self,
height: int, # 输入的高度
width: int, # 输入的宽度
num_frames: int, # 输入的帧数
device: torch.device, # 指定的设备(CPU 或 GPU)
) -> Tuple[torch.Tensor, torch.Tensor]: # 返回的类型为一对张量
# 计算网格高度,基于输入高度和其他参数
grid_height = height // (self.vae_scale_factor_spatial * self.transformer.config.patch_size)
# 计算网格宽度,基于输入宽度和其他参数
grid_width = width // (self.vae_scale_factor_spatial * self.transformer.config.patch_size)
# 计算基础宽度大小
base_size_width = 720 // (self.vae_scale_factor_spatial * self.transformer.config.patch_size)
# 计算基础高度大小
base_size_height = 480 // (self.vae_scale_factor_spatial * self.transformer.config.patch_size)
# 获取用于网格的裁剪区域坐标
grid_crops_coords = get_resize_crop_region_for_grid(
(grid_height, grid_width), base_size_width, base_size_height
)
# 生成三维旋转位置嵌入的余弦和正弦频率
freqs_cos, freqs_sin = get_3d_rotary_pos_embed(
embed_dim=self.transformer.config.attention_head_dim, # 嵌入维度
crops_coords=grid_crops_coords, # 裁剪坐标
grid_size=(grid_height, grid_width), # 网格大小
temporal_size=num_frames, # 时间序列大小
)
# 将余弦频率张量移动到指定设备
freqs_cos = freqs_cos.to(device=device)
# 将正弦频率张量移动到指定设备
freqs_sin = freqs_sin.to(device=device)
# 返回余弦和正弦频率张量
return freqs_cos, freqs_sin
@property
def guidance_scale(self):
# 返回指导尺度的值
return self._guidance_scale
@property
def num_timesteps(self):
# 返回时间步数的值
return self._num_timesteps
@property
def interrupt(self):
# 返回中断标志的值
return self._interrupt
@torch.no_grad() # 在不计算梯度的上下文中运行
@replace_example_docstring(EXAMPLE_DOC_STRING) # 替换示例文档字符串
# 定义可调用的类方法,允许传入多个参数以处理视频生成
def __call__(
# 视频图像列表,默认为 None
self,
video: List[Image.Image] = None,
# 生成视频的提示文本,可以是字符串或字符串列表,默认为 None
prompt: Optional[Union[str, List[str]]] = None,
# 生成视频的负面提示文本,可以是字符串或字符串列表,默认为 None
negative_prompt: Optional[Union[str, List[str]]] = None,
# 输出视频的高度,默认为 480 像素
height: int = 480,
# 输出视频的宽度,默认为 720 像素
width: int = 720,
# 进行推断的步骤数量,默认为 50 步
num_inference_steps: int = 50,
# 选定的时间步列表,默认为 None
timesteps: Optional[List[int]] = None,
# 控制强度的浮点数,默认为 0.8
strength: float = 0.8,
# 引导缩放比例,默认为 6
guidance_scale: float = 6,
# 是否使用动态配置的布尔值,默认为 False
use_dynamic_cfg: bool = False,
# 每个提示生成视频的数量,默认为 1
num_videos_per_prompt: int = 1,
# eta 参数,默认为 0.0
eta: float = 0.0,
# 随机数生成器,可以是 torch.Generator 或其列表,默认为 None
generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
# 可选的潜在张量,默认为 None
latents: Optional[torch.FloatTensor] = None,
# 可选的提示嵌入,默认为 None
prompt_embeds: Optional[torch.FloatTensor] = None,
# 可选的负面提示嵌入,默认为 None
negative_prompt_embeds: Optional[torch.FloatTensor] = None,
# 输出类型,默认为 "pil"
output_type: str = "pil",
# 是否返回字典格式,默认为 True
return_dict: bool = True,
# 步骤结束时调用的回调函数,可以是单一或多个回调,默认为 None
callback_on_step_end: Optional[
Union[Callable[[int, int, Dict], None], PipelineCallback, MultiPipelineCallbacks]
] = None,
# 用于步骤结束回调的张量输入列表,默认为 ["latents"]
callback_on_step_end_tensor_inputs: List[str] = ["latents"],
# 最大序列长度,默认为 226
max_sequence_length: int = 226,
.\diffusers\pipelines\cogvideo\pipeline_output.py
# 从 dataclasses 模块导入 dataclass 装饰器
from dataclasses import dataclass
# 导入 PyTorch 库
import torch
# 从 diffusers.utils 导入 BaseOutput 基类
from diffusers.utils import BaseOutput
# 定义 CogVideoXPipelineOutput 类,继承自 BaseOutput
@dataclass
class CogVideoXPipelineOutput(BaseOutput):
r"""
CogVideo 管道的输出类。
参数:
frames (`torch.Tensor`, `np.ndarray`, 或 List[List[PIL.Image.Image]]):
视频输出的列表 - 可以是长度为 `batch_size` 的嵌套列表,每个子列表包含
去噪的 PIL 图像序列,长度为 `num_frames`。也可以是形状为
`(batch_size, num_frames, channels, height, width)` 的 NumPy 数组或 Torch 张量。
"""
# 定义输出的帧,类型为 torch.Tensor
frames: torch.Tensor
.\diffusers\pipelines\cogvideo\__init__.py
# 从 typing 模块导入 TYPE_CHECKING,用于类型检查
from typing import TYPE_CHECKING
# 从相对路径的 utils 模块导入所需的工具函数和常量
from ...utils import (
DIFFUSERS_SLOW_IMPORT, # 用于标识慢速导入的标志
OptionalDependencyNotAvailable, # 用于处理可选依赖项未安装的异常
_LazyModule, # 用于创建懒加载模块
get_objects_from_module, # 从模块中获取对象的函数
is_torch_available, # 检查 PyTorch 是否可用的函数
is_transformers_available, # 检查 Transformers 是否可用的函数
)
# 创建一个空字典,用于存放虚拟对象
_dummy_objects = {}
# 创建一个空字典,用于存放模块的导入结构
_import_structure = {}
# 尝试检查依赖项的可用性
try:
# 如果 Transformers 或 PyTorch 不可用,抛出异常
if not (is_transformers_available() and is_torch_available()):
raise OptionalDependencyNotAvailable()
# 捕获可选依赖项未可用的异常
except OptionalDependencyNotAvailable:
# 从 utils 模块导入虚拟对象,避免导入失败
from ...utils import dummy_torch_and_transformers_objects # noqa F403
# 更新虚拟对象字典,填充虚拟对象
_dummy_objects.update(get_objects_from_module(dummy_torch_and_transformers_objects))
# 如果依赖项可用,更新导入结构
else:
_import_structure["pipeline_cogvideox"] = ["CogVideoXPipeline"] # 添加 CogVideoXPipeline
_import_structure["pipeline_cogvideox_image2video"] = ["CogVideoXImageToVideoPipeline"] # 添加图像转视频管道
_import_structure["pipeline_cogvideox_video2video"] = ["CogVideoXVideoToVideoPipeline"] # 添加视频转视频管道
# 根据类型检查或慢速导入的标志进行判断
if TYPE_CHECKING or DIFFUSERS_SLOW_IMPORT:
# 尝试检查依赖项的可用性
try:
# 如果 Transformers 或 PyTorch 不可用,抛出异常
if not (is_transformers_available() and is_torch_available()):
raise OptionalDependencyNotAvailable()
# 捕获可选依赖项未可用的异常
except OptionalDependencyNotAvailable:
# 从虚拟对象模块导入所有对象
from ...utils.dummy_torch_and_transformers_objects import *
else:
# 导入实际的管道类
from .pipeline_cogvideox import CogVideoXPipeline
from .pipeline_cogvideox_image2video import CogVideoXImageToVideoPipeline
from .pipeline_cogvideox_video2video import CogVideoXVideoToVideoPipeline
# 否则处理懒加载模块
else:
import sys
# 用 _LazyModule 创建当前模块的懒加载实例
sys.modules[__name__] = _LazyModule(
__name__,
globals()["__file__"],
_import_structure, # 传递导入结构
module_spec=__spec__, # 传递模块规格
)
# 将虚拟对象添加到当前模块
for name, value in _dummy_objects.items():
setattr(sys.modules[__name__], name, value)
.\diffusers\pipelines\consistency_models\pipeline_consistency_models.py
# 版权所有 2024 HuggingFace 团队。保留所有权利。
#
# 根据 Apache 许可证第 2.0 版(“许可证”)授权;
# 除非遵循许可证,否则您不得使用此文件。
# 您可以在以下网址获得许可证的副本:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律或书面协议另有规定,否则根据许可证分发的软件是“按原样”提供的,
# 不附带任何形式的保证或条件,无论是明示还是暗示。
# 请参阅许可证以获取有关权限和限制的具体信息。
from typing import Callable, List, Optional, Union # 导入类型注解,用于函数签名和变量类型标注
import torch # 导入 PyTorch 库,供后续深度学习模型使用
from ...models import UNet2DModel # 从模型模块导入 UNet2DModel 类
from ...schedulers import CMStochasticIterativeScheduler # 从调度模块导入 CMStochasticIterativeScheduler 类
from ...utils import ( # 从工具模块导入多个工具函数和类
logging,
replace_example_docstring,
)
from ...utils.torch_utils import randn_tensor # 从 PyTorch 工具模块导入 randn_tensor 函数
from ..pipeline_utils import DiffusionPipeline, ImagePipelineOutput # 从管道工具模块导入 DiffusionPipeline 和 ImagePipelineOutput 类
logger = logging.get_logger(__name__) # 创建日志记录器,记录当前模块的日志信息
EXAMPLE_DOC_STRING = """ # 示例文档字符串,提供用法示例
Examples:
```py
>>> import torch # 导入 PyTorch 库
>>> from diffusers import ConsistencyModelPipeline # 从 diffusers 导入 ConsistencyModelPipeline 类
>>> device = "cuda" # 设置设备为 CUDA(GPU)
>>> # 加载 cd_imagenet64_l2 检查点。
>>> model_id_or_path = "openai/diffusers-cd_imagenet64_l2" # 指定模型 ID 或路径
>>> pipe = ConsistencyModelPipeline.from_pretrained(model_id_or_path, torch_dtype=torch.float16) # 从预训练模型加载管道
>>> pipe.to(device) # 将管道移到指定设备上
>>> # 单步采样
>>> image = pipe(num_inference_steps=1).images[0] # 使用单步推理生成图像
>>> image.save("cd_imagenet64_l2_onestep_sample.png") # 保存生成的图像
>>> # 单步采样,类条件图像生成
>>> # ImageNet-64 类标签 145 对应于国王企鹅
>>> image = pipe(num_inference_steps=1, class_labels=145).images[0] # 生成特定类的图像
>>> image.save("cd_imagenet64_l2_onestep_sample_penguin.png") # 保存生成的图像
>>> # 多步采样,类条件图像生成
>>> # 可以显式指定时间步,以下时间步来自原始 GitHub 仓库:
>>> # https://github.com/openai/consistency_models/blob/main/scripts/launch.sh#L77
>>> image = pipe(num_inference_steps=None, timesteps=[22, 0], class_labels=145).images[0] # 生成特定类的多步图像
>>> image.save("cd_imagenet64_l2_multistep_sample_penguin.png") # 保存生成的图像
```py
"""
class ConsistencyModelPipeline(DiffusionPipeline): # 定义 ConsistencyModelPipeline 类,继承自 DiffusionPipeline
r""" # 类的文档字符串,描述其功能
Pipeline for unconditional or class-conditional image generation. # 描述此管道用于无条件或类条件图像生成
This model inherits from [`DiffusionPipeline`]. Check the superclass documentation for the generic methods # 说明此模型继承自 DiffusionPipeline,并建议查看超类文档以了解通用方法
implemented for all pipelines (downloading, saving, running on a particular device, etc.). # 说明所实现的方法,包括下载、保存和在特定设备上运行等
# 函数参数说明
Args:
unet ([`UNet2DModel`]): # 传入一个 UNet2DModel 对象,用于对编码后的图像潜变量去噪。
A `UNet2DModel` to denoise the encoded image latents.
scheduler ([`SchedulerMixin`]): # 传入一个调度器,结合 unet 用于去噪,当前仅与 CMStochasticIterativeScheduler 兼容。
A scheduler to be used in combination with `unet` to denoise the encoded image latents. Currently only
compatible with [`CMStochasticIterativeScheduler`].
"""
# 定义模型的 CPU 卸载顺序
model_cpu_offload_seq = "unet"
# 构造函数,初始化类的实例
def __init__(self, unet: UNet2DModel, scheduler: CMStochasticIterativeScheduler) -> None:
# 调用父类的构造函数
super().__init__()
# 注册 unet 和 scheduler 模块
self.register_modules(
unet=unet,
scheduler=scheduler,
)
# 初始化安全检查器为 None
self.safety_checker = None
# 准备潜变量的函数
def prepare_latents(self, batch_size, num_channels, height, width, dtype, device, generator, latents=None):
# 定义潜变量的形状
shape = (batch_size, num_channels, height, width)
# 检查生成器列表的长度是否与批量大小匹配
if isinstance(generator, list) and len(generator) != batch_size:
raise ValueError(
f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
f" size of {batch_size}. Make sure the batch size matches the length of the generators."
)
# 如果没有传入潜变量,则生成随机潜变量
if latents is None:
latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
else:
# 将潜变量移动到指定设备并转换数据类型
latents = latents.to(device=device, dtype=dtype)
# 根据调度器所需的标准差缩放初始噪声
latents = latents * self.scheduler.init_noise_sigma
# 返回处理后的潜变量
return latents
# 后处理图像的函数,遵循 diffusers.VaeImageProcessor.postprocess
def postprocess_image(self, sample: torch.Tensor, output_type: str = "pil"):
# 检查输出类型是否合法
if output_type not in ["pt", "np", "pil"]:
raise ValueError(
f"output_type={output_type} is not supported. Make sure to choose one of ['pt', 'np', or 'pil']"
)
# 等同于 diffusers.VaeImageProcessor.denormalize
sample = (sample / 2 + 0.5).clamp(0, 1) # 将样本值归一化到 [0, 1] 范围内
if output_type == "pt": # 如果输出类型为 pt,直接返回样本
return sample
# 等同于 diffusers.VaeImageProcessor.pt_to_numpy
sample = sample.cpu().permute(0, 2, 3, 1).numpy() # 转换为 NumPy 数组
if output_type == "np": # 如果输出类型为 np,返回样本
return sample
# 如果输出类型必须为 'pil'
sample = self.numpy_to_pil(sample) # 将 NumPy 数组转换为 PIL 图像
return sample # 返回最终的图像
# 准备类别标签,根据给定的批大小和设备,将类别标签转换为张量
def prepare_class_labels(self, batch_size, device, class_labels=None):
# 检查 UNet 配置中类别嵌入的数量是否不为 None
if self.unet.config.num_class_embeds is not None:
# 如果 class_labels 是一个列表,将其转换为整型张量
if isinstance(class_labels, list):
class_labels = torch.tensor(class_labels, dtype=torch.int)
# 如果 class_labels 是一个整数,确保批大小为 1,并将其转换为张量
elif isinstance(class_labels, int):
assert batch_size == 1, "Batch size must be 1 if classes is an int"
class_labels = torch.tensor([class_labels], dtype=torch.int)
# 如果 class_labels 为 None,随机生成 batch_size 个类别标签
elif class_labels is None:
# 随机生成 batch_size 类别标签
# TODO: 应该在这里使用生成器吗?randn_tensor 的整数等价物未在 ...utils 中公开
class_labels = torch.randint(0, self.unet.config.num_class_embeds, size=(batch_size,))
# 将类别标签移动到指定的设备上
class_labels = class_labels.to(device)
else:
# 如果没有类别嵌入,类别标签设为 None
class_labels = None
# 返回处理后的类别标签
return class_labels
# 检查输入参数的有效性
def check_inputs(self, num_inference_steps, timesteps, latents, batch_size, img_size, callback_steps):
# 确保提供了 num_inference_steps 或 timesteps 其中之一
if num_inference_steps is None and timesteps is None:
raise ValueError("Exactly one of `num_inference_steps` or `timesteps` must be supplied.")
# 如果同时提供了 num_inference_steps 和 timesteps,发出警告
if num_inference_steps is not None and timesteps is not None:
logger.warning(
f"Both `num_inference_steps`: {num_inference_steps} and `timesteps`: {timesteps} are supplied;"
" `timesteps` will be used over `num_inference_steps`."
)
# 如果 latents 不为 None,检查其形状是否符合预期
if latents is not None:
expected_shape = (batch_size, 3, img_size, img_size)
# 如果 latents 的形状不符合预期,则抛出错误
if latents.shape != expected_shape:
raise ValueError(f"The shape of latents is {latents.shape} but is expected to be {expected_shape}.")
# 检查 callback_steps 是否为正整数
if (callback_steps is None) or (
callback_steps is not None and (not isinstance(callback_steps, int) or callback_steps <= 0)
):
raise ValueError(
f"`callback_steps` has to be a positive integer but is {callback_steps} of type"
f" {type(callback_steps)}."
)
# 装饰器,禁止梯度计算,提供调用示例文档字符串
@torch.no_grad()
@replace_example_docstring(EXAMPLE_DOC_STRING)
def __call__(
# 定义默认参数和类型注解,初始化调用方法
batch_size: int = 1,
class_labels: Optional[Union[torch.Tensor, List[int], int]] = None,
num_inference_steps: int = 1,
timesteps: List[int] = None,
generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
latents: Optional[torch.Tensor] = None,
output_type: Optional[str] = "pil",
return_dict: bool = True,
callback: Optional[Callable[[int, int, torch.Tensor], None]] = None,
callback_steps: int = 1,
.\diffusers\pipelines\consistency_models\__init__.py
# 从 typing 模块导入 TYPE_CHECKING,用于类型检查
from typing import TYPE_CHECKING
# 从上级模块导入相关工具
from ...utils import (
# 导入常量 DIFFUSERS_SLOW_IMPORT
DIFFUSERS_SLOW_IMPORT,
# 导入延迟加载模块的工具类
_LazyModule,
)
# 定义要导入的模块结构,包含 'pipeline_consistency_models' 模块及其内容
_import_structure = {
"pipeline_consistency_models": ["ConsistencyModelPipeline"],
}
# 判断是否处于类型检查阶段或需要慢速导入
if TYPE_CHECKING or DIFFUSERS_SLOW_IMPORT:
# 从 'pipeline_consistency_models' 模块导入 ConsistencyModelPipeline 类
from .pipeline_consistency_models import ConsistencyModelPipeline
# 否则执行以下代码
else:
import sys
# 使用 _LazyModule 创建一个延迟加载模块,并将其赋值给当前模块名
sys.modules[__name__] = _LazyModule(
# 当前模块名
__name__,
# 当前模块文件路径
globals()["__file__"],
# 导入结构
_import_structure,
# 模块规范
module_spec=__spec__,
)
.\diffusers\pipelines\controlnet\multicontrolnet.py
# 导入操作系统模块
import os
# 从 typing 模块导入类型注解
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
# 导入 PyTorch 库
import torch
# 从 torch 模块导入神经网络相关功能
from torch import nn
# 从上级目录导入 ControlNetModel 和 ControlNetOutput
from ...models.controlnet import ControlNetModel, ControlNetOutput
# 从上级目录导入 ModelMixin 类
from ...models.modeling_utils import ModelMixin
# 从上级目录导入 logging 工具
from ...utils import logging
# 创建一个日志记录器,使用当前模块的名称
logger = logging.get_logger(__name__)
# 定义 MultiControlNetModel 类,继承自 ModelMixin
class MultiControlNetModel(ModelMixin):
r"""
多个 `ControlNetModel` 的包装类,用于 Multi-ControlNet
该模块是多个 `ControlNetModel` 实例的包装器。`forward()` API 设计为与 `ControlNetModel` 兼容。
参数:
controlnets (`List[ControlNetModel]`):
在去噪过程中为 unet 提供额外的条件。必须将多个 `ControlNetModel` 作为列表设置。
"""
# 初始化方法,接收一个 ControlNetModel 的列表或元组
def __init__(self, controlnets: Union[List[ControlNetModel], Tuple[ControlNetModel]]):
# 调用父类的初始化方法
super().__init__()
# 将控制网模型保存到模块列表中
self.nets = nn.ModuleList(controlnets)
# 前向传播方法,处理输入数据
def forward(
self,
sample: torch.Tensor, # 输入样本
timestep: Union[torch.Tensor, float, int], # 当前时间步
encoder_hidden_states: torch.Tensor, # 编码器的隐藏状态
controlnet_cond: List[torch.tensor], # 控制网络的条件
conditioning_scale: List[float], # 条件缩放因子
class_labels: Optional[torch.Tensor] = None, # 可选的类标签
timestep_cond: Optional[torch.Tensor] = None, # 可选的时间步条件
attention_mask: Optional[torch.Tensor] = None, # 可选的注意力掩码
added_cond_kwargs: Optional[Dict[str, torch.Tensor]] = None, # 可选的附加条件参数
cross_attention_kwargs: Optional[Dict[str, Any]] = None, # 可选的交叉注意力参数
guess_mode: bool = False, # 是否使用猜测模式
return_dict: bool = True, # 是否返回字典格式的输出
) -> Union[ControlNetOutput, Tuple]: # 返回类型可以是 ControlNetOutput 或元组
# 遍历每个控制网络条件和缩放因子
for i, (image, scale, controlnet) in enumerate(zip(controlnet_cond, conditioning_scale, self.nets)):
# 调用控制网络进行前向传播,获取下采样和中间样本
down_samples, mid_sample = controlnet(
sample=sample, # 输入样本
timestep=timestep, # 当前时间步
encoder_hidden_states=encoder_hidden_states, # 编码器隐藏状态
controlnet_cond=image, # 控制网络条件
conditioning_scale=scale, # 条件缩放
class_labels=class_labels, # 类标签
timestep_cond=timestep_cond, # 时间步条件
attention_mask=attention_mask, # 注意力掩码
added_cond_kwargs=added_cond_kwargs, # 附加条件参数
cross_attention_kwargs=cross_attention_kwargs, # 交叉注意力参数
guess_mode=guess_mode, # 猜测模式
return_dict=return_dict, # 返回格式
)
# 合并样本
if i == 0: # 如果是第一个控制网络
# 初始化下采样和中间样本
down_block_res_samples, mid_block_res_sample = down_samples, mid_sample
else: # 如果不是第一个控制网络
# 将当前下采样样本与之前的样本合并
down_block_res_samples = [
samples_prev + samples_curr # 累加下采样样本
for samples_prev, samples_curr in zip(down_block_res_samples, down_samples)
]
# 累加中间样本
mid_block_res_sample += mid_sample
# 返回合并后的下采样样本和中间样本
return down_block_res_samples, mid_block_res_sample
# 定义一个方法,用于将模型及其配置文件保存到指定目录
def save_pretrained(
self, # 代表类实例
save_directory: Union[str, os.PathLike], # 保存目录,可以是字符串或路径类型
is_main_process: bool = True, # 指示当前进程是否为主进程,默认为 True
save_function: Callable = None, # 自定义保存函数,默认为 None
safe_serialization: bool = True, # 是否使用安全序列化方式,默认为 True
variant: Optional[str] = None, # 可选参数,指定保存权重的格式
):
"""
保存模型及其配置文件到指定目录,以便可以使用
`[`~pipelines.controlnet.MultiControlNetModel.from_pretrained`]` 类方法重新加载。
参数:
save_directory (`str` 或 `os.PathLike`):
要保存的目录,如果不存在则会创建。
is_main_process (`bool`, *可选*, 默认为 `True`):
调用此方法的进程是否为主进程,适用于分布式训练,避免竞争条件。
save_function (`Callable`):
用于保存状态字典的函数,适用于分布式训练。
safe_serialization (`bool`, *可选*, 默认为 `True`):
是否使用 `safetensors` 保存模型,或使用传统的 PyTorch 方法。
variant (`str`, *可选*):
如果指定,权重将以 pytorch_model.<variant>.bin 格式保存。
"""
# 遍历网络模型列表,并获取索引
for idx, controlnet in enumerate(self.nets):
# 确定后缀名,如果是第一个模型则无后缀
suffix = "" if idx == 0 else f"_{idx}"
# 调用每个控制网络的保存方法,传入相关参数
controlnet.save_pretrained(
save_directory + suffix, # 结合目录和后缀形成完整的保存路径
is_main_process=is_main_process, # 传递主进程标识
save_function=save_function, # 传递保存函数
safe_serialization=safe_serialization, # 传递序列化方式
variant=variant, # 传递权重格式
)