diffusers 源码解析(四十七)
.\diffusers\pipelines\stable_diffusion\pipeline_stable_diffusion_latent_upscale.py
# 版权声明,说明该文件的版权所有者及其保留的权利
# Copyright 2024 The HuggingFace Team. All rights reserved.
#
# 根据 Apache 许可证第 2.0 版的规定使用此文件
# Licensed under the Apache License, Version 2.0 (the "License");
# 您不得在未遵守许可证的情况下使用此文件
# you may not use this file except in compliance with the License.
# 可在以下网址获取许可证副本
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律要求或书面协议另有约定,软件按“原样”提供
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# 无任何形式的保证或条件,无论明示或暗示
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# 请查看许可证以获取特定语言的权限和限制
# See the License for the specific language governing permissions and
# limitations under the License.
import warnings # 导入警告模块,用于发出警告信息
from typing import Callable, List, Optional, Union # 导入类型提示工具
import numpy as np # 导入 NumPy 库以支持数组操作
import PIL.Image # 导入 PIL 库用于图像处理
import torch # 导入 PyTorch 库用于深度学习操作
import torch.nn.functional as F # 导入 PyTorch 的功能性神经网络模块
from transformers import CLIPTextModel, CLIPTokenizer # 从 transformers 库导入 CLIP 模型和分词器
from ...image_processor import PipelineImageInput, VaeImageProcessor # 从本地模块导入图像处理相关类
from ...loaders import FromSingleFileMixin # 从本地模块导入单文件加载器
from ...models import AutoencoderKL, UNet2DConditionModel # 从本地模块导入模型
from ...schedulers import EulerDiscreteScheduler # 从本地模块导入调度器
from ...utils import deprecate, logging # 从本地模块导入工具函数和日志记录
from ...utils.torch_utils import randn_tensor # 从本地模块导入随机张量生成函数
from ..pipeline_utils import DiffusionPipeline, ImagePipelineOutput, StableDiffusionMixin # 从管道工具导入相关类
logger = logging.get_logger(__name__) # 创建一个日志记录器,用于记录信息,禁用 pylint 的无效名称警告
# 预处理函数,用于处理输入图像
# Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion_upscale.preprocess
def preprocess(image):
# 发出关于该方法已弃用的警告,提示将来版本会移除
warnings.warn(
"The preprocess method is deprecated and will be removed in a future version. Please"
" use VaeImageProcessor.preprocess instead",
FutureWarning,
)
# 如果输入是张量,直接返回
if isinstance(image, torch.Tensor):
return image
# 如果输入是 PIL 图像,将其包装成列表
elif isinstance(image, PIL.Image.Image):
image = [image]
# 检查列表中的第一个元素是否为 PIL 图像
if isinstance(image[0], PIL.Image.Image):
# 获取图像的宽度和高度
w, h = image[0].size
# 将宽度和高度调整为 64 的整数倍
w, h = (x - x % 64 for x in (w, h)) # resize to integer multiple of 64
# 将图像调整为新的大小并转换为数组格式
image = [np.array(i.resize((w, h)))[None, :] for i in image]
# 沿第一个轴连接图像数组
image = np.concatenate(image, axis=0)
# 归一化图像数据并转换数据类型
image = np.array(image).astype(np.float32) / 255.0
# 调整数组维度顺序
image = image.transpose(0, 3, 1, 2)
# 将图像数据从 [0, 1] 范围缩放到 [-1, 1] 范围
image = 2.0 * image - 1.0
# 将 NumPy 数组转换为 PyTorch 张量
image = torch.from_numpy(image)
# 如果列表中的第一个元素是张量,沿着第 0 维度连接它们
elif isinstance(image[0], torch.Tensor):
image = torch.cat(image, dim=0)
# 返回处理后的图像
return image
class StableDiffusionLatentUpscalePipeline(DiffusionPipeline, StableDiffusionMixin, FromSingleFileMixin):
r"""
用于将 Stable Diffusion 输出图像分辨率按 2 倍放大的管道。
该模型继承自 [`DiffusionPipeline`]. 有关所有管道通用方法(下载、保存、在特定设备上运行等)的文档,请查看超类文档
The pipeline also inherits the following loading methods:
- [`~loaders.FromSingleFileMixin.from_single_file`] for loading `.ckpt` files
# 定义参数说明
Args:
vae ([`AutoencoderKL`]):
# 用于编码和解码图像的变分自编码器(VAE)模型,处理图像和潜在表示之间的转换
text_encoder ([`~transformers.CLIPTextModel`]):
# 冻结的文本编码器,使用 [clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14) 模型
tokenizer ([`~transformers.CLIPTokenizer`]):
# 一个 `CLIPTokenizer` 用于对文本进行分词
unet ([`UNet2DConditionModel`]):
# 用于对编码后的图像潜在表示进行去噪的 `UNet2DConditionModel`
scheduler ([`SchedulerMixin`]):
# 一个 [`EulerDiscreteScheduler`],与 `unet` 配合使用以去噪编码后的图像潜在表示
"""
# 定义模型的计算顺序
model_cpu_offload_seq = "text_encoder->unet->vae"
def __init__(
# 初始化方法,接受多个模型作为参数
self,
vae: AutoencoderKL,
text_encoder: CLIPTextModel,
tokenizer: CLIPTokenizer,
unet: UNet2DConditionModel,
scheduler: EulerDiscreteScheduler,
):
# 调用父类的初始化方法
super().__init__()
# 注册模块,将传入的模型进行存储
self.register_modules(
vae=vae,
text_encoder=text_encoder,
tokenizer=tokenizer,
unet=unet,
scheduler=scheduler,
)
# 计算 VAE 的缩放因子,基于配置中的块输出通道数量
self.vae_scale_factor = 2 ** (len(self.vae.config.block_out_channels) - 1)
# 创建图像处理器,用于处理解码后的图像
self.image_processor = VaeImageProcessor(vae_scale_factor=self.vae_scale_factor, resample="bicubic")
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline 中复制的方法,用于解码潜在表示
def decode_latents(self, latents):
# 设置弃用信息,提示用户该方法将在1.0.0版本中移除
deprecation_message = "The decode_latents method is deprecated and will be removed in 1.0.0. Please use VaeImageProcessor.postprocess(...) instead"
deprecate("decode_latents", "1.0.0", deprecation_message, standard_warn=False)
# 根据 VAE 的缩放因子调整潜在表示的值
latents = 1 / self.vae.config.scaling_factor * latents
# 解码潜在表示,返回图像
image = self.vae.decode(latents, return_dict=False)[0]
# 将解码后的图像归一化到[0, 1]范围
image = (image / 2 + 0.5).clamp(0, 1)
# 转换图像数据为 float32 格式,以确保与 bfloat16 兼容,并避免显著开销
image = image.cpu().permute(0, 2, 3, 1).float().numpy()
# 返回处理后的图像
return image
# 检查输入参数的有效性
def check_inputs(self, prompt, image, callback_steps):
# 如果 `prompt` 不是字符串或列表类型,则引发错误
if not isinstance(prompt, str) and not isinstance(prompt, list):
raise ValueError(f"`prompt` has to be of type `str` or `list` but is {type(prompt)}")
# 如果 `image` 不是张量、PIL图像或列表类型,则引发错误
if (
not isinstance(image, torch.Tensor)
and not isinstance(image, PIL.Image.Image)
and not isinstance(image, list)
):
raise ValueError(
f"`image` has to be of type `torch.Tensor`, `PIL.Image.Image` or `list` but is {type(image)}"
)
# 验证如果 `image` 是列表或张量,则 `prompt` 和 `image` 的批量大小应相同
if isinstance(image, (list, torch.Tensor)):
# 如果 `prompt` 是字符串,批量大小为1
if isinstance(prompt, str):
batch_size = 1
else:
# 否则,批量大小为 `prompt` 的长度
batch_size = len(prompt)
# 如果 `image` 是列表,获取其批量大小
if isinstance(image, list):
image_batch_size = len(image)
else:
# 否则,获取 `image` 的第一维度大小(批量大小)
image_batch_size = image.shape[0] if image.ndim == 4 else 1
# 如果 `prompt` 和 `image` 的批量大小不匹配,则引发错误
if batch_size != image_batch_size:
raise ValueError(
f"`prompt` has batch size {batch_size} and `image` has batch size {image_batch_size}."
" Please make sure that passed `prompt` matches the batch size of `image`."
)
# 检查 `callback_steps` 是否为正整数
if (callback_steps is None) or (
callback_steps is not None and (not isinstance(callback_steps, int) or callback_steps <= 0)
):
raise ValueError(
f"`callback_steps` has to be a positive integer but is {callback_steps} of type"
f" {type(callback_steps)}."
)
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion_upscale 中复制的方法
def prepare_latents(self, batch_size, num_channels_latents, height, width, dtype, device, generator, latents=None):
# 定义潜在变量的形状
shape = (batch_size, num_channels_latents, height, width)
# 如果没有提供 `latents`,则生成随机的潜在变量
if latents is None:
latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
else:
# 如果提供的 `latents` 形状不匹配,则引发错误
if latents.shape != shape:
raise ValueError(f"Unexpected latents shape, got {latents.shape}, expected {shape}")
# 将 `latents` 移动到指定设备
latents = latents.to(device)
# 按调度器所需的标准差缩放初始噪声
latents = latents * self.scheduler.init_noise_sigma
# 返回处理后的潜在变量
return latents
# 在计算图中不追踪梯度
@torch.no_grad()
# 定义一个可调用的类方法,接收多个参数用于生成图像
def __call__(
self,
# 用户输入的提示文本,可以是字符串或字符串列表
prompt: Union[str, List[str]],
# 输入的图像,默认为 None
image: PipelineImageInput = None,
# 推理步骤的数量,默认为 75
num_inference_steps: int = 75,
# 指导缩放因子,默认为 9.0
guidance_scale: float = 9.0,
# 负提示文本,可以是字符串或字符串列表,默认为 None
negative_prompt: Optional[Union[str, List[str]]] = None,
# 随机数生成器,可以是单个或多个 torch.Generator,默认为 None
generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
# 潜在变量,默认为 None
latents: Optional[torch.Tensor] = None,
# 输出类型,默认为 "pil" (Python Imaging Library)
output_type: Optional[str] = "pil",
# 是否返回字典格式的结果,默认为 True
return_dict: bool = True,
# 回调函数,用于在推理过程中处理某些操作,默认为 None
callback: Optional[Callable[[int, int, torch.Tensor], None]] = None,
# 回调函数调用的步数间隔,默认为 1
callback_steps: int = 1,
.\diffusers\pipelines\stable_diffusion\pipeline_stable_diffusion_upscale.py
# 版权声明,表明该代码的版权所有者及相关权利
# Copyright 2024 The HuggingFace Team. All rights reserved.
#
# 根据 Apache 许可证第 2.0 版("许可证")授权;
# 除非遵循该许可证,否则您不得使用此文件。
# 您可以在以下网址获取许可证副本:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律或书面同意,软件
# 在许可证下分发是以“原样”基础提供的,
# 不附带任何明示或暗示的担保或条件。
# 请参阅许可证以了解有关权限和
# 限制的具体信息。
# 导入 inspect 模块,用于获取对象的信息
import inspect
# 导入 warnings 模块,用于发出警告
import warnings
# 从 typing 模块导入类型注解支持
from typing import Any, Callable, Dict, List, Optional, Union
# 导入 numpy 库,用于数值计算
import numpy as np
# 导入 PIL 库中的 Image 类,用于图像处理
import PIL.Image
# 导入 PyTorch 库
import torch
# 从 transformers 库导入 CLIP 相关类
from transformers import CLIPImageProcessor, CLIPTextModel, CLIPTokenizer
# 从相对路径导入其他处理器和加载器
from ...image_processor import PipelineImageInput, VaeImageProcessor
from ...loaders import FromSingleFileMixin, StableDiffusionLoraLoaderMixin, TextualInversionLoaderMixin
from ...models import AutoencoderKL, UNet2DConditionModel
# 从注意力处理器模块导入相关类
from ...models.attention_processor import (
AttnProcessor2_0,
XFormersAttnProcessor,
)
# 从 Lora 模块导入调整 Lora 规模的函数
from ...models.lora import adjust_lora_scale_text_encoder
# 从调度器模块导入相关类
from ...schedulers import DDPMScheduler, KarrasDiffusionSchedulers
# 从工具模块导入各种工具函数
from ...utils import USE_PEFT_BACKEND, deprecate, logging, scale_lora_layers, unscale_lora_layers
# 从 Torch 工具模块导入随机张量生成函数
from ...utils.torch_utils import randn_tensor
# 从管道工具模块导入相关类
from ..pipeline_utils import DiffusionPipeline, StableDiffusionMixin
# 从当前包导入稳定扩散管道输出
from . import StableDiffusionPipelineOutput
# 创建日志记录器,用于记录当前模块的日志
logger = logging.get_logger(__name__) # pylint: disable=invalid-name
# 定义预处理函数,处理输入图像
def preprocess(image):
# 发出警告,提示该方法已弃用,未来版本将被移除
warnings.warn(
"The preprocess method is deprecated and will be removed in a future version. Please"
" use VaeImageProcessor.preprocess instead",
FutureWarning,
)
# 如果输入是张量,直接返回
if isinstance(image, torch.Tensor):
return image
# 如果输入是 PIL 图像,转换为列表
elif isinstance(image, PIL.Image.Image):
image = [image]
# 检查列表中第一个元素是否为 PIL 图像
if isinstance(image[0], PIL.Image.Image):
# 获取图像的宽和高
w, h = image[0].size
# 将宽高调整为 64 的整数倍
w, h = (x - x % 64 for x in (w, h)) # resize to integer multiple of 64
# 调整所有图像的大小并转换为 NumPy 数组
image = [np.array(i.resize((w, h)))[None, :] for i in image]
# 将所有图像数组沿第一个轴连接
image = np.concatenate(image, axis=0)
# 将数组转换为浮点型并归一化到 [0, 1]
image = np.array(image).astype(np.float32) / 255.0
# 调整数组维度顺序为 (N, C, H, W)
image = image.transpose(0, 3, 1, 2)
# 将像素值范围从 [0, 1] 映射到 [-1, 1]
image = 2.0 * image - 1.0
# 将 NumPy 数组转换为 PyTorch 张量
image = torch.from_numpy(image)
# 如果列表中第一个元素是张量,沿着第一个维度连接它们
elif isinstance(image[0], torch.Tensor):
image = torch.cat(image, dim=0)
# 返回处理后的图像
return image
# 定义 StableDiffusionUpscalePipeline 类,继承多个基类
class StableDiffusionUpscalePipeline(
DiffusionPipeline,
StableDiffusionMixin,
TextualInversionLoaderMixin,
StableDiffusionLoraLoaderMixin,
FromSingleFileMixin,
):
r"""
使用 Stable Diffusion 2 进行文本引导的图像超分辨率的管道。
此模型继承自 [`DiffusionPipeline`]。请查阅超类文档以获取所有管道通用方法的实现(下载、保存、在特定设备上运行等)。
# 管道还继承了以下加载方法:
# - `~loaders.TextualInversionLoaderMixin.load_textual_inversion` 用于加载文本反转嵌入
# - `~loaders.StableDiffusionLoraLoaderMixin.load_lora_weights` 用于加载 LoRA 权重
# - `~loaders.StableDiffusionLoraLoaderMixin.save_lora_weights` 用于保存 LoRA 权重
# - `~loaders.FromSingleFileMixin.from_single_file` 用于加载 `.ckpt` 文件
# 参数说明
Args:
vae ([`AutoencoderKL`]): # 变分自编码器(VAE)模型,用于将图像编码和解码为潜在表示
Variational Auto-Encoder (VAE) model to encode and decode images to and from latent representations.
text_encoder ([`~transformers.CLIPTextModel`]): # 冻结的文本编码器
Frozen text-encoder ([clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14)).
tokenizer ([`~transformers.CLIPTokenizer`]): # 用于标记化文本的 CLIPTokenizer
A `CLIPTokenizer` to tokenize text.
unet ([`UNet2DConditionModel`]): # 用于去噪编码图像潜在的 UNet2DConditionModel
A `UNet2DConditionModel` to denoise the encoded image latents.
low_res_scheduler ([`SchedulerMixin`]): # 用于向低分辨率条件图像添加初始噪声的调度器,必须是 DDPMScheduler 的实例
A scheduler used to add initial noise to the low resolution conditioning image. It must be an instance of
[`DDPMScheduler`].
scheduler ([`SchedulerMixin`]): # 与 UNet 一起使用的调度器,用于去噪编码图像潜在
A scheduler to be used in combination with `unet` to denoise the encoded image latents. Can be one of
[`DDIMScheduler`], [`LMSDiscreteScheduler`], or [`PNDMScheduler`].
"""
# 定义 CPU 离线加载的模型顺序
model_cpu_offload_seq = "text_encoder->unet->vae"
# 定义可选组件的列表
_optional_components = ["watermarker", "safety_checker", "feature_extractor"]
# 定义不包含在 CPU 离线加载中的组件
_exclude_from_cpu_offload = ["safety_checker"]
# 初始化方法
def __init__( # 定义初始化方法
self,
vae: AutoencoderKL, # 接收变分自编码器(VAE)模型
text_encoder: CLIPTextModel, # 接收文本编码器
tokenizer: CLIPTokenizer, # 接收标记化器
unet: UNet2DConditionModel, # 接收 UNet 模型
low_res_scheduler: DDPMScheduler, # 接收低分辨率调度器
scheduler: KarrasDiffusionSchedulers, # 接收调度器
safety_checker: Optional[Any] = None, # 可选的安全检查器
feature_extractor: Optional[CLIPImageProcessor] = None, # 可选的特征提取器
watermarker: Optional[Any] = None, # 可选的水印处理器
max_noise_level: int = 350, # 最大噪声级别,默认为350
):
# 初始化父类
super().__init__()
# 检查 VAE 是否有配置属性
if hasattr(
vae, "config"
): # 检查 VAE 是否具有配置属性 `scaling_factor`,如果未设置为 0.08333,则设置为 0.08333 并发出弃用警告
# 确定 `scaling_factor` 是否已设置为 0.08333
is_vae_scaling_factor_set_to_0_08333 = (
hasattr(vae.config, "scaling_factor") and vae.config.scaling_factor == 0.08333
)
# 如果 `scaling_factor` 未设置为 0.08333,则执行以下操作
if not is_vae_scaling_factor_set_to_0_08333:
# 创建弃用消息,说明配置问题及建议
deprecation_message = (
"The configuration file of the vae does not contain `scaling_factor` or it is set to"
f" {vae.config.scaling_factor}, which seems highly unlikely. If your checkpoint is a fine-tuned"
" version of `stabilityai/stable-diffusion-x4-upscaler` you should change 'scaling_factor' to"
" 0.08333 Please make sure to update the config accordingly, as not doing so might lead to"
" incorrect results in future versions. If you have downloaded this checkpoint from the Hugging"
" Face Hub, it would be very nice if you could open a Pull Request for the `vae/config.json` file"
)
# 记录弃用警告,并更新 `scaling_factor` 为 0.08333
deprecate("wrong scaling_factor", "1.0.0", deprecation_message, standard_warn=False)
vae.register_to_config(scaling_factor=0.08333)
# 注册各个模块到当前配置中
self.register_modules(
vae=vae,
text_encoder=text_encoder,
tokenizer=tokenizer,
unet=unet,
low_res_scheduler=low_res_scheduler,
scheduler=scheduler,
safety_checker=safety_checker,
watermarker=watermarker,
feature_extractor=feature_extractor,
)
# 计算 VAE 的缩放因子
self.vae_scale_factor = 2 ** (len(self.vae.config.block_out_channels) - 1)
# 创建 VAE 图像处理器实例,使用双三次插值法
self.image_processor = VaeImageProcessor(vae_scale_factor=self.vae_scale_factor, resample="bicubic")
# 将最大噪声水平注册到配置中
self.register_to_config(max_noise_level=max_noise_level)
def run_safety_checker(self, image, device, dtype):
# 如果存在安全检查器,则执行安全检查
if self.safety_checker is not None:
# 对输入图像进行后处理以适配安全检查器
feature_extractor_input = self.image_processor.postprocess(image, output_type="pil")
# 将后处理后的图像转换为张量,并移动到指定设备
safety_checker_input = self.feature_extractor(feature_extractor_input, return_tensors="pt").to(device)
# 进行安全检查,获取处理后的图像及检测结果
image, nsfw_detected, watermark_detected = self.safety_checker(
images=image,
clip_input=safety_checker_input.pixel_values.to(dtype=dtype),
)
else:
# 如果没有安全检查器,则将检测结果设置为 None
nsfw_detected = None
watermark_detected = None
# 如果存在 UNet 的卸载钩子,则执行卸载操作
if hasattr(self, "unet_offload_hook") and self.unet_offload_hook is not None:
self.unet_offload_hook.offload()
# 返回处理后的图像及检测结果
return image, nsfw_detected, watermark_detected
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline._encode_prompt 复制的代码
# 定义一个私有方法用于编码提示信息
def _encode_prompt(
self, # 方法的第一个参数,表示实例本身
prompt, # 要编码的提示信息
device, # 设备(如 CPU 或 GPU)
num_images_per_prompt, # 每个提示生成的图像数量
do_classifier_free_guidance, # 是否使用无分类器引导
negative_prompt=None, # 可选的负面提示信息
prompt_embeds: Optional[torch.Tensor] = None, # 可选的提示嵌入
negative_prompt_embeds: Optional[torch.Tensor] = None, # 可选的负面提示嵌入
lora_scale: Optional[float] = None, # 可选的 LoRA 缩放因子
**kwargs, # 其他可选参数
):
# 警告信息,表示此方法已被弃用,未来版本将删除
deprecation_message = "`_encode_prompt()` is deprecated and it will be removed in a future version. Use `encode_prompt()` instead. Also, be aware that the output format changed from a concatenated tensor to a tuple."
# 调用 deprecate 方法记录弃用信息
deprecate("_encode_prompt()", "1.0.0", deprecation_message, standard_warn=False)
# 调用 encode_prompt 方法编码提示,返回一个元组
prompt_embeds_tuple = self.encode_prompt(
prompt=prompt, # 传递提示信息
device=device, # 传递设备
num_images_per_prompt=num_images_per_prompt, # 传递每个提示的图像数量
do_classifier_free_guidance=do_classifier_free_guidance, # 传递无分类器引导标志
negative_prompt=negative_prompt, # 传递负面提示
prompt_embeds=prompt_embeds, # 传递提示嵌入
negative_prompt_embeds=negative_prompt_embeds, # 传递负面提示嵌入
lora_scale=lora_scale, # 传递 LoRA 缩放因子
**kwargs, # 传递其他参数
)
# 为向后兼容,将元组中的提示嵌入连接在一起
prompt_embeds = torch.cat([prompt_embeds_tuple[1], prompt_embeds_tuple[0]])
# 返回连接后的提示嵌入
return prompt_embeds
# 从 StableDiffusionPipeline 类中复制的编码提示方法
def encode_prompt(
self, # 方法的第一个参数,表示实例本身
prompt, # 要编码的提示信息
device, # 设备(如 CPU 或 GPU)
num_images_per_prompt, # 每个提示生成的图像数量
do_classifier_free_guidance, # 是否使用无分类器引导
negative_prompt=None, # 可选的负面提示信息
prompt_embeds: Optional[torch.Tensor] = None, # 可选的提示嵌入
negative_prompt_embeds: Optional[torch.Tensor] = None, # 可选的负面提示嵌入
lora_scale: Optional[float] = None, # 可选的 LoRA 缩放因子
clip_skip: Optional[int] = None, # 可选的剪切跳过参数
# 从 StableDiffusionPipeline 类中复制的准备额外步骤参数的方法
def prepare_extra_step_kwargs(self, generator, eta): # 准备额外的调度器步骤参数
# 为调度器步骤准备额外的参数,因为并非所有调度器的签名相同
# eta (η) 仅在 DDIMScheduler 中使用,对其他调度器将被忽略
# eta 对应于 DDIM 论文中的 η: https://arxiv.org/abs/2010.02502
# 应在 [0, 1] 之间
# 检查调度器步骤是否接受 eta 参数
accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())
extra_step_kwargs = {} # 初始化一个空字典以存储额外参数
if accepts_eta: # 如果接受 eta
extra_step_kwargs["eta"] = eta # 将 eta 添加到字典中
# 检查调度器步骤是否接受生成器参数
accepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys())
if accepts_generator: # 如果接受生成器
extra_step_kwargs["generator"] = generator # 将生成器添加到字典中
# 返回准备好的额外步骤参数
return extra_step_kwargs
# 从 StableDiffusionPipeline 类中复制的解码潜在变量的方法
# 解码潜在表示
def decode_latents(self, latents):
# 定义一个弃用提示消息,告知用户该方法即将被移除
deprecation_message = "The decode_latents method is deprecated and will be removed in 1.0.0. Please use VaeImageProcessor.postprocess(...) instead"
# 调用弃用函数,记录使用该方法的警告信息
deprecate("decode_latents", "1.0.0", deprecation_message, standard_warn=False)
# 根据 VAE 的缩放因子调整潜在表示的值
latents = 1 / self.vae.config.scaling_factor * latents
# 解码潜在表示,返回的第一项为解码后的图像
image = self.vae.decode(latents, return_dict=False)[0]
# 将图像值缩放到 [0, 1] 范围并进行裁剪
image = (image / 2 + 0.5).clamp(0, 1)
# 将图像转换为 float32 格式,以提高兼容性
image = image.cpu().permute(0, 2, 3, 1).float().numpy()
# 返回处理后的图像
return image
# 检查输入参数的有效性
def check_inputs(
self,
prompt, # 输入提示
image, # 输入图像
noise_level, # 噪声水平
callback_steps, # 回调步骤
negative_prompt=None, # 可选的负面提示
prompt_embeds=None, # 可选的提示嵌入
negative_prompt_embeds=None, # 可选的负面提示嵌入
)
# 准备潜在表示
def prepare_latents(self, batch_size, num_channels_latents, height, width, dtype, device, generator, latents=None):
# 定义潜在表示的形状
shape = (batch_size, num_channels_latents, height, width)
# 如果没有提供潜在表示,随机生成
if latents is None:
latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
else:
# 如果提供的潜在表示形状不匹配,则引发错误
if latents.shape != shape:
raise ValueError(f"Unexpected latents shape, got {latents.shape}, expected {shape}")
# 将潜在表示移动到指定设备
latents = latents.to(device)
# 按调度器所需的标准差缩放初始噪声
latents = latents * self.scheduler.init_noise_sigma
# 返回处理后的潜在表示
return latents
# 升级 VAE 的数据类型
def upcast_vae(self):
# 获取 VAE 的当前数据类型
dtype = self.vae.dtype
# 将 VAE 转换为 float32 数据类型
self.vae.to(dtype=torch.float32)
# 检查是否使用了特定的注意力处理器
use_torch_2_0_or_xformers = isinstance(
self.vae.decoder.mid_block.attentions[0].processor,
(
AttnProcessor2_0,
XFormersAttnProcessor,
),
)
# 如果使用 xformers 或 torch_2_0,则注意力块不需要是 float32,从而节省内存
if use_torch_2_0_or_xformers:
# 将后量化卷积层转换为相同数据类型
self.vae.post_quant_conv.to(dtype)
# 将解码器输入卷积层转换为相同数据类型
self.vae.decoder.conv_in.to(dtype)
# 将解码器中间块转换为相同数据类型
self.vae.decoder.mid_block.to(dtype)
# 关闭梯度计算以节省内存
@torch.no_grad()
# 定义一个可调用的方法,允许传入多个参数进行处理
def __call__(
# 用户输入的提示信息,可以是字符串或字符串列表
self,
prompt: Union[str, List[str]] = None,
# 输入的图像数据,类型为 PipelineImageInput
image: PipelineImageInput = None,
# 推理步骤的数量,默认值为 75
num_inference_steps: int = 75,
# 指导比例,用于控制生成图像的样式,默认值为 9.0
guidance_scale: float = 9.0,
# 噪声级别,影响生成图像的随机性,默认值为 20
noise_level: int = 20,
# 负提示信息,可以是字符串或字符串列表,控制生成图像的方向
negative_prompt: Optional[Union[str, List[str]]] = None,
# 每个提示生成的图像数量,默认为 1
num_images_per_prompt: Optional[int] = 1,
# 额外的参数,影响生成过程,默认为 0.0
eta: float = 0.0,
# 随机数生成器,可以是单个或多个 torch.Generator
generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
# 预先计算的潜在表示,类型为 torch.Tensor
latents: Optional[torch.Tensor] = None,
# 预先计算的提示嵌入,类型为 torch.Tensor
prompt_embeds: Optional[torch.Tensor] = None,
# 预先计算的负提示嵌入,类型为 torch.Tensor
negative_prompt_embeds: Optional[torch.Tensor] = None,
# 输出类型,默认为 "pil",表示以 PIL 格式返回
output_type: Optional[str] = "pil",
# 是否返回字典格式的结果,默认为 True
return_dict: bool = True,
# 回调函数,可用于处理生成过程中的状态,返回 None
callback: Optional[Callable[[int, int, torch.Tensor], None]] = None,
# 回调函数的调用频率,默认为每一步 1 次
callback_steps: int = 1,
# 跨注意力的额外参数,可以为字典类型
cross_attention_kwargs: Optional[Dict[str, Any]] = None,
# 指定跳过的剪切层级,默认为 None
clip_skip: int = None,
.\diffusers\pipelines\stable_diffusion\pipeline_stable_unclip.py
# 版权信息,表示此文件的版权归 HuggingFace 团队所有
# Copyright 2024 The HuggingFace Team. All rights reserved.
#
# 根据 Apache 2.0 许可证发布的声明
# Licensed under the Apache License, Version 2.0 (the "License");
# 只有在遵循许可证的情况下才能使用此文件
# you may not use this file except in compliance with the License.
# 可在以下链接获取许可证副本
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# 除非法律适用或书面同意,否则本软件按 "现状" 方式分发
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# 不提供任何形式的担保或条件
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# 详见许可证中对权限和限制的具体规定
# See the License for the specific language governing permissions and
# limitations under the License.
# 导入 inspect 模块,用于检查对象的属性和方法
import inspect
# 从 typing 模块导入各种类型注解,方便类型检查
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
# 导入 PyTorch 库
import torch
# 导入 CLIP 文本模型和分词器
from transformers import CLIPTextModel, CLIPTextModelWithProjection, CLIPTokenizer
# 从 CLIP 模型中导入文本模型输出的类型
from transformers.models.clip.modeling_clip import CLIPTextModelOutput
# 导入图像处理器
from ...image_processor import VaeImageProcessor
# 导入加载器的混合类,用于处理特定加载逻辑
from ...loaders import StableDiffusionLoraLoaderMixin, TextualInversionLoaderMixin
# 导入自动编码器和其他模型
from ...models import AutoencoderKL, PriorTransformer, UNet2DConditionModel
# 导入时间步嵌入函数
from ...models.embeddings import get_timestep_embedding
# 导入 LoRA 相关的调整函数
from ...models.lora import adjust_lora_scale_text_encoder
# 导入 Karras 扩散调度器
from ...schedulers import KarrasDiffusionSchedulers
# 导入工具函数,处理各种实用功能
from ...utils import (
USE_PEFT_BACKEND,
deprecate,
logging,
replace_example_docstring,
scale_lora_layers,
unscale_lora_layers,
)
# 导入用于生成随机张量的函数
from ...utils.torch_utils import randn_tensor
# 导入扩散管道相关类
from ..pipeline_utils import DiffusionPipeline, ImagePipelineOutput, StableDiffusionMixin
# 导入稳定反向图像归一化器
from .stable_unclip_image_normalizer import StableUnCLIPImageNormalizer
# 创建日志记录器,记录该模块的日志信息
logger = logging.get_logger(__name__) # pylint: disable=invalid-name
# 示例文档字符串,展示如何使用 StableUnCLIPPipeline
EXAMPLE_DOC_STRING = """
Examples:
```py
>>> import torch
>>> from diffusers import StableUnCLIPPipeline
>>> pipe = StableUnCLIPPipeline.from_pretrained(
... "fusing/stable-unclip-2-1-l", torch_dtype=torch.float16
... ) # TODO update model path
>>> pipe = pipe.to("cuda")
>>> prompt = "a photo of an astronaut riding a horse on mars"
>>> images = pipe(prompt).images
>>> images[0].save("astronaut_horse.png")
```py
"""
# 定义 StableUnCLIPPipeline 类,继承多个混合类
class StableUnCLIPPipeline(
DiffusionPipeline, StableDiffusionMixin, TextualInversionLoaderMixin, StableDiffusionLoraLoaderMixin
):
"""
使用稳定反向 CLIP 的文本到图像生成管道。
此模型继承自 [`DiffusionPipeline`]。请查阅超类文档以获取所有管道实现的通用方法
(下载、保存、在特定设备上运行等)。
该管道还继承以下加载方法:
- [`~loaders.TextualInversionLoaderMixin.load_textual_inversion`] 用于加载文本反转嵌入
- [`~loaders.StableDiffusionLoraLoaderMixin.load_lora_weights`] 用于加载 LoRA 权重
- [`~loaders.StableDiffusionLoraLoaderMixin.save_lora_weights`] 用于保存 LoRA 权重
# 定义函数参数
Args:
prior_tokenizer ([`CLIPTokenizer`]): # 指定用于文本的 CLIP 标记器
A [`CLIPTokenizer`]. # 说明这是一个 CLIP 标记器
prior_text_encoder ([`CLIPTextModelWithProjection`]): # 指定冻结的 CLIP 文本编码器
Frozen [`CLIPTextModelWithProjection`] text-encoder. # 说明这是一个冻结的文本编码器
prior ([`PriorTransformer`]): # 指定用于图像嵌入的 unCLIP 先验模型
The canonical unCLIP prior to approximate the image embedding from the text embedding. # 说明这是一个标准的 unCLIP 先验,用于从文本嵌入近似图像嵌入
prior_scheduler ([`KarrasDiffusionSchedulers`]): # 指定用于去噪过程的调度器
Scheduler used in the prior denoising process. # 说明这是在先验去噪过程中使用的调度器
image_normalizer ([`StableUnCLIPImageNormalizer`]): # 指定用于标准化图像嵌入的标准化器
Used to normalize the predicted image embeddings before the noise is applied and un-normalize the image # 说明用于在应用噪声之前标准化预测的图像嵌入,并在应用噪声后反标准化图像嵌入
embeddings after the noise has been applied. # 继续说明标准化的过程
image_noising_scheduler ([`KarrasDiffusionSchedulers`]): # 指定用于添加噪声的调度器
Noise schedule for adding noise to the predicted image embeddings. The amount of noise to add is determined # 说明这是用于对预测的图像嵌入添加噪声的调度器,噪声量由 `noise_level` 决定
by the `noise_level`. # 说明噪声量的确定依据
tokenizer ([`CLIPTokenizer`]): # 指定用于文本的 CLIP 标记器
A [`CLIPTokenizer`]. # 说明这是一个 CLIP 标记器
text_encoder ([`CLIPTextModel`]): # 指定冻结的 CLIP 文本编码器
Frozen [`CLIPTextModel`] text-encoder. # 说明这是一个冻结的文本编码器
unet ([`UNet2DConditionModel`]): # 指定用于去噪的 UNet 模型
A [`UNet2DConditionModel`] to denoise the encoded image latents. # 说明这是一个用于去噪编码图像潜变量的 UNet 模型
scheduler ([`KarrasDiffusionSchedulers`]): # 指定与 UNet 结合使用的调度器
A scheduler to be used in combination with `unet` to denoise the encoded image latents. # 说明这是与 UNet 结合使用的去噪调度器
vae ([`AutoencoderKL`]): # 指定变分自编码器模型
Variational Auto-Encoder (VAE) Model to encode and decode images to and from latent representations. # 说明这是一个变分自编码器模型,用于将图像编码和解码为潜在表示
""" # 结束文档字符串
_exclude_from_cpu_offload = ["prior", "image_normalizer"] # 指定不进行 CPU 卸载的组件列表
model_cpu_offload_seq = "text_encoder->prior_text_encoder->unet->vae" # 定义模型的 CPU 卸载顺序
# prior components # 注释说明以下是先验组件
prior_tokenizer: CLIPTokenizer # 声明 prior_tokenizer 为 CLIPTokenizer 类型
prior_text_encoder: CLIPTextModelWithProjection # 声明 prior_text_encoder 为 CLIPTextModelWithProjection 类型
prior: PriorTransformer # 声明 prior 为 PriorTransformer 类型
prior_scheduler: KarrasDiffusionSchedulers # 声明 prior_scheduler 为 KarrasDiffusionSchedulers 类型
# image noising components # 注释说明以下是图像噪声组件
image_normalizer: StableUnCLIPImageNormalizer # 声明 image_normalizer 为 StableUnCLIPImageNormalizer 类型
image_noising_scheduler: KarrasDiffusionSchedulers # 声明 image_noising_scheduler 为 KarrasDiffusionSchedulers 类型
# regular denoising components # 注释说明以下是常规去噪组件
tokenizer: CLIPTokenizer # 声明 tokenizer 为 CLIPTokenizer 类型
text_encoder: CLIPTextModel # 声明 text_encoder 为 CLIPTextModel 类型
unet: UNet2DConditionModel # 声明 unet 为 UNet2DConditionModel 类型
scheduler: KarrasDiffusionSchedulers # 声明 scheduler 为 KarrasDiffusionSchedulers 类型
vae: AutoencoderKL # 声明 vae 为 AutoencoderKL 类型
def __init__( # 定义构造函数
self, # 指向实例本身
# prior components # 注释说明以下是先验组件
prior_tokenizer: CLIPTokenizer, # 指定构造函数参数 prior_tokenizer 为 CLIPTokenizer 类型
prior_text_encoder: CLIPTextModelWithProjection, # 指定构造函数参数 prior_text_encoder 为 CLIPTextModelWithProjection 类型
prior: PriorTransformer, # 指定构造函数参数 prior 为 PriorTransformer 类型
prior_scheduler: KarrasDiffusionSchedulers, # 指定构造函数参数 prior_scheduler 为 KarrasDiffusionSchedulers 类型
# image noising components # 注释说明以下是图像噪声组件
image_normalizer: StableUnCLIPImageNormalizer, # 指定构造函数参数 image_normalizer 为 StableUnCLIPImageNormalizer 类型
image_noising_scheduler: KarrasDiffusionSchedulers, # 指定构造函数参数 image_noising_scheduler 为 KarrasDiffusionSchedulers 类型
# regular denoising components # 注释说明以下是常规去噪组件
tokenizer: CLIPTokenizer, # 指定构造函数参数 tokenizer 为 CLIPTokenizer 类型
text_encoder: CLIPTextModelWithProjection, # 指定构造函数参数 text_encoder 为 CLIPTextModelWithProjection 类型
unet: UNet2DConditionModel, # 指定构造函数参数 unet 为 UNet2DConditionModel 类型
scheduler: KarrasDiffusionSchedulers, # 指定构造函数参数 scheduler 为 KarrasDiffusionSchedulers 类型
# vae # 注释说明以下是变分自编码器
vae: AutoencoderKL, # 指定构造函数参数 vae 为 AutoencoderKL 类型
# 结束括号,表示类构造函数的参数列表结束
):
# 调用父类构造函数
super().__init__()
# 注册所需模块及其参数
self.register_modules(
prior_tokenizer=prior_tokenizer, # 注册先前的分词器
prior_text_encoder=prior_text_encoder, # 注册先前的文本编码器
prior=prior, # 注册先前模型
prior_scheduler=prior_scheduler, # 注册先前的调度器
image_normalizer=image_normalizer, # 注册图像归一化器
image_noising_scheduler=image_noising_scheduler, # 注册图像噪声调度器
tokenizer=tokenizer, # 注册当前的分词器
text_encoder=text_encoder, # 注册当前的文本编码器
unet=unet, # 注册 U-Net 模型
scheduler=scheduler, # 注册调度器
vae=vae, # 注册变分自编码器
)
# 计算 VAE 的缩放因子,基于其配置中的输出通道数
self.vae_scale_factor = 2 ** (len(self.vae.config.block_out_channels) - 1)
# 创建图像处理器,使用 VAE 缩放因子
self.image_processor = VaeImageProcessor(vae_scale_factor=self.vae_scale_factor)
# 从 UnCLIPPipeline 复制的方法,调整为处理先前的提示
def _encode_prior_prompt(
self,
prompt, # 输入提示
device, # 设备信息
num_images_per_prompt, # 每个提示生成的图像数量
do_classifier_free_guidance, # 是否使用无分类器自由引导
text_model_output: Optional[Union[CLIPTextModelOutput, Tuple]] = None, # 可选的文本模型输出
text_attention_mask: Optional[torch.Tensor] = None, # 可选的文本注意力掩码
# 从 StableDiffusionPipeline 复制的方法,调整为处理当前的提示
def _encode_prompt(
self,
prompt, # 输入提示
device, # 设备信息
num_images_per_prompt, # 每个提示生成的图像数量
do_classifier_free_guidance, # 是否使用无分类器自由引导
negative_prompt=None, # 可选的负面提示
prompt_embeds: Optional[torch.Tensor] = None, # 可选的提示嵌入
negative_prompt_embeds: Optional[torch.Tensor] = None, # 可选的负面提示嵌入
lora_scale: Optional[float] = None, # 可选的 Lora 缩放因子
**kwargs, # 其他可选参数
):
# 发出弃用警告,提示将来版本中移除此方法
deprecation_message = "`_encode_prompt()` is deprecated and it will be removed in a future version. Use `encode_prompt()` instead. Also, be aware that the output format changed from a concatenated tensor to a tuple."
deprecate("_encode_prompt()", "1.0.0", deprecation_message, standard_warn=False)
# 调用新的编码提示方法,生成嵌入元组
prompt_embeds_tuple = self.encode_prompt(
prompt=prompt, # 输入提示
device=device, # 设备信息
num_images_per_prompt=num_images_per_prompt, # 每个提示生成的图像数量
do_classifier_free_guidance=do_classifier_free_guidance, # 是否使用无分类器自由引导
negative_prompt=negative_prompt, # 负面提示
prompt_embeds=prompt_embeds, # 提示嵌入
negative_prompt_embeds=negative_prompt_embeds, # 负面提示嵌入
lora_scale=lora_scale, # Lora 缩放因子
**kwargs, # 其他参数
)
# 将嵌入元组的内容连接以兼容旧版本
prompt_embeds = torch.cat([prompt_embeds_tuple[1], prompt_embeds_tuple[0]])
# 返回合并后的提示嵌入
return prompt_embeds
# 从 StableDiffusionPipeline 复制的方法,用于编码提示
# 定义一个编码提示的函数
def encode_prompt(
# 提示内容
self,
prompt,
# 设备信息
device,
# 每个提示生成的图像数量
num_images_per_prompt,
# 是否进行无分类器自由引导
do_classifier_free_guidance,
# 可选的负面提示
negative_prompt=None,
# 可选的提示嵌入
prompt_embeds: Optional[torch.Tensor] = None,
# 可选的负面提示嵌入
negative_prompt_embeds: Optional[torch.Tensor] = None,
# 可选的 Lora 缩放因子
lora_scale: Optional[float] = None,
# 可选的剪辑跳过参数
clip_skip: Optional[int] = None,
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.decode_latents 复制
def decode_latents(self, latents):
# 定义过时消息
deprecation_message = "The decode_latents method is deprecated and will be removed in 1.0.0. Please use VaeImageProcessor.postprocess(...) instead"
# 调用 deprecate 函数以显示警告信息
deprecate("decode_latents", "1.0.0", deprecation_message, standard_warn=False)
# 根据 VAE 配置缩放因子调整潜变量
latents = 1 / self.vae.config.scaling_factor * latents
# 解码潜变量生成图像
image = self.vae.decode(latents, return_dict=False)[0]
# 将图像数据归一化到 [0, 1] 之间
image = (image / 2 + 0.5).clamp(0, 1)
# 将图像转换为 float32 格式以便兼容 bfloat16
image = image.cpu().permute(0, 2, 3, 1).float().numpy()
# 返回解码后的图像
return image
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_extra_step_kwargs 复制
def prepare_prior_extra_step_kwargs(self, generator, eta):
# 准备用于 prior_scheduler 步骤的额外参数,因为并非所有的 prior_schedulers 具有相同的参数签名
# eta 仅在 DDIMScheduler 中使用,其他 prior_schedulers 会忽略它
# eta 在 DDIM 论文中的对应符号为 η: https://arxiv.org/abs/2010.02502
# eta 的值应在 [0, 1] 之间
# 检查 prior_scheduler 是否接受 eta 参数
accepts_eta = "eta" in set(inspect.signature(self.prior_scheduler.step).parameters.keys())
# 初始化额外参数字典
extra_step_kwargs = {}
# 如果接受 eta,添加到额外参数字典中
if accepts_eta:
extra_step_kwargs["eta"] = eta
# 检查 prior_scheduler 是否接受 generator 参数
accepts_generator = "generator" in set(inspect.signature(self.prior_scheduler.step).parameters.keys())
# 如果接受 generator,添加到额外参数字典中
if accepts_generator:
extra_step_kwargs["generator"] = generator
# 返回准备好的额外参数
return extra_step_kwargs
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_extra_step_kwargs 复制
# 定义准备额外参数的函数,供调度器步骤使用
def prepare_extra_step_kwargs(self, generator, eta):
# 准备调度器步骤所需的额外参数,因为不同调度器的参数签名不同
# eta (η) 仅在 DDIMScheduler 中使用,对于其他调度器将被忽略
# eta 对应于 DDIM 论文中的 η: https://arxiv.org/abs/2010.02502
# 其值应在 [0, 1] 之间
# 检查调度器的步骤函数是否接受 eta 参数
accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())
# 初始化存储额外参数的字典
extra_step_kwargs = {}
# 如果接受 eta 参数,则将其添加到额外参数字典中
if accepts_eta:
extra_step_kwargs["eta"] = eta
# 检查调度器的步骤函数是否接受 generator 参数
accepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys())
# 如果接受 generator 参数,则将其添加到额外参数字典中
if accepts_generator:
extra_step_kwargs["generator"] = generator
# 返回构建好的额外参数字典
return extra_step_kwargs
# 定义检查输入参数的函数
def check_inputs(
self,
prompt, # 输入提示
height, # 生成图像的高度
width, # 生成图像的宽度
callback_steps, # 回调的步数
noise_level, # 噪声级别
negative_prompt=None, # 可选的负面提示
prompt_embeds=None, # 可选的提示嵌入
negative_prompt_embeds=None, # 可选的负面提示嵌入
):
# 检查高度和宽度是否为8的倍数,如果不是,则抛出值错误
if height % 8 != 0 or width % 8 != 0:
raise ValueError(f"`height` and `width` have to be divisible by 8 but are {height} and {width}.")
# 检查回调步骤是否为正整数,如果不是,则抛出值错误
if (callback_steps is None) or (
callback_steps is not None and (not isinstance(callback_steps, int) or callback_steps <= 0)
):
raise ValueError(
f"`callback_steps` has to be a positive integer but is {callback_steps} of type"
f" {type(callback_steps)}."
)
# 检查是否同时提供了提示和提示嵌入,如果是,则抛出值错误
if prompt is not None and prompt_embeds is not None:
raise ValueError(
"Provide either `prompt` or `prompt_embeds`. Please make sure to define only one of the two."
)
# 检查提示和提示嵌入是否都未定义,如果是,则抛出值错误
if prompt is None and prompt_embeds is None:
raise ValueError(
"Provide either `prompt` or `prompt_embeds`. Cannot leave both `prompt` and `prompt_embeds` undefined."
)
# 检查提示是否为字符串或列表,如果不是,则抛出值错误
if prompt is not None and (not isinstance(prompt, str) and not isinstance(prompt, list)):
raise ValueError(f"`prompt` has to be of type `str` or `list` but is {type(prompt)}")
# 检查是否同时提供了负提示和负提示嵌入,如果是,则抛出值错误
if negative_prompt is not None and negative_prompt_embeds is not None:
raise ValueError(
"Provide either `negative_prompt` or `negative_prompt_embeds`. Cannot leave both `negative_prompt` and `negative_prompt_embeds` undefined."
)
# 检查是否同时提供了提示和负提示,如果是,则检查类型是否一致
if prompt is not None and negative_prompt is not None:
if type(prompt) is not type(negative_prompt):
raise TypeError(
f"`negative_prompt` should be the same type to `prompt`, but got {type(negative_prompt)} !="
f" {type(prompt)}."
)
# 检查提示嵌入和负提示嵌入的形状是否一致,如果不一致则抛出值错误
if prompt_embeds is not None and negative_prompt_embeds is not None:
if prompt_embeds.shape != negative_prompt_embeds.shape:
raise ValueError(
"`prompt_embeds` and `negative_prompt_embeds` must have the same shape when passed directly, but"
f" got: `prompt_embeds` {prompt_embeds.shape} != `negative_prompt_embeds`"
f" {negative_prompt_embeds.shape}."
)
# 检查噪声级别是否在有效范围内,如果不在,则抛出值错误
if noise_level < 0 or noise_level >= self.image_noising_scheduler.config.num_train_timesteps:
raise ValueError(
f"`noise_level` must be between 0 and {self.image_noising_scheduler.config.num_train_timesteps - 1}, inclusive."
)
# 从 diffusers.pipelines.unclip.pipeline_unclip.UnCLIPPipeline.prepare_latents 复制的代码
# 准备潜在向量,参数包括形状、数据类型、设备、生成器、潜在向量和调度器
def prepare_latents(self, shape, dtype, device, generator, latents, scheduler):
# 如果没有给定潜在向量,则随机生成一个
if latents is None:
latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
else:
# 检查给定潜在向量的形状是否与预期形状一致
if latents.shape != shape:
raise ValueError(f"Unexpected latents shape, got {latents.shape}, expected {shape}")
# 将潜在向量移动到指定设备
latents = latents.to(device)
# 将潜在向量与调度器的初始噪声标准差相乘
latents = latents * scheduler.init_noise_sigma
# 返回处理后的潜在向量
return latents
# 为图像嵌入添加噪声,噪声量由噪声级别控制
def noise_image_embeddings(
self,
image_embeds: torch.Tensor,
noise_level: int,
noise: Optional[torch.Tensor] = None,
generator: Optional[torch.Generator] = None,
):
"""
向图像嵌入添加噪声,噪声量由 `noise_level` 输入控制。较高的
`noise_level` 会增加最终未去噪图像的方差。
噪声的应用有两种方式:
1. 噪声调度直接应用于嵌入。
2. 将正弦时间嵌入向量附加到输出。
在这两种情况下,噪声量均由相同的 `noise_level` 控制。
嵌入在应用噪声之前会被归一化,应用噪声后会被反归一化。
"""
# 如果没有提供噪声,则随机生成噪声
if noise is None:
noise = randn_tensor(
image_embeds.shape, generator=generator, device=image_embeds.device, dtype=image_embeds.dtype
)
# 创建与图像嵌入数量相同的噪声级别张量
noise_level = torch.tensor([noise_level] * image_embeds.shape[0], device=image_embeds.device)
# 将图像归一化器移动到图像嵌入所在设备
self.image_normalizer.to(image_embeds.device)
# 对图像嵌入进行归一化处理
image_embeds = self.image_normalizer.scale(image_embeds)
# 使用噪声调度器将噪声添加到图像嵌入
image_embeds = self.image_noising_scheduler.add_noise(image_embeds, timesteps=noise_level, noise=noise)
# 对图像嵌入进行反归一化处理
image_embeds = self.image_normalizer.unscale(image_embeds)
# 获取时间步嵌入,控制噪声的时间步
noise_level = get_timestep_embedding(
timesteps=noise_level, embedding_dim=image_embeds.shape[-1], flip_sin_to_cos=True, downscale_freq_shift=0
)
# 将时间步嵌入转换为与图像嵌入相同的数据类型
noise_level = noise_level.to(image_embeds.dtype)
# 将时间步嵌入与图像嵌入拼接在一起
image_embeds = torch.cat((image_embeds, noise_level), 1)
# 返回包含噪声的图像嵌入
return image_embeds
# 禁用梯度计算以提高推理性能
@torch.no_grad()
# 用示例文档字符串替换当前文档字符串
@replace_example_docstring(EXAMPLE_DOC_STRING)
# 定义一个可调用的方法,用于处理去噪过程
def __call__(
self,
# 正常的去噪过程参数
# 提示文本,可以是字符串或字符串列表,默认为 None
prompt: Optional[Union[str, List[str]]] = None,
# 输出图像的高度,默认为 None
height: Optional[int] = None,
# 输出图像的宽度,默认为 None
width: Optional[int] = None,
# 推理步骤的数量,默认为 20
num_inference_steps: int = 20,
# 引导比例,默认为 10.0
guidance_scale: float = 10.0,
# 负提示文本,可以是字符串或字符串列表,默认为 None
negative_prompt: Optional[Union[str, List[str]]] = None,
# 每个提示生成的图像数量,默认为 1
num_images_per_prompt: Optional[int] = 1,
# 控制噪声的参数,默认为 0.0
eta: float = 0.0,
# 用于随机数生成的生成器,默认为 None
generator: Optional[torch.Generator] = None,
# 潜在变量张量,默认为 None
latents: Optional[torch.Tensor] = None,
# 提示嵌入张量,默认为 None
prompt_embeds: Optional[torch.Tensor] = None,
# 负提示嵌入张量,默认为 None
negative_prompt_embeds: Optional[torch.Tensor] = None,
# 输出类型,默认为 "pil"
output_type: Optional[str] = "pil",
# 是否返回字典格式,默认为 True
return_dict: bool = True,
# 可选的回调函数,接收步骤和张量,默认为 None
callback: Optional[Callable[[int, int, torch.Tensor], None]] = None,
# 回调函数调用的步骤间隔,默认为 1
callback_steps: int = 1,
# 跨注意力的可选参数,默认为 None
cross_attention_kwargs: Optional[Dict[str, Any]] = None,
# 噪声水平,默认为 0
noise_level: int = 0,
# 先验参数
# 先验推理步骤的数量,默认为 25
prior_num_inference_steps: int = 25,
# 先验引导比例,默认为 4.0
prior_guidance_scale: float = 4.0,
# 先验潜在变量张量,默认为 None
prior_latents: Optional[torch.Tensor] = None,
# 可选的跳过剪辑步骤,默认为 None
clip_skip: Optional[int] = None,
.\diffusers\pipelines\stable_diffusion\pipeline_stable_unclip_img2img.py
# 版权声明,2024年由 HuggingFace 团队保留所有权利。
#
# 根据 Apache 许可证第 2.0 版(“许可证”)授权;
# 除非遵守该许可证,否则您不得使用此文件。
# 您可以在以下地址获取许可证副本:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律要求或书面协议另有约定,根据许可证分发的软件是按“原样”提供的,
# 不提供任何种类的担保或条件,无论是明示还是暗示。
# 请参阅许可证以了解特定语言规定的权限和限制。
import inspect # 导入 inspect 模块以获取有关活跃对象的信息
from typing import Any, Callable, Dict, List, Optional, Union # 导入类型提示,便于静态类型检查
import PIL.Image # 导入 PIL.Image 用于图像处理
import torch # 导入 PyTorch 以使用深度学习功能
from transformers import CLIPImageProcessor, CLIPTextModel, CLIPTokenizer, CLIPVisionModelWithProjection # 从 transformers 导入 CLIP 相关模型和处理器
from ...image_processor import VaeImageProcessor # 导入 VAE 图像处理器
from ...loaders import StableDiffusionLoraLoaderMixin, TextualInversionLoaderMixin # 导入稳定扩散和文本反转加载混合器
from ...models import AutoencoderKL, UNet2DConditionModel # 导入模型类
from ...models.embeddings import get_timestep_embedding # 导入获取时间步嵌入的函数
from ...models.lora import adjust_lora_scale_text_encoder # 导入调整 LoRA 缩放的函数
from ...schedulers import KarrasDiffusionSchedulers # 导入 Karras 扩散调度器
from ...utils import ( # 导入工具函数
USE_PEFT_BACKEND, # 导入用于 PEFT 后端的常量
deprecate, # 导入用于标记弃用功能的装饰器
logging, # 导入日志记录模块
replace_example_docstring, # 导入用于替换示例文档字符串的函数
scale_lora_layers, # 导入缩放 LoRA 层的函数
unscale_lora_layers, # 导入取消缩放 LoRA 层的函数
)
from ...utils.torch_utils import randn_tensor # 从自定义 PyTorch 工具中导入随机张量生成函数
from ..pipeline_utils import DiffusionPipeline, ImagePipelineOutput, StableDiffusionMixin # 导入扩散管道及其输出类
from .stable_unclip_image_normalizer import StableUnCLIPImageNormalizer # 导入稳定 UnCLIP 图像归一化器
logger = logging.get_logger(__name__) # 创建一个用于当前模块的日志记录器,禁用 pylint 名称检查
EXAMPLE_DOC_STRING = """ # 定义示例文档字符串,用于展示如何使用管道
Examples: # 示例的说明
```py # 示例代码块的开始
>>> import requests # 导入 requests 库用于发起 HTTP 请求
>>> import torch # 导入 PyTorch 库
>>> from PIL import Image # 从 PIL 导入图像处理模块
>>> from io import BytesIO # 导入 BytesIO 用于处理字节流
>>> from diffusers import StableUnCLIPImg2ImgPipeline # 从 diffusers 导入 StableUnCLIPImg2ImgPipeline 类
>>> pipe = StableUnCLIPImg2ImgPipeline.from_pretrained( # 从预训练模型加载管道
... "stabilityai/stable-diffusion-2-1-unclip-small", torch_dtype=torch.float16 # 指定模型名称和数据类型
... )
>>> pipe = pipe.to("cuda") # 将管道移动到 CUDA 设备以加速计算
>>> url = "https://raw.githubusercontent.com/CompVis/stable-diffusion/main/assets/stable-samples/img2img/sketch-mountains-input.jpg" # 定义要加载的图像 URL
>>> response = requests.get(url) # 发起 GET 请求以获取图像
>>> init_image = Image.open(BytesIO(response.content)).convert("RGB") # 打开图像并转换为 RGB 格式
>>> init_image = init_image.resize((768, 512)) # 将图像调整为指定大小
>>> prompt = "A fantasy landscape, trending on artstation" # 定义生成图像的提示文本
>>> images = pipe(init_image, prompt).images # 使用管道生成图像
>>> images[0].save("fantasy_landscape.png") # 保存生成的图像为 PNG 文件
```py # 示例代码块的结束
"""
class StableUnCLIPImg2ImgPipeline( # 定义 StableUnCLIPImg2ImgPipeline 类
DiffusionPipeline, StableDiffusionMixin, TextualInversionLoaderMixin, StableDiffusionLoraLoaderMixin # 继承多个混合器以实现功能组合
):
"""
使用稳定的 unCLIP 进行文本引导的图像到图像生成的管道。 # 类文档字符串,说明该类的功能
该模型继承自 [`DiffusionPipeline`]。请查看超类文档以了解所有管道的通用方法(下载、保存、在特定设备上运行等)。 # 提供有关超类的信息
# 管道还继承以下加载方法:
# - [`~loaders.TextualInversionLoaderMixin.load_textual_inversion`] 用于加载文本反转嵌入
# - [`~loaders.StableDiffusionLoraLoaderMixin.load_lora_weights`] 用于加载 LoRA 权重
# - [`~loaders.StableDiffusionLoraLoaderMixin.save_lora_weights`] 用于保存 LoRA 权重
# 参数说明:
# feature_extractor ([`CLIPImageProcessor`]):
# 图像预处理的特征提取器,编码之前使用。
# image_encoder ([`CLIPVisionModelWithProjection`]):
# CLIP 视觉模型,用于编码图像。
# image_normalizer ([`StableUnCLIPImageNormalizer`]):
# 用于在添加噪声之前规范化预测的图像嵌入,并在添加噪声后反规范化图像嵌入。
# image_noising_scheduler ([`KarrasDiffusionSchedulers`]):
# 添加噪声到预测的图像嵌入的噪声调度器,噪声量由 `noise_level` 决定。
# tokenizer (`~transformers.CLIPTokenizer`):
# 一个 [`~transformers.CLIPTokenizer`]。
# text_encoder ([`~transformers.CLIPTextModel`]):
# 冻结的 [`~transformers.CLIPTextModel`] 文本编码器。
# unet ([`UNet2DConditionModel`]):
# 用于去噪编码图像潜变量的 [`UNet2DConditionModel`]。
# scheduler ([`KarrasDiffusionSchedulers`]):
# 与 `unet` 结合使用的调度器,用于去噪编码图像潜变量。
# vae ([`AutoencoderKL`]):
# 变分自编码器 (VAE) 模型,用于将图像编码和解码为潜在表示。
# """
# 定义模型在 CPU 上卸载的顺序
model_cpu_offload_seq = "text_encoder->image_encoder->unet->vae"
# 要从 CPU 卸载中排除的组件
_exclude_from_cpu_offload = ["image_normalizer"]
# 图像编码组件
feature_extractor: CLIPImageProcessor
image_encoder: CLIPVisionModelWithProjection
# 图像加噪声组件
image_normalizer: StableUnCLIPImageNormalizer
image_noising_scheduler: KarrasDiffusionSchedulers
# 常规去噪组件
tokenizer: CLIPTokenizer
text_encoder: CLIPTextModel
unet: UNet2DConditionModel
scheduler: KarrasDiffusionSchedulers
# 变分自编码器
vae: AutoencoderKL
# 初始化方法
def __init__(
# 图像编码组件
feature_extractor: CLIPImageProcessor,
image_encoder: CLIPVisionModelWithProjection,
# 图像加噪声组件
image_normalizer: StableUnCLIPImageNormalizer,
image_noising_scheduler: KarrasDiffusionSchedulers,
# 常规去噪组件
tokenizer: CLIPTokenizer,
text_encoder: CLIPTextModel,
unet: UNet2DConditionModel,
scheduler: KarrasDiffusionSchedulers,
# 变分自编码器
vae: AutoencoderKL,
# 初始化父类
):
super().__init__()
# 注册模块,包括特征提取器、图像编码器等
self.register_modules(
feature_extractor=feature_extractor,
image_encoder=image_encoder,
image_normalizer=image_normalizer,
image_noising_scheduler=image_noising_scheduler,
tokenizer=tokenizer,
text_encoder=text_encoder,
unet=unet,
scheduler=scheduler,
vae=vae,
)
# 计算 VAE 的缩放因子
self.vae_scale_factor = 2 ** (len(self.vae.config.block_out_channels) - 1)
# 创建图像处理器,使用 VAE 的缩放因子
self.image_processor = VaeImageProcessor(vae_scale_factor=self.vae_scale_factor)
# 从 StableDiffusionPipeline 复制的编码提示函数
def _encode_prompt(
self,
prompt,
device,
num_images_per_prompt,
do_classifier_free_guidance,
negative_prompt=None,
prompt_embeds: Optional[torch.Tensor] = None,
negative_prompt_embeds: Optional[torch.Tensor] = None,
lora_scale: Optional[float] = None,
**kwargs,
):
# 警告用户该函数已弃用,未来版本将被移除
deprecation_message = "`_encode_prompt()` is deprecated and it will be removed in a future version. Use `encode_prompt()` instead. Also, be aware that the output format changed from a concatenated tensor to a tuple."
deprecate("_encode_prompt()", "1.0.0", deprecation_message, standard_warn=False)
# 调用 encode_prompt 函数以编码提示
prompt_embeds_tuple = self.encode_prompt(
prompt=prompt,
device=device,
num_images_per_prompt=num_images_per_prompt,
do_classifier_free_guidance=do_classifier_free_guidance,
negative_prompt=negative_prompt,
prompt_embeds=prompt_embeds,
negative_prompt_embeds=negative_prompt_embeds,
lora_scale=lora_scale,
**kwargs,
)
# 连接提示嵌入以便于向后兼容
prompt_embeds = torch.cat([prompt_embeds_tuple[1], prompt_embeds_tuple[0]])
# 返回连接后的提示嵌入
return prompt_embeds
# 编码图像的函数
def _encode_image(
self,
image,
device,
batch_size,
num_images_per_prompt,
do_classifier_free_guidance,
noise_level,
generator,
image_embeds,
):
# 获取图像编码器参数的数据类型
dtype = next(self.image_encoder.parameters()).dtype
# 检查输入的图像是否为 PIL 图像类型
if isinstance(image, PIL.Image.Image):
# 如果是 PIL 图像,则重复次数与批量大小相同
repeat_by = batch_size
else:
# 否则,假设图像输入已经正确批处理,只需重复以匹配每个提示的图像数量
#
# 注意:这可能缺少一些边缘情况,比如已批处理和未批处理的 `image_embeds`。
# 如果这些情况比较常见,需要仔细考虑输入的预期维度及其编码处理。
repeat_by = num_images_per_prompt
# 检查图像嵌入是否为 None
if image_embeds is None:
# 如果输入图像不是张量,则使用特征提取器将其转换为张量
if not isinstance(image, torch.Tensor):
image = self.feature_extractor(images=image, return_tensors="pt").pixel_values
# 将图像转移到指定设备,并转换为适当的数据类型
image = image.to(device=device, dtype=dtype)
# 使用图像编码器对图像进行编码,得到图像嵌入
image_embeds = self.image_encoder(image).image_embeds
# 对图像嵌入应用噪声图像嵌入处理
image_embeds = self.noise_image_embeddings(
image_embeds=image_embeds,
noise_level=noise_level,
generator=generator,
)
# 为每个生成的提示复制图像嵌入,使用适合 mps 的方法
image_embeds = image_embeds.unsqueeze(1)
# 获取嵌入的批量大小、序列长度和最后一个维度
bs_embed, seq_len, _ = image_embeds.shape
# 根据 repeat_by 重复图像嵌入
image_embeds = image_embeds.repeat(1, repeat_by, 1)
# 重新调整图像嵌入的形状
image_embeds = image_embeds.view(bs_embed * repeat_by, seq_len, -1)
# 挤出多余的维度
image_embeds = image_embeds.squeeze(1)
# 如果需要进行无分类器引导
if do_classifier_free_guidance:
# 创建与图像嵌入形状相同的零张量作为负提示嵌入
negative_prompt_embeds = torch.zeros_like(image_embeds)
# 对于无分类器引导,我们需要进行两次前向传播
# 这里将无条件和文本嵌入拼接到一个批次中,以避免进行两次前向传播
image_embeds = torch.cat([negative_prompt_embeds, image_embeds])
# 返回处理后的图像嵌入
return image_embeds
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.encode_prompt 复制的函数
def encode_prompt(
self,
# 输入的提示文本
prompt,
# 设备类型
device,
# 每个提示的图像数量
num_images_per_prompt,
# 是否进行无分类器引导
do_classifier_free_guidance,
# 可选的负提示文本
negative_prompt=None,
# 可选的提示嵌入
prompt_embeds: Optional[torch.Tensor] = None,
# 可选的负提示嵌入
negative_prompt_embeds: Optional[torch.Tensor] = None,
# 可选的 LoRA 缩放因子
lora_scale: Optional[float] = None,
# 可选的剪切跳过参数
clip_skip: Optional[int] = None,
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.decode_latents 复制的部分
# 解码潜在向量
def decode_latents(self, latents):
# 定义过时警告信息,提示用户该方法将在 1.0.0 中被移除
deprecation_message = "The decode_latents method is deprecated and will be removed in 1.0.0. Please use VaeImageProcessor.postprocess(...) instead"
# 触发过时警告,告知用户替代方法
deprecate("decode_latents", "1.0.0", deprecation_message, standard_warn=False)
# 根据缩放因子调整潜在向量
latents = 1 / self.vae.config.scaling_factor * latents
# 解码潜在向量,返回的第一个元素是图像数据
image = self.vae.decode(latents, return_dict=False)[0]
# 归一化图像数据到 [0, 1] 范围
image = (image / 2 + 0.5).clamp(0, 1)
# 将图像数据从 GPU 转移到 CPU,调整维度顺序并转换为 float32 格式,返回为 NumPy 数组
image = image.cpu().permute(0, 2, 3, 1).float().numpy()
# 返回处理后的图像
return image
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_extra_step_kwargs 复制的方法
def prepare_extra_step_kwargs(self, generator, eta):
# 为调度器步骤准备额外的参数,因为并非所有调度器都有相同的签名
# eta (η) 仅在 DDIMScheduler 中使用,其他调度器将被忽略
# eta 对应于 DDIM 论文中的 η: https://arxiv.org/abs/2010.02502
# 应该在 [0, 1] 范围内
# 检查调度器是否接受 eta 参数
accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())
# 创建额外参数字典
extra_step_kwargs = {}
# 如果接受 eta,添加到额外参数字典中
if accepts_eta:
extra_step_kwargs["eta"] = eta
# 检查调度器是否接受 generator 参数
accepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys())
# 如果接受 generator,添加到额外参数字典中
if accepts_generator:
extra_step_kwargs["generator"] = generator
# 返回额外参数字典
return extra_step_kwargs
# 检查输入参数
def check_inputs(
self,
prompt, # 输入的提示文本
image, # 输入的图像数据
height, # 图像高度
width, # 图像宽度
callback_steps, # 回调步骤
noise_level, # 噪声水平
negative_prompt=None, # 可选的负面提示文本
prompt_embeds=None, # 可选的提示嵌入
negative_prompt_embeds=None, # 可选的负面提示嵌入
image_embeds=None, # 可选的图像嵌入
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_latents 复制的部分
# 准备潜在空间的张量,输入包括批量大小、通道数、图像高度和宽度等参数
def prepare_latents(self, batch_size, num_channels_latents, height, width, dtype, device, generator, latents=None):
# 根据输入参数计算潜在张量的形状
shape = (
batch_size,
num_channels_latents,
int(height) // self.vae_scale_factor,
int(width) // self.vae_scale_factor,
)
# 检查生成器是否为列表且长度与批量大小匹配,若不匹配则抛出错误
if isinstance(generator, list) and len(generator) != batch_size:
raise ValueError(
f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
f" size of {batch_size}. Make sure the batch size matches the length of the generators."
)
# 如果未提供潜在张量,则生成一个随机的潜在张量
if latents is None:
latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
else:
# 如果提供了潜在张量,则将其移动到指定设备
latents = latents.to(device)
# 根据调度器所需的标准差缩放初始噪声
latents = latents * self.scheduler.init_noise_sigma
# 返回处理后的潜在张量
return latents
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_unclip.StableUnCLIPPipeline.noise_image_embeddings 复制的函数
def noise_image_embeddings(
self,
image_embeds: torch.Tensor,
noise_level: int,
noise: Optional[torch.Tensor] = None,
generator: Optional[torch.Generator] = None,
):
"""
向图像嵌入添加噪声。噪声的量由 `noise_level` 输入控制。较高的
`noise_level` 增加最终无噪声图像的方差。
噪声通过两种方式应用:
1. 噪声调度直接应用于嵌入。
2. 一个正弦时间嵌入向量附加到输出中。
在这两种情况下,噪声的量由相同的 `noise_level` 控制。
在应用噪声之前,嵌入会被归一化,在应用噪声之后再进行反归一化。
"""
# 如果未提供噪声,则生成与图像嵌入形状相同的随机噪声
if noise is None:
noise = randn_tensor(
# 生成随机噪声的形状与图像嵌入相同
image_embeds.shape, generator=generator, device=image_embeds.device, dtype=image_embeds.dtype
)
# 创建一个与图像嵌入数量相同的噪声水平张量
noise_level = torch.tensor([noise_level] * image_embeds.shape[0], device=image_embeds.device)
# 将图像归一化器移动到与图像嵌入相同的设备
self.image_normalizer.to(image_embeds.device)
# 对图像嵌入进行归一化处理
image_embeds = self.image_normalizer.scale(image_embeds)
# 向图像嵌入添加噪声,使用噪声调度器
image_embeds = self.image_noising_scheduler.add_noise(image_embeds, timesteps=noise_level, noise=noise)
# 对添加噪声后的图像嵌入进行反归一化处理
image_embeds = self.image_normalizer.unscale(image_embeds)
# 获取时间步嵌入,并将其应用于图像嵌入
noise_level = get_timestep_embedding(
# 传入时间步、嵌入维度等参数来生成时间步嵌入
timesteps=noise_level, embedding_dim=image_embeds.shape[-1], flip_sin_to_cos=True, downscale_freq_shift=0
)
# `get_timestep_embeddings` 不包含任何权重,始终返回 f32 张量,
# 但我们可能在 fp16 中运行,因此需要在这里转换类型。
# 可能有更好的封装方式。
noise_level = noise_level.to(image_embeds.dtype)
# 将时间步嵌入与图像嵌入在维度1上进行拼接
image_embeds = torch.cat((image_embeds, noise_level), 1)
# 返回处理后的图像嵌入
return image_embeds
# 不计算梯度以节省内存和加快计算
@torch.no_grad()
# 用于替换示例文档字符串的装饰器
@replace_example_docstring(EXAMPLE_DOC_STRING)
def __call__(
# 接收的图像,可以是张量或 PIL 图像
image: Union[torch.Tensor, PIL.Image.Image] = None,
# 用户提供的提示文本,可以是字符串或字符串列表
prompt: Union[str, List[str]] = None,
# 生成图像的高度,可选
height: Optional[int] = None,
# 生成图像的宽度,可选
width: Optional[int] = None,
# 推理步骤的数量,默认为20
num_inference_steps: int = 20,
# 引导缩放因子,控制生成图像与提示的匹配程度
guidance_scale: float = 10,
# 可选的负面提示文本,可以是字符串或字符串列表
negative_prompt: Optional[Union[str, List[str]]] = None,
# 每个提示生成的图像数量,默认为1
num_images_per_prompt: Optional[int] = 1,
# 调整生成图像的随机性参数,默认为0.0
eta: float = 0.0,
# 可选的随机数生成器
generator: Optional[torch.Generator] = None,
# 可选的潜在张量,通常用于输入
latents: Optional[torch.Tensor] = None,
# 可选的提示嵌入张量
prompt_embeds: Optional[torch.Tensor] = None,
# 可选的负面提示嵌入张量
negative_prompt_embeds: Optional[torch.Tensor] = None,
# 输出类型,可选,默认为"pil"
output_type: Optional[str] = "pil",
# 是否返回字典格式的输出,默认为True
return_dict: bool = True,
# 可选的回调函数,在特定步骤调用
callback: Optional[Callable[[int, int, torch.Tensor], None]] = None,
# 回调函数调用的步数间隔
callback_steps: int = 1,
# 可选的交叉注意力参数
cross_attention_kwargs: Optional[Dict[str, Any]] = None,
# 噪声水平,控制噪声的强度
noise_level: int = 0,
# 可选的图像嵌入张量
image_embeds: Optional[torch.Tensor] = None,
# 可选的跳过的剪辑步骤
clip_skip: Optional[int] = None,
.\diffusers\pipelines\stable_diffusion\safety_checker.py
# 版权声明,表明该代码的版权所有者及其权利
# Copyright 2024 The HuggingFace Team. All rights reserved.
#
# 在 Apache 2.0 许可证下授权("许可证");
# 除非遵循许可证,否则您不得使用此文件。
# 您可以在以下网址获取许可证副本:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律或书面协议另有约定,
# 否则根据许可证分发的软件是按“原样”基础提供的,
# 不提供任何明示或暗示的担保或条件。
# 有关许可证所 governing 权限和限制的详细信息,请参见许可证。
# 导入 NumPy 库,用于数值计算
import numpy as np
# 导入 PyTorch 库
import torch
# 导入 PyTorch 神经网络模块
import torch.nn as nn
# 从 transformers 库导入 CLIP 配置、视觉模型和预训练模型
from transformers import CLIPConfig, CLIPVisionModel, PreTrainedModel
# 从上级目录导入 logging 工具
from ...utils import logging
# 获取当前模块的日志记录器
logger = logging.get_logger(__name__)
# 定义计算余弦距离的函数
def cosine_distance(image_embeds, text_embeds):
# 对图像嵌入进行归一化处理
normalized_image_embeds = nn.functional.normalize(image_embeds)
# 对文本嵌入进行归一化处理
normalized_text_embeds = nn.functional.normalize(text_embeds)
# 返回归一化的图像嵌入和文本嵌入的矩阵乘法结果
return torch.mm(normalized_image_embeds, normalized_text_embeds.t())
# 定义 StableDiffusionSafetyChecker 类,继承自 PreTrainedModel
class StableDiffusionSafetyChecker(PreTrainedModel):
# 设置配置类为 CLIPConfig
config_class = CLIPConfig
# 指定主要输入名称为 "clip_input"
main_input_name = "clip_input"
# 指定不需要拆分的模块列表
_no_split_modules = ["CLIPEncoderLayer"]
# 初始化方法,接收 CLIPConfig 配置
def __init__(self, config: CLIPConfig):
# 调用父类的初始化方法
super().__init__(config)
# 初始化视觉模型,使用配置中的视觉部分
self.vision_model = CLIPVisionModel(config.vision_config)
# 创建线性层进行视觉投影,输入维度为 hidden_size,输出维度为 projection_dim
self.visual_projection = nn.Linear(config.vision_config.hidden_size, config.projection_dim, bias=False)
# 创建概念嵌入的可学习参数,维度为 (17, projection_dim),并设置为不需要梯度更新
self.concept_embeds = nn.Parameter(torch.ones(17, config.projection_dim), requires_grad=False)
# 创建特殊关心的嵌入参数,维度为 (3, projection_dim),并设置为不需要梯度更新
self.special_care_embeds = nn.Parameter(torch.ones(3, config.projection_dim), requires_grad=False)
# 创建概念嵌入权重参数,维度为 (17),并设置为不需要梯度更新
self.concept_embeds_weights = nn.Parameter(torch.ones(17), requires_grad=False)
# 创建特殊关心嵌入权重参数,维度为 (3),并设置为不需要梯度更新
self.special_care_embeds_weights = nn.Parameter(torch.ones(3), requires_grad=False)
# 该装饰器指示后续方法不需要计算梯度
@torch.no_grad()
# 前向传播方法,接收 CLIP 输入和图像
def forward(self, clip_input, images):
# 使用视觉模型处理 CLIP 输入,获取池化输出
pooled_output = self.vision_model(clip_input)[1] # pooled_output
# 将池化输出通过视觉投影层,生成图像嵌入
image_embeds = self.visual_projection(pooled_output)
# 始终将结果转换为 float32,避免显著的开销,且与 bfloat16 兼容
special_cos_dist = cosine_distance(image_embeds, self.special_care_embeds).cpu().float().numpy()
# 计算图像嵌入与概念嵌入之间的余弦距离
cos_dist = cosine_distance(image_embeds, self.concept_embeds).cpu().float().numpy()
# 初始化结果列表
result = []
# 获取批次大小
batch_size = image_embeds.shape[0]
# 遍历每一张图像
for i in range(batch_size):
# 初始化结果字典,包含特殊分数、特殊关心和概念分数
result_img = {"special_scores": {}, "special_care": [], "concept_scores": {}, "bad_concepts": []}
# 增加此值以创建更强的 `nfsw` 过滤器
# 代价是增加过滤良性图像的可能性
adjustment = 0.0
# 遍历每个特殊概念的余弦距离
for concept_idx in range(len(special_cos_dist[0])):
# 获取当前概念的余弦距离
concept_cos = special_cos_dist[i][concept_idx]
# 获取当前概念的阈值
concept_threshold = self.special_care_embeds_weights[concept_idx].item()
# 计算并存储特殊分数
result_img["special_scores"][concept_idx] = round(concept_cos - concept_threshold + adjustment, 3)
# 如果特殊分数大于0,添加到特殊关心列表
if result_img["special_scores"][concept_idx] > 0:
result_img["special_care"].append({concept_idx, result_img["special_scores"][concept_idx]})
# 增加调整值
adjustment = 0.01
# 遍历每个概念的余弦距离
for concept_idx in range(len(cos_dist[0])):
# 获取当前概念的余弦距离
concept_cos = cos_dist[i][concept_idx]
# 获取当前概念的阈值
concept_threshold = self.concept_embeds_weights[concept_idx].item()
# 计算并存储概念分数
result_img["concept_scores"][concept_idx] = round(concept_cos - concept_threshold + adjustment, 3)
# 如果概念分数大于0,添加到不良概念列表
if result_img["concept_scores"][concept_idx] > 0:
result_img["bad_concepts"].append(concept_idx)
# 将当前图像的结果添加到结果列表
result.append(result_img)
# 检查是否存在任何不适宜内容的概念
has_nsfw_concepts = [len(res["bad_concepts"]) > 0 for res in result]
# 遍历每个结果,处理不适宜内容的图像
for idx, has_nsfw_concept in enumerate(has_nsfw_concepts):
if has_nsfw_concept:
# 如果 images 是张量,将该图像替换为黑图
if torch.is_tensor(images) or torch.is_tensor(images[0]):
images[idx] = torch.zeros_like(images[idx]) # black image
# 如果 images 是 NumPy 数组,将该图像替换为黑图
else:
images[idx] = np.zeros(images[idx].shape) # black image
# 如果检测到任何不适宜内容,记录警告信息
if any(has_nsfw_concepts):
logger.warning(
"Potential NSFW content was detected in one or more images. A black image will be returned instead."
" Try again with a different prompt and/or seed."
)
# 返回处理后的图像和不适宜内容的概念标识
return images, has_nsfw_concepts
# 在不计算梯度的情况下进行操作
@torch.no_grad()
# 定义一个处理输入的前向传播方法,接收 CLIP 输入和图像张量
def forward_onnx(self, clip_input: torch.Tensor, images: torch.Tensor):
# 通过视觉模型处理 CLIP 输入,获取池化输出
pooled_output = self.vision_model(clip_input)[1] # pooled_output
# 将池化输出通过视觉投影生成图像嵌入
image_embeds = self.visual_projection(pooled_output)
# 计算图像嵌入与特殊关心嵌入之间的余弦距离
special_cos_dist = cosine_distance(image_embeds, self.special_care_embeds)
# 计算图像嵌入与概念嵌入之间的余弦距离
cos_dist = cosine_distance(image_embeds, self.concept_embeds)
# 增加此值以创建更强的 NSFW 过滤器
# 代价是增加过滤良性图像的可能性
adjustment = 0.0
# 计算特殊分数,考虑余弦距离、特殊关心嵌入权重和调整值
special_scores = special_cos_dist - self.special_care_embeds_weights + adjustment
# 对特殊分数进行四舍五入(注释掉的代码)
# special_scores = special_scores.round(decimals=3)
# 检查特殊分数是否大于零,返回布尔值表示是否关注
special_care = torch.any(special_scores > 0, dim=1)
# 如果特殊关心成立,调整值增加
special_adjustment = special_care * 0.01
# 扩展调整值以匹配余弦距离的形状
special_adjustment = special_adjustment.unsqueeze(1).expand(-1, cos_dist.shape[1])
# 计算概念分数,考虑余弦距离、概念嵌入权重和特殊调整
concept_scores = (cos_dist - self.concept_embeds_weights) + special_adjustment
# 对概念分数进行四舍五入(注释掉的代码)
# concept_scores = concept_scores.round(decimals=3)
# 检查概念分数是否大于零,返回布尔值表示是否有 NSFW 概念
has_nsfw_concepts = torch.any(concept_scores > 0, dim=1)
# 将具有 NSFW 概念的图像设置为黑色图像
images[has_nsfw_concepts] = 0.0 # black image
# 返回处理后的图像和 NSFW 概念的布尔值
return images, has_nsfw_concepts
.\diffusers\pipelines\stable_diffusion\safety_checker_flax.py
# 版权声明,表示此代码归 HuggingFace 团队所有,保留所有权利。
#
# 根据 Apache 2.0 许可证(“许可证”)授权;
# 除非遵循该许可证,否则您不得使用此文件。
# 您可以在以下网址获取许可证的副本:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律或书面协议另有约定,否则根据许可证分发的软件是以“原样”基础分发的,
# 不提供任何明示或暗示的保证或条件。
# 有关许可证下特定语言的权限和限制,请参阅许可证。
# 导入可选类型和元组类型
from typing import Optional, Tuple
# 导入 jax 库及其 numpy 模块
import jax
import jax.numpy as jnp
# 导入 flax 的模块和相关类
from flax import linen as nn
from flax.core.frozen_dict import FrozenDict
# 导入 CLIP 配置和模型基类
from transformers import CLIPConfig, FlaxPreTrainedModel
# 导入 CLIP 视觉模块
from transformers.models.clip.modeling_flax_clip import FlaxCLIPVisionModule
# 定义计算两个嵌入之间的余弦距离的函数
def jax_cosine_distance(emb_1, emb_2, eps=1e-12):
# 归一化第一个嵌入,防止除以零
norm_emb_1 = jnp.divide(emb_1.T, jnp.clip(jnp.linalg.norm(emb_1, axis=1), a_min=eps)).T
# 归一化第二个嵌入,防止除以零
norm_emb_2 = jnp.divide(emb_2.T, jnp.clip(jnp.linalg.norm(emb_2, axis=1), a_min=eps)).T
# 返回两个归一化嵌入的点积,作为余弦相似度
return jnp.matmul(norm_emb_1, norm_emb_2.T)
# 定义 Flax 稳定扩散安全检查器模块的类
class FlaxStableDiffusionSafetyCheckerModule(nn.Module):
# 定义配置和数据类型属性
config: CLIPConfig
dtype: jnp.dtype = jnp.float32
# 设置模块的组件
def setup(self):
# 初始化视觉模型
self.vision_model = FlaxCLIPVisionModule(self.config.vision_config)
# 定义视觉投影层,使用无偏置和指定数据类型
self.visual_projection = nn.Dense(self.config.projection_dim, use_bias=False, dtype=self.dtype)
# 定义概念嵌入的参数,初始化为全1的矩阵
self.concept_embeds = self.param("concept_embeds", jax.nn.initializers.ones, (17, self.config.projection_dim))
# 定义特殊关怀嵌入的参数,初始化为全1的矩阵
self.special_care_embeds = self.param(
"special_care_embeds", jax.nn.initializers.ones, (3, self.config.projection_dim)
)
# 定义概念嵌入权重的参数,初始化为全1的向量
self.concept_embeds_weights = self.param("concept_embeds_weights", jax.nn.initializers.ones, (17,))
# 定义特殊关怀嵌入权重的参数,初始化为全1的向量
self.special_care_embeds_weights = self.param("special_care_embeds_weights", jax.nn.initializers.ones, (3,))
# 定义调用方法,接受输入片段
def __call__(self, clip_input):
# 通过视觉模型处理输入片段,获取池化输出
pooled_output = self.vision_model(clip_input)[1]
# 将池化输出映射到图像嵌入
image_embeds = self.visual_projection(pooled_output)
# 计算图像嵌入与特殊关怀嵌入之间的余弦距离
special_cos_dist = jax_cosine_distance(image_embeds, self.special_care_embeds)
# 计算图像嵌入与概念嵌入之间的余弦距离
cos_dist = jax_cosine_distance(image_embeds, self.concept_embeds)
# 增加该值可创建更强的 `nfsw` 过滤器
# 但可能会增加过滤良性图像输入的可能性
adjustment = 0.0
# 计算特殊关怀分数,考虑权重和调整值
special_scores = special_cos_dist - self.special_care_embeds_weights[None, :] + adjustment
# 将特殊关怀分数四舍五入到小数点后三位
special_scores = jnp.round(special_scores, 3)
# 判断是否有特殊关怀分数大于0
is_special_care = jnp.any(special_scores > 0, axis=1, keepdims=True)
# 如果图像有任何特殊关怀概念,使用较低的阈值
special_adjustment = is_special_care * 0.01
# 计算概念分数,考虑权重和特殊调整值
concept_scores = cos_dist - self.concept_embeds_weights[None, :] + special_adjustment
# 将概念分数四舍五入到小数点后三位
concept_scores = jnp.round(concept_scores, 3)
# 判断是否有 nfsw 概念分数大于0
has_nsfw_concepts = jnp.any(concept_scores > 0, axis=1)
# 返回是否包含 nfsw 概念的布尔值
return has_nsfw_concepts
# 定义一个 Flax 稳定扩散安全检查器类,继承自 FlaxPreTrainedModel
class FlaxStableDiffusionSafetyChecker(FlaxPreTrainedModel):
# 指定配置类为 CLIPConfig
config_class = CLIPConfig
# 指定主输入名称为 "clip_input"
main_input_name = "clip_input"
# 指定模块类为 FlaxStableDiffusionSafetyCheckerModule
module_class = FlaxStableDiffusionSafetyCheckerModule
# 初始化方法,接受配置和其他参数
def __init__(
self,
config: CLIPConfig,
input_shape: Optional[Tuple] = None,
seed: int = 0,
dtype: jnp.dtype = jnp.float32,
_do_init: bool = True,
**kwargs,
):
# 如果输入形状未提供,使用默认值 (1, 224, 224, 3)
if input_shape is None:
input_shape = (1, 224, 224, 3)
# 创建模块实例,传入配置和数据类型
module = self.module_class(config=config, dtype=dtype, **kwargs)
# 调用父类的初始化方法
super().__init__(config, module, input_shape=input_shape, seed=seed, dtype=dtype, _do_init=_do_init)
# 初始化权重的方法
def init_weights(self, rng: jax.Array, input_shape: Tuple, params: FrozenDict = None) -> FrozenDict:
# 生成输入张量,使用随机正态分布
clip_input = jax.random.normal(rng, input_shape)
# 分割随机数生成器以获得不同的随机种子
params_rng, dropout_rng = jax.random.split(rng)
# 定义随机种子字典
rngs = {"params": params_rng, "dropout": dropout_rng}
# 初始化模块参数
random_params = self.module.init(rngs, clip_input)["params"]
# 返回初始化的参数
return random_params
# 定义调用方法
def __call__(
self,
clip_input,
params: dict = None,
):
# 转置输入张量的维度
clip_input = jnp.transpose(clip_input, (0, 2, 3, 1))
# 应用模块并返回结果
return self.module.apply(
{"params": params or self.params},
jnp.array(clip_input, dtype=jnp.float32),
rngs={},
)
.\diffusers\pipelines\stable_diffusion\stable_unclip_image_normalizer.py
# 版权声明,说明版权信息和许可条款
# Copyright 2024 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# 该文件的使用须遵循许可证的规定
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# 除非在适用的情况下或以书面形式达成一致,软件
# 在许可证下分发是基于 "按现状" 的基础,不提供任何形式的保证或条件
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# 见许可证了解特定语言的权限和限制
# See the License for the specific language governing permissions and
# limitations under the License.
# 从 typing 模块导入可选和联合类型
from typing import Optional, Union
# 导入 PyTorch 库
import torch
# 从 torch 库中导入神经网络模块
from torch import nn
# 从配置工具中导入基类和注册装饰器
from ...configuration_utils import ConfigMixin, register_to_config
# 从模型工具中导入模型基类
from ...models.modeling_utils import ModelMixin
# 定义 StableUnCLIPImageNormalizer 类,继承自 ModelMixin 和 ConfigMixin
class StableUnCLIPImageNormalizer(ModelMixin, ConfigMixin):
"""
该类用于保存用于稳定 unCLIP 的 CLIP 嵌入器的均值和标准差。
在应用噪声之前,用于对图像嵌入进行标准化,并在去除标准化后的噪声图像
嵌入时使用。
"""
# 使用注册装饰器将此方法注册到配置中
@register_to_config
def __init__(
self,
embedding_dim: int = 768, # 初始化时设定嵌入维度,默认值为 768
):
# 调用父类构造函数
super().__init__()
# 定义均值参数,初始化为零的张量,形状为 (1, embedding_dim)
self.mean = nn.Parameter(torch.zeros(1, embedding_dim))
# 定义标准差参数,初始化为一的张量,形状为 (1, embedding_dim)
self.std = nn.Parameter(torch.ones(1, embedding_dim))
# 定义设备转换方法,可以将均值和标准差移动到指定设备和数据类型
def to(
self,
torch_device: Optional[Union[str, torch.device]] = None, # 可选的设备参数
torch_dtype: Optional[torch.dtype] = None, # 可选的数据类型参数
):
# 将均值参数转换到指定的设备和数据类型
self.mean = nn.Parameter(self.mean.to(torch_device).to(torch_dtype))
# 将标准差参数转换到指定的设备和数据类型
self.std = nn.Parameter(self.std.to(torch_device).to(torch_dtype))
# 返回当前对象
return self
# 定义缩放方法,对嵌入进行标准化
def scale(self, embeds):
# 根据均值和标准差标准化嵌入
embeds = (embeds - self.mean) * 1.0 / self.std
# 返回标准化后的嵌入
return embeds
# 定义反缩放方法,将标准化的嵌入恢复为原始值
def unscale(self, embeds):
# 根据标准差和均值反标准化嵌入
embeds = (embeds * self.std) + self.mean
# 返回恢复后的嵌入
return embeds
.\diffusers\pipelines\stable_diffusion\__init__.py
# 导入类型检查相关的模块
from typing import TYPE_CHECKING
# 从工具模块导入多个依赖项和函数
from ...utils import (
DIFFUSERS_SLOW_IMPORT, # 慢速导入标志
OptionalDependencyNotAvailable, # 可选依赖未找到的异常
_LazyModule, # 懒加载模块的工具
get_objects_from_module, # 从模块获取对象的函数
is_flax_available, # 检查 Flax 库是否可用
is_k_diffusion_available, # 检查 K-Diffusion 库是否可用
is_k_diffusion_version, # 检查 K-Diffusion 版本
is_onnx_available, # 检查 ONNX 库是否可用
is_torch_available, # 检查 PyTorch 库是否可用
is_transformers_available, # 检查 Transformers 库是否可用
is_transformers_version, # 检查 Transformers 版本
)
# 创建空字典以存储虚拟对象
_dummy_objects = {}
# 创建空字典以存储额外导入
_additional_imports = {}
# 定义初始导入结构,包含管道输出
_import_structure = {"pipeline_output": ["StableDiffusionPipelineOutput"]}
# 如果 Transformers 和 Flax 可用,则扩展管道输出
if is_transformers_available() and is_flax_available():
_import_structure["pipeline_output"].extend(["FlaxStableDiffusionPipelineOutput"])
# 尝试检查 PyTorch 和 Transformers 的可用性
try:
if not (is_transformers_available() and is_torch_available()):
raise OptionalDependencyNotAvailable() # 如果不可用则引发异常
except OptionalDependencyNotAvailable:
# 导入虚拟对象以处理缺少的依赖项
from ...utils import dummy_torch_and_transformers_objects # noqa F403
# 更新虚拟对象字典
_dummy_objects.update(get_objects_from_module(dummy_torch_and_transformers_objects))
else:
# 如果可用,更新导入结构,添加多个管道
_import_structure["clip_image_project_model"] = ["CLIPImageProjection"]
_import_structure["pipeline_cycle_diffusion"] = ["CycleDiffusionPipeline"]
_import_structure["pipeline_stable_diffusion"] = ["StableDiffusionPipeline"]
_import_structure["pipeline_stable_diffusion_attend_and_excite"] = ["StableDiffusionAttendAndExcitePipeline"]
_import_structure["pipeline_stable_diffusion_gligen"] = ["StableDiffusionGLIGENPipeline"]
_import_structure["pipeline_stable_diffusion_gligen_text_image"] = ["StableDiffusionGLIGENTextImagePipeline"]
_import_structure["pipeline_stable_diffusion_img2img"] = ["StableDiffusionImg2ImgPipeline"]
_import_structure["pipeline_stable_diffusion_inpaint"] = ["StableDiffusionInpaintPipeline"]
_import_structure["pipeline_stable_diffusion_inpaint_legacy"] = ["StableDiffusionInpaintPipelineLegacy"]
_import_structure["pipeline_stable_diffusion_instruct_pix2pix"] = ["StableDiffusionInstructPix2PixPipeline"]
_import_structure["pipeline_stable_diffusion_latent_upscale"] = ["StableDiffusionLatentUpscalePipeline"]
_import_structure["pipeline_stable_diffusion_model_editing"] = ["StableDiffusionModelEditingPipeline"]
_import_structure["pipeline_stable_diffusion_paradigms"] = ["StableDiffusionParadigmsPipeline"]
_import_structure["pipeline_stable_diffusion_upscale"] = ["StableDiffusionUpscalePipeline"]
_import_structure["pipeline_stable_unclip"] = ["StableUnCLIPPipeline"]
_import_structure["pipeline_stable_unclip_img2img"] = ["StableUnCLIPImg2ImgPipeline"]
_import_structure["safety_checker"] = ["StableDiffusionSafetyChecker"]
_import_structure["stable_unclip_image_normalizer"] = ["StableUnCLIPImageNormalizer"]
# 尝试检查更严格的 Transformers 和 PyTorch 依赖条件
try:
if not (is_transformers_available() and is_torch_available() and is_transformers_version(">=", "4.25.0")):
raise OptionalDependencyNotAvailable() # 如果条件不满足则引发异常
except OptionalDependencyNotAvailable:
# 导入缺失的管道以处理可选依赖
from ...utils.dummy_torch_and_transformers_objects import (
StableDiffusionImageVariationPipeline, # 导入图像变体管道
)
# 更新 _dummy_objects 字典,将键 "StableDiffusionImageVariationPipeline" 映射到 StableDiffusionImageVariationPipeline 对象
_dummy_objects.update({"StableDiffusionImageVariationPipeline": StableDiffusionImageVariationPipeline})
# 如果没有可选依赖,则添加 StableDiffusionImageVariationPipeline 到导入结构
else:
_import_structure["pipeline_stable_diffusion_image_variation"] = ["StableDiffusionImageVariationPipeline"]
# 尝试检查依赖条件是否满足
try:
# 判断 transformers 和 torch 是否可用,且 transformers 版本是否满足条件
if not (is_transformers_available() and is_torch_available() and is_transformers_version(">=", "4.26.0")):
# 如果条件不满足,抛出依赖不可用异常
raise OptionalDependencyNotAvailable()
# 捕获依赖不可用的异常
except OptionalDependencyNotAvailable:
# 从虚拟对象模块导入 StableDiffusionDepth2ImgPipeline
from ...utils.dummy_torch_and_transformers_objects import (
StableDiffusionDepth2ImgPipeline,
)
# 更新虚拟对象字典,添加 StableDiffusionDepth2ImgPipeline
_dummy_objects.update(
{
"StableDiffusionDepth2ImgPipeline": StableDiffusionDepth2ImgPipeline,
}
)
# 如果没有抛出异常,则添加 StableDiffusionDepth2ImgPipeline 到导入结构
else:
_import_structure["pipeline_stable_diffusion_depth2img"] = ["StableDiffusionDepth2ImgPipeline"]
# 尝试检查其他依赖条件是否满足
try:
# 判断 transformers 和 onnx 是否可用
if not (is_transformers_available() and is_onnx_available()):
# 如果条件不满足,抛出依赖不可用异常
raise OptionalDependencyNotAvailable()
# 捕获依赖不可用的异常
except OptionalDependencyNotAvailable:
# 从虚拟 ONNX 对象模块导入
from ...utils import dummy_onnx_objects # noqa F403
# 更新虚拟对象字典,添加 ONNX 对象
_dummy_objects.update(get_objects_from_module(dummy_onnx_objects))
# 如果没有抛出异常,则添加 ONNX 相关的导入结构
else:
_import_structure["pipeline_onnx_stable_diffusion"] = [
"OnnxStableDiffusionPipeline",
"StableDiffusionOnnxPipeline",
]
_import_structure["pipeline_onnx_stable_diffusion_img2img"] = ["OnnxStableDiffusionImg2ImgPipeline"]
_import_structure["pipeline_onnx_stable_diffusion_inpaint"] = ["OnnxStableDiffusionInpaintPipeline"]
_import_structure["pipeline_onnx_stable_diffusion_inpaint_legacy"] = ["OnnxStableDiffusionInpaintPipelineLegacy"]
_import_structure["pipeline_onnx_stable_diffusion_upscale"] = ["OnnxStableDiffusionUpscalePipeline"]
# 检查 transformers 和 flax 是否可用
if is_transformers_available() and is_flax_available():
# 从调度器模块导入 PNDMSchedulerState
from ...schedulers.scheduling_pndm_flax import PNDMSchedulerState
# 更新额外导入字典,添加 PNDMSchedulerState
_additional_imports.update({"PNDMSchedulerState": PNDMSchedulerState})
# 添加 flax 相关的导入结构
_import_structure["pipeline_flax_stable_diffusion"] = ["FlaxStableDiffusionPipeline"]
_import_structure["pipeline_flax_stable_diffusion_img2img"] = ["FlaxStableDiffusionImg2ImgPipeline"]
_import_structure["pipeline_flax_stable_diffusion_inpaint"] = ["FlaxStableDiffusionInpaintPipeline"]
_import_structure["safety_checker_flax"] = ["FlaxStableDiffusionSafetyChecker"]
# 检查类型检查或慢导入条件
if TYPE_CHECKING or DIFFUSERS_SLOW_IMPORT:
# 尝试检查依赖条件
try:
# 判断 transformers 和 torch 是否可用
if not (is_transformers_available() and is_torch_available()):
# 如果条件不满足,抛出依赖不可用异常
raise OptionalDependencyNotAvailable()
# 捕获依赖不可用的异常
except OptionalDependencyNotAvailable:
# 从虚拟对象模块导入所有内容
from ...utils.dummy_torch_and_transformers_objects import *
else:
# 从模块中导入 CLIPImageProjection 类
from .clip_image_project_model import CLIPImageProjection
# 从模块中导入 StableDiffusionPipeline 和 StableDiffusionPipelineOutput 类
from .pipeline_stable_diffusion import (
StableDiffusionPipeline,
StableDiffusionPipelineOutput,
)
# 从模块中导入 StableDiffusionImg2ImgPipeline 类
from .pipeline_stable_diffusion_img2img import StableDiffusionImg2ImgPipeline
# 从模块中导入 StableDiffusionInpaintPipeline 类
from .pipeline_stable_diffusion_inpaint import StableDiffusionInpaintPipeline
# 从模块中导入 StableDiffusionInstructPix2PixPipeline 类
from .pipeline_stable_diffusion_instruct_pix2pix import (
StableDiffusionInstructPix2PixPipeline,
)
# 从模块中导入 StableDiffusionLatentUpscalePipeline 类
from .pipeline_stable_diffusion_latent_upscale import (
StableDiffusionLatentUpscalePipeline,
)
# 从模块中导入 StableDiffusionUpscalePipeline 类
from .pipeline_stable_diffusion_upscale import StableDiffusionUpscalePipeline
# 从模块中导入 StableUnCLIPPipeline 类
from .pipeline_stable_unclip import StableUnCLIPPipeline
# 从模块中导入 StableUnCLIPImg2ImgPipeline 类
from .pipeline_stable_unclip_img2img import StableUnCLIPImg2ImgPipeline
# 从模块中导入 StableDiffusionSafetyChecker 类
from .safety_checker import StableDiffusionSafetyChecker
# 从模块中导入 StableUnCLIPImageNormalizer 类
from .stable_unclip_image_normalizer import StableUnCLIPImageNormalizer
try:
# 检查是否可用所需的 transformers 和 torch 库及其版本
if not (is_transformers_available() and is_torch_available() and is_transformers_version(">=", "4.25.0")):
# 抛出可选依赖不可用异常
raise OptionalDependencyNotAvailable()
except OptionalDependencyNotAvailable:
# 导入 dummy_torch_and_transformers_objects 中的 StableDiffusionImageVariationPipeline 类
from ...utils.dummy_torch_and_transformers_objects import (
StableDiffusionImageVariationPipeline,
)
else:
# 从模块中导入 StableDiffusionImageVariationPipeline 类
from .pipeline_stable_diffusion_image_variation import (
StableDiffusionImageVariationPipeline,
)
try:
# 检查是否可用所需的 transformers 和 torch 库及其版本
if not (is_transformers_available() and is_torch_available() and is_transformers_version(">=", "4.26.0")):
# 抛出可选依赖不可用异常
raise OptionalDependencyNotAvailable()
except OptionalDependencyNotAvailable:
# 导入 dummy_torch_and_transformers_objects 中的 StableDiffusionDepth2ImgPipeline 类
from ...utils.dummy_torch_and_transformers_objects import StableDiffusionDepth2ImgPipeline
else:
# 从模块中导入 StableDiffusionDepth2ImgPipeline 类
from .pipeline_stable_diffusion_depth2img import (
StableDiffusionDepth2ImgPipeline,
)
try:
# 检查是否可用所需的 transformers 和 onnx 库
if not (is_transformers_available() and is_onnx_available()):
# 抛出可选依赖不可用异常
raise OptionalDependencyNotAvailable()
except OptionalDependencyNotAvailable:
# 从 dummy_onnx_objects 中导入所有对象
from ...utils.dummy_onnx_objects import *
else:
# 从模块中导入 OnnxStableDiffusionPipeline 和 StableDiffusionOnnxPipeline 类
from .pipeline_onnx_stable_diffusion import (
OnnxStableDiffusionPipeline,
StableDiffusionOnnxPipeline,
)
# 从模块中导入 OnnxStableDiffusionImg2ImgPipeline 类
from .pipeline_onnx_stable_diffusion_img2img import (
OnnxStableDiffusionImg2ImgPipeline,
)
# 从模块中导入 OnnxStableDiffusionInpaintPipeline 类
from .pipeline_onnx_stable_diffusion_inpaint import (
OnnxStableDiffusionInpaintPipeline,
)
# 从模块中导入 OnnxStableDiffusionUpscalePipeline 类
from .pipeline_onnx_stable_diffusion_upscale import (
OnnxStableDiffusionUpscalePipeline,
)
try:
# 检查是否可用所需的 transformers 和 flax 库
if not (is_transformers_available() and is_flax_available()):
# 抛出可选依赖不可用异常
raise OptionalDependencyNotAvailable()
except OptionalDependencyNotAvailable:
# 从 dummy_flax_objects 中导入所有对象
from ...utils.dummy_flax_objects import *
# 如果条件不满足,则执行以下导入操作
else:
# 从模块中导入 FlaxStableDiffusionPipeline 类
from .pipeline_flax_stable_diffusion import FlaxStableDiffusionPipeline
# 从模块中导入 FlaxStableDiffusionImg2ImgPipeline 类
from .pipeline_flax_stable_diffusion_img2img import (
FlaxStableDiffusionImg2ImgPipeline,
)
# 从模块中导入 FlaxStableDiffusionInpaintPipeline 类
from .pipeline_flax_stable_diffusion_inpaint import (
FlaxStableDiffusionInpaintPipeline,
)
# 从模块中导入 FlaxStableDiffusionPipelineOutput 类
from .pipeline_output import FlaxStableDiffusionPipelineOutput
# 从模块中导入 FlaxStableDiffusionSafetyChecker 类
from .safety_checker_flax import FlaxStableDiffusionSafetyChecker
# 否则分支,处理模块的懒加载
else:
# 导入 sys 模块以便操作模块相关功能
import sys
# 将当前模块名映射到一个懒加载模块实例
sys.modules[__name__] = _LazyModule(
__name__, # 当前模块的名称
globals()["__file__"], # 当前模块的文件路径
_import_structure, # 定义的导入结构
module_spec=__spec__, # 当前模块的规格
)
# 遍历虚拟对象字典,将每个对象设置到当前模块中
for name, value in _dummy_objects.items():
setattr(sys.modules[__name__], name, value)
# 遍历附加导入字典,将每个对象设置到当前模块中
for name, value in _additional_imports.items():
setattr(sys.modules[__name__], name, value)
.\diffusers\pipelines\stable_diffusion_3\pipeline_output.py
# 从 dataclasses 模块导入 dataclass 装饰器
from dataclasses import dataclass
# 从 typing 模块导入 List 和 Union 类型提示
from typing import List, Union
# 导入 numpy 库并简化为 np
import numpy as np
# 导入 PIL.Image 模块,用于处理图像
import PIL.Image
# 从上级模块的 utils 中导入 BaseOutput 类
from ...utils import BaseOutput
# 定义一个数据类 StableDiffusion3PipelineOutput,继承自 BaseOutput
@dataclass
class StableDiffusion3PipelineOutput(BaseOutput):
"""
Stable Diffusion 管道的输出类。
参数:
images (`List[PIL.Image.Image]` 或 `np.ndarray`)
长度为 `batch_size` 的去噪 PIL 图像列表或形状为 `(batch_size, height, width,
num_channels)` 的 numpy 数组。PIL 图像或 numpy 数组表示扩散管道的去噪图像。
"""
# 定义一个属性 images,类型为列表或 numpy 数组
images: Union[List[PIL.Image.Image], np.ndarray]
.\diffusers\pipelines\stable_diffusion_3\pipeline_stable_diffusion_3.py
# 版权声明,表明版权归 2024 Stability AI 和 HuggingFace Team 所有
#
# 根据 Apache 许可证,版本 2.0("许可证")进行许可;
# 除非遵循许可证,否则您不能使用此文件。
# 您可以在以下网址获取许可证副本:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律或书面同意,否则根据许可证分发的软件是以“现状”基础提供的,
# 不提供任何形式的保证或条件,无论是明示或暗示的。
# 有关许可证下权限和限制的具体条款,请参见许可证。
# 导入 inspect 模块,用于获取对象的各种信息
import inspect
# 从 typing 模块导入类型提示相关的类型
from typing import Any, Callable, Dict, List, Optional, Union
# 导入 PyTorch 库
import torch
# 从 transformers 库导入相关的模型和分词器
from transformers import (
CLIPTextModelWithProjection, # 导入 CLIP 文本模型
CLIPTokenizer, # 导入 CLIP 分词器
T5EncoderModel, # 导入 T5 编码器模型
T5TokenizerFast, # 导入快速 T5 分词器
)
# 从本地模块导入图像处理器
from ...image_processor import VaeImageProcessor
# 从本地模块导入加载器混合类
from ...loaders import FromSingleFileMixin, SD3LoraLoaderMixin
# 从本地模块导入自动编码器模型
from ...models.autoencoders import AutoencoderKL
# 从本地模块导入变换器模型
from ...models.transformers import SD3Transformer2DModel
# 从本地模块导入调度器
from ...schedulers import FlowMatchEulerDiscreteScheduler
# 从本地模块导入各种工具函数
from ...utils import (
USE_PEFT_BACKEND, # 导入 PEFT 后端使用标志
is_torch_xla_available, # 导入检查 XLA 可用性的函数
logging, # 导入日志记录工具
replace_example_docstring, # 导入替换示例文档字符串的工具
scale_lora_layers, # 导入缩放 LoRA 层的工具
unscale_lora_layers, # 导入取消缩放 LoRA 层的工具
)
# 从本地模块导入随机张量生成工具
from ...utils.torch_utils import randn_tensor
# 从本地模块导入扩散管道工具
from ..pipeline_utils import DiffusionPipeline
# 从本地模块导入稳定扩散管道输出类
from .pipeline_output import StableDiffusion3PipelineOutput
# 检查是否可用 torch_xla 库,若可用则导入
if is_torch_xla_available():
import torch_xla.core.xla_model as xm # 导入 XLA 模型相关功能
XLA_AVAILABLE = True # 设置 XLA 可用标志为 True
else:
XLA_AVAILABLE = False # 设置 XLA 可用标志为 False
# 获取当前模块的日志记录器
logger = logging.get_logger(__name__) # pylint: disable=invalid-name
# 示例文档字符串,用于说明如何使用 StableDiffusion3Pipeline
EXAMPLE_DOC_STRING = """
Examples:
```py
>>> import torch
>>> from diffusers import StableDiffusion3Pipeline
>>> pipe = StableDiffusion3Pipeline.from_pretrained(
... "stabilityai/stable-diffusion-3-medium-diffusers", torch_dtype=torch.float16
... )
>>> pipe.to("cuda") # 将管道移动到 GPU
>>> prompt = "A cat holding a sign that says hello world" # 定义生成图像的提示
>>> image = pipe(prompt).images[0] # 生成图像并提取第一张
>>> image.save("sd3.png") # 保存生成的图像
```py
"""
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion 中复制的函数
def retrieve_timesteps(
scheduler, # 调度器对象,用于管理时间步
num_inference_steps: Optional[int] = None, # 可选参数,推理步骤数
device: Optional[Union[str, torch.device]] = None, # 可选参数,设备类型
timesteps: Optional[List[int]] = None, # 可选参数,自定义时间步列表
sigmas: Optional[List[float]] = None, # 可选参数,自定义 sigma 值列表
**kwargs, # 额外的关键字参数
):
"""
调用调度器的 `set_timesteps` 方法并在调用后从调度器中检索时间步。处理
自定义时间步。所有的关键字参数将传递给 `scheduler.set_timesteps`。
# 定义参数的说明
Args:
scheduler (`SchedulerMixin`): # 调度器,用于获取时间步
The scheduler to get timesteps from.
num_inference_steps (`int`): # 生成样本时使用的扩散步骤数
The number of diffusion steps used when generating samples with a pre-trained model. If used, `timesteps`
must be `None`.
device (`str` or `torch.device`, *optional*): # 指定时间步要移动到的设备
The device to which the timesteps should be moved to. If `None`, the timesteps are not moved.
timesteps (`List[int]`, *optional*): # 自定义时间步以覆盖调度器的时间步间隔策略
Custom timesteps used to override the timestep spacing strategy of the scheduler. If `timesteps` is passed,
`num_inference_steps` and `sigmas` must be `None`.
sigmas (`List[float]`, *optional*): # 自定义sigma以覆盖调度器的时间步间隔策略
Custom sigmas used to override the timestep spacing strategy of the scheduler. If `sigmas` is passed,
`num_inference_steps` and `timesteps` must be `None`.
Returns:
`Tuple[torch.Tensor, int]`: 返回一个元组,第一个元素是调度器的时间步调度,第二个元素是推理步骤的数量。
"""
# 检查是否同时传入了时间步和sigma,若是则抛出错误
if timesteps is not None and sigmas is not None:
raise ValueError("Only one of `timesteps` or `sigmas` can be passed. Please choose one to set custom values")
# 检查是否传入了时间步
if timesteps is not None:
# 检查调度器是否支持自定义时间步
accepts_timesteps = "timesteps" in set(inspect.signature(scheduler.set_timesteps).parameters.keys())
if not accepts_timesteps: # 不支持则抛出错误
raise ValueError(
f"The current scheduler class {scheduler.__class__}'s `set_timesteps` does not support custom"
f" timestep schedules. Please check whether you are using the correct scheduler."
)
# 设置调度器的时间步
scheduler.set_timesteps(timesteps=timesteps, device=device, **kwargs)
# 获取当前调度器的时间步
timesteps = scheduler.timesteps
# 计算推理步骤的数量
num_inference_steps = len(timesteps)
# 检查是否传入了sigma
elif sigmas is not None:
# 检查调度器是否支持自定义sigma
accept_sigmas = "sigmas" in set(inspect.signature(scheduler.set_timesteps).parameters.keys())
if not accept_sigmas: # 不支持则抛出错误
raise ValueError(
f"The current scheduler class {scheduler.__class__}'s `set_timesteps` does not support custom"
f" sigmas schedules. Please check whether you are using the correct scheduler."
)
# 设置调度器的sigma
scheduler.set_timesteps(sigmas=sigmas, device=device, **kwargs)
# 获取当前调度器的时间步
timesteps = scheduler.timesteps
# 计算推理步骤的数量
num_inference_steps = len(timesteps)
# 如果没有传入时间步和sigma
else:
# 使用推理步骤数量设置调度器的时间步
scheduler.set_timesteps(num_inference_steps, device=device, **kwargs)
# 获取当前调度器的时间步
timesteps = scheduler.timesteps
# 返回时间步和推理步骤数量
return timesteps, num_inference_steps
# 定义 StableDiffusion3Pipeline 类,继承自 DiffusionPipeline、SD3LoraLoaderMixin 和 FromSingleFileMixin
class StableDiffusion3Pipeline(DiffusionPipeline, SD3LoraLoaderMixin, FromSingleFileMixin):
r""" # 文档字符串,描述类的参数及其作用
Args: # 指明构造函数参数的开始
transformer ([`SD3Transformer2DModel`]): # 条件 Transformer(MMDiT)架构,用于去噪编码后的图像潜变量
Conditional Transformer (MMDiT) architecture to denoise the encoded image latents.
scheduler ([`FlowMatchEulerDiscreteScheduler`]): # 与 transformer 结合使用的调度器,用于去噪图像潜变量
A scheduler to be used in combination with `transformer` to denoise the encoded image latents.
vae ([`AutoencoderKL`]): # 用于图像编码和解码的变分自编码器模型
Variational Auto-Encoder (VAE) Model to encode and decode images to and from latent representations.
text_encoder ([`CLIPTextModelWithProjection`]): # 特定 CLIP 变体的文本编码器,具有额外的投影层
[CLIP](https://huggingface.co/docs/transformers/model_doc/clip#transformers.CLIPTextModelWithProjection),
specifically the [clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14) variant,
with an additional added projection layer that is initialized with a diagonal matrix with the `hidden_size`
as its dimension.
text_encoder_2 ([`CLIPTextModelWithProjection`]): # 第二个特定 CLIP 变体的文本编码器
[CLIP](https://huggingface.co/docs/transformers/model_doc/clip#transformers.CLIPTextModelWithProjection),
specifically the
[laion/CLIP-ViT-bigG-14-laion2B-39B-b160k](https://huggingface.co/laion/CLIP-ViT-bigG-14-laion2B-39B-b160k)
variant.
text_encoder_3 ([`T5EncoderModel`]): # 冻结的文本编码器,使用 T5 变体
Frozen text-encoder. Stable Diffusion 3 uses
[T5](https://huggingface.co/docs/transformers/model_doc/t5#transformers.T5EncoderModel), specifically the
[t5-v1_1-xxl](https://huggingface.co/google/t5-v1_1-xxl) variant.
tokenizer (`CLIPTokenizer`): # CLIPTokenizer 类的标记器
Tokenizer of class
[CLIPTokenizer](https://huggingface.co/docs/transformers/v4.21.0/en/model_doc/clip#transformers.CLIPTokenizer).
tokenizer_2 (`CLIPTokenizer`): # 第二个 CLIPTokenizer 类的标记器
Second Tokenizer of class
[CLIPTokenizer](https://huggingface.co/docs/transformers/v4.21.0/en/model_doc/clip#transformers.CLIPTokenizer).
tokenizer_3 (`T5TokenizerFast`): # T5Tokenizer 类的标记器
Tokenizer of class
[T5Tokenizer](https://huggingface.co/docs/transformers/model_doc/t5#transformers.T5Tokenizer).
""" # 文档字符串结束
# 定义模型的 CPU 离线加载顺序
model_cpu_offload_seq = "text_encoder->text_encoder_2->text_encoder_3->transformer->vae"
# 定义可选组件为空列表
_optional_components = []
# 定义回调张量输入的列表
_callback_tensor_inputs = ["latents", "prompt_embeds", "negative_prompt_embeds", "negative_pooled_prompt_embeds"]
# 初始化方法,定义类的构造函数
def __init__(
self,
transformer: SD3Transformer2DModel, # 接收的 transformer 参数,类型为 SD3Transformer2DModel
scheduler: FlowMatchEulerDiscreteScheduler, # 接收的调度器参数,类型为 FlowMatchEulerDiscreteScheduler
vae: AutoencoderKL, # 接收的 VAE 参数,类型为 AutoencoderKL
text_encoder: CLIPTextModelWithProjection, # 接收的文本编码器参数,类型为 CLIPTextModelWithProjection
tokenizer: CLIPTokenizer, # 接收的标记器参数,类型为 CLIPTokenizer
text_encoder_2: CLIPTextModelWithProjection, # 接收的第二个文本编码器参数,类型为 CLIPTextModelWithProjection
tokenizer_2: CLIPTokenizer, # 接收的第二个标记器参数,类型为 CLIPTokenizer
text_encoder_3: T5EncoderModel, # 接收的第三个文本编码器参数,类型为 T5EncoderModel
tokenizer_3: T5TokenizerFast, # 接收的第三个标记器参数,类型为 T5TokenizerFast
# 初始化父类
):
super().__init__()
# 注册多个模块,包括变分自编码器、文本编码器和调度器
self.register_modules(
vae=vae,
text_encoder=text_encoder,
text_encoder_2=text_encoder_2,
text_encoder_3=text_encoder_3,
tokenizer=tokenizer,
tokenizer_2=tokenizer_2,
tokenizer_3=tokenizer_3,
transformer=transformer,
scheduler=scheduler,
)
# 计算 VAE 缩放因子,基于配置的块输出通道数
self.vae_scale_factor = (
2 ** (len(self.vae.config.block_out_channels) - 1) if hasattr(self, "vae") and self.vae is not None else 8
)
# 创建图像处理器,使用计算得出的 VAE 缩放因子
self.image_processor = VaeImageProcessor(vae_scale_factor=self.vae_scale_factor)
# 获取最大标记长度,如果有 tokenizer 的话
self.tokenizer_max_length = (
self.tokenizer.model_max_length if hasattr(self, "tokenizer") and self.tokenizer is not None else 77
)
# 设置默认样本大小,基于 transformer 的配置
self.default_sample_size = (
self.transformer.config.sample_size
if hasattr(self, "transformer") and self.transformer is not None
else 128
)
# 定义获取 T5 提示嵌入的方法
def _get_t5_prompt_embeds(
self,
# 提示文本,支持单个字符串或字符串列表
prompt: Union[str, List[str]] = None,
# 每个提示生成的图像数量
num_images_per_prompt: int = 1,
# 最大序列长度限制
max_sequence_length: int = 256,
# 设备类型选择
device: Optional[torch.device] = None,
# 数据类型选择
dtype: Optional[torch.dtype] = None,
# 处理设备和数据类型的设置
):
# 如果未指定设备,则使用默认执行设备
device = device or self._execution_device
# 如果未指定数据类型,则使用文本编码器的数据类型
dtype = dtype or self.text_encoder.dtype
# 将输入的 prompt 转换为列表形式(如果是字符串则包裹在列表中)
prompt = [prompt] if isinstance(prompt, str) else prompt
# 获取 prompt 的批处理大小
batch_size = len(prompt)
# 如果没有第三个文本编码器,则返回零张量
if self.text_encoder_3 is None:
return torch.zeros(
# 创建一个零张量,形状由批处理大小和其他参数决定
(
batch_size * num_images_per_prompt,
self.tokenizer_max_length,
self.transformer.config.joint_attention_dim,
),
device=device,
dtype=dtype,
)
# 使用第三个文本编码器对 prompt 进行编码,返回张量格式
text_inputs = self.tokenizer_3(
prompt,
padding="max_length", # 填充到最大长度
max_length=max_sequence_length, # 最大序列长度
truncation=True, # 启用截断
add_special_tokens=True, # 添加特殊标记
return_tensors="pt", # 返回 PyTorch 张量
)
# 提取输入 ID
text_input_ids = text_inputs.input_ids
# 获取未截断的 ID
untruncated_ids = self.tokenizer_3(prompt, padding="longest", return_tensors="pt").input_ids
# 检查未截断 ID 是否长于或等于输入 ID,且不相等
if untruncated_ids.shape[-1] >= text_input_ids.shape[-1] and not torch.equal(text_input_ids, untruncated_ids):
# 解码被截断的部分并记录警告
removed_text = self.tokenizer_3.batch_decode(untruncated_ids[:, self.tokenizer_max_length - 1 : -1])
logger.warning(
"The following part of your input was truncated because `max_sequence_length` is set to "
f" {max_sequence_length} tokens: {removed_text}"
)
# 获取文本编码器对输入 ID 的嵌入
prompt_embeds = self.text_encoder_3(text_input_ids.to(device))[0]
# 设置数据类型
dtype = self.text_encoder_3.dtype
# 将嵌入转换为指定的数据类型和设备
prompt_embeds = prompt_embeds.to(dtype=dtype, device=device)
# 获取嵌入的形状信息
_, seq_len, _ = prompt_embeds.shape
# 为每个 prompt 生成的图像重复文本嵌入和注意力掩码,采用适合 mps 的方法
prompt_embeds = prompt_embeds.repeat(1, num_images_per_prompt, 1)
# 调整嵌入的形状
prompt_embeds = prompt_embeds.view(batch_size * num_images_per_prompt, seq_len, -1)
# 返回最终的文本嵌入
return prompt_embeds
# 获取 CLIP 的提示嵌入
def _get_clip_prompt_embeds(
self,
# 接受字符串或字符串列表作为提示
prompt: Union[str, List[str]],
# 每个提示生成的图像数量,默认值为 1
num_images_per_prompt: int = 1,
# 设备选择
device: Optional[torch.device] = None,
# CLIP 跳过的层数(可选)
clip_skip: Optional[int] = None,
# CLIP 模型索引,默认值为 0
clip_model_index: int = 0,
):
# 确定设备,如果没有指定,则使用默认执行设备
device = device or self._execution_device
# 定义 CLIP 的两个分词器
clip_tokenizers = [self.tokenizer, self.tokenizer_2]
# 定义 CLIP 的两个文本编码器
clip_text_encoders = [self.text_encoder, self.text_encoder_2]
# 根据给定的模型索引选择对应的分词器
tokenizer = clip_tokenizers[clip_model_index]
# 根据给定的模型索引选择对应的文本编码器
text_encoder = clip_text_encoders[clip_model_index]
# 如果 prompt 是字符串,则将其放入列表中,否则保持原样
prompt = [prompt] if isinstance(prompt, str) else prompt
# 计算批处理大小,即 prompt 的数量
batch_size = len(prompt)
# 使用选定的分词器对 prompt 进行编码,返回张量
text_inputs = tokenizer(
prompt,
padding="max_length", # 填充到最大长度
max_length=self.tokenizer_max_length, # 最大长度限制
truncation=True, # 允许截断
return_tensors="pt", # 返回 PyTorch 张量
)
# 提取编码后的输入 ID
text_input_ids = text_inputs.input_ids
# 对原始 prompt 进行长格式编码以获取未截断的 ID
untruncated_ids = tokenizer(prompt, padding="longest", return_tensors="pt").input_ids
# 检查是否需要警告用户输入被截断
if untruncated_ids.shape[-1] >= text_input_ids.shape[-1] and not torch.equal(text_input_ids, untruncated_ids):
# 解码被截断的文本
removed_text = tokenizer.batch_decode(untruncated_ids[:, self.tokenizer_max_length - 1 : -1])
# 记录警告,通知用户被截断的部分
logger.warning(
"The following part of your input was truncated because CLIP can only handle sequences up to"
f" {self.tokenizer_max_length} tokens: {removed_text}"
)
# 使用选定的文本编码器生成提示的嵌入表示
prompt_embeds = text_encoder(text_input_ids.to(device), output_hidden_states=True)
# 提取 pooled 提示嵌入
pooled_prompt_embeds = prompt_embeds[0]
# 根据是否指定 clip_skip 来选择对应的隐藏状态
if clip_skip is None:
prompt_embeds = prompt_embeds.hidden_states[-2]
else:
prompt_embeds = prompt_embeds.hidden_states[-(clip_skip + 2)]
# 将嵌入转换为指定的数据类型和设备
prompt_embeds = prompt_embeds.to(dtype=self.text_encoder.dtype, device=device)
# 获取提示嵌入的形状
_, seq_len, _ = prompt_embeds.shape
# 对每个 prompt 生成多个图像时复制文本嵌入,使用兼容 mps 的方法
prompt_embeds = prompt_embeds.repeat(1, num_images_per_prompt, 1)
# 重塑为批处理大小与生成图像数量的组合
prompt_embeds = prompt_embeds.view(batch_size * num_images_per_prompt, seq_len, -1)
# 复制 pooled 提示嵌入以适应多个生成的图像
pooled_prompt_embeds = pooled_prompt_embeds.repeat(1, num_images_per_prompt, 1)
# 重塑为批处理大小与生成图像数量的组合
pooled_prompt_embeds = pooled_prompt_embeds.view(batch_size * num_images_per_prompt, -1)
# 返回提示嵌入和 pooled 提示嵌入
return prompt_embeds, pooled_prompt_embeds
# 定义一个方法用于编码提示信息
def encode_prompt(
self, # 方法参数开始
prompt: Union[str, List[str]], # 第一个提示,支持字符串或字符串列表
prompt_2: Union[str, List[str]], # 第二个提示,支持字符串或字符串列表
prompt_3: Union[str, List[str]], # 第三个提示,支持字符串或字符串列表
device: Optional[torch.device] = None, # 可选参数,指定设备
num_images_per_prompt: int = 1, # 每个提示生成的图像数量,默认1
do_classifier_free_guidance: bool = True, # 是否执行无分类器引导,默认是
negative_prompt: Optional[Union[str, List[str]]] = None, # 可选的负面提示
negative_prompt_2: Optional[Union[str, List[str]]] = None, # 第二个负面提示
negative_prompt_3: Optional[Union[str, List[str]]] = None, # 第三个负面提示
prompt_embeds: Optional[torch.FloatTensor] = None, # 可选的提示嵌入
negative_prompt_embeds: Optional[torch.FloatTensor] = None, # 可选的负面提示嵌入
pooled_prompt_embeds: Optional[torch.FloatTensor] = None, # 可选的池化提示嵌入
negative_pooled_prompt_embeds: Optional[torch.FloatTensor] = None, # 可选的负面池化提示嵌入
clip_skip: Optional[int] = None, # 可选参数,指定跳过的剪辑步骤
max_sequence_length: int = 256, # 最大序列长度,默认256
# 定义一个检查输入的方法
def check_inputs(
self, # 方法参数开始
prompt, # 第一个提示
prompt_2, # 第二个提示
prompt_3, # 第三个提示
height, # 图像高度
width, # 图像宽度
negative_prompt=None, # 可选的负面提示
negative_prompt_2=None, # 第二个负面提示
negative_prompt_3=None, # 第三个负面提示
prompt_embeds=None, # 可选的提示嵌入
negative_prompt_embeds=None, # 可选的负面提示嵌入
pooled_prompt_embeds=None, # 可选的池化提示嵌入
negative_pooled_prompt_embeds=None, # 可选的负面池化提示嵌入
callback_on_step_end_tensor_inputs=None, # 可选的回调输入
max_sequence_length=None, # 可选的最大序列长度
# 定义一个准备潜在变量的方法
def prepare_latents(
self, # 方法参数开始
batch_size, # 批次大小
num_channels_latents, # 潜在变量通道数
height, # 图像高度
width, # 图像宽度
dtype, # 数据类型
device, # 设备
generator, # 随机生成器
latents=None, # 可选的潜在变量
):
if latents is not None: # 检查是否提供了潜在变量
return latents.to(device=device, dtype=dtype) # 转移潜在变量到指定设备和数据类型
shape = ( # 定义潜在变量的形状
batch_size, # 批次大小
num_channels_latents, # 潜在变量通道数
int(height) // self.vae_scale_factor, # 高度缩放
int(width) // self.vae_scale_factor, # 宽度缩放
)
if isinstance(generator, list) and len(generator) != batch_size: # 检查生成器列表的长度
raise ValueError( # 如果长度不匹配则抛出错误
f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
f" size of {batch_size}. Make sure the batch size matches the length of the generators."
)
latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype) # 生成潜在变量
return latents # 返回生成的潜在变量
@property
def guidance_scale(self): # 定义一个属性用于获取引导比例
return self._guidance_scale # 返回引导比例
@property
def clip_skip(self): # 定义一个属性用于获取剪辑跳过值
return self._clip_skip # 返回剪辑跳过值
# 这里`guidance_scale`定义类似于Imagen论文中方程(2)的引导权重`w`
# https://arxiv.org/pdf/2205.11487.pdf . `guidance_scale = 1`
# 表示不执行无分类器引导。
@property
def do_classifier_free_guidance(self): # 定义一个属性判断是否执行无分类器引导
return self._guidance_scale > 1 # 如果引导比例大于1,则返回True
@property
def joint_attention_kwargs(self): # 定义一个属性用于获取联合注意力参数
return self._joint_attention_kwargs # 返回联合注意力参数
@property
def num_timesteps(self): # 定义一个属性用于获取时间步数
return self._num_timesteps # 返回时间步数
@property
def interrupt(self): # 定义一个属性用于获取中断状态
return self._interrupt # 返回中断状态
# 禁用梯度计算,以节省内存和加快计算速度
@torch.no_grad()
# 装饰器,用于替换示例文档字符串为预定义的文档字符串
@replace_example_docstring(EXAMPLE_DOC_STRING)
# 定义可调用对象的方法,接收多个参数用于生成图像
def __call__(
# 提示文本,可以是字符串或字符串列表,默认为 None
self,
prompt: Union[str, List[str]] = None,
# 第二个提示文本,类型和默认值与 prompt 相同
prompt_2: Optional[Union[str, List[str]]] = None,
# 第三个提示文本,类型和默认值与 prompt 相同
prompt_3: Optional[Union[str, List[str]]] = None,
# 输出图像的高度,可选参数,默认为 None
height: Optional[int] = None,
# 输出图像的宽度,可选参数,默认为 None
width: Optional[int] = None,
# 进行推理的步骤数,默认为 28
num_inference_steps: int = 28,
# 指定时间步的列表,默认为 None
timesteps: List[int] = None,
# 引导强度,默认为 7.0
guidance_scale: float = 7.0,
# 负面提示文本,可以是字符串或字符串列表,默认为 None
negative_prompt: Optional[Union[str, List[str]]] = None,
# 第二个负面提示文本,类型和默认值与 negative_prompt 相同
negative_prompt_2: Optional[Union[str, List[str]]] = None,
# 第三个负面提示文本,类型和默认值与 negative_prompt 相同
negative_prompt_3: Optional[Union[str, List[str]]] = None,
# 每个提示生成的图像数量,默认为 1
num_images_per_prompt: Optional[int] = 1,
# 随机数生成器,可以是单个生成器或生成器列表,默认为 None
generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
# 潜在表示,可选的浮点张量,默认为 None
latents: Optional[torch.FloatTensor] = None,
# 提示嵌入,可选的浮点张量,默认为 None
prompt_embeds: Optional[torch.FloatTensor] = None,
# 负面提示嵌入,可选的浮点张量,默认为 None
negative_prompt_embeds: Optional[torch.FloatTensor] = None,
# 聚合的提示嵌入,可选的浮点张量,默认为 None
pooled_prompt_embeds: Optional[torch.FloatTensor] = None,
# 负面聚合提示嵌入,可选的浮点张量,默认为 None
negative_pooled_prompt_embeds: Optional[torch.FloatTensor] = None,
# 输出类型,默认为 "pil"
output_type: Optional[str] = "pil",
# 是否返回字典,默认为 True
return_dict: bool = True,
# 联合注意力的额外参数,默认为 None
joint_attention_kwargs: Optional[Dict[str, Any]] = None,
# 跳过的剪辑步骤,默认为 None
clip_skip: Optional[int] = None,
# 每步结束时的回调函数,默认为 None
callback_on_step_end: Optional[Callable[[int, int, Dict], None]] = None,
# 在每步结束时要回调的张量输入列表,默认为 ["latents"]
callback_on_step_end_tensor_inputs: List[str] = ["latents"],
# 最大序列长度,默认为 256
max_sequence_length: int = 256,