diffusers 源码解析(五十六)
# 版权声明,说明版权所有者及保留权利
# Copyright 2024 Google Brain and The HuggingFace Team. All rights reserved.
#
# 根据 Apache 许可证第 2.0 版授权使用本文件
# Licensed under the Apache License, Version 2.0 (the "License");
# 只有在遵循许可证的情况下才能使用此文件
# you may not use this file except in compliance with the License.
# 可以在以下网址获取许可证副本
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律要求或书面同意,否则本软件按“原样”提供,不附带任何形式的担保
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# 查看许可证以获取特定的权限和限制信息
# See the License for the specific language governing permissions and
# limitations under the License.
# 免责声明:此文件受到 https://github.com/yang-song/score_sde_pytorch 的强烈影响
# DISCLAIMER: This file is strongly influenced by https://github.com/yang-song/score_sde_pytorch
# 导入数学模块
import math
# 从 typing 导入 Union 类型
from typing import Union
# 导入 PyTorch 库
import torch
# 从配置实用程序导入 ConfigMixin 和注册配置装饰器
from ...configuration_utils import ConfigMixin, register_to_config
# 从 PyTorch 实用程序导入随机张量生成函数
from ...utils.torch_utils import randn_tensor
# 从调度实用程序导入调度器混合类
from ..scheduling_utils import SchedulerMixin
# 定义 ScoreSdeVpScheduler 类,继承自 SchedulerMixin 和 ConfigMixin
class ScoreSdeVpScheduler(SchedulerMixin, ConfigMixin):
"""
`ScoreSdeVpScheduler` 是一种保方差的随机微分方程 (SDE) 调度器。
该模型继承自 [`SchedulerMixin`] 和 [`ConfigMixin`]。查看超类文档以获取库为所有调度器实现的通用方法,例如加载和保存。
参数:
num_train_timesteps (`int`, defaults to 2000):
训练模型的扩散步数。
beta_min (`int`, defaults to 0.1):
beta_max (`int`, defaults to 20):
sampling_eps (`int`, defaults to 1e-3):
采样结束值,时间步逐渐从 1 减少到 epsilon。
"""
# 定义类的顺序属性
order = 1
# 注册构造函数到配置
@register_to_config
def __init__(self, num_train_timesteps=2000, beta_min=0.1, beta_max=20, sampling_eps=1e-3):
# 初始化 sigmas 为 None
self.sigmas = None
# 初始化 discrete_sigmas 为 None
self.discrete_sigmas = None
# 初始化 timesteps 为 None
self.timesteps = None
# 定义设置时间步的方法
def set_timesteps(self, num_inference_steps, device: Union[str, torch.device] = None):
"""
设置扩散链所使用的连续时间步(在推理之前运行)。
参数:
num_inference_steps (`int`):
生成样本时使用的扩散步数。
device (`str` 或 `torch.device`, *可选*):
要移动时间步的设备。如果为 `None`,则时间步不会移动。
"""
# 生成从 1 到 sampling_eps 的线性空间,数量为 num_inference_steps,指定设备
self.timesteps = torch.linspace(1, self.config.sampling_eps, num_inference_steps, device=device)
# 定义一个步骤预测函数,通过逆向 SDE 从前一时间步预测样本
def step_pred(self, score, x, t, generator=None):
"""
从之前的时间步预测样本,通过逆向 SDE。该函数从学习的模型输出(通常是预测的噪声)传播扩散过程。
参数:
score (): 模型输出的分数
x (): 当前样本
t (): 当前时间步
generator (`torch.Generator`, *可选*): 随机数生成器
"""
# 检查 timesteps 是否被设置
if self.timesteps is None:
# 如果未设置 timesteps,抛出错误
raise ValueError(
"`self.timesteps` is not set, you need to run 'set_timesteps' after creating the scheduler"
)
# TODO(Patrick) 更好的注释 + 非 PyTorch 处理
# 后处理模型得分
log_mean_coeff = -0.25 * t**2 * (self.config.beta_max - self.config.beta_min) - 0.5 * t * self.config.beta_min
# 计算标准差,使用对数均值系数
std = torch.sqrt(1.0 - torch.exp(2.0 * log_mean_coeff))
# 将标准差展平为一维
std = std.flatten()
# 使标准差的维度与得分的维度匹配
while len(std.shape) < len(score.shape):
std = std.unsqueeze(-1)
# 对得分进行标准化处理
score = -score / std
# 计算时间增量
dt = -1.0 / len(self.timesteps)
# 计算当前时间步的 beta 值
beta_t = self.config.beta_min + t * (self.config.beta_max - self.config.beta_min)
# 将 beta_t 展平为一维
beta_t = beta_t.flatten()
# 使 beta_t 的维度与样本 x 的维度匹配
while len(beta_t.shape) < len(x.shape):
beta_t = beta_t.unsqueeze(-1)
# 计算漂移项
drift = -0.5 * beta_t * x
# 计算扩散项
diffusion = torch.sqrt(beta_t)
# 更新漂移项,包含模型得分的影响
drift = drift - diffusion**2 * score
# 计算 x 的均值
x_mean = x + drift * dt
# 添加噪声
noise = randn_tensor(x.shape, layout=x.layout, generator=generator, device=x.device, dtype=x.dtype)
# 更新样本 x,加入扩散项和噪声
x = x_mean + diffusion * math.sqrt(-dt) * noise
# 返回更新后的样本和均值
return x, x_mean
# 定义 __len__ 方法,返回训练时间步的数量
def __len__(self):
return self.config.num_train_timesteps
# 从 typing 模块导入 TYPE_CHECKING,用于类型检查时的导入
from typing import TYPE_CHECKING
# 从上级模块的 utils 导入多个功能
from ...utils import (
# 导入慢速导入的标志
DIFFUSERS_SLOW_IMPORT,
# 导入处理可选依赖项不可用的异常
OptionalDependencyNotAvailable,
# 导入延迟加载模块的工具
_LazyModule,
# 导入从模块获取对象的函数
get_objects_from_module,
# 导入检查 PyTorch 是否可用的函数
is_torch_available,
# 导入检查 transformers 是否可用的函数
is_transformers_available,
)
# 初始化一个空字典,用于存储占位对象
_dummy_objects = {}
# 初始化一个空字典,用于存储导入结构
_import_structure = {}
# 尝试执行以下代码块
try:
# 检查 transformers 和 torch 是否都可用
if not (is_transformers_available() and is_torch_available()):
# 如果不可用,则引发可选依赖项不可用异常
raise OptionalDependencyNotAvailable()
# 捕获可选依赖项不可用的异常
except OptionalDependencyNotAvailable:
# 从 utils 导入占位对象以避免错误
from ...utils import dummy_pt_objects # noqa F403
# 更新占位对象字典,获取 dummy_pt_objects 中的对象
_dummy_objects.update(get_objects_from_module(dummy_pt_objects))
# 如果没有异常,执行以下代码
else:
# 定义可用模块的导入结构
_import_structure["scheduling_karras_ve"] = ["KarrasVeScheduler"]
_import_structure["scheduling_sde_vp"] = ["ScoreSdeVpScheduler"]
# 如果处于类型检查或启用了慢速导入,执行以下代码
if TYPE_CHECKING or DIFFUSERS_SLOW_IMPORT:
# 尝试执行以下代码块
try:
# 检查 torch 是否可用
if not is_torch_available():
# 如果不可用,则引发可选依赖项不可用异常
raise OptionalDependencyNotAvailable()
# 捕获可选依赖项不可用的异常
except OptionalDependencyNotAvailable:
# 从 dummy_pt_objects 导入所有对象
from ...utils.dummy_pt_objects import * # noqa F403
else:
# 从调度模块导入 KarrasVeScheduler
from .scheduling_karras_ve import KarrasVeScheduler
# 从调度模块导入 ScoreSdeVpScheduler
from .scheduling_sde_vp import ScoreSdeVpScheduler
# 否则,执行以下代码
else:
# 导入 sys 模块以进行模块管理
import sys
# 使用 LazyModule 封装当前模块,支持延迟加载
sys.modules[__name__] = _LazyModule(
__name__,
globals()["__file__"],
_import_structure,
module_spec=__spec__,
)
# 遍历占位对象字典,将每个对象设置到当前模块中
for name, value in _dummy_objects.items():
setattr(sys.modules[__name__], name, value)
# 导入数学库
import math
# 从 dataclasses 模块导入 dataclass 装饰器
from dataclasses import dataclass
# 从 typing 模块导入所需的类型
from typing import List, Optional, Tuple, Union
# 导入 PyTorch 库
import torch
# 从配置工具模块导入 ConfigMixin 和 register_to_config 函数
from ..configuration_utils import ConfigMixin, register_to_config
# 从工具模块导入 BaseOutput 类
from ..utils import BaseOutput
# 从调度工具模块导入 SchedulerMixin 类
from .scheduling_utils import SchedulerMixin
# 定义生成 Gumbel 噪声的函数
def gumbel_noise(t, generator=None):
# 获取生成器设备,或使用输入张量 t 的设备
device = generator.device if generator is not None else t.device
# 创建与 t 形状相同的全零张量,并填充均匀分布的随机数
noise = torch.zeros_like(t, device=device).uniform_(0, 1, generator=generator).to(t.device)
# 返回 Gumbel 噪声
return -torch.log((-torch.log(noise.clamp(1e-20))).clamp(1e-20))
# 定义根据随机选择的 top-k 进行掩蔽的函数
def mask_by_random_topk(mask_len, probs, temperature=1.0, generator=None):
# 计算信心值,将概率取对数并添加 Gumbel 噪声
confidence = torch.log(probs.clamp(1e-20)) + temperature * gumbel_noise(probs, generator=generator)
# 对信心值进行排序
sorted_confidence = torch.sort(confidence, dim=-1).values
# 根据掩蔽长度获取截止值
cut_off = torch.gather(sorted_confidence, 1, mask_len.long())
# 创建掩蔽,标记信心值低于截止值的元素
masking = confidence < cut_off
# 返回掩蔽结果
return masking
# 定义 AmusedSchedulerOutput 数据类
@dataclass
class AmusedSchedulerOutput(BaseOutput):
"""
调度器 `step` 函数输出的输出类。
参数:
prev_sample (`torch.Tensor` 形状为 `(batch_size, num_channels, height, width)` 的图像):
前一个时间步的计算样本 `(x_{t-1})`。`prev_sample` 应作为下一次模型输入用于去噪循环。
pred_original_sample (`torch.Tensor` 形状为 `(batch_size, num_channels, height, width)` 的图像):
基于当前时间步模型输出的预测去噪样本 `(x_{0})`。
`pred_original_sample` 可用于预览进度或进行引导。
"""
# 定义前一个样本和预测原始样本
prev_sample: torch.Tensor
pred_original_sample: torch.Tensor = None
# 定义 AmusedScheduler 类,继承自 SchedulerMixin 和 ConfigMixin
class AmusedScheduler(SchedulerMixin, ConfigMixin):
# 设置调度器的顺序
order = 1
# 定义温度张量
temperatures: torch.Tensor
# 注册到配置的构造函数
@register_to_config
def __init__(
self,
mask_token_id: int,
masking_schedule: str = "cosine",
):
# 初始化温度和时间步
self.temperatures = None
self.timesteps = None
# 设置时间步的函数
def set_timesteps(
self,
num_inference_steps: int,
temperature: Union[int, Tuple[int, int], List[int]] = (2, 0),
device: Union[str, torch.device] = None,
):
# 生成反向的时间步张量
self.timesteps = torch.arange(num_inference_steps, device=device).flip(0)
# 如果温度是元组或列表,生成线性间隔的温度张量
if isinstance(temperature, (tuple, list)):
self.temperatures = torch.linspace(temperature[0], temperature[1], num_inference_steps, device=device)
# 否则生成固定温度到 0.01 的线性间隔温度张量
else:
self.temperatures = torch.linspace(temperature, 0.01, num_inference_steps, device=device)
# 定义执行一步的函数
def step(
self,
model_output: torch.Tensor,
timestep: torch.long,
sample: torch.LongTensor,
starting_mask_ratio: int = 1,
generator: Optional[torch.Generator] = None,
return_dict: bool = True,
# 定义函数返回类型为 AmusedSchedulerOutput 或元组
) -> Union[AmusedSchedulerOutput, Tuple]:
# 检查输入样本和模型输出的维度,判断是否为二维输入
two_dim_input = sample.ndim == 3 and model_output.ndim == 4
# 如果是二维输入
if two_dim_input:
# 解包模型输出的形状信息
batch_size, codebook_size, height, width = model_output.shape
# 重新调整样本的形状为 (batch_size, height * width)
sample = sample.reshape(batch_size, height * width)
# 重新调整模型输出的形状并转置
model_output = model_output.reshape(batch_size, codebook_size, height * width).permute(0, 2, 1)
# 创建一个布尔图,标识样本中的未知标记位置
unknown_map = sample == self.config.mask_token_id
# 对模型输出应用 softmax 函数,得到概率分布
probs = model_output.softmax(dim=-1)
# 获取概率的设备信息
device = probs.device
# 将概率移动到生成器所在的设备上(如果存在生成器)
probs_ = probs.to(generator.device) if generator is not None else probs # handles when generator is on CPU
# 如果概率在 CPU 上且不是浮点32格式,转换为浮点32格式
if probs_.device.type == "cpu" and probs_.dtype != torch.float32:
probs_ = probs_.float() # multinomial is not implemented for cpu half precision
# 重新调整概率的形状为 (-1, 最后一个维度大小)
probs_ = probs_.reshape(-1, probs.size(-1))
# 根据概率分布进行抽样,获取预测的原始样本
pred_original_sample = torch.multinomial(probs_, 1, generator=generator).to(device=device)
# 调整预测样本的形状以匹配概率的形状
pred_original_sample = pred_original_sample[:, 0].view(*probs.shape[:-1])
# 在未知位置用原样本替换预测样本
pred_original_sample = torch.where(unknown_map, pred_original_sample, sample)
# 如果时间步为0,设置先前样本为预测的原样本
if timestep == 0:
prev_sample = pred_original_sample
else:
# 获取样本的序列长度
seq_len = sample.shape[1]
# 查找当前时间步的索引
step_idx = (self.timesteps == timestep).nonzero()
# 计算时间步的比例
ratio = (step_idx + 1) / len(self.timesteps)
# 根据遮罩调度的类型计算遮罩比例
if self.config.masking_schedule == "cosine":
mask_ratio = torch.cos(ratio * math.pi / 2)
elif self.config.masking_schedule == "linear":
mask_ratio = 1 - ratio
else:
# 抛出未知的遮罩调度错误
raise ValueError(f"unknown masking schedule {self.config.masking_schedule}")
# 计算最终的遮罩比例
mask_ratio = starting_mask_ratio * mask_ratio
# 计算需要遮罩的长度
mask_len = (seq_len * mask_ratio).floor()
# 确保不遮罩超过之前遮罩的数量
mask_len = torch.min(unknown_map.sum(dim=-1, keepdim=True) - 1, mask_len)
# 确保至少遮罩一个标记
mask_len = torch.max(torch.tensor([1], device=model_output.device), mask_len)
# 根据预测样本获取相应的概率
selected_probs = torch.gather(probs, -1, pred_original_sample[:, :, None])[:, :, 0]
# 忽略输入中给定的标记,覆盖其置信度
selected_probs = torch.where(unknown_map, selected_probs, torch.finfo(selected_probs.dtype).max)
# 通过随机 top-k 方法进行遮罩
masking = mask_by_random_topk(mask_len, selected_probs, self.temperatures[step_idx], generator)
# 用遮罩覆盖置信度较低的标记
prev_sample = torch.where(masking, self.config.mask_token_id, pred_original_sample)
# 如果是二维输入,调整样本的形状
if two_dim_input:
prev_sample = prev_sample.reshape(batch_size, height, width)
pred_original_sample = pred_original_sample.reshape(batch_size, height, width)
# 如果不返回字典格式,返回先前样本和预测样本的元组
if not return_dict:
return (prev_sample, pred_original_sample)
# 返回 AmusedSchedulerOutput 对象
return AmusedSchedulerOutput(prev_sample, pred_original_sample)
# 定义一个方法,用于向样本添加噪声
def add_noise(self, sample, timesteps, generator=None):
# 获取当前时间步在 timesteps 中的位置索引
step_idx = (self.timesteps == timesteps).nonzero()
# 计算掩码比例,根据时间步的位置索引
ratio = (step_idx + 1) / len(self.timesteps)
# 根据配置选择掩码调度策略
if self.config.masking_schedule == "cosine":
# 使用余弦函数计算掩码比例
mask_ratio = torch.cos(ratio * math.pi / 2)
elif self.config.masking_schedule == "linear":
# 线性掩码比例计算
mask_ratio = 1 - ratio
else:
# 如果调度策略未知,抛出错误
raise ValueError(f"unknown masking schedule {self.config.masking_schedule}")
# 生成随机掩码索引,基于样本形状和设备
mask_indices = (
torch.rand(
sample.shape, device=generator.device if generator is not None else sample.device, generator=generator
).to(sample.device) # 将随机张量移至样本设备
< mask_ratio # 与掩码比例进行比较,生成布尔掩码
)
# 创建样本的克隆,以便进行掩码操作
masked_sample = sample.clone()
# 将掩码位置的样本值替换为掩码标记 ID
masked_sample[mask_indices] = self.config.mask_token_id
# 返回添加噪声后的样本
return masked_sample
# 导入数学库
import math
# 从数据类模块导入数据类装饰器
from dataclasses import dataclass
# 导入可选类型、元组和联合类型
from typing import Optional, Tuple, Union
# 导入 PyTorch 库
import torch
# 从配置工具中导入混合类和注册配置的装饰器
from ..configuration_utils import ConfigMixin, register_to_config
# 从工具模块导入基本输出类
from ..utils import BaseOutput
# 从 PyTorch 工具中导入随机张量生成函数
from ..utils.torch_utils import randn_tensor
# 从调度工具中导入调度混合类
from .scheduling_utils import SchedulerMixin
# 从 diffusers.schedulers.scheduling_ddpm.betas_for_alpha_bar 复制的函数
def betas_for_alpha_bar(
num_diffusion_timesteps, # 传入的扩散时间步数
max_beta=0.999, # 最大的 beta 值,默认为 0.999
alpha_transform_type="cosine", # alpha 变换类型,默认为 "cosine"
):
"""
创建一个 beta 调度程序,该调度程序离散化给定的 alpha_t_bar 函数,该函数定义了随时间变化的 (1-beta) 的累积乘积,从 t = [0,1]。
包含一个函数 alpha_bar,该函数接受参数 t 并将其转换为扩散过程中到该部分的 (1-beta) 的累积乘积。
参数:
num_diffusion_timesteps (`int`): 生成的 beta 数量。
max_beta (`float`): 使用的最大 beta 值;使用小于 1 的值以防止奇异性。
alpha_transform_type (`str`, *可选*, 默认为 `cosine`): alpha_bar 的噪声调度类型。
选择 `cosine` 或 `exp`
返回:
betas (`np.ndarray`): 调度程序用于步骤模型输出的 betas
"""
# 如果 alpha_transform_type 为 "cosine"
if alpha_transform_type == "cosine":
# 定义 alpha_bar 函数,计算基于余弦的 alpha 值
def alpha_bar_fn(t):
return math.cos((t + 0.008) / 1.008 * math.pi / 2) ** 2
# 如果 alpha_transform_type 为 "exp"
elif alpha_transform_type == "exp":
# 定义 alpha_bar 函数,计算基于指数的 alpha 值
def alpha_bar_fn(t):
return math.exp(t * -12.0)
# 如果传入的 alpha_transform_type 不支持
else:
# 抛出错误,提示不支持的 alpha_transform_type
raise ValueError(f"Unsupported alpha_transform_type: {alpha_transform_type}")
# 初始化一个空列表,用于存储 beta 值
betas = []
# 遍历每个扩散时间步
for i in range(num_diffusion_timesteps):
# 计算当前时间步 t1
t1 = i / num_diffusion_timesteps
# 计算下一个时间步 t2
t2 = (i + 1) / num_diffusion_timesteps
# 计算 beta 值,并确保不超过 max_beta
betas.append(min(1 - alpha_bar_fn(t2) / alpha_bar_fn(t1), max_beta))
# 返回作为 PyTorch 张量的 beta 值
return torch.tensor(betas, dtype=torch.float32)
# 定义一个数据类,用于调度器的输出
@dataclass
class ConsistencyDecoderSchedulerOutput(BaseOutput):
"""
调度器 `step` 函数的输出类。
参数:
prev_sample (`torch.Tensor`,形状为 `(batch_size, num_channels, height, width)`,用于图像):
先前时间步的计算样本 `(x_{t-1})`。`prev_sample` 应作为去噪循环中的下一个模型输入。
"""
# 定义前一个样本的张量
prev_sample: torch.Tensor
# 定义一致性解码调度器类,继承自调度混合类和配置混合类
class ConsistencyDecoderScheduler(SchedulerMixin, ConfigMixin):
# 定义调度器的顺序
order = 1
# 使用装饰器注册到配置
@register_to_config
def __init__(
self,
num_train_timesteps: int = 1024, # 训练时间步数,默认为 1024
sigma_data: float = 0.5, # 数据的 sigma 值,默认为 0.5
):
# 计算与 alpha_bar 相关的 beta 值
betas = betas_for_alpha_bar(num_train_timesteps)
# 计算 alpha 值,alpha = 1 - beta
alphas = 1.0 - betas
# 计算 alpha 的累积乘积
alphas_cumprod = torch.cumprod(alphas, dim=0)
# 计算累积 alpha 的平方根
self.sqrt_alphas_cumprod = torch.sqrt(alphas_cumprod)
# 计算 1 - 累积 alpha 的平方根
self.sqrt_one_minus_alphas_cumprod = torch.sqrt(1.0 - alphas_cumprod)
# 计算 sigma 值,sigma = sqrt(1 / alpha_cumprod - 1)
sigmas = torch.sqrt(1.0 / alphas_cumprod - 1)
# 计算累积 alpha 的倒数的平方根
sqrt_recip_alphas_cumprod = torch.sqrt(1.0 / alphas_cumprod)
# 计算 c_skip,用于后续的跳跃连接
self.c_skip = sqrt_recip_alphas_cumprod * sigma_data**2 / (sigmas**2 + sigma_data**2)
# 计算 c_out,输出通道的缩放因子
self.c_out = sigmas * sigma_data / (sigmas**2 + sigma_data**2) ** 0.5
# 计算 c_in,输入通道的缩放因子
self.c_in = sqrt_recip_alphas_cumprod / (sigmas**2 + sigma_data**2) ** 0.5
def set_timesteps(
# 定义设置时间步的函数,接收可选的推理步骤数和设备类型
self,
num_inference_steps: Optional[int] = None,
device: Union[str, torch.device] = None,
):
# 如果推理步骤数不是 2,抛出错误
if num_inference_steps != 2:
raise ValueError("Currently more than 2 inference steps are not supported.")
# 设置时间步为指定的张量
self.timesteps = torch.tensor([1008, 512], dtype=torch.long, device=device)
# 将 sqrt_alphas_cumprod 移动到指定设备
self.sqrt_alphas_cumprod = self.sqrt_alphas_cumprod.to(device)
# 将 sqrt_one_minus_alphas_cumprod 移动到指定设备
self.sqrt_one_minus_alphas_cumprod = self.sqrt_one_minus_alphas_cumprod.to(device)
# 将 c_skip 移动到指定设备
self.c_skip = self.c_skip.to(device)
# 将 c_out 移动到指定设备
self.c_out = self.c_out.to(device)
# 将 c_in 移动到指定设备
self.c_in = self.c_in.to(device)
@property
# 初始化噪声的标准差,使用 timesteps[0] 的值
def init_noise_sigma(self):
return self.sqrt_one_minus_alphas_cumprod[self.timesteps[0]]
def scale_model_input(self, sample: torch.Tensor, timestep: Optional[int] = None) -> torch.Tensor:
"""
确保与需要根据当前时间步缩放去噪模型输入的调度器的互换性。
Args:
sample (`torch.Tensor`):
输入样本。
timestep (`int`, *optional*):
扩散链中的当前时间步。
Returns:
`torch.Tensor`:
一个缩放后的输入样本。
"""
# 返回缩放后的样本,使用当前时间步对应的 c_in
return sample * self.c_in[timestep]
def step(
# 定义模型推理的步骤函数,接收模型输出、时间步、样本等参数
self,
model_output: torch.Tensor,
timestep: Union[float, torch.Tensor],
sample: torch.Tensor,
generator: Optional[torch.Generator] = None,
return_dict: bool = True,
# 定义函数返回类型为一致性解码器调度器输出或元组
) -> Union[ConsistencyDecoderSchedulerOutput, Tuple]:
"""
通过逆向 SDE 预测前一个时间步的样本。此函数从学习到的模型输出(通常是预测的噪声)传播扩散过程。
参数:
model_output (`torch.Tensor`):
来自学习扩散模型的直接输出。
timestep (`float`):
扩散链中的当前时间步。
sample (`torch.Tensor`):
由扩散过程创建的当前样本实例。
generator (`torch.Generator`, *可选*):
随机数生成器。
return_dict (`bool`, *可选*, 默认为 `True`):
是否返回一致性解码器调度器输出或元组。
返回:
[`~schedulers.scheduling_consistency_models.ConsistencyDecoderSchedulerOutput`] 或 `tuple`:
如果 return_dict 为 `True`,返回一致性解码器调度器输出;否则返回一个元组,元组的第一个元素是样本张量。
"""
# 计算当前时间步的输出,结合模型输出和样本
x_0 = self.c_out[timestep] * model_output + self.c_skip[timestep] * sample
# 获取当前时间步在时间步列表中的索引
timestep_idx = torch.where(self.timesteps == timestep)[0]
# 检查当前时间步是否为最后一个时间步
if timestep_idx == len(self.timesteps) - 1:
# 如果是最后一个时间步,前一个样本为当前输出
prev_sample = x_0
else:
# 否则生成噪声,并计算前一个样本
noise = randn_tensor(x_0.shape, generator=generator, dtype=x_0.dtype, device=x_0.device)
prev_sample = (
self.sqrt_alphas_cumprod[self.timesteps[timestep_idx + 1]].to(x_0.dtype) * x_0
+ self.sqrt_one_minus_alphas_cumprod[self.timesteps[timestep_idx + 1]].to(x_0.dtype) * noise
)
# 如果不返回字典,则返回前一个样本的元组
if not return_dict:
return (prev_sample,)
# 否则返回一致性解码器调度器输出
return ConsistencyDecoderSchedulerOutput(prev_sample=prev_sample)
# 版权所有 2024 HuggingFace 团队。保留所有权利。
#
# 根据 Apache 许可证,版本 2.0(“许可证”)进行许可;
# 除非符合许可证,否则您不得使用此文件。
# 您可以在以下网址获取许可证副本:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律要求或书面同意,软件在许可证下分发是以“原样”基础进行的,
# 不提供任何形式的保证或条件,无论是明示还是暗示。
# 请参见许可证以获取管理权限和
# 限制的具体语言。
from dataclasses import dataclass # 从 dataclasses 模块导入 dataclass 装饰器
from typing import List, Optional, Tuple, Union # 导入类型注解
import numpy as np # 导入 numpy 库并简写为 np
import torch # 导入 PyTorch 库
from ..configuration_utils import ConfigMixin, register_to_config # 从上级模块导入配置相关的工具
from ..utils import BaseOutput, logging # 从上级模块导入基本输出类和日志工具
from ..utils.torch_utils import randn_tensor # 从上级模块导入随机张量生成函数
from .scheduling_utils import SchedulerMixin # 从当前模块导入调度器混合类
logger = logging.get_logger(__name__) # 创建一个日志记录器,使用当前模块的名称,禁用 pylint 命名检查
@dataclass
class CMStochasticIterativeSchedulerOutput(BaseOutput): # 定义一个数据类,继承自 BaseOutput
"""
调度器的 `step` 函数的输出类。
参数:
prev_sample (`torch.Tensor`,形状为 `(batch_size, num_channels, height, width)`,用于图像):
计算的前一步样本 `(x_{t-1})`。`prev_sample` 应作为下一个模型输入用于
去噪循环。
"""
prev_sample: torch.Tensor # 前一步样本的张量属性
class CMStochasticIterativeScheduler(SchedulerMixin, ConfigMixin): # 定义调度器类,继承自 SchedulerMixin 和 ConfigMixin
"""
一致性模型的多步和一步采样。
此模型继承自 [`SchedulerMixin`] 和 [`ConfigMixin`]。请查看超类文档以了解库为所有调度器实现的
通用方法,例如加载和保存。
# 函数参数说明
Args:
num_train_timesteps (`int`, defaults to 40): # 训练模型的扩散步骤数,默认为40
The number of diffusion steps to train the model.
sigma_min (`float`, defaults to 0.002): # sigma调度中的最小噪声幅度,默认为0.002
Minimum noise magnitude in the sigma schedule. Defaults to 0.002 from the original implementation.
sigma_max (`float`, defaults to 80.0): # sigma调度中的最大噪声幅度,默认为80.0
Maximum noise magnitude in the sigma schedule. Defaults to 80.0 from the original implementation.
sigma_data (`float`, defaults to 0.5): # EDM中数据分布的标准差,默认为0.5
The standard deviation of the data distribution from the EDM
[paper](https://huggingface.co/papers/2206.00364). Defaults to 0.5 from the original implementation.
s_noise (`float`, defaults to 1.0): # 用于采样时抵消细节丢失的额外噪声量,默认为1.0
The amount of additional noise to counteract loss of detail during sampling. A reasonable range is [1.000,
1.011]. Defaults to 1.0 from the original implementation.
rho (`float`, defaults to 7.0): # 用于从EDM计算Karras sigma调度的参数,默认为7.0
The parameter for calculating the Karras sigma schedule from the EDM
[paper](https://huggingface.co/papers/2206.00364). Defaults to 7.0 from the original implementation.
clip_denoised (`bool`, defaults to `True`): # 是否将去噪输出限制在`(-1, 1)`范围内
Whether to clip the denoised outputs to `(-1, 1)`.
timesteps (`List` or `np.ndarray` or `torch.Tensor`, *optional*): # 可选的明确时间步调度,需按升序排列
An explicit timestep schedule that can be optionally specified. The timesteps are expected to be in
increasing order.
"""
# 设置步骤的初始顺序为1
order = 1
@register_to_config
def __init__((
self,
num_train_timesteps: int = 40, # 设置训练时的扩散步骤数
sigma_min: float = 0.002, # 设置sigma调度的最小噪声幅度
sigma_max: float = 80.0, # 设置sigma调度的最大噪声幅度
sigma_data: float = 0.5, # 设置数据分布的标准差
s_noise: float = 1.0, # 设置额外噪声量
rho: float = 7.0, # 设置Karras sigma调度的参数
clip_denoised: bool = True, # 设置是否对去噪输出进行限制
):
# 初始化噪声分布的标准差
self.init_noise_sigma = sigma_max
# 创建从0到1的线性渐变,长度为训练步骤数
ramp = np.linspace(0, 1, num_train_timesteps)
# 将渐变转换为Karras sigma
sigmas = self._convert_to_karras(ramp)
# 将sigma转换为时间步
timesteps = self.sigma_to_t(sigmas)
# 可设值初始化
self.num_inference_steps = None # 推理步骤数初始化为None
# 将sigma转换为torch张量
self.sigmas = torch.from_numpy(sigmas)
# 将时间步转换为torch张量
self.timesteps = torch.from_numpy(timesteps)
self.custom_timesteps = False # 设置自定义时间步标志为False
self.is_scale_input_called = False # 标记是否调用了输入缩放
self._step_index = None # 初始化步骤索引为None
self._begin_index = None # 初始化开始索引为None
# 将sigma移动到CPU,避免过多的CPU/GPU通信
self.sigmas = self.sigmas.to("cpu") # to avoid too much CPU/GPU communication
@property
def step_index(self):
"""
当前时间步的索引计数器。每次调度器步骤后增加1。
"""
return self._step_index # 返回当前步骤索引
@property
def begin_index(self):
"""
第一个时间步的索引。应该通过`set_begin_index`方法从管道中设置。
"""
return self._begin_index # 返回开始索引
# 从 diffusers.schedulers.scheduling_dpmsolver_multistep.DPMSolverMultistepScheduler.set_begin_index 复制的代码
# 设置调度器的起始索引,默认为 0
def set_begin_index(self, begin_index: int = 0):
# 调用此函数应在推理前执行
"""
Sets the begin index for the scheduler. This function should be run from pipeline before the inference.
Args:
begin_index (`int`):
The begin index for the scheduler.
"""
# 将给定的起始索引存储到实例变量中
self._begin_index = begin_index
# 缩放一致性模型输入,通过公式 `(sigma**2 + sigma_data**2) ** 0.5`
def scale_model_input(self, sample: torch.Tensor, timestep: Union[float, torch.Tensor]) -> torch.Tensor:
# 获取与当前时间步对应的 sigma 值
"""
Scales the consistency model input by `(sigma**2 + sigma_data**2) ** 0.5`.
Args:
sample (`torch.Tensor`):
The input sample.
timestep (`float` or `torch.Tensor`):
The current timestep in the diffusion chain.
Returns:
`torch.Tensor`:
A scaled input sample.
"""
# 如果步骤索引为 None,则初始化步骤索引
if self.step_index is None:
self._init_step_index(timestep)
# 从 sigmas 列表中获取当前步骤的 sigma 值
sigma = self.sigmas[self.step_index]
# 按照给定公式缩放输入样本
sample = sample / ((sigma**2 + self.config.sigma_data**2) ** 0.5)
# 标记已调用缩放输入的方法
self.is_scale_input_called = True
# 返回缩放后的样本
return sample
# 从 Karras sigmas 获取缩放时间步,以输入一致性模型
def sigma_to_t(self, sigmas: Union[float, np.ndarray]):
#
"""
Gets scaled timesteps from the Karras sigmas for input to the consistency model.
Args:
sigmas (`float` or `np.ndarray`):
A single Karras sigma or an array of Karras sigmas.
Returns:
`float` or `np.ndarray`:
A scaled input timestep or scaled input timestep array.
"""
# 检查 sigmas 是否为 numpy 数组,如果不是,则转换为数组
if not isinstance(sigmas, np.ndarray):
sigmas = np.array(sigmas, dtype=np.float64)
# 根据 Karras 的公式计算时间步
timesteps = 1000 * 0.25 * np.log(sigmas + 1e-44)
# 返回计算得到的时间步
return timesteps
# 设置推理的时间步,参数为可选
def set_timesteps(
self,
num_inference_steps: Optional[int] = None,
device: Union[str, torch.device] = None,
timesteps: Optional[List[int]] = None,
# 修改后的 _convert_to_karras 实现,接受 ramp 作为参数
def _convert_to_karras(self, ramp):
# 构造 Karras 等人的噪声调度
"""Constructs the noise schedule of Karras et al. (2022)."""
# 获取配置中的最小和最大 sigma 值
sigma_min: float = self.config.sigma_min
sigma_max: float = self.config.sigma_max
# 获取配置中的 rho 值
rho = self.config.rho
# 计算最小和最大 sigma 的倒数
min_inv_rho = sigma_min ** (1 / rho)
max_inv_rho = sigma_max ** (1 / rho)
# 根据 ramp 计算 sigma 值
sigmas = (max_inv_rho + ramp * (min_inv_rho - max_inv_rho)) ** rho
# 返回计算得到的 sigma 值
return sigmas
# 获取缩放因子
def get_scalings(self, sigma):
# 从配置中获取 sigma_data 值
sigma_data = self.config.sigma_data
# 计算 c_skip 和 c_out
c_skip = sigma_data**2 / (sigma**2 + sigma_data**2)
c_out = sigma * sigma_data / (sigma**2 + sigma_data**2) ** 0.5
# 返回计算得到的缩放因子
return c_skip, c_out
# 获取用于一致性模型参数化中的缩放因子,以满足边界条件
def get_scalings_for_boundary_condition(self, sigma):
"""
获取用于一致性模型参数化的缩放因子(参见论文的附录 C),以强制边界条件。
<Tip>
在 `c_skip` 和 `c_out` 的方程中,`epsilon` 被设置为 `sigma_min`。
</Tip>
参数:
sigma (`torch.Tensor`):
当前的 sigma 值,来自 Karras sigma 调度。
返回:
`tuple`:
一个包含两个元素的元组,其中 `c_skip`(当前样本的权重)是第一个元素,`c_out`
(一致性模型输出的权重)是第二个元素。
"""
# 从配置中获取最小的 sigma 值
sigma_min = self.config.sigma_min
# 从配置中获取数据 sigma 值
sigma_data = self.config.sigma_data
# 计算 c_skip 的缩放因子
c_skip = sigma_data**2 / ((sigma - sigma_min) ** 2 + sigma_data**2)
# 计算 c_out 的缩放因子
c_out = (sigma - sigma_min) * sigma_data / (sigma**2 + sigma_data**2) ** 0.5
# 返回 c_skip 和 c_out
return c_skip, c_out
# 从 diffusers.schedulers.scheduling_euler_discrete.EulerDiscreteScheduler.index_for_timestep 复制
# 获取给定时间步的索引
def index_for_timestep(self, timestep, schedule_timesteps=None):
# 如果未提供调度时间步,则使用默认时间步
if schedule_timesteps is None:
schedule_timesteps = self.timesteps
# 找到与当前时间步匹配的索引
indices = (schedule_timesteps == timestep).nonzero()
# 对于**第一个** `step` 采取的 sigma 索引
# 始终是第二个索引(或如果只有一个,则为最后一个索引)
# 这样可以确保在去噪调度中间开始时不会意外跳过一个 sigma
pos = 1 if len(indices) > 1 else 0
# 返回对应索引的值
return indices[pos].item()
# 从 diffusers.schedulers.scheduling_euler_discrete.EulerDiscreteScheduler._init_step_index 复制
# 初始化步骤索引
def _init_step_index(self, timestep):
# 如果开始索引为 None,计算当前时间步的索引
if self.begin_index is None:
# 如果时间步是张量,将其转换为与时间步设备相同
if isinstance(timestep, torch.Tensor):
timestep = timestep.to(self.timesteps.device)
# 根据时间步获取步骤索引
self._step_index = self.index_for_timestep(timestep)
else:
# 否则,使用已定义的开始索引
self._step_index = self._begin_index
# 执行一步操作
def step(
self,
model_output: torch.Tensor,
timestep: Union[float, torch.Tensor],
sample: torch.Tensor,
generator: Optional[torch.Generator] = None,
return_dict: bool = True,
# 从 diffusers.schedulers.scheduling_euler_discrete.EulerDiscreteScheduler.add_noise 复制
# 向样本添加噪声
def add_noise(
self,
original_samples: torch.Tensor,
noise: torch.Tensor,
timesteps: torch.Tensor,
# 函数返回一个张量,表示处理后的噪声样本
) -> torch.Tensor:
# 将 sigmas 转移到与原始样本相同的设备和数据类型
sigmas = self.sigmas.to(device=original_samples.device, dtype=original_samples.dtype)
# 检查设备类型是否为 mps,且 timesteps 是否为浮点型
if original_samples.device.type == "mps" and torch.is_floating_point(timesteps):
# mps 不支持 float64 类型
schedule_timesteps = self.timesteps.to(original_samples.device, dtype=torch.float32)
# 将 timesteps 转移到相同设备,转换为 float32
timesteps = timesteps.to(original_samples.device, dtype=torch.float32)
else:
# 将 timesteps 转移到原始样本的设备,不改变数据类型
schedule_timesteps = self.timesteps.to(original_samples.device)
timesteps = timesteps.to(original_samples.device)
# 当 scheduler 用于训练或管道未实现 set_begin_index 时,begin_index 为 None
if self.begin_index is None:
# 根据每个 timesteps 找到对应的步索引
step_indices = [self.index_for_timestep(t, schedule_timesteps) for t in timesteps]
elif self.step_index is not None:
# 在第一次去噪步骤后调用 add_noise(用于修复)
step_indices = [self.step_index] * timesteps.shape[0]
else:
# 在第一次去噪步骤前调用 add_noise 创建初始潜在图像 (img2img)
step_indices = [self.begin_index] * timesteps.shape[0]
# 根据步索引获取对应的 sigmas,并展平为一维
sigma = sigmas[step_indices].flatten()
# 如果 sigma 的维度小于原始样本的维度,则在最后添加一个维度
while len(sigma.shape) < len(original_samples.shape):
sigma = sigma.unsqueeze(-1)
# 将噪声样本与 sigma 相加,生成带噪声的样本
noisy_samples = original_samples + noise * sigma
# 返回带噪声的样本
return noisy_samples
# 返回训练时间步的数量
def __len__(self):
return self.config.num_train_timesteps
# 版权声明,说明此文件属于 TSAIL 团队和 HuggingFace 团队,所有权利保留
#
# 根据 Apache 许可证 2.0 版(“许可证”)进行许可;
# 除非遵循许可证,否则您不能使用此文件。
# 您可以在以下网址获取许可证的副本:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律要求或书面同意,否则根据许可证分发的软件是按“现状”提供的,
# 不附带任何形式的担保或条件,无论是明示还是暗示。
# 请参阅许可证以获取有关特定语言治理权限和
# 限制的详细信息。
# 免责声明:此文件受 https://github.com/LuChengTHU/dpm-solver 和 https://github.com/NVlabs/edm 的强烈影响
# 导入数学库以便进行数学计算
import math
# 从 typing 模块导入类型提示相关的类型
from typing import List, Optional, Tuple, Union
# 导入 NumPy 库,提供数组操作和数学函数
import numpy as np
# 导入 PyTorch 库,用于深度学习和张量计算
import torch
# 从配置工具导入配置混合类和注册到配置的装饰器
from ..configuration_utils import ConfigMixin, register_to_config
# 从调度模块导入布朗树噪声采样器
from .scheduling_dpmsolver_sde import BrownianTreeNoiseSampler
# 从调度工具导入调度混合类和调度输出类
from .scheduling_utils import SchedulerMixin, SchedulerOutput
# 定义一个名为 CosineDPMSolverMultistepScheduler 的调度器类
class CosineDPMSolverMultistepScheduler(SchedulerMixin, ConfigMixin):
"""
实现了一种带余弦调度的 `DPMSolverMultistepScheduler` 变体,由 Nichol 和 Dhariwal (2021) 提出。
此调度器用于 Stable Audio Open [1]。
[1] Evans, Parker 等人. "Stable Audio Open" https://arxiv.org/abs/2407.14358
此模型继承自 [`SchedulerMixin`] 和 [`ConfigMixin`]。
请查看超类文档以获取库为所有调度器实现的通用方法,例如加载和保存。
# 参数说明
Args:
# 噪声调度中的最小噪声幅度,默认值为 0.3
sigma_min (`float`, *optional*, defaults to 0.3):
Minimum noise magnitude in the sigma schedule. This was set to 0.3 in Stable Audio Open [1].
# 噪声调度中的最大噪声幅度,默认值为 500
sigma_max (`float`, *optional*, defaults to 500):
Maximum noise magnitude in the sigma schedule. This was set to 500 in Stable Audio Open [1].
# 数据分布的标准差,默认值为 1.0
sigma_data (`float`, *optional*, defaults to 1.0):
The standard deviation of the data distribution. This is set to 1.0 in Stable Audio Open [1].
# 用于计算 `sigmas` 的调度方式,默认使用指数调度
sigma_schedule (`str`, *optional*, defaults to `exponential`):
Sigma schedule to compute the `sigmas`. By default, we the schedule introduced in the EDM paper
(https://arxiv.org/abs/2206.00364). Other acceptable value is "exponential". The exponential schedule was
incorporated in this model: https://huggingface.co/stabilityai/cosxl.
# 训练模型的扩散步骤数量,默认值为 1000
num_train_timesteps (`int`, defaults to 1000):
The number of diffusion steps to train the model.
# DPMSolver 的顺序,默认值为 2
solver_order (`int`, defaults to 2):
The DPMSolver order which can be `1` or `2`. It is recommended to use `solver_order=2`.
# 调度函数的预测类型,默认值为 `v_prediction`
prediction_type (`str`, defaults to `v_prediction`, *optional*):
Prediction type of the scheduler function; can be `epsilon` (predicts the noise of the diffusion process),
`sample` (directly predicts the noisy sample`) or `v_prediction` (see section 2.4 of [Imagen
Video](https://imagen.research.google/video/paper.pdf) paper).
# 第二阶求解器的类型,默认值为 `midpoint`
solver_type (`str`, defaults to `midpoint`):
Solver type for the second-order solver; can be `midpoint` or `heun`. The solver type slightly affects the
sample quality, especially for a small number of steps. It is recommended to use `midpoint` solvers.
# 最后步骤中是否使用低阶求解器,默认值为 True
lower_order_final (`bool`, defaults to `True`):
Whether to use lower-order solvers in the final steps. Only valid for < 15 inference steps. This can
stabilize the sampling of DPMSolver for steps < 15, especially for steps <= 10.
# 最后一步中是否使用欧拉法,默认值为 False
euler_at_final (`bool`, defaults to `False`):
Whether to use Euler's method in the final step. It is a trade-off between numerical stability and detail
richness. This can stabilize the sampling of the SDE variant of DPMSolver for small number of inference
steps, but sometimes may result in blurring.
# 采样过程中噪声调度的最终 `sigma` 值,默认值为 `"zero"`
final_sigmas_type (`str`, defaults to `"zero"`):
The final `sigma` value for the noise schedule during the sampling process. If `"sigma_min"`, the final
sigma is the same as the last sigma in the training schedule. If `zero`, the final sigma is set to 0.
"""
# 初始化兼容列表
_compatibles = []
# 初始化求解器顺序,默认值为 1
order = 1
# 注册到配置
@register_to_config
# 初始化类的构造函数,设置默认参数
def __init__(
# 最小标准差,默认为0.3
sigma_min: float = 0.3,
# 最大标准差,默认为500
sigma_max: float = 500,
# 数据标准差,默认为1.0
sigma_data: float = 1.0,
# 标准差调度方式,默认为"exponential"
sigma_schedule: str = "exponential",
# 训练时间步数,默认为1000
num_train_timesteps: int = 1000,
# 求解器的阶数,默认为2
solver_order: int = 2,
# 预测类型,默认为"v_prediction"
prediction_type: str = "v_prediction",
# ρ值,默认为7.0
rho: float = 7.0,
# 求解器类型,默认为"midpoint"
solver_type: str = "midpoint",
# 是否在最后使用低阶求解器,默认为True
lower_order_final: bool = True,
# 是否在最后使用欧拉法,默认为False
euler_at_final: bool = False,
# 最终标准差类型,默认为"zero"
final_sigmas_type: Optional[str] = "zero", # "zero", "sigma_min"
):
# 检查求解器类型是否有效
if solver_type not in ["midpoint", "heun"]:
# 如果求解器类型是"logrho"、"bh1"或"bh2",注册为"midpoint"
if solver_type in ["logrho", "bh1", "bh2"]:
self.register_to_config(solver_type="midpoint")
# 如果求解器类型无效,抛出未实现错误
else:
raise NotImplementedError(f"{solver_type} is not implemented for {self.__class__}")
# 创建从0到1的线性间隔,长度为训练时间步数
ramp = torch.linspace(0, 1, num_train_timesteps)
# 根据调度类型计算标准差
if sigma_schedule == "karras":
sigmas = self._compute_karras_sigmas(ramp)
elif sigma_schedule == "exponential":
sigmas = self._compute_exponential_sigmas(ramp)
# 预处理噪声
self.timesteps = self.precondition_noise(sigmas)
# 将计算得到的标准差与零张量连接
self.sigmas = torch.cat([sigmas, torch.zeros(1, device=sigmas.device)])
# 可设值初始化
self.num_inference_steps = None # 推理步骤数初始化为None
self.model_outputs = [None] * solver_order # 根据求解器阶数初始化输出列表
self.lower_order_nums = 0 # 低阶数初始化为0
self._step_index = None # 步骤索引初始化为None
self._begin_index = None # 开始索引初始化为None
# 将标准差张量移到CPU以减少CPU/GPU间通信
self.sigmas = self.sigmas.to("cpu") # to avoid too much CPU/GPU communication
@property
def init_noise_sigma(self):
# 返回初始噪声分布的标准差
return (self.config.sigma_max**2 + 1) ** 0.5
@property
def step_index(self):
"""
当前时间步的索引计数器。每次调度步骤后增加1。
"""
return self._step_index
@property
def begin_index(self):
"""
第一个时间步的索引。应通过`set_begin_index`方法从管道设置。
"""
return self._begin_index
# 从diffusers.schedulers.scheduling_dpmsolver_multistep.DPMSolverMultistepScheduler复制的设置开始索引方法
def set_begin_index(self, begin_index: int = 0):
"""
设置调度器的开始索引。此函数应在推理前从管道运行。
参数:
begin_index (`int`):
调度器的开始索引。
"""
self._begin_index = begin_index # 设置开始索引
# 从diffusers.schedulers.scheduling_edm_euler.EDMEulerScheduler复制的预处理输入方法
def precondition_inputs(self, sample, sigma):
# 计算输入样本的缩放因子
c_in = 1 / ((sigma**2 + self.config.sigma_data**2) ** 0.5)
# 对样本进行缩放处理
scaled_sample = sample * c_in
return scaled_sample # 返回缩放后的样本
def precondition_noise(self, sigma):
# 如果sigma不是张量,则将其转换为张量
if not isinstance(sigma, torch.Tensor):
sigma = torch.tensor([sigma])
# 返回噪声预处理结果
return sigma.atan() / math.pi * 2 # 计算预处理噪声并返回
# 从 diffusers.schedulers.scheduling_edm_euler.EDMEulerScheduler.precondition_outputs 复制而来
def precondition_outputs(self, sample, model_output, sigma):
# 获取配置中的 sigma 数据
sigma_data = self.config.sigma_data
# 计算 c_skip 值,用于后续的去噪过程
c_skip = sigma_data**2 / (sigma**2 + sigma_data**2)
# 根据预测类型选择不同的计算方式
if self.config.prediction_type == "epsilon":
# 计算 c_out 值,使用 epsilon 预测类型
c_out = sigma * sigma_data / (sigma**2 + sigma_data**2) ** 0.5
elif self.config.prediction_type == "v_prediction":
# 计算 c_out 值,使用 v_prediction 预测类型
c_out = -sigma * sigma_data / (sigma**2 + sigma_data**2) ** 0.5
else:
# 如果预测类型不支持,抛出异常
raise ValueError(f"Prediction type {self.config.prediction_type} is not supported.")
# 计算去噪后的结果
denoised = c_skip * sample + c_out * model_output
# 返回去噪后的样本
return denoised
# 从 diffusers.schedulers.scheduling_edm_euler.EDMEulerScheduler.scale_model_input 复制而来
def scale_model_input(self, sample: torch.Tensor, timestep: Union[float, torch.Tensor]) -> torch.Tensor:
"""
确保与需要根据当前时间步缩放去噪模型输入的调度器的互换性。
按 `(sigma**2 + 1) ** 0.5` 缩放去噪模型输入,以匹配 Euler 算法。
参数:
sample (`torch.Tensor`):
输入样本。
timestep (`int`, *optional*):
扩散链中的当前时间步。
返回:
`torch.Tensor`:
缩放后的输入样本。
"""
# 如果步骤索引为 None,初始化步骤索引
if self.step_index is None:
self._init_step_index(timestep)
# 获取当前步骤对应的 sigma 值
sigma = self.sigmas[self.step_index]
# 预处理输入样本
sample = self.precondition_inputs(sample, sigma)
# 标记输入缩放已被调用
self.is_scale_input_called = True
# 返回预处理后的样本
return sample
# 定义设置离散时间步的方法,接受推理步骤数和设备作为参数
def set_timesteps(self, num_inference_steps: int = None, device: Union[str, torch.device] = None):
"""
设置用于扩散链的离散时间步(在推理之前运行)。
参数:
num_inference_steps (`int`):
生成样本时使用的扩散步骤数,与预训练模型结合。
device (`str` 或 `torch.device`, *可选*):
时间步应该移动到的设备。如果为 `None`,则时间步不移动。
"""
# 将输入的推理步骤数赋值给类属性
self.num_inference_steps = num_inference_steps
# 创建一个从 0 到 1 的线性空间,步数为 num_inference_steps
ramp = torch.linspace(0, 1, self.num_inference_steps)
# 根据配置选择不同的 sigma 计算方式
if self.config.sigma_schedule == "karras":
# 计算 Karras 方法的 sigma 值
sigmas = self._compute_karras_sigmas(ramp)
elif self.config.sigma_schedule == "exponential":
# 计算指数方法的 sigma 值
sigmas = self._compute_exponential_sigmas(ramp)
# 将 sigma 转换为 float32 类型,并移动到指定设备
sigmas = sigmas.to(dtype=torch.float32, device=device)
# 对 sigma 进行预处理以获取时间步
self.timesteps = self.precondition_noise(sigmas)
# 根据配置选择最后一个 sigma 值的类型
if self.config.final_sigmas_type == "sigma_min":
sigma_last = self.config.sigma_min
elif self.config.final_sigmas_type == "zero":
sigma_last = 0
else:
# 抛出异常,如果 final_sigmas_type 不是预期的类型
raise ValueError(
f"`final_sigmas_type` must be one of 'zero', or 'sigma_min', but got {self.config.final_sigmas_type}"
)
# 将计算得到的 sigma 和最后的 sigma 拼接
self.sigmas = torch.cat([sigmas, torch.tensor([sigma_last], dtype=torch.float32, device=device)])
# 初始化模型输出列表,长度为配置的求解器顺序
self.model_outputs = [
None,
] * self.config.solver_order
# 初始化较低阶数目
self.lower_order_nums = 0
# 为允许重复时间步的调度器添加索引计数器
self._step_index = None
self._begin_index = None
# 将 sigma 移动到 CPU,避免过多的 CPU/GPU 通信
self.sigmas = self.sigmas.to("cpu")
# 如果使用噪声采样器,重新初始化它
self.noise_sampler = None
# 从 diffusers.schedulers.scheduling_edm_euler.EDMEulerScheduler._compute_karras_sigmas 复制而来
def _compute_karras_sigmas(self, ramp, sigma_min=None, sigma_max=None) -> torch.Tensor:
"""构建 Karras 等人的噪声调度(2022)。"""
# 如果 sigma_min 和 sigma_max 为 None,则使用配置中的默认值
sigma_min = sigma_min or self.config.sigma_min
sigma_max = sigma_max or self.config.sigma_max
# 获取配置中的 rho 值
rho = self.config.rho
# 计算 sigma_min 和 sigma_max 的倒数
min_inv_rho = sigma_min ** (1 / rho)
max_inv_rho = sigma_max ** (1 / rho)
# 根据 ramp 计算 sigmas 的值
sigmas = (max_inv_rho + ramp * (min_inv_rho - max_inv_rho)) ** rho
return sigmas
# 从 diffusers.schedulers.scheduling_edm_euler.EDMEulerScheduler._compute_exponential_sigmas 复制而来
# 计算指数 sigma 值,基于 k-diffusion 的实现
def _compute_exponential_sigmas(self, ramp, sigma_min=None, sigma_max=None) -> torch.Tensor:
# 如果 sigma_min 为 None,使用配置中的 sigma_min
sigma_min = sigma_min or self.config.sigma_min
# 如果 sigma_max 为 None,使用配置中的 sigma_max
sigma_max = sigma_max or self.config.sigma_max
# 创建从 sigma_min 到 sigma_max 的对数等间隔值,计算其指数并反转顺序
sigmas = torch.linspace(math.log(sigma_min), math.log(sigma_max), len(ramp)).exp().flip(0)
# 返回计算出的 sigma 值
return sigmas
# 从 diffusers.schedulers.scheduling_euler_discrete.EulerDiscreteScheduler._sigma_to_t 复制的函数
def _sigma_to_t(self, sigma, log_sigmas):
# 获取 sigma 的对数值,确保不小于 1e-10
log_sigma = np.log(np.maximum(sigma, 1e-10))
# 计算 log_sigma 与 log_sigmas 之间的距离
dists = log_sigma - log_sigmas[:, np.newaxis]
# 计算 sigmas 范围的低索引和高索引
low_idx = np.cumsum((dists >= 0), axis=0).argmax(axis=0).clip(max=log_sigmas.shape[0] - 2)
high_idx = low_idx + 1
# 获取低和高的 log_sigma 值
low = log_sigmas[low_idx]
high = log_sigmas[high_idx]
# 计算加权值用于插值
w = (low - log_sigma) / (low - high)
w = np.clip(w, 0, 1)
# 将插值转化为时间范围
t = (1 - w) * low_idx + w * high_idx
# 调整 t 的形状以匹配 sigma 的形状
t = t.reshape(sigma.shape)
# 返回计算出的时间 t
return t
def _sigma_to_alpha_sigma_t(self, sigma):
# 设置 alpha_t 为 1,因为输入在进入 unet 之前已经预先缩放
alpha_t = torch.tensor(1) # Inputs are pre-scaled before going into unet, so alpha_t = 1
# sigma_t 直接赋值为输入的 sigma
sigma_t = sigma
# 返回 alpha_t 和 sigma_t
return alpha_t, sigma_t
def convert_model_output(
self,
model_output: torch.Tensor,
sample: torch.Tensor = None,
) -> torch.Tensor:
"""
将模型输出转换为 DPMSolver/DPMSolver++ 算法所需的相应类型。DPM-Solver 旨在离散化噪声预测模型的积分,
而 DPM-Solver++ 则旨在离散化数据预测模型的积分。
<Tip>
算法和模型类型是解耦的。您可以使用 DPMSolver 或 DPMSolver++ 处理噪声预测和数据预测模型。
</Tip>
Args:
model_output (`torch.Tensor`):
从学习到的扩散模型直接输出的张量。
sample (`torch.Tensor`):
扩散过程中创建的当前样本实例。
Returns:
`torch.Tensor`:
转换后的模型输出。
"""
# 根据当前步骤索引获取对应的 sigma 值
sigma = self.sigmas[self.step_index]
# 预处理模型输出,得到预测的 x0 值
x0_pred = self.precondition_outputs(sample, model_output, sigma)
# 返回预测结果 x0_pred
return x0_pred
def dpm_solver_first_order_update(
self,
model_output: torch.Tensor,
sample: torch.Tensor = None,
noise: Optional[torch.Tensor] = None,
) -> torch.Tensor:
"""
One step for the first-order DPMSolver (equivalent to DDIM).
Args:
model_output (`torch.Tensor`):
The direct output from the learned diffusion model.
sample (`torch.Tensor`):
A current instance of a sample created by the diffusion process.
Returns:
`torch.Tensor`:
The sample tensor at the previous timestep.
"""
# 获取当前步骤和下一个步骤的噪声水平
sigma_t, sigma_s = self.sigmas[self.step_index + 1], self.sigmas[self.step_index]
# 将噪声水平转换为 alpha 和 sigma
alpha_t, sigma_t = self._sigma_to_alpha_sigma_t(sigma_t)
alpha_s, sigma_s = self._sigma_to_alpha_sigma_t(sigma_s)
# 计算当前和前一步的 lambda 值
lambda_t = torch.log(alpha_t) - torch.log(sigma_t)
lambda_s = torch.log(alpha_s) - torch.log(sigma_s)
# 计算 h 值,表示两个 lambda 之间的差异
h = lambda_t - lambda_s
# 确保噪声不为 None,进行断言检查
assert noise is not None
# 根据当前样本、模型输出和噪声计算新的样本 x_t
x_t = (
(sigma_t / sigma_s * torch.exp(-h)) * sample
+ (alpha_t * (1 - torch.exp(-2.0 * h))) * model_output
+ sigma_t * torch.sqrt(1.0 - torch.exp(-2 * h)) * noise
)
# 返回计算得到的样本 x_t
return x_t
def multistep_dpm_solver_second_order_update(
self,
model_output_list: List[torch.Tensor],
sample: torch.Tensor = None,
noise: Optional[torch.Tensor] = None,
) -> torch.Tensor:
"""
一步执行二阶多步DPMSolver。
参数:
model_output_list (`List[torch.Tensor]`):
当前和后续时间步长从学习到的扩散模型的直接输出。
sample (`torch.Tensor`):
由扩散过程创建的当前样本实例。
返回:
`torch.Tensor`:
前一个时间步的样本张量。
"""
# 从 sigma 列表中获取当前、前一和前两步的 sigma 值
sigma_t, sigma_s0, sigma_s1 = (
self.sigmas[self.step_index + 1],
self.sigmas[self.step_index],
self.sigmas[self.step_index - 1],
)
# 将 sigma 转换为 alpha 和 sigma_t
alpha_t, sigma_t = self._sigma_to_alpha_sigma_t(sigma_t)
alpha_s0, sigma_s0 = self._sigma_to_alpha_sigma_t(sigma_s0)
alpha_s1, sigma_s1 = self._sigma_to_alpha_sigma_t(sigma_s1)
# 计算 lambda 值,用于后续计算
lambda_t = torch.log(alpha_t) - torch.log(sigma_t)
lambda_s0 = torch.log(alpha_s0) - torch.log(sigma_s0)
lambda_s1 = torch.log(alpha_s1) - torch.log(sigma_s1)
# 从模型输出列表中获取最后两个输出
m0, m1 = model_output_list[-1], model_output_list[-2]
# 计算 h 和 h_0,用于噪声调整
h, h_0 = lambda_t - lambda_s0, lambda_s0 - lambda_s1
r0 = h_0 / h # 计算比例 r0
D0, D1 = m0, (1.0 / r0) * (m0 - m1) # 计算 D0 和 D1
# sde-dpmsolver++
assert noise is not None # 确保噪声不为空
if self.config.solver_type == "midpoint":
# 使用 midpoint 方法计算 x_t
x_t = (
(sigma_t / sigma_s0 * torch.exp(-h)) * sample
+ (alpha_t * (1 - torch.exp(-2.0 * h))) * D0
+ 0.5 * (alpha_t * (1 - torch.exp(-2.0 * h))) * D1
+ sigma_t * torch.sqrt(1.0 - torch.exp(-2 * h)) * noise
)
elif self.config.solver_type == "heun":
# 使用 heun 方法计算 x_t
x_t = (
(sigma_t / sigma_s0 * torch.exp(-h)) * sample
+ (alpha_t * (1 - torch.exp(-2.0 * h))) * D0
+ (alpha_t * ((1.0 - torch.exp(-2.0 * h)) / (-2.0 * h) + 1.0)) * D1
+ sigma_t * torch.sqrt(1.0 - torch.exp(-2 * h)) * noise
)
return x_t # 返回计算得到的 x_t
# 从 diffusers.schedulers.scheduling_dpmsolver_multistep.DPMSolverMultistepScheduler.index_for_timestep 复制的代码
def index_for_timestep(self, timestep, schedule_timesteps=None):
# 如果没有提供调度时间步,使用默认时间步
if schedule_timesteps is None:
schedule_timesteps = self.timesteps
# 找到与当前时间步匹配的索引候选
index_candidates = (schedule_timesteps == timestep).nonzero()
# 如果没有找到匹配,设置为最后一个时间步
if len(index_candidates) == 0:
step_index = len(self.timesteps) - 1
# 如果找到多个匹配,选择第二个索引
elif len(index_candidates) > 1:
step_index = index_candidates[1].item()
else:
# 否则选择第一个匹配的索引
step_index = index_candidates[0].item()
return step_index # 返回计算得到的步索引
# 从 diffusers.schedulers.scheduling_dpmsolver_multistep.DPMSolverMultistepScheduler._init_step_index 复制而来
def _init_step_index(self, timestep):
"""
初始化调度器的 step_index 计数器。
"""
# 如果 begin_index 为 None,表示尚未设置开始索引
if self.begin_index is None:
# 检查 timestep 是否为 PyTorch 的张量
if isinstance(timestep, torch.Tensor):
# 将 timestep 转换到与 timesteps 设备相同
timestep = timestep.to(self.timesteps.device)
# 根据当前的 timestep 计算并设置 step_index
self._step_index = self.index_for_timestep(timestep)
else:
# 如果 begin_index 已经设置,则直接使用它
self._step_index = self._begin_index
def step(
self,
model_output: torch.Tensor,
timestep: Union[int, torch.Tensor],
sample: torch.Tensor,
generator=None,
return_dict: bool = True,
# 从 diffusers.schedulers.scheduling_euler_discrete.EulerDiscreteScheduler.add_noise 复制而来
def add_noise(
self,
original_samples: torch.Tensor,
noise: torch.Tensor,
timesteps: torch.Tensor,
) -> torch.Tensor:
# 确保 sigmas 和 timesteps 的设备和数据类型与 original_samples 相同
sigmas = self.sigmas.to(device=original_samples.device, dtype=original_samples.dtype)
# 如果设备为 mps 且 timesteps 是浮点类型
if original_samples.device.type == "mps" and torch.is_floating_point(timesteps):
# mps 不支持 float64 类型
schedule_timesteps = self.timesteps.to(original_samples.device, dtype=torch.float32)
# 将 timesteps 转换为与 original_samples 相同的设备和数据类型
timesteps = timesteps.to(original_samples.device, dtype=torch.float32)
else:
# 将 schedule_timesteps 转换为与 original_samples 相同的设备
schedule_timesteps = self.timesteps.to(original_samples.device)
# 将 timesteps 转换为与 original_samples 相同的设备
timesteps = timesteps.to(original_samples.device)
# 当 scheduler 用于训练时,或者管道未实现 set_begin_index 时,begin_index 为 None
if self.begin_index is None:
# 计算每个 timesteps 对应的 step indices
step_indices = [self.index_for_timestep(t, schedule_timesteps) for t in timesteps]
elif self.step_index is not None:
# add_noise 在第一次去噪步骤后被调用(用于修复)
step_indices = [self.step_index] * timesteps.shape[0]
else:
# add noise 在第一次去噪步骤之前被调用以创建初始潜在图像(img2img)
step_indices = [self.begin_index] * timesteps.shape[0]
# 获取与当前 step_indices 相关联的 sigma 值,并将其展平
sigma = sigmas[step_indices].flatten()
# 扩展 sigma 以匹配 original_samples 的形状
while len(sigma.shape) < len(original_samples.shape):
sigma = sigma.unsqueeze(-1)
# 添加噪声到原始样本
noisy_samples = original_samples + noise * sigma
# 返回带噪声的样本
return noisy_samples
# 返回训练时间步的数量
def __len__(self):
return self.config.num_train_timesteps
# 版权声明,说明文件的版权所有者及其许可信息
# Copyright 2024 Stanford University Team and The HuggingFace Team. All rights reserved.
#
# 根据 Apache 2.0 许可协议授权使用
# Licensed under the Apache License, Version 2.0 (the "License");
# 只能在遵守许可的情况下使用此文件
# you may not use this file except in compliance with the License.
# 可以在以下地址获取许可的副本
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# 除非法律要求或书面协议另有约定,否则软件在“按原样”基础上分发,不附带任何明示或暗示的保证
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# 查看许可证以了解特定语言的权限和限制
# See the License for the specific language governing permissions and
# limitations under the License.
# 声明:此代码深受以下项目的影响
# DISCLAIMER: This code is strongly influenced by https://github.com/pesser/pytorch_diffusion
# and https://github.com/hojonathanho/diffusion
# 导入数学库
import math
# 从数据类模块导入数据类装饰器
from dataclasses import dataclass
# 导入类型注解相关的类型
from typing import List, Optional, Tuple, Union
# 导入 NumPy 库
import numpy as np
# 导入 PyTorch 库
import torch
# 从配置工具中导入配置混合类和注册配置函数
from ..configuration_utils import ConfigMixin, register_to_config
# 从工具中导入基础输出类
from ..utils import BaseOutput
# 从 PyTorch 工具中导入生成随机张量的函数
from ..utils.torch_utils import randn_tensor
# 从调度工具中导入 Karras 扩散调度器和调度混合类
from .scheduling_utils import KarrasDiffusionSchedulers, SchedulerMixin
@dataclass
# 从 diffusers.schedulers.scheduling_ddpm.DDPMSchedulerOutput 复制的调度器输出类,修改了 DDPM 为 DDIM
class DDIMSchedulerOutput(BaseOutput):
"""
调度器 `step` 函数输出的输出类。
参数:
prev_sample (`torch.Tensor`,形状为 `(batch_size, num_channels, height, width)` 的图像):
上一时间步的计算样本 `(x_{t-1})`。`prev_sample` 应作为下一次模型输入
在去噪循环中使用。
pred_original_sample (`torch.Tensor`,形状为 `(batch_size, num_channels, height, width)` 的图像):
基于当前时间步模型输出的预测去噪样本 `(x_{0})`。
`pred_original_sample` 可用于预览进度或进行引导。
"""
# 定义前一个样本张量
prev_sample: torch.Tensor
# 定义可选的预测原始样本张量
pred_original_sample: Optional[torch.Tensor] = None
# 从 diffusers.schedulers.scheduling_ddpm.betas_for_alpha_bar 复制的函数
def betas_for_alpha_bar(
num_diffusion_timesteps,
max_beta=0.999,
alpha_transform_type="cosine",
):
"""
创建一个 beta 调度,离散化给定的 alpha_t_bar 函数,该函数定义了随时间变化的
(1-beta) 的累积乘积,t 的范围为 [0,1]。
包含一个 alpha_bar 函数,该函数接受 t 参数并将其转换为
(1-beta) 的累积乘积,直到扩散过程的该部分。
参数:
num_diffusion_timesteps (`int`): 要生成的 beta 数量。
max_beta (`float`): 使用的最大 beta 值;使用小于 1 的值以
防止奇异情况。
alpha_transform_type (`str`, *可选*,默认为 `cosine`): alpha_bar 的噪声调度类型。
可选值为 `cosine` 或 `exp`
返回:
betas (`np.ndarray`): 调度器用于逐步模型输出的 beta 值
"""
# 检查给定的 alpha 变换类型是否为 "cosine"
if alpha_transform_type == "cosine":
# 定义一个函数,根据输入 t 计算 cos 变换
def alpha_bar_fn(t):
# 返回经过调整的 cos 函数值的平方
return math.cos((t + 0.008) / 1.008 * math.pi / 2) ** 2
# 检查给定的 alpha 变换类型是否为 "exp"
elif alpha_transform_type == "exp":
# 定义一个函数,根据输入 t 计算指数变换
def alpha_bar_fn(t):
# 返回 e 的 t 值乘以 -12 的指数
return math.exp(t * -12.0)
# 如果 alpha 变换类型不支持,则抛出异常
else:
raise ValueError(f"Unsupported alpha_transform_type: {alpha_transform_type}")
# 初始化一个空列表用于存储 beta 值
betas = []
# 遍历每一个扩散时间步
for i in range(num_diffusion_timesteps):
# 计算当前时间步的比例 t1
t1 = i / num_diffusion_timesteps
# 计算下一个时间步的比例 t2
t2 = (i + 1) / num_diffusion_timesteps
# 计算并追加 beta 值到列表,确保不超过 max_beta
betas.append(min(1 - alpha_bar_fn(t2) / alpha_bar_fn(t1), max_beta))
# 返回一个包含 beta 值的张量,数据类型为 float32
return torch.tensor(betas, dtype=torch.float32)
# 定义一个函数,重新缩放 betas 以使终端 SNR 为零
def rescale_zero_terminal_snr(betas):
"""
根据 https://arxiv.org/pdf/2305.08891.pdf (算法 1) 重新缩放 betas 以实现零终端 SNR
参数:
betas (`torch.Tensor`):
初始化调度器时使用的 betas。
返回:
`torch.Tensor`: 具有零终端 SNR 的重新缩放的 betas
"""
# 将 betas 转换为 alphas_bar_sqrt
alphas = 1.0 - betas # 计算 alphas
alphas_cumprod = torch.cumprod(alphas, dim=0) # 计算累积乘积
alphas_bar_sqrt = alphas_cumprod.sqrt() # 计算 alphas 的平方根
# 存储旧值
alphas_bar_sqrt_0 = alphas_bar_sqrt[0].clone() # 克隆第一个值
alphas_bar_sqrt_T = alphas_bar_sqrt[-1].clone() # 克隆最后一个值
# 平移使得最后一步为零
alphas_bar_sqrt -= alphas_bar_sqrt_T # 减去最后一个值
# 缩放使得第一步恢复到旧值
alphas_bar_sqrt *= alphas_bar_sqrt_0 / (alphas_bar_sqrt_0 - alphas_bar_sqrt_T) # 缩放
# 将 alphas_bar_sqrt 转换回 betas
alphas_bar = alphas_bar_sqrt**2 # 还原平方
alphas = alphas_bar[1:] / alphas_bar[:-1] # 还原累积乘积
alphas = torch.cat([alphas_bar[0:1], alphas]) # 将第一个元素拼接回来
betas = 1 - alphas # 计算新的 betas
return betas # 返回重新缩放的 betas
# 定义 DDIMScheduler 类,扩展去噪程序
class DDIMScheduler(SchedulerMixin, ConfigMixin):
"""
`DDIMScheduler` 扩展了在去噪扩散概率模型 (DDPMs) 中引入的去噪程序,
并使用非马尔可夫引导。
此模型继承自 [`SchedulerMixin`] 和 [`ConfigMixin`]。请查看超类文档以获取
所有调度器的通用方法,例如加载和保存。
"""
_compatibles = [e.name for e in KarrasDiffusionSchedulers] # 兼容的调度器列表
order = 1 # 设置调度器的顺序
@register_to_config
def __init__(
self,
num_train_timesteps: int = 1000, # 训练时间步数
beta_start: float = 0.0001, # beta 的起始值
beta_end: float = 0.02, # beta 的结束值
beta_schedule: str = "linear", # beta 调度方式
trained_betas: Optional[Union[np.ndarray, List[float]]] = None, # 已训练的 betas
clip_sample: bool = True, # 是否剪切样本
set_alpha_to_one: bool = True, # 是否将 alpha 设置为 1
steps_offset: int = 0, # 步骤偏移量
prediction_type: str = "epsilon", # 预测类型
thresholding: bool = False, # 是否进行阈值处理
dynamic_thresholding_ratio: float = 0.995, # 动态阈值比例
clip_sample_range: float = 1.0, # 样本范围
sample_max_value: float = 1.0, # 样本最大值
timestep_spacing: str = "leading", # 时间步间隔方式
rescale_betas_zero_snr: bool = False, # 是否重新缩放以实现零 SNR
):
# 如果训练的 beta 值不为 None,则将其转换为张量
if trained_betas is not None:
self.betas = torch.tensor(trained_betas, dtype=torch.float32)
# 如果 beta 调度是线性的,生成一个线性间隔的 beta 值张量
elif beta_schedule == "linear":
self.betas = torch.linspace(beta_start, beta_end, num_train_timesteps, dtype=torch.float32)
# 如果 beta 调度是 scaled_linear,特定于潜在扩散模型,生成平方根后再平方的 beta 值
elif beta_schedule == "scaled_linear":
# 该调度非常特定于潜在扩散模型
self.betas = torch.linspace(beta_start**0.5, beta_end**0.5, num_train_timesteps, dtype=torch.float32) ** 2
# 如果 beta 调度是 squaredcos_cap_v2,使用 Glide 的余弦调度
elif beta_schedule == "squaredcos_cap_v2":
# Glide 余弦调度
self.betas = betas_for_alpha_bar(num_train_timesteps)
# 如果调度类型不被实现,抛出错误
else:
raise NotImplementedError(f"{beta_schedule} is not implemented for {self.__class__}")
# 如果需要,重新缩放 beta 值以适应零 SNR
if rescale_betas_zero_snr:
self.betas = rescale_zero_terminal_snr(self.betas)
# 计算 alphas,1 - betas
self.alphas = 1.0 - self.betas
# 计算 alphas 的累积乘积
self.alphas_cumprod = torch.cumprod(self.alphas, dim=0)
# 在每个 ddim 步骤中,查看前一个 alphas_cumprod
# 对于最终步骤,因为已经在 0,所以没有前一个 alphas_cumprod
# `set_alpha_to_one` 决定是将该参数设置为 1 还是使用“非前一个”的最终 alpha
self.final_alpha_cumprod = torch.tensor(1.0) if set_alpha_to_one else self.alphas_cumprod[0]
# 初始化噪声分布的标准差
self.init_noise_sigma = 1.0
# 可设置的值
self.num_inference_steps = None
# 创建时间步张量,从 num_train_timesteps 逆序生成
self.timesteps = torch.from_numpy(np.arange(0, num_train_timesteps)[::-1].copy().astype(np.int64))
# 定义输入样本的缩放函数,支持时间步长的可互换性
def scale_model_input(self, sample: torch.Tensor, timestep: Optional[int] = None) -> torch.Tensor:
"""
确保与需要根据当前时间步缩放去噪模型输入的调度器之间的可互换性。
参数:
sample (`torch.Tensor`):
输入样本。
timestep (`int`, *可选*):
扩散链中的当前时间步。
返回:
`torch.Tensor`:
缩放后的输入样本。
"""
# 返回原始样本,当前未进行任何缩放
return sample
# 获取给定时间步和前一步的方差
def _get_variance(self, timestep, prev_timestep):
# 计算当前时间步的 alphas 累积乘积
alpha_prod_t = self.alphas_cumprod[timestep]
# 计算前一步的 alphas 累积乘积
alpha_prod_t_prev = self.alphas_cumprod[prev_timestep] if prev_timestep >= 0 else self.final_alpha_cumprod
# 计算当前和前一步的 beta 值
beta_prod_t = 1 - alpha_prod_t
beta_prod_t_prev = 1 - alpha_prod_t_prev
# 计算方差
variance = (beta_prod_t_prev / beta_prod_t) * (1 - alpha_prod_t / alpha_prod_t_prev)
# 返回计算得到的方差
return variance
# 从 diffusers.schedulers.scheduling_ddpm.DDPMScheduler._threshold_sample 复制而来
# 定义动态阈值采样方法,输入为一个张量,输出为处理后的张量
def _threshold_sample(self, sample: torch.Tensor) -> torch.Tensor:
"""
"动态阈值:在每个采样步骤中,我们将 s 设置为 xt0 中某个百分位绝对像素值(x_0 在时间步 t 的预测),
如果 s > 1,则将 xt0 阈值限制在 [-s, s] 范围内,然后除以 s。动态阈值通过将饱和像素(接近 -1 和 1 的像素)向内推
进,从而在每个步骤中积极防止像素饱和。我们发现动态阈值显著提高了照片真实感以及图像与文本的对齐,特别是在使用
非常大的引导权重时。"
https://arxiv.org/abs/2205.11487
"""
# 获取输入样本的数据类型
dtype = sample.dtype
# 获取输入样本的批次大小、通道数和剩余维度
batch_size, channels, *remaining_dims = sample.shape
# 如果数据类型不是 float32 或 float64,则转换为 float,以便进行百分位计算
if dtype not in (torch.float32, torch.float64):
sample = sample.float() # 用于量化计算的上溯,且未实现对 cpu 半精度的夹紧
# 将样本展平以便在每张图像上进行百分位计算
sample = sample.reshape(batch_size, channels * np.prod(remaining_dims))
# 计算样本的绝对值
abs_sample = sample.abs() # "某个百分位绝对像素值"
# 计算样本绝对值的给定百分位
s = torch.quantile(abs_sample, self.config.dynamic_thresholding_ratio, dim=1)
# 将 s 限制在指定范围内
s = torch.clamp(
s, min=1, max=self.config.sample_max_value
) # 当最小值限制为 1 时,相当于标准限制在 [-1, 1]
# 扩展 s 的维度,以便与样本广播
s = s.unsqueeze(1) # (batch_size, 1) 因为 clamp 将沿 dim=0 广播
# 将样本限制在 [-s, s] 范围内并进行归一化
sample = torch.clamp(sample, -s, s) / s # "我们将 xt0 阈值限制在 [-s, s] 范围内并除以 s"
# 将样本重新调整回原始形状
sample = sample.reshape(batch_size, channels, *remaining_dims)
# 将样本转换回原始数据类型
sample = sample.to(dtype)
# 返回处理后的样本
return sample
# 设置用于扩散链的离散时间步数(在推理之前运行)
def set_timesteps(self, num_inference_steps: int, device: Union[str, torch.device] = None):
"""
设置生成样本时用于扩散步骤的离散时间步数。
参数:
num_inference_steps (`int`):
生成样本时使用的扩散步骤数量。
"""
# 检查推理步骤数是否超过训练时间步数
if num_inference_steps > self.config.num_train_timesteps:
raise ValueError(
f"`num_inference_steps`: {num_inference_steps} 不能大于 `self.config.train_timesteps`:"
f" {self.config.num_train_timesteps} 因为使用此调度器训练的unet模型只能处理"
f" 最大 {self.config.num_train_timesteps} 个时间步。"
)
# 设置实例的推理步骤数
self.num_inference_steps = num_inference_steps
# 根据配置的时间步间隔方式生成时间步
if self.config.timestep_spacing == "linspace":
# 生成线性间隔的时间步,并反向排序
timesteps = (
np.linspace(0, self.config.num_train_timesteps - 1, num_inference_steps)
.round()[::-1]
.copy()
.astype(np.int64)
)
elif self.config.timestep_spacing == "leading":
# 计算步长比率
step_ratio = self.config.num_train_timesteps // self.num_inference_steps
# 通过乘以步长比率生成整数时间步
# 转换为整数以避免在推理步骤为3的幂时出现问题
timesteps = (np.arange(0, num_inference_steps) * step_ratio).round()[::-1].copy().astype(np.int64)
# 添加偏移量
timesteps += self.config.steps_offset
elif self.config.timestep_spacing == "trailing":
# 计算步长比率
step_ratio = self.config.num_train_timesteps / self.num_inference_steps
# 通过乘以步长比率生成整数时间步
# 转换为整数以避免在推理步骤为3的幂时出现问题
timesteps = np.round(np.arange(self.config.num_train_timesteps, 0, -step_ratio)).astype(np.int64)
# 减去1以调整时间步
timesteps -= 1
else:
# 抛出不支持的时间步间隔类型的错误
raise ValueError(
f"{self.config.timestep_spacing} 不受支持。请确保选择 'leading' 或 'trailing'。"
)
# 将生成的时间步转换为张量并移动到指定设备
self.timesteps = torch.from_numpy(timesteps).to(device)
# 定义推理步骤
def step(
self,
model_output: torch.Tensor,
timestep: int,
sample: torch.Tensor,
eta: float = 0.0,
use_clipped_model_output: bool = False,
generator=None,
variance_noise: Optional[torch.Tensor] = None,
return_dict: bool = True,
# 从 diffusers.schedulers.scheduling_ddpm.DDPMScheduler.add_noise 复制的
def add_noise(
self,
original_samples: torch.Tensor,
noise: torch.Tensor,
timesteps: torch.IntTensor,
) -> torch.Tensor:
# 确保 alphas_cumprod 和 timestep 与 original_samples 拥有相同的设备和数据类型
# 将 self.alphas_cumprod 移动到指定设备,以避免后续 add_noise 调用时冗余的 CPU 到 GPU 数据传输
self.alphas_cumprod = self.alphas_cumprod.to(device=original_samples.device)
# 将 alphas_cumprod 转换为 original_samples 的数据类型
alphas_cumprod = self.alphas_cumprod.to(dtype=original_samples.dtype)
# 将 timesteps 移动到 original_samples 的设备
timesteps = timesteps.to(original_samples.device)
# 计算 timesteps 对应的 sqrt_alpha_prod
sqrt_alpha_prod = alphas_cumprod[timesteps] ** 0.5
# 将 sqrt_alpha_prod 展平为一维
sqrt_alpha_prod = sqrt_alpha_prod.flatten()
# 如果 sqrt_alpha_prod 的维度小于 original_samples,则在最后添加一个维度
while len(sqrt_alpha_prod.shape) < len(original_samples.shape):
sqrt_alpha_prod = sqrt_alpha_prod.unsqueeze(-1)
# 计算 timesteps 对应的 sqrt_one_minus_alpha_prod
sqrt_one_minus_alpha_prod = (1 - alphas_cumprod[timesteps]) ** 0.5
# 将 sqrt_one_minus_alpha_prod 展平为一维
sqrt_one_minus_alpha_prod = sqrt_one_minus_alpha_prod.flatten()
# 如果 sqrt_one_minus_alpha_prod 的维度小于 original_samples,则在最后添加一个维度
while len(sqrt_one_minus_alpha_prod.shape) < len(original_samples.shape):
sqrt_one_minus_alpha_prod = sqrt_one_minus_alpha_prod.unsqueeze(-1)
# 计算带噪声的样本 noisy_samples
noisy_samples = sqrt_alpha_prod * original_samples + sqrt_one_minus_alpha_prod * noise
# 返回带噪声的样本
return noisy_samples
# 从 diffusers.schedulers.scheduling_ddpm.DDPMScheduler.get_velocity 复制
def get_velocity(self, sample: torch.Tensor, noise: torch.Tensor, timesteps: torch.IntTensor) -> torch.Tensor:
# 确保 alphas_cumprod 和 timestep 与 sample 拥有相同的设备和数据类型
self.alphas_cumprod = self.alphas_cumprod.to(device=sample.device)
# 将 alphas_cumprod 转换为 sample 的数据类型
alphas_cumprod = self.alphas_cumprod.to(dtype=sample.dtype)
# 将 timesteps 移动到 sample 的设备
timesteps = timesteps.to(sample.device)
# 计算 timesteps 对应的 sqrt_alpha_prod
sqrt_alpha_prod = alphas_cumprod[timesteps] ** 0.5
# 将 sqrt_alpha_prod 展平为一维
sqrt_alpha_prod = sqrt_alpha_prod.flatten()
# 如果 sqrt_alpha_prod 的维度小于 sample,则在最后添加一个维度
while len(sqrt_alpha_prod.shape) < len(sample.shape):
sqrt_alpha_prod = sqrt_alpha_prod.unsqueeze(-1)
# 计算 timesteps 对应的 sqrt_one_minus_alpha_prod
sqrt_one_minus_alpha_prod = (1 - alphas_cumprod[timesteps]) ** 0.5
# 将 sqrt_one_minus_alpha_prod 展平为一维
sqrt_one_minus_alpha_prod = sqrt_one_minus_alpha_prod.flatten()
# 如果 sqrt_one_minus_alpha_prod 的维度小于 sample,则在最后添加一个维度
while len(sqrt_one_minus_alpha_prod.shape) < len(sample.shape):
sqrt_one_minus_alpha_prod = sqrt_one_minus_alpha_prod.unsqueeze(-1)
# 计算速度 velocity
velocity = sqrt_alpha_prod * noise - sqrt_one_minus_alpha_prod * sample
# 返回计算得到的速度
return velocity
# 返回训练时间步的数量
def __len__(self):
return self.config.num_train_timesteps
# 版权所有 2024 The CogVideoX 团队,清华大学和 ZhipuAI 及 HuggingFace 团队。
# 保留所有权利。
#
# 根据 Apache 许可证第 2.0 版(“许可证”)进行许可;
# 除非遵循许可证,否则不得使用此文件。
# 您可以在以下地址获取许可证的副本:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律要求或书面协议,否则根据许可证分发的软件
# 是以“按现状”基础分发的,不提供任何形式的保证或条件,
# 无论是明示或暗示的。有关许可证下特定语言的权限和
# 限制,请参阅许可证。
# 免责声明:此代码深受 https://github.com/pesser/pytorch_diffusion
# 和 https://github.com/hojonathanho/diffusion 的影响
# 导入数学库
import math
# 从数据类模块导入 dataclass 装饰器
from dataclasses import dataclass
# 从类型模块导入 List、Optional、Tuple 和 Union 类型
from typing import List, Optional, Tuple, Union
# 导入 numpy 库
import numpy as np
# 导入 torch 库
import torch
# 从配置工具模块导入 ConfigMixin 和 register_to_config
from ..configuration_utils import ConfigMixin, register_to_config
# 从工具模块导入 BaseOutput 类
from ..utils import BaseOutput
# 从调度工具模块导入 KarrasDiffusionSchedulers 和 SchedulerMixin
from .scheduling_utils import KarrasDiffusionSchedulers, SchedulerMixin
@dataclass
# 从 diffusers.schedulers.scheduling_ddpm.DDPMSchedulerOutput 复制,进行 DDPM 到 DDIM 的转换
class DDIMSchedulerOutput(BaseOutput):
"""
调度器 `step` 函数输出的输出类。
参数:
prev_sample (`torch.Tensor`,形状为 `(batch_size, num_channels, height, width)`,用于图像):
计算得出的前一时间步的样本 `(x_{t-1})`。`prev_sample` 应作为下一个模型输入
在去噪循环中使用。
pred_original_sample (`torch.Tensor`,形状为 `(batch_size, num_channels, height, width)`,用于图像):
基于当前时间步模型输出的预测去噪样本 `(x_{0})`。
`pred_original_sample` 可用于预览进度或指导。
"""
# 定义前一个样本的张量
prev_sample: torch.Tensor
# 可选的预测原始样本的张量
pred_original_sample: Optional[torch.Tensor] = None
# 从 diffusers.schedulers.scheduling_ddpm.betas_for_alpha_bar 复制的函数
def betas_for_alpha_bar(
num_diffusion_timesteps,
max_beta=0.999,
alpha_transform_type="cosine",
):
"""
创建一个 beta 调度,该调度离散化给定的 alpha_t_bar 函数,该函数定义了
随着时间推移 (1-beta) 的累积乘积,从 t = [0,1]。
包含一个函数 alpha_bar,该函数接受参数 t 并将其转换为
在扩散过程中到目前为止 (1-beta) 的累积乘积。
参数:
num_diffusion_timesteps (`int`): 要生成的 beta 数量。
max_beta (`float`): 要使用的最大 beta 值;使用低于 1 的值以
防止奇异性。
alpha_transform_type (`str`, *可选*,默认为 `cosine`): alpha_bar 的噪声调度类型。
选择 `cosine` 或 `exp`
返回:
betas (`np.ndarray`): 调度器用于更新模型输出的 betas
"""
# 检查 alpha_transform_type 是否为 "cosine"
if alpha_transform_type == "cosine":
# 定义 alpha_bar_fn 函数,用于计算基于余弦的 alpha 值
def alpha_bar_fn(t):
# 返回余弦函数的平方,调整参数以适应时间 t
return math.cos((t + 0.008) / 1.008 * math.pi / 2) ** 2
# 检查 alpha_transform_type 是否为 "exp"
elif alpha_transform_type == "exp":
# 定义 alpha_bar_fn 函数,用于计算基于指数的 alpha 值
def alpha_bar_fn(t):
# 返回以 t 为输入的指数衰减值
return math.exp(t * -12.0)
# 如果 alpha_transform_type 既不是 "cosine" 也不是 "exp"
else:
# 抛出不支持的 alpha_transform_type 异常
raise ValueError(f"Unsupported alpha_transform_type: {alpha_transform_type}")
# 初始化一个空列表 betas,用于存储 beta 值
betas = []
# 遍历每一个扩散时间步
for i in range(num_diffusion_timesteps):
# 计算当前时间步 t1
t1 = i / num_diffusion_timesteps
# 计算下一个时间步 t2
t2 = (i + 1) / num_diffusion_timesteps
# 将计算得到的 beta 值添加到列表中
betas.append(min(1 - alpha_bar_fn(t2) / alpha_bar_fn(t1), max_beta))
# 返回 betas 列表作为一个张量,数据类型为 float32
return torch.tensor(betas, dtype=torch.float32)
# 定义函数以根据给定的累积 alpha 值重新调整 beta 值,使最终信噪比为零
def rescale_zero_terminal_snr(alphas_cumprod):
"""
Rescales betas to have zero terminal SNR Based on https://arxiv.org/pdf/2305.08891.pdf (Algorithm 1)
Args:
betas (`torch.Tensor`):
the betas that the scheduler is being initialized with.
Returns:
`torch.Tensor`: rescaled betas with zero terminal SNR
"""
# 计算累积 alpha 值的平方根
alphas_bar_sqrt = alphas_cumprod.sqrt()
# 存储旧的值,以便后续计算
alphas_bar_sqrt_0 = alphas_bar_sqrt[0].clone()
alphas_bar_sqrt_T = alphas_bar_sqrt[-1].clone()
# 将最后一个时间步的值移至零
alphas_bar_sqrt -= alphas_bar_sqrt_T
# 调整比例,使第一个时间步的值恢复为旧值
alphas_bar_sqrt *= alphas_bar_sqrt_0 / (alphas_bar_sqrt_0 - alphas_bar_sqrt_T)
# 将 alphas_bar_sqrt 转换为 beta 值
alphas_bar = alphas_bar_sqrt**2 # 恢复平方
# 返回调整后的 beta 值
return alphas_bar
# 定义类以扩展去噪过程,采用非马尔可夫指导
class CogVideoXDDIMScheduler(SchedulerMixin, ConfigMixin):
"""
`DDIMScheduler` extends the denoising procedure introduced in denoising diffusion probabilistic models (DDPMs) with
non-Markovian guidance.
This model inherits from [`SchedulerMixin`] and [`ConfigMixin`]. Check the superclass documentation for the generic
methods the library implements for all schedulers such as loading and saving.
"""
# 兼容的调度器名称列表
_compatibles = [e.name for e in KarrasDiffusionSchedulers]
# 调度器的顺序
order = 1
# 装饰器注册配置
@register_to_config
def __init__(
# 训练时间步的数量,默认为1000
num_train_timesteps: int = 1000,
# beta 的起始值
beta_start: float = 0.00085,
# beta 的结束值
beta_end: float = 0.0120,
# beta 的调度类型
beta_schedule: str = "scaled_linear",
# 经过训练的 beta 值,可选参数
trained_betas: Optional[Union[np.ndarray, List[float]]] = None,
# 是否裁剪样本
clip_sample: bool = True,
# 是否将 alpha 设置为 1
set_alpha_to_one: bool = True,
# 时间步的偏移量
steps_offset: int = 0,
# 预测类型
prediction_type: str = "epsilon",
# 裁剪样本的范围
clip_sample_range: float = 1.0,
# 样本的最大值
sample_max_value: float = 1.0,
# 时间步的间隔类型
timestep_spacing: str = "leading",
# 是否重新调整 beta 以实现零信噪比
rescale_betas_zero_snr: bool = False,
# 信噪比的偏移缩放因子
snr_shift_scale: float = 3.0,
):
# 检查是否提供了经过训练的 beta 值
if trained_betas is not None:
# 将训练的 beta 转换为张量,数据类型为 float32
self.betas = torch.tensor(trained_betas, dtype=torch.float32)
# 如果 beta 调度为线性
elif beta_schedule == "linear":
# 生成一个线性空间的 beta 值
self.betas = torch.linspace(beta_start, beta_end, num_train_timesteps, dtype=torch.float32)
# 如果 beta 调度为 scaled_linear
elif beta_schedule == "scaled_linear":
# 此调度非常特定于潜在扩散模型
# 生成平方根后的线性空间,并再平方
self.betas = torch.linspace(beta_start**0.5, beta_end**0.5, num_train_timesteps, dtype=torch.float64) ** 2
# 如果 beta 调度为 squaredcos_cap_v2
elif beta_schedule == "squaredcos_cap_v2":
# Glide 余弦调度
self.betas = betas_for_alpha_bar(num_train_timesteps)
else:
# 抛出未实现错误
raise NotImplementedError(f"{beta_schedule} is not implemented for {self.__class__}")
# 计算 alpha 值为 1 减去 beta
self.alphas = 1.0 - self.betas
# 计算 alpha 累积乘积
self.alphas_cumprod = torch.cumprod(self.alphas, dim=0)
# 根据 SNR shift 进行修改,遵循 SD3
self.alphas_cumprod = self.alphas_cumprod / (snr_shift_scale + (1 - snr_shift_scale) * self.alphas_cumprod)
# 如果需要对零 SNR 进行重缩放
if rescale_betas_zero_snr:
# 重缩放到零终端 SNR
self.alphas_cumprod = rescale_zero_terminal_snr(self.alphas_cumprod)
# 在每一步中,我们查看之前的 alphas_cumprod
# 最后一步时,没有前一个 alphas_cumprod,因为已经到达 0
# `set_alpha_to_one` 决定是否将此参数设置为 1 或使用最终 alpha
self.final_alpha_cumprod = torch.tensor(1.0) if set_alpha_to_one else self.alphas_cumprod[0]
# 初始化噪声分布的标准差
self.init_noise_sigma = 1.0
# 可设置的值
self.num_inference_steps = None
# 创建反向时间步长的张量
self.timesteps = torch.from_numpy(np.arange(0, num_train_timesteps)[::-1].copy().astype(np.int64))
def _get_variance(self, timestep, prev_timestep):
# 获取当前时间步的 alpha 累积乘积
alpha_prod_t = self.alphas_cumprod[timestep]
# 获取前一个时间步的 alpha 累积乘积
alpha_prod_t_prev = self.alphas_cumprod[prev_timestep] if prev_timestep >= 0 else self.final_alpha_cumprod
# 计算当前和前一个时间步的 beta 值
beta_prod_t = 1 - alpha_prod_t
beta_prod_t_prev = 1 - alpha_prod_t_prev
# 根据 beta 值计算方差
variance = (beta_prod_t_prev / beta_prod_t) * (1 - alpha_prod_t / alpha_prod_t_prev)
# 返回计算出的方差
return variance
def scale_model_input(self, sample: torch.Tensor, timestep: Optional[int] = None) -> torch.Tensor:
"""
确保与需要根据当前时间步缩放去噪模型输入的调度器互换性。
Args:
sample (`torch.Tensor`):
输入样本。
timestep (`int`, *可选*):
扩散链中的当前时间步。
Returns:
`torch.Tensor`:
缩放后的输入样本。
"""
# 返回未缩放的样本(示例中未实际实现缩放)
return sample
# 设置离散时间步,用于扩散链(在推理前运行)
def set_timesteps(self, num_inference_steps: int, device: Union[str, torch.device] = None):
"""
设置用于生成样本的扩散步骤数量。
参数:
num_inference_steps (`int`):
生成样本时使用的扩散步骤数量。
"""
# 检查推理步骤是否超过训练时步数的最大值
if num_inference_steps > self.config.num_train_timesteps:
raise ValueError(
f"`num_inference_steps`: {num_inference_steps} 不能大于 `self.config.train_timesteps`:"
f" {self.config.num_train_timesteps},因为训练此调度器的unet模型只能处理"
f" 最大 {self.config.num_train_timesteps} 步数。"
)
# 将推理步骤数量赋值给实例变量
self.num_inference_steps = num_inference_steps
# 根据配置中的时间步间隔类型计算时间步
if self.config.timestep_spacing == "linspace":
# 生成从0到训练时间步数-1的均匀间隔时间步
timesteps = (
np.linspace(0, self.config.num_train_timesteps - 1, num_inference_steps)
.round()[::-1]
.copy()
.astype(np.int64)
)
elif self.config.timestep_spacing == "leading":
# 计算每步的比例,用于生成整数量的时间步
step_ratio = self.config.num_train_timesteps // self.num_inference_steps
# 创建整数时间步,通过比例乘法得到
timesteps = (np.arange(0, num_inference_steps) * step_ratio).round()[::-1].copy().astype(np.int64)
timesteps += self.config.steps_offset
elif self.config.timestep_spacing == "trailing":
# 计算每步的比例,用于生成整数量的时间步
step_ratio = self.config.num_train_timesteps / self.num_inference_steps
# 生成整数量时间步,通过比例乘法得到
timesteps = np.round(np.arange(self.config.num_train_timesteps, 0, -step_ratio)).astype(np.int64)
timesteps -= 1
else:
# 如果时间步间隔不受支持,则抛出错误
raise ValueError(
f"{self.config.timestep_spacing} 不受支持。请确保选择 'leading' 或 'trailing' 之一。"
)
# 将计算得到的时间步转换为张量并移至指定设备
self.timesteps = torch.from_numpy(timesteps).to(device)
# 步骤函数,处理模型输出和样本
def step(
self,
model_output: torch.Tensor,
timestep: int,
sample: torch.Tensor,
eta: float = 0.0,
use_clipped_model_output: bool = False,
generator=None,
variance_noise: Optional[torch.Tensor] = None,
return_dict: bool = True,
):
# 从 diffusers.schedulers.scheduling_ddpm.DDPMScheduler.add_noise 中复制的函数
def add_noise(
self,
original_samples: torch.Tensor,
noise: torch.Tensor,
timesteps: torch.IntTensor,
) -> torch.Tensor:
# 确保 alphas_cumprod 和 timestep 与 original_samples 具有相同的设备和数据类型
# 将 self.alphas_cumprod 移动到设备,以避免后续 add_noise 调用中冗余的 CPU 到 GPU 数据移动
self.alphas_cumprod = self.alphas_cumprod.to(device=original_samples.device)
# 将 alphas_cumprod 转换为与 original_samples 相同的数据类型
alphas_cumprod = self.alphas_cumprod.to(dtype=original_samples.dtype)
# 将 timesteps 移动到 original_samples 所在的设备
timesteps = timesteps.to(original_samples.device)
# 计算 alpha 的平方根乘积
sqrt_alpha_prod = alphas_cumprod[timesteps] ** 0.5
# 将 sqrt_alpha_prod 展平为一维
sqrt_alpha_prod = sqrt_alpha_prod.flatten()
# 如果 sqrt_alpha_prod 的维度小于 original_samples,则在最后增加维度
while len(sqrt_alpha_prod.shape) < len(original_samples.shape):
sqrt_alpha_prod = sqrt_alpha_prod.unsqueeze(-1)
# 计算 (1 - alpha) 的平方根
sqrt_one_minus_alpha_prod = (1 - alphas_cumprod[timesteps]) ** 0.5
# 将 sqrt_one_minus_alpha_prod 展平为一维
sqrt_one_minus_alpha_prod = sqrt_one_minus_alpha_prod.flatten()
# 如果 sqrt_one_minus_alpha_prod 的维度小于 original_samples,则在最后增加维度
while len(sqrt_one_minus_alpha_prod.shape) < len(original_samples.shape):
sqrt_one_minus_alpha_prod = sqrt_one_minus_alpha_prod.unsqueeze(-1)
# 生成噪声样本,结合原始样本和噪声
noisy_samples = sqrt_alpha_prod * original_samples + sqrt_one_minus_alpha_prod * noise
# 返回噪声样本
return noisy_samples
# 从 diffusers.schedulers.scheduling_ddpm.DDPMScheduler.get_velocity 复制而来
def get_velocity(self, sample: torch.Tensor, noise: torch.Tensor, timesteps: torch.IntTensor) -> torch.Tensor:
# 确保 alphas_cumprod 和 timestep 与 sample 具有相同的设备和数据类型
self.alphas_cumprod = self.alphas_cumprod.to(device=sample.device)
# 将 alphas_cumprod 转换为与 sample 相同的数据类型
alphas_cumprod = self.alphas_cumprod.to(dtype=sample.dtype)
# 将 timesteps 移动到 sample 所在的设备
timesteps = timesteps.to(sample.device)
# 计算 alpha 的平方根乘积
sqrt_alpha_prod = alphas_cumprod[timesteps] ** 0.5
# 将 sqrt_alpha_prod 展平为一维
sqrt_alpha_prod = sqrt_alpha_prod.flatten()
# 如果 sqrt_alpha_prod 的维度小于 sample,则在最后增加维度
while len(sqrt_alpha_prod.shape) < len(sample.shape):
sqrt_alpha_prod = sqrt_alpha_prod.unsqueeze(-1)
# 计算 (1 - alpha) 的平方根
sqrt_one_minus_alpha_prod = (1 - alphas_cumprod[timesteps]) ** 0.5
# 将 sqrt_one_minus_alpha_prod 展平为一维
sqrt_one_minus_alpha_prod = sqrt_one_minus_alpha_prod.flatten()
# 如果 sqrt_one_minus_alpha_prod 的维度小于 sample,则在最后增加维度
while len(sqrt_one_minus_alpha_prod.shape) < len(sample.shape):
sqrt_one_minus_alpha_prod = sqrt_one_minus_alpha_prod.unsqueeze(-1)
# 计算速度,结合噪声和样本
velocity = sqrt_alpha_prod * noise - sqrt_one_minus_alpha_prod * sample
# 返回计算出的速度
return velocity
# 获取训练时间步长的数量
def __len__(self):
# 返回训练时间步长的数量
return self.config.num_train_timesteps