diffusers 源码解析(五十九)
# 版权所有 2024 TSAIL Team 和 HuggingFace Team。保留所有权利。
#
# 根据 Apache 许可证第 2.0 版(“许可证”)授权;
# 除非遵守许可证,否则您不得使用此文件。
# 您可以在以下网址获取许可证副本:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律要求或书面同意,否则根据许可证分发的软件是在“按现状”基础上分发的,
# 不提供任何形式的保证或条件。
# 请参阅许可证以了解管理权限和
# 限制的具体条款。
# 声明:此文件受 https://github.com/LuChengTHU/dpm-solver 的强烈影响
import math # 导入数学库以进行数学计算
from typing import List, Optional, Tuple, Union # 导入类型提示以支持类型检查
import numpy as np # 导入 NumPy 库以支持数组操作
import torch # 导入 PyTorch 库以支持张量运算
from ..configuration_utils import ConfigMixin, register_to_config # 导入配置混合器和注册函数
from ..utils import deprecate, logging # 导入弃用和日志记录工具
from ..utils.torch_utils import randn_tensor # 导入生成随机张量的工具
from .scheduling_utils import KarrasDiffusionSchedulers, SchedulerMixin, SchedulerOutput # 导入调度相关工具
logger = logging.get_logger(__name__) # 获取当前模块的日志记录器,便于调试
# 从 diffusers.schedulers.scheduling_ddpm.betas_for_alpha_bar 复制的函数
def betas_for_alpha_bar(
num_diffusion_timesteps, # 输入参数:扩散时间步数
max_beta=0.999, # 输入参数:最大 beta 值,默认值为 0.999
alpha_transform_type="cosine", # 输入参数:alpha 转换类型,默认值为 "cosine"
):
"""
创建一个 beta 调度,离散化给定的 alpha_t_bar 函数,该函数定义了
随时间的 (1-beta) 的累积乘积,从 t = [0,1]。
包含一个函数 alpha_bar,它接受一个参数 t,并将其转换为
扩散过程的 (1-beta) 的累积乘积。
参数:
num_diffusion_timesteps (`int`): 生成的 beta 数量。
max_beta (`float`): 使用的最大 beta 值;使用小于 1 的值以
防止奇异性。
alpha_transform_type (`str`, *可选*,默认为 `cosine`): alpha_bar 的噪声调度类型。
可选择 `cosine` 或 `exp`
返回:
betas (`np.ndarray`): 调度器用于逐步模型输出的 betas
"""
if alpha_transform_type == "cosine": # 如果 alpha 转换类型为 "cosine"
def alpha_bar_fn(t): # 定义 alpha_bar 函数
return math.cos((t + 0.008) / 1.008 * math.pi / 2) ** 2 # 计算 alpha_bar 值
elif alpha_transform_type == "exp": # 如果 alpha 转换类型为 "exp"
def alpha_bar_fn(t): # 定义 alpha_bar 函数
return math.exp(t * -12.0) # 计算 alpha_bar 值
else: # 如果 alpha 转换类型不支持
raise ValueError(f"Unsupported alpha_transform_type: {alpha_transform_type}") # 抛出异常
betas = [] # 初始化一个空列表以存储 beta 值
for i in range(num_diffusion_timesteps): # 遍历每个扩散时间步
t1 = i / num_diffusion_timesteps # 计算当前时间步 t1
t2 = (i + 1) / num_diffusion_timesteps # 计算下一个时间步 t2
betas.append(min(1 - alpha_bar_fn(t2) / alpha_bar_fn(t1), max_beta)) # 计算 beta 并添加到列表
return torch.tensor(betas, dtype=torch.float32) # 返回作为张量的 beta 列表
class DPMSolverSinglestepScheduler(SchedulerMixin, ConfigMixin): # 定义一个类,继承调度器混合器和配置混合器
"""
`DPMSolverSinglestepScheduler` 是一个快速的专用高阶求解器,用于扩散 ODE。
# 该模型继承自 [`SchedulerMixin`] 和 [`ConfigMixin`],查看父类文档以获取库为所有调度器实现的通用方法,如加载和保存。
_compatibles = [e.name for e in KarrasDiffusionSchedulers] # 获取 KarrasDiffusionSchedulers 中所有调度器的名称
order = 1 # 设置调度器的初始顺序为 1
@register_to_config # 将该方法注册到配置中
def __init__( # 初始化方法
self,
num_train_timesteps: int = 1000, # 训练的时间步数,默认值为 1000
beta_start: float = 0.0001, # 起始 beta 值,默认值为 0.0001
beta_end: float = 0.02, # 结束 beta 值,默认值为 0.02
beta_schedule: str = "linear", # beta 值的调度方式,默认值为线性
trained_betas: Optional[np.ndarray] = None, # 可选的已训练 beta 值
solver_order: int = 2, # 解算器的顺序,默认值为 2
prediction_type: str = "epsilon", # 预测类型,默认值为 'epsilon'
thresholding: bool = False, # 是否应用阈值处理,默认值为 False
dynamic_thresholding_ratio: float = 0.995, # 动态阈值比例,默认值为 0.995
sample_max_value: float = 1.0, # 生成样本的最大值,默认值为 1.0
algorithm_type: str = "dpmsolver++", # 算法类型,默认值为 'dpmsolver++'
solver_type: str = "midpoint", # 解算器类型,默认值为 'midpoint'
lower_order_final: bool = False, # 最终是否使用较低的顺序,默认值为 False
use_karras_sigmas: Optional[bool] = False, # 是否使用 Karras 的 sigma,默认值为 False
final_sigmas_type: Optional[str] = "zero", # 最终 sigma 的类型,默认为 'zero'
lambda_min_clipped: float = -float("inf"), # 最小 lambda 值,默认为负无穷
variance_type: Optional[str] = None, # 方差类型,默认为 None
def get_order_list(self, num_inference_steps: int) -> List[int]: # 获取每个时间步的解算器顺序
"""
计算每个时间步的解算器顺序。
Args:
num_inference_steps (`int`): # 输入参数:生成样本时使用的扩散步数
生成样本时使用的扩散步数。
"""
steps = num_inference_steps # 将输入的扩散步数赋值给变量 steps
order = self.config.solver_order # 从配置中获取解算器顺序
if order > 3: # 如果解算器顺序大于 3
raise ValueError("Order > 3 is not supported by this scheduler") # 抛出异常,表示不支持
if self.config.lower_order_final: # 如果配置要求最终使用较低的顺序
if order == 3: # 如果解算器顺序为 3
if steps % 3 == 0: # 如果步数能够被 3 整除
orders = [1, 2, 3] * (steps // 3 - 1) + [1, 2] + [1] # 按照顺序生成 orders 列表
elif steps % 3 == 1: # 如果步数除以 3 的余数为 1
orders = [1, 2, 3] * (steps // 3) + [1] # 生成 orders 列表
else: # 如果步数除以 3 的余数为 2
orders = [1, 2, 3] * (steps // 3) + [1, 2] # 生成 orders 列表
elif order == 2: # 如果解算器顺序为 2
if steps % 2 == 0: # 如果步数能够被 2 整除
orders = [1, 2] * (steps // 2 - 1) + [1, 1] # 生成 orders 列表
else: # 如果步数不能被 2 整除
orders = [1, 2] * (steps // 2) + [1] # 生成 orders 列表
elif order == 1: # 如果解算器顺序为 1
orders = [1] * steps # 生成 orders 列表
else: # 如果最终不使用较低的顺序
if order == 3: # 如果解算器顺序为 3
orders = [1, 2, 3] * (steps // 3) # 生成 orders 列表
elif order == 2: # 如果解算器顺序为 2
orders = [1, 2] * (steps // 2) # 生成 orders 列表
elif order == 1: # 如果解算器顺序为 1
orders = [1] * steps # 生成 orders 列表
return orders # 返回生成的 orders 列表
@property
def step_index(self): # 当前时间步的索引计数器属性
"""
当前时间步的索引计数器。每次调度器步骤后增加 1。
"""
return self._step_index # 返回当前步骤索引
@property
def begin_index(self): # 第一个时间步的索引属性
"""
第一个时间步的索引。应该通过 `set_begin_index` 方法从管道中设置。
"""
return self._begin_index # 返回开始步骤索引
# 从 diffusers.schedulers.scheduling_dpmsolver_multistep.DPMSolverMultistepScheduler 复制的 set_begin_index 方法
# 设置调度器的起始索引,默认值为 0
def set_begin_index(self, begin_index: int = 0):
"""
设置调度器的起始索引。此函数应在推理前从管道中运行。
Args:
begin_index (`int`):
调度器的起始索引。
"""
# 将传入的起始索引赋值给实例变量
self._begin_index = begin_index
# 设置推理步骤的数量、设备和时间步
def set_timesteps(
self,
num_inference_steps: int = None,
device: Union[str, torch.device] = None,
timesteps: Optional[List[int]] = None,
# 从 diffusers.schedulers.scheduling_ddpm.DDPMScheduler._threshold_sample 复制
def _threshold_sample(self, sample: torch.Tensor) -> torch.Tensor:
"""
"动态阈值:在每个采样步骤中,我们将 s 设置为 xt0(t 时刻 x_0 的预测)的某个百分位绝对像素值,
如果 s > 1,则将 xt0 阈值设定在范围 [-s, s] 内,然后除以 s。动态阈值在每一步主动防止像素饱和,
我们发现动态阈值显著提高了照片现实主义以及图像-文本对齐,特别是在使用非常大的指导权重时。"
https://arxiv.org/abs/2205.11487
"""
# 获取样本的数值类型
dtype = sample.dtype
# 解包样本的形状,获取批大小、通道和剩余维度
batch_size, channels, *remaining_dims = sample.shape
# 如果数据类型不是 float32 或 float64,则将其转换为 float
if dtype not in (torch.float32, torch.float64):
sample = sample.float() # 为分位数计算上溯,且未实现对 CPU half 的限制
# 将样本展平以便在每个图像上进行分位数计算
sample = sample.reshape(batch_size, channels * np.prod(remaining_dims))
# 计算样本的绝对值
abs_sample = sample.abs() # "某个百分位绝对像素值"
# 计算样本绝对值的给定分位数
s = torch.quantile(abs_sample, self.config.dynamic_thresholding_ratio, dim=1)
# 限制 s 的范围,最小值为 1,最大值为配置的样本最大值
s = torch.clamp(
s, min=1, max=self.config.sample_max_value
) # 当限制为最小值 1 时,相当于标准限制在 [-1, 1] 的范围内
# 扩展 s 的维度以便广播
s = s.unsqueeze(1) # (batch_size, 1) 因为 clamp 会在 dim=0 上广播
# 将样本限制在范围 [-s, s] 内并除以 s
sample = torch.clamp(sample, -s, s) / s # "我们将 xt0 阈值设定在范围 [-s, s] 内,然后除以 s"
# 将样本的形状还原回原来的维度
sample = sample.reshape(batch_size, channels, *remaining_dims)
# 将样本转换回原始数据类型
sample = sample.to(dtype)
# 返回处理后的样本
return sample
# 从 diffusers.schedulers.scheduling_euler_discrete.EulerDiscreteScheduler._sigma_to_t 复制
# 定义一个私有方法,将 sigma 和 log_sigmas 转换为时间 t
def _sigma_to_t(self, sigma, log_sigmas):
# 计算 log sigma,防止 sigma 为零,使用 np.maximum 限制最小值
log_sigma = np.log(np.maximum(sigma, 1e-10))
# 计算分布,得到 log_sigma 和 log_sigmas 之间的距离
dists = log_sigma - log_sigmas[:, np.newaxis]
# 获取 sigmas 的范围,找到低索引和高索引
low_idx = np.cumsum((dists >= 0), axis=0).argmax(axis=0).clip(max=log_sigmas.shape[0] - 2)
high_idx = low_idx + 1
# 根据低索引和高索引提取对应的 log_sigmas 值
low = log_sigmas[low_idx]
high = log_sigmas[high_idx]
# 进行 sigma 的插值计算
w = (low - log_sigma) / (low - high)
# 限制 w 的范围在 [0, 1]
w = np.clip(w, 0, 1)
# 将插值转换为时间范围 t
t = (1 - w) * low_idx + w * high_idx
# 调整 t 的形状以匹配 sigma 的形状
t = t.reshape(sigma.shape)
# 返回计算得到的时间 t
return t
# 定义一个私有方法,将 sigma 转换为 alpha_t 和 sigma_t
def _sigma_to_alpha_sigma_t(self, sigma):
# 计算 alpha_t,表示在时间 t 的方差
alpha_t = 1 / ((sigma**2 + 1) ** 0.5)
# 计算 sigma_t,结合 alpha_t 调整 sigma
sigma_t = sigma * alpha_t
# 返回计算得到的 alpha_t 和 sigma_t
return alpha_t, sigma_t
# 定义一个私有方法,将输入 sigmas 转换为 Karras 的噪声调度
def _convert_to_karras(self, in_sigmas: torch.Tensor, num_inference_steps) -> torch.Tensor:
"""构建 Karras 等人 (2022) 的噪声调度。"""
# 确保其他调度器在复制此函数时不会出错的 Hack
# TODO: 将此逻辑添加到其他调度器中
if hasattr(self.config, "sigma_min"):
sigma_min = self.config.sigma_min # 获取 sigma_min 配置
else:
sigma_min = None # 如果没有配置,则为 None
if hasattr(self.config, "sigma_max"):
sigma_max = self.config.sigma_max # 获取 sigma_max 配置
else:
sigma_max = None # 如果没有配置,则为 None
# 确保 sigma_min 和 sigma_max 的值有效
sigma_min = sigma_min if sigma_min is not None else in_sigmas[-1].item()
sigma_max = sigma_max if sigma_max is not None else in_sigmas[0].item()
rho = 7.0 # 论文中使用的常数值
# 创建一个从 0 到 1 的 ramp,用于插值
ramp = np.linspace(0, 1, num_inference_steps)
# 计算 sigma_min 和 sigma_max 的倒数
min_inv_rho = sigma_min ** (1 / rho)
max_inv_rho = sigma_max ** (1 / rho)
# 计算最终的 sigmas 值
sigmas = (max_inv_rho + ramp * (min_inv_rho - max_inv_rho)) ** rho
# 返回计算得到的 sigmas
return sigmas
# 定义一个方法,将模型输出进行转换
def convert_model_output(
self,
model_output: torch.Tensor,
*args,
sample: torch.Tensor = None,
**kwargs,
# 定义一个方法,进行 DPM 求解器的一阶更新
def dpm_solver_first_order_update(
self,
model_output: torch.Tensor,
*args,
sample: torch.Tensor = None,
noise: Optional[torch.Tensor] = None,
**kwargs,
# 定义函数,返回一个张量,表示一阶 DPMSolver 的一步操作(相当于 DDIM)
) -> torch.Tensor:
"""
一阶 DPMSolver 的一步,等价于 DDIM。
参数:
model_output (`torch.Tensor`):
从学习到的扩散模型直接输出的张量。
timestep (`int`):
当前扩散链中的离散时间步。
prev_timestep (`int`):
上一个离散时间步。
sample (`torch.Tensor`):
由扩散过程创建的当前样本实例。
返回:
`torch.Tensor`:
在上一个时间步的样本张量。
"""
# 从参数中提取当前时间步,若未提供则为 None
timestep = args[0] if len(args) > 0 else kwargs.pop("timestep", None)
# 从参数中提取上一个时间步,若未提供则为 None
prev_timestep = args[1] if len(args) > 1 else kwargs.pop("prev_timestep", None)
# 若样本为 None,则尝试从参数中获取样本
if sample is None:
if len(args) > 2:
sample = args[2]
else:
# 若仍未获取样本,则抛出错误
raise ValueError(" missing `sample` as a required keyward argument")
# 若提供当前时间步,进行弃用警告
if timestep is not None:
deprecate(
"timesteps",
"1.0.0",
"Passing `timesteps` is deprecated and has no effect as model output conversion is now handled via an internal counter `self.step_index`",
)
# 若提供上一个时间步,进行弃用警告
if prev_timestep is not None:
deprecate(
"prev_timestep",
"1.0.0",
"Passing `prev_timestep` is deprecated and has no effect as model output conversion is now handled via an internal counter `self.step_index`",
)
# 获取当前和上一个时间步对应的 sigma 值
sigma_t, sigma_s = self.sigmas[self.step_index + 1], self.sigmas[self.step_index]
# 将 sigma 转换为 alpha 和 sigma_t
alpha_t, sigma_t = self._sigma_to_alpha_sigma_t(sigma_t)
# 将 sigma 转换为 alpha 和 sigma_s
alpha_s, sigma_s = self._sigma_to_alpha_sigma_t(sigma_s)
# 计算 lambda_t 和 lambda_s
lambda_t = torch.log(alpha_t) - torch.log(sigma_t)
lambda_s = torch.log(alpha_s) - torch.log(sigma_s)
# 计算 h 值
h = lambda_t - lambda_s
# 根据算法类型计算 x_t
if self.config.algorithm_type == "dpmsolver++":
x_t = (sigma_t / sigma_s) * sample - (alpha_t * (torch.exp(-h) - 1.0)) * model_output
elif self.config.algorithm_type == "dpmsolver":
x_t = (alpha_t / alpha_s) * sample - (sigma_t * (torch.exp(h) - 1.0)) * model_output
elif self.config.algorithm_type == "sde-dpmsolver++":
# 确保噪声不为 None
assert noise is not None
x_t = (
(sigma_t / sigma_s * torch.exp(-h)) * sample
+ (alpha_t * (1 - torch.exp(-2.0 * h))) * model_output
+ sigma_t * torch.sqrt(1.0 - torch.exp(-2 * h)) * noise
)
# 返回计算得到的 x_t
return x_t
# 定义第二阶更新的单步 DPM 求解器函数
def singlestep_dpm_solver_second_order_update(
self,
# 接受模型输出列表,类型为张量列表
model_output_list: List[torch.Tensor],
# 接受可变参数
*args,
# 可选的样本张量,默认为 None
sample: torch.Tensor = None,
# 可选的噪声张量,默认为 None
noise: Optional[torch.Tensor] = None,
# 接受其他关键字参数
**kwargs,
# 定义第三阶更新的单步 DPM 求解器
def singlestep_dpm_solver_third_order_update(
self,
model_output_list: List[torch.Tensor], # 模型输出列表,包含当前和后续时间步的输出
*args, # 可变位置参数
sample: torch.Tensor = None, # 当前样本,默认为 None
**kwargs, # 可变关键字参数
def singlestep_dpm_solver_update(
self,
model_output_list: List[torch.Tensor], # 模型输出列表
*args, # 可变位置参数
sample: torch.Tensor = None, # 当前样本,默认为 None
order: int = None, # 当前步骤的求解器阶数
noise: Optional[torch.Tensor] = None, # 噪声张量,默认为 None
**kwargs, # 可变关键字参数
) -> torch.Tensor: # 返回类型为 torch.Tensor
"""
单步执行单步 DPM 求解器的更新。
参数:
model_output_list (`List[torch.Tensor]`):
当前和后续时间步的学习扩散模型的直接输出。
timestep (`int`):
当前和后续的离散时间步。
prev_timestep (`int`):
前一个离散时间步。
sample (`torch.Tensor`):
扩散过程创建的当前样本实例。
order (`int`):
当前步骤的求解器阶数。
返回:
`torch.Tensor`:
前一个时间步的样本张量。
"""
# 获取时间步列表,如果没有则从关键字参数中取出
timestep_list = args[0] if len(args) > 0 else kwargs.pop("timestep_list", None)
# 获取前一个时间步,如果没有则从关键字参数中取出
prev_timestep = args[1] if len(args) > 1 else kwargs.pop("prev_timestep", None)
# 如果样本为 None,尝试从参数中获取或抛出异常
if sample is None:
if len(args) > 2:
sample = args[2] # 从参数中获取样本
else:
raise ValueError(" missing`sample` as a required keyward argument") # 抛出异常
# 如果阶数为 None,尝试从参数中获取或抛出异常
if order is None:
if len(args) > 3:
order = args[3] # 从参数中获取阶数
else:
raise ValueError(" missing `order` as a required keyward argument") # 抛出异常
# 如果时间步列表不为 None,发出弃用警告
if timestep_list is not None:
deprecate(
"timestep_list", # 弃用的参数名称
"1.0.0", # 弃用版本
"Passing `timestep_list` is deprecated and has no effect as model output conversion is now handled via an internal counter `self.step_index`", # 弃用原因
)
# 如果前一个时间步不为 None,发出弃用警告
if prev_timestep is not None:
deprecate(
"prev_timestep", # 弃用的参数名称
"1.0.0", # 弃用版本
"Passing `prev_timestep` is deprecated and has no effect as model output conversion is now handled via an internal counter `self.step_index`", # 弃用原因
)
# 根据阶数调用相应的更新方法并返回结果
if order == 1:
return self.dpm_solver_first_order_update(model_output_list[-1], sample=sample, noise=noise) # 一阶更新
elif order == 2:
return self.singlestep_dpm_solver_second_order_update(model_output_list, sample=sample, noise=noise) # 二阶更新
elif order == 3:
return self.singlestep_dpm_solver_third_order_update(model_output_list, sample=sample) # 三阶更新
else:
raise ValueError(f"Order must be 1, 2, 3, got {order}") # 抛出异常,阶数无效
# 从 diffusers.schedulers.scheduling_dpmsolver_multistep.DPMSolverMultistepScheduler.index_for_timestep 复制的代码
# 定义一个根据时间步查找索引的函数
def index_for_timestep(self, timestep, schedule_timesteps=None):
# 如果没有提供调度时间步,则使用类的时间步
if schedule_timesteps is None:
schedule_timesteps = self.timesteps
# 查找调度时间步中与当前时间步匹配的索引候选
index_candidates = (schedule_timesteps == timestep).nonzero()
# 如果没有匹配的索引候选,则取最后一个时间步的索引
if len(index_candidates) == 0:
step_index = len(self.timesteps) - 1
# 如果找到多个匹配,则选择第二个匹配的索引
# 这样可以确保在去噪调度的中间开始时不会跳过 sigma
elif len(index_candidates) > 1:
step_index = index_candidates[1].item()
# 否则,选择第一个匹配的索引
else:
step_index = index_candidates[0].item()
# 返回找到的步索引
return step_index
# 从 diffusers.schedulers.scheduling_dpmsolver_multistep.DPMSolverMultistepScheduler._init_step_index 复制的函数
def _init_step_index(self, timestep):
"""
初始化调度器的 step_index 计数器。
"""
# 如果开始索引未设置
if self.begin_index is None:
# 如果时间步是张量,将其移动到时间步所在的设备
if isinstance(timestep, torch.Tensor):
timestep = timestep.to(self.timesteps.device)
# 使用 index_for_timestep 方法获取当前的步索引
self._step_index = self.index_for_timestep(timestep)
else:
# 否则,使用已设置的开始索引
self._step_index = self._begin_index
# 定义 step 方法,处理模型输出和其他参数
def step(
self,
model_output: torch.Tensor,
timestep: Union[int, torch.Tensor],
sample: torch.Tensor,
generator=None,
return_dict: bool = True,
# 定义一个函数,用于预测上一个时间步的样本,反向传播 SDE
) -> Union[SchedulerOutput, Tuple]:
"""
预测上一个时间步的样本,通过反向 SDE 实现。该函数使用单步 DPMSolver 传播样本。
参数:
model_output (`torch.Tensor`):
来自学习的扩散模型的直接输出。
timestep (`int`):
当前扩散链中的离散时间步。
sample (`torch.Tensor`):
扩散过程中创建的当前样本实例。
return_dict (`bool`):
是否返回 [`~schedulers.scheduling_utils.SchedulerOutput`] 或 `tuple`。
返回:
[`~schedulers.scheduling_utils.SchedulerOutput`] 或 `tuple`:
如果 return_dict 为 `True`,则返回 [`~schedulers.scheduling_utils.SchedulerOutput`],否则返回一个
元组,元组的第一个元素是样本张量。
"""
# 检查推断步骤数量是否为 None
if self.num_inference_steps is None:
# 如果为 None,则抛出错误,提示需要设置时间步
raise ValueError(
"Number of inference steps is 'None', you need to run 'set_timesteps' after creating the scheduler"
)
# 如果步骤索引为 None,初始化步骤索引
if self.step_index is None:
self._init_step_index(timestep)
# 转换模型输出以适应当前样本
model_output = self.convert_model_output(model_output, sample=sample)
# 将模型输出向前移动以存储最新的模型输出
for i in range(self.config.solver_order - 1):
self.model_outputs[i] = self.model_outputs[i + 1]
# 将最新的模型输出存储在最后一个位置
self.model_outputs[-1] = model_output
# 如果算法类型是 "sde-dpmsolver++",则生成噪声
if self.config.algorithm_type == "sde-dpmsolver++":
noise = randn_tensor(
model_output.shape, generator=generator, device=model_output.device, dtype=model_output.dtype
)
else:
# 否则将噪声设为 None
noise = None
# 获取当前步骤的顺序
order = self.order_list[self.step_index]
# 对于 img2img,去噪时可能从 order>1 开始,需确保前两步均为 order=1
while self.model_outputs[-order] is None:
order -= 1
# 对于单步求解器,使用每个时间步的初始值,顺序为 1
if order == 1:
self.sample = sample
# 使用单步 DPM 求解器更新样本
prev_sample = self.singlestep_dpm_solver_update(
self.model_outputs, sample=self.sample, order=order, noise=noise
)
# 完成后将步骤索引加一,噪声为噪声
self._step_index += 1
# 如果不返回字典,则返回一个只包含 prev_sample 的元组
if not return_dict:
return (prev_sample,)
# 否则返回 SchedulerOutput 对象
return SchedulerOutput(prev_sample=prev_sample)
# 定义一个函数,用于缩放模型输入
def scale_model_input(self, sample: torch.Tensor, *args, **kwargs) -> torch.Tensor:
"""
确保与需要根据当前时间步缩放去噪模型输入的调度器互换性。
参数:
sample (`torch.Tensor`):
输入样本。
返回:
`torch.Tensor`:
一个缩放后的输入样本。
"""
# 直接返回输入样本
return sample
# 从 diffusers.schedulers.scheduling_dpmsolver_multistep.DPMSolverMultistepScheduler.add_noise 复制而来
def add_noise(
self,
original_samples: torch.Tensor, # 输入的原始样本,类型为张量
noise: torch.Tensor, # 要添加的噪声,类型为张量
timesteps: torch.IntTensor, # 时间步,类型为整数张量
) -> torch.Tensor: # 函数返回添加噪声后的样本,类型为张量
# 确保 sigmas 和 timesteps 与 original_samples 具有相同的设备和数据类型
sigmas = self.sigmas.to(device=original_samples.device, dtype=original_samples.dtype)
if original_samples.device.type == "mps" and torch.is_floating_point(timesteps): # 检查是否在 mps 设备上且 timesteps 是浮点类型
# mps 不支持 float64 类型
schedule_timesteps = self.timesteps.to(original_samples.device, dtype=torch.float32) # 将 timesteps 转换为 float32
timesteps = timesteps.to(original_samples.device, dtype=torch.float32) # 将 timesteps 转换为 float32
else:
schedule_timesteps = self.timesteps.to(original_samples.device) # 将 timesteps 转换为与 original_samples 相同的设备
timesteps = timesteps.to(original_samples.device) # 将 timesteps 转换为与 original_samples 相同的设备
# 当调度器用于训练或管道未实现 set_begin_index 时,begin_index 为 None
if self.begin_index is None:
# 根据给定的 timesteps 计算相应的步骤索引
step_indices = [self.index_for_timestep(t, schedule_timesteps) for t in timesteps]
elif self.step_index is not None:
# 在第一次去噪步骤之后调用 add_noise(用于修复)
step_indices = [self.step_index] * timesteps.shape[0] # 使用当前步骤索引
else:
# 在第一次去噪步骤之前调用 add_noise 以创建初始潜在图像(img2img)
step_indices = [self.begin_index] * timesteps.shape[0] # 使用开始索引
sigma = sigmas[step_indices].flatten() # 获取相应步骤的 sigma 并展平
while len(sigma.shape) < len(original_samples.shape): # 确保 sigma 的维度与 original_samples 匹配
sigma = sigma.unsqueeze(-1) # 在最后一个维度增加一维
alpha_t, sigma_t = self._sigma_to_alpha_sigma_t(sigma) # 将 sigma 转换为 alpha_t 和 sigma_t
noisy_samples = alpha_t * original_samples + sigma_t * noise # 计算添加噪声后的样本
return noisy_samples # 返回添加噪声后的样本
def __len__(self): # 定义获取对象长度的方法
return self.config.num_train_timesteps # 返回训练时间步的数量
# 版权所有 2024 CogVideoX 团队,清华大学、ZhipuAI 和 HuggingFace 团队。
# 保留所有权利。
#
# 根据 Apache 许可证,版本 2.0(“许可证”)进行许可;
# 除非遵守许可证,否则您不得使用此文件。
# 您可以在以下网址获取许可证副本:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律或书面协议另有约定,否则根据许可证分发的软件
# 以“原样”基础提供,不附带任何明示或暗示的担保或条件。
# 有关许可证所管辖的权限和限制的具体说明,请参见许可证。
# 免责声明:此代码受到 https://github.com/pesser/pytorch_diffusion 和
# https://github.com/hojonathanho/diffusion 的强烈影响
# 导入数学模块
import math
# 从数据类库导入 dataclass 装饰器
from dataclasses import dataclass
# 导入类型相关的类
from typing import List, Optional, Tuple, Union
# 导入 numpy 库
import numpy as np
# 导入 torch 库
import torch
# 从配置工具导入 ConfigMixin 和 register_to_config
from ..configuration_utils import ConfigMixin, register_to_config
# 从工具库导入 BaseOutput 类
from ..utils import BaseOutput
# 从 torch 工具库导入随机张量生成函数
from ..utils.torch_utils import randn_tensor
# 从调度工具库导入 KarrasDiffusionSchedulers 和 SchedulerMixin
from .scheduling_utils import KarrasDiffusionSchedulers, SchedulerMixin
@dataclass
# 从 diffusers.schedulers.scheduling_ddpm.DDPMSchedulerOutput 中复制,DDPM 改为 DDIM
class DDIMSchedulerOutput(BaseOutput):
"""
调度器的 `step` 函数输出的输出类。
参数:
prev_sample (`torch.Tensor`,形状为 `(batch_size, num_channels, height, width)` 的图像):
先前时间步的计算样本 `(x_{t-1})`。 `prev_sample` 应在去噪循环中用作下一个模型输入。
pred_original_sample (`torch.Tensor`,形状为 `(batch_size, num_channels, height, width)` 的图像):
基于当前时间步模型输出的预测去噪样本 `(x_{0})`。
`pred_original_sample` 可用于预览进度或指导。
"""
# 先前时间步的样本张量
prev_sample: torch.Tensor
# 可选的预测原始样本张量
pred_original_sample: Optional[torch.Tensor] = None
# 从 diffusers.schedulers.scheduling_ddpm.betas_for_alpha_bar 复制的函数
def betas_for_alpha_bar(
num_diffusion_timesteps,
max_beta=0.999,
alpha_transform_type="cosine",
):
"""
创建一个 beta 调度,离散化给定的 alpha_t_bar 函数,该函数定义了
随时间推移 (1-beta) 的累积乘积,从 t = [0,1]。
包含一个函数 alpha_bar,该函数接受一个参数 t,并将其转换为
在该部分扩散过程中 (1-beta) 的累积乘积。
参数:
num_diffusion_timesteps (`int`): 生成的 beta 数量。
max_beta (`float`): 使用的最大 beta;使用小于 1 的值
防止奇异性。
alpha_transform_type (`str`,*可选*,默认为 `cosine`): alpha_bar 的噪声调度类型。
选择 `cosine` 或 `exp`
返回:
betas (`np.ndarray`): 调度器用于推动模型输出的 betas
# 文档字符串,描述该代码块的功能
"""
# 检查 alpha_transform_type 是否为 "cosine"
if alpha_transform_type == "cosine":
# 定义 alpha_bar_fn 函数,计算基于余弦的变换
def alpha_bar_fn(t):
# 计算余弦函数的平方,调整时间参数 t
return math.cos((t + 0.008) / 1.008 * math.pi / 2) ** 2
# 检查 alpha_transform_type 是否为 "exp"
elif alpha_transform_type == "exp":
# 定义 alpha_bar_fn 函数,计算基于指数的变换
def alpha_bar_fn(t):
# 计算指数衰减函数
return math.exp(t * -12.0)
# 如果 alpha_transform_type 不符合预期,抛出错误
else:
raise ValueError(f"Unsupported alpha_transform_type: {alpha_transform_type}")
# 初始化一个空列表,用于存储 beta 值
betas = []
# 遍历所有的扩散时间步数
for i in range(num_diffusion_timesteps):
# 计算当前时间步的归一化值 t1
t1 = i / num_diffusion_timesteps
# 计算下一个时间步的归一化值 t2
t2 = (i + 1) / num_diffusion_timesteps
# 计算 beta 值并添加到列表中,确保不超过 max_beta
betas.append(min(1 - alpha_bar_fn(t2) / alpha_bar_fn(t1), max_beta))
# 将 beta 列表转换为 PyTorch 张量,数据类型为 float32
return torch.tensor(betas, dtype=torch.float32)
# 定义一个函数,用于重新缩放 beta,使其终端信噪比为零
def rescale_zero_terminal_snr(alphas_cumprod):
"""
根据 https://arxiv.org/pdf/2305.08891.pdf (算法 1) 重新缩放 beta 使终端信噪比为零
参数:
betas (`torch.Tensor`):
初始化调度器时使用的 betas。
返回:
`torch.Tensor`: 具有零终端信噪比的重新缩放的 betas
"""
# 计算累积 alpha 的平方根
alphas_bar_sqrt = alphas_cumprod.sqrt()
# 存储旧值
alphas_bar_sqrt_0 = alphas_bar_sqrt[0].clone() # 记录第一个 alpha 的平方根
alphas_bar_sqrt_T = alphas_bar_sqrt[-1].clone() # 记录最后一个 alpha 的平方根
# 将最后一个时间步的值移至零
alphas_bar_sqrt -= alphas_bar_sqrt_T
# 缩放以将第一个时间步的值恢复到旧值
alphas_bar_sqrt *= alphas_bar_sqrt_0 / (alphas_bar_sqrt_0 - alphas_bar_sqrt_T)
# 将 alphas_bar_sqrt 转换为 betas
alphas_bar = alphas_bar_sqrt**2 # 还原平方根
# 返回重新缩放后的 alpha 值
return alphas_bar
# 定义一个类,继承自 SchedulerMixin 和 ConfigMixin,扩展去噪过程
class CogVideoXDPMScheduler(SchedulerMixin, ConfigMixin):
"""
`DDIMScheduler` 扩展了在去噪扩散概率模型 (DDPM) 中引入的去噪过程,增加了非马尔可夫指导。
此模型继承自 [`SchedulerMixin`] 和 [`ConfigMixin`]。查看超类文档以获取库为所有调度器实现的通用方法,例如加载和保存。
"""
# 兼容的调度器名称列表
_compatibles = [e.name for e in KarrasDiffusionSchedulers]
# 设置调度器的顺序
order = 1
@register_to_config
# 初始化方法,定义调度器的参数
def __init__(
self,
num_train_timesteps: int = 1000, # 训练时间步的数量
beta_start: float = 0.00085, # beta 的起始值
beta_end: float = 0.0120, # beta 的结束值
beta_schedule: str = "scaled_linear", # beta 调度的类型
trained_betas: Optional[Union[np.ndarray, List[float]]] = None, # 训练好的 betas(可选)
clip_sample: bool = True, # 是否裁剪样本
set_alpha_to_one: bool = True, # 是否将 alpha 设置为 1
steps_offset: int = 0, # 时间步的偏移量
prediction_type: str = "epsilon", # 预测类型
clip_sample_range: float = 1.0, # 裁剪样本的范围
sample_max_value: float = 1.0, # 样本的最大值
timestep_spacing: str = "leading", # 时间步间隔的类型
rescale_betas_zero_snr: bool = False, # 是否重新缩放 betas 使终端 SNR 为零
snr_shift_scale: float = 3.0, # SNR 移位比例
):
# 检查是否有训练好的 beta 值
if trained_betas is not None:
# 将训练好的 beta 值转换为张量,数据类型为 float32
self.betas = torch.tensor(trained_betas, dtype=torch.float32)
# 如果 beta_schedule 是 "linear"
elif beta_schedule == "linear":
# 在给定范围内生成线性间隔的 beta 值
self.betas = torch.linspace(beta_start, beta_end, num_train_timesteps, dtype=torch.float32)
# 如果 beta_schedule 是 "scaled_linear"
elif beta_schedule == "scaled_linear":
# 该调度特定于潜在扩散模型
self.betas = torch.linspace(beta_start**0.5, beta_end**0.5, num_train_timesteps, dtype=torch.float64) ** 2
# 如果 beta_schedule 是 "squaredcos_cap_v2"
elif beta_schedule == "squaredcos_cap_v2":
# Glide 余弦调度
self.betas = betas_for_alpha_bar(num_train_timesteps)
# 如果以上情况都不符合
else:
# 抛出未实现的错误
raise NotImplementedError(f"{beta_schedule} is not implemented for {self.__class__}")
# 计算 alpha 值
self.alphas = 1.0 - self.betas
# 计算累积 alpha 值
self.alphas_cumprod = torch.cumprod(self.alphas, dim=0)
# 修改:根据 SD3 调整 SNR
self.alphas_cumprod = self.alphas_cumprod / (snr_shift_scale + (1 - snr_shift_scale) * self.alphas_cumprod)
# 如果需要为零 SNR 重新缩放
if rescale_betas_zero_snr:
# 重新缩放累积 alpha 值
self.alphas_cumprod = rescale_zero_terminal_snr(self.alphas_cumprod)
# 在每个 ddim 步骤中,查看前一个累积 alpha 值
# 对于最后一步,没有前一个累积 alpha 值,因为已经在 0 了
# `set_alpha_to_one` 决定是否将该参数设置为 1,或使用“非前一个”的最终 alpha
self.final_alpha_cumprod = torch.tensor(1.0) if set_alpha_to_one else self.alphas_cumprod[0]
# 初始噪声分布的标准差
self.init_noise_sigma = 1.0
# 可设置的值
self.num_inference_steps = None
# 创建时间步长的反向序列
self.timesteps = torch.from_numpy(np.arange(0, num_train_timesteps)[::-1].copy().astype(np.int64))
# 获取特定时间步的方差
def _get_variance(self, timestep, prev_timestep):
# 当前时间步的累积 alpha 值
alpha_prod_t = self.alphas_cumprod[timestep]
# 前一个时间步的累积 alpha 值(如果有效)
alpha_prod_t_prev = self.alphas_cumprod[prev_timestep] if prev_timestep >= 0 else self.final_alpha_cumprod
# 当前时间步的 beta 值
beta_prod_t = 1 - alpha_prod_t
# 前一个时间步的 beta 值
beta_prod_t_prev = 1 - alpha_prod_t_prev
# 根据 beta 和 alpha 计算方差
variance = (beta_prod_t_prev / beta_prod_t) * (1 - alpha_prod_t / alpha_prod_t_prev)
# 返回计算出的方差
return variance
# 对模型输入进行缩放,确保与调度器的兼容性
def scale_model_input(self, sample: torch.Tensor, timestep: Optional[int] = None) -> torch.Tensor:
"""
确保与需要根据当前时间步缩放去噪模型输入的调度器互换性。
Args:
sample (`torch.Tensor`):
输入样本。
timestep (`int`, *optional*):
扩散链中的当前时间步。
Returns:
`torch.Tensor`:
缩放后的输入样本。
"""
# 返回输入样本(未进行实际缩放)
return sample
# 定义设置推理步数的方法,接受推理步骤数和设备参数
def set_timesteps(self, num_inference_steps: int, device: Union[str, torch.device] = None):
"""
设置用于扩散链的离散时间步(在推理之前运行)。
参数:
num_inference_steps (`int`):
生成样本时使用的扩散步骤数,基于预训练模型。
"""
# 检查推理步骤数是否超过训练时间步数
if num_inference_steps > self.config.num_train_timesteps:
raise ValueError(
f"`num_inference_steps`: {num_inference_steps} 不能大于 `self.config.train_timesteps`:"
f" {self.config.num_train_timesteps},因为与此调度器一起训练的 unet 模型只能处理"
f" 最大 {self.config.num_train_timesteps} 时间步。"
)
# 将推理步骤数赋值给实例变量
self.num_inference_steps = num_inference_steps
# 根据配置的时间步间隔类型生成时间步
if self.config.timestep_spacing == "linspace":
# 生成线性间隔的时间步并反向排序
timesteps = (
np.linspace(0, self.config.num_train_timesteps - 1, num_inference_steps)
.round()[::-1]
.copy()
.astype(np.int64)
)
elif self.config.timestep_spacing == "leading":
# 计算步长比例
step_ratio = self.config.num_train_timesteps // self.num_inference_steps
# 通过乘以比例生成整数时间步,避免出现问题
timesteps = (np.arange(0, num_inference_steps) * step_ratio).round()[::-1].copy().astype(np.int64)
# 添加步骤偏移量
timesteps += self.config.steps_offset
elif self.config.timestep_spacing == "trailing":
# 计算步长比例
step_ratio = self.config.num_train_timesteps / self.num_inference_steps
# 通过乘以比例生成整数时间步,避免出现问题
timesteps = np.round(np.arange(self.config.num_train_timesteps, 0, -step_ratio)).astype(np.int64)
# 减去1以调整时间步
timesteps -= 1
else:
# 抛出错误,如果时间步间隔不被支持
raise ValueError(
f"{self.config.timestep_spacing} 不被支持。请确保选择 'leading' 或 'trailing'。"
)
# 将生成的时间步转换为张量并移动到指定设备
self.timesteps = torch.from_numpy(timesteps).to(device)
# 定义获取变量的方法,计算相关的 lambda 值
def get_variables(self, alpha_prod_t, alpha_prod_t_prev, alpha_prod_t_back=None):
# 计算当前时间步的 lambda 值
lamb = ((alpha_prod_t / (1 - alpha_prod_t)) ** 0.5).log()
# 计算前一个时间步的 lambda 值
lamb_next = ((alpha_prod_t_prev / (1 - alpha_prod_t_prev)) ** 0.5).log()
# 计算两个 lambda 之间的差
h = lamb_next - lamb
# 如果提供了反向时间步的 alpha 值,进行进一步计算
if alpha_prod_t_back is not None:
lamb_previous = ((alpha_prod_t_back / (1 - alpha_prod_t_back)) ** 0.5).log()
# 计算最后一个 h 值
h_last = lamb - lamb_previous
# 计算比率 r
r = h_last / h
return h, r, lamb, lamb_next
else:
# 返回 h 和 lambda 值
return h, None, lamb, lamb_next
# 计算多重输出的辅助函数
def get_mult(self, h, r, alpha_prod_t, alpha_prod_t_prev, alpha_prod_t_back):
# 计算第一个乘数,使用当前和前一个 alpha 值
mult1 = ((1 - alpha_prod_t_prev) / (1 - alpha_prod_t)) ** 0.5 * (-h).exp()
# 计算第二个乘数,基于 h 和前一个 alpha 值
mult2 = (-2 * h).expm1() * alpha_prod_t_prev**0.5
# 如果 alpha_prod_t_back 不是 None,则计算额外的乘数
if alpha_prod_t_back is not None:
# 计算第三个乘数,基于 r
mult3 = 1 + 1 / (2 * r)
# 计算第四个乘数,基于 r
mult4 = 1 / (2 * r)
# 返回四个乘数
return mult1, mult2, mult3, mult4
else:
# 返回前两个乘数
return mult1, mult2
# 步进函数,处理模型输出和样本
def step(
self,
model_output: torch.Tensor, # 模型输出的张量
old_pred_original_sample: torch.Tensor, # 之前的原始样本预测
timestep: int, # 当前时间步
timestep_back: int, # 回溯时间步
sample: torch.Tensor, # 输入样本的张量
eta: float = 0.0, # 附加参数,默认为 0
use_clipped_model_output: bool = False, # 是否使用剪辑后的模型输出
generator=None, # 随机数生成器
variance_noise: Optional[torch.Tensor] = None, # 可选的方差噪声
return_dict: bool = False, # 是否以字典形式返回
# 从 diffusers.schedulers.scheduling_ddpm.DDPMScheduler.add_noise 复制的函数
def add_noise(
self,
original_samples: torch.Tensor, # 原始样本的张量
noise: torch.Tensor, # 噪声的张量
timesteps: torch.IntTensor, # 时间步的张量
) -> torch.Tensor: # 返回添加噪声后的张量
# 确保 alphas_cumprod 和 timestep 与 original_samples 具有相同的设备和数据类型
# 将 self.alphas_cumprod 移动到设备,以避免后续 add_noise 调用时重复的 CPU 到 GPU 数据移动
self.alphas_cumprod = self.alphas_cumprod.to(device=original_samples.device)
# 将 alphas_cumprod 转换为与 original_samples 相同的数据类型
alphas_cumprod = self.alphas_cumprod.to(dtype=original_samples.dtype)
# 将 timesteps 移动到 original_samples 的设备
timesteps = timesteps.to(original_samples.device)
# 计算当前时间步的 alpha 乘积的平方根
sqrt_alpha_prod = alphas_cumprod[timesteps] ** 0.5
# 展平平方根的 alpha 乘积
sqrt_alpha_prod = sqrt_alpha_prod.flatten()
# 如果 sqrt_alpha_prod 的维度小于原始样本的维度,则添加额外的维度
while len(sqrt_alpha_prod.shape) < len(original_samples.shape):
sqrt_alpha_prod = sqrt_alpha_prod.unsqueeze(-1)
# 计算当前时间步的 1 - alpha 乘积的平方根
sqrt_one_minus_alpha_prod = (1 - alphas_cumprod[timesteps]) ** 0.5
# 展平平方根的 1 - alpha 乘积
sqrt_one_minus_alpha_prod = sqrt_one_minus_alpha_prod.flatten()
# 如果 sqrt_one_minus_alpha_prod 的维度小于原始样本的维度,则添加额外的维度
while len(sqrt_one_minus_alpha_prod.shape) < len(original_samples.shape):
sqrt_one_minus_alpha_prod = sqrt_one_minus_alpha_prod.unsqueeze(-1)
# 计算带噪声的样本
noisy_samples = sqrt_alpha_prod * original_samples + sqrt_one_minus_alpha_prod * noise
# 返回带噪声的样本
return noisy_samples
# 从 diffusers.schedulers.scheduling_ddpm.DDPMScheduler.get_velocity 复制的函数
# 定义一个获取速度的函数,接受样本、噪声和时间步作为输入,返回速度张量
def get_velocity(self, sample: torch.Tensor, noise: torch.Tensor, timesteps: torch.IntTensor) -> torch.Tensor:
# 确保 alphas_cumprod 和 timesteps 与样本的设备和数据类型相同
self.alphas_cumprod = self.alphas_cumprod.to(device=sample.device)
# 将 alphas_cumprod 的数据类型转换为样本的数据类型
alphas_cumprod = self.alphas_cumprod.to(dtype=sample.dtype)
# 将 timesteps 转移到样本的设备
timesteps = timesteps.to(sample.device)
# 计算 sqrt(alpha_prod) 的平方根,基于给定的时间步
sqrt_alpha_prod = alphas_cumprod[timesteps] ** 0.5
# 将其展平为一维张量
sqrt_alpha_prod = sqrt_alpha_prod.flatten()
# 如果平方根的形状小于样本的形状,则在最后一个维度添加维度
while len(sqrt_alpha_prod.shape) < len(sample.shape):
sqrt_alpha_prod = sqrt_alpha_prod.unsqueeze(-1)
# 计算 sqrt(1 - alpha_prod) 的平方根
sqrt_one_minus_alpha_prod = (1 - alphas_cumprod[timesteps]) ** 0.5
# 将其展平为一维张量
sqrt_one_minus_alpha_prod = sqrt_one_minus_alpha_prod.flatten()
# 如果平方根的形状小于样本的形状,则在最后一个维度添加维度
while len(sqrt_one_minus_alpha_prod.shape) < len(sample.shape):
sqrt_one_minus_alpha_prod = sqrt_one_minus_alpha_prod.unsqueeze(-1)
# 计算速度,结合噪声和样本的影响
velocity = sqrt_alpha_prod * noise - sqrt_one_minus_alpha_prod * sample
# 返回计算出的速度
return velocity
# 定义一个返回训练时间步数的函数
def __len__(self):
# 返回配置中的训练时间步数
return self.config.num_train_timesteps
# 版权所有 2024 TSAIL Team 和 The HuggingFace Team。保留所有权利。
#
# 根据 Apache License, Version 2.0(“许可证”)授权;
# 除非遵守许可证,否则您不得使用此文件。
# 您可以在以下地址获取许可证副本:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律或书面协议另有约定,按“原样”分发软件,
# 不提供任何形式的保证或条件,无论是明示或暗示。
# 有关许可证所涵盖的特定权限和限制,请参见许可证。
# 免责声明:此文件受到 https://github.com/LuChengTHU/dpm-solver 和 https://github.com/NVlabs/edm 的强烈影响
# 导入数学库
import math
# 从 typing 模块导入必要的类型提示
from typing import List, Optional, Tuple, Union
# 导入 NumPy 库,通常用于数组和数学运算
import numpy as np
# 导入 PyTorch 库,用于深度学习操作
import torch
# 从配置工具中导入 ConfigMixin 和 register_to_config
from ..configuration_utils import ConfigMixin, register_to_config
# 从 Torch 工具中导入随机张量生成函数
from ..utils.torch_utils import randn_tensor
# 从调度工具中导入调度器混合类和调度器输出类
from .scheduling_utils import SchedulerMixin, SchedulerOutput
class EDMDPMSolverMultistepScheduler(SchedulerMixin, ConfigMixin):
"""
实现 Karras 等人 2022 年提出的 EDM 形式的 DPMSolverMultistepScheduler [1]。
`EDMDPMSolverMultistepScheduler` 是用于扩散 ODE 的快速专用高阶求解器。
[1] Karras, Tero, et al. "Elucidating the Design Space of Diffusion-Based Generative Models."
https://arxiv.org/abs/2206.00364
此模型继承自 [`SchedulerMixin`] 和 [`ConfigMixin`]。有关所有调度器的通用方法的文档,
请查看超类文档,例如加载和保存。
"""
# 定义兼容性列表,初始化为空
_compatibles = []
# 定义求解器的阶数,默认为 1
order = 1
@register_to_config
# 初始化函数,接收多个参数,具有默认值
def __init__(
# 最小 sigma 值,默认 0.002
self,
sigma_min: float = 0.002,
# 最大 sigma 值,默认 80.0
sigma_max: float = 80.0,
# 数据的 sigma 值,默认 0.5
sigma_data: float = 0.5,
# sigma 的调度类型,默认为 "karras"
sigma_schedule: str = "karras",
# 训练的时间步数,默认 1000
num_train_timesteps: int = 1000,
# 预测类型,默认为 "epsilon"
prediction_type: str = "epsilon",
# ρ 值,默认 7.0
rho: float = 7.0,
# 求解器的阶数,默认 2
solver_order: int = 2,
# 是否进行阈值处理,默认为 False
thresholding: bool = False,
# 动态阈值处理的比例,默认 0.995
dynamic_thresholding_ratio: float = 0.995,
# 采样的最大值,默认 1.0
sample_max_value: float = 1.0,
# 算法类型,默认为 "dpmsolver++"
algorithm_type: str = "dpmsolver++",
# 求解器类型,默认为 "midpoint"
solver_type: str = "midpoint",
# 最终是否使用较低阶的处理,默认为 True
lower_order_final: bool = True,
# 最终步骤是否使用欧拉法,默认为 False
euler_at_final: bool = False,
# 最终 sigma 的类型,默认为 "zero",可选值有 "zero" 和 "sigma_min"
final_sigmas_type: Optional[str] = "zero", # "zero", "sigma_min"
):
# DPM-Solver的设置
# 检查算法类型是否在支持的类型中
if algorithm_type not in ["dpmsolver++", "sde-dpmsolver++"]:
# 如果算法类型是“deis”,则注册为“dpmsolver++”
if algorithm_type == "deis":
self.register_to_config(algorithm_type="dpmsolver++")
# 否则,抛出未实现的错误
else:
raise NotImplementedError(f"{algorithm_type} is not implemented for {self.__class__}")
# 检查求解器类型是否在支持的类型中
if solver_type not in ["midpoint", "heun"]:
# 如果求解器类型是“logrho”、“bh1”或“bh2”,则注册为“midpoint”
if solver_type in ["logrho", "bh1", "bh2"]:
self.register_to_config(solver_type="midpoint")
# 否则,抛出未实现的错误
else:
raise NotImplementedError(f"{solver_type} is not implemented for {self.__class__}")
# 检查算法类型和最终标准差类型的兼容性
if algorithm_type not in ["dpmsolver++", "sde-dpmsolver++"] and final_sigmas_type == "zero":
# 如果不兼容,抛出值错误
raise ValueError(
f"`final_sigmas_type` {final_sigmas_type} is not supported for `algorithm_type` {algorithm_type}. Please choose `sigma_min` instead."
)
# 创建一个从0到1的等间隔张量,长度为训练时间步数
ramp = torch.linspace(0, 1, num_train_timesteps)
# 如果sigma调度是“karras”,计算相应的sigma值
if sigma_schedule == "karras":
sigmas = self._compute_karras_sigmas(ramp)
# 如果sigma调度是“exponential”,计算相应的sigma值
elif sigma_schedule == "exponential":
sigmas = self._compute_exponential_sigmas(ramp)
# 对计算得到的sigma进行预处理噪声
self.timesteps = self.precondition_noise(sigmas)
# 将sigma与一个零张量连接,确保与设备一致
self.sigmas = torch.cat([sigmas, torch.zeros(1, device=sigmas.device)])
# 可设置的值
self.num_inference_steps = None # 推理步骤数量初始化为None
self.model_outputs = [None] * solver_order # 模型输出初始化为None列表,长度为求解器顺序
self.lower_order_nums = 0 # 较低阶数初始化为0
self._step_index = None # 步骤索引初始化为None
self._begin_index = None # 开始索引初始化为None
# 将sigma转移到CPU,以避免过多的CPU/GPU通信
self.sigmas = self.sigmas.to("cpu")
@property
def init_noise_sigma(self):
# 返回初始噪声分布的标准差
return (self.config.sigma_max**2 + 1) ** 0.5
@property
def step_index(self):
"""
当前时间步的索引计数器。每次调度步骤后增加1。
"""
return self._step_index
@property
def begin_index(self):
"""
第一个时间步的索引。应通过管道使用`set_begin_index`方法设置。
"""
return self._begin_index
# 从diffusers.schedulers.scheduling_dpmsolver_multistep.DPMSolverMultistepScheduler.set_begin_index复制
def set_begin_index(self, begin_index: int = 0):
"""
设置调度器的开始索引。此函数应在推理之前从管道运行。
Args:
begin_index (`int`):
调度器的开始索引。
"""
self._begin_index = begin_index
# 从diffusers.schedulers.scheduling_edm_euler.EDMEulerScheduler.precondition_inputs复制
def precondition_inputs(self, sample, sigma):
# 计算输入的预处理系数
c_in = 1 / ((sigma**2 + self.config.sigma_data**2) ** 0.5)
# 将样本按预处理系数缩放
scaled_sample = sample * c_in
# 返回缩放后的样本
return scaled_sample
# 从 diffusers.schedulers.scheduling_edm_euler.EDMEulerScheduler.precondition_noise 复制的
def precondition_noise(self, sigma):
# 检查 sigma 是否为 PyTorch 张量,如果不是则转换为张量
if not isinstance(sigma, torch.Tensor):
sigma = torch.tensor([sigma])
# 计算 c_noise,基于 sigma 计算对数并乘以 0.25
c_noise = 0.25 * torch.log(sigma)
# 返回计算得到的 c_noise
return c_noise
# 从 diffusers.schedulers.scheduling_edm_euler.EDMEulerScheduler.precondition_outputs 复制的
def precondition_outputs(self, sample, model_output, sigma):
# 获取配置信息中的 sigma_data
sigma_data = self.config.sigma_data
# 计算 c_skip,用于后续的输出合成
c_skip = sigma_data**2 / (sigma**2 + sigma_data**2)
# 根据配置中的预测类型计算 c_out
if self.config.prediction_type == "epsilon":
# 计算 epsilon 预测下的 c_out
c_out = sigma * sigma_data / (sigma**2 + sigma_data**2) ** 0.5
elif self.config.prediction_type == "v_prediction":
# 计算 v 预测下的 c_out
c_out = -sigma * sigma_data / (sigma**2 + sigma_data**2) ** 0.5
else:
# 如果预测类型不支持,则抛出错误
raise ValueError(f"Prediction type {self.config.prediction_type} is not supported.")
# 根据 c_skip 和 c_out 组合去噪样本与模型输出
denoised = c_skip * sample + c_out * model_output
# 返回去噪后的结果
return denoised
# 从 diffusers.schedulers.scheduling_edm_euler.EDMEulerScheduler.scale_model_input 复制的
def scale_model_input(self, sample: torch.Tensor, timestep: Union[float, torch.Tensor]) -> torch.Tensor:
"""
确保与需要根据当前时间步缩放去噪模型输入的调度器的可互换性。通过 `(sigma**2 + 1) ** 0.5` 缩放去噪模型输入,以匹配欧拉算法。
参数:
sample (`torch.Tensor`):
输入样本。
timestep (`int`, *可选*):
扩散链中的当前时间步。
返回:
`torch.Tensor`:
缩放后的输入样本。
"""
# 如果 step_index 尚未初始化,则初始化它
if self.step_index is None:
self._init_step_index(timestep)
# 获取当前 step_index 对应的 sigma
sigma = self.sigmas[self.step_index]
# 对输入样本进行预处理
sample = self.precondition_inputs(sample, sigma)
# 标记输入缩放函数已被调用
self.is_scale_input_called = True
# 返回处理后的样本
return sample
# 定义设置推理步骤的方法,接受推理步数和设备类型参数
def set_timesteps(self, num_inference_steps: int = None, device: Union[str, torch.device] = None):
"""
设置用于扩散链的离散时间步(在推理之前运行)。
参数:
num_inference_steps (`int`):
用于生成样本的扩散步骤数,使用预训练模型时。
device (`str` 或 `torch.device`, *可选*):
时间步应移动到的设备。如果为 `None`,则时间步不会移动。
"""
# 将推理步骤数保存到实例变量
self.num_inference_steps = num_inference_steps
# 生成一个从 0 到 1 的等间距张量,长度为推理步骤数
ramp = torch.linspace(0, 1, self.num_inference_steps)
# 根据配置选择 sigma 调度方式
if self.config.sigma_schedule == "karras":
# 使用 Karras 方法计算 sigma 值
sigmas = self._compute_karras_sigmas(ramp)
elif self.config.sigma_schedule == "exponential":
# 使用指数方法计算 sigma 值
sigmas = self._compute_exponential_sigmas(ramp)
# 将 sigma 转换为浮点类型并移动到指定设备
sigmas = sigmas.to(dtype=torch.float32, device=device)
# 对 sigma 进行预处理以获得时间步
self.timesteps = self.precondition_noise(sigmas)
# 根据配置决定最后的 sigma 值
if self.config.final_sigmas_type == "sigma_min":
sigma_last = self.config.sigma_min
elif self.config.final_sigmas_type == "zero":
sigma_last = 0
else:
# 如果 final_sigmas_type 的值无效,则抛出异常
raise ValueError(
f"`final_sigmas_type` must be one of 'zero', or 'sigma_min', but got {self.config.final_sigmas_type}"
)
# 将最后的 sigma 值添加到 sigma 张量中
self.sigmas = torch.cat([sigmas, torch.tensor([sigma_last], dtype=torch.float32, device=device)])
# 初始化模型输出列表,长度为求解器的阶数
self.model_outputs = [
None,
] * self.config.solver_order
# 记录较低阶数的数量
self.lower_order_nums = 0
# 为允许重复时间步的调度器添加索引计数器
self._step_index = None
self._begin_index = None
# 将 sigma 移动到 CPU,避免过多的 CPU/GPU 通信
self.sigmas = self.sigmas.to("cpu")
# 从 diffusers.schedulers.scheduling_edm_euler.EDMEulerScheduler._compute_karras_sigmas 复制的方法
def _compute_karras_sigmas(self, ramp, sigma_min=None, sigma_max=None) -> torch.Tensor:
"""构建 Karras 等人 (2022) 的噪声调度。"""
# 如果没有提供 sigma_min,则使用配置中的值
sigma_min = sigma_min or self.config.sigma_min
# 如果没有提供 sigma_max,则使用配置中的值
sigma_max = sigma_max or self.config.sigma_max
# 获取配置中的 rho 值
rho = self.config.rho
# 计算 sigma_min 和 sigma_max 的倒数
min_inv_rho = sigma_min ** (1 / rho)
max_inv_rho = sigma_max ** (1 / rho)
# 计算 sigmas,根据 Karras 方法
sigmas = (max_inv_rho + ramp * (min_inv_rho - max_inv_rho)) ** rho
return sigmas
# 从 diffusers.schedulers.scheduling_edm_euler.EDMEulerScheduler._compute_exponential_sigmas 复制的方法
# 计算指数sigma值,基于给定的ramp和可选的sigma_min和sigma_max参数
def _compute_exponential_sigmas(self, ramp, sigma_min=None, sigma_max=None) -> torch.Tensor:
# 文档字符串,描述该函数的实现和相关链接
"""Implementation closely follows k-diffusion.
https://github.com/crowsonkb/k-diffusion/blob/6ab5146d4a5ef63901326489f31f1d8e7dd36b48/k_diffusion/sampling.py#L26
"""
# 如果sigma_min未提供,则使用配置中的默认值
sigma_min = sigma_min or self.config.sigma_min
# 如果sigma_max未提供,则使用配置中的默认值
sigma_max = sigma_max or self.config.sigma_max
# 生成从log(sigma_min)到log(sigma_max)的等间距值,取指数后翻转顺序
sigmas = torch.linspace(math.log(sigma_min), math.log(sigma_max), len(ramp)).exp().flip(0)
# 返回计算得到的sigma值
return sigmas
# 从diffusers.schedulers.scheduling_ddpm.DDPMScheduler._threshold_sample复制而来
def _threshold_sample(self, sample: torch.Tensor) -> torch.Tensor:
# 文档字符串,描述动态阈值处理的实现和效果
"""
"Dynamic thresholding: At each sampling step we set s to a certain percentile absolute pixel value in xt0 (the
prediction of x_0 at timestep t), and if s > 1, then we threshold xt0 to the range [-s, s] and then divide by
s. Dynamic thresholding pushes saturated pixels (those near -1 and 1) inwards, thereby actively preventing
pixels from saturation at each step. We find that dynamic thresholding results in significantly better
photorealism as well as better image-text alignment, especially when using very large guidance weights."
https://arxiv.org/abs/2205.11487
"""
# 获取输入样本的数据类型
dtype = sample.dtype
# 获取样本的批量大小、通道数和其他维度
batch_size, channels, *remaining_dims = sample.shape
# 如果数据类型不是float32或float64,则将样本转换为float类型
if dtype not in (torch.float32, torch.float64):
sample = sample.float() # upcast for quantile calculation, and clamp not implemented for cpu half
# 将样本展平,以便沿每个图像进行分位数计算
sample = sample.reshape(batch_size, channels * np.prod(remaining_dims))
# 计算样本的绝对值,用于后续的阈值计算
abs_sample = sample.abs() # "a certain percentile absolute pixel value"
# 计算样本的动态阈值s,基于配置的动态阈值比例
s = torch.quantile(abs_sample, self.config.dynamic_thresholding_ratio, dim=1)
# 将s限制在[1, sample_max_value]范围内
s = torch.clamp(
s, min=1, max=self.config.sample_max_value
) # When clamped to min=1, equivalent to standard clipping to [-1, 1]
# 在第一维增加一个维度,以便后续广播
s = s.unsqueeze(1) # (batch_size, 1) because clamp will broadcast along dim=0
# 将样本限制在[-s, s]范围内,并除以s进行归一化
sample = torch.clamp(sample, -s, s) / s # "we threshold xt0 to the range [-s, s] and then divide by s"
# 将样本恢复到原始形状
sample = sample.reshape(batch_size, channels, *remaining_dims)
# 将样本转换回原始数据类型
sample = sample.to(dtype)
# 返回处理后的样本
return sample
# 从diffusers.schedulers.scheduling_euler_discrete.EulerDiscreteScheduler._sigma_to_t复制而来
# 定义一个私有方法,计算 sigma 和 log_sigmas 之间的关系
def _sigma_to_t(self, sigma, log_sigmas):
# 计算 sigma 的对数值,避免过小的 sigma 值引发计算问题
log_sigma = np.log(np.maximum(sigma, 1e-10))
# 计算 log_sigma 与 log_sigmas 之间的距离
dists = log_sigma - log_sigmas[:, np.newaxis]
# 找到满足条件的最小索引,表示 sigma 在 log_sigmas 中的范围
low_idx = np.cumsum((dists >= 0), axis=0).argmax(axis=0).clip(max=log_sigmas.shape[0] - 2)
high_idx = low_idx + 1
# 根据索引获取对应的 log_sigmas 值
low = log_sigmas[low_idx]
high = log_sigmas[high_idx]
# 通过线性插值计算权重 w
w = (low - log_sigma) / (low - high)
w = np.clip(w, 0, 1) # 限制 w 的范围在 0 到 1 之间
# 将插值结果转换为时间范围 t
t = (1 - w) * low_idx + w * high_idx
t = t.reshape(sigma.shape) # 重新调整 t 的形状以匹配 sigma
return t # 返回计算出的时间范围
# 定义一个私有方法,将 sigma 转换为 alpha_t 和 sigma_t
def _sigma_to_alpha_sigma_t(self, sigma):
alpha_t = torch.tensor(1) # 输入在进入 unet 之前已预缩放,因此 alpha_t = 1
sigma_t = sigma # 将 sigma 直接赋值给 sigma_t
return alpha_t, sigma_t # 返回 alpha_t 和 sigma_t
# 定义一个方法,将模型输出转换为 DPM 算法需要的类型
def convert_model_output(
self,
model_output: torch.Tensor,
sample: torch.Tensor = None,
) -> torch.Tensor:
"""
将模型输出转换为 DPMSolver/DPMSolver++ 算法所需的相应类型。DPM-Solver
旨在离散化噪声预测模型的积分,DPM-Solver++ 旨在离散化数据预测模型的积分。
<Tip>
算法和模型类型是解耦的。您可以使用 DPMSolver 或 DPMSolver++ 处理噪声
预测和数据预测模型。
</Tip>
Args:
model_output (`torch.Tensor`):
从学习到的扩散模型直接输出的结果。
sample (`torch.Tensor`):
当前由扩散过程创建的样本实例。
Returns:
`torch.Tensor`:
转换后的模型输出。
"""
sigma = self.sigmas[self.step_index] # 获取当前步骤的 sigma 值
x0_pred = self.precondition_outputs(sample, model_output, sigma) # 预处理模型输出
if self.config.thresholding: # 检查是否启用阈值处理
x0_pred = self._threshold_sample(x0_pred) # 应用阈值处理
return x0_pred # 返回处理后的预测结果
# 定义一个方法,用于执行 DPM 求解器的第一阶更新
def dpm_solver_first_order_update(
self,
model_output: torch.Tensor,
sample: torch.Tensor = None,
noise: Optional[torch.Tensor] = None,
) -> torch.Tensor:
"""
一步操作用于第一阶 DPMSolver(等同于 DDIM)。
Args:
model_output (`torch.Tensor`):
从学习的扩散模型直接输出的张量。
sample (`torch.Tensor`):
通过扩散过程创建的当前样本实例。
Returns:
`torch.Tensor`:
上一个时间步的样本张量。
"""
# 获取当前和上一个时间步的 sigma 值
sigma_t, sigma_s = self.sigmas[self.step_index + 1], self.sigmas[self.step_index]
# 将 sigma 转换为 alpha 和 sigma_t
alpha_t, sigma_t = self._sigma_to_alpha_sigma_t(sigma_t)
# 将上一个 sigma 转换为 alpha 和 sigma_s
alpha_s, sigma_s = self._sigma_to_alpha_sigma_t(sigma_s)
# 计算 lambda_t 和 lambda_s 的差
lambda_t = torch.log(alpha_t) - torch.log(sigma_t)
lambda_s = torch.log(alpha_s) - torch.log(sigma_s)
# 计算 h 值
h = lambda_t - lambda_s
# 根据配置选择算法类型并进行相应计算
if self.config.algorithm_type == "dpmsolver++":
# 使用 dpmsolver++ 算法计算 x_t
x_t = (sigma_t / sigma_s) * sample - (alpha_t * (torch.exp(-h) - 1.0)) * model_output
elif self.config.algorithm_type == "sde-dpmsolver++":
# 确保噪声不为 None
assert noise is not None
# 使用 sde-dpmsolver++ 算法计算 x_t
x_t = (
(sigma_t / sigma_s * torch.exp(-h)) * sample
+ (alpha_t * (1 - torch.exp(-2.0 * h))) * model_output
+ sigma_t * torch.sqrt(1.0 - torch.exp(-2 * h)) * noise
)
# 返回计算得到的样本 x_t
return x_t
# 定义多步 DPM Solver 的二阶更新方法
def multistep_dpm_solver_second_order_update(
self,
# 模型输出的张量列表
model_output_list: List[torch.Tensor],
# 当前样本的张量,默认为 None
sample: torch.Tensor = None,
# 可选的噪声张量,默认为 None
noise: Optional[torch.Tensor] = None,
# 返回一个张量,表示第二阶多步 DPMSolver 的一步
) -> torch.Tensor:
"""
一步对于第二阶多步 DPMSolver。
参数:
model_output_list (`List[torch.Tensor]`):
当前及后续时间步学习到的扩散模型直接输出。
sample (`torch.Tensor`):
通过扩散过程生成的当前样本实例。
返回:
`torch.Tensor`:
上一个时间步的样本张量。
"""
# 获取当前时间步和相邻时间步的 sigma 值
sigma_t, sigma_s0, sigma_s1 = (
self.sigmas[self.step_index + 1], # 下一个时间步的 sigma
self.sigmas[self.step_index], # 当前时间步的 sigma
self.sigmas[self.step_index - 1], # 上一个时间步的 sigma
)
# 将 sigma 转换为 alpha 和 sigma_t
alpha_t, sigma_t = self._sigma_to_alpha_sigma_t(sigma_t)
alpha_s0, sigma_s0 = self._sigma_to_alpha_sigma_t(sigma_s0)
alpha_s1, sigma_s1 = self._sigma_to_alpha_sigma_t(sigma_s1)
# 计算 lambda 值
lambda_t = torch.log(alpha_t) - torch.log(sigma_t) # 当前时间步的 lambda
lambda_s0 = torch.log(alpha_s0) - torch.log(sigma_s0) # 上一个时间步的 lambda
lambda_s1 = torch.log(alpha_s1) - torch.log(sigma_s1) # 前两个时间步的 lambda
# 获取模型输出的最后两个值
m0, m1 = model_output_list[-1], model_output_list[-2]
# 计算 h 和 r0
h, h_0 = lambda_t - lambda_s0, lambda_s0 - lambda_s1
r0 = h_0 / h # 计算 r0
D0, D1 = m0, (1.0 / r0) * (m0 - m1) # 计算 D0 和 D1
if self.config.algorithm_type == "dpmsolver++": # 检查算法类型
# 详细推导请参见 https://arxiv.org/abs/2211.01095
if self.config.solver_type == "midpoint": # 检查求解器类型
# 根据中点法计算 x_t
x_t = (
(sigma_t / sigma_s0) * sample # 根据 sigma 调整样本
- (alpha_t * (torch.exp(-h) - 1.0)) * D0 # 减去第一个项
- 0.5 * (alpha_t * (torch.exp(-h) - 1.0)) * D1 # 减去第二个项
)
elif self.config.solver_type == "heun": # 如果使用 Heun 方法
# 根据 Heun 方法计算 x_t
x_t = (
(sigma_t / sigma_s0) * sample # 根据 sigma 调整样本
- (alpha_t * (torch.exp(-h) - 1.0)) * D0 # 减去第一个项
+ (alpha_t * ((torch.exp(-h) - 1.0) / h + 1.0)) * D1 # 加上第二个项
)
elif self.config.algorithm_type == "sde-dpmsolver++": # 检查另一种算法类型
assert noise is not None # 确保噪声不为空
if self.config.solver_type == "midpoint": # 使用中点法
# 根据中点法计算 x_t,考虑噪声
x_t = (
(sigma_t / sigma_s0 * torch.exp(-h)) * sample # 根据 sigma 和样本
+ (alpha_t * (1 - torch.exp(-2.0 * h))) * D0 # 加上第一个项
+ 0.5 * (alpha_t * (1 - torch.exp(-2.0 * h))) * D1 # 加上第二个项
+ sigma_t * torch.sqrt(1.0 - torch.exp(-2 * h)) * noise # 加上噪声项
)
elif self.config.solver_type == "heun": # 如果使用 Heun 方法
# 根据 Heun 方法计算 x_t,考虑噪声
x_t = (
(sigma_t / sigma_s0 * torch.exp(-h)) * sample # 根据 sigma 和样本
+ (alpha_t * (1 - torch.exp(-2.0 * h))) * D0 # 加上第一个项
+ (alpha_t * ((1.0 - torch.exp(-2.0 * h)) / (-2.0 * h) + 1.0)) * D1 # 加上第二个项
+ sigma_t * torch.sqrt(1.0 - torch.exp(-2 * h)) * noise # 加上噪声项
)
# 返回计算得到的 x_t
return x_t
# 多步 DPM 求解器的三阶更新
def multistep_dpm_solver_third_order_update(
self,
model_output_list: List[torch.Tensor], # 模型输出列表
sample: torch.Tensor = None, # 当前样本
) -> torch.Tensor: # 指定函数返回值类型为 torch.Tensor
""" # 文档字符串开始
One step for the third-order multistep DPMSolver. # 说明这是第三阶多步 DPMSolver 的一步
Args: # 参数说明部分开始
model_output_list (`List[torch.Tensor]`): # 输入参数 model_output_list 的类型为列表,元素为 torch.Tensor
The direct outputs from learned diffusion model at current and latter timesteps. # 描述该参数为当前及后续时间步的扩散模型输出
sample (`torch.Tensor`): # 输入参数 sample 的类型为 torch.Tensor
A current instance of a sample created by diffusion process. # 描述该参数为扩散过程中创建的当前样本实例
Returns: # 返回值说明部分开始
`torch.Tensor`: # 返回值类型为 torch.Tensor
The sample tensor at the previous timestep. # 描述返回的是上一个时间步的样本张量
""" # 文档字符串结束
sigma_t, sigma_s0, sigma_s1, sigma_s2 = ( # 从 sigmas 属性中提取当前及前两步的 sigma 值
self.sigmas[self.step_index + 1], # 获取下一个时间步的 sigma 值
self.sigmas[self.step_index], # 获取当前时间步的 sigma 值
self.sigmas[self.step_index - 1], # 获取前一个时间步的 sigma 值
self.sigmas[self.step_index - 2], # 获取前两个时间步的 sigma 值
)
alpha_t, sigma_t = self._sigma_to_alpha_sigma_t(sigma_t) # 将 sigma_t 转换为对应的 alpha_t 和 sigma_t
alpha_s0, sigma_s0 = self._sigma_to_alpha_sigma_t(sigma_s0) # 将 sigma_s0 转换为对应的 alpha_s0 和 sigma_s0
alpha_s1, sigma_s1 = self._sigma_to_alpha_sigma_t(sigma_s1) # 将 sigma_s1 转换为对应的 alpha_s1 和 sigma_s1
alpha_s2, sigma_s2 = self._sigma_to_alpha_sigma_t(sigma_s2) # 将 sigma_s2 转换为对应的 alpha_s2 和 sigma_s2
lambda_t = torch.log(alpha_t) - torch.log(sigma_t) # 计算 lambda_t,表示 alpha_t 和 sigma_t 的对数差
lambda_s0 = torch.log(alpha_s0) - torch.log(sigma_s0) # 计算 lambda_s0,表示 alpha_s0 和 sigma_s0 的对数差
lambda_s1 = torch.log(alpha_s1) - torch.log(sigma_s1) # 计算 lambda_s1,表示 alpha_s1 和 sigma_s1 的对数差
lambda_s2 = torch.log(alpha_s2) - torch.log(sigma_s2) # 计算 lambda_s2,表示 alpha_s2 和 sigma_s2 的对数差
m0, m1, m2 = model_output_list[-1], model_output_list[-2], model_output_list[-3] # 从输出列表中提取最近三次模型输出
h, h_0, h_1 = lambda_t - lambda_s0, lambda_s0 - lambda_s1, lambda_s1 - lambda_s2 # 计算 h 值,表示各时间步的差异
r0, r1 = h_0 / h, h_1 / h # 计算 r0 和 r1,表示比例关系
D0 = m0 # 赋值 D0 为最近的模型输出 m0
D1_0, D1_1 = (1.0 / r0) * (m0 - m1), (1.0 / r1) * (m1 - m2) # 计算 D1_0 和 D1_1,表示调整后的输出差异
D1 = D1_0 + (r0 / (r0 + r1)) * (D1_0 - D1_1) # 计算 D1,结合 D1_0 和 D1_1 的信息
D2 = (1.0 / (r0 + r1)) * (D1_0 - D1_1) # 计算 D2,表示更高阶的差异
if self.config.algorithm_type == "dpmsolver++": # 检查算法类型是否为 "dpmsolver++"
# See https://arxiv.org/abs/2206.00927 for detailed derivations # 参考文献,提供详细推导信息
x_t = ( # 计算当前时间步的样本张量 x_t
(sigma_t / sigma_s0) * sample # 计算样本的加权项
- (alpha_t * (torch.exp(-h) - 1.0)) * D0 # 减去 D0 的调整项
+ (alpha_t * ((torch.exp(-h) - 1.0) / h + 1.0)) * D1 # 添加 D1 的调整项
- (alpha_t * ((torch.exp(-h) - 1.0 + h) / h**2 - 0.5)) * D2 # 减去 D2 的调整项
)
return x_t # 返回计算得到的样本张量 x_t
# Copied from diffusers.schedulers.scheduling_dpmsolver_multistep.DPMSolverMultistepScheduler.index_for_timestep # 注明此行代码来源于特定的调度器
# 根据时间步(timestep)获取对应的索引
def index_for_timestep(self, timestep, schedule_timesteps=None):
# 如果没有指定调度时间步,则使用实例的时间步
if schedule_timesteps is None:
schedule_timesteps = self.timesteps
# 查找与给定时间步匹配的所有候选索引
index_candidates = (schedule_timesteps == timestep).nonzero()
# 如果没有找到候选索引,则设置为最后一个时间步的索引
if len(index_candidates) == 0:
step_index = len(self.timesteps) - 1
# 如果找到多个候选索引,则取第二个索引(确保不跳过)
# 这样可以确保在去噪计划中间开始时不会跳过任何 sigma
elif len(index_candidates) > 1:
step_index = index_candidates[1].item()
# 否则,取第一个候选索引
else:
step_index = index_candidates[0].item()
# 返回找到的时间步索引
return step_index
# 从调度器的 DPMSolverMultistepScheduler 初始化步骤索引的函数
def _init_step_index(self, timestep):
"""
初始化调度器的步骤索引计数器。
"""
# 如果开始索引为空
if self.begin_index is None:
# 如果时间步是一个张量,则将其转换到与时间步相同的设备上
if isinstance(timestep, torch.Tensor):
timestep = timestep.to(self.timesteps.device)
# 通过调用 index_for_timestep 函数初始化步骤索引
self._step_index = self.index_for_timestep(timestep)
else:
# 否则,将步骤索引设置为开始索引
self._step_index = self._begin_index
# 执行单步操作的函数
def step(
self,
model_output: torch.Tensor,
timestep: Union[int, torch.Tensor],
sample: torch.Tensor,
generator=None,
return_dict: bool = True,
# 从 EulerDiscreteScheduler 的 add_noise 函数复制的代码
def add_noise(
self,
original_samples: torch.Tensor,
noise: torch.Tensor,
timesteps: torch.Tensor,
# 返回类型为 torch.Tensor
) -> torch.Tensor:
# 确保 sigmas 和 timesteps 的设备和数据类型与 original_samples 相同
sigmas = self.sigmas.to(device=original_samples.device, dtype=original_samples.dtype)
# 如果设备为 mps 且 timesteps 是浮点数
if original_samples.device.type == "mps" and torch.is_floating_point(timesteps):
# mps 不支持 float64 类型
schedule_timesteps = self.timesteps.to(original_samples.device, dtype=torch.float32)
# 将 timesteps 转换为相同设备和 float32 类型
timesteps = timesteps.to(original_samples.device, dtype=torch.float32)
else:
# 在其他情况下,将 schedule_timesteps 转换为 original_samples 的设备
schedule_timesteps = self.timesteps.to(original_samples.device)
# 将 timesteps 转换为 original_samples 的设备
timesteps = timesteps.to(original_samples.device)
# 当 scheduler 用于训练或 pipeline 未实现 set_begin_index 时,self.begin_index 为 None
if self.begin_index is None:
# 获取每个 timesteps 对应的步骤索引
step_indices = [self.index_for_timestep(t, schedule_timesteps) for t in timesteps]
elif self.step_index is not None:
# add_noise 在第一次去噪步骤之后被调用(用于修复)
step_indices = [self.step_index] * timesteps.shape[0]
else:
# 在第一次去噪步骤之前调用 add noise,以创建初始潜在图像(img2img)
step_indices = [self.begin_index] * timesteps.shape[0]
# 根据步骤索引获取 sigma,并将其展平
sigma = sigmas[step_indices].flatten()
# 扩展 sigma 的形状,以匹配 original_samples 的维度
while len(sigma.shape) < len(original_samples.shape):
sigma = sigma.unsqueeze(-1)
# 将噪声添加到原始样本中,生成噪声样本
noisy_samples = original_samples + noise * sigma
# 返回噪声样本
return noisy_samples
# 返回训练时间步的数量
def __len__(self):
return self.config.num_train_timesteps
# 版权信息,声明文件所有权和使用许可
# Copyright 2024 Katherine Crowson and The HuggingFace Team. All rights reserved.
#
# 根据 Apache License, Version 2.0 (“许可证”) 授权使用
# 你只能在遵守许可证的情况下使用此文件
# 你可以在以下网址获取许可证副本
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律或书面协议另有约定,软件按“现状”分发
# 不提供任何形式的担保或条件
# 有关许可的特定条款和限制,请参阅许可证
import math # 导入数学库以进行数学运算
from dataclasses import dataclass # 从dataclasses导入dataclass以简化类定义
from typing import Optional, Tuple, Union # 从typing导入类型注解支持
import torch # 导入PyTorch库以进行张量计算
from ..configuration_utils import ConfigMixin, register_to_config # 从配置工具导入配置混合类和注册功能
from ..utils import BaseOutput, logging # 从工具库导入基础输出类和日志功能
from ..utils.torch_utils import randn_tensor # 从torch工具导入随机张量生成函数
from .scheduling_utils import SchedulerMixin # 从调度工具导入调度混合类
logger = logging.get_logger(__name__) # 创建当前模块的日志记录器,方便调试和记录信息
@dataclass
# 定义调度器的输出类,继承自BaseOutput,提供先前样本和预测样本
class EDMEulerSchedulerOutput(BaseOutput):
"""
调度器 `step` 函数输出的输出类。
参数:
prev_sample (`torch.Tensor` 的形状为 `(batch_size, num_channels, height, width)`,用于图像):
计算的先前时间步样本 `(x_{t-1})`。`prev_sample` 应作为下一个模型输入使用
在去噪循环中。
pred_original_sample (`torch.Tensor` 的形状为 `(batch_size, num_channels, height, width)`,用于图像):
基于当前时间步模型输出的预测去噪样本 `(x_{0})`。
`pred_original_sample` 可用于预览进度或指导。
"""
prev_sample: torch.Tensor # 定义先前样本的属性,类型为张量
pred_original_sample: Optional[torch.Tensor] = None # 定义可选的预测原始样本属性,类型为张量
class EDMEulerScheduler(SchedulerMixin, ConfigMixin):
"""
实现Karras等人2022年提出的EDM公式中的Euler调度器 [1]。
[1] Karras, Tero, 等。“阐明基于扩散的生成模型的设计空间。”
https://arxiv.org/abs/2206.00364
此模型继承自 [`SchedulerMixin`] 和 [`ConfigMixin`]。有关所有调度程序实现的通用
方法的文档,请查看父类文档,例如加载和保存。
# 参数说明文档
Args:
sigma_min (`float`, *optional*, defaults to 0.002):
# sigma 调度中的最小噪声幅度,EDM 论文中设置为 0.002;合理范围是 [0, 10]。
Minimum noise magnitude in the sigma schedule. This was set to 0.002 in the EDM paper [1]; a reasonable
range is [0, 10].
sigma_max (`float`, *optional*, defaults to 80.0):
# sigma 调度中的最大噪声幅度,EDM 论文中设置为 80.0;合理范围是 [0.2, 80.0]。
Maximum noise magnitude in the sigma schedule. This was set to 80.0 in the EDM paper [1]; a reasonable
range is [0.2, 80.0].
sigma_data (`float`, *optional*, defaults to 0.5):
# 数据分布的标准差,EDM 论文中设置为 0.5。
The standard deviation of the data distribution. This is set to 0.5 in the EDM paper [1].
sigma_schedule (`str`, *optional*, defaults to `karras`):
# 用于计算 `sigmas` 的 sigma 调度,默认使用 EDM 论文中介绍的调度。
Sigma schedule to compute the `sigmas`. By default, we the schedule introduced in the EDM paper
(https://arxiv.org/abs/2206.00364). Other acceptable value is "exponential". The exponential schedule was
incorporated in this model: https://huggingface.co/stabilityai/cosxl.
num_train_timesteps (`int`, defaults to 1000):
# 训练模型的扩散步骤数量。
The number of diffusion steps to train the model.
prediction_type (`str`, defaults to `epsilon`, *optional*):
# 调度函数的预测类型,可以是 `epsilon`、`sample` 或 `v_prediction`。
Prediction type of the scheduler function; can be `epsilon` (predicts the noise of the diffusion process),
`sample` (directly predicts the noisy sample`) or `v_prediction` (see section 2.4 of [Imagen
Video](https://imagen.research.google/video/paper.pdf) paper).
rho (`float`, *optional*, defaults to 7.0):
# 用于计算 Karras sigma 调度的 rho 参数,EDM 论文中设置为 7.0。
The rho parameter used for calculating the Karras sigma schedule, which is set to 7.0 in the EDM paper [1].
"""
# 初始化一个空列表,用于存储兼容的设置
_compatibles = []
# 设置某个顺序标识,默认为 1
order = 1
@register_to_config
# 初始化函数,用于设置对象的基本属性
def __init__(
self,
sigma_min: float = 0.002, # 设置最小噪声幅度,默认值为 0.002
sigma_max: float = 80.0, # 设置最大噪声幅度,默认值为 80.0
sigma_data: float = 0.5, # 设置数据分布的标准差,默认值为 0.5
sigma_schedule: str = "karras", # 设置 sigma 调度,默认值为 "karras"
num_train_timesteps: int = 1000, # 设置训练时的扩散步骤数,默认值为 1000
prediction_type: str = "epsilon", # 设置预测类型,默认值为 "epsilon"
rho: float = 7.0, # 设置 rho 参数,默认值为 7.0
):
# 验证 sigma 调度类型是否有效,若无效则抛出异常
if sigma_schedule not in ["karras", "exponential"]:
raise ValueError(f"Wrong value for provided for `{sigma_schedule=}`.`")
# 可设置的值,初始化为空
self.num_inference_steps = None
# 创建一个线性 ramp 从 0 到 1,包含 num_train_timesteps 个点
ramp = torch.linspace(0, 1, num_train_timesteps)
# 如果选择的调度是 "karras",计算对应的 sigmas
if sigma_schedule == "karras":
sigmas = self._compute_karras_sigmas(ramp)
# 如果选择的调度是 "exponential",计算对应的 sigmas
elif sigma_schedule == "exponential":
sigmas = self._compute_exponential_sigmas(ramp)
# 预处理噪声,获得时间步信息
self.timesteps = self.precondition_noise(sigmas)
# 将 sigmas 和一个零数组合并,后者位于计算设备上
self.sigmas = torch.cat([sigmas, torch.zeros(1, device=sigmas.device)])
# 标记是否已调用输入缩放
self.is_scale_input_called = False
# 初始化步骤索引和开始索引为空
self._step_index = None
self._begin_index = None
# 将 sigmas 移动到 CPU,避免过多的 CPU/GPU 通信
self.sigmas = self.sigmas.to("cpu") # to avoid too much CPU/GPU communication
@property
# 获取初始噪声分布的标准差
def init_noise_sigma(self):
# 计算初始噪声分布的标准差
return (self.config.sigma_max**2 + 1) ** 0.5
@property
# 定义当前时间步的索引计数器,每次调度器步骤后增加1
def step_index(self):
"""
当前时间步的索引计数器。每次调度器步骤后增加1。
"""
# 返回当前步骤索引
return self._step_index
# 定义一个属性,表示开始索引
@property
def begin_index(self):
"""
第一个时间步的索引。应该通过 `set_begin_index` 方法从管道设置。
"""
# 返回开始索引
return self._begin_index
# 从 diffusers.schedulers.scheduling_dpmsolver_multistep.DPMSolverMultistepScheduler.set_begin_index 复制而来
# 设置开始索引的方法
def set_begin_index(self, begin_index: int = 0):
"""
设置调度器的开始索引。此函数应在推理之前从管道运行。
Args:
begin_index (`int`):
调度器的开始索引。
"""
# 将开始索引设置为给定值
self._begin_index = begin_index
# 预处理输入样本的方法
def precondition_inputs(self, sample, sigma):
# 计算输入样本的缩放因子
c_in = 1 / ((sigma**2 + self.config.sigma_data**2) ** 0.5)
# 根据缩放因子调整样本
scaled_sample = sample * c_in
# 返回缩放后的样本
return scaled_sample
# 预处理噪声的方法
def precondition_noise(self, sigma):
# 检查 sigma 是否为张量,如果不是则转换为张量
if not isinstance(sigma, torch.Tensor):
sigma = torch.tensor([sigma])
# 计算噪声的缩放因子
c_noise = 0.25 * torch.log(sigma)
# 返回噪声的缩放因子
return c_noise
# 预处理输出的方法
def precondition_outputs(self, sample, model_output, sigma):
# 获取配置中的 sigma_data
sigma_data = self.config.sigma_data
# 计算跳过的系数
c_skip = sigma_data**2 / (sigma**2 + sigma_data**2)
# 根据预测类型计算输出的系数
if self.config.prediction_type == "epsilon":
c_out = sigma * sigma_data / (sigma**2 + sigma_data**2) ** 0.5
elif self.config.prediction_type == "v_prediction":
c_out = -sigma * sigma_data / (sigma**2 + sigma_data**2) ** 0.5
else:
# 如果预测类型不支持,则抛出异常
raise ValueError(f"Prediction type {self.config.prediction_type} is not supported.")
# 计算去噪后的结果
denoised = c_skip * sample + c_out * model_output
# 返回去噪后的结果
return denoised
# 缩放模型输入的方法
def scale_model_input(self, sample: torch.Tensor, timestep: Union[float, torch.Tensor]) -> torch.Tensor:
"""
确保与需要根据当前时间步缩放去噪模型输入的调度器互换性。通过 `(sigma**2 + 1) ** 0.5` 缩放去噪模型输入,以匹配欧拉算法。
Args:
sample (`torch.Tensor`):
输入样本。
timestep (`int`, *optional*):
扩散链中的当前时间步。
Returns:
`torch.Tensor`:
缩放后的输入样本。
"""
# 检查当前步骤索引是否为 None,如果是则初始化步骤索引
if self.step_index is None:
self._init_step_index(timestep)
# 获取当前步骤对应的 sigma 值
sigma = self.sigmas[self.step_index]
# 预处理输入样本
sample = self.precondition_inputs(sample, sigma)
# 标记输入缩放方法已被调用
self.is_scale_input_called = True
# 返回预处理后的样本
return sample
# 设置离散时间步,用于扩散链(推理前运行)
def set_timesteps(self, num_inference_steps: int, device: Union[str, torch.device] = None):
"""
设置用于扩散链的离散时间步(在推理前运行)。
参数:
num_inference_steps (`int`):
生成样本时使用的扩散步骤数。
device (`str` 或 `torch.device`, *可选*):
时间步应该移动到的设备。如果为 `None`,则不移动时间步。
"""
# 将输入的扩散步骤数存储在实例变量中
self.num_inference_steps = num_inference_steps
# 创建一个从0到1的线性空间,包含num_inference_steps个点
ramp = torch.linspace(0, 1, self.num_inference_steps)
# 根据配置选择Karras噪声调度
if self.config.sigma_schedule == "karras":
# 计算Karras噪声调度的sigma值
sigmas = self._compute_karras_sigmas(ramp)
# 根据配置选择指数噪声调度
elif self.config.sigma_schedule == "exponential":
# 计算指数噪声调度的sigma值
sigmas = self._compute_exponential_sigmas(ramp)
# 将sigmas转换为float32类型,并移动到指定设备
sigmas = sigmas.to(dtype=torch.float32, device=device)
# 预处理噪声并存储在timesteps中
self.timesteps = self.precondition_noise(sigmas)
# 将sigma与零向量连接,以便扩散过程中使用
self.sigmas = torch.cat([sigmas, torch.zeros(1, device=sigmas.device)])
# 初始化步索引和开始索引为None
self._step_index = None
self._begin_index = None
# 将sigmas移动到CPU以避免过多的CPU/GPU通信
self.sigmas = self.sigmas.to("cpu") # to avoid too much CPU/GPU communication
# 从Karras等人(2022)构建噪声调度
def _compute_karras_sigmas(self, ramp, sigma_min=None, sigma_max=None) -> torch.Tensor:
"""构建Karras等人(2022)的噪声调度。"""
# 如果未提供sigma_min,则使用配置中的值
sigma_min = sigma_min or self.config.sigma_min
# 如果未提供sigma_max,则使用配置中的值
sigma_max = sigma_max or self.config.sigma_max
# 从配置中获取rho值
rho = self.config.rho
# 计算sigma_min和sigma_max的倒数
min_inv_rho = sigma_min ** (1 / rho)
max_inv_rho = sigma_max ** (1 / rho)
# 计算根据ramp的sigma值
sigmas = (max_inv_rho + ramp * (min_inv_rho - max_inv_rho)) ** rho
return sigmas
# 计算指数噪声调度的sigma值
def _compute_exponential_sigmas(self, ramp, sigma_min=None, sigma_max=None) -> torch.Tensor:
"""实现紧随k-diffusion。"""
# 如果未提供sigma_min,则使用配置中的值
sigma_min = sigma_min or self.config.sigma_min
# 如果未提供sigma_max,则使用配置中的值
sigma_max = sigma_max or self.config.sigma_max
# 计算从sigma_min到sigma_max的对数线性空间,并取指数后翻转
sigmas = torch.linspace(math.log(sigma_min), math.log(sigma_max), len(ramp)).exp().flip(0)
return sigmas
# 从diffusers库复制的调度器的时间步索引计算方法
# 根据时间步计算索引
def index_for_timestep(self, timestep, schedule_timesteps=None):
# 如果没有提供调度时间步,使用实例的时间步
if schedule_timesteps is None:
schedule_timesteps = self.timesteps
# 找出与给定时间步相等的时间步的索引
indices = (schedule_timesteps == timestep).nonzero()
# 确定要使用的 sigma 索引,默认为第二个索引
# 如果只有一个索引,则使用第一个索引
pos = 1 if len(indices) > 1 else 0
# 返回选定索引的标量值
return indices[pos].item()
# 从 diffusers.schedulers.scheduling_euler_discrete.EulerDiscreteScheduler._init_step_index 复制
def _init_step_index(self, timestep):
# 如果开始索引为 None,初始化步索引
if self.begin_index is None:
# 如果时间步是张量,则将其移动到相应设备
if isinstance(timestep, torch.Tensor):
timestep = timestep.to(self.timesteps.device)
# 计算时间步对应的步索引
self._step_index = self.index_for_timestep(timestep)
else:
# 否则直接使用开始索引
self._step_index = self._begin_index
# 定义步进函数,包含多个参数
def step(
self,
model_output: torch.Tensor,
timestep: Union[float, torch.Tensor],
sample: torch.Tensor,
s_churn: float = 0.0,
s_tmin: float = 0.0,
s_tmax: float = float("inf"),
s_noise: float = 1.0,
generator: Optional[torch.Generator] = None,
return_dict: bool = True,
# 从 diffusers.schedulers.scheduling_euler_discrete.EulerDiscreteScheduler.add_noise 复制
def add_noise(
self,
original_samples: torch.Tensor,
noise: torch.Tensor,
timesteps: torch.Tensor,
# 返回一个张量,表示噪声样本
) -> torch.Tensor:
# 确保 sigmas 和 timesteps 与 original_samples 拥有相同的设备和数据类型
sigmas = self.sigmas.to(device=original_samples.device, dtype=original_samples.dtype)
# 检查设备类型为 mps 且 timesteps 是浮点类型
if original_samples.device.type == "mps" and torch.is_floating_point(timesteps):
# mps 不支持 float64 类型
schedule_timesteps = self.timesteps.to(original_samples.device, dtype=torch.float32)
timesteps = timesteps.to(original_samples.device, dtype=torch.float32)
else:
# 将 timesteps 转换为 original_samples 设备的数据类型
schedule_timesteps = self.timesteps.to(original_samples.device)
timesteps = timesteps.to(original_samples.device)
# self.begin_index 为 None 表示调度器用于训练,或管道未实现 set_begin_index
if self.begin_index is None:
# 通过调度时间步计算步索引
step_indices = [self.index_for_timestep(t, schedule_timesteps) for t in timesteps]
elif self.step_index is not None:
# 在第一次去噪步骤后调用 add_noise(用于修复)
step_indices = [self.step_index] * timesteps.shape[0]
else:
# 在第一次去噪步骤前调用 add_noise 创建初始潜在图像(img2img)
step_indices = [self.begin_index] * timesteps.shape[0]
# 根据步索引获取 sigmas,并扁平化
sigma = sigmas[step_indices].flatten()
# 扩展 sigma 的维度以匹配 original_samples 的维度
while len(sigma.shape) < len(original_samples.shape):
sigma = sigma.unsqueeze(-1)
# 生成噪声样本,原始样本加上噪声乘以 sigma
noisy_samples = original_samples + noise * sigma
# 返回噪声样本
return noisy_samples
# 返回训练时间步的数量
def __len__(self):
return self.config.num_train_timesteps
# 版权所有 2024 Katherine Crowson 和 HuggingFace 团队。保留所有权利。
#
# 根据 Apache 许可证第 2.0 版("许可证")授权;
# 除非遵守许可证,否则您不得使用此文件。
# 您可以在以下网址获取许可证副本:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律要求或书面同意,否则根据许可证分发的软件是按 "原样" 提供的,
# 不附带任何形式的担保或条件,无论是明示或暗示的。
# 有关许可证所规定的权限和限制的具体信息,请参阅许可证。
import math # 导入数学库以进行数学运算
from dataclasses import dataclass # 从数据类模块导入 dataclass 装饰器
from typing import List, Optional, Tuple, Union # 导入类型提示相关的类
import numpy as np # 导入 NumPy 库以进行数组和矩阵运算
import torch # 导入 PyTorch 库以进行张量运算
from ..configuration_utils import ConfigMixin, register_to_config # 从配置工具中导入混合类和注册函数
from ..utils import BaseOutput, logging # 从工具中导入基本输出类和日志记录功能
from ..utils.torch_utils import randn_tensor # 从工具中导入生成随机张量的函数
from .scheduling_utils import KarrasDiffusionSchedulers, SchedulerMixin # 从调度工具中导入调度器类
logger = logging.get_logger(__name__) # 初始化日志记录器,使用当前模块的名称
@dataclass
# 从 diffusers.schedulers.scheduling_ddpm.DDPMSchedulerOutput 复制而来,DDPM->EulerAncestralDiscrete
class EulerAncestralDiscreteSchedulerOutput(BaseOutput):
"""
调度器的 `step` 函数输出的输出类。
参数:
prev_sample (`torch.Tensor`,形状为 `(batch_size, num_channels, height, width)`,用于图像):
计算出的前一时间步的样本 `(x_{t-1})`。`prev_sample` 应作为下一模型输入使用
在去噪循环中。
pred_original_sample (`torch.Tensor`,形状为 `(batch_size, num_channels, height, width)`,用于图像):
基于当前时间步的模型输出预测的去噪样本 `(x_{0})`。
`pred_original_sample` 可用于预览进度或指导。
"""
prev_sample: torch.Tensor # 前一时间步的样本
pred_original_sample: Optional[torch.Tensor] = None # 可选的预测去噪样本
# 从 diffusers.schedulers.scheduling_ddpm.betas_for_alpha_bar 复制而来
def betas_for_alpha_bar(
num_diffusion_timesteps, # 扩散时间步的数量
max_beta=0.999, # 使用的最大 beta 值
alpha_transform_type="cosine", # alpha_bar 的噪声调度类型
):
"""
创建一个 beta 调度,离散化给定的 alpha_t_bar 函数,该函数定义了
在 t = [0,1] 之间随时间变化的 (1-beta) 的累积乘积。
包含一个 alpha_bar 函数,该函数接受参数 t,并将其转换为到达
扩散过程的该部分的 (1-beta) 的累积乘积。
参数:
num_diffusion_timesteps (`int`): 生成的 beta 数量。
max_beta (`float`): 使用的最大 beta 值;使用小于 1 的值以
防止奇异性。
alpha_transform_type (`str`, *可选*, 默认为 `cosine`): alpha_bar 的噪声调度类型。
从 `cosine` 或 `exp` 中选择。
返回:
betas (`np.ndarray`): 调度器用于推进模型输出的 beta 值
"""
# 检查指定的 alpha 变换类型是否为 "cosine"
if alpha_transform_type == "cosine":
# 定义一个函数 alpha_bar_fn,用于计算 cos 变换
def alpha_bar_fn(t):
# 根据 t 计算 cos 变换值的平方
return math.cos((t + 0.008) / 1.008 * math.pi / 2) ** 2
# 检查指定的 alpha 变换类型是否为 "exp"
elif alpha_transform_type == "exp":
# 定义一个函数 alpha_bar_fn,用于计算指数变换
def alpha_bar_fn(t):
# 根据 t 计算 e 的负 12 倍乘以 t 的指数值
return math.exp(t * -12.0)
# 如果 alpha 变换类型不受支持,抛出异常
else:
raise ValueError(f"Unsupported alpha_transform_type: {alpha_transform_type}")
# 初始化一个空列表,用于存储 beta 值
betas = []
# 遍历每个扩散时间步
for i in range(num_diffusion_timesteps):
# 计算当前时间步的归一化值
t1 = i / num_diffusion_timesteps
# 计算下一个时间步的归一化值
t2 = (i + 1) / num_diffusion_timesteps
# 计算 beta 值并添加到列表中,确保不超过 max_beta
betas.append(min(1 - alpha_bar_fn(t2) / alpha_bar_fn(t1), max_beta))
# 返回 beta 值的张量,数据类型为 float32
return torch.tensor(betas, dtype=torch.float32)
# 从 diffusers.schedulers.scheduling_ddim.rescale_zero_terminal_snr 复制而来
def rescale_zero_terminal_snr(betas):
"""
根据 https://arxiv.org/pdf/2305.08891.pdf (算法 1) 重新缩放 betas,使其终端信噪比为零
参数:
betas (`torch.Tensor`):
初始化调度器时使用的 betas。
返回:
`torch.Tensor`: 重新缩放后的 betas,终端信噪比为零
"""
# 将 betas 转换为 alphas_bar_sqrt
alphas = 1.0 - betas # 计算 alphas
alphas_cumprod = torch.cumprod(alphas, dim=0) # 计算 alphas 的累积乘积
alphas_bar_sqrt = alphas_cumprod.sqrt() # 计算累积乘积的平方根
# 存储旧值。
alphas_bar_sqrt_0 = alphas_bar_sqrt[0].clone() # 复制第一个值
alphas_bar_sqrt_T = alphas_bar_sqrt[-1].clone() # 复制最后一个值
# 将最后一个时间步的值移位为零。
alphas_bar_sqrt -= alphas_bar_sqrt_T # 减去最后一个值
# 缩放,使第一个时间步返回到旧值。
alphas_bar_sqrt *= alphas_bar_sqrt_0 / (alphas_bar_sqrt_0 - alphas_bar_sqrt_T) # 按比例调整
# 将 alphas_bar_sqrt 转换为 betas
alphas_bar = alphas_bar_sqrt**2 # 恢复平方
alphas = alphas_bar[1:] / alphas_bar[:-1] # 恢复累积乘积
alphas = torch.cat([alphas_bar[0:1], alphas]) # 将第一个值添加到结果
betas = 1 - alphas # 计算 betas
return betas # 返回重新缩放后的 betas
class EulerAncestralDiscreteScheduler(SchedulerMixin, ConfigMixin):
"""
使用欧拉方法步骤进行祖先采样。
此模型继承自 [`SchedulerMixin`] 和 [`ConfigMixin`]。有关所有调度器通用方法的文档,请检查超类文档,例如加载和保存。
# 参数说明部分,用于描述类初始化时的参数及其默认值
Args:
# 训练模型的扩散步骤数量,默认值为1000
num_train_timesteps (`int`, defaults to 1000):
The number of diffusion steps to train the model.
# 推理的起始beta值,默认值为0.0001
beta_start (`float`, defaults to 0.0001):
The starting `beta` value of inference.
# 推理的最终beta值,默认值为0.02
beta_end (`float`, defaults to 0.02):
The final `beta` value.
# beta的调度方式,指定beta范围到一系列beta的映射,默认值为"linear"
beta_schedule (`str`, defaults to `"linear"`):
The beta schedule, a mapping from a beta range to a sequence of betas for stepping the model. Choose from
`linear` or `scaled_linear`.
# 可选参数,直接传入betas数组以绕过beta_start和beta_end
trained_betas (`np.ndarray`, *optional*):
Pass an array of betas directly to the constructor to bypass `beta_start` and `beta_end`.
# 预测类型,默认值为`epsilon`,可选参数
prediction_type (`str`, defaults to `epsilon`, *optional*):
Prediction type of the scheduler function; can be `epsilon` (predicts the noise of the diffusion process),
`sample` (directly predicts the noisy sample`) or `v_prediction` (see section 2.4 of [Imagen
Video](https://imagen.research.google/video/paper.pdf) paper).
# 时间步的缩放方式,默认值为"linspace"
timestep_spacing (`str`, defaults to `"linspace"`):
The way the timesteps should be scaled. Refer to Table 2 of the [Common Diffusion Noise Schedules and
Sample Steps are Flawed](https://huggingface.co/papers/2305.08891) for more information.
# 在推理步骤中添加的偏移量,默认值为0
steps_offset (`int`, defaults to 0):
An offset added to the inference steps, as required by some model families.
# 是否将betas重新缩放以达到零终端信噪比,默认值为False
rescale_betas_zero_snr (`bool`, defaults to `False`):
Whether to rescale the betas to have zero terminal SNR. This enables the model to generate very bright and
dark samples instead of limiting it to samples with medium brightness. Loosely related to
[`--offset_noise`](https://github.com/huggingface/diffusers/blob/74fd735eb073eb1d774b1ab4154a0876eb82f055/examples/dreambooth/train_dreambooth.py#L506).
# 兼容的调度器列表,从KarrasDiffusionSchedulers中提取名称
_compatibles = [e.name for e in KarrasDiffusionSchedulers]
# 设置调度器的顺序,默认为1
order = 1
# 用于注册配置的初始化方法
@register_to_config
def __init__(
# 训练步骤数量,默认值为1000
num_train_timesteps: int = 1000,
# 起始beta值,默认值为0.0001
beta_start: float = 0.0001,
# 结束beta值,默认值为0.02
beta_end: float = 0.02,
# beta调度类型,默认值为"linear"
beta_schedule: str = "linear",
# 可选的训练beta值,默认为None
trained_betas: Optional[Union[np.ndarray, List[float]]] = None,
# 预测类型,默认为`epsilon`
prediction_type: str = "epsilon",
# 时间步缩放类型,默认为"linspace"
timestep_spacing: str = "linspace",
# 步骤偏移量,默认为0
steps_offset: int = 0,
# 是否重新缩放beta以获得零终端SNR,默认为False
rescale_betas_zero_snr: bool = False,
):
# 检查是否提供了训练过的贝塔值
if trained_betas is not None:
# 将训练过的贝塔值转换为 PyTorch 张量,数据类型为 float32
self.betas = torch.tensor(trained_betas, dtype=torch.float32)
# 如果贝塔调度为线性
elif beta_schedule == "linear":
# 生成从 beta_start 到 beta_end 的线性间隔贝塔值,数量为 num_train_timesteps
self.betas = torch.linspace(beta_start, beta_end, num_train_timesteps, dtype=torch.float32)
# 如果贝塔调度为缩放线性
elif beta_schedule == "scaled_linear":
# 该调度特别针对潜在扩散模型
# 生成从 beta_start 的平方根到 beta_end 的平方根的线性间隔,然后平方
self.betas = torch.linspace(beta_start**0.5, beta_end**0.5, num_train_timesteps, dtype=torch.float32) ** 2
# 如果贝塔调度为平方余弦调度版本 2
elif beta_schedule == "squaredcos_cap_v2":
# Glide 余弦调度
self.betas = betas_for_alpha_bar(num_train_timesteps)
# 如果未实现的贝塔调度
else:
# 抛出未实现的错误
raise NotImplementedError(f"{beta_schedule} is not implemented for {self.__class__}")
# 如果需要将贝塔值重新缩放到零 SNR
if rescale_betas_zero_snr:
# 对贝塔值进行重新缩放
self.betas = rescale_zero_terminal_snr(self.betas)
# 计算 alpha 值,等于 1 减去贝塔值
self.alphas = 1.0 - self.betas
# 计算累积乘积的 alpha 值
self.alphas_cumprod = torch.cumprod(self.alphas, dim=0)
# 如果需要将贝塔值重新缩放到零 SNR
if rescale_betas_zero_snr:
# 将最后一个累积乘积 alpha 值设置为接近 0 的小值,以避免无穷大
# FP16 最小正数次正规值在这里表现良好
self.alphas_cumprod[-1] = 2**-24
# 计算 sigma 值,基于 alpha 值的公式
sigmas = np.array(((1 - self.alphas_cumprod) / self.alphas_cumprod) ** 0.5)
# 反转 sigma 数组,并追加一个 0.0,转换为 float32 类型
sigmas = np.concatenate([sigmas[::-1], [0.0]]).astype(np.float32)
# 将 sigma 转换为 PyTorch 张量
self.sigmas = torch.from_numpy(sigmas)
# 可设置的属性
self.num_inference_steps = None
# 生成从 0 到 num_train_timesteps - 1 的时间步长数组,并反转
timesteps = np.linspace(0, num_train_timesteps - 1, num_train_timesteps, dtype=float)[::-1].copy()
# 将时间步长转换为 PyTorch 张量
self.timesteps = torch.from_numpy(timesteps)
# 初始化标记,指示是否调用了输入缩放
self.is_scale_input_called = False
# 初始化步骤索引
self._step_index = None
# 初始化起始索引
self._begin_index = None
# 将 sigma 移动到 CPU,以避免过多的 CPU/GPU 通信
self.sigmas = self.sigmas.to("cpu") # to avoid too much CPU/GPU communication
@property
def init_noise_sigma(self):
# 返回初始噪声分布的标准差
if self.config.timestep_spacing in ["linspace", "trailing"]:
# 返回 sigma 值的最大值
return self.sigmas.max()
# 返回最大 sigma 的平方加 1 的平方根
return (self.sigmas.max() ** 2 + 1) ** 0.5
@property
def step_index(self):
"""
当前时间步的索引计数器。每次调度步骤后增加 1。
"""
# 返回步骤索引
return self._step_index
@property
def begin_index(self):
"""
第一个时间步的索引。应该通过 `set_begin_index` 方法从管道中设置。
"""
# 返回起始索引
return self._begin_index
# 从 diffusers.schedulers.scheduling_dpmsolver_multistep.DPMSolverMultistepScheduler.set_begin_index 复制
def set_begin_index(self, begin_index: int = 0):
"""
设置调度器的起始索引。此函数应在推理之前从管道中运行。
参数:
begin_index (`int`):
调度器的起始索引。
"""
# 设置调度器的起始索引
self._begin_index = begin_index
# 定义一个方法来缩放模型输入,接受样本和时间步
def scale_model_input(self, sample: torch.Tensor, timestep: Union[float, torch.Tensor]) -> torch.Tensor:
# 方法说明,确保与调度器兼容,根据当前时间步缩放去噪模型输入
"""
Ensures interchangeability with schedulers that need to scale the denoising model input depending on the
current timestep. Scales the denoising model input by `(sigma**2 + 1) ** 0.5` to match the Euler algorithm.
Args:
sample (`torch.Tensor`):
The input sample.
timestep (`int`, *optional*):
The current timestep in the diffusion chain.
Returns:
`torch.Tensor`:
A scaled input sample.
"""
# 如果当前步索引为 None,初始化步索引
if self.step_index is None:
self._init_step_index(timestep)
# 获取当前步的 sigma 值
sigma = self.sigmas[self.step_index]
# 根据公式缩放输入样本
sample = sample / ((sigma**2 + 1) ** 0.5)
# 标记输入缩放函数已被调用
self.is_scale_input_called = True
# 返回缩放后的样本
return sample
# 设置离散时间步长,用于扩散链(在推理之前运行)
def set_timesteps(self, num_inference_steps: int, device: Union[str, torch.device] = None):
"""
设置用于扩散链的离散时间步长(在推理之前运行)。
参数:
num_inference_steps (`int`):
生成样本时使用的扩散步骤数。
device (`str` 或 `torch.device`, *可选*):
要移动时间步长的设备。如果为 `None`,则不移动时间步长。
"""
# 存储推理步骤的数量
self.num_inference_steps = num_inference_steps
# 根据配置的时间步长间隔计算时间步长
if self.config.timestep_spacing == "linspace":
# 创建从0到num_train_timesteps-1的均匀间隔的时间步长,并反向排序
timesteps = np.linspace(0, self.config.num_train_timesteps - 1, num_inference_steps, dtype=np.float32)[
::-1
].copy()
elif self.config.timestep_spacing == "leading":
# 计算步骤比率,创建整数时间步长
step_ratio = self.config.num_train_timesteps // self.num_inference_steps
# 通过比率创建整数时间步长,避免当num_inference_step为3的幂时出现问题
timesteps = (np.arange(0, num_inference_steps) * step_ratio).round()[::-1].copy().astype(np.float32)
# 添加步骤偏移量
timesteps += self.config.steps_offset
elif self.config.timestep_spacing == "trailing":
# 计算步骤比率,创建整数时间步长
step_ratio = self.config.num_train_timesteps / self.num_inference_steps
# 通过比率创建整数时间步长,避免当num_inference_step为3的幂时出现问题
timesteps = (np.arange(self.config.num_train_timesteps, 0, -step_ratio)).round().copy().astype(np.float32)
# 减去1以调整时间步长
timesteps -= 1
else:
# 抛出错误,指示不支持的时间步长间隔类型
raise ValueError(
f"{self.config.timestep_spacing} is not supported. Please make sure to choose one of 'linspace', 'leading' or 'trailing'."
)
# 计算sigmas,基于累积alpha值
sigmas = np.array(((1 - self.alphas_cumprod) / self.alphas_cumprod) ** 0.5)
# 使用时间步长插值sigmas
sigmas = np.interp(timesteps, np.arange(0, len(sigmas)), sigmas)
# 将sigmas与0.0连接并转换为float32类型
sigmas = np.concatenate([sigmas, [0.0]]).astype(np.float32)
# 将sigmas转换为torch张量,并移动到指定设备
self.sigmas = torch.from_numpy(sigmas).to(device=device)
# 将时间步长转换为torch张量,并移动到指定设备
self.timesteps = torch.from_numpy(timesteps).to(device=device)
# 初始化步骤索引和开始索引为None
self._step_index = None
self._begin_index = None
# 将sigmas移动到CPU,以避免过多的CPU/GPU通信
self.sigmas = self.sigmas.to("cpu") # to avoid too much CPU/GPU communication
# 从diffusers.schedulers.scheduling_euler_discrete.EulerDiscreteScheduler.index_for_timestep复制的代码
# 定义一个方法,根据给定的时间步索引找到对应的索引
def index_for_timestep(self, timestep, schedule_timesteps=None):
# 如果没有提供调度时间步,则使用默认时间步
if schedule_timesteps is None:
schedule_timesteps = self.timesteps
# 找到与当前时间步匹配的所有索引
indices = (schedule_timesteps == timestep).nonzero()
# 对于第一步,选择第二个索引(或仅有一个时选择第一个),以避免跳过
pos = 1 if len(indices) > 1 else 0
# 返回找到的索引值
return indices[pos].item()
# 从 diffusers.schedulers.scheduling_euler_discrete.EulerDiscreteScheduler._init_step_index 复制的代码
def _init_step_index(self, timestep):
# 如果开始索引为 None,初始化步骤索引
if self.begin_index is None:
# 如果时间步是张量,则将其转移到相同的设备上
if isinstance(timestep, torch.Tensor):
timestep = timestep.to(self.timesteps.device)
# 根据时间步查找步骤索引
self._step_index = self.index_for_timestep(timestep)
else:
# 如果已定义开始索引,则使用它
self._step_index = self._begin_index
# 定义一个步骤方法,接受模型输出、时间步、样本等参数
def step(
self,
model_output: torch.Tensor,
timestep: Union[float, torch.Tensor],
sample: torch.Tensor,
generator: Optional[torch.Generator] = None,
return_dict: bool = True,
# 从 diffusers.schedulers.scheduling_euler_discrete.EulerDiscreteScheduler.add_noise 复制的代码
def add_noise(
self,
original_samples: torch.Tensor,
noise: torch.Tensor,
timesteps: torch.Tensor,
) -> torch.Tensor:
# 确保 sigmas 和 timesteps 与 original_samples 的设备和数据类型相同
sigmas = self.sigmas.to(device=original_samples.device, dtype=original_samples.dtype)
# 检查设备是否为 "mps" 且 timesteps 是否为浮点数
if original_samples.device.type == "mps" and torch.is_floating_point(timesteps):
# mps 不支持 float64,因此将时间步长转换为 float32
schedule_timesteps = self.timesteps.to(original_samples.device, dtype=torch.float32)
timesteps = timesteps.to(original_samples.device, dtype=torch.float32)
else:
# 否则,直接将时间步长转换为与 original_samples 相同的设备
schedule_timesteps = self.timesteps.to(original_samples.device)
timesteps = timesteps.to(original_samples.device)
# 当 scheduler 用于训练时,self.begin_index 为 None,或 pipeline 未实现 set_begin_index
if self.begin_index is None:
# 根据时间步长计算对应的步骤索引
step_indices = [self.index_for_timestep(t, schedule_timesteps) for t in timesteps]
elif self.step_index is not None:
# 在第一个去噪步骤后调用 add_noise(用于修复)
step_indices = [self.step_index] * timesteps.shape[0]
else:
# 在第一个去噪步骤之前调用 add_noise 以创建初始潜在图像 (img2img)
step_indices = [self.begin_index] * timesteps.shape[0]
# 根据步骤索引提取 sigmas,并展平为一维
sigma = sigmas[step_indices].flatten()
# 确保 sigma 的形状与 original_samples 的形状相匹配
while len(sigma.shape) < len(original_samples.shape):
sigma = sigma.unsqueeze(-1)
# 生成带噪声的样本,通过 original_samples 和噪声加权 sigma 进行叠加
noisy_samples = original_samples + noise * sigma
# 返回带噪声的样本
return noisy_samples
# 返回训练时间步的数量
def __len__(self):
return self.config.num_train_timesteps