diffusers 源码解析(五十七)
# 版权所有 2024 斯坦福大学团队与 HuggingFace 团队,保留所有权利。
#
# 根据 Apache 许可证,第 2.0 版(“许可证”)授权;
# 除非遵守许可证,否则您不得使用此文件。
# 您可以在以下网址获取许可证副本:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律或书面协议另有约定,否则根据许可证分发的软件是按“现状”基础提供,
# 不附带任何明示或暗示的担保或条件。
# 有关许可证所治理权限和限制的具体语言,请参见许可证。
# 免责声明:此代码受到以下项目的强烈影响:https://github.com/pesser/pytorch_diffusion
# 和 https://github.com/hojonathanho/diffusion
# 从 dataclasses 模块导入 dataclass 装饰器
from dataclasses import dataclass
# 从 typing 模块导入可选类型、元组和联合类型
from typing import Optional, Tuple, Union
# 导入 flax 库
import flax
# 导入 JAX 的 numpy 模块并命名为 jnp
import jax.numpy as jnp
# 从配置工具和调度工具导入所需组件
from ..configuration_utils import ConfigMixin, register_to_config
from .scheduling_utils_flax import (
CommonSchedulerState, # 导入通用调度器状态
FlaxKarrasDiffusionSchedulers, # 导入 Karras Diffusion 调度器
FlaxSchedulerMixin, # 导入调度器混合功能
FlaxSchedulerOutput, # 导入调度器输出类型
add_noise_common, # 导入添加噪声的公共函数
get_velocity_common, # 导入获取速度的公共函数
)
# 定义 DDIM 调度器状态的结构数据类
@flax.struct.dataclass
class DDIMSchedulerState:
common: CommonSchedulerState # 包含通用调度器状态
final_alpha_cumprod: jnp.ndarray # 最终的 alpha 累积产品
# 可设置的值
init_noise_sigma: jnp.ndarray # 初始噪声标准差
timesteps: jnp.ndarray # 时间步数组
num_inference_steps: Optional[int] = None # 可选的推理步骤数
# 类方法,用于创建 DDIMSchedulerState 实例
@classmethod
def create(
cls,
common: CommonSchedulerState, # 传入的通用调度器状态
final_alpha_cumprod: jnp.ndarray, # 传入的最终 alpha 累积产品
init_noise_sigma: jnp.ndarray, # 传入的初始噪声标准差
timesteps: jnp.ndarray, # 传入的时间步数组
):
# 返回新的 DDIMSchedulerState 实例
return cls(
common=common, # 设置通用调度器状态
final_alpha_cumprod=final_alpha_cumprod, # 设置最终 alpha 累积产品
init_noise_sigma=init_noise_sigma, # 设置初始噪声标准差
timesteps=timesteps, # 设置时间步数组
)
# 定义 FlaxDDIMSchedulerOutput 数据类,继承自 FlaxSchedulerOutput
@dataclass
class FlaxDDIMSchedulerOutput(FlaxSchedulerOutput):
state: DDIMSchedulerState # 状态为 DDIMSchedulerState 类型
# 定义 FlaxDDIMScheduler 类,继承自 FlaxSchedulerMixin 和 ConfigMixin
class FlaxDDIMScheduler(FlaxSchedulerMixin, ConfigMixin):
"""
去噪扩散隐式模型是一种调度器,它扩展了在去噪扩散概率模型 (DDPMs) 中引入的去噪过程
,并具有非马尔可夫引导。
[`~ConfigMixin`] 负责存储在调度器的 `__init__` 函数中传递的所有配置属性,
例如 `num_train_timesteps`。它们可以通过 `scheduler.config.num_train_timesteps` 访问。
[`SchedulerMixin`] 通过 [`SchedulerMixin.save_pretrained`] 和
[`~SchedulerMixin.from_pretrained`] 函数提供一般的加载和保存功能。
有关更多详细信息,请参见原始论文:https://arxiv.org/abs/2010.02502
# 函数参数说明
Args:
# 训练模型使用的扩散步骤数量
num_train_timesteps (`int`): number of diffusion steps used to train the model.
# 推理过程开始的 beta 值
beta_start (`float`): the starting `beta` value of inference.
# 推理过程最终的 beta 值
beta_end (`float`): the final `beta` value.
# beta 调度方式
beta_schedule (`str`):
# beta 范围到 beta 序列的映射,选择步进模型的方式,可选值包括
`linear`, `scaled_linear`, or `squaredcos_cap_v2`.
# 直接传递 beta 数组的选项,以绕过 beta_start、beta_end 等参数
trained_betas (`jnp.ndarray`, optional):
option to pass an array of betas directly to the constructor to bypass `beta_start`, `beta_end` etc.
# 预测样本的裁剪选项,用于数值稳定性
clip_sample (`bool`, default `True`):
option to clip predicted sample between for numerical stability. The clip range is determined by
`clip_sample_range`.
# 裁剪的最大幅度,仅在 clip_sample=True 时有效
clip_sample_range (`float`, default `1.0`):
the maximum magnitude for sample clipping. Valid only when `clip_sample=True`.
# 将每个扩散步骤的前一个 alpha 乘积固定为 1 的选项
set_alpha_to_one (`bool`, default `True`):
each diffusion step uses the value of alphas product at that step and at the previous one. For the final
step there is no previous alpha. When this option is `True` the previous alpha product is fixed to `1`,
otherwise it uses the value of alpha at step 0.
# 推理步骤的偏移量
steps_offset (`int`, default `0`):
An offset added to the inference steps, as required by some model families.
# 指示模型预测噪声或样本的类型
prediction_type (`str`, default `epsilon`):
indicates whether the model predicts the noise (epsilon), or the samples. One of `epsilon`, `sample`.
`v-prediction` is not supported for this scheduler.
# 用于参数和计算的数据类型
dtype (`jnp.dtype`, *optional*, defaults to `jnp.float32`):
the `dtype` used for params and computation.
"""
# 创建一个兼容的调度器列表,包含 FlaxKarrasDiffusionSchedulers 中的调度器名称
_compatibles = [e.name for e in FlaxKarrasDiffusionSchedulers]
# 定义数据类型
dtype: jnp.dtype
# 定义一个属性,表示该类是否有状态
@property
def has_state(self):
# 返回 True,表示该类有状态
return True
# 注册到配置中的构造函数
@register_to_config
def __init__(
# 训练步骤数量,默认为 1000
num_train_timesteps: int = 1000,
# 初始 beta 值,默认为 0.0001
beta_start: float = 0.0001,
# 最终 beta 值,默认为 0.02
beta_end: float = 0.02,
# beta 调度方式,默认为 "linear"
beta_schedule: str = "linear",
# 训练 beta 的数组,默认为 None
trained_betas: Optional[jnp.ndarray] = None,
# 是否裁剪样本,默认为 True
clip_sample: bool = True,
# 样本裁剪范围,默认为 1.0
clip_sample_range: float = 1.0,
# 是否将 alpha 的前一个值设置为 1,默认为 True
set_alpha_to_one: bool = True,
# 步骤偏移量,默认为 0
steps_offset: int = 0,
# 预测类型,默认为 "epsilon"
prediction_type: str = "epsilon",
# 数据类型,默认为 jnp.float32
dtype: jnp.dtype = jnp.float32,
):
# 将输入的数据类型赋值给实例变量
self.dtype = dtype
# 创建状态的方法,接收一个可选的公共调度器状态参数,返回 DDIM 调度器状态
def create_state(self, common: Optional[CommonSchedulerState] = None) -> DDIMSchedulerState:
# 如果未提供公共调度器状态,则创建一个新的公共调度器状态
if common is None:
common = CommonSchedulerState.create(self)
# 在每个 DDIM 步骤中,我们查看前一个 alphas_cumprod
# 对于最后一步,没有前一个 alphas_cumprod,因为我们已经处于 0
# `set_alpha_to_one` 决定我们是将该参数设置为 1,还是使用“非前一个”最终 alpha
final_alpha_cumprod = (
# 如果设置为 1,则使用 1.0 的数组,否则使用公共状态中的第一个 alphas_cumprod
jnp.array(1.0, dtype=self.dtype) if self.config.set_alpha_to_one else common.alphas_cumprod[0]
)
# 初始化噪声分布的标准差
init_noise_sigma = jnp.array(1.0, dtype=self.dtype)
# 生成从 0 到训练时间步数的数组,并反转顺序
timesteps = jnp.arange(0, self.config.num_train_timesteps).round()[::-1]
# 创建并返回 DDIM 调度器状态
return DDIMSchedulerState.create(
common=common,
final_alpha_cumprod=final_alpha_cumprod,
init_noise_sigma=init_noise_sigma,
timesteps=timesteps,
)
# 规模模型输入的方法,接收状态、样本和可选的时间步
def scale_model_input(
self, state: DDIMSchedulerState, sample: jnp.ndarray, timestep: Optional[int] = None
) -> jnp.ndarray:
"""
参数:
state (`PNDMSchedulerState`): `FlaxPNDMScheduler` 状态数据类实例。
sample (`jnp.ndarray`): 输入样本
timestep (`int`, optional): 当前时间步
返回:
`jnp.ndarray`: 缩放后的输入样本
"""
# 直接返回输入样本,不做任何处理
return sample
# 设置时间步的方法,接收状态、推理步骤数量和形状
def set_timesteps(
self, state: DDIMSchedulerState, num_inference_steps: int, shape: Tuple = ()
) -> DDIMSchedulerState:
"""
设置用于扩散链的离散时间步。支持在推理之前运行的功能。
参数:
state (`DDIMSchedulerState`):
`FlaxDDIMScheduler` 状态数据类实例。
num_inference_steps (`int`):
生成样本时使用的扩散步骤数量。
"""
# 计算步骤比率,通过训练时间步数除以推理步骤数量
step_ratio = self.config.num_train_timesteps // num_inference_steps
# 通过比率生成整数时间步,乘以比率
# 四舍五入以避免当 num_inference_step 为 3 的幂时出现问题
timesteps = (jnp.arange(0, num_inference_steps) * step_ratio).round()[::-1] + self.config.steps_offset
# 用新的时间步数和推理步骤数量替换状态
return state.replace(
num_inference_steps=num_inference_steps,
timesteps=timesteps,
)
# 计算在给定时间步的方差
def _get_variance(self, state: DDIMSchedulerState, timestep, prev_timestep):
# 获取当前时间步的累计alpha值
alpha_prod_t = state.common.alphas_cumprod[timestep]
# 根据前一个时间步获取其累计alpha值,如果为负则使用最终的累计alpha值
alpha_prod_t_prev = jnp.where(
prev_timestep >= 0, state.common.alphas_cumprod[prev_timestep], state.final_alpha_cumprod
)
# 计算当前时间步的beta值
beta_prod_t = 1 - alpha_prod_t
# 计算前一个时间步的beta值
beta_prod_t_prev = 1 - alpha_prod_t_prev
# 根据当前和前一个时间步的beta和alpha值计算方差
variance = (beta_prod_t_prev / beta_prod_t) * (1 - alpha_prod_t / alpha_prod_t_prev)
# 返回计算得到的方差
return variance
# 执行单步更新
def step(
self,
state: DDIMSchedulerState,
model_output: jnp.ndarray,
timestep: int,
sample: jnp.ndarray,
eta: float = 0.0,
return_dict: bool = True,
# 添加噪声到原始样本
def add_noise(
self,
state: DDIMSchedulerState,
original_samples: jnp.ndarray,
noise: jnp.ndarray,
timesteps: jnp.ndarray,
) -> jnp.ndarray:
# 调用通用函数添加噪声并返回结果
return add_noise_common(state.common, original_samples, noise, timesteps)
# 计算样本的速度
def get_velocity(
self,
state: DDIMSchedulerState,
sample: jnp.ndarray,
noise: jnp.ndarray,
timesteps: jnp.ndarray,
) -> jnp.ndarray:
# 调用通用函数获取速度并返回结果
return get_velocity_common(state.common, sample, noise, timesteps)
# 返回训练时间步的数量
def __len__(self):
return self.config.num_train_timesteps
# Copyright 2024 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# 许可声明,指明版权及使用条件
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# 表示软件是按现状提供,不提供任何形式的保证或条件
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# 参阅许可协议以了解具体权限和限制
# DISCLAIMER: This code is strongly influenced by https://github.com/pesser/pytorch_diffusion
# and https://github.com/hojonathanho/diffusion
# 声明代码受相关项目的影响,提供了代码来源信息
import math
# 导入数学库,用于数学计算
from dataclasses import dataclass
# 从dataclasses模块导入dataclass装饰器,用于简化类的定义
from typing import List, Optional, Tuple, Union
# 从typing模块导入类型注解,用于类型提示
import numpy as np
# 导入NumPy库,用于数值计算和数组操作
import torch
# 导入PyTorch库,用于深度学习
from diffusers.configuration_utils import ConfigMixin, register_to_config
# 从diffusers.configuration_utils导入ConfigMixin和register_to_config,用于配置管理
from diffusers.schedulers.scheduling_utils import SchedulerMixin
# 从diffusers.schedulers.scheduling_utils导入SchedulerMixin,用于调度器的混合功能
from diffusers.utils import BaseOutput, deprecate
# 从diffusers.utils导入BaseOutput和deprecate,用于输出格式和弃用警告
@dataclass
# 使用dataclass装饰器定义数据类
# Copied from diffusers.schedulers.scheduling_ddpm.DDPMSchedulerOutput with DDPM->DDIM
class DDIMSchedulerOutput(BaseOutput):
"""
Output class for the scheduler's `step` function output.
# 定义调度器的`step`函数输出类
Args:
prev_sample (`torch.Tensor` of shape `(batch_size, num_channels, height, width)` for images):
Computed sample `(x_{t-1})` of previous timestep. `prev_sample` should be used as next model input in the
denoising loop.
# 前一时间步计算的样本,作为下一步去噪循环的输入
pred_original_sample (`torch.Tensor` of shape `(batch_size, num_channels, height, width)` for images):
The predicted denoised sample `(x_{0})` based on the model output from the current timestep.
`pred_original_sample` can be used to preview progress or for guidance.
# 当前时间步模型输出的预测去噪样本,可以用于进度预览或指导
"""
prev_sample: torch.Tensor
# 定义前一时间步样本,类型为torch.Tensor
pred_original_sample: Optional[torch.Tensor] = None
# 定义预测去噪样本,类型为可选的torch.Tensor,默认为None
# Copied from diffusers.schedulers.scheduling_ddpm.betas_for_alpha_bar
def betas_for_alpha_bar(
num_diffusion_timesteps,
max_beta=0.999,
alpha_transform_type="cosine",
):
"""
Create a beta schedule that discretizes the given alpha_t_bar function, which defines the cumulative product of
(1-beta) over time from t = [0,1].
# 创建一个beta调度,离散化给定的alpha_t_bar函数,定义(1-beta)随时间的累积乘积
Contains a function alpha_bar that takes an argument t and transforms it to the cumulative product of (1-beta) up
to that part of the diffusion process.
# 包含一个函数alpha_bar,接受参数t并将其转换为扩散过程的(1-beta)累积乘积
Args:
num_diffusion_timesteps (`int`): the number of betas to produce.
# 生成的beta数量
max_beta (`float`): the maximum beta to use; use values lower than 1 to
prevent singularities.
# 使用的最大beta值,避免使用1以上的值以防止奇点
alpha_transform_type (`str`, *optional*, default to `cosine`): the type of noise schedule for alpha_bar.
Choose from `cosine` or `exp`
# 噪声调度类型,选择'cosine'或'exp'
Returns:
betas (`np.ndarray`): the betas used by the scheduler to step the model outputs
# 返回调度器用于模型输出的betas
"""
# 检查 alpha_transform_type 是否为 "cosine"
if alpha_transform_type == "cosine":
# 定义一个函数,计算基于余弦函数的 alpha 值
def alpha_bar_fn(t):
# 计算并返回余弦平方值,调整参数以确保值在特定范围
return math.cos((t + 0.008) / 1.008 * math.pi / 2) ** 2
# 检查 alpha_transform_type 是否为 "exp"
elif alpha_transform_type == "exp":
# 定义一个函数,计算基于指数函数的 alpha 值
def alpha_bar_fn(t):
# 计算并返回负指数值,参数设为 -12.0
return math.exp(t * -12.0)
# 如果 alpha_transform_type 不符合预期,抛出异常
else:
raise ValueError(f"Unsupported alpha_transform_type: {alpha_transform_type}")
# 初始化一个空列表以存储 beta 值
betas = []
# 遍历每个扩散时间步
for i in range(num_diffusion_timesteps):
# 计算当前时间步的比例 t1
t1 = i / num_diffusion_timesteps
# 计算下一个时间步的比例 t2
t2 = (i + 1) / num_diffusion_timesteps
# 计算 beta 值并添加到列表,确保不超过 max_beta
betas.append(min(1 - alpha_bar_fn(t2) / alpha_bar_fn(t1), max_beta))
# 将 beta 列表转换为浮点型的张量并返回
return torch.tensor(betas, dtype=torch.float32)
# 从 diffusers.schedulers.scheduling_ddim.rescale_zero_terminal_snr 复制而来
def rescale_zero_terminal_snr(betas):
"""
根据 https://arxiv.org/pdf/2305.08891.pdf (算法 1) 将 betas 重新缩放为具有零终端 SNR
参数:
betas (`torch.Tensor`):
用于初始化调度器的 betas。
返回:
`torch.Tensor`: 具有零终端 SNR 的重新缩放的 betas
"""
# 将 betas 转换为 alphas_bar_sqrt
alphas = 1.0 - betas # 计算 alphas,表示每个 beta 对应的 alpha
alphas_cumprod = torch.cumprod(alphas, dim=0) # 计算 alphas 的累积乘积
alphas_bar_sqrt = alphas_cumprod.sqrt() # 对累积乘积取平方根
# 存储旧值
alphas_bar_sqrt_0 = alphas_bar_sqrt[0].clone() # 记录初始值
alphas_bar_sqrt_T = alphas_bar_sqrt[-1].clone() # 记录终止值
# 移动,使最后一个时间步为零
alphas_bar_sqrt -= alphas_bar_sqrt_T # 从 alphas_bar_sqrt 中减去终止值
# 缩放,使第一个时间步恢复到旧值
alphas_bar_sqrt *= alphas_bar_sqrt_0 / (alphas_bar_sqrt_0 - alphas_bar_sqrt_T) # 根据初始值和终止值进行缩放
# 将 alphas_bar_sqrt 转换回 betas
alphas_bar = alphas_bar_sqrt**2 # 将平方根还原为平方
alphas = alphas_bar[1:] / alphas_bar[:-1] # 根据累积乘积的逆操作恢复 alphas
alphas = torch.cat([alphas_bar[0:1], alphas]) # 将第一个 alpha 加入到 alphas 中
betas = 1 - alphas # 计算 betas,取 1 减去 alphas
return betas # 返回重新缩放的 betas
class DDIMInverseScheduler(SchedulerMixin, ConfigMixin):
"""
`DDIMInverseScheduler` 是 [`DDIMScheduler`] 的反向调度器。
该模型继承自 [`SchedulerMixin`] 和 [`ConfigMixin`]。请查看父类文档,以了解库为所有调度器实现的通用方法,例如加载和保存。
# 定义参数文档字符串,描述每个参数的作用和默认值
Args:
# 训练模型的扩散步骤数,默认值为1000
num_train_timesteps (`int`, defaults to 1000):
The number of diffusion steps to train the model.
# 推理的起始 beta 值,默认值为 0.0001
beta_start (`float`, defaults to 0.0001):
The starting `beta` value of inference.
# 最终的 beta 值,默认值为 0.02
beta_end (`float`, defaults to 0.02):
The final `beta` value.
# beta 调度类型,默认为 "linear",用于映射 beta 范围到模型的步进序列
beta_schedule (`str`, defaults to `"linear"`):
The beta schedule, a mapping from a beta range to a sequence of betas for stepping the model. Choose from
`linear`, `scaled_linear`, or `squaredcos_cap_v2`.
# 直接传入 beta 数组以绕过 beta_start 和 beta_end 的设置,属于可选参数
trained_betas (`np.ndarray`, *optional*):
Pass an array of betas directly to the constructor to bypass `beta_start` and `beta_end`.
# 是否对预测样本进行剪裁以提高数值稳定性,默认值为 True
clip_sample (`bool`, defaults to `True`):
Clip the predicted sample for numerical stability.
# 样本剪裁的最大幅度,仅在 clip_sample=True 时有效,默认值为 1.0
clip_sample_range (`float`, defaults to 1.0):
The maximum magnitude for sample clipping. Valid only when `clip_sample=True`.
# 每个扩散步骤使用当前和上一个步骤的 alphas 乘积值,最后一步的前一个 alpha 固定为 0 的选项,默认值为 True
set_alpha_to_one (`bool`, defaults to `True`):
Each diffusion step uses the alphas product value at that step and at the previous one. For the final step
there is no previous alpha. When this option is `True` the previous alpha product is fixed to 0, otherwise
it uses the alpha value at step `num_train_timesteps - 1`.
# 推理步骤的偏移量,某些模型族可能需要,默认值为 0
steps_offset (`int`, defaults to 0):
An offset added to the inference steps, as required by some model families.
# 调度函数的预测类型,可选项包括 `epsilon`(预测扩散过程的噪声)、`sample`(直接预测噪声样本)或 `v_prediction`
prediction_type (`str`, defaults to `epsilon`, *optional*):
Prediction type of the scheduler function; can be `epsilon` (predicts the noise of the diffusion process),
`sample` (directly predicts the noisy sample`) or `v_prediction` (see section 2.4 of [Imagen
Video](https://imagen.research.google/video/paper.pdf) paper).
# 时间步的缩放方式,默认值为 "leading"
timestep_spacing (`str`, defaults to `"leading"`):
The way the timesteps should be scaled. Refer to Table 2 of the [Common Diffusion Noise Schedules and
Sample Steps are Flawed](https://huggingface.co/papers/2305.08891) for more information.
# 是否将 beta 重新缩放为零终端 SNR,允许模型生成非常明亮和非常暗的样本,默认值为 False
rescale_betas_zero_snr (`bool`, defaults to `False`):
Whether to rescale the betas to have zero terminal SNR. This enables the model to generate very bright and
dark samples instead of limiting it to samples with medium brightness. Loosely related to
[`--offset_noise`](https://github.com/huggingface/diffusers/blob/74fd735eb073eb1d774b1ab4154a0876eb82f055/examples/dreambooth/train_dreambooth.py#L506).
# 参数文档字符串结束
# 设置参数的顺序
order = 1
# 指定在配置中忽略的参数
ignore_for_config = ["kwargs"]
# 标记已弃用的参数
_deprecated_kwargs = ["set_alpha_to_zero"]
# 注册到配置
@register_to_config
# 初始化函数,设置了多个参数和默认值,包括训练步数、beta的起始和结束值、beta的调度类型、
# 训练后的betas数组、是否剪裁样本、是否将alpha设置为1、步骤偏移量、预测类型、剪裁样本范围、
# 时间步间距、是否对零信噪比重新缩放等
def __init__(
self,
num_train_timesteps: int = 1000,
beta_start: float = 0.0001,
beta_end: float = 0.02,
beta_schedule: str = "linear",
trained_betas: Optional[Union[np.ndarray, List[float]]] = None,
clip_sample: bool = True,
set_alpha_to_one: bool = True,
steps_offset: int = 0,
prediction_type: str = "epsilon",
clip_sample_range: float = 1.0,
timestep_spacing: str = "leading",
rescale_betas_zero_snr: bool = False,
**kwargs,
):
# 如果kwargs中存在"set_alpha_to_zero"参数,则给出警告信息并使用其值替代"set_alpha_to_one"参数
if kwargs.get("set_alpha_to_zero", None) is not None:
deprecation_message = (
"The `set_alpha_to_zero` argument is deprecated. Please use `set_alpha_to_one` instead."
)
deprecate("set_alpha_to_zero", "1.0.0", deprecation_message, standard_warn=False)
set_alpha_to_one = kwargs["set_alpha_to_zero"]
# 如果给定了训练后的betas数组,则使用它来初始化self.betas
if trained_betas is not None:
self.betas = torch.tensor(trained_betas, dtype=torch.float32)
# 如果beta_schedule为"linear",则使用线性插值生成self.betas数组
elif beta_schedule == "linear":
self.betas = torch.linspace(beta_start, beta_end, num_train_timesteps, dtype=torch.float32)
# 如果beta_schedule为"scaled_linear",则使用特定的缩放线性插值生成self.betas数组
elif beta_schedule == "scaled_linear":
# 此调度方式非常特定于潜在扩散模型。
self.betas = torch.linspace(beta_start**0.5, beta_end**0.5, num_train_timesteps, dtype=torch.float32) ** 2
# 如果beta_schedule为"squaredcos_cap_v2",则使用特定的函数生成self.betas数组
elif beta_schedule == "squaredcos_cap_v2":
# Glide cosine schedule
self.betas = betas_for_alpha_bar(num_train_timesteps)
else:
# 抛出未实现的调度类型异常
raise NotImplementedError(f"{beta_schedule} is not implemented for {self.__class__}")
# 如果设置了rescale_betas_zero_snr标志为True,则对self.betas数组进行零信噪比重新缩放
if rescale_betas_zero_snr:
self.betas = rescale_zero_terminal_snr(self.betas)
# 根据self.betas计算self.alphas,并计算累积乘积
self.alphas = 1.0 - self.betas
self.alphas_cumprod = torch.cumprod(self.alphas, dim=0)
# 在反向ddim中的每个步骤中,我们查看下一个alphas_cumprod
# 对于初始步骤,没有当前的alphas_cumprod,索引越界
# `set_alpha_to_one`决定是否将此参数简单设置为1
# 在这种情况下,self.step()仅输出预测的噪声
# 或者是否使用训练扩散模型中使用的初始alpha。
self.initial_alpha_cumprod = torch.tensor(1.0) if set_alpha_to_one else self.alphas_cumprod[0]
# 初始噪声分布的标准差
self.init_noise_sigma = 1.0
# 可设置的值
self.num_inference_steps = None
# 使用np.arange创建时间步长的张量
self.timesteps = torch.from_numpy(np.arange(0, num_train_timesteps).copy().astype(np.int64))
# 定义一个方法,缩放模型输入以确保与调度器的互换性
def scale_model_input(self, sample: torch.Tensor, timestep: Optional[int] = None) -> torch.Tensor:
"""
确保根据当前时间步缩放去噪模型输入,以便与需要的调度器互换。
参数:
sample (`torch.Tensor`):
输入样本。
timestep (`int`, *可选*):
扩散链中的当前时间步。
返回:
`torch.Tensor`:
缩放后的输入样本。
"""
# 返回输入样本,当前未对其进行缩放
return sample
# 定义一个方法,设置用于扩散链的离散时间步(在推理前运行)
def set_timesteps(self, num_inference_steps: int, device: Union[str, torch.device] = None):
"""
设置用于扩散链的离散时间步(在推理前运行)。
参数:
num_inference_steps (`int`):
生成样本时使用的扩散步骤数量。
"""
# 检查推理步骤数量是否超过训练时的最大时间步数
if num_inference_steps > self.config.num_train_timesteps:
raise ValueError(
f"`num_inference_steps`: {num_inference_steps} 不能大于 `self.config.train_timesteps`:"
f" {self.config.num_train_timesteps} 因为使用该调度器训练的 unet 模型最多只能处理"
f" {self.config.num_train_timesteps} 个时间步。"
)
# 将推理步骤数量赋值给实例变量
self.num_inference_steps = num_inference_steps
# 根据配置的时间步间隔设置时间步
if self.config.timestep_spacing == "leading":
# 计算每步的比例
step_ratio = self.config.num_train_timesteps // self.num_inference_steps
# 通过乘以比例创建整数时间步
# 转换为整数以避免当 num_inference_steps 是 3 的幂时出现问题
timesteps = (np.arange(0, num_inference_steps) * step_ratio).round().copy().astype(np.int64)
# 加上步骤偏移量
timesteps += self.config.steps_offset
elif self.config.timestep_spacing == "trailing":
# 计算每步的比例
step_ratio = self.config.num_train_timesteps / self.num_inference_steps
# 通过乘以比例创建整数时间步
# 转换为整数以避免当 num_inference_steps 是 3 的幂时出现问题
timesteps = np.round(np.arange(self.config.num_train_timesteps, 0, -step_ratio)[::-1]).astype(np.int64)
# 减去 1
timesteps -= 1
else:
# 如果时间步间隔不支持,抛出错误
raise ValueError(
f"{self.config.timestep_spacing} 不被支持。请确保选择 'leading' 或 'trailing' 之一。"
)
# 将时间步转换为张量并移动到指定设备
self.timesteps = torch.from_numpy(timesteps).to(device)
# 定义一个方法,执行一步操作
def step(
self,
model_output: torch.Tensor,
timestep: int,
sample: torch.Tensor,
return_dict: bool = True,
):
# 定义一个方法,返回训练时的时间步数量
def __len__(self):
return self.config.num_train_timesteps
# 版权声明,指定代码作者及版权信息
# Copyright 2024 ParaDiGMS authors and The HuggingFace Team. All rights reserved.
#
# 根据 Apache License 2.0 许可协议,限制文件的使用
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# 许可的副本可以在下面的地址获取
# http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律要求或书面同意,软件以 "原样" 基础分发
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# 请参见许可证以了解特定权限和限制
# See the License for the specific language governing permissions and
# limitations under the License.
# 免责声明:此代码受以下项目强烈影响
# DISCLAIMER: This code is strongly influenced by https://github.com/pesser/pytorch_diffusion
# and https://github.com/hojonathanho/diffusion
# 导入数学库
import math
# 从数据类模块导入 dataclass 装饰器
from dataclasses import dataclass
# 导入类型提示相关类型
from typing import List, Optional, Tuple, Union
# 导入 NumPy 库
import numpy as np
# 导入 PyTorch 库
import torch
# 从配置工具导入 ConfigMixin 和注册配置功能
from ..configuration_utils import ConfigMixin, register_to_config
# 从工具模块导入 BaseOutput 类
from ..utils import BaseOutput
# 从 PyTorch 工具模块导入生成随机张量的函数
from ..utils.torch_utils import randn_tensor
# 从调度工具导入调度器类
from .scheduling_utils import KarrasDiffusionSchedulers, SchedulerMixin
@dataclass
# 该类用于表示调度器的步骤函数输出
class DDIMParallelSchedulerOutput(BaseOutput):
"""
输出类,用于调度器的 `step` 函数输出。
参数:
prev_sample (`torch.Tensor` 形状为 `(batch_size, num_channels, height, width)` 的图像):
上一时间步的计算样本 `(x_{t-1})`。 `prev_sample` 应在去噪循环中用作下一个模型输入。
pred_original_sample (`torch.Tensor` 形状为 `(batch_size, num_channels, height, width)` 的图像):
基于当前时间步模型输出的预测去噪样本 `(x_{0})`。
`pred_original_sample` 可用于预览进度或提供指导。
"""
# 上一时间步的样本张量
prev_sample: torch.Tensor
# 可选的预测去噪样本张量,默认为 None
pred_original_sample: Optional[torch.Tensor] = None
# 从 diffusers.schedulers.scheduling_ddpm.betas_for_alpha_bar 函数复制
def betas_for_alpha_bar(
# 生成的扩散时间步数量
num_diffusion_timesteps,
# 最大 beta 值,防止奇异性
max_beta=0.999,
# 噪声调度的类型,默认为 "cosine"
alpha_transform_type="cosine",
):
"""
创建一个 beta 调度器,离散化给定的 alpha_t_bar 函数,
定义了 (1-beta) 随时间的累积乘积,从 t = [0,1] 开始。
包含一个 alpha_bar 函数,该函数接收 t 参数,并将其转换为
该扩散过程部分的 (1-beta) 的累积乘积。
参数:
num_diffusion_timesteps (`int`): 生成的 beta 数量。
max_beta (`float`): 要使用的最大 beta;使用小于 1 的值以
防止奇异性。
alpha_transform_type (`str`, *可选*, 默认值为 `cosine`): alpha_bar 的噪声调度类型。
从 `cosine` 或 `exp` 中选择。
返回:
betas (`np.ndarray`): 调度器用于更新模型输出的 betas
"""
# 检查 alpha_transform_type 是否为 "cosine"
if alpha_transform_type == "cosine":
# 定义一个函数 alpha_bar_fn,接受参数 t
def alpha_bar_fn(t):
# 计算余弦函数并返回平方值,用于 alpha 值转换
return math.cos((t + 0.008) / 1.008 * math.pi / 2) ** 2
# 检查 alpha_transform_type 是否为 "exp"
elif alpha_transform_type == "exp":
# 定义一个函数 alpha_bar_fn,接受参数 t
def alpha_bar_fn(t):
# 计算指数衰减函数,用于 alpha 值转换
return math.exp(t * -12.0)
# 如果 alpha_transform_type 不是 "cosine" 或 "exp",抛出异常
else:
raise ValueError(f"Unsupported alpha_transform_type: {alpha_transform_type}")
# 初始化一个空列表 betas,用于存储 beta 值
betas = []
# 遍历从 0 到 num_diffusion_timesteps 的每个步骤
for i in range(num_diffusion_timesteps):
# 计算当前时间 t1,归一化到 [0, 1] 范围
t1 = i / num_diffusion_timesteps
# 计算下一个时间 t2,归一化到 [0, 1] 范围
t2 = (i + 1) / num_diffusion_timesteps
# 计算 beta 值,并将其添加到 betas 列表,确保不超过 max_beta
betas.append(min(1 - alpha_bar_fn(t2) / alpha_bar_fn(t1), max_beta))
# 将 betas 列表转换为 PyTorch 张量,并指定数据类型为 float32
return torch.tensor(betas, dtype=torch.float32)
# 从 diffusers.schedulers.scheduling_ddim.rescale_zero_terminal_snr 复制而来
def rescale_zero_terminal_snr(betas):
"""
根据 https://arxiv.org/pdf/2305.08891.pdf (算法 1) 将 betas 重新缩放为零终端 SNR
参数:
betas (`torch.Tensor`):
初始化调度器所用的 betas。
返回:
`torch.Tensor`: 具有零终端 SNR 的重新缩放的 betas
"""
# 将 betas 转换为 alphas_bar_sqrt
alphas = 1.0 - betas # 计算 alphas,等于 1 减去 betas
alphas_cumprod = torch.cumprod(alphas, dim=0) # 计算 alphas 的累积乘积
alphas_bar_sqrt = alphas_cumprod.sqrt() # 计算 alphas_cumprod 的平方根
# 存储旧值。
alphas_bar_sqrt_0 = alphas_bar_sqrt[0].clone() # 克隆第一个 alphas_bar_sqrt 的值
alphas_bar_sqrt_T = alphas_bar_sqrt[-1].clone() # 克隆最后一个 alphas_bar_sqrt 的值
# 使最后一个时间步的值为零。
alphas_bar_sqrt -= alphas_bar_sqrt_T # 从 alphas_bar_sqrt 中减去最后一个值
# 缩放以使第一个时间步返回旧值。
alphas_bar_sqrt *= alphas_bar_sqrt_0 / (alphas_bar_sqrt_0 - alphas_bar_sqrt_T) # 根据比例缩放
# 将 alphas_bar_sqrt 转换回 betas
alphas_bar = alphas_bar_sqrt**2 # 还原平方根
alphas = alphas_bar[1:] / alphas_bar[:-1] # 还原累积乘积
alphas = torch.cat([alphas_bar[0:1], alphas]) # 将第一个 alphas_bar 加入 alphas
betas = 1 - alphas # 计算新的 betas,等于 1 减去 alphas
return betas # 返回重新缩放的 betas
class DDIMParallelScheduler(SchedulerMixin, ConfigMixin):
"""
去噪扩散隐式模型是一种调度器,扩展了去噪程序,最初在去噪扩散概率模型 (DDPMs) 中引入了非马尔可夫指导。
[`~ConfigMixin`] 负责存储传递给调度器 `__init__` 函数的所有配置属性,如 `num_train_timesteps`。
它们可以通过 `scheduler.config.num_train_timesteps` 访问。
[`SchedulerMixin`] 通过 [`SchedulerMixin.save_pretrained`] 和
[`~SchedulerMixin.from_pretrained`] 函数提供一般的加载和保存功能。
更多详细信息,请参见原始论文:https://arxiv.org/abs/2010.02502
"""
_compatibles = [e.name for e in KarrasDiffusionSchedulers] # 存储兼容的调度器名称
order = 1 # 设置调度器的顺序
_is_ode_scheduler = True # 指示这是一个常微分方程调度器
@register_to_config
# 从 diffusers.schedulers.scheduling_ddim.DDIMScheduler.__init__ 复制而来
def __init__(
self,
num_train_timesteps: int = 1000, # 训练时间步的数量,默认值为 1000
beta_start: float = 0.0001, # 起始 beta 值,默认值为 0.0001
beta_end: float = 0.02, # 结束 beta 值,默认值为 0.02
beta_schedule: str = "linear", # beta 的调度策略,默认值为线性
trained_betas: Optional[Union[np.ndarray, List[float]]] = None, # 可选的训练 beta 值
clip_sample: bool = True, # 是否裁剪样本,默认值为 True
set_alpha_to_one: bool = True, # 是否将 alpha 设置为 1,默认值为 True
steps_offset: int = 0, # 步骤偏移量,默认值为 0
prediction_type: str = "epsilon", # 预测类型,默认值为 epsilon
thresholding: bool = False, # 是否使用阈值,默认值为 False
dynamic_thresholding_ratio: float = 0.995, # 动态阈值比例,默认值为 0.995
clip_sample_range: float = 1.0, # 裁剪样本范围,默认值为 1.0
sample_max_value: float = 1.0, # 样本最大值,默认值为 1.0
timestep_spacing: str = "leading", # 时间步间隔策略,默认值为 leading
rescale_betas_zero_snr: bool = False, # 是否将 beta 重新缩放为零终端 SNR,默认值为 False
# 定义一个方法,用于设置 beta 值和相关参数
):
# 如果已训练的 beta 值不为 None,则使用这些值
if trained_betas is not None:
# 将已训练的 beta 值转换为浮点型张量
self.betas = torch.tensor(trained_betas, dtype=torch.float32)
# 如果 beta 调度为线性
elif beta_schedule == "linear":
# 生成从 beta_start 到 beta_end 的线性空间 beta 值
self.betas = torch.linspace(beta_start, beta_end, num_train_timesteps, dtype=torch.float32)
# 如果 beta 调度为缩放线性
elif beta_schedule == "scaled_linear":
# 此调度特定于潜在扩散模型
self.betas = torch.linspace(beta_start**0.5, beta_end**0.5, num_train_timesteps, dtype=torch.float32) ** 2
# 如果 beta 调度为 squaredcos_cap_v2
elif beta_schedule == "squaredcos_cap_v2":
# Glide 余弦调度
self.betas = betas_for_alpha_bar(num_train_timesteps)
# 如果 beta 调度不被实现,则抛出异常
else:
raise NotImplementedError(f"{beta_schedule} is not implemented for {self.__class__}")
# 对于零 SNR 进行重新缩放
if rescale_betas_zero_snr:
# 调整 beta 值以满足零终端 SNR 的要求
self.betas = rescale_zero_terminal_snr(self.betas)
# 计算 alphas,等于 1 减去 beta 值
self.alphas = 1.0 - self.betas
# 计算累积乘积的 alphas
self.alphas_cumprod = torch.cumprod(self.alphas, dim=0)
# 在每一步的 ddim 中,查看先前的 alphas_cumprod
# 对于最后一步,没有先前的 alphas_cumprod,因为我们已经在 0 处
# `set_alpha_to_one` 决定是否将此参数设为 1 或使用“非先前” alpha 的最终值
self.final_alpha_cumprod = torch.tensor(1.0) if set_alpha_to_one else self.alphas_cumprod[0]
# 初始化噪声分布的标准差
self.init_noise_sigma = 1.0
# 可设置的值
self.num_inference_steps = None
# 生成反向的时间步长张量
self.timesteps = torch.from_numpy(np.arange(0, num_train_timesteps)[::-1].copy().astype(np.int64))
# 从 diffusers.schedulers.scheduling_ddim.DDIMScheduler.scale_model_input 复制的方法
def scale_model_input(self, sample: torch.Tensor, timestep: Optional[int] = None) -> torch.Tensor:
"""
确保与需要根据当前时间步长缩放去噪模型输入的调度器互换性。
参数:
sample (`torch.Tensor`):
输入样本。
timestep (`int`, *可选*):
扩散链中的当前时间步长。
返回:
`torch.Tensor`:
缩放后的输入样本。
"""
# 直接返回输入样本,不进行任何缩放
return sample
def _get_variance(self, timestep, prev_timestep=None):
# 如果没有提供前一个时间步长,则使用当前时间步长减去的值
if prev_timestep is None:
prev_timestep = timestep - self.config.num_train_timesteps // self.num_inference_steps
# 获取当前时间步长的 alphas 的累积乘积
alpha_prod_t = self.alphas_cumprod[timestep]
# 获取前一个时间步长的 alphas 的累积乘积,如果无效则使用最终的 alpha
alpha_prod_t_prev = self.alphas_cumprod[prev_timestep] if prev_timestep >= 0 else self.final_alpha_cumprod
# 计算当前和前一个时间步长的 beta 值
beta_prod_t = 1 - alpha_prod_t
beta_prod_t_prev = 1 - alpha_prod_t_prev
# 计算方差
variance = (beta_prod_t_prev / beta_prod_t) * (1 - alpha_prod_t / alpha_prod_t_prev)
# 返回计算得到的方差
return variance
# 定义一个批量获取方差的私有方法,接收时间步 t 和前一个时间步 prev_t
def _batch_get_variance(self, t, prev_t):
# 获取当前时间步 t 的累积 alpha 值
alpha_prod_t = self.alphas_cumprod[t]
# 获取前一个时间步的累积 alpha 值,确保不低于 0
alpha_prod_t_prev = self.alphas_cumprod[torch.clip(prev_t, min=0)]
# 对于 prev_t 小于 0 的情况,将其 alpha 值设置为 1.0
alpha_prod_t_prev[prev_t < 0] = torch.tensor(1.0)
# 计算当前和前一个时间步的 beta 值
beta_prod_t = 1 - alpha_prod_t
beta_prod_t_prev = 1 - alpha_prod_t_prev
# 计算方差,基于 beta 和 alpha 的关系
variance = (beta_prod_t_prev / beta_prod_t) * (1 - alpha_prod_t / alpha_prod_t_prev)
# 返回计算出的方差
return variance
# 从 diffusers.schedulers.scheduling_ddpm.DDPMScheduler._threshold_sample 复制的方法
def _threshold_sample(self, sample: torch.Tensor) -> torch.Tensor:
"""
"动态阈值处理:在每个采样步骤中,我们将 s 设置为 xt0(在时间步 t 时对 x_0 的预测)中的某个百分位绝对像素值,
如果 s > 1,则将 xt0 阈值限制到 [-s, s] 的范围内,然后除以 s。动态阈值处理将饱和像素(接近 -1 和 1 的像素)向内推,
从而在每个步骤中主动防止像素饱和。我们发现动态阈值处理可以显著提高照片真实感以及更好的图像-文本对齐,
尤其是在使用非常大的引导权重时。"
https://arxiv.org/abs/2205.11487
"""
# 获取输入样本的数据类型
dtype = sample.dtype
# 获取输入样本的批量大小、通道数和其他维度
batch_size, channels, *remaining_dims = sample.shape
# 如果数据类型不是 float32 或 float64,则将样本转换为 float
if dtype not in (torch.float32, torch.float64):
sample = sample.float() # 为分位数计算进行提升,并且没有实现 cpu half 的 clamping
# 将样本展平,以便对每个图像进行分位数计算
sample = sample.reshape(batch_size, channels * np.prod(remaining_dims))
# 计算样本的绝对值
abs_sample = sample.abs() # "某个百分位绝对像素值"
# 计算绝对值样本的分位数
s = torch.quantile(abs_sample, self.config.dynamic_thresholding_ratio, dim=1)
# 将分位数限制在最小值 1 和最大值 sample_max_value 之间
s = torch.clamp(
s, min=1, max=self.config.sample_max_value
) # 限制在最小值 1 时,相当于标准裁剪到 [-1, 1]
# 扩展维度以便后续广播
s = s.unsqueeze(1) # (batch_size, 1),因为 clamp 会在 dim=0 上广播
# 对样本进行裁剪并除以 s,限制在范围 [-s, s] 内
sample = torch.clamp(sample, -s, s) / s # "我们将 xt0 阈值限制到 [-s, s] 的范围内,然后除以 s"
# 将样本形状还原为原始结构
sample = sample.reshape(batch_size, channels, *remaining_dims)
# 将样本转换回原始数据类型
sample = sample.to(dtype)
# 返回处理后的样本
return sample
# 从 diffusers.schedulers.scheduling_ddim.DDIMScheduler.set_timesteps 复制的方法
# 定义设置离散时间步长的方法,用于扩散链(在推理之前运行)
def set_timesteps(self, num_inference_steps: int, device: Union[str, torch.device] = None):
"""
设置用于扩散链的离散时间步长(在推理之前运行)。
参数:
num_inference_steps (`int`):
生成样本时使用的扩散步骤数,基于预训练模型。
"""
# 检查推理步骤数是否大于训练时的最大时间步数
if num_inference_steps > self.config.num_train_timesteps:
# 如果大于,则抛出错误,提示用户不可超过训练时间步数
raise ValueError(
f"`num_inference_steps`: {num_inference_steps} cannot be larger than `self.config.train_timesteps`:"
f" {self.config.num_train_timesteps} as the unet model trained with this scheduler can only handle"
f" maximal {self.config.num_train_timesteps} timesteps."
)
# 将传入的推理步骤数赋值给实例变量
self.num_inference_steps = num_inference_steps
# 根据配置的时间步长间隔类型生成时间步数组
# "linspace", "leading", "trailing" 对应于 https://arxiv.org/abs/2305.08891 表 2 的注释
if self.config.timestep_spacing == "linspace":
# 创建一个均匀分布的时间步数组,并进行反转和类型转换
timesteps = (
np.linspace(0, self.config.num_train_timesteps - 1, num_inference_steps)
.round()[::-1]
.copy()
.astype(np.int64)
)
elif self.config.timestep_spacing == "leading":
# 计算每个步骤的比例
step_ratio = self.config.num_train_timesteps // self.num_inference_steps
# 通过比例生成整数时间步,反转并转换为 int64 类型
timesteps = (np.arange(0, num_inference_steps) * step_ratio).round()[::-1].copy().astype(np.int64)
# 添加偏移量
timesteps += self.config.steps_offset
elif self.config.timestep_spacing == "trailing":
# 计算每个步骤的比例(浮点数)
step_ratio = self.config.num_train_timesteps / self.num_inference_steps
# 通过比例生成整数时间步,反转并转换为 int64 类型
timesteps = np.round(np.arange(self.config.num_train_timesteps, 0, -step_ratio)).astype(np.int64)
# 减去 1 以确保时间步正确
timesteps -= 1
else:
# 如果时间步间隔类型不支持,则抛出错误
raise ValueError(
f"{self.config.timestep_spacing} is not supported. Please make sure to choose one of 'leading' or 'trailing'."
)
# 将时间步数组转换为 PyTorch 张量,并移动到指定设备
self.timesteps = torch.from_numpy(timesteps).to(device)
# 定义一步推理的方法
def step(
self,
model_output: torch.Tensor,
timestep: int,
sample: torch.Tensor,
eta: float = 0.0,
use_clipped_model_output: bool = False,
generator=None,
variance_noise: Optional[torch.Tensor] = None,
return_dict: bool = True,
# 定义批量推理的方法,且不添加噪声
def batch_step_no_noise(
self,
model_output: torch.Tensor,
timesteps: List[int],
sample: torch.Tensor,
eta: float = 0.0,
use_clipped_model_output: bool = False,
# 从 diffusers.schedulers.scheduling_ddpm.DDPMScheduler.add_noise 复制的内容
# 定义一个添加噪声的函数,接受原始样本、噪声和时间步长作为输入
def add_noise(
self,
original_samples: torch.Tensor, # 原始样本的张量
noise: torch.Tensor, # 噪声的张量
timesteps: torch.IntTensor, # 时间步长的张量
) -> torch.Tensor: # 返回添加噪声后的张量
# 确保 alphas_cumprod 和 timesteps 与 original_samples 在相同的设备和数据类型上
# 将 self.alphas_cumprod 移动到相应设备,以避免后续 add_noise 调用中冗余的 CPU 到 GPU 数据移动
self.alphas_cumprod = self.alphas_cumprod.to(device=original_samples.device)
# 将 alphas_cumprod 转换为与原始样本相同的数据类型
alphas_cumprod = self.alphas_cumprod.to(dtype=original_samples.dtype)
# 将时间步长移动到与原始样本相同的设备
timesteps = timesteps.to(original_samples.device)
# 计算平方根的 alpha 累积乘积
sqrt_alpha_prod = alphas_cumprod[timesteps] ** 0.5
# 将其展平为一维
sqrt_alpha_prod = sqrt_alpha_prod.flatten()
# 如果 sqrt_alpha_prod 的维度小于 original_samples,则在最后一维扩展
while len(sqrt_alpha_prod.shape) < len(original_samples.shape):
sqrt_alpha_prod = sqrt_alpha_prod.unsqueeze(-1)
# 计算平方根的 (1 - alpha) 累积乘积
sqrt_one_minus_alpha_prod = (1 - alphas_cumprod[timesteps]) ** 0.5
# 将其展平为一维
sqrt_one_minus_alpha_prod = sqrt_one_minus_alpha_prod.flatten()
# 如果 sqrt_one_minus_alpha_prod 的维度小于 original_samples,则在最后一维扩展
while len(sqrt_one_minus_alpha_prod.shape) < len(original_samples.shape):
sqrt_one_minus_alpha_prod = sqrt_one_minus_alpha_prod.unsqueeze(-1)
# 计算添加噪声后的样本
noisy_samples = sqrt_alpha_prod * original_samples + sqrt_one_minus_alpha_prod * noise
# 返回添加噪声后的样本
return noisy_samples
# 从 diffusers.schedulers.scheduling_ddpm.DDPMScheduler.get_velocity 复制的函数
# 定义一个获取速度的函数,接受样本、噪声和时间步长作为输入
def get_velocity(self, sample: torch.Tensor, noise: torch.Tensor, timesteps: torch.IntTensor) -> torch.Tensor:
# 确保 alphas_cumprod 和 timesteps 与 sample 在相同的设备和数据类型上
self.alphas_cumprod = self.alphas_cumprod.to(device=sample.device)
# 将 alphas_cumprod 转换为与样本相同的数据类型
alphas_cumprod = self.alphas_cumprod.to(dtype=sample.dtype)
# 将时间步长移动到与样本相同的设备
timesteps = timesteps.to(sample.device)
# 计算平方根的 alpha 累积乘积
sqrt_alpha_prod = alphas_cumprod[timesteps] ** 0.5
# 将其展平为一维
sqrt_alpha_prod = sqrt_alpha_prod.flatten()
# 如果 sqrt_alpha_prod 的维度小于 sample,则在最后一维扩展
while len(sqrt_alpha_prod.shape) < len(sample.shape):
sqrt_alpha_prod = sqrt_alpha_prod.unsqueeze(-1)
# 计算平方根的 (1 - alpha) 累积乘积
sqrt_one_minus_alpha_prod = (1 - alphas_cumprod[timesteps]) ** 0.5
# 将其展平为一维
sqrt_one_minus_alpha_prod = sqrt_one_minus_alpha_prod.flatten()
# 如果 sqrt_one_minus_alpha_prod 的维度小于 sample,则在最后一维扩展
while len(sqrt_one_minus_alpha_prod.shape) < len(sample.shape):
sqrt_one_minus_alpha_prod = sqrt_one_minus_alpha_prod.unsqueeze(-1)
# 计算速度
velocity = sqrt_alpha_prod * noise - sqrt_one_minus_alpha_prod * sample
# 返回计算出的速度
return velocity
# 定义一个获取训练时间步长数量的函数
def __len__(self():
# 返回配置中的训练时间步长数量
return self.config.num_train_timesteps
# 版权声明,表明版权所有者和许可信息
# Copyright 2024 UC Berkeley Team and The HuggingFace Team. All rights reserved.
#
# 根据 Apache 许可证第 2.0 版(“许可证”)授权;
# 您只能在遵守许可证的情况下使用此文件。
# 您可以在以下地址获取许可证的副本:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律或书面协议另有约定,软件在许可证下分发是基于“按现状”原则,
# 不提供任何形式的担保或条件,无论是明示还是暗示。
# 有关许可证的特定条款和条件,请参见许可证。
# 免责声明:此文件受到 https://github.com/ermongroup/ddim 的强烈影响
# 导入数学模块
import math
# 从 dataclasses 导入 dataclass 装饰器
from dataclasses import dataclass
# 导入类型注解
from typing import List, Optional, Tuple, Union
# 导入 numpy 库
import numpy as np
# 导入 PyTorch 库
import torch
# 导入配置相关的工具和注册功能
from ..configuration_utils import ConfigMixin, register_to_config
# 导入基本输出工具
from ..utils import BaseOutput
# 导入生成随机张量的工具
from ..utils.torch_utils import randn_tensor
# 导入调度相关的工具
from .scheduling_utils import KarrasDiffusionSchedulers, SchedulerMixin
# 定义调度器输出的数据类
@dataclass
class DDPMSchedulerOutput(BaseOutput):
"""
调度器 `step` 函数输出的输出类。
参数:
prev_sample (`torch.Tensor`,形状为 `(batch_size, num_channels, height, width)` 用于图像):
先前时间步的计算样本 `(x_{t-1})`。 `prev_sample` 应作为下一个模型输入使用
在去噪循环中。
pred_original_sample (`torch.Tensor`,形状为 `(batch_size, num_channels, height, width)` 用于图像):
基于当前时间步模型输出的预测去噪样本 `(x_{0})`。
`pred_original_sample` 可用于预览进展或提供指导。
"""
# 定义上一个样本的张量
prev_sample: torch.Tensor
# 可选的预测原始样本的张量
pred_original_sample: Optional[torch.Tensor] = None
# 定义用于生成 beta 的函数
def betas_for_alpha_bar(
num_diffusion_timesteps,
max_beta=0.999,
alpha_transform_type="cosine",
):
"""
创建一个 beta 计划,该计划离散化给定的 alpha_t_bar 函数,该函数定义了
(1-beta) 在时间上的累积乘积,从 t = [0,1]。
包含一个 alpha_bar 函数,该函数接受参数 t 并将其转换为 (1-beta) 的累积乘积
直到扩散过程的那一部分。
参数:
num_diffusion_timesteps (`int`): 生成的 beta 数量。
max_beta (`float`): 使用的最大 beta;使用小于 1 的值以
防止奇异性。
alpha_transform_type (`str`, *可选*,默认为 `cosine`): alpha_bar 的噪声调度类型。
选择 `cosine` 或 `exp`
返回:
betas (`np.ndarray`): 调度器用于对模型输出进行步骤的 betas
"""
# 判断 alpha 变换类型是否为 "cosine"
if alpha_transform_type == "cosine":
# 定义 alpha_bar 函数,接受参数 t
def alpha_bar_fn(t):
# 返回基于余弦函数的转换值
return math.cos((t + 0.008) / 1.008 * math.pi / 2) ** 2
# 检查 alpha_transform_type 是否为 "exp"
elif alpha_transform_type == "exp":
# 定义一个函数,根据输入 t 返回指数衰减值
def alpha_bar_fn(t):
return math.exp(t * -12.0)
# 如果 alpha_transform_type 不支持,则抛出异常
else:
raise ValueError(f"Unsupported alpha_transform_type: {alpha_transform_type}")
# 初始化一个空列表 betas,用于存储 beta 值
betas = []
# 循环 num_diffusion_timesteps 次以计算 beta 值
for i in range(num_diffusion_timesteps):
# 计算当前时间步 t1
t1 = i / num_diffusion_timesteps
# 计算下一个时间步 t2
t2 = (i + 1) / num_diffusion_timesteps
# 计算 beta 值并添加到 betas 列表,限制最大值为 max_beta
betas.append(min(1 - alpha_bar_fn(t2) / alpha_bar_fn(t1), max_beta))
# 将 betas 转换为张量并返回
return torch.tensor(betas, dtype=torch.float32)
# 从 diffusers.schedulers.scheduling_ddim 导入 rescale_zero_terminal_snr 函数
def rescale_zero_terminal_snr(betas):
"""
将 betas 重缩放为具有零终端 SNR,基于 https://arxiv.org/pdf/2305.08891.pdf (算法 1)
参数:
betas (`torch.Tensor`):
初始化调度器时使用的 betas。
返回:
`torch.Tensor`: 重缩放后的 betas,具有零终端 SNR
"""
# 将 betas 转换为 alphas_bar_sqrt
alphas = 1.0 - betas
alphas_cumprod = torch.cumprod(alphas, dim=0) # 计算累积乘积
alphas_bar_sqrt = alphas_cumprod.sqrt() # 开平方以得到 alphas_bar_sqrt
# 存储旧值。
alphas_bar_sqrt_0 = alphas_bar_sqrt[0].clone() # 复制第一个值
alphas_bar_sqrt_T = alphas_bar_sqrt[-1].clone() # 复制最后一个值
# 移动,使最后时间步为零。
alphas_bar_sqrt -= alphas_bar_sqrt_T # 从每个值中减去最后一个值
# 缩放,使第一个时间步恢复到旧值。
alphas_bar_sqrt *= alphas_bar_sqrt_0 / (alphas_bar_sqrt_0 - alphas_bar_sqrt_T) # 计算缩放因子
# 将 alphas_bar_sqrt 转换回 betas
alphas_bar = alphas_bar_sqrt**2 # 还原平方
alphas = alphas_bar[1:] / alphas_bar[:-1] # 还原累积乘积
alphas = torch.cat([alphas_bar[0:1], alphas]) # 将第一个值与其余部分连接
betas = 1 - alphas # 计算 betas
return betas # 返回计算后的 betas
class DDPMScheduler(SchedulerMixin, ConfigMixin):
"""
`DDPMScheduler` 探索去噪得分匹配与 Langevin 动力学采样之间的关系。
此模型继承自 [`SchedulerMixin`] 和 [`ConfigMixin`]。查看超类文档以了解库为所有调度器实现的通用
方法,如加载和保存。
# 参数定义部分,描述各参数的作用和默认值
Args:
num_train_timesteps (`int`, defaults to 1000): # 训练模型的扩散步骤数,默认值为1000
The number of diffusion steps to train the model. # 描述参数的作用
beta_start (`float`, defaults to 0.0001): # 推理的起始`beta`值,默认值为0.0001
The starting `beta` value of inference. # 描述参数的作用
beta_end (`float`, defaults to 0.02): # 最终的`beta`值,默认值为0.02
The final `beta` value. # 描述参数的作用
beta_schedule (`str`, defaults to `"linear"`): # `beta`调度方式,默认为“线性”
The beta schedule, a mapping from a beta range to a sequence of betas for stepping the model. Choose from # 描述参数的作用
`linear`, `scaled_linear`, or `squaredcos_cap_v2`. # 可选的调度方式
trained_betas (`np.ndarray`, *optional*): # 直接传递给构造函数的`beta`数组,可选参数
An array of betas to pass directly to the constructor without using `beta_start` and `beta_end`. # 描述参数的作用
variance_type (`str`, defaults to `"fixed_small"`): # 添加噪声时裁剪方差的类型,默认为“固定小”
Clip the variance when adding noise to the denoised sample. Choose from `fixed_small`, `fixed_small_log`, # 描述参数的作用
`fixed_large`, `fixed_large_log`, `learned` or `learned_range`. # 可选的方差类型
clip_sample (`bool`, defaults to `True`): # 用于数值稳定性的样本裁剪,默认为True
Clip the predicted sample for numerical stability. # 描述参数的作用
clip_sample_range (`float`, defaults to 1.0): # 样本裁剪的最大幅度,默认为1.0,仅在`clip_sample=True`时有效
The maximum magnitude for sample clipping. Valid only when `clip_sample=True`. # 描述参数的作用
prediction_type (`str`, defaults to `epsilon`, *optional*): # 调度函数的预测类型,默认为“epsilon”
Prediction type of the scheduler function; can be `epsilon` (predicts the noise of the diffusion process), # 描述参数的作用
`sample` (directly predicts the noisy sample`) or `v_prediction` (see section 2.4 of [Imagen # 可选的预测类型
Video](https://imagen.research.google/video/paper.pdf) paper). # 引用文献
thresholding (`bool`, defaults to `False`): # 是否使用“动态阈值”方法,默认为False
Whether to use the "dynamic thresholding" method. This is unsuitable for latent-space diffusion models such # 描述参数的作用
as Stable Diffusion. # 说明不适用的模型
dynamic_thresholding_ratio (`float`, defaults to 0.995): # 动态阈值方法的比率,默认为0.995,仅在`thresholding=True`时有效
The ratio for the dynamic thresholding method. Valid only when `thresholding=True`. # 描述参数的作用
sample_max_value (`float`, defaults to 1.0): # 动态阈值的阈值,默认为1.0,仅在`thresholding=True`时有效
The threshold value for dynamic thresholding. Valid only when `thresholding=True`. # 描述参数的作用
timestep_spacing (`str`, defaults to `"leading"`): # 时间步的缩放方式,默认为“领先”
The way the timesteps should be scaled. Refer to Table 2 of the [Common Diffusion Noise Schedules and # 描述参数的作用
Sample Steps are Flawed](https://huggingface.co/papers/2305.08891) for more information. # 引用文献
steps_offset (`int`, defaults to 0): # 推理步骤的偏移量,默认为0
An offset added to the inference steps, as required by some model families. # 描述参数的作用
rescale_betas_zero_snr (`bool`, defaults to `False`): # 是否重新缩放`beta`以使终端SNR为零,默认为False
Whether to rescale the betas to have zero terminal SNR. This enables the model to generate very bright and # 描述参数的作用
dark samples instead of limiting it to samples with medium brightness. Loosely related to # 说明模型的功能
[`--offset_noise`](https://github.com/huggingface/diffusers/blob/74fd735eb073eb1d774b1ab4154a0876eb82f055/examples/dreambooth/train_dreambooth.py#L506). # 引用文献
# 从 KarrasDiffusionSchedulers 中提取每个调度器的名称,生成一个兼容的列表
_compatibles = [e.name for e in KarrasDiffusionSchedulers]
# 设置顺序为 1
order = 1
# 装饰器,将该方法注册到配置中
@register_to_config
def __init__(
# 定义训练时间步数,默认为 1000
num_train_timesteps: int = 1000,
# 定义 beta 开始值,默认为 0.0001
beta_start: float = 0.0001,
# 定义 beta 结束值,默认为 0.02
beta_end: float = 0.02,
# 定义 beta 调度方式,默认为 "linear"
beta_schedule: str = "linear",
# 可选参数,训练好的 beta 值,可以是数组或列表
trained_betas: Optional[Union[np.ndarray, List[float]]] = None,
# 定义方差类型,默认为 "fixed_small"
variance_type: str = "fixed_small",
# 定义是否裁剪样本,默认为 True
clip_sample: bool = True,
# 定义预测类型,默认为 "epsilon"
prediction_type: str = "epsilon",
# 定义是否使用阈值处理,默认为 False
thresholding: bool = False,
# 定义动态阈值处理的比例,默认为 0.995
dynamic_thresholding_ratio: float = 0.995,
# 定义样本裁剪范围,默认为 1.0
clip_sample_range: float = 1.0,
# 定义样本最大值,默认为 1.0
sample_max_value: float = 1.0,
# 定义时间步间距方式,默认为 "leading"
timestep_spacing: str = "leading",
# 定义时间步偏移量,默认为 0
steps_offset: int = 0,
# 定义是否为零 SNR 重缩放,默认为 False
rescale_betas_zero_snr: bool = False,
):
# 如果提供了训练好的 beta 值,则将其转换为张量
if trained_betas is not None:
self.betas = torch.tensor(trained_betas, dtype=torch.float32)
# 如果 beta 调度为线性,则生成线性 beta 序列
elif beta_schedule == "linear":
self.betas = torch.linspace(beta_start, beta_end, num_train_timesteps, dtype=torch.float32)
# 如果 beta 调度为 scaled_linear,则生成特定的 beta 序列
elif beta_schedule == "scaled_linear":
# 该调度特定于潜在扩散模型
self.betas = torch.linspace(beta_start**0.5, beta_end**0.5, num_train_timesteps, dtype=torch.float32) ** 2
# 如果 beta 调度为 squaredcos_cap_v2,则使用 Glide 余弦调度生成 beta 序列
elif beta_schedule == "squaredcos_cap_v2":
# Glide 余弦调度
self.betas = betas_for_alpha_bar(num_train_timesteps)
# 如果 beta 调度为 sigmoid,则生成 Sigmoid 调度的 beta 序列
elif beta_schedule == "sigmoid":
# GeoDiff Sigmoid 调度
betas = torch.linspace(-6, 6, num_train_timesteps)
self.betas = torch.sigmoid(betas) * (beta_end - beta_start) + beta_start
# 如果 beta 调度方式不被实现,抛出未实现错误
else:
raise NotImplementedError(f"{beta_schedule} is not implemented for {self.__class__}")
# 如果选择了零 SNR 重缩放,则进行重缩放处理
if rescale_betas_zero_snr:
self.betas = rescale_zero_terminal_snr(self.betas)
# 计算 alpha 值,alpha = 1 - beta
self.alphas = 1.0 - self.betas
# 计算累积的 alpha 值
self.alphas_cumprod = torch.cumprod(self.alphas, dim=0)
# 创建一个包含 1.0 的张量
self.one = torch.tensor(1.0)
# 设置初始噪声分布的标准差
self.init_noise_sigma = 1.0
# 可设置的值,表示是否自定义时间步
self.custom_timesteps = False
# 推理步骤的数量,初始为 None
self.num_inference_steps = None
# 创建一个反向的时间步序列
self.timesteps = torch.from_numpy(np.arange(0, num_train_timesteps)[::-1].copy())
# 设置方差类型
self.variance_type = variance_type
# 定义缩放模型输入的方法
def scale_model_input(self, sample: torch.Tensor, timestep: Optional[int] = None) -> torch.Tensor:
"""
确保与需要根据当前时间步缩放去噪模型输入的调度器之间的互换性。
参数:
sample (`torch.Tensor`):
输入样本。
timestep (`int`, *可选*):
扩散链中的当前时间步。
返回:
`torch.Tensor`:
缩放后的输入样本。
"""
# 返回未缩放的输入样本
return sample
# 设置推断步骤和设备等参数的函数
def set_timesteps(
# 可选的推断步骤数
self,
num_inference_steps: Optional[int] = None,
# 可选的设备类型,字符串或 torch.device
device: Union[str, torch.device] = None,
# 可选的时间步列表
timesteps: Optional[List[int]] = None,
# 获取方差的函数
def _get_variance(self, t, predicted_variance=None, variance_type=None):
# 获取当前时间步的前一个时间步
prev_t = self.previous_timestep(t)
# 获取当前时间步的累积 alpha 值
alpha_prod_t = self.alphas_cumprod[t]
# 获取前一个时间步的累积 alpha 值,如果没有则使用 one
alpha_prod_t_prev = self.alphas_cumprod[prev_t] if prev_t >= 0 else self.one
# 计算当前时间步的 beta 值
current_beta_t = 1 - alpha_prod_t / alpha_prod_t_prev
# 计算预测的方差,依据文献中的公式
variance = (1 - alpha_prod_t_prev) / (1 - alpha_prod_t) * current_beta_t
# 确保方差的对数不为零,限制其最小值
variance = torch.clamp(variance, min=1e-20)
# 如果未指定方差类型,则使用配置中的方差类型
if variance_type is None:
variance_type = self.config.variance_type
# 针对训练稳定性的一些调整
if variance_type == "fixed_small":
variance = variance
# 针对 rl-diffuser 的特定处理
elif variance_type == "fixed_small_log":
variance = torch.log(variance)
variance = torch.exp(0.5 * variance)
elif variance_type == "fixed_large":
variance = current_beta_t
elif variance_type == "fixed_large_log":
# Glide 的最大对数方差
variance = torch.log(current_beta_t)
elif variance_type == "learned":
# 返回预测的方差
return predicted_variance
elif variance_type == "learned_range":
# 计算最小和最大对数方差,并根据预测方差加权
min_log = torch.log(variance)
max_log = torch.log(current_beta_t)
frac = (predicted_variance + 1) / 2
variance = frac * max_log + (1 - frac) * min_log
# 返回最终的方差值
return variance
# 定义私有方法,进行动态阈值采样,输入为样本张量,输出为处理后的张量
def _threshold_sample(self, sample: torch.Tensor) -> torch.Tensor:
"""
"动态阈值处理:在每个采样步骤中,我们将 s 设置为 xt0 中某个百分位的绝对像素值(t 时刻对 x_0 的预测),
如果 s > 1,则将 xt0 限制在 [-s, s] 范围内,然后除以 s。动态阈值处理将饱和像素(接近 -1 和 1 的像素)
向内推,使每一步都能主动防止像素饱和。我们发现动态阈值处理显著提高了真实感以及图像与文本的对齐,
尤其是在使用非常大的引导权重时。"
https://arxiv.org/abs/2205.11487
"""
# 获取样本的数据类型
dtype = sample.dtype
# 获取样本的批量大小、通道数以及剩余维度
batch_size, channels, *remaining_dims = sample.shape
# 如果数据类型不是 float32 或 float64,则转换为 float
if dtype not in (torch.float32, torch.float64):
sample = sample.float() # 为了进行分位数计算而进行类型提升,且 clamp 对 CPU half 类型未实现
# 将样本展平,以便在每个图像上进行分位数计算
sample = sample.reshape(batch_size, channels * np.prod(remaining_dims))
# 计算样本的绝对值
abs_sample = sample.abs() # "某个百分位的绝对像素值"
# 计算每个样本的动态阈值 s
s = torch.quantile(abs_sample, self.config.dynamic_thresholding_ratio, dim=1)
# 将 s 限制在 min=1 和 max=self.config.sample_max_value 之间
s = torch.clamp(
s, min=1, max=self.config.sample_max_value
) # 当被限制到 min=1 时,相当于标准的裁剪到 [-1, 1]
# 在第一个维度上扩展 s 的维度,使其形状为 (batch_size, 1)
s = s.unsqueeze(1) # (batch_size, 1),因为 clamp 会在维度 0 上广播
# 将样本裁剪到 [-s, s] 范围内,然后除以 s
sample = torch.clamp(sample, -s, s) / s # "我们将 xt0 限制在 [-s, s] 范围内,然后除以 s"
# 将样本恢复到原来的形状
sample = sample.reshape(batch_size, channels, *remaining_dims)
# 将样本转换回原来的数据类型
sample = sample.to(dtype)
# 返回处理后的样本
return sample
# 定义步进方法,输入为模型输出、时间步、样本、生成器和返回字典的标志
def step(
self,
model_output: torch.Tensor,
timestep: int,
sample: torch.Tensor,
generator=None,
return_dict: bool = True,
# 定义添加噪声的方法,输入为原始样本、噪声和时间步
def add_noise(
self,
original_samples: torch.Tensor,
noise: torch.Tensor,
timesteps: torch.IntTensor,
# 返回类型为 torch.Tensor
) -> torch.Tensor:
# 确保 alphas_cumprod 和 timestep 具有与 original_samples 相同的设备和数据类型
# 将 self.alphas_cumprod 移动到设备上,以避免后续 add_noise 调用时重复的 CPU 到 GPU 数据移动
self.alphas_cumprod = self.alphas_cumprod.to(device=original_samples.device)
# 将 alphas_cumprod 转换为与 original_samples 相同的数据类型
alphas_cumprod = self.alphas_cumprod.to(dtype=original_samples.dtype)
# 将 timesteps 移动到与 original_samples 相同的设备
timesteps = timesteps.to(original_samples.device)
# 计算平方根的 alpha 乘积
sqrt_alpha_prod = alphas_cumprod[timesteps] ** 0.5
# 将结果展平为一维
sqrt_alpha_prod = sqrt_alpha_prod.flatten()
# 在 sqrt_alpha_prod 的形状小于 original_samples 时,添加新维度
while len(sqrt_alpha_prod.shape) < len(original_samples.shape):
sqrt_alpha_prod = sqrt_alpha_prod.unsqueeze(-1)
# 计算平方根的 (1 - alpha) 乘积
sqrt_one_minus_alpha_prod = (1 - alphas_cumprod[timesteps]) ** 0.5
# 将结果展平为一维
sqrt_one_minus_alpha_prod = sqrt_one_minus_alpha_prod.flatten()
# 在 sqrt_one_minus_alpha_prod 的形状小于 original_samples 时,添加新维度
while len(sqrt_one_minus_alpha_prod.shape) < len(original_samples.shape):
sqrt_one_minus_alpha_prod = sqrt_one_minus_alpha_prod.unsqueeze(-1)
# 生成带噪声的样本
noisy_samples = sqrt_alpha_prod * original_samples + sqrt_one_minus_alpha_prod * noise
# 返回带噪声的样本
return noisy_samples
# 定义获取速度的函数,输入为样本、噪声和时间步
def get_velocity(self, sample: torch.Tensor, noise: torch.Tensor, timesteps: torch.IntTensor) -> torch.Tensor:
# 确保 alphas_cumprod 和 timestep 具有与 sample 相同的设备和数据类型
self.alphas_cumprod = self.alphas_cumprod.to(device=sample.device)
# 将 alphas_cumprod 转换为与 sample 相同的数据类型
alphas_cumprod = self.alphas_cumprod.to(dtype=sample.dtype)
# 将 timesteps 移动到与 sample 相同的设备
timesteps = timesteps.to(sample.device)
# 计算平方根的 alpha 乘积
sqrt_alpha_prod = alphas_cumprod[timesteps] ** 0.5
# 将结果展平为一维
sqrt_alpha_prod = sqrt_alpha_prod.flatten()
# 在 sqrt_alpha_prod 的形状小于 sample 时,添加新维度
while len(sqrt_alpha_prod.shape) < len(sample.shape):
sqrt_alpha_prod = sqrt_alpha_prod.unsqueeze(-1)
# 计算平方根的 (1 - alpha) 乘积
sqrt_one_minus_alpha_prod = (1 - alphas_cumprod[timesteps]) ** 0.5
# 将结果展平为一维
sqrt_one_minus_alpha_prod = sqrt_one_minus_alpha_prod.flatten()
# 在 sqrt_one_minus_alpha_prod 的形状小于 sample 时,添加新维度
while len(sqrt_one_minus_alpha_prod.shape) < len(sample.shape):
sqrt_one_minus_alpha_prod = sqrt_one_minus_alpha_prod.unsqueeze(-1)
# 计算速度
velocity = sqrt_alpha_prod * noise - sqrt_one_minus_alpha_prod * sample
# 返回计算得到的速度
return velocity
# 定义获取对象长度的函数
def __len__(self):
# 返回配置中的训练时间步数
return self.config.num_train_timesteps
# 定义获取上一个时间步的函数
def previous_timestep(self, timestep):
# 如果使用自定义时间步
if self.custom_timesteps:
# 找到当前时间步的索引
index = (self.timesteps == timestep).nonzero(as_tuple=True)[0][0]
# 如果当前索引是最后一个,返回 -1
if index == self.timesteps.shape[0] - 1:
prev_t = torch.tensor(-1)
else:
# 返回下一个时间步
prev_t = self.timesteps[index + 1]
else:
# 计算推理步骤数
num_inference_steps = (
self.num_inference_steps if self.num_inference_steps else self.config.num_train_timesteps
)
# 返回上一个时间步
prev_t = timestep - self.config.num_train_timesteps // num_inference_steps
# 返回上一个时间步
return prev_t
# 版权声明,标明文件的版权归属
# Copyright 2024 UC Berkeley Team and The HuggingFace Team. All rights reserved.
#
# 根据 Apache 2.0 许可证授权,此处说明用户使用该文件的条款
# Licensed under the Apache License, Version 2.0 (the "License");
# 只有在遵循许可证的情况下才能使用该文件
# you may not use this file except in compliance with the License.
# 许可证可以在以下网址获取
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# 本文件在适用法律或书面协议下是“按现状”提供的,不附带任何形式的保证或条件
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# 具体的许可证条款和条件可以在下方找到
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# 本文件受到 https://github.com/ermongroup/ddim 的强烈影响
# DISCLAIMER: This file is strongly influenced by https://github.com/ermongroup/ddim
# 导入数据类功能,用于定义简单的类
from dataclasses import dataclass
# 导入类型定义,便于类型注解
from typing import Optional, Tuple, Union
# 导入 Flax 和 JAX 库,用于机器学习
import flax
import jax
import jax.numpy as jnp
# 导入配置和调度相关的工具
from ..configuration_utils import ConfigMixin, register_to_config
from .scheduling_utils_flax import (
# 导入通用调度器状态
CommonSchedulerState,
# 导入 Karras 扩散调度器
FlaxKarrasDiffusionSchedulers,
# 导入调度混合类
FlaxSchedulerMixin,
# 导入调度输出类
FlaxSchedulerOutput,
# 导入添加噪声的通用函数
add_noise_common,
# 导入获取速度的通用函数
get_velocity_common,
)
# 定义 DDPMSchedulerState 数据类,保存调度器的状态信息
@flax.struct.dataclass
class DDPMSchedulerState:
# 包含通用调度器状态
common: CommonSchedulerState
# 可设置的属性值
init_noise_sigma: jnp.ndarray # 初始噪声标准差
timesteps: jnp.ndarray # 时间步数组
num_inference_steps: Optional[int] = None # 推断步骤数,可选
# 类方法,用于创建 DDPMSchedulerState 实例
@classmethod
def create(cls, common: CommonSchedulerState, init_noise_sigma: jnp.ndarray, timesteps: jnp.ndarray):
# 返回一个新的 DDPMSchedulerState 实例
return cls(common=common, init_noise_sigma=init_noise_sigma, timesteps=timesteps)
# 定义 FlaxDDPMSchedulerOutput 数据类,继承自 FlaxSchedulerOutput
@dataclass
class FlaxDDPMSchedulerOutput(FlaxSchedulerOutput):
state: DDPMSchedulerState # 调度器的状态
# 定义 FlaxDDPMScheduler 类,结合调度和配置功能
class FlaxDDPMScheduler(FlaxSchedulerMixin, ConfigMixin):
"""
Denoising diffusion probabilistic models (DDPMs) explores the connections between denoising score matching and
Langevin dynamics sampling.
[`~ConfigMixin`] takes care of storing all config attributes that are passed in the scheduler's `__init__`
function, such as `num_train_timesteps`. They can be accessed via `scheduler.config.num_train_timesteps`.
[`SchedulerMixin`] provides general loading and saving functionality via the [`SchedulerMixin.save_pretrained`] and
[`~SchedulerMixin.from_pretrained`] functions.
For more details, see the original paper: https://arxiv.org/abs/2006.11239
"""
# 文档字符串,说明初始化函数的参数
Args:
num_train_timesteps (`int`): 训练模型所用的扩散步骤数量。
beta_start (`float`): 推断的起始 `beta` 值。
beta_end (`float`): 最终的 `beta` 值。
beta_schedule (`str`):
beta 调度,表示从 beta 范围到模型步骤序列的映射。可选值有
`linear`、`scaled_linear` 或 `squaredcos_cap_v2`。
trained_betas (`np.ndarray`, optional):
直接传递 beta 数组到构造函数的选项,以绕过 `beta_start`、`beta_end` 等参数。
variance_type (`str`):
用于添加噪声到去噪样本时裁剪方差的选项。可选值有 `fixed_small`、
`fixed_small_log`、`fixed_large`、`fixed_large_log`、`learned` 或 `learned_range`。
clip_sample (`bool`, default `True`):
裁剪预测样本在 -1 和 1 之间以确保数值稳定性的选项。
prediction_type (`str`, default `epsilon`):
指示模型是预测噪声(epsilon)还是样本。可选值为 `epsilon`、`sample`。
对于此调度器不支持 `v-prediction`。
dtype (`jnp.dtype`, *optional*, defaults to `jnp.float32`):
用于参数和计算的 `dtype` 类型。
"""
# 获取所有兼容的调度器名称
_compatibles = [e.name for e in FlaxKarrasDiffusionSchedulers]
# 定义数据类型的变量
dtype: jnp.dtype
# 属性,返回是否有状态
@property
def has_state(self):
# 返回 True,表示有状态信息
return True
# 注册到配置的初始化方法
@register_to_config
def __init__(
# 训练步骤的默认值
num_train_timesteps: int = 1000,
# 起始 beta 值的默认值
beta_start: float = 0.0001,
# 最终 beta 值的默认值
beta_end: float = 0.02,
# beta 调度的默认值
beta_schedule: str = "linear",
# 训练好的 beta 数组,默认为 None
trained_betas: Optional[jnp.ndarray] = None,
# 方差类型的默认值
variance_type: str = "fixed_small",
# 裁剪样本的默认值
clip_sample: bool = True,
# 预测类型的默认值
prediction_type: str = "epsilon",
# 数据类型的默认值
dtype: jnp.dtype = jnp.float32,
):
# 设置数据类型
self.dtype = dtype
# 创建状态的方法
def create_state(self, common: Optional[CommonSchedulerState] = None) -> DDPMSchedulerState:
# 如果未提供公共状态,则创建新的公共状态
if common is None:
common = CommonSchedulerState.create(self)
# 初始噪声分布的标准差
init_noise_sigma = jnp.array(1.0, dtype=self.dtype)
# 生成从 num_train_timesteps 到 0 的时间步数组,进行反转
timesteps = jnp.arange(0, self.config.num_train_timesteps).round()[::-1]
# 返回创建的 DDPMSchedulerState 对象
return DDPMSchedulerState.create(
common=common,
init_noise_sigma=init_noise_sigma,
timesteps=timesteps,
)
# 缩放模型输入的方法
def scale_model_input(
self, state: DDPMSchedulerState, sample: jnp.ndarray, timestep: Optional[int] = None
) -> jnp.ndarray:
"""
Args:
state (`PNDMSchedulerState`): `FlaxPNDMScheduler` 状态数据类实例。
sample (`jnp.ndarray`): 输入样本
timestep (`int`, optional): 当前时间步
Returns:
`jnp.ndarray`: 缩放后的输入样本
"""
# 返回输入样本,未进行缩放处理
return sample
# 定义设置时间步长的方法,参数包括状态、推理步骤数量和形状
def set_timesteps(
self, state: DDPMSchedulerState, num_inference_steps: int, shape: Tuple = ()
) -> DDPMSchedulerState:
"""
设置扩散链中使用的离散时间步长。用于推理前的辅助函数。
Args:
state (`DDIMSchedulerState`):
`FlaxDDPMScheduler`状态数据类实例。
num_inference_steps (`int`):
在使用预训练模型生成样本时使用的扩散步骤数量。
"""
# 计算步骤比例,通过训练时间步数除以推理步骤数量
step_ratio = self.config.num_train_timesteps // num_inference_steps
# 通过乘以比例创建整数时间步,进行四舍五入以避免当推理步骤为3的幂时出现问题
timesteps = (jnp.arange(0, num_inference_steps) * step_ratio).round()[::-1]
# 返回替换后的状态,包括推理步骤数量和时间步
return state.replace(
num_inference_steps=num_inference_steps,
timesteps=timesteps,
)
# 定义获取方差的方法,参数包括状态、时间步、预测方差和方差类型
def _get_variance(self, state: DDPMSchedulerState, t, predicted_variance=None, variance_type=None):
# 从状态中获取当前时间步的累积 alpha 值
alpha_prod_t = state.common.alphas_cumprod[t]
# 获取前一个时间步的累积 alpha 值,如果 t 大于 0,则取前一个值,否则为 1.0
alpha_prod_t_prev = jnp.where(t > 0, state.common.alphas_cumprod[t - 1], jnp.array(1.0, dtype=self.dtype))
# 计算预测方差 βt(见公式 (6) 和 (7)),并从中采样以获得前一个样本
# x_{t-1} ~ N(pred_prev_sample, variance) == 将方差添加到预测样本中
variance = (1 - alpha_prod_t_prev) / (1 - alpha_prod_t) * state.common.betas[t]
# 如果没有提供方差类型,则使用配置中的默认方差类型
if variance_type is None:
variance_type = self.config.variance_type
# 进行各种处理以提高训练稳定性
if variance_type == "fixed_small":
# 将方差裁剪到最小值 1e-20
variance = jnp.clip(variance, a_min=1e-20)
# 对于 rl-diffuser,进行 log 转换处理
elif variance_type == "fixed_small_log":
variance = jnp.log(jnp.clip(variance, a_min=1e-20))
elif variance_type == "fixed_large":
# 使用当前时间步的 beta 值作为方差
variance = state.common.betas[t]
elif variance_type == "fixed_large_log":
# 对当前时间步的 beta 值进行 log 转换
variance = jnp.log(state.common.betas[t])
elif variance_type == "learned":
# 返回预测的方差
return predicted_variance
elif variance_type == "learned_range":
# 计算最小和最大 log 方差
min_log = variance
max_log = state.common.betas[t]
# 计算比例并混合方差
frac = (predicted_variance + 1) / 2
variance = frac * max_log + (1 - frac) * min_log
# 返回计算后的方差
return variance
# 定义执行一步推理的步骤,参数包括状态、模型输出、时间步、样本和其他可选参数
def step(
self,
state: DDPMSchedulerState,
model_output: jnp.ndarray,
timestep: int,
sample: jnp.ndarray,
key: Optional[jax.Array] = None,
return_dict: bool = True,
def add_noise(
self,
state: DDPMSchedulerState,
original_samples: jnp.ndarray,
noise: jnp.ndarray,
timesteps: jnp.ndarray,
# 返回添加噪声后的样本,基于公共状态和输入参数
) -> jnp.ndarray:
return add_noise_common(state.common, original_samples, noise, timesteps)
# 获取样本的速度,使用调度器状态和噪声等参数
def get_velocity(
self,
state: DDPMSchedulerState,
sample: jnp.ndarray,
noise: jnp.ndarray,
timesteps: jnp.ndarray,
) -> jnp.ndarray:
# 调用公共函数以计算速度
return get_velocity_common(state.common, sample, noise, timesteps)
# 返回训练时间步数的数量
def __len__(self):
return self.config.num_train_timesteps
# 版权声明,说明文件的版权所有者及许可信息
# Copyright 2024 ParaDiGMS authors and The HuggingFace Team. All rights reserved.
#
# 根据 Apache License, Version 2.0 授权使用该文件
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# 你可以在以下网址获取许可证副本
# http://www.apache.org/licenses/LICENSE-2.0
#
# 除非法律要求或书面同意,否则以 "AS IS" 基础分发该文件,没有任何形式的担保
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# 查看许可证以了解特定的权限和限制
# See the License for the specific language governing permissions and
# limitations under the License.
# 声明该文件受 https://github.com/ermongroup/ddim 的影响
# DISCLAIMER: This file is strongly influenced by https://github.com/ermongroup/ddim
# 导入数学库
import math
# 导入数据类装饰器
from dataclasses import dataclass
# 导入类型注解
from typing import List, Optional, Tuple, Union
# 导入 NumPy 库
import numpy as np
# 导入 PyTorch 库
import torch
# 从配置工具导入混合类和注册函数
from ..configuration_utils import ConfigMixin, register_to_config
# 从工具导入基础输出类
from ..utils import BaseOutput
# 从 PyTorch 工具导入随机张量函数
from ..utils.torch_utils import randn_tensor
# 从调度工具导入调度器和混合类
from .scheduling_utils import KarrasDiffusionSchedulers, SchedulerMixin
@dataclass
# 定义用于调度器输出的类,继承自基础输出类
class DDPMParallelSchedulerOutput(BaseOutput):
"""
调度器的 `step` 函数输出的输出类。
参数:
prev_sample (`torch.Tensor` 形状为 `(batch_size, num_channels, height, width)` 的图像):
先前时间步的计算样本 `(x_{t-1})`。 `prev_sample` 应用作下一次模型输入
在去噪循环中。
pred_original_sample (`torch.Tensor` 形状为 `(batch_size, num_channels, height, width)` 的图像):
基于当前时间步的模型输出的预测去噪样本 `(x_{0})`。
`pred_original_sample` 可用于预览进度或进行指导。
"""
# 先前样本的张量
prev_sample: torch.Tensor
# 可选的预测去噪样本张量
pred_original_sample: Optional[torch.Tensor] = None
# 从贝塔计算函数复制的定义
def betas_for_alpha_bar(
num_diffusion_timesteps,
max_beta=0.999,
alpha_transform_type="cosine",
):
"""
创建一个贝塔调度,离散化给定的 alpha_t_bar 函数,该函数定义了时间 t=[0,1] 的 (1-beta) 的累积乘积。
包含一个 alpha_bar 函数,该函数接收参数 t 并转换为 (1-beta) 在扩散过程中的累积乘积。
参数:
num_diffusion_timesteps (`int`): 要生成的贝塔数量。
max_beta (`float`): 要使用的最大贝塔值;使用小于 1 的值以防止奇异性。
alpha_transform_type (`str`, *可选*, 默认为 `cosine`): alpha_bar 的噪声调度类型。
选择 `cosine` 或 `exp`
返回:
betas (`np.ndarray`): 调度器用于逐步模型输出的贝塔值
"""
# 检查 alpha_transform_type 是否为 "cosine"
if alpha_transform_type == "cosine":
# 定义 alpha_bar_fn 函数,计算 cos 变换
def alpha_bar_fn(t):
# 返回 cos 函数值的平方,用于 alpha 变换
return math.cos((t + 0.008) / 1.008 * math.pi / 2) ** 2
# 检查 alpha_transform_type 是否为 "exp"
elif alpha_transform_type == "exp":
# 定义 alpha_bar_fn 函数,计算指数衰减
def alpha_bar_fn(t):
# 返回指数衰减值,用于 alpha 变换
return math.exp(t * -12.0)
# 如果 alpha_transform_type 不符合预期,抛出异常
else:
raise ValueError(f"Unsupported alpha_transform_type: {alpha_transform_type}")
# 初始化空列表,用于存储 beta 值
betas = []
# 遍历每一个扩散时间步
for i in range(num_diffusion_timesteps):
# 计算当前时间步的比例 t1
t1 = i / num_diffusion_timesteps
# 计算下一个时间步的比例 t2
t2 = (i + 1) / num_diffusion_timesteps
# 计算 beta 值,并加入到 betas 列表中,取最小值以限制在 max_beta 之内
betas.append(min(1 - alpha_bar_fn(t2) / alpha_bar_fn(t1), max_beta))
# 将 betas 列表转换为 PyTorch 张量,并指定数据类型为 float32
return torch.tensor(betas, dtype=torch.float32)
# 从 diffusers.schedulers.scheduling_ddim.rescale_zero_terminal_snr 复制而来
def rescale_zero_terminal_snr(betas):
"""
根据 https://arxiv.org/pdf/2305.08891.pdf (算法 1) 调整 betas 使其具有零终端 SNR
参数:
betas (`torch.Tensor`):
初始化调度器时使用的 betas。
返回:
`torch.Tensor`: 调整后的具有零终端 SNR 的 betas
"""
# 将 betas 转换为 alphas_bar_sqrt
alphas = 1.0 - betas # 计算 alphas,表示每个时间步的保留比率
alphas_cumprod = torch.cumprod(alphas, dim=0) # 计算 alphas 的累积乘积
alphas_bar_sqrt = alphas_cumprod.sqrt() # 计算累积乘积的平方根
# 存储旧值
alphas_bar_sqrt_0 = alphas_bar_sqrt[0].clone() # 记录初始值
alphas_bar_sqrt_T = alphas_bar_sqrt[-1].clone() # 记录最后值
# 移动,使最后一个时间步为零
alphas_bar_sqrt -= alphas_bar_sqrt_T # 将最后时间步的值减去
# 缩放,使第一个时间步恢复到旧值
alphas_bar_sqrt *= alphas_bar_sqrt_0 / (alphas_bar_sqrt_0 - alphas_bar_sqrt_T) # 调整比例
# 将 alphas_bar_sqrt 转换回 betas
alphas_bar = alphas_bar_sqrt**2 # 还原平方根
alphas = alphas_bar[1:] / alphas_bar[:-1] # 还原累积乘积
alphas = torch.cat([alphas_bar[0:1], alphas]) # 将第一个元素加入 alphas
betas = 1 - alphas # 计算新的 betas
return betas # 返回调整后的 betas
class DDPMParallelScheduler(SchedulerMixin, ConfigMixin):
"""
去噪扩散概率模型 (DDPM) 探索去噪评分匹配与 Langevin 动力学采样之间的联系。
[`~ConfigMixin`] 负责存储调度器 `__init__` 函数中传入的所有配置属性,例如 `num_train_timesteps`。
这些属性可以通过 `scheduler.config.num_train_timesteps` 访问。
[`SchedulerMixin`] 提供通用的加载和保存功能,通过 [`SchedulerMixin.save_pretrained`] 和
[`~SchedulerMixin.from_pretrained`] 函数。
更多详细信息,请参阅原始论文: https://arxiv.org/abs/2006.11239
# 参数说明
Args:
# 训练模型所用的扩散步骤数量
num_train_timesteps (`int`): number of diffusion steps used to train the model.
# 推理时的起始 `beta` 值
beta_start (`float`): the starting `beta` value of inference.
# 推理时的最终 `beta` 值
beta_end (`float`): the final `beta` value.
# `beta` 调度策略
beta_schedule (`str`):
# 从 `beta` 范围映射到一系列 `beta` 值,用于模型的步骤选择
the beta schedule, a mapping from a beta range to a sequence of betas for stepping the model. Choose from
`linear`, `scaled_linear`, `squaredcos_cap_v2` or `sigmoid`.
# 可选参数,直接传递 `beta` 数组到构造函数,绕过 `beta_start` 和 `beta_end`
trained_betas (`np.ndarray`, optional):
option to pass an array of betas directly to the constructor to bypass `beta_start`, `beta_end` etc.
# 噪声添加时的方差裁剪选项
variance_type (`str`):
# 噪声添加时方差的裁剪选项,选择包括 `fixed_small`、`fixed_small_log`、`fixed_large`、`fixed_large_log`、`learned` 或 `learned_range`
options to clip the variance used when adding noise to the denoised sample. Choose from `fixed_small`,
`fixed_small_log`, `fixed_large`, `fixed_large_log`, `learned` or `learned_range`.
# 是否裁剪预测样本以确保数值稳定性
clip_sample (`bool`, default `True`):
option to clip predicted sample for numerical stability.
# 样本裁剪的最大幅度,仅在 `clip_sample=True` 时有效
clip_sample_range (`float`, default `1.0`):
the maximum magnitude for sample clipping. Valid only when `clip_sample=True`.
# 调度函数的预测类型
prediction_type (`str`, default `epsilon`, optional):
# 调度函数的预测类型,选择包括 `epsilon`(预测扩散过程的噪声)、`sample`(直接预测噪声样本)或 `v_prediction`
prediction type of the scheduler function, one of `epsilon` (predicting the noise of the diffusion
process), `sample` (directly predicting the noisy sample`) or `v_prediction` (see section 2.4
https://imagen.research.google/video/paper.pdf)
# 是否使用“动态阈值”方法
thresholding (`bool`, default `False`):
# 是否使用“动态阈值”方法,该方法由 Imagen 引入
whether to use the "dynamic thresholding" method (introduced by Imagen, https://arxiv.org/abs/2205.11487).
Note that the thresholding method is unsuitable for latent-space diffusion models (such as
stable-diffusion).
# 动态阈值方法的比例,仅在 `thresholding=True` 时有效
dynamic_thresholding_ratio (`float`, default `0.995`):
# 动态阈值方法的比例,默认值为 `0.995`
the ratio for the dynamic thresholding method. Default is `0.995`, the same as Imagen
(https://arxiv.org/abs/2205.11487). Valid only when `thresholding=True`.
# 动态阈值的阈值值,仅在 `thresholding=True` 时有效
sample_max_value (`float`, default `1.0`):
# 动态阈值的阈值值,默认值为 `1.0`
the threshold value for dynamic thresholding. Valid only when `thresholding=True`.
# 时间步长的缩放方式
timestep_spacing (`str`, default `"leading"`):
# 时间步长的缩放方式,参考文献提供的表格
The way the timesteps should be scaled. Refer to Table 2. of [Common Diffusion Noise Schedules and Sample
Steps are Flawed](https://arxiv.org/abs/2305.08891) for more information.
# 推理步骤的偏移量
steps_offset (`int`, default `0`):
# 添加到推理步骤的偏移量,一些模型族可能需要
An offset added to the inference steps, as required by some model families.
# 是否将 `beta` 重新缩放到零终端 SNR
rescale_betas_zero_snr (`bool`, defaults to `False`):
# 是否将 `beta` 重新缩放,以实现零终端 SNR
Whether to rescale the betas to have zero terminal SNR. This enables the model to generate very bright and
dark samples instead of limiting it to samples with medium brightness. Loosely related to
[`--offset_noise`](https://github.com/huggingface/diffusers/blob/74fd735eb073eb1d774b1ab4154a0876eb82f055/examples/dreambooth/train_dreambooth.py#L506).
"""
# 获取 KarrasDiffusionSchedulers 中所有调度器的名称,形成兼容列表
_compatibles = [e.name for e in KarrasDiffusionSchedulers]
# 设置调度器的顺序
order = 1
# 标记是否为 ODE 调度器
_is_ode_scheduler = False
@register_to_config
# 定义初始化函数,配置调度器参数
def __init__(
# 训练时间步的数量,默认值为 1000
num_train_timesteps: int = 1000,
# 初始 beta 值,默认值为 0.0001
beta_start: float = 0.0001,
# 结束 beta 值,默认值为 0.02
beta_end: float = 0.02,
# beta 调度方式,默认值为 "linear"
beta_schedule: str = "linear",
# 训练好的 beta 值,可以是 ndarray 或浮点数列表
trained_betas: Optional[Union[np.ndarray, List[float]]] = None,
# 方差类型,默认值为 "fixed_small"
variance_type: str = "fixed_small",
# 是否裁剪样本,默认值为 True
clip_sample: bool = True,
# 预测类型,默认值为 "epsilon"
prediction_type: str = "epsilon",
# 是否进行阈值处理,默认值为 False
thresholding: bool = False,
# 动态阈值处理的比率,默认值为 0.995
dynamic_thresholding_ratio: float = 0.995,
# 裁剪样本的范围,默认值为 1.0
clip_sample_range: float = 1.0,
# 时间步间隔类型,默认值为 "leading"
timestep_spacing: str = "leading",
# 步骤偏移量,默认值为 0
steps_offset: int = 0,
# 是否对零 SNR 进行重新缩放,默认值为 False
rescale_betas_zero_snr: bool = False,
):
# 如果提供了训练好的 beta 值,则将其转换为 tensor
if trained_betas is not None:
self.betas = torch.tensor(trained_betas, dtype=torch.float32)
# 如果使用线性调度,则生成线性变化的 beta 值
elif beta_schedule == "linear":
self.betas = torch.linspace(beta_start, beta_end, num_train_timesteps, dtype=torch.float32)
# 如果使用缩放线性调度,特定于潜在扩散模型
elif beta_schedule == "scaled_linear":
# 生成缩放后的 beta 值
self.betas = torch.linspace(beta_start**0.5, beta_end**0.5, num_train_timesteps, dtype=torch.float32) ** 2
# 如果使用平方余弦调度,生成对应的 beta 值
elif beta_schedule == "squaredcos_cap_v2":
# 使用 Glide 余弦调度
self.betas = betas_for_alpha_bar(num_train_timesteps)
# 如果使用 sigmoid 调度,生成对应的 beta 值
elif beta_schedule == "sigmoid":
# 使用 GeoDiff 的 sigmoid 调度
betas = torch.linspace(-6, 6, num_train_timesteps)
self.betas = torch.sigmoid(betas) * (beta_end - beta_start) + beta_start
# 如果调度方式未实现,抛出未实现错误
else:
raise NotImplementedError(f"{beta_schedule} is not implemented for {self.__class__}")
# 如果需要,重新缩放 beta 值以适应零 SNR
if rescale_betas_zero_snr:
self.betas = rescale_zero_terminal_snr(self.betas)
# 计算 alpha 值为 1 减去 beta 值
self.alphas = 1.0 - self.betas
# 计算累积 alpha 值的乘积
self.alphas_cumprod = torch.cumprod(self.alphas, dim=0)
# 创建一个常数 tensor,值为 1.0
self.one = torch.tensor(1.0)
# 初始化噪声分布的标准差,默认值为 1.0
self.init_noise_sigma = 1.0
# 可设置的值,初始化为 False
self.custom_timesteps = False
# 进行推理步骤的数量,初始化为 None
self.num_inference_steps = None
# 创建时间步的 tensor,范围从 0 到 num_train_timesteps,反向排序
self.timesteps = torch.from_numpy(np.arange(0, num_train_timesteps)[::-1].copy())
# 设置方差类型
self.variance_type = variance_type
# 从 diffusers.schedulers.scheduling_ddpm.DDPMScheduler.scale_model_input 复制的函数
# 定义一个方法,用于根据当前时间步调整模型输入的大小
def scale_model_input(self, sample: torch.Tensor, timestep: Optional[int] = None) -> torch.Tensor:
"""
确保与需要根据当前时间步缩放去噪模型输入的调度器之间的互换性。
参数:
sample (`torch.Tensor`):
输入样本。
timestep (`int`, *可选*):
扩散链中的当前时间步。
返回:
`torch.Tensor`:
一个缩放后的输入样本。
"""
# 直接返回输入样本,未进行任何缩放处理
return sample
# 从 diffusers.schedulers.scheduling_ddpm.DDPMScheduler.set_timesteps 复制
def set_timesteps(
# 定义一个可选参数,表示推理步骤的数量
num_inference_steps: Optional[int] = None,
# 定义一个可选参数,表示设备类型
device: Union[str, torch.device] = None,
# 定义一个可选参数,表示时间步的列表
timesteps: Optional[List[int]] = None,
# 从 diffusers.schedulers.scheduling_ddpm.DDPMScheduler._get_variance 复制
def _get_variance(self, t, predicted_variance=None, variance_type=None):
# 获取当前时间步的前一个时间步
prev_t = self.previous_timestep(t)
# 获取当前时间步的累积 alpha 值
alpha_prod_t = self.alphas_cumprod[t]
# 获取前一个时间步的累积 alpha 值,如果前一个时间步小于 0,则使用默认值 one
alpha_prod_t_prev = self.alphas_cumprod[prev_t] if prev_t >= 0 else self.one
# 计算当前时间步的 beta 值
current_beta_t = 1 - alpha_prod_t / alpha_prod_t_prev
# 对于 t > 0,计算预测的方差 βt(参见 https://arxiv.org/pdf/2006.11239.pdf 中的公式 (6) 和 (7))
# 从中进行采样以获得前一个样本
# x_{t-1} ~ N(pred_prev_sample, variance) == 将方差添加到预测样本
variance = (1 - alpha_prod_t_prev) / (1 - alpha_prod_t) * current_beta_t
# 始终取方差的对数,因此需要限制它以确保不为 0
variance = torch.clamp(variance, min=1e-20)
# 如果方差类型未指定,则使用配置中定义的方差类型
if variance_type is None:
variance_type = self.config.variance_type
# 针对不同的方差类型进行处理
# 固定小方差的情况
if variance_type == "fixed_small":
variance = variance
# 针对 rl-diffuser 的情况 https://arxiv.org/abs/2205.09991
elif variance_type == "fixed_small_log":
# 对方差取对数后取指数以计算最终方差
variance = torch.log(variance)
variance = torch.exp(0.5 * variance)
elif variance_type == "fixed_large":
# 将方差设置为当前 beta 值
variance = current_beta_t
elif variance_type == "fixed_large_log":
# 对当前 beta 值取对数
variance = torch.log(current_beta_t)
elif variance_type == "learned":
# 返回预测的方差
return predicted_variance
elif variance_type == "learned_range":
# 计算最小和最大方差的对数
min_log = torch.log(variance)
max_log = torch.log(current_beta_t)
# 计算 frac 以进行线性插值
frac = (predicted_variance + 1) / 2
# 通过线性插值计算方差
variance = frac * max_log + (1 - frac) * min_log
# 返回计算得到的方差
return variance
# 从 diffusers.schedulers.scheduling_ddpm.DDPMScheduler._threshold_sample 复制
# 定义私有方法 _threshold_sample,接受一个张量类型的样本并返回处理后的张量
def _threshold_sample(self, sample: torch.Tensor) -> torch.Tensor:
"""
"动态阈值:在每次采样步骤中,我们将 s 设置为 xt0(在时间步 t 对 x_0 的预测)中的某个百分位绝对像素值,
如果 s > 1,则我们将 xt0 的值阈值化到范围 [-s, s],然后除以 s。动态阈值推动饱和像素(接近 -1 和 1 的像素)向内移动,
从而在每一步主动防止像素饱和。我们发现动态阈值显著提高了照片真实感以及图像与文本的对齐,尤其是在使用非常大的引导权重时。"
https://arxiv.org/abs/2205.11487
"""
# 获取样本的数值类型
dtype = sample.dtype
# 获取批次大小、通道数和剩余维度
batch_size, channels, *remaining_dims = sample.shape
# 如果样本类型不是 float32 或 float64,则转换为 float 类型
if dtype not in (torch.float32, torch.float64):
sample = sample.float() # 为量化计算进行上升类型转换,且 cpu 半精度不支持 clamping
# 将样本展平,以便在每张图像上进行百分位计算
sample = sample.reshape(batch_size, channels * np.prod(remaining_dims))
# 计算样本的绝对值
abs_sample = sample.abs() # "某个百分位绝对像素值"
# 计算绝对样本的指定百分位数
s = torch.quantile(abs_sample, self.config.dynamic_thresholding_ratio, dim=1)
# 将 s 的值限制在 [1, sample_max_value] 范围内
s = torch.clamp(
s, min=1, max=self.config.sample_max_value
) # 限制最小值为 1,相当于标准裁剪到 [-1, 1]
# 在第一维度上添加一个维度以便广播
s = s.unsqueeze(1) # (batch_size, 1),因为 clamp 会在第 0 维广播
# 将样本值限制在 [-s, s] 范围内,并归一化
sample = torch.clamp(sample, -s, s) / s # "将 xt0 阈值化到范围 [-s, s],然后除以 s"
# 将样本恢复到原始形状
sample = sample.reshape(batch_size, channels, *remaining_dims)
# 将样本转换回原来的数值类型
sample = sample.to(dtype)
# 返回处理后的样本
return sample
# 定义步进方法 step,接受模型输出、时间步、样本等参数
def step(
self,
model_output: torch.Tensor,
timestep: int,
sample: torch.Tensor,
generator=None,
return_dict: bool = True,
# 定义无噪声的批处理步骤方法 batch_step_no_noise
def batch_step_no_noise(
self,
model_output: torch.Tensor,
timesteps: List[int],
sample: torch.Tensor,
# 从 diffusers.schedulers.scheduling_ddpm.DDPMScheduler.add_noise 复制的方法 add_noise
def add_noise(
self,
original_samples: torch.Tensor,
noise: torch.Tensor,
timesteps: torch.IntTensor,
# 返回类型为 torch.Tensor
) -> torch.Tensor:
# 确保 alphas_cumprod 和 timestep 的设备和数据类型与 original_samples 相同
# 将 self.alphas_cumprod 移动到目标设备,以避免后续 add_noise 调用时重复的 CPU 到 GPU 数据移动
self.alphas_cumprod = self.alphas_cumprod.to(device=original_samples.device)
# 将 alphas_cumprod 转换为 original_samples 的数据类型
alphas_cumprod = self.alphas_cumprod.to(dtype=original_samples.dtype)
# 将 timesteps 移动到 original_samples 的设备
timesteps = timesteps.to(original_samples.device)
# 计算平方根的 alpha 累积乘积
sqrt_alpha_prod = alphas_cumprod[timesteps] ** 0.5
# 将 sqrt_alpha_prod 展平
sqrt_alpha_prod = sqrt_alpha_prod.flatten()
# 确保 sqrt_alpha_prod 的维度与 original_samples 相同
while len(sqrt_alpha_prod.shape) < len(original_samples.shape):
sqrt_alpha_prod = sqrt_alpha_prod.unsqueeze(-1)
# 计算平方根的 1 - alpha 累积乘积
sqrt_one_minus_alpha_prod = (1 - alphas_cumprod[timesteps]) ** 0.5
# 将 sqrt_one_minus_alpha_prod 展平
sqrt_one_minus_alpha_prod = sqrt_one_minus_alpha_prod.flatten()
# 确保 sqrt_one_minus_alpha_prod 的维度与 original_samples 相同
while len(sqrt_one_minus_alpha_prod.shape) < len(original_samples.shape):
sqrt_one_minus_alpha_prod = sqrt_one_minus_alpha_prod.unsqueeze(-1)
# 通过加噪声生成带噪声的样本
noisy_samples = sqrt_alpha_prod * original_samples + sqrt_one_minus_alpha_prod * noise
# 返回带噪声的样本
return noisy_samples
# 从 diffusers.schedulers.scheduling_ddpm.DDPMScheduler.get_velocity 复制
def get_velocity(self, sample: torch.Tensor, noise: torch.Tensor, timesteps: torch.IntTensor) -> torch.Tensor:
# 确保 alphas_cumprod 和 timestep 的设备和数据类型与 sample 相同
self.alphas_cumprod = self.alphas_cumprod.to(device=sample.device)
# 将 alphas_cumprod 转换为 sample 的数据类型
alphas_cumprod = self.alphas_cumprod.to(dtype=sample.dtype)
# 将 timesteps 移动到 sample 的设备
timesteps = timesteps.to(sample.device)
# 计算平方根的 alpha 累积乘积
sqrt_alpha_prod = alphas_cumprod[timesteps] ** 0.5
# 将 sqrt_alpha_prod 展平
sqrt_alpha_prod = sqrt_alpha_prod.flatten()
# 确保 sqrt_alpha_prod 的维度与 sample 相同
while len(sqrt_alpha_prod.shape) < len(sample.shape):
sqrt_alpha_prod = sqrt_alpha_prod.unsqueeze(-1)
# 计算平方根的 1 - alpha 累积乘积
sqrt_one_minus_alpha_prod = (1 - alphas_cumprod[timesteps]) ** 0.5
# 将 sqrt_one_minus_alpha_prod 展平
sqrt_one_minus_alpha_prod = sqrt_one_minus_alpha_prod.flatten()
# 确保 sqrt_one_minus_alpha_prod 的维度与 sample 相同
while len(sqrt_one_minus_alpha_prod.shape) < len(sample.shape):
sqrt_one_minus_alpha_prod = sqrt_one_minus_alpha_prod.unsqueeze(-1)
# 计算速度,即带噪声的样本与原始样本的差异
velocity = sqrt_alpha_prod * noise - sqrt_one_minus_alpha_prod * sample
# 返回计算得到的速度
return velocity
# 定义获取对象长度的方法
def __len__(self):
# 返回训练时间步的数量
return self.config.num_train_timesteps
# 从 diffusers.schedulers.scheduling_ddpm.DDPMScheduler.previous_timestep 复制
# 获取给定时间步的前一个时间步
def previous_timestep(self, timestep):
# 检查是否使用自定义时间步
if self.custom_timesteps:
# 查找当前时间步在时间步数组中的索引
index = (self.timesteps == timestep).nonzero(as_tuple=True)[0][0]
# 如果当前时间步是最后一个时间步
if index == self.timesteps.shape[0] - 1:
# 将前一个时间步设置为 -1
prev_t = torch.tensor(-1)
else:
# 否则,取当前时间步之后的时间步作为前一个时间步
prev_t = self.timesteps[index + 1]
else:
# 根据推理步骤数量确定前一个时间步的计算方式
num_inference_steps = (
# 如果已定义推理步骤数量,则使用该值,否则使用训练时间步数量
self.num_inference_steps if self.num_inference_steps else self.config.num_train_timesteps
)
# 计算前一个时间步
prev_t = timestep - self.config.num_train_timesteps // num_inference_steps
# 返回前一个时间步
return prev_t
# 版权声明,标明版权所有者和许可证信息
# Copyright (c) 2022 Pablo Pernías MIT License
# Copyright 2024 UC Berkeley Team and The HuggingFace Team. All rights reserved.
#
# 根据 Apache License, Version 2.0 (“许可证”) 进行许可;
# 除非遵循许可证,否则您不得使用此文件。
# 您可以在以下地址获取许可证副本:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律或书面协议另有约定,分发的软件是按“原样”提供的,
# 不提供任何形式的保证或条件。
# 请参阅许可证以了解特定语言所管辖的权限和限制。
# 免责声明:此文件受到 https://github.com/ermongroup/ddim 的强烈影响
# 导入数学模块
import math
# 从 dataclasses 模块导入 dataclass 装饰器
from dataclasses import dataclass
# 导入类型相关的类和接口
from typing import List, Optional, Tuple, Union
# 导入 PyTorch 库
import torch
# 从配置和注册相关的模块导入类
from ..configuration_utils import ConfigMixin, register_to_config
# 从工具模块导入 BaseOutput 类
from ..utils import BaseOutput
# 从 PyTorch 工具模块导入 randn_tensor 函数
from ..utils.torch_utils import randn_tensor
# 从调度工具模块导入 SchedulerMixin 类
from .scheduling_utils import SchedulerMixin
# 定义调度器输出类,包含上一时间步的样本
@dataclass
class DDPMWuerstchenSchedulerOutput(BaseOutput):
"""
调度器步骤函数输出的输出类。
参数:
prev_sample (`torch.Tensor`,形状为 `(batch_size, num_channels, height, width)`,用于图像):
计算得到的前一时间步样本 (x_{t-1})。`prev_sample` 应在去噪循环中作为下一个模型输入使用。
"""
# 前一时间步的样本
prev_sample: torch.Tensor
# 定义生成 beta 值的函数,基于 alpha_t_bar 函数
def betas_for_alpha_bar(
num_diffusion_timesteps,
max_beta=0.999,
alpha_transform_type="cosine",
):
"""
创建一个 beta 调度,该调度离散化给定的 alpha_t_bar 函数,定义了
随时间变化的 (1-beta) 的累积乘积,范围为 t = [0,1]。
包含一个 alpha_bar 函数,该函数接受参数 t 并将其转换为 (1-beta) 的累积乘积
直到扩散过程的该部分。
参数:
num_diffusion_timesteps (`int`): 要生成的 beta 数量。
max_beta (`float`): 使用的最大 beta 值;使用小于 1 的值
以防止奇异性。
alpha_transform_type (`str`, *可选*,默认为 `cosine`): alpha_bar 的噪声调度类型。
从 `cosine` 或 `exp` 中选择。
返回:
betas (`np.ndarray`): 调度器用于逐步模型输出的 beta 值
"""
# 根据 alpha_transform_type 选择不同的 alpha_bar 函数
if alpha_transform_type == "cosine":
# 定义基于余弦的 alpha_bar 函数
def alpha_bar_fn(t):
return math.cos((t + 0.008) / 1.008 * math.pi / 2) ** 2
elif alpha_transform_type == "exp":
# 定义基于指数的 alpha_bar 函数
def alpha_bar_fn(t):
return math.exp(t * -12.0)
else:
# 抛出不支持的 alpha_transform_type 错误
raise ValueError(f"Unsupported alpha_transform_type: {alpha_transform_type}")
# 初始化 beta 列表
betas = []
# 生成 beta 值,遍历每个扩散时间步
for i in range(num_diffusion_timesteps):
# 当前时间步 t1
t1 = i / num_diffusion_timesteps
# 下一个时间步 t2
t2 = (i + 1) / num_diffusion_timesteps
# 计算并添加 beta 值,确保不超过 max_beta
betas.append(min(1 - alpha_bar_fn(t2) / alpha_bar_fn(t1), max_beta))
# 将 betas 列表转换为 PyTorch 张量,数据类型为 float32
return torch.tensor(betas, dtype=torch.float32)
# 定义 DDPMWuerstchenScheduler 类,继承自 SchedulerMixin 和 ConfigMixin
class DDPMWuerstchenScheduler(SchedulerMixin, ConfigMixin):
"""
Denoising diffusion probabilistic models (DDPMs) explores the connections between denoising score matching and
Langevin dynamics sampling.
[`~ConfigMixin`] takes care of storing all config attributes that are passed in the scheduler's `__init__`
function, such as `num_train_timesteps`. They can be accessed via `scheduler.config.num_train_timesteps`.
[`SchedulerMixin`] provides general loading and saving functionality via the [`SchedulerMixin.save_pretrained`] and
[`~SchedulerMixin.from_pretrained`] functions.
For more details, see the original paper: https://arxiv.org/abs/2006.11239
Args:
scaler (`float`): ....
s (`float`): ....
"""
# 注册配置函数
@register_to_config
def __init__(
# 初始化方法的参数,scaler 和 s,具有默认值
self,
scaler: float = 1.0,
s: float = 0.008,
):
# 将传入的 scaler 值赋给实例变量
self.scaler = scaler
# 将 s 转换为张量并赋值给实例变量
self.s = torch.tensor([s])
# 计算初始 alpha 累积乘积,使用 cos 函数公式
self._init_alpha_cumprod = torch.cos(self.s / (1 + self.s) * torch.pi * 0.5) ** 2
# 设置初始噪声分布的标准差
self.init_noise_sigma = 1.0
# 定义计算 alpha 累积乘积的方法
def _alpha_cumprod(self, t, device):
# 根据 scaler 的值调整时间 t
if self.scaler > 1:
t = 1 - (1 - t) ** self.scaler
elif self.scaler < 1:
t = t**self.scaler
# 计算 alpha 累积乘积
alpha_cumprod = torch.cos(
(t + self.s.to(device)) / (1 + self.s.to(device)) * torch.pi * 0.5
) ** 2 / self._init_alpha_cumprod.to(device)
# 限制 alpha 的范围在 [0.0001, 0.9999]
return alpha_cumprod.clamp(0.0001, 0.9999)
# 定义缩放模型输入的方法
def scale_model_input(self, sample: torch.Tensor, timestep: Optional[int] = None) -> torch.Tensor:
"""
Ensures interchangeability with schedulers that need to scale the denoising model input depending on the
current timestep.
Args:
sample (`torch.Tensor`): input sample
timestep (`int`, optional): current timestep
Returns:
`torch.Tensor`: scaled input sample
"""
# 直接返回输入样本,暂时未实现缩放逻辑
return sample
# 定义设置时间步的方法
def set_timesteps(
self,
num_inference_steps: int = None,
timesteps: Optional[List[int]] = None,
device: Union[str, torch.device] = None,
):
"""
Sets the discrete timesteps used for the diffusion chain. Supporting function to be run before inference.
Args:
num_inference_steps (`Dict[float, int]`):
the number of diffusion steps used when generating samples with a pre-trained model. If passed, then
`timesteps` must be `None`.
device (`str` or `torch.device`, optional):
the device to which the timesteps are moved to. {2 / 3: 20, 0.0: 10}
"""
# 如果没有提供时间步,则生成从 1.0 到 0.0 的等间距张量
if timesteps is None:
timesteps = torch.linspace(1.0, 0.0, num_inference_steps + 1, device=device)
# 如果时间步不是张量,则将其转换为张量并移动到指定设备
if not isinstance(timesteps, torch.Tensor):
timesteps = torch.Tensor(timesteps).to(device)
# 将时间步赋值给实例变量
self.timesteps = timesteps
# 定义一个函数,进行反向扩散过程的预测
def step(
self, # self 引用实例对象
model_output: torch.Tensor, # 从学习到的扩散模型直接输出的张量
timestep: int, # 当前扩散链中的离散时间步
sample: torch.Tensor, # 当前扩散过程中生成的样本实例
generator=None, # 随机数生成器,默认为 None
return_dict: bool = True, # 是否返回字典,默认为 True
) -> Union[DDPMWuerstchenSchedulerOutput, Tuple]: # 函数返回类型为 DDPMWuerstchenSchedulerOutput 或元组
"""
通过反向 SDE 预测前一个时间步的样本。核心函数用于从学习到的模型输出传播扩散过程
(通常是预测的噪声)。
参数:
model_output (`torch.Tensor`): 学习到的扩散模型的直接输出。
timestep (`int`): 当前扩散链中的离散时间步。
sample (`torch.Tensor`):
当前扩散过程中生成的样本实例。
generator: 随机数生成器。
return_dict (`bool`): 选择返回元组而不是 DDPMWuerstchenSchedulerOutput 类
返回:
[`DDPMWuerstchenSchedulerOutput`] 或 `tuple`: 如果 `return_dict` 为 True,返回 [`DDPMWuerstchenSchedulerOutput`],
否则返回元组。当返回元组时,第一个元素是样本张量。
"""
# 获取模型输出的张量的数据类型
dtype = model_output.dtype
# 获取模型输出的张量所在设备
device = model_output.device
# 将当前时间步赋值给 t
t = timestep
# 获取前一个时间步
prev_t = self.previous_timestep(t)
# 计算当前时间步的累积 alpha 值,并调整维度
alpha_cumprod = self._alpha_cumprod(t, device).view(t.size(0), *[1 for _ in sample.shape[1:]])
# 计算前一个时间步的累积 alpha 值,并调整维度
alpha_cumprod_prev = self._alpha_cumprod(prev_t, device).view(prev_t.size(0), *[1 for _ in sample.shape[1:]])
# 计算 alpha 值
alpha = alpha_cumprod / alpha_cumprod_prev
# 根据当前样本和模型输出计算均值 mu
mu = (1.0 / alpha).sqrt() * (sample - (1 - alpha) * model_output / (1 - alpha_cumprod).sqrt())
# 生成随机噪声张量 std_noise
std_noise = randn_tensor(mu.shape, generator=generator, device=model_output.device, dtype=model_output.dtype)
# 计算标准差 std
std = ((1 - alpha) * (1.0 - alpha_cumprod_prev) / (1.0 - alpha_cumprod)).sqrt() * std_noise
# 预测前一个时间步的样本
pred = mu + std * (prev_t != 0).float().view(prev_t.size(0), *[1 for _ in sample.shape[1:]])
# 如果不返回字典,则返回元组形式
if not return_dict:
return (pred.to(dtype),)
# 返回 DDPMWuerstchenSchedulerOutput 对象
return DDPMWuerstchenSchedulerOutput(prev_sample=pred.to(dtype))
# 定义一个函数,将噪声添加到原始样本中
def add_noise(
self, # self 引用实例对象
original_samples: torch.Tensor, # 原始样本的张量
noise: torch.Tensor, # 添加的噪声张量
timesteps: torch.Tensor, # 时间步的张量
) -> torch.Tensor: # 函数返回类型为张量
# 获取原始样本所在的设备
device = original_samples.device
# 获取原始样本的数据类型
dtype = original_samples.dtype
# 计算给定时间步的累积 alpha 值,并调整维度
alpha_cumprod = self._alpha_cumprod(timesteps, device=device).view(
timesteps.size(0), *[1 for _ in original_samples.shape[1:]]
)
# 计算添加噪声后的样本
noisy_samples = alpha_cumprod.sqrt() * original_samples + (1 - alpha_cumprod).sqrt() * noise
# 返回噪声样本,调整数据类型
return noisy_samples.to(dtype=dtype)
# 定义一个函数,返回训练时间步的数量
def __len__(self):
# 返回配置中的训练时间步数
return self.config.num_train_timesteps
# 定义一个函数,获取前一个时间步
def previous_timestep(self, timestep):
# 计算与当前时间步的绝对差值并找到索引
index = (self.timesteps - timestep[0]).abs().argmin().item()
# 获取前一个时间步并调整维度
prev_t = self.timesteps[index + 1][None].expand(timestep.shape[0])
# 返回前一个时间步
return prev_t