diffusers 源码解析(四十)
.\diffusers\pipelines\pag\pipeline_pag_sd_xl.py
# 版权声明,标识本文件的版权信息
# Copyright 2024 The HuggingFace Team. All rights reserved.
#
# 许可声明,说明文件使用的许可证
# Licensed under the Apache License, Version 2.0 (the "License");
# 仅在遵守许可证的情况下才能使用该文件
# you may not use this file except in compliance with the License.
# 获取许可证的链接
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# 指定除非适用,否则以 "AS IS" 基础分发软件
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# 没有任何形式的担保或条件
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# 查看许可证的具体语言以及权限和限制
# See the License for the specific language governing permissions and
# limitations under the License.
# 导入 Python 内置的 inspect 模块
import inspect
# 从 typing 模块导入类型注解
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
# 导入 PyTorch 库
import torch
# 从 transformers 库导入 CLIP 相关的类
from transformers import (
CLIPImageProcessor, # 图像处理器
CLIPTextModel, # 文本模型
CLIPTextModelWithProjection, # 带投影的文本模型
CLIPTokenizer, # 标记器
CLIPVisionModelWithProjection, # 带投影的视觉模型
)
# 从相对路径导入自定义模块
from ...image_processor import PipelineImageInput, VaeImageProcessor # 图像输入和变分自编码器处理器
from ...loaders import (
FromSingleFileMixin, # 单文件加载混合类
IPAdapterMixin, # IP 适配器混合类
StableDiffusionXLLoraLoaderMixin, # Stable Diffusion XL LoRA 加载混合类
TextualInversionLoaderMixin, # 文本反转加载混合类
)
from ...models import AutoencoderKL, ImageProjection, UNet2DConditionModel # 各种模型
from ...models.attention_processor import (
AttnProcessor2_0, # 注意力处理器版本 2.0
FusedAttnProcessor2_0, # 融合注意力处理器版本 2.0
XFormersAttnProcessor, # XFormers 注意力处理器
)
from ...models.lora import adjust_lora_scale_text_encoder # 调整文本编码器的 LoRA 比例
from ...schedulers import KarrasDiffusionSchedulers # Karras 扩散调度器
from ...utils import (
USE_PEFT_BACKEND, # 是否使用 PEFT 后端
is_invisible_watermark_available, # 检查隐形水印功能是否可用
is_torch_xla_available, # 检查 Torch XLA 是否可用
logging, # 日志记录模块
replace_example_docstring, # 替换示例文档字符串的工具
scale_lora_layers, # 调整 LoRA 层
unscale_lora_layers, # 反调整 LoRA 层
)
from ...utils.torch_utils import randn_tensor # 随机生成张量的工具
from ..pipeline_utils import DiffusionPipeline, StableDiffusionMixin # 扩散管道和混合类
from ..stable_diffusion_xl.pipeline_output import StableDiffusionXLPipelineOutput # Stable Diffusion XL 输出
from .pag_utils import PAGMixin # PAG 混合类
# 如果隐形水印可用,则导入相关的水印模块
if is_invisible_watermark_available():
from ..stable_diffusion_xl.watermark import StableDiffusionXLWatermarker # Stable Diffusion XL 水印处理器
# 检查并导入 XLA 相关模块
if is_torch_xla_available():
import torch_xla.core.xla_model as xm # 导入 XLA 模型模块
XLA_AVAILABLE = True # 设置 XLA 可用标志为真
else:
XLA_AVAILABLE = False # 设置 XLA 可用标志为假
# 创建日志记录器,使用模块的名称
logger = logging.get_logger(__name__) # pylint: disable=invalid-name
# 示例文档字符串,提供用法示例
EXAMPLE_DOC_STRING = """
Examples:
```py
>>> import torch
>>> from diffusers import AutoPipelineForText2Image
>>> pipe = AutoPipelineForText2Image.from_pretrained(
... "stabilityai/stable-diffusion-xl-base-1.0",
... torch_dtype=torch.float16,
... enable_pag=True,
... )
>>> pipe = pipe.to("cuda")
>>> prompt = "a photo of an astronaut riding a horse on mars"
>>> image = pipe(prompt, pag_scale=0.3).images[0]
```py
"""
# 从 diffusers 库复制的函数,重标定噪声配置
# Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.rescale_noise_cfg
def rescale_noise_cfg(noise_cfg, noise_pred_text, guidance_rescale=0.0):
"""
根据 guidance_rescale 重新标定 `noise_cfg`。基于对 [Common Diffusion Noise Schedules 和
# 文档引用,说明样本步骤存在缺陷,详细信息请参见文档第3.4节
std_text = noise_pred_text.std(dim=list(range(1, noise_pred_text.ndim)), keepdim=True) # 计算噪声预测文本的标准差,维度上保留原始形状
std_cfg = noise_cfg.std(dim=list(range(1, noise_cfg.ndim)), keepdim=True) # 计算噪声配置的标准差,维度上保留原始形状
# 通过标准化结果来重新缩放指导结果,修正过度曝光的问题
noise_pred_rescaled = noise_cfg * (std_text / std_cfg) # 计算重新缩放后的噪声预测,按标准差进行调整
# 通过引导缩放因子与原始指导结果混合,避免生成“普通”外观的图像
noise_cfg = guidance_rescale * noise_pred_rescaled + (1 - guidance_rescale) * noise_cfg # 将重新缩放的结果与噪声配置混合
return noise_cfg # 返回最终调整后的噪声配置
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion 中复制的函数
def retrieve_timesteps(
# 调度器对象
scheduler,
# 用于生成样本的推理步骤数量,可选
num_inference_steps: Optional[int] = None,
# 要移动到的设备,可选
device: Optional[Union[str, torch.device]] = None,
# 自定义时间步,可选
timesteps: Optional[List[int]] = None,
# 自定义 sigma 值,可选
sigmas: Optional[List[float]] = None,
# 其他关键字参数
**kwargs,
):
"""
调用调度器的 `set_timesteps` 方法并在调用后从调度器获取时间步。处理自定义时间步。任何关键字参数将被传递给 `scheduler.set_timesteps`。
参数:
scheduler (`SchedulerMixin`):
获取时间步的调度器。
num_inference_steps (`int`):
用于生成样本的扩散步骤数量。如果使用,则 `timesteps` 必须为 `None`。
device (`str` 或 `torch.device`,*可选*):
要移动到的设备。如果为 `None`,则不移动时间步。
timesteps (`List[int]`,*可选*):
自定义时间步,用于覆盖调度器的时间步间距策略。如果传递 `timesteps`,则 `num_inference_steps` 和 `sigmas` 必须为 `None`。
sigmas (`List[float]`,*可选*):
自定义 sigma 值,用于覆盖调度器的时间步间距策略。如果传递 `sigmas`,则 `num_inference_steps` 和 `timesteps` 必须为 `None`。
返回:
`Tuple[torch.Tensor, int]`: 一个元组,其中第一个元素是调度器的时间步安排,第二个元素是推理步骤的数量。
"""
# 如果同时提供了时间步和 sigma,则抛出错误
if timesteps is not None and sigmas is not None:
raise ValueError("Only one of `timesteps` or `sigmas` can be passed. Please choose one to set custom values")
# 如果提供了时间步
if timesteps is not None:
# 检查调度器的 set_timesteps 方法是否接受时间步参数
accepts_timesteps = "timesteps" in set(inspect.signature(scheduler.set_timesteps).parameters.keys())
# 如果不支持,则抛出错误
if not accepts_timesteps:
raise ValueError(
f"The current scheduler class {scheduler.__class__}'s `set_timesteps` does not support custom"
f" timestep schedules. Please check whether you are using the correct scheduler."
)
# 设置调度器的时间步
scheduler.set_timesteps(timesteps=timesteps, device=device, **kwargs)
# 获取调度器的时间步
timesteps = scheduler.timesteps
# 计算推理步骤的数量
num_inference_steps = len(timesteps)
# 如果提供了 sigma
elif sigmas is not None:
# 检查调度器的 set_timesteps 方法是否接受 sigma 参数
accept_sigmas = "sigmas" in set(inspect.signature(scheduler.set_timesteps).parameters.keys())
# 如果不支持,则抛出错误
if not accept_sigmas:
raise ValueError(
f"The current scheduler class {scheduler.__class__}'s `set_timesteps` does not support custom"
f" sigmas schedules. Please check whether you are using the correct scheduler."
)
# 设置调度器的 sigma
scheduler.set_timesteps(sigmas=sigmas, device=device, **kwargs)
# 获取调度器的时间步
timesteps = scheduler.timesteps
# 计算推理步骤的数量
num_inference_steps = len(timesteps)
# 如果不是前面的条件,则执行以下代码块
else:
# 设置调度器的时间步长,使用指定的推理步骤数和设备,传入额外参数
scheduler.set_timesteps(num_inference_steps, device=device, **kwargs)
# 获取调度器的时间步长并存储到变量中
timesteps = scheduler.timesteps
# 返回时间步长和推理步骤数
return timesteps, num_inference_steps
# 定义一个名为 StableDiffusionXLPAGPipeline 的类,继承多个混合类
class StableDiffusionXLPAGPipeline(
# 继承自 DiffusionPipeline,提供基本扩散管道功能
DiffusionPipeline,
# 继承自 StableDiffusionMixin,提供稳定扩散特性
StableDiffusionMixin,
# 继承自 FromSingleFileMixin,支持从单文件加载模型
FromSingleFileMixin,
# 继承自 StableDiffusionXLLoraLoaderMixin,支持加载 LoRA 权重
StableDiffusionXLLoraLoaderMixin,
# 继承自 TextualInversionLoaderMixin,支持加载文本反转嵌入
TextualInversionLoaderMixin,
# 继承自 IPAdapterMixin,支持加载 IP 适配器
IPAdapterMixin,
# 继承自 PAGMixin,提供 PAG 相关功能
PAGMixin,
):
# 文档字符串,描述该管道的功能和继承关系
r"""
用于使用 Stable Diffusion XL 的文本到图像生成的管道。
该模型继承自 [`DiffusionPipeline`]。请查看超类文档以了解库为所有管道实现的通用方法
(如下载或保存,特定设备上的运行等)。
该管道还继承以下加载方法:
- [`~loaders.TextualInversionLoaderMixin.load_textual_inversion`] 用于加载文本反转嵌入
- [`~loaders.FromSingleFileMixin.from_single_file`] 用于加载 `.ckpt` 文件
- [`~loaders.StableDiffusionXLLoraLoaderMixin.load_lora_weights`] 用于加载 LoRA 权重
- [`~loaders.StableDiffusionXLLoraLoaderMixin.save_lora_weights`] 用于保存 LoRA 权重
- [`~loaders.IPAdapterMixin.load_ip_adapter`] 用于加载 IP 适配器
"""
# 定义函数参数的文档字符串,说明每个参数的用途和类型
Args:
vae ([`AutoencoderKL`]): # 变分自编码器(VAE)模型,用于将图像编码和解码为潜在表示
Variational Auto-Encoder (VAE) Model to encode and decode images to and from latent representations.
text_encoder ([`CLIPTextModel`]): # 冻结的文本编码器,Stable Diffusion XL 使用 CLIP 的文本部分
Frozen text-encoder. Stable Diffusion XL uses the text portion of
[CLIP](https://huggingface.co/docs/transformers/model_doc/clip#transformers.CLIPTextModel),具体是
[clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14) 变体。
text_encoder_2 ([` CLIPTextModelWithProjection`]): # 第二个冻结文本编码器,使用 CLIP 的文本和池部分
Second frozen text-encoder. Stable Diffusion XL uses the text and pool portion of
[CLIP](https://huggingface.co/docs/transformers/model_doc/clip#transformers.CLIPTextModelWithProjection),
具体是
[laion/CLIP-ViT-bigG-14-laion2B-39B-b160k](https://huggingface.co/laion/CLIP-ViT-bigG-14-laion2B-39B-b160k)
变体。
tokenizer (`CLIPTokenizer`): # CLIPTokenizer 类的分词器,用于将文本转换为标记
Tokenizer of class
[CLIPTokenizer](https://huggingface.co/docs/transformers/v4.21.0/en/model_doc/clip#transformers.CLIPTokenizer).
tokenizer_2 (`CLIPTokenizer`): # 第二个 CLIPTokenizer 类的分词器
Second Tokenizer of class
[CLIPTokenizer](https://huggingface.co/docs/transformers/v4.21.0/en/model_doc/clip#transformers.CLIPTokenizer).
unet ([`UNet2DConditionModel`]): # 条件 U-Net 结构,用于去噪编码的图像潜在表示
Conditional U-Net architecture to denoise the encoded image latents.
scheduler ([`SchedulerMixin`]): # 与 U-Net 结合使用的调度器,用于去噪编码的图像潜在表示
A scheduler to be used in combination with `unet` to denoise the encoded image latents. Can be one of
[`DDIMScheduler`], [`LMSDiscreteScheduler`],或 [`PNDMScheduler`]。
force_zeros_for_empty_prompt (`bool`, *optional*, defaults to `"True"`): # 是否强制将负提示嵌入设置为 0
Whether the negative prompt embeddings shall be forced to always be set to 0. Also see the config of
`stabilityai/stable-diffusion-xl-base-1-0`.
add_watermarker (`bool`, *optional*): # 是否使用不可见水印库对输出图像进行水印处理
Whether to use the [invisible_watermark library](https://github.com/ShieldMnt/invisible-watermark/) to
watermark output images. If not defined, it will default to True if the package is installed, otherwise no
watermarker will be used.
"""
# 定义模型在 CPU 上的卸载顺序,便于管理资源
model_cpu_offload_seq = "text_encoder->text_encoder_2->image_encoder->unet->vae"
# 定义可选组件的列表,便于灵活使用不同模块
_optional_components = [
"tokenizer", # 第一个分词器
"tokenizer_2", # 第二个分词器
"text_encoder", # 第一个文本编码器
"text_encoder_2", # 第二个文本编码器
"image_encoder", # 图像编码器
"feature_extractor", # 特征提取器
]
# 定义回调的张量输入列表,方便进行数据管理
_callback_tensor_inputs = [
"latents", # 潜在表示
"prompt_embeds", # 提示嵌入
"negative_prompt_embeds", # 负提示嵌入
"add_text_embeds", # 添加的文本嵌入
"add_time_ids", # 添加的时间 ID
"negative_pooled_prompt_embeds", # 负池化提示嵌入
"negative_add_time_ids", # 负添加时间 ID
]
# 初始化类,设置所需参数
def __init__(
# 变分自编码器
self,
vae: AutoencoderKL,
# 文本编码器
text_encoder: CLIPTextModel,
# 第二个文本编码器,带投影
text_encoder_2: CLIPTextModelWithProjection,
# 第一个分词器
tokenizer: CLIPTokenizer,
# 第二个分词器
tokenizer_2: CLIPTokenizer,
# UNet 2D 条件模型
unet: UNet2DConditionModel,
# 调度器
scheduler: KarrasDiffusionSchedulers,
# 可选的图像编码器
image_encoder: CLIPVisionModelWithProjection = None,
# 可选的特征提取器
feature_extractor: CLIPImageProcessor = None,
# 当提示为空时,强制使用零
force_zeros_for_empty_prompt: bool = True,
# 可选的水印添加标志
add_watermarker: Optional[bool] = None,
# 应用层的选择,默认为"mid"
pag_applied_layers: Union[str, List[str]] = "mid", # ["mid"],["down.block_1"],["up.block_0.attentions_0"]
):
# 调用父类初始化
super().__init__()
# 注册所需模块,便于后续调用
self.register_modules(
vae=vae,
text_encoder=text_encoder,
text_encoder_2=text_encoder_2,
tokenizer=tokenizer,
tokenizer_2=tokenizer_2,
unet=unet,
scheduler=scheduler,
image_encoder=image_encoder,
feature_extractor=feature_extractor,
)
# 将配置参数注册到对象中
self.register_to_config(force_zeros_for_empty_prompt=force_zeros_for_empty_prompt)
# 计算 VAE 的缩放因子
self.vae_scale_factor = 2 ** (len(self.vae.config.block_out_channels) - 1)
# 初始化图像处理器
self.image_processor = VaeImageProcessor(vae_scale_factor=self.vae_scale_factor)
# 设置默认的采样大小
self.default_sample_size = self.unet.config.sample_size
# 如果未指定水印标志,则检查水印可用性
add_watermarker = add_watermarker if add_watermarker is not None else is_invisible_watermark_available()
# 如果需要添加水印,则初始化水印对象
if add_watermarker:
self.watermark = StableDiffusionXLWatermarker()
else:
# 否则将水印设置为 None
self.watermark = None
# 设置应用层
self.set_pag_applied_layers(pag_applied_layers)
# 从 StableDiffusionXLPipeline 复制的编码提示函数
def encode_prompt(
# 输入提示
self,
prompt: str,
# 可选的第二个提示
prompt_2: Optional[str] = None,
# 可选的设备配置
device: Optional[torch.device] = None,
# 每个提示生成的图像数量
num_images_per_prompt: int = 1,
# 是否进行无分类器引导
do_classifier_free_guidance: bool = True,
# 可选的负提示
negative_prompt: Optional[str] = None,
# 可选的第二个负提示
negative_prompt_2: Optional[str] = None,
# 可选的提示嵌入
prompt_embeds: Optional[torch.Tensor] = None,
# 可选的负提示嵌入
negative_prompt_embeds: Optional[torch.Tensor] = None,
# 可选的池化提示嵌入
pooled_prompt_embeds: Optional[torch.Tensor] = None,
# 可选的负池化提示嵌入
negative_pooled_prompt_embeds: Optional[torch.Tensor] = None,
# 可选的 Lora 缩放因子
lora_scale: Optional[float] = None,
# 可选的剪辑跳过参数
clip_skip: Optional[int] = None,
# 从 StableDiffusionPipeline 复制的编码图像函数
# 定义编码图像的方法,输入图像、设备、每个提示的图像数量及可选的隐藏状态输出
def encode_image(self, image, device, num_images_per_prompt, output_hidden_states=None):
# 获取图像编码器参数的数据类型
dtype = next(self.image_encoder.parameters()).dtype
# 检查输入是否为张量,如果不是则通过特征提取器转换为张量
if not isinstance(image, torch.Tensor):
image = self.feature_extractor(image, return_tensors="pt").pixel_values
# 将图像移动到指定设备并转换为指定数据类型
image = image.to(device=device, dtype=dtype)
# 如果需要输出隐藏状态
if output_hidden_states:
# 获取图像编码器的隐藏状态并选择倒数第二层
image_enc_hidden_states = self.image_encoder(image, output_hidden_states=True).hidden_states[-2]
# 根据每个提示数量重复隐藏状态
image_enc_hidden_states = image_enc_hidden_states.repeat_interleave(num_images_per_prompt, dim=0)
# 创建一个与图像相同形状的零张量,获取其编码器的隐藏状态
uncond_image_enc_hidden_states = self.image_encoder(
torch.zeros_like(image), output_hidden_states=True
).hidden_states[-2]
# 根据每个提示数量重复未条件化的隐藏状态
uncond_image_enc_hidden_states = uncond_image_enc_hidden_states.repeat_interleave(
num_images_per_prompt, dim=0
)
# 返回编码图像和未条件化图像的隐藏状态
return image_enc_hidden_states, uncond_image_enc_hidden_states
else:
# 如果不需要隐藏状态,直接获取图像的嵌入表示
image_embeds = self.image_encoder(image).image_embeds
# 根据每个提示数量重复图像嵌入
image_embeds = image_embeds.repeat_interleave(num_images_per_prompt, dim=0)
# 创建与图像嵌入相同形状的零张量作为未条件化图像嵌入
uncond_image_embeds = torch.zeros_like(image_embeds)
# 返回图像嵌入和未条件化图像嵌入
return image_embeds, uncond_image_embeds
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_ip_adapter_image_embeds 复制的方法
def prepare_ip_adapter_image_embeds(
# 定义方法参数,包括适配器图像、图像嵌入、设备、每个提示的图像数量和分类器自由引导标志
self, ip_adapter_image, ip_adapter_image_embeds, device, num_images_per_prompt, do_classifier_free_guidance
):
# 初始化一个空列表,用于存储图像嵌入
image_embeds = []
# 如果启用分类器自由引导,则初始化一个空列表用于负图像嵌入
if do_classifier_free_guidance:
negative_image_embeds = []
# 如果输入适配器图像嵌入为空
if ip_adapter_image_embeds is None:
# 如果输入适配器图像不是列表,则将其转换为列表
if not isinstance(ip_adapter_image, list):
ip_adapter_image = [ip_adapter_image]
# 检查输入适配器图像的长度是否与 IP 适配器的数量相同
if len(ip_adapter_image) != len(self.unet.encoder_hid_proj.image_projection_layers):
raise ValueError(
# 抛出错误,说明输入图像数量与 IP 适配器数量不匹配
f"`ip_adapter_image` must have same length as the number of IP Adapters. Got {len(ip_adapter_image)} images and {len(self.unet.encoder_hid_proj.image_projection_layers)} IP Adapters."
)
# 遍历输入适配器图像和对应的图像投影层
for single_ip_adapter_image, image_proj_layer in zip(
ip_adapter_image, self.unet.encoder_hid_proj.image_projection_layers
):
# 检查图像投影层是否为 ImageProjection 类型
output_hidden_state = not isinstance(image_proj_layer, ImageProjection)
# 编码单个适配器图像,获取嵌入
single_image_embeds, single_negative_image_embeds = self.encode_image(
single_ip_adapter_image, device, 1, output_hidden_state
)
# 将单个图像嵌入添加到图像嵌入列表中
image_embeds.append(single_image_embeds[None, :])
# 如果启用分类器自由引导,添加负图像嵌入
if do_classifier_free_guidance:
negative_image_embeds.append(single_negative_image_embeds[None, :])
else:
# 遍历已存在的输入适配器图像嵌入
for single_image_embeds in ip_adapter_image_embeds:
# 如果启用分类器自由引导,则拆分图像嵌入
if do_classifier_free_guidance:
single_negative_image_embeds, single_image_embeds = single_image_embeds.chunk(2)
# 将负图像嵌入添加到列表中
negative_image_embeds.append(single_negative_image_embeds)
# 将图像嵌入添加到列表中
image_embeds.append(single_image_embeds)
# 初始化一个空列表,用于存储最终的适配器图像嵌入
ip_adapter_image_embeds = []
# 遍历图像嵌入的索引和内容
for i, single_image_embeds in enumerate(image_embeds):
# 将单个图像嵌入按每个提示的数量进行扩展
single_image_embeds = torch.cat([single_image_embeds] * num_images_per_prompt, dim=0)
# 如果启用分类器自由引导,扩展负图像嵌入
if do_classifier_free_guidance:
single_negative_image_embeds = torch.cat([negative_image_embeds[i]] * num_images_per_prompt, dim=0)
# 将负图像嵌入和正图像嵌入连接
single_image_embeds = torch.cat([single_negative_image_embeds, single_image_embeds], dim=0)
# 将图像嵌入移动到指定设备
single_image_embeds = single_image_embeds.to(device=device)
# 添加到最终的适配器图像嵌入列表中
ip_adapter_image_embeds.append(single_image_embeds)
# 返回最终的适配器图像嵌入列表
return ip_adapter_image_embeds
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_extra_step_kwargs 中复制
# 准备调度器步骤的额外参数,因不同调度器的参数签名可能不同
def prepare_extra_step_kwargs(self, generator, eta):
# eta (η) 仅在 DDIMScheduler 中使用,其他调度器会忽略该参数
# eta 对应 DDIM 论文中的 η: https://arxiv.org/abs/2010.02502
# eta 应在 [0, 1] 范围内
# 检查调度器的步骤方法是否接受 eta 参数
accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())
# 创建一个字典用于存储额外的步骤参数
extra_step_kwargs = {}
# 如果接受 eta,则将其添加到额外参数字典中
if accepts_eta:
extra_step_kwargs["eta"] = eta
# 检查调度器的步骤方法是否接受 generator 参数
accepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys())
# 如果接受 generator,则将其添加到额外参数字典中
if accepts_generator:
extra_step_kwargs["generator"] = generator
# 返回包含额外步骤参数的字典
return extra_step_kwargs
# 从 diffusers.pipelines.stable_diffusion_xl.pipeline_stable_diffusion_xl.StableDiffusionXLPipeline.check_inputs 复制而来
def check_inputs(
# 定义方法所需的输入参数
self,
prompt, # 主提示词
prompt_2, # 备用提示词
height, # 图像高度
width, # 图像宽度
callback_steps, # 回调步骤
negative_prompt=None, # 负提示词
negative_prompt_2=None, # 备用负提示词
prompt_embeds=None, # 提示词嵌入
negative_prompt_embeds=None, # 负提示词嵌入
pooled_prompt_embeds=None, # 池化的提示词嵌入
negative_pooled_prompt_embeds=None, # 负池化的提示词嵌入
ip_adapter_image=None, # 图像适配器输入
ip_adapter_image_embeds=None, # 图像适配器嵌入
callback_on_step_end_tensor_inputs=None, # 步骤结束时的张量输入
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_latents 复制而来
def prepare_latents(self, batch_size, num_channels_latents, height, width, dtype, device, generator, latents=None):
# 定义拉丁变换的形状
shape = (
batch_size, # 批次大小
num_channels_latents, # 拉丁变量通道数
int(height) // self.vae_scale_factor, # 高度按 VAE 缩放因子调整
int(width) // self.vae_scale_factor, # 宽度按 VAE 缩放因子调整
)
# 如果生成器是列表且其长度不等于批次大小,则抛出错误
if isinstance(generator, list) and len(generator) != batch_size:
raise ValueError(
f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
f" size of {batch_size}. Make sure the batch size matches the length of the generators."
)
# 如果拉丁变量为空,则生成随机张量
if latents is None:
latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
else:
# 如果已有拉丁变量,则将其移动到指定设备
latents = latents.to(device)
# 按调度器所需的标准差缩放初始噪声
latents = latents * self.scheduler.init_noise_sigma
# 返回处理后的拉丁变量
return latents
# 从 diffusers.pipelines.stable_diffusion_xl.pipeline_stable_diffusion_xl.StableDiffusionXLPipeline._get_add_time_ids 复制而来
def _get_add_time_ids(
self, original_size, crops_coords_top_left, target_size, dtype, text_encoder_projection_dim=None
# 生成添加时间 ID 列表,包括原始大小、裁剪坐标和目标大小
add_time_ids = list(original_size + crops_coords_top_left + target_size)
# 计算通过添加时间嵌入维度和文本编码器投影维度得到的总维度
passed_add_embed_dim = (
self.unet.config.addition_time_embed_dim * len(add_time_ids) + text_encoder_projection_dim
)
# 获取模型期望的添加时间嵌入维度
expected_add_embed_dim = self.unet.add_embedding.linear_1.in_features
# 检查期望维度与实际维度是否一致
if expected_add_embed_dim != passed_add_embed_dim:
# 抛出错误,提示模型配置不正确
raise ValueError(
f"Model expects an added time embedding vector of length {expected_add_embed_dim}, but a vector of {passed_add_embed_dim} was created. The model has an incorrect config. Please check `unet.config.time_embedding_type` and `text_encoder_2.config.projection_dim`."
)
# 将添加时间 ID 转换为张量,指定数据类型
add_time_ids = torch.tensor([add_time_ids], dtype=dtype)
# 返回添加时间 ID 张量
return add_time_ids
# 从 StableDiffusionXLPipeline 中复制的方法,用于上采样 VAE
def upcast_vae(self):
# 获取 VAE 的数据类型
dtype = self.vae.dtype
# 将 VAE 转换为 float32 类型
self.vae.to(dtype=torch.float32)
# 检查是否使用了 Torch 2.0 或 XFormers 的注意力处理器
use_torch_2_0_or_xformers = isinstance(
self.vae.decoder.mid_block.attentions[0].processor,
(
AttnProcessor2_0,
XFormersAttnProcessor,
FusedAttnProcessor2_0,
),
)
# 如果使用了 XFormers 或 Torch 2.0,注意力块不需要为 float32,节省内存
if use_torch_2_0_or_xformers:
# 将后量化卷积层转换为指定数据类型
self.vae.post_quant_conv.to(dtype)
# 将输入卷积层转换为指定数据类型
self.vae.decoder.conv_in.to(dtype)
# 将中间块转换为指定数据类型
self.vae.decoder.mid_block.to(dtype)
# 从 LatentConsistencyModelPipeline 中复制的方法,用于获取引导缩放嵌入
def get_guidance_scale_embedding(
# 输入参数:权重张量、嵌入维度(默认为512)、数据类型(默认为 float32)
self, w: torch.Tensor, embedding_dim: int = 512, dtype: torch.dtype = torch.float32
) -> torch.Tensor: # 声明返回类型为 torch.Tensor
""" # 开始文档字符串
See https://github.com/google-research/vdm/blob/dc27b98a554f65cdc654b800da5aa1846545d41b/model_vdm.py#L298 # 文档字符串中提供的链接
Args: # 参数说明部分
w (`torch.Tensor`): # 输入参数 w,类型为 torch.Tensor
Generate embedding vectors with a specified guidance scale to subsequently enrich timestep embeddings. # 描述输入参数的作用
embedding_dim (`int`, *optional*, defaults to 512): # 可选参数 embedding_dim,默认值为 512
Dimension of the embeddings to generate. # 描述 embedding_dim 的作用
dtype (`torch.dtype`, *optional*, defaults to `torch.float32`): # 可选参数 dtype,默认值为 torch.float32
Data type of the generated embeddings. # 描述 dtype 的作用
Returns: # 返回值说明部分
`torch.Tensor`: Embedding vectors with shape `(len(w), embedding_dim)`. # 返回一个形状为 (len(w), embedding_dim) 的 tensor
""" # 结束文档字符串
assert len(w.shape) == 1 # 确保 w 的形状是一维的
w = w * 1000.0 # 将 w 的值放大 1000 倍
half_dim = embedding_dim // 2 # 计算 embedding_dim 的一半
emb = torch.log(torch.tensor(10000.0)) / (half_dim - 1) # 计算对数缩放因子
emb = torch.exp(torch.arange(half_dim, dtype=dtype) * -emb) # 生成基于指数衰减的嵌入向量
emb = w.to(dtype)[:, None] * emb[None, :] # 将 w 转换为指定数据类型,并与 emb 进行广播相乘
emb = torch.cat([torch.sin(emb), torch.cos(emb)], dim=1) # 将正弦和余弦值沿维度 1 连接
if embedding_dim % 2 == 1: # 如果 embedding_dim 是奇数
emb = torch.nn.functional.pad(emb, (0, 1)) # 在最后一维填充零以保持形状一致
assert emb.shape == (w.shape[0], embedding_dim) # 确保最终嵌入的形状正确
return emb # 返回生成的嵌入
@property # 将方法定义为属性
def guidance_scale(self): # 定义 guidance_scale 属性
return self._guidance_scale # 返回存储的指导比例
@property # 将方法定义为属性
def guidance_rescale(self): # 定义 guidance_rescale 属性
return self._guidance_rescale # 返回存储的重新调整比例
@property # 将方法定义为属性
def clip_skip(self): # 定义 clip_skip 属性
return self._clip_skip # 返回存储的剪辑跳过值
# here `guidance_scale` is defined analog to the guidance weight `w` of equation (2) # 说明 guidance_scale 类似于方程 (2) 中的指导权重 w
# of the Imagen paper: https://arxiv.org/pdf/2205.11487.pdf . `guidance_scale = 1` # 说明 guidance_scale = 1 时无分类器自由引导
# corresponds to doing no classifier free guidance. # 进一步解释
@property # 将方法定义为属性
def do_classifier_free_guidance(self): # 定义 do_classifier_free_guidance 属性
return self._guidance_scale > 1 and self.unet.config.time_cond_proj_dim is None # 返回是否进行分类器自由引导的布尔值
@property # 将方法定义为属性
def cross_attention_kwargs(self): # 定义 cross_attention_kwargs 属性
return self._cross_attention_kwargs # 返回交叉注意力的关键字参数
@property # 将方法定义为属性
def denoising_end(self): # 定义 denoising_end 属性
return self._denoising_end # 返回去噪结束的标记
@property # 将方法定义为属性
def num_timesteps(self): # 定义 num_timesteps 属性
return self._num_timesteps # 返回时间步数
@property # 将方法定义为属性
def interrupt(self): # 定义 interrupt 属性
return self._interrupt # 返回中断标志
@torch.no_grad() # 指定后续操作不计算梯度
@replace_example_docstring(EXAMPLE_DOC_STRING) # 使用装饰器替换示例文档字符串
# 定义可调用对象的方法,接受多个参数以生成图像或处理输入
def __call__(
# 主提示文本,可以是单个字符串或字符串列表
self,
prompt: Union[str, List[str]] = None,
# 第二个提示文本,默认为 None
prompt_2: Optional[Union[str, List[str]]] = None,
# 图像的高度,默认为 None
height: Optional[int] = None,
# 图像的宽度,默认为 None
width: Optional[int] = None,
# 推理步骤的数量,默认为 50
num_inference_steps: int = 50,
# 指定的时间步列表,默认为 None
timesteps: List[int] = None,
# 噪声级别的列表,默认为 None
sigmas: List[float] = None,
# 去噪结束值,默认为 None
denoising_end: Optional[float] = None,
# 引导比例,默认为 5.0
guidance_scale: float = 5.0,
# 负面提示文本,默认为 None
negative_prompt: Optional[Union[str, List[str]]] = None,
# 第二个负面提示文本,默认为 None
negative_prompt_2: Optional[Union[str, List[str]]] = None,
# 每个提示生成的图像数量,默认为 1
num_images_per_prompt: Optional[int] = 1,
# 额外的超参数,默认为 0.0
eta: float = 0.0,
# 随机数生成器,默认为 None
generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
# 潜在表示,默认为 None
latents: Optional[torch.Tensor] = None,
# 提示嵌入,默认为 None
prompt_embeds: Optional[torch.Tensor] = None,
# 负面提示嵌入,默认为 None
negative_prompt_embeds: Optional[torch.Tensor] = None,
# 池化的提示嵌入,默认为 None
pooled_prompt_embeds: Optional[torch.Tensor] = None,
# 负面池化提示嵌入,默认为 None
negative_pooled_prompt_embeds: Optional[torch.Tensor] = None,
# 图像适配器的输入,默认为 None
ip_adapter_image: Optional[PipelineImageInput] = None,
# 图像适配器的嵌入列表,默认为 None
ip_adapter_image_embeds: Optional[List[torch.Tensor]] = None,
# 输出类型,默认为 "pil"
output_type: Optional[str] = "pil",
# 是否返回字典,默认为 True
return_dict: bool = True,
# 交叉注意力参数,默认为 None
cross_attention_kwargs: Optional[Dict[str, Any]] = None,
# 引导重标定,默认为 0.0
guidance_rescale: float = 0.0,
# 原始图像的大小,默认为 None
original_size: Optional[Tuple[int, int]] = None,
# 图像左上角的裁剪坐标,默认为 (0, 0)
crops_coords_top_left: Tuple[int, int] = (0, 0),
# 目标大小,默认为 None
target_size: Optional[Tuple[int, int]] = None,
# 负面图像的原始大小,默认为 None
negative_original_size: Optional[Tuple[int, int]] = None,
# 负面图像左上角的裁剪坐标,默认为 (0, 0)
negative_crops_coords_top_left: Tuple[int, int] = (0, 0),
# 负面图像的目标大小,默认为 None
negative_target_size: Optional[Tuple[int, int]] = None,
# 跳过的剪辑数量,默认为 None
clip_skip: Optional[int] = None,
# 步骤结束时的回调函数,默认为 None
callback_on_step_end: Optional[Callable[[int, int, Dict], None]] = None,
# 步骤结束时的张量输入回调,默认为 ["latents"]
callback_on_step_end_tensor_inputs: List[str] = ["latents"],
# PAG 缩放因子,默认为 3.0
pag_scale: float = 3.0,
# 自适应 PAG 缩放因子,默认为 0.0
pag_adaptive_scale: float = 0.0,
.\diffusers\pipelines\pag\pipeline_pag_sd_xl_img2img.py
# 版权声明,标识文件归 HuggingFace 团队所有
#
# 根据 Apache 许可证第 2.0 版(“许可证”)授权;
# 除非遵守许可证,否则不得使用此文件。
# 可以在以下网址获得许可证的副本:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律要求或书面同意,否则根据许可证分发的软件
# 是在“按现状”基础上分发的,不提供任何明示或暗示的担保或条件。
# 请参见许可证以了解管理权限和
# 限制的具体条款。
# 导入 inspect 模块以检查对象的类型
import inspect
# 从 typing 模块导入所需的类型注解
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
# 导入 PIL.Image 用于图像处理
import PIL.Image
# 导入 PyTorch 库
import torch
# 从 transformers 库中导入必要的 CLIP 模型和处理器
from transformers import (
CLIPImageProcessor, # 图像处理器
CLIPTextModel, # 文本模型
CLIPTextModelWithProjection, # 带投影的文本模型
CLIPTokenizer, # 分词器
CLIPVisionModelWithProjection, # 带投影的视觉模型
)
# 从相对路径导入各类回调和图像处理器
from ...callbacks import MultiPipelineCallbacks, PipelineCallback
from ...image_processor import PipelineImageInput, VaeImageProcessor
from ...loaders import (
FromSingleFileMixin, # 单文件加载混合器
IPAdapterMixin, # IP 适配器混合器
StableDiffusionXLLoraLoaderMixin, # 稳定扩散 XL Lora 加载器混合器
TextualInversionLoaderMixin, # 文本反转加载器混合器
)
# 从模型模块导入必要的模型
from ...models import AutoencoderKL, ImageProjection, UNet2DConditionModel
from ...models.attention_processor import (
AttnProcessor2_0, # 注意力处理器 2.0
XFormersAttnProcessor, # XFormers 注意力处理器
)
from ...models.lora import adjust_lora_scale_text_encoder # 调整 Lora 的文本编码器比例
from ...schedulers import KarrasDiffusionSchedulers # Karras 扩散调度器
from ...utils import (
USE_PEFT_BACKEND, # 是否使用 PEFT 后端
is_invisible_watermark_available, # 检查不可见水印是否可用
is_torch_xla_available, # 检查 Torch XLA 是否可用
logging, # 日志记录模块
replace_example_docstring, # 替换示例文档字符串的工具
scale_lora_layers, # 缩放 Lora 层
unscale_lora_layers, # 反缩放 Lora 层
)
from ...utils.torch_utils import randn_tensor # 导入生成随机张量的工具
from ..pipeline_utils import DiffusionPipeline, StableDiffusionMixin # 导入扩散管道和稳定扩散混合器
from ..stable_diffusion_xl.pipeline_output import StableDiffusionXLPipelineOutput # 导入稳定扩散 XL 管道输出
from .pag_utils import PAGMixin # 导入 PAG 混合器
# 检查不可见水印是否可用,如果可用则导入水印模块
if is_invisible_watermark_available():
from ..stable_diffusion_xl.watermark import StableDiffusionXLWatermarker
# 检查 Torch XLA 是否可用,如果可用则导入 XLA 模块
if is_torch_xla_available():
import torch_xla.core.xla_model as xm # 导入 XLA 核心模型
XLA_AVAILABLE = True # 设置 XLA 可用标志
else:
XLA_AVAILABLE = False # 设置 XLA 不可用标志
# 使用日志记录模块获取当前模块的记录器
logger = logging.get_logger(__name__) # pylint: disable=invalid-name
# 定义示例文档字符串
EXAMPLE_DOC_STRING = """
Examples:
```py
>>> import torch
>>> from diffusers import AutoPipelineForImage2Image
>>> from diffusers.utils import load_image
>>> pipe = AutoPipelineForImage2Image.from_pretrained(
... "stabilityai/stable-diffusion-xl-refiner-1.0",
... torch_dtype=torch.float16,
... enable_pag=True,
... )
>>> pipe = pipe.to("cuda") # 将管道移动到 GPU
>>> url = "https://huggingface.co/datasets/patrickvonplaten/images/resolve/main/aa_xl/000000009.png"
>>> init_image = load_image(url).convert("RGB") # 加载并转换初始图像为 RGB
>>> prompt = "a photo of an astronaut riding a horse on mars" # 定义提示语
>>> image = pipe(prompt, image=init_image, pag_scale=0.3).images[0] # 生成图像
```py
"""
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.rescale_noise_cfg 复制
def rescale_noise_cfg(noise_cfg, noise_pred_text, guidance_rescale=0.0):
"""
根据 `guidance_rescale` 对 `noise_cfg` 进行重新缩放。基于 [Common Diffusion Noise Schedules and
Sample Steps are Flawed](https://arxiv.org/pdf/2305.08891.pdf) 的发现。参见第 3.4 节
"""
# 计算噪声预测文本的标准差,维度保持不变
std_text = noise_pred_text.std(dim=list(range(1, noise_pred_text.ndim)), keepdim=True)
# 计算噪声配置的标准差,维度保持不变
std_cfg = noise_cfg.std(dim=list(range(1, noise_cfg.ndim)), keepdim=True)
# 根据标准差缩放噪声预测,修复过度曝光问题
noise_pred_rescaled = noise_cfg * (std_text / std_cfg)
# 根据指导的缩放因子,将缩放后的噪声与原始噪声混合,避免生成“平淡”的图像
noise_cfg = guidance_rescale * noise_pred_rescaled + (1 - guidance_rescale) * noise_cfg
# 返回重新缩放后的噪声配置
return noise_cfg
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion_img2img.retrieve_latents 复制
def retrieve_latents(
encoder_output: torch.Tensor, generator: Optional[torch.Generator] = None, sample_mode: str = "sample"
):
# 如果 encoder_output 包含 latent_dist 且采样模式为 "sample"
if hasattr(encoder_output, "latent_dist") and sample_mode == "sample":
# 从 latent_dist 中采样
return encoder_output.latent_dist.sample(generator)
# 如果 encoder_output 包含 latent_dist 且采样模式为 "argmax"
elif hasattr(encoder_output, "latent_dist") and sample_mode == "argmax":
# 返回 latent_dist 的众数
return encoder_output.latent_dist.mode()
# 如果 encoder_output 包含 latents
elif hasattr(encoder_output, "latents"):
# 返回 encoder_output 的 latents
return encoder_output.latents
# 如果都不满足,抛出属性错误
else:
raise AttributeError("Could not access latents of provided encoder_output")
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.retrieve_timesteps 复制
def retrieve_timesteps(
scheduler,
num_inference_steps: Optional[int] = None,
device: Optional[Union[str, torch.device]] = None,
timesteps: Optional[List[int]] = None,
sigmas: Optional[List[float]] = None,
**kwargs,
):
"""
调用调度器的 `set_timesteps` 方法并在调用后从调度器中检索时间步。处理自定义时间步。任何关键字参数将传递给 `scheduler.set_timesteps`。
# 函数参数说明
Args:
scheduler (`SchedulerMixin`):
# 用于获取时间步的调度器
The scheduler to get timesteps from.
num_inference_steps (`int`):
# 生成样本时使用的扩散步数。如果使用此参数,则 `timesteps` 必须为 `None`
The number of diffusion steps used when generating samples with a pre-trained model. If used, `timesteps`
must be `None`.
device (`str` or `torch.device`, *optional*):
# 用于移动时间步的设备。如果为 `None`,则时间步不会被移动
The device to which the timesteps should be moved to. If `None`, the timesteps are not moved.
timesteps (`List[int]`, *optional*):
# 自定义时间步,覆盖调度器的时间步间隔策略。如果传入 `timesteps`,则 `num_inference_steps` 和 `sigmas` 必须为 `None`
Custom timesteps used to override the timestep spacing strategy of the scheduler. If `timesteps` is passed,
`num_inference_steps` and `sigmas` must be `None`.
sigmas (`List[float]`, *optional*):
# 自定义 sigma,覆盖调度器的时间步间隔策略。如果传入 `sigmas`,则 `num_inference_steps` 和 `timesteps` 必须为 `None`
Custom sigmas used to override the timestep spacing strategy of the scheduler. If `sigmas` is passed,
`num_inference_steps` and `timesteps` must be `None`.
# 返回值说明
Returns:
`Tuple[torch.Tensor, int]`:
# 返回一个元组,第一个元素是来自调度器的时间步调度,第二个元素是推理步数
A tuple where the first element is the timestep schedule from the scheduler and the
second element is the number of inference steps.
"""
# 检查是否同时传入了 `timesteps` 和 `sigmas`
if timesteps is not None and sigmas is not None:
# 如果同时传入,抛出错误提示
raise ValueError("Only one of `timesteps` or `sigmas` can be passed. Please choose one to set custom values")
# 检查是否传入了 `timesteps`
if timesteps is not None:
# 检查当前调度器类是否支持自定义时间步
accepts_timesteps = "timesteps" in set(inspect.signature(scheduler.set_timesteps).parameters.keys())
if not accepts_timesteps:
# 如果不支持,抛出错误提示
raise ValueError(
f"The current scheduler class {scheduler.__class__}'s `set_timesteps` does not support custom"
f" timestep schedules. Please check whether you are using the correct scheduler."
)
# 设置自定义时间步
scheduler.set_timesteps(timesteps=timesteps, device=device, **kwargs)
# 从调度器中获取设置后的时间步
timesteps = scheduler.timesteps
# 计算推理步数
num_inference_steps = len(timesteps)
# 检查是否传入了 `sigmas`
elif sigmas is not None:
# 检查当前调度器类是否支持自定义 sigma
accept_sigmas = "sigmas" in set(inspect.signature(scheduler.set_timesteps).parameters.keys())
if not accept_sigmas:
# 如果不支持,抛出错误提示
raise ValueError(
f"The current scheduler class {scheduler.__class__}'s `set_timesteps` does not support custom"
f" sigmas schedules. Please check whether you are using the correct scheduler."
)
# 设置自定义 sigma
scheduler.set_timesteps(sigmas=sigmas, device=device, **kwargs)
# 从调度器中获取设置后的时间步
timesteps = scheduler.timesteps
# 计算推理步数
num_inference_steps = len(timesteps)
# 如果既没有传入 `timesteps` 也没有 `sigmas`
else:
# 根据推理步数设置时间步
scheduler.set_timesteps(num_inference_steps, device=device, **kwargs)
# 从调度器中获取设置后的时间步
timesteps = scheduler.timesteps
# 返回时间步和推理步数
return timesteps, num_inference_steps
# 定义一个名为 StableDiffusionXLPAGImg2ImgPipeline 的类,继承多个混入类
class StableDiffusionXLPAGImg2ImgPipeline(
# 继承自 DiffusionPipeline 类,提供基本的扩散管道功能
DiffusionPipeline,
# 继承自 StableDiffusionMixin 类,提供与 Stable Diffusion 相关的功能
StableDiffusionMixin,
# 继承自 TextualInversionLoaderMixin 类,提供文本反演加载功能
TextualInversionLoaderMixin,
# 继承自 FromSingleFileMixin 类,提供从单个文件加载的功能
FromSingleFileMixin,
# 继承自 StableDiffusionXLLoraLoaderMixin 类,提供加载 LoRA 权重的功能
StableDiffusionXLLoraLoaderMixin,
# 继承自 IPAdapterMixin 类,提供 IP 适配器加载功能
IPAdapterMixin,
# 继承自 PAGMixin 类,提供 PAG 相关的功能
PAGMixin,
):
# 文档字符串,描述该管道的用途和功能
r"""
Pipeline for text-to-image generation using Stable Diffusion XL.
This model inherits from [`DiffusionPipeline`]. Check the superclass documentation for the generic methods the
library implements for all the pipelines (such as downloading or saving, running on a particular device, etc.)
The pipeline also inherits the following loading methods:
- [`~loaders.TextualInversionLoaderMixin.load_textual_inversion`] for loading textual inversion embeddings
- [`~loaders.FromSingleFileMixin.from_single_file`] for loading `.ckpt` files
- [`~loaders.StableDiffusionXLLoraLoaderMixin.load_lora_weights`] for loading LoRA weights
- [`~loaders.StableDiffusionXLLoraLoaderMixin.save_lora_weights`] for saving LoRA weights
- [`~loaders.IPAdapterMixin.load_ip_adapter`] for loading IP Adapters
# 文档字符串,描述各参数的功能和用途
Args:
vae ([`AutoencoderKL`]):
变分自编码器(VAE)模型,用于对图像进行编码和解码,生成潜在表示。
text_encoder ([`CLIPTextModel`]):
冻结的文本编码器。Stable Diffusion XL 使用 CLIP 的文本部分,
特别是 [clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14) 变体。
text_encoder_2 ([` CLIPTextModelWithProjection`]):
第二个冻结的文本编码器。Stable Diffusion XL 使用 CLIP 的文本和池部分,
特别是 [laion/CLIP-ViT-bigG-14-laion2B-39B-b160k](https://huggingface.co/laion/CLIP-ViT-bigG-14-laion2B-39B-b160k) 变体。
tokenizer (`CLIPTokenizer`):
`CLIPTokenizer` 类的分词器。
tokenizer_2 (`CLIPTokenizer`):
第二个 `CLIPTokenizer` 类的分词器。
unet ([`UNet2DConditionModel`]): 条件 U-Net 架构,用于去噪编码后的图像潜在表示。
scheduler ([`SchedulerMixin`]):
与 `unet` 一起使用的调度器,用于去噪编码后的图像潜在表示。可以是
[`DDIMScheduler`], [`LMSDiscreteScheduler`] 或 [`PNDMScheduler`] 之一。
requires_aesthetics_score (`bool`, *optional*, defaults to `"False"`):
`unet` 是否需要在推理过程中传递 `aesthetic_score` 条件。另见
`stabilityai/stable-diffusion-xl-refiner-1-0` 的配置。
force_zeros_for_empty_prompt (`bool`, *optional*, defaults to `"True"`):
是否强制将负提示嵌入始终设置为 0。另见
`stabilityai/stable-diffusion-xl-base-1-0` 的配置。
add_watermarker (`bool`, *optional*):
是否使用 [invisible_watermark 库](https://github.com/ShieldMnt/invisible-watermark/) 对输出图像进行水印。如果未定义,默认将设置为 True(如果安装了该包),否则不使用水印。
"""
# 定义模型在 CPU 上的卸载顺序
model_cpu_offload_seq = "text_encoder->text_encoder_2->image_encoder->unet->vae"
# 定义可选组件列表
_optional_components = [
"tokenizer", # 第一个分词器
"tokenizer_2", # 第二个分词器
"text_encoder", # 第一个文本编码器
"text_encoder_2", # 第二个文本编码器
"image_encoder", # 图像编码器
"feature_extractor", # 特征提取器
]
# 定义回调张量输入的名称列表
_callback_tensor_inputs = [
"latents", # 潜在表示
"prompt_embeds", # 提示嵌入
"negative_prompt_embeds", # 负向提示嵌入
"add_text_embeds", # 添加的文本嵌入
"add_time_ids", # 添加的时间标识
"negative_pooled_prompt_embeds", # 负向池化提示嵌入
"add_neg_time_ids", # 添加的负向时间标识
]
# 初始化方法,定义模型的各个组件
def __init__(
self,
vae: AutoencoderKL, # 变分自编码器
text_encoder: CLIPTextModel, # 文本编码器
text_encoder_2: CLIPTextModelWithProjection, # 第二个文本编码器,带投影
tokenizer: CLIPTokenizer, # 文本分词器
tokenizer_2: CLIPTokenizer, # 第二个文本分词器
unet: UNet2DConditionModel, # UNet2D条件模型
scheduler: KarrasDiffusionSchedulers, # Karras扩散调度器
image_encoder: CLIPVisionModelWithProjection = None, # 图像编码器,带投影,默认为None
feature_extractor: CLIPImageProcessor = None, # 特征提取器,默认为None
requires_aesthetics_score: bool = False, # 是否需要美学评分
force_zeros_for_empty_prompt: bool = True, # 是否强制将空提示的值设为零
add_watermarker: Optional[bool] = None, # 添加水印的选项
pag_applied_layers: Union[str, List[str]] = "mid", # 应用层的选项,默认为"mid"
):
# 调用父类的初始化方法
super().__init__()
# 注册模型的各个模块
self.register_modules(
vae=vae,
text_encoder=text_encoder,
text_encoder_2=text_encoder_2,
tokenizer=tokenizer,
tokenizer_2=tokenizer_2,
unet=unet,
image_encoder=image_encoder,
feature_extractor=feature_extractor,
scheduler=scheduler,
)
# 将配置信息注册到类中
self.register_to_config(force_zeros_for_empty_prompt=force_zeros_for_empty_prompt)
self.register_to_config(requires_aesthetics_score=requires_aesthetics_score)
# 计算变分自编码器的缩放因子
self.vae_scale_factor = 2 ** (len(self.vae.config.block_out_channels) - 1)
# 创建图像处理器实例
self.image_processor = VaeImageProcessor(vae_scale_factor=self.vae_scale_factor)
# 如果没有指定水印选项,检查水印是否可用
add_watermarker = add_watermarker if add_watermarker is not None else is_invisible_watermark_available()
# 根据水印选项创建水印对象
if add_watermarker:
self.watermark = StableDiffusionXLWatermarker() # 创建水印实例
else:
self.watermark = None # 不使用水印
# 设置应用的层
self.set_pag_applied_layers(pag_applied_layers)
# 从StableDiffusionXLPipeline复制的编码提示方法
def encode_prompt(
self,
prompt: str, # 主要提示字符串
prompt_2: Optional[str] = None, # 第二个提示字符串,默认为None
device: Optional[torch.device] = None, # 指定的设备,默认为None
num_images_per_prompt: int = 1, # 每个提示生成的图像数量
do_classifier_free_guidance: bool = True, # 是否使用无分类器自由引导
negative_prompt: Optional[str] = None, # 负向提示字符串,默认为None
negative_prompt_2: Optional[str] = None, # 第二个负向提示字符串,默认为None
prompt_embeds: Optional[torch.Tensor] = None, # 提示嵌入的张量,默认为None
negative_prompt_embeds: Optional[torch.Tensor] = None, # 负向提示嵌入的张量,默认为None
pooled_prompt_embeds: Optional[torch.Tensor] = None, # 池化提示嵌入的张量,默认为None
negative_pooled_prompt_embeds: Optional[torch.Tensor] = None, # 负向池化提示嵌入的张量,默认为None
lora_scale: Optional[float] = None, # LoRA缩放因子,默认为None
clip_skip: Optional[int] = None, # 跳过的CLIP层数,默认为None
# 从StableDiffusionPipeline复制的准备额外步骤关键字的方法
# 为调度器步骤准备额外的关键字参数,不同调度器的签名可能不同
def prepare_extra_step_kwargs(self, generator, eta):
# 检查调度器步骤是否接受 eta 参数
accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())
# 初始化一个空字典用于存储额外的步骤关键字参数
extra_step_kwargs = {}
# 如果调度器接受 eta,则将其添加到额外参数字典中
if accepts_eta:
extra_step_kwargs["eta"] = eta
# 检查调度器步骤是否接受 generator 参数
accepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys())
# 如果调度器接受 generator,则将其添加到额外参数字典中
if accepts_generator:
extra_step_kwargs["generator"] = generator
# 返回准备好的额外参数字典
return extra_step_kwargs
# 从 diffusers.pipelines.stable_diffusion_xl.pipeline_stable_diffusion_xl_img2img.StableDiffusionXLImg2ImgPipeline.check_inputs 复制而来
def check_inputs(
self,
prompt,
prompt_2,
strength,
num_inference_steps,
callback_steps,
# 可选的负向提示参数
negative_prompt=None,
negative_prompt_2=None,
# 提示嵌入参数
prompt_embeds=None,
negative_prompt_embeds=None,
# 输入适配器图像及其嵌入参数
ip_adapter_image=None,
ip_adapter_image_embeds=None,
# 处理步骤结束时的回调输入
callback_on_step_end_tensor_inputs=None,
# 从 diffusers.pipelines.stable_diffusion_xl.pipeline_stable_diffusion_xl_img2img.StableDiffusionXLImg2ImgPipeline.get_timesteps 复制而来
# 获取时间步,考虑推理步骤、强度、设备及去噪起始时间
def get_timesteps(self, num_inference_steps, strength, device, denoising_start=None):
# 根据去噪起始时间确定初始时间步
if denoising_start is None:
# 计算初始时间步,确保不超过推理步骤总数
init_timestep = min(int(num_inference_steps * strength), num_inference_steps)
# 计算开始时间步,确保不小于零
t_start = max(num_inference_steps - init_timestep, 0)
else:
# 如果有去噪起始时间,开始时间步设为零
t_start = 0
# 从调度器获取时间步,从计算的起始时间步开始
timesteps = self.scheduler.timesteps[t_start * self.scheduler.order :]
# 如果请求特定时间步,强度不再相关,由去噪起始时间决定
if denoising_start is not None:
# 计算离散时间步截止点
discrete_timestep_cutoff = int(
round(
self.scheduler.config.num_train_timesteps
- (denoising_start * self.scheduler.config.num_train_timesteps)
)
)
# 计算在截止点之前的推理步骤数量
num_inference_steps = (timesteps < discrete_timestep_cutoff).sum().item()
if self.scheduler.order == 2 and num_inference_steps % 2 == 0:
# 如果调度器为二阶调度器,调整推理步骤数量以避免错误
num_inference_steps = num_inference_steps + 1
# 从时间步数组末尾切片,确保符合推理步骤数量
timesteps = timesteps[-num_inference_steps:]
return timesteps, num_inference_steps
# 返回时间步和推理步骤数,考虑开始时间步
return timesteps, num_inference_steps - t_start
# 从图像生成管道准备潜变量的函数,参数包括图像、时间步、批量大小等
def prepare_latents(
self, image, timestep, batch_size, num_images_per_prompt, dtype, device, generator=None, add_noise=True
# 从图像编码管道复制的函数
# 编码图像并返回图像嵌入或隐藏状态
def encode_image(self, image, device, num_images_per_prompt, output_hidden_states=None):
# 获取图像编码器参数的数据类型
dtype = next(self.image_encoder.parameters()).dtype
# 如果输入的图像不是张量,使用特征提取器处理图像
if not isinstance(image, torch.Tensor):
image = self.feature_extractor(image, return_tensors="pt").pixel_values
# 将图像转移到指定设备并转换为目标数据类型
image = image.to(device=device, dtype=dtype)
# 如果需要输出隐藏状态,进行编码并处理隐藏状态
if output_hidden_states:
# 获取图像的隐藏状态,并选取倒数第二层的输出
image_enc_hidden_states = self.image_encoder(image, output_hidden_states=True).hidden_states[-2]
# 重复隐藏状态以适应每个提示的图像数量
image_enc_hidden_states = image_enc_hidden_states.repeat_interleave(num_images_per_prompt, dim=0)
# 对于无条件图像,使用零张量进行编码并获取隐藏状态
uncond_image_enc_hidden_states = self.image_encoder(
torch.zeros_like(image), output_hidden_states=True
).hidden_states[-2]
# 重复无条件隐藏状态以适应每个提示的图像数量
uncond_image_enc_hidden_states = uncond_image_enc_hidden_states.repeat_interleave(
num_images_per_prompt, dim=0
)
# 返回图像和无条件图像的隐藏状态
return image_enc_hidden_states, uncond_image_enc_hidden_states
else:
# 编码图像并获取图像嵌入
image_embeds = self.image_encoder(image).image_embeds
# 重复图像嵌入以适应每个提示的图像数量
image_embeds = image_embeds.repeat_interleave(num_images_per_prompt, dim=0)
# 创建与图像嵌入相同形状的零张量作为无条件图像嵌入
uncond_image_embeds = torch.zeros_like(image_embeds)
# 返回图像嵌入和无条件图像嵌入
return image_embeds, uncond_image_embeds
# 从 StableDiffusionPipeline 复制的函数,用于准备 IP 适配器的图像嵌入
def prepare_ip_adapter_image_embeds(
self, ip_adapter_image, ip_adapter_image_embeds, device, num_images_per_prompt, do_classifier_free_guidance
# 定义处理图像嵌入的代码块
):
# 初始化图像嵌入列表
image_embeds = []
# 如果启用无分类器引导,则初始化负图像嵌入列表
if do_classifier_free_guidance:
negative_image_embeds = []
# 如果输入适配器图像嵌入为 None
if ip_adapter_image_embeds is None:
# 检查输入适配器图像是否为列表
if not isinstance(ip_adapter_image, list):
# 如果不是,转换为单元素列表
ip_adapter_image = [ip_adapter_image]
# 确保输入适配器图像的数量与 IP 适配器数量相同
if len(ip_adapter_image) != len(self.unet.encoder_hid_proj.image_projection_layers):
# 如果不匹配,抛出值错误
raise ValueError(
f"`ip_adapter_image` must have same length as the number of IP Adapters. Got {len(ip_adapter_image)} images and {len(self.unet.encoder_hid_proj.image_projection_layers)} IP Adapters."
)
# 遍历每个适配器图像及其对应的图像投影层
for single_ip_adapter_image, image_proj_layer in zip(
ip_adapter_image, self.unet.encoder_hid_proj.image_projection_layers
):
# 判断输出隐藏状态是否为布尔值
output_hidden_state = not isinstance(image_proj_layer, ImageProjection)
# 编码单个图像,获取图像嵌入和负图像嵌入
single_image_embeds, single_negative_image_embeds = self.encode_image(
single_ip_adapter_image, device, 1, output_hidden_state
)
# 将图像嵌入添加到列表中
image_embeds.append(single_image_embeds[None, :])
# 如果启用无分类器引导,添加负图像嵌入
if do_classifier_free_guidance:
negative_image_embeds.append(single_negative_image_embeds[None, :])
else:
# 遍历已存在的图像嵌入
for single_image_embeds in ip_adapter_image_embeds:
# 如果启用无分类器引导,则分离负图像嵌入和图像嵌入
if do_classifier_free_guidance:
single_negative_image_embeds, single_image_embeds = single_image_embeds.chunk(2)
negative_image_embeds.append(single_negative_image_embeds)
# 将图像嵌入添加到列表中
image_embeds.append(single_image_embeds)
# 初始化输入适配器图像嵌入列表
ip_adapter_image_embeds = []
# 遍历图像嵌入及其索引
for i, single_image_embeds in enumerate(image_embeds):
# 将单个图像嵌入重复指定次数
single_image_embeds = torch.cat([single_image_embeds] * num_images_per_prompt, dim=0)
# 如果启用无分类器引导,重复负图像嵌入
if do_classifier_free_guidance:
single_negative_image_embeds = torch.cat([negative_image_embeds[i]] * num_images_per_prompt, dim=0)
# 将负图像嵌入与图像嵌入连接
single_image_embeds = torch.cat([single_negative_image_embeds, single_image_embeds], dim=0)
# 将图像嵌入移动到指定设备
single_image_embeds = single_image_embeds.to(device=device)
# 将处理后的图像嵌入添加到列表中
ip_adapter_image_embeds.append(single_image_embeds)
# 返回输入适配器图像嵌入列表
return ip_adapter_image_embeds
# 从 diffusers.pipelines.stable_diffusion_xl.pipeline_stable_diffusion_xl_img2img.StableDiffusionXLImg2ImgPipeline 复制的函数
def _get_add_time_ids(
self,
original_size,
crops_coords_top_left,
target_size,
aesthetic_score,
negative_aesthetic_score,
negative_original_size,
negative_crops_coords_top_left,
negative_target_size,
dtype,
text_encoder_projection_dim=None,
):
# 检查配置是否需要美学评分
if self.config.requires_aesthetics_score:
# 生成包含原始大小、裁剪坐标和美学评分的列表
add_time_ids = list(original_size + crops_coords_top_left + (aesthetic_score,))
# 生成包含负样本的原始大小、裁剪坐标和负美学评分的列表
add_neg_time_ids = list(
negative_original_size + negative_crops_coords_top_left + (negative_aesthetic_score,)
)
else:
# 生成包含原始大小、裁剪坐标和目标大小的列表
add_time_ids = list(original_size + crops_coords_top_left + target_size)
# 生成包含负样本的原始大小、裁剪坐标和负目标大小的列表
add_neg_time_ids = list(negative_original_size + crops_coords_top_left + negative_target_size)
# 计算通过时间嵌入的维度与文本编码器投影维度的乘积
passed_add_embed_dim = (
self.unet.config.addition_time_embed_dim * len(add_time_ids) + text_encoder_projection_dim
)
# 获取模型预期的时间嵌入维度
expected_add_embed_dim = self.unet.add_embedding.linear_1.in_features
# 检查预期的嵌入维度是否大于实际传入的维度,并确保差值等于配置中的时间嵌入维度
if (
expected_add_embed_dim > passed_add_embed_dim
and (expected_add_embed_dim - passed_add_embed_dim) == self.unet.config.addition_time_embed_dim
):
# 如果条件不满足,抛出错误并提示用户检查配置
raise ValueError(
f"Model expects an added time embedding vector of length {expected_add_embed_dim}, but a vector of {passed_add_embed_dim} was created. Please make sure to enable `requires_aesthetics_score` with `pipe.register_to_config(requires_aesthetics_score=True)` to make sure `aesthetic_score` {aesthetic_score} and `negative_aesthetic_score` {negative_aesthetic_score} is correctly used by the model."
)
# 检查预期的嵌入维度是否小于实际传入的维度,并确保差值等于配置中的时间嵌入维度
elif (
expected_add_embed_dim < passed_add_embed_dim
and (passed_add_embed_dim - expected_add_embed_dim) == self.unet.config.addition_time_embed_dim
):
# 如果条件不满足,抛出错误并提示用户检查配置
raise ValueError(
f"Model expects an added time embedding vector of length {expected_add_embed_dim}, but a vector of {passed_add_embed_dim} was created. Please make sure to disable `requires_aesthetics_score` with `pipe.register_to_config(requires_aesthetics_score=False)` to make sure `target_size` {target_size} is correctly used by the model."
)
# 检查预期的嵌入维度是否与实际传入的维度不相等
elif expected_add_embed_dim != passed_add_embed_dim:
# 如果条件不满足,抛出错误并提示用户检查配置
raise ValueError(
f"Model expects an added time embedding vector of length {expected_add_embed_dim}, but a vector of {passed_add_embed_dim} was created. The model has an incorrect config. Please check `unet.config.time_embedding_type` and `text_encoder_2.config.projection_dim`."
)
# 将时间 ID 列表转换为张量,并指定数据类型
add_time_ids = torch.tensor([add_time_ids], dtype=dtype)
# 将负时间 ID 列表转换为张量,并指定数据类型
add_neg_time_ids = torch.tensor([add_neg_time_ids], dtype=dtype)
# 返回生成的时间 ID 和负时间 ID 张量
return add_time_ids, add_neg_time_ids
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion_upscale.StableDiffusionUpscalePipeline.upcast_vae 复制
# 将 VAE 模型的参数类型转换为指定的数据类型
def upcast_vae(self):
# 获取当前 VAE 模型的参数数据类型
dtype = self.vae.dtype
# 将 VAE 模型的参数转换为 float32 类型
self.vae.to(dtype=torch.float32)
# 判断是否使用了 Torch 2.0 或 xformers 的注意力处理器
use_torch_2_0_or_xformers = isinstance(
self.vae.decoder.mid_block.attentions[0].processor,
(
AttnProcessor2_0,
XFormersAttnProcessor,
),
)
# 如果使用 xformers 或 Torch 2.0,则注意力块不需要使用 float32
# 这可以节省大量内存
if use_torch_2_0_or_xformers:
# 将后量化卷积层的参数转换为原始数据类型
self.vae.post_quant_conv.to(dtype)
# 将输入卷积层的参数转换为原始数据类型
self.vae.decoder.conv_in.to(dtype)
# 将中间块的参数转换为原始数据类型
self.vae.decoder.mid_block.to(dtype)
# 从 diffusers.pipelines.latent_consistency_models.pipeline_latent_consistency_text2img 中复制的方法
def get_guidance_scale_embedding(
# 输入参数:指导权重的张量
self, w: torch.Tensor, embedding_dim: int = 512, dtype: torch.dtype = torch.float32
) -> torch.Tensor:
"""
参考链接:https://github.com/google-research/vdm/blob/dc27b98a554f65cdc654b800da5aa1846545d41b/model_vdm.py#L298
参数:
w (`torch.Tensor`):
生成具有指定指导比例的嵌入向量,以随后丰富时间步长嵌入。
embedding_dim (`int`, *可选*, 默认为 512):
要生成的嵌入的维度。
dtype (`torch.dtype`, *可选*, 默认为 `torch.float32`):
生成的嵌入的数据类型。
返回:
`torch.Tensor`: 形状为 `(len(w), embedding_dim)` 的嵌入向量。
"""
# 确保输入张量 w 只有一个维度
assert len(w.shape) == 1
# 将 w 的值放大 1000 倍
w = w * 1000.0
# 计算嵌入维度的一半
half_dim = embedding_dim // 2
# 计算用于生成嵌入的基础值
emb = torch.log(torch.tensor(10000.0)) / (half_dim - 1)
# 生成用于时间步长嵌入的指数值
emb = torch.exp(torch.arange(half_dim, dtype=dtype) * -emb)
# 计算最终的嵌入向量
emb = w.to(dtype)[:, None] * emb[None, :]
# 生成 sin 和 cos 的组合嵌入
emb = torch.cat([torch.sin(emb), torch.cos(emb)], dim=1)
# 如果嵌入维度为奇数,则在最后填充一个零
if embedding_dim % 2 == 1: # zero pad
emb = torch.nn.functional.pad(emb, (0, 1))
# 确保生成的嵌入形状与预期一致
assert emb.shape == (w.shape[0], embedding_dim)
# 返回生成的嵌入向量
return emb
# 属性:获取指导比例
@property
def guidance_scale(self):
return self._guidance_scale
# 属性:获取指导重标定
@property
def guidance_rescale(self):
return self._guidance_rescale
# 属性:获取剪切跳过参数
@property
def clip_skip(self):
return self._clip_skip
# 这里的 `guidance_scale` 类似于方程 (2) 中的指导权重 `w`
# 来自于 Imagen 论文:https://arxiv.org/pdf/2205.11487.pdf ,`guidance_scale = 1`
# 表示不进行无分类器引导。
@property
def do_classifier_free_guidance(self):
# 判断是否需要进行无分类器引导
return self._guidance_scale > 1 and self.unet.config.time_cond_proj_dim is None
# 属性:获取交叉注意力的关键字参数
@property
def cross_attention_kwargs(self):
return self._cross_attention_kwargs
# 属性:获取去噪结束的参数
@property
def denoising_end(self):
return self._denoising_end
# 属性定义尚未完成
# 定义一个方法,返回当前去噪开始的阈值
def denoising_start(self):
return self._denoising_start
# 定义属性,返回时间步数
@property
def num_timesteps(self):
return self._num_timesteps
# 定义属性,返回中断状态
@property
def interrupt(self):
return self._interrupt
# 使用装饰器禁用梯度计算以节省内存
@torch.no_grad()
# 替换示例文档字符串
@replace_example_docstring(EXAMPLE_DOC_STRING)
# 定义可调用方法,处理各种输入参数
def __call__(
# 提示文本,可以是字符串或字符串列表
self,
prompt: Union[str, List[str]] = None,
# 第二个提示文本,可以是字符串或字符串列表
prompt_2: Optional[Union[str, List[str]]] = None,
# 输入图像
image: PipelineImageInput = None,
# 去噪强度,默认值为0.3
strength: float = 0.3,
# 推理步骤数,默认值为50
num_inference_steps: int = 50,
# 时间步列表
timesteps: List[int] = None,
# Sigma值列表
sigmas: List[float] = None,
# 可选的去噪开始阈值
denoising_start: Optional[float] = None,
# 可选的去噪结束阈值
denoising_end: Optional[float] = None,
# 指导比例,默认值为5.0
guidance_scale: float = 5.0,
# 可选的负面提示文本
negative_prompt: Optional[Union[str, List[str]]] = None,
# 可选的第二个负面提示文本
negative_prompt_2: Optional[Union[str, List[str]]] = None,
# 每个提示生成的图像数量,默认为1
num_images_per_prompt: Optional[int] = 1,
# 预设的η值,默认值为0.0
eta: float = 0.0,
# 随机数生成器,可选
generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
# 可选的潜在张量
latents: Optional[torch.Tensor] = None,
# 可选的提示嵌入张量
prompt_embeds: Optional[torch.Tensor] = None,
# 可选的负面提示嵌入张量
negative_prompt_embeds: Optional[torch.Tensor] = None,
# 可选的聚合提示嵌入张量
pooled_prompt_embeds: Optional[torch.Tensor] = None,
# 可选的负面聚合提示嵌入张量
negative_pooled_prompt_embeds: Optional[torch.Tensor] = None,
# 可选的图像适配器输入
ip_adapter_image: Optional[PipelineImageInput] = None,
# 可选的图像适配器嵌入张量列表
ip_adapter_image_embeds: Optional[List[torch.Tensor]] = None,
# 输出类型,默认值为"pil"
output_type: Optional[str] = "pil",
# 是否返回字典格式,默认为True
return_dict: bool = True,
# 可选的跨注意力参数
cross_attention_kwargs: Optional[Dict[str, Any]] = None,
# 指导重标定值,默认值为0.0
guidance_rescale: float = 0.0,
# 原始图像的尺寸
original_size: Tuple[int, int] = None,
# 裁剪左上角坐标,默认值为(0, 0)
crops_coords_top_left: Tuple[int, int] = (0, 0),
# 目标图像的尺寸
target_size: Tuple[int, int] = None,
# 可选的负面原始尺寸
negative_original_size: Optional[Tuple[int, int]] = None,
# 可选的负面裁剪左上角坐标,默认值为(0, 0)
negative_crops_coords_top_left: Tuple[int, int] = (0, 0),
# 可选的负面目标尺寸
negative_target_size: Optional[Tuple[int, int]] = None,
# 审美评分,默认值为6.0
aesthetic_score: float = 6.0,
# 负面审美评分,默认值为2.5
negative_aesthetic_score: float = 2.5,
# 可选的剪切跳过参数
clip_skip: Optional[int] = None,
# 步骤结束时的回调函数,可选
callback_on_step_end: Optional[
Union[Callable[[int, int, Dict], None], PipelineCallback, MultiPipelineCallbacks]
] = None,
# 结束步骤时的张量输入回调参数,默认为["latents"]
callback_on_step_end_tensor_inputs: List[str] = ["latents"],
# pag比例,默认值为3.0
pag_scale: float = 3.0,
# pag自适应比例,默认值为0.0
pag_adaptive_scale: float = 0.0,
.\diffusers\pipelines\pag\pipeline_pag_sd_xl_inpaint.py
# 版权所有 2024 HuggingFace 团队。保留所有权利。
#
# 根据 Apache 许可证第 2.0 版(“许可证”)授权;
# 除非遵守许可证,否则您不得使用此文件。
# 您可以在以下位置获取许可证副本:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律或书面同意,软件按“原样”分发,
# 不提供任何形式的保证或条件,无论是明示还是暗示。
# 有关许可证下的特定权限和限制,请参阅许可证。
import inspect # 导入 inspect 模块以检查对象的活动
from typing import Any, Callable, Dict, List, Optional, Tuple, Union # 导入类型提示工具
import PIL.Image # 导入 PIL 库中的 Image 模块,用于图像处理
import torch # 导入 PyTorch 库
from transformers import ( # 从 transformers 库中导入必要的类和函数
CLIPImageProcessor, # 导入 CLIP 图像处理器
CLIPTextModel, # 导入 CLIP 文本模型
CLIPTextModelWithProjection, # 导入带投影的 CLIP 文本模型
CLIPTokenizer, # 导入 CLIP 分词器
CLIPVisionModelWithProjection, # 导入带投影的 CLIP 视觉模型
)
from ...callbacks import MultiPipelineCallbacks, PipelineCallback # 导入多管道回调和管道回调
from ...image_processor import PipelineImageInput, VaeImageProcessor # 导入管道图像输入和 VAE 图像处理器
from ...loaders import ( # 从 loaders 模块导入多个加载器混入类
FromSingleFileMixin, # 从单文件加载混入类
IPAdapterMixin, # IP 适配器混入类
StableDiffusionXLLoraLoaderMixin, # 稳定扩散 XL Lora 加载混入类
TextualInversionLoaderMixin, # 文本反转加载混入类
)
from ...models import AutoencoderKL, ImageProjection, UNet2DConditionModel # 导入各种模型
from ...models.attention_processor import ( # 从注意力处理器模块导入注意力处理器类
AttnProcessor2_0, # 版本 2.0 的注意力处理器
XFormersAttnProcessor, # XFormers 注意力处理器
)
from ...models.lora import adjust_lora_scale_text_encoder # 导入调整 Lora 缩放文本编码器的函数
from ...schedulers import KarrasDiffusionSchedulers # 导入 Karras 扩散调度器
from ...utils import ( # 导入多个工具函数
USE_PEFT_BACKEND, # 指示是否使用 PEFT 后端
is_invisible_watermark_available, # 检查是否可用隐形水印功能
is_torch_xla_available, # 检查是否可用 Torch XLA
logging, # 导入日志记录功能
replace_example_docstring, # 替换示例文档字符串的功能
scale_lora_layers, # 缩放 Lora 层的功能
unscale_lora_layers, # 取消缩放 Lora 层的功能
)
from ...utils.torch_utils import randn_tensor # 导入生成随机张量的工具
from ..pipeline_utils import DiffusionPipeline, StableDiffusionMixin # 导入扩散管道和稳定扩散混入
from ..stable_diffusion_xl.pipeline_output import StableDiffusionXLPipelineOutput # 导入稳定扩散 XL 管道输出
from .pag_utils import PAGMixin # 导入 PAG 混入类
if is_invisible_watermark_available(): # 检查隐形水印功能是否可用
from ..stable_diffusion_xl.watermark import StableDiffusionXLWatermarker # 导入稳定扩散 XL 水印类
if is_torch_xla_available(): # 检查 Torch XLA 是否可用
import torch_xla.core.xla_model as xm # 导入 Torch XLA 核心模块
XLA_AVAILABLE = True # 设置 XLA 可用标志为 True
else:
XLA_AVAILABLE = False # 设置 XLA 可用标志为 False
logger = logging.get_logger(__name__) # 获取当前模块的日志记录器,命名为模块名
EXAMPLE_DOC_STRING = """ # 定义示例文档字符串的开始
``` # 文档字符串的结束
```py # 文档字符串的结束
``` # 文档字符串的结束
```py # 文档字符串的结束
``` # 文档字符串的结束
```py # 文档字符串的结束
``` # 文档字符串的结束
```py # 文档字符串的结束
``` # 文档字符串的结束
```py # 文档字符串的结束
``` # 文档字符串的结束
```py # 文档字符串的结束
``` # 文档字符串的结束
```py # 文档字符串的结束
``` # 文档字符串的结束
```py # 文档字符串的结束
``` # 文档字符串的结束
```py # 文档字符串的结束
``` # 文档字符串的结束
```py # 文档字符串的结束
``` # 文档字符串的结束
```py # 文档字符串的结束
``` # 文档字符串的结束
```py # 文档字符串的结束
``` # 文档字符串的结束
```py # 文档字符串的结束
``` # 文档字符串的结束
```py # 文档字符串的结束
``` # 文档字符串的结束
```py # 文档字符串的结束
``` # 文档字符串的结束
```py # 文档字符串的结束
``` # 文档字符串的结束
```py # 文档字符串的结束
``` # 文档字符串的结束
```py # 文档字符串的结束
``` # 文档字符串的结束
```py # 文档字符串的结束
``` # 文档字符串的结束
```py # 文档字符串的结束
``` # 文档字符串的结束
```py # 文档字符串的结束
``` # 文档字符串的结束
```py # 文档字符串的结束
``` # 文档字符串的结束
```py # 文档字符串的结束
``` # 文档字符串的结束
```py # 文档字符串的结束
``` # 文档字符串的结束
```py # 文档字符串的结束
``` # 文档字符串的结束
```py # 文档字符串的结束
``` # 文档字符串的结束
```py # 文档字符串的结束
``` # 文档字符串的结束
```py # 文档字符串的结束
``` # 文档字符串的结束
```py # 文档字符串的结束
``` # 文档字符串的结束
```py # 文档字符串的结束
``` # 文档字符串的结束
```py # 文档字符串的结束
``` # 文档字符串的结束
```py # 文档字符串的结束
``` # 文档字符串的结束
```py # 文档字符串的结束
``` # 文档字符串的结束
```py # 文档字符串的结束
``` # 文档字符串的结束
```py # 文档字符串的结束
``` # 文档字符串的结束
```py # 文档字符串的结束
``` # 文档字符串的结束
```py # 文档字符串的结束
``` # 文档字符串的结束
```py # 文档字符串的结束
``` # 文档字符串的结束
```py # 文档字符串的结束
``` # 文档字符串的结束
```py # 文档字符串的结束
``` # 文档字符串的结束
```py # 文档字符串的结束
``` # 文档字符串的结束
```py # 文档字符串的结束
``` # 文档字符串的结束
```py # 文档字符串的结束
``` # 文档字符串的结束
```py # 文档字符串的结束
``` # 文档字符串的结束
```py # 文档字符串的结束
``` # 文档字符串的结束
```py # 文档字符串的结束
``` # 文档字符串的结束
```py # 文档字符串的结束
``` # 文档字符串的结束
```py # 文档字符串的结束
``` # 文档字符串的结束
```py # 文档字符串的结束
``` # 文档字符串的结束
```py # 文档字符串的结束
``` # 文档字符串的结束
```py # 文档字符串的结束
``` # 文档字符串的结束
```py # 文档字符串的结束
``` # 文档字符串的结束
```py # 文档字符串的结束
``` # 文档字符串的结束
```py # 文档字符串的结束
``` # 文档字符串的结束
```py # 文档字符串的结束
``` # 文档字符串的结束
```py # 文档字符串的结束
``` # 文档字符串的结束
```py # 文档
Examples:
```py
>>> import torch # 导入 PyTorch 库,用于深度学习计算
>>> from diffusers import AutoPipelineForInpainting # 从 diffusers 库导入自动化图像修复管道
>>> from diffusers.utils import load_image # 从 diffusers 库导入图像加载工具
>>> pipe = AutoPipelineForInpainting.from_pretrained( # 从预训练模型创建图像修复管道
... "stabilityai/stable-diffusion-xl-base-1.0", # 指定使用的预训练模型路径
... torch_dtype=torch.float16, # 设置使用的张量数据类型为 float16 以节省内存
... variant="fp16", # 指定模型变体为 fp16
... enable_pag=True, # 启用分页功能以优化性能
... )
>>> pipe.to("cuda") # 将管道移动到 GPU 以加速计算
>>> img_url = "https://raw.githubusercontent.com/CompVis/latent-diffusion/main/data/inpainting_examples/overture-creations-5sI6fQgYIuo.png" # 定义初始图像的 URL
>>> mask_url = "https://raw.githubusercontent.com/CompVis/latent-diffusion/main/data/inpainting_examples/overture-creations-5sI6fQgYIuo_mask.png" # 定义掩码图像的 URL
>>> init_image = load_image(img_url).convert("RGB") # 从 URL 加载初始图像并转换为 RGB 格式
>>> mask_image = load_image(mask_url).convert("RGB") # 从 URL 加载掩码图像并转换为 RGB 格式
>>> prompt = "A majestic tiger sitting on a bench" # 定义用于图像生成的提示文本
>>> image = pipe( # 调用图像修复管道进行图像生成
... prompt=prompt, # 提供提示文本
... image=init_image, # 输入初始图像
... mask_image=mask_image, # 输入掩码图像
... num_inference_steps=50, # 设置推理步骤数
... strength=0.80, # 设置修复强度
... pag_scale=0.3, # 设置分页缩放比例
... ).images[0] # 获取生成的图像
```py
# 注释部分是代码示例的文档字符串,用于说明函数的目的和用法
"""
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.rescale_noise_cfg 复制而来
def rescale_noise_cfg(noise_cfg, noise_pred_text, guidance_rescale=0.0):
# 根据 guidance_rescale 对 noise_cfg 进行重新缩放,参考文献的第 3.4 节
"""
# 计算 noise_pred_text 的标准差,沿指定维度保持维度
std_text = noise_pred_text.std(dim=list(range(1, noise_pred_text.ndim)), keepdim=True)
# 计算 noise_cfg 的标准差,沿指定维度保持维度
std_cfg = noise_cfg.std(dim=list(range(1, noise_cfg.ndim)), keepdim=True)
# 重新缩放来自指导的结果,以修正过度曝光
noise_pred_rescaled = noise_cfg * (std_text / std_cfg)
# 按照指导缩放因子与原始结果混合,以避免产生“普通”外观的图像
noise_cfg = guidance_rescale * noise_pred_rescaled + (1 - guidance_rescale) * noise_cfg
# 返回重新缩放后的噪声配置
return noise_cfg
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion_img2img.retrieve_latents 复制而来
def retrieve_latents(
# 输入的编码器输出,类型为 torch.Tensor,可选的随机生成器,默认采样模式为 "sample"
encoder_output: torch.Tensor, generator: Optional[torch.Generator] = None, sample_mode: str = "sample"
):
# 检查 encoder_output 是否具有 latent_dist 属性,并且采样模式为 "sample"
if hasattr(encoder_output, "latent_dist") and sample_mode == "sample":
# 返回从 latent_dist 中采样的潜在变量
return encoder_output.latent_dist.sample(generator)
# 检查 encoder_output 是否具有 latent_dist 属性,并且采样模式为 "argmax"
elif hasattr(encoder_output, "latent_dist") and sample_mode == "argmax":
# 返回 latent_dist 的众数
return encoder_output.latent_dist.mode()
# 检查 encoder_output 是否具有 latents 属性
elif hasattr(encoder_output, "latents"):
# 返回 encoder_output 中的潜在变量
return encoder_output.latents
# 如果没有找到所需的属性,抛出异常
else:
raise AttributeError("Could not access latents of provided encoder_output")
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.retrieve_timesteps 复制而来
def retrieve_timesteps(
# 调度器对象,推理步数,可选设备,时间步数,标准差列表,其他关键字参数
scheduler,
num_inference_steps: Optional[int] = None,
device: Optional[Union[str, torch.device]] = None,
timesteps: Optional[List[int]] = None,
sigmas: Optional[List[float]] = None,
**kwargs,
):
# 调用调度器的 set_timesteps 方法,并在调用后从调度器获取时间步数
"""
# 处理自定义时间步数,将任何关键字参数传递给 scheduler.set_timesteps 方法
# 定义函数的参数说明
Args:
scheduler (`SchedulerMixin`):
# 要从中获取时间步的调度器
The scheduler to get timesteps from.
num_inference_steps (`int`):
# 生成样本时使用的扩散步数。如果使用该参数,则 `timesteps` 必须为 `None`
The number of diffusion steps used when generating samples with a pre-trained model. If used, `timesteps`
must be `None`.
device (`str` or `torch.device`, *optional*):
# 指定时间步要移动到的设备。如果为 `None`,则时间步不会移动
The device to which the timesteps should be moved to. If `None`, the timesteps are not moved.
timesteps (`List[int]`, *optional*):
# 自定义时间步,用于覆盖调度器的时间步间隔策略。如果传递了 `timesteps`,则 `num_inference_steps` 和 `sigmas` 必须为 `None`
Custom timesteps used to override the timestep spacing strategy of the scheduler. If `timesteps` is passed,
`num_inference_steps` and `sigmas` must be `None`.
sigmas (`List[float]`, *optional*):
# 自定义 sigma 值,用于覆盖调度器的时间步间隔策略。如果传递了 `sigmas`,则 `num_inference_steps` 和 `timesteps` 必须为 `None`
Custom sigmas used to override the timestep spacing strategy of the scheduler. If `sigmas` is passed,
`num_inference_steps` and `timesteps` must be `None`.
# 函数返回值说明
Returns:
`Tuple[torch.Tensor, int]`:
# 返回一个元组,第一个元素是来自调度器的时间步调度,第二个元素是推理步数
A tuple where the first element is the timestep schedule from the scheduler and the
second element is the number of inference steps.
"""
# 检查是否同时传递了时间步和 sigma
if timesteps is not None and sigmas is not None:
# 如果同时传递,抛出错误
raise ValueError("Only one of `timesteps` or `sigmas` can be passed. Please choose one to set custom values")
# 如果传递了时间步
if timesteps is not None:
# 检查调度器是否接受自定义时间步
accepts_timesteps = "timesteps" in set(inspect.signature(scheduler.set_timesteps).parameters.keys())
# 如果不接受,抛出错误
if not accepts_timesteps:
raise ValueError(
f"The current scheduler class {scheduler.__class__}'s `set_timesteps` does not support custom"
f" timestep schedules. Please check whether you are using the correct scheduler."
)
# 设置调度器的时间步
scheduler.set_timesteps(timesteps=timesteps, device=device, **kwargs)
# 获取调度器的时间步
timesteps = scheduler.timesteps
# 计算推理步数
num_inference_steps = len(timesteps)
# 如果传递了 sigma
elif sigmas is not None:
# 检查调度器是否接受自定义 sigma
accept_sigmas = "sigmas" in set(inspect.signature(scheduler.set_timesteps).parameters.keys())
# 如果不接受,抛出错误
if not accept_sigmas:
raise ValueError(
f"The current scheduler class {scheduler.__class__}'s `set_timesteps` does not support custom"
f" sigmas schedules. Please check whether you are using the correct scheduler."
)
# 设置调度器的 sigma
scheduler.set_timesteps(sigmas=sigmas, device=device, **kwargs)
# 获取调度器的时间步
timesteps = scheduler.timesteps
# 计算推理步数
num_inference_steps = len(timesteps)
# 如果没有传递时间步和 sigma
else:
# 设置调度器的时间步数
scheduler.set_timesteps(num_inference_steps, device=device, **kwargs)
# 获取调度器的时间步
timesteps = scheduler.timesteps
# 返回时间步和推理步数
return timesteps, num_inference_steps
# 定义一个名为 StableDiffusionXLPAGInpaintPipeline 的类,继承多个混合类
class StableDiffusionXLPAGInpaintPipeline(
# 继承 DiffusionPipeline 类,提供基本的扩散管道功能
DiffusionPipeline,
# 继承 StableDiffusionMixin 类,提供稳定扩散的特性
StableDiffusionMixin,
# 继承 TextualInversionLoaderMixin 类,提供文本反演加载功能
TextualInversionLoaderMixin,
# 继承 StableDiffusionXLLoraLoaderMixin 类,提供 LoRA 加载功能
StableDiffusionXLLoraLoaderMixin,
# 继承 FromSingleFileMixin 类,提供从单个文件加载的功能
FromSingleFileMixin,
# 继承 IPAdapterMixin 类,提供 IP 适配器加载功能
IPAdapterMixin,
# 继承 PAGMixin 类,提供 PAG 特性
PAGMixin,
):
# 文档字符串,说明该类用于使用 Stable Diffusion XL 进行文本到图像生成
r"""
Pipeline for text-to-image generation using Stable Diffusion XL.
# 说明该模型继承自 DiffusionPipeline,并提到可以查阅超类文档了解通用方法
This model inherits from [`DiffusionPipeline`]. Check the superclass documentation for the generic methods the
library implements for all the pipelines (such as downloading or saving, running on a particular device, etc.)
# 该管道还继承了一些加载方法的说明
The pipeline also inherits the following loading methods:
# 文本反演嵌入加载方法
- [`~loaders.TextualInversionLoaderMixin.load_textual_inversion`] for loading textual inversion embeddings
# 从 .ckpt 文件加载的方法
- [`~loaders.FromSingleFileMixin.from_single_file`] for loading `.ckpt` files
# 加载 LoRA 权重的方法
- [`~loaders.StableDiffusionXLLoraLoaderMixin.load_lora_weights`] for loading LoRA weights
# 保存 LoRA 权重的方法
- [`~loaders.StableDiffusionXLLoraLoaderMixin.save_lora_weights`] for saving LoRA weights
# 加载 IP 适配器的方法
- [`~loaders.IPAdapterMixin.load_ip_adapter`] for loading IP Adapters
# 参数说明,提供每个参数的类型和功能
Args:
# 变分自编码器模型,用于对图像进行编码和解码
vae ([`AutoencoderKL`]):
Variational Auto-Encoder (VAE) Model to encode and decode images to and from latent representations.
# 冻结的文本编码器,Stable Diffusion XL 使用 CLIP 的文本部分
text_encoder ([`CLIPTextModel`]):
Frozen text-encoder. Stable Diffusion XL uses the text portion of
[CLIP](https://huggingface.co/docs/transformers/model_doc/clip#transformers.CLIPTextModel), specifically
the [clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14) variant.
# 第二个冻结的文本编码器,使用 CLIP 的文本和池部分
text_encoder_2 ([` CLIPTextModelWithProjection`]):
Second frozen text-encoder. Stable Diffusion XL uses the text and pool portion of
[CLIP](https://huggingface.co/docs/transformers/model_doc/clip#transformers.CLIPTextModelWithProjection),
specifically the
[laion/CLIP-ViT-bigG-14-laion2B-39B-b160k](https://huggingface.co/laion/CLIP-ViT-bigG-14-laion2B-39B-b160k)
variant.
# CLIP 的分词器,用于将文本转换为模型可接受的格式
tokenizer (`CLIPTokenizer`):
Tokenizer of class
[CLIPTokenizer](https://huggingface.co/docs/transformers/v4.21.0/en/model_doc/clip#transformers.CLIPTokenizer).
# 第二个分词器,用于处理文本输入
tokenizer_2 (`CLIPTokenizer`):
Second Tokenizer of class
[CLIPTokenizer](https://huggingface.co/docs/transformers/v4.21.0/en/model_doc/clip#transformers.CLIPTokenizer).
# 条件 U-Net 架构,用于去噪编码后的图像潜表示
unet ([`UNet2DConditionModel`]): Conditional U-Net architecture to denoise the encoded image latents.
# 调度器,用于与 U-Net 结合去噪图像潜表示
scheduler ([`SchedulerMixin`]):
A scheduler to be used in combination with `unet` to denoise the encoded image latents. Can be one of
[`DDIMScheduler`], [`LMSDiscreteScheduler`], or [`PNDMScheduler`].
# 是否需要在推断期间传递美学评分的条件
requires_aesthetics_score (`bool`, *optional*, defaults to `"False"`):
Whether the `unet` requires a aesthetic_score condition to be passed during inference. Also see the config
of `stabilityai/stable-diffusion-xl-refiner-1-0`.
# 是否强制将负提示嵌入设置为 0
force_zeros_for_empty_prompt (`bool`, *optional*, defaults to `"True"`):
Whether the negative prompt embeddings shall be forced to always be set to 0. Also see the config of
`stabilityai/stable-diffusion-xl-base-1-0`.
# 是否使用隐形水印库对输出图像进行水印处理
add_watermarker (`bool`, *optional*):
Whether to use the [invisible_watermark library](https://github.com/ShieldMnt/invisible-watermark/) to
watermark output images. If not defined, it will default to True if the package is installed, otherwise no
watermarker will be used.
# 定义模型的 CPU 卸载顺序
model_cpu_offload_seq = "text_encoder->text_encoder_2->image_encoder->unet->vae"
# 可选组件列表,包含所有可选参数的名称
_optional_components = [
"tokenizer",
"tokenizer_2",
"text_encoder",
"text_encoder_2",
"image_encoder",
"feature_extractor",
]
# 定义一个包含回调张量输入的列表
_callback_tensor_inputs = [
# 潜在变量
"latents",
# 提示词嵌入
"prompt_embeds",
# 负面提示词嵌入
"negative_prompt_embeds",
# 附加文本嵌入
"add_text_embeds",
# 附加时间ID
"add_time_ids",
# 负面聚合提示词嵌入
"negative_pooled_prompt_embeds",
# 附加负面时间ID
"add_neg_time_ids",
# 掩码
"mask",
# 被掩码的图像潜在变量
"masked_image_latents",
]
# 初始化函数,接受多个模型及参数
def __init__(
# VAE模型
vae: AutoencoderKL,
# 文本编码器模型
text_encoder: CLIPTextModel,
# 第二个文本编码器模型
text_encoder_2: CLIPTextModelWithProjection,
# 词汇编码器
tokenizer: CLIPTokenizer,
# 第二个词汇编码器
tokenizer_2: CLIPTokenizer,
# UNet模型
unet: UNet2DConditionModel,
# 调度器
scheduler: KarrasDiffusionSchedulers,
# 可选的图像编码器
image_encoder: CLIPVisionModelWithProjection = None,
# 可选的特征提取器
feature_extractor: CLIPImageProcessor = None,
# 是否需要美学评分
requires_aesthetics_score: bool = False,
# 是否对空提示强制使用零
force_zeros_for_empty_prompt: bool = True,
# 可选的水印设置
add_watermarker: Optional[bool] = None,
# 应用层设置,默认是“mid”
pag_applied_layers: Union[str, List[str]] = "mid", # ["mid"], ["down.block_1", "up.block_0.attentions_0"]
):
# 调用父类初始化方法
super().__init__()
# 注册多个模块
self.register_modules(
vae=vae,
text_encoder=text_encoder,
text_encoder_2=text_encoder_2,
tokenizer=tokenizer,
tokenizer_2=tokenizer_2,
unet=unet,
image_encoder=image_encoder,
feature_extractor=feature_extractor,
scheduler=scheduler,
)
# 注册配置项:强制零设置
self.register_to_config(force_zeros_for_empty_prompt=force_zeros_for_empty_prompt)
# 注册配置项:美学评分需求
self.register_to_config(requires_aesthetics_score=requires_aesthetics_score)
# 计算VAE缩放因子
self.vae_scale_factor = 2 ** (len(self.vae.config.block_out_channels) - 1)
# 创建图像处理器实例
self.image_processor = VaeImageProcessor(vae_scale_factor=self.vae_scale_factor)
# 创建掩码处理器实例
self.mask_processor = VaeImageProcessor(
vae_scale_factor=self.vae_scale_factor, do_normalize=False, do_binarize=True, do_convert_grayscale=True
)
# 如果水印未指定,则根据可用性设置
add_watermarker = add_watermarker if add_watermarker is not None else is_invisible_watermark_available()
# 根据水印设置初始化水印对象
if add_watermarker:
self.watermark = StableDiffusionXLWatermarker()
else:
self.watermark = None
# 设置应用层
self.set_pag_applied_layers(pag_applied_layers)
# 从diffusers库中复制的编码图像的方法
# 定义编码图像的函数,接受图像、设备、每个提示的图像数量和输出隐藏状态参数
def encode_image(self, image, device, num_images_per_prompt, output_hidden_states=None):
# 获取图像编码器参数的数据类型
dtype = next(self.image_encoder.parameters()).dtype
# 如果输入的图像不是张量,将其转换为张量并提取像素值
if not isinstance(image, torch.Tensor):
image = self.feature_extractor(image, return_tensors="pt").pixel_values
# 将图像移动到指定设备,并转换为相应的数据类型
image = image.to(device=device, dtype=dtype)
# 如果需要输出隐藏状态,进行隐藏状态的编码
if output_hidden_states:
# 编码图像并获取倒数第二层的隐藏状态
image_enc_hidden_states = self.image_encoder(image, output_hidden_states=True).hidden_states[-2]
# 复制隐藏状态以匹配每个提示的图像数量
image_enc_hidden_states = image_enc_hidden_states.repeat_interleave(num_images_per_prompt, dim=0)
# 编码全零图像以获取无条件的隐藏状态
uncond_image_enc_hidden_states = self.image_encoder(
torch.zeros_like(image), output_hidden_states=True
).hidden_states[-2]
# 复制无条件隐藏状态以匹配每个提示的图像数量
uncond_image_enc_hidden_states = uncond_image_enc_hidden_states.repeat_interleave(
num_images_per_prompt, dim=0
)
# 返回图像和无条件的隐藏状态
return image_enc_hidden_states, uncond_image_enc_hidden_states
else:
# 编码图像并获取图像嵌入
image_embeds = self.image_encoder(image).image_embeds
# 复制图像嵌入以匹配每个提示的图像数量
image_embeds = image_embeds.repeat_interleave(num_images_per_prompt, dim=0)
# 创建与图像嵌入形状相同的全零张量作为无条件嵌入
uncond_image_embeds = torch.zeros_like(image_embeds)
# 返回图像嵌入和无条件嵌入
return image_embeds, uncond_image_embeds
# 从稳定扩散管道复制的函数,用于准备图像嵌入
def prepare_ip_adapter_image_embeds(
self, ip_adapter_image, ip_adapter_image_embeds, device, num_images_per_prompt, do_classifier_free_guidance
):
# 初始化存储图像嵌入的列表
image_embeds = []
# 如果启用分类器自由引导,则初始化负图像嵌入的列表
if do_classifier_free_guidance:
negative_image_embeds = []
# 如果输入适配器图像嵌入为空
if ip_adapter_image_embeds is None:
# 如果输入适配器图像不是列表,则转换为列表
if not isinstance(ip_adapter_image, list):
ip_adapter_image = [ip_adapter_image]
# 检查输入适配器图像的长度是否与 IP 适配器的数量相同
if len(ip_adapter_image) != len(self.unet.encoder_hid_proj.image_projection_layers):
# 如果不相同,抛出值错误
raise ValueError(
f"`ip_adapter_image` must have same length as the number of IP Adapters. Got {len(ip_adapter_image)} images and {len(self.unet.encoder_hid_proj.image_projection_layers)} IP Adapters."
)
# 遍历输入适配器图像和图像投影层
for single_ip_adapter_image, image_proj_layer in zip(
ip_adapter_image, self.unet.encoder_hid_proj.image_projection_layers
):
# 检查当前图像投影层是否为图像投影的实例
output_hidden_state = not isinstance(image_proj_layer, ImageProjection)
# 编码当前图像,获取嵌入和负嵌入
single_image_embeds, single_negative_image_embeds = self.encode_image(
single_ip_adapter_image, device, 1, output_hidden_state
)
# 将当前图像嵌入添加到列表中
image_embeds.append(single_image_embeds[None, :])
# 如果启用分类器自由引导,则添加负图像嵌入
if do_classifier_free_guidance:
negative_image_embeds.append(single_negative_image_embeds[None, :])
else:
# 遍历输入适配器图像嵌入
for single_image_embeds in ip_adapter_image_embeds:
# 如果启用分类器自由引导,则将嵌入拆分为负嵌入和图像嵌入
if do_classifier_free_guidance:
single_negative_image_embeds, single_image_embeds = single_image_embeds.chunk(2)
negative_image_embeds.append(single_negative_image_embeds)
# 将图像嵌入添加到列表中
image_embeds.append(single_image_embeds)
# 初始化存储最终图像适配器嵌入的列表
ip_adapter_image_embeds = []
# 遍历图像嵌入和其索引
for i, single_image_embeds in enumerate(image_embeds):
# 将当前图像嵌入复制指定次数
single_image_embeds = torch.cat([single_image_embeds] * num_images_per_prompt, dim=0)
# 如果启用分类器自由引导,则处理负图像嵌入
if do_classifier_free_guidance:
single_negative_image_embeds = torch.cat([negative_image_embeds[i]] * num_images_per_prompt, dim=0)
# 将负图像嵌入与图像嵌入连接
single_image_embeds = torch.cat([single_negative_image_embeds, single_image_embeds], dim=0)
# 将图像嵌入转移到指定设备
single_image_embeds = single_image_embeds.to(device=device)
# 将处理后的图像嵌入添加到列表中
ip_adapter_image_embeds.append(single_image_embeds)
# 返回最终的图像适配器嵌入
return ip_adapter_image_embeds
# 从 diffusers.pipelines.stable_diffusion_xl.pipeline_stable_diffusion_xl.StableDiffusionXLPipeline.encode_prompt 复制的代码
# 定义一个编码提示的函数,接受多个参数以生成图像
def encode_prompt(
self,
prompt: str, # 主提示字符串,用于生成图像
prompt_2: Optional[str] = None, # 可选的第二个提示字符串
device: Optional[torch.device] = None, # 指定设备(如CPU或GPU)
num_images_per_prompt: int = 1, # 每个提示生成的图像数量
do_classifier_free_guidance: bool = True, # 是否使用无分类器引导
negative_prompt: Optional[str] = None, # 可选的负面提示字符串
negative_prompt_2: Optional[str] = None, # 可选的第二个负面提示字符串
prompt_embeds: Optional[torch.Tensor] = None, # 可选的提示嵌入
negative_prompt_embeds: Optional[torch.Tensor] = None, # 可选的负面提示嵌入
pooled_prompt_embeds: Optional[torch.Tensor] = None, # 可选的池化提示嵌入
negative_pooled_prompt_embeds: Optional[torch.Tensor] = None, # 可选的负面池化提示嵌入
lora_scale: Optional[float] = None, # 可选的LORA缩放因子
clip_skip: Optional[int] = None, # 可选的跳过剪辑的步数
# 从diffusers库中复制的函数,用于准备额外的步骤参数
def prepare_extra_step_kwargs(self, generator, eta):
# 为调度器步骤准备额外的参数,因为不同调度器的签名不同
# eta(η)仅在DDIMScheduler中使用,其他调度器将忽略该值
# eta对应于DDIM论文中的η,范围应在[0, 1]之间
accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys()) # 检查调度器是否接受eta参数
extra_step_kwargs = {} # 初始化额外步骤参数字典
if accepts_eta:
extra_step_kwargs["eta"] = eta # 如果接受eta,则将其添加到字典
# 检查调度器是否接受生成器
accepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys()) # 检查生成器参数
if accepts_generator:
extra_step_kwargs["generator"] = generator # 如果接受生成器,则将其添加到字典
return extra_step_kwargs # 返回准备好的额外步骤参数
# 从diffusers库中复制的函数,用于检查输入参数的有效性
def check_inputs(
self,
prompt, # 主提示字符串
prompt_2, # 可选的第二个提示字符串
image, # 输入图像
mask_image, # 输入的遮罩图像
height, # 图像的高度
width, # 图像的宽度
strength, # 强度参数,用于控制图像生成
callback_steps, # 回调步骤数
output_type, # 输出类型(如图像或张量)
negative_prompt=None, # 可选的负面提示字符串
negative_prompt_2=None, # 可选的第二个负面提示字符串
prompt_embeds=None, # 可选的提示嵌入
negative_prompt_embeds=None, # 可选的负面提示嵌入
ip_adapter_image=None, # 可选的适配器图像
ip_adapter_image_embeds=None, # 可选的适配器图像嵌入
callback_on_step_end_tensor_inputs=None, # 在步骤结束时的回调张量输入
padding_mask_crop=None, # 可选的填充遮罩裁剪参数
# 从diffusers库中复制的函数,用于准备潜在的变量
def prepare_latents(
self,
batch_size, # 批量大小
num_channels_latents, # 潜在通道数量
height, # 图像的高度
width, # 图像的宽度
dtype, # 数据类型(如float32)
device, # 指定设备(如CPU或GPU)
generator, # 生成器,用于随机数生成
latents=None, # 可选的潜在变量
image=None, # 可选的输入图像
timestep=None, # 可选的时间步长
is_strength_max=True, # 是否将强度设置为最大
add_noise=True, # 是否在潜在变量中添加噪声
return_noise=False, # 是否返回噪声
return_image_latents=False, # 是否返回图像潜在变量
# 定义形状,包含批处理大小、通道数、调整后的高度和宽度
):
shape = (
batch_size, # 批处理大小
num_channels_latents, # 潜在变量通道数
int(height) // self.vae_scale_factor, # 调整后的高度
int(width) // self.vae_scale_factor, # 调整后的宽度
)
# 检查生成器列表的长度是否与批处理大小匹配
if isinstance(generator, list) and len(generator) != batch_size:
raise ValueError( # 抛出值错误,提示生成器长度与批处理大小不匹配
f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
f" size of {batch_size}. Make sure the batch size matches the length of the generators."
)
# 检查图像或时间步是否为空,且强度不为最大值
if (image is None or timestep is None) and not is_strength_max:
raise ValueError( # 抛出值错误,提示初始化潜在变量时缺少必要参数
"Since strength < 1. initial latents are to be initialised as a combination of Image + Noise."
"However, either the image or the noise timestep has not been provided."
)
# 检查图像的通道数是否为4
if image.shape[1] == 4:
# 将图像转换为所需设备和数据类型
image_latents = image.to(device=device, dtype=dtype)
# 根据批处理大小重复图像潜在变量
image_latents = image_latents.repeat(batch_size // image_latents.shape[0], 1, 1, 1)
# 如果需要返回图像潜在变量或潜在变量为空且强度不为最大值
elif return_image_latents or (latents is None and not is_strength_max):
# 将图像转换为所需设备和数据类型
image = image.to(device=device, dtype=dtype)
# 使用 VAE 编码图像以获得图像潜在变量
image_latents = self._encode_vae_image(image=image, generator=generator)
# 根据批处理大小重复图像潜在变量
image_latents = image_latents.repeat(batch_size // image_latents.shape[0], 1, 1, 1)
# 如果潜在变量为空且需要添加噪声
if latents is None and add_noise:
# 创建随机噪声张量
noise = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
# 如果强度为1,则初始化潜在变量为噪声,否则初始化为图像与噪声的组合
latents = noise if is_strength_max else self.scheduler.add_noise(image_latents, noise, timestep)
# 如果强度为最大值,则将潜在变量乘以调度器的初始 sigma
latents = latents * self.scheduler.init_noise_sigma if is_strength_max else latents
# 如果需要添加噪声但潜在变量不为空
elif add_noise:
# 将潜在变量转换为所需设备
noise = latents.to(device)
# 将潜在变量乘以调度器的初始 sigma
latents = noise * self.scheduler.init_noise_sigma
# 如果不添加噪声
else:
# 创建随机噪声张量
noise = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
# 将图像潜在变量转换为所需设备
latents = image_latents.to(device)
# 将潜在变量打包为输出元组
outputs = (latents,)
# 如果需要返回噪声,将其添加到输出元组
if return_noise:
outputs += (noise,)
# 如果需要返回图像潜在变量,将其添加到输出元组
if return_image_latents:
outputs += (image_latents,)
# 返回最终的输出元组
return outputs
# 从 diffusers.pipelines.stable_diffusion_xl.pipeline_stable_diffusion_xl_inpaint.StableDiffusionXLInpaintPipeline._encode_vae_image 复制的代码
# 定义一个私有方法,用于编码变分自编码器的图像
def _encode_vae_image(self, image: torch.Tensor, generator: torch.Generator):
# 获取输入图像的数据类型
dtype = image.dtype
# 如果配置强制上溯,则将图像转换为浮点类型,并将 VAE 转换为浮点32类型
if self.vae.config.force_upcast:
image = image.float()
self.vae.to(dtype=torch.float32)
# 如果生成器是列表,则对每个图像编码并获取潜在表示
if isinstance(generator, list):
image_latents = [
# 调用 VAE 编码函数并获取每个图像的潜在表示
retrieve_latents(self.vae.encode(image[i : i + 1]), generator=generator[i])
for i in range(image.shape[0])
]
# 将所有潜在表示在第0维合并
image_latents = torch.cat(image_latents, dim=0)
else:
# 对单个图像编码并获取潜在表示
image_latents = retrieve_latents(self.vae.encode(image), generator=generator)
# 如果配置强制上溯,则将 VAE 恢复为原数据类型
if self.vae.config.force_upcast:
self.vae.to(dtype)
# 将潜在表示转换回原数据类型
image_latents = image_latents.to(dtype)
# 根据配置的缩放因子调整潜在表示
image_latents = self.vae.config.scaling_factor * image_latents
# 返回最终的潜在表示
return image_latents
# 从 diffusers.pipelines.stable_diffusion_xl.pipeline_stable_diffusion_xl_inpaint.StableDiffusionXLInpaintPipeline.prepare_mask_latents 复制的方法
def prepare_mask_latents(
# 定义输入参数,包括掩码、被遮蔽图像、批量大小、高度、宽度、数据类型、设备、生成器及分类器自由引导标志
self, mask, masked_image, batch_size, height, width, dtype, device, generator, do_classifier_free_guidance
):
# 将掩码调整为与潜在空间形状相同,以便后续拼接
# 在转换数据类型之前进行调整,避免在使用 CPU 卸载和半精度时出错
mask = torch.nn.functional.interpolate(
# 将掩码的大小调整为高度和宽度除以 VAE 缩放因子的结果
mask, size=(height // self.vae_scale_factor, width // self.vae_scale_factor)
)
# 将掩码移动到指定设备并转换为指定数据类型
mask = mask.to(device=device, dtype=dtype)
# 为每个生成的提示复制掩码和被掩盖的图像潜在空间,使用适合 MPS 的方法
if mask.shape[0] < batch_size:
# 检查传入的掩码数量是否能整除所需的批处理大小
if not batch_size % mask.shape[0] == 0:
raise ValueError(
# 抛出错误,说明掩码数量与批处理大小不匹配
"The passed mask and the required batch size don't match. Masks are supposed to be duplicated to"
f" a total batch size of {batch_size}, but {mask.shape[0]} masks were passed. Make sure the number"
" of masks that you pass is divisible by the total requested batch size."
)
# 根据批处理大小复制掩码
mask = mask.repeat(batch_size // mask.shape[0], 1, 1, 1)
# 根据是否进行无分类器自由引导拼接掩码
mask = torch.cat([mask] * 2) if do_classifier_free_guidance else mask
# 检查被掩盖的图像是否存在且有四个通道
if masked_image is not None and masked_image.shape[1] == 4:
# 将被掩盖的图像潜在空间设置为原始被掩盖图像
masked_image_latents = masked_image
else:
# 如果条件不满足,则设置为 None
masked_image_latents = None
# 如果被掩盖的图像存在
if masked_image is not None:
# 如果被掩盖的图像潜在空间为 None
if masked_image_latents is None:
# 将被掩盖的图像移动到指定设备并转换为指定数据类型
masked_image = masked_image.to(device=device, dtype=dtype)
# 编码被掩盖的图像为 VAE 潜在空间
masked_image_latents = self._encode_vae_image(masked_image, generator=generator)
# 检查被掩盖的图像潜在空间的数量是否小于批处理大小
if masked_image_latents.shape[0] < batch_size:
# 检查图像数量是否能整除批处理大小
if not batch_size % masked_image_latents.shape[0] == 0:
raise ValueError(
# 抛出错误,说明图像数量与批处理大小不匹配
"The passed images and the required batch size don't match. Images are supposed to be duplicated"
f" to a total batch size of {batch_size}, but {masked_image_latents.shape[0]} images were passed."
" Make sure the number of images that you pass is divisible by the total requested batch size."
)
# 根据批处理大小复制被掩盖的图像潜在空间
masked_image_latents = masked_image_latents.repeat(
batch_size // masked_image_latents.shape[0], 1, 1, 1
)
# 根据是否进行无分类器自由引导拼接被掩盖的图像潜在空间
masked_image_latents = (
torch.cat([masked_image_latents] * 2) if do_classifier_free_guidance else masked_image_latents
)
# 调整设备以防止在与潜在模型输入拼接时出现设备错误
masked_image_latents = masked_image_latents.to(device=device, dtype=dtype)
# 返回处理后的掩码和被掩盖的图像潜在空间
return mask, masked_image_latents
# 从 diffusers.pipelines.stable_diffusion_xl.pipeline_stable_diffusion_xl_img2img.StableDiffusionXLImg2ImgPipeline.get_timesteps 复制的代码
# 获取推理步骤的时间戳
def get_timesteps(self, num_inference_steps, strength, device, denoising_start=None):
# 使用 init_timestep 获取原始时间戳
if denoising_start is None:
# 计算初始时间戳,取 num_inference_steps 与 strength 的乘积或 num_inference_steps 的最小值
init_timestep = min(int(num_inference_steps * strength), num_inference_steps)
# 计算开始时间戳,确保不小于 0
t_start = max(num_inference_steps - init_timestep, 0)
else:
# 如果 denoising_start 不为 None,开始时间戳为 0
t_start = 0
# 获取从 t_start 开始的时间戳序列
timesteps = self.scheduler.timesteps[t_start * self.scheduler.order :]
# 如果直接请求一个时间戳开始,则强度与 denoising_start 相关
if denoising_start is not None:
# 计算离散时间戳截止值
discrete_timestep_cutoff = int(
round(
self.scheduler.config.num_train_timesteps
- (denoising_start * self.scheduler.config.num_train_timesteps)
)
)
# 计算推理步骤数,依据时间戳小于截止值的总和
num_inference_steps = (timesteps < discrete_timestep_cutoff).sum().item()
if self.scheduler.order == 2 and num_inference_steps % 2 == 0:
# 如果调度器为二阶调度器,推理步骤数为偶数则加 1
num_inference_steps = num_inference_steps + 1
# 因为 t_n+1 >= t_n,从末尾切片时间戳
timesteps = timesteps[-num_inference_steps:]
return timesteps, num_inference_steps
# 返回剩余时间戳和计算后的推理步骤数
return timesteps, num_inference_steps - t_start
# 从 diffusers.pipelines.stable_diffusion_xl.pipeline_stable_diffusion_xl_img2img 中复制
def _get_add_time_ids(
self,
original_size,
crops_coords_top_left,
target_size,
aesthetic_score,
negative_aesthetic_score,
negative_original_size,
negative_crops_coords_top_left,
negative_target_size,
dtype,
text_encoder_projection_dim=None,
):
# 检查配置是否需要美学评分
if self.config.requires_aesthetics_score:
# 创建包含原始尺寸、裁剪坐标和美学评分的列表
add_time_ids = list(original_size + crops_coords_top_left + (aesthetic_score,))
# 创建包含负原始尺寸、裁剪坐标和负美学评分的列表
add_neg_time_ids = list(
negative_original_size + negative_crops_coords_top_left + (negative_aesthetic_score,)
)
else:
# 创建包含原始尺寸、裁剪坐标和目标尺寸的列表
add_time_ids = list(original_size + crops_coords_top_left + target_size)
# 创建包含负原始尺寸、裁剪坐标和负目标尺寸的列表
add_neg_time_ids = list(negative_original_size + crops_coords_top_left + negative_target_size)
# 计算传递的附加嵌入维度
passed_add_embed_dim = (
self.unet.config.addition_time_embed_dim * len(add_time_ids) + text_encoder_projection_dim
)
# 获取预期的附加嵌入维度
expected_add_embed_dim = self.unet.add_embedding.linear_1.in_features
# 检查预期维度大于传递维度且差值等于附加时间嵌入维度
if (
expected_add_embed_dim > passed_add_embed_dim
and (expected_add_embed_dim - passed_add_embed_dim) == self.unet.config.addition_time_embed_dim
):
# 抛出值错误,提示维度不匹配
raise ValueError(
f"Model expects an added time embedding vector of length {expected_add_embed_dim}, but a vector of {passed_add_embed_dim} was created. Please make sure to enable `requires_aesthetics_score` with `pipe.register_to_config(requires_aesthetics_score=True)` to make sure `aesthetic_score` {aesthetic_score} and `negative_aesthetic_score` {negative_aesthetic_score} is correctly used by the model."
)
# 检查预期维度小于传递维度且差值等于附加时间嵌入维度
elif (
expected_add_embed_dim < passed_add_embed_dim
and (passed_add_embed_dim - expected_add_embed_dim) == self.unet.config.addition_time_embed_dim
):
# 抛出值错误,提示维度不匹配
raise ValueError(
f"Model expects an added time embedding vector of length {expected_add_embed_dim}, but a vector of {passed_add_embed_dim} was created. Please make sure to disable `requires_aesthetics_score` with `pipe.register_to_config(requires_aesthetics_score=False)` to make sure `target_size` {target_size} is correctly used by the model."
)
# 检查预期维度与传递维度不相等
elif expected_add_embed_dim != passed_add_embed_dim:
# 抛出值错误,提示模型配置不正确
raise ValueError(
f"Model expects an added time embedding vector of length {expected_add_embed_dim}, but a vector of {passed_add_embed_dim} was created. The model has an incorrect config. Please check `unet.config.time_embedding_type` and `text_encoder_2.config.projection_dim`."
)
# 将添加的时间 ID 转换为张量
add_time_ids = torch.tensor([add_time_ids], dtype=dtype)
# 将添加的负时间 ID 转换为张量
add_neg_time_ids = torch.tensor([add_neg_time_ids], dtype=dtype)
# 返回添加的时间 ID 和添加的负时间 ID
return add_time_ids, add_neg_time_ids
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion_upscale.StableDiffusionUpscalePipeline.upcast_vae 中复制
# 将变分自编码器(VAE)向上转换到指定数据类型
def upcast_vae(self):
# 获取当前 VAE 的数据类型
dtype = self.vae.dtype
# 将 VAE 转换为 float32 数据类型
self.vae.to(dtype=torch.float32)
# 判断是否使用了 torch 2.0 或 xformers 的注意力处理器
use_torch_2_0_or_xformers = isinstance(
self.vae.decoder.mid_block.attentions[0].processor,
(
AttnProcessor2_0,
XFormersAttnProcessor,
),
)
# 如果使用了 xformers 或 torch 2.0,注意力块可以不使用 float32,从而节省内存
if use_torch_2_0_or_xformers:
# 将后量化卷积转换为原始数据类型
self.vae.post_quant_conv.to(dtype)
# 将输入卷积转换为原始数据类型
self.vae.decoder.conv_in.to(dtype)
# 将中间块转换为原始数据类型
self.vae.decoder.mid_block.to(dtype)
# 从 LatentConsistencyModelPipeline 获取指导尺度嵌入
def get_guidance_scale_embedding(
self, w: torch.Tensor, embedding_dim: int = 512, dtype: torch.dtype = torch.float32
) -> torch.Tensor:
"""
查看 https://github.com/google-research/vdm/blob/dc27b98a554f65cdc654b800da5aa1846545d41b/model_vdm.py#L298
参数:
w (`torch.Tensor`):
生成指定指导尺度的嵌入向量,以丰富时间步嵌入。
embedding_dim (`int`, *可选*, 默认值为 512):
生成嵌入的维度。
dtype (`torch.dtype`, *可选*, 默认值为 `torch.float32`):
生成嵌入的数据类型。
返回:
`torch.Tensor`: 形状为 `(len(w), embedding_dim)` 的嵌入向量。
"""
# 确保 w 的形状为一维
assert len(w.shape) == 1
# 将 w 乘以 1000
w = w * 1000.0
# 计算一半的维度
half_dim = embedding_dim // 2
# 计算嵌入的比例
emb = torch.log(torch.tensor(10000.0)) / (half_dim - 1)
# 生成指数衰减的嵌入
emb = torch.exp(torch.arange(half_dim, dtype=dtype) * -emb)
# 根据 w 和嵌入计算最终的嵌入
emb = w.to(dtype)[:, None] * emb[None, :]
# 连接正弦和余弦值
emb = torch.cat([torch.sin(emb), torch.cos(emb)], dim=1)
# 如果嵌入维度为奇数,则进行零填充
if embedding_dim % 2 == 1: # zero pad
emb = torch.nn.functional.pad(emb, (0, 1))
# 确保最终嵌入的形状符合预期
assert emb.shape == (w.shape[0], embedding_dim)
# 返回嵌入
return emb
# 返回当前指导尺度
@property
def guidance_scale(self):
return self._guidance_scale
# 返回当前指导重缩放
@property
def guidance_rescale(self):
return self._guidance_rescale
# 返回当前剪切跳过设置
@property
def clip_skip(self):
return self._clip_skip
# 判断是否进行无分类器指导,基于指导尺度和 UNet 配置
@property
def do_classifier_free_guidance(self):
return self._guidance_scale > 1 and self.unet.config.time_cond_proj_dim is None
# 返回交叉注意力参数
@property
def cross_attention_kwargs(self):
return self._cross_attention_kwargs
# 返回去噪结束的设置
@property
def denoising_end(self):
return self._denoising_end
# 返回其他属性
# 定义去噪过程的起始点的 getter 方法
def denoising_start(self):
# 返回去噪过程的起始点
return self._denoising_start
# 定义时间步数的属性
@property
def num_timesteps(self):
# 返回时间步数
return self._num_timesteps
# 定义中断的属性
@property
def interrupt(self):
# 返回中断状态
return self._interrupt
# 以无梯度的方式定义可调用的方法
@torch.no_grad()
# 替换示例文档字符串
@replace_example_docstring(EXAMPLE_DOC_STRING)
def __call__(
# 输入提示字符串或字符串列表
self,
prompt: Union[str, List[str]] = None,
# 第二个提示字符串或字符串列表(可选)
prompt_2: Optional[Union[str, List[str]]] = None,
# 输入图像(可选)
image: PipelineImageInput = None,
# 掩码图像(可选)
mask_image: PipelineImageInput = None,
# 掩码图像的潜在张量(可选)
masked_image_latents: torch.Tensor = None,
# 图像高度(可选)
height: Optional[int] = None,
# 图像宽度(可选)
width: Optional[int] = None,
# 填充掩码裁剪的大小(可选)
padding_mask_crop: Optional[int] = None,
# 去噪强度
strength: float = 0.9999,
# 推理步骤数量
num_inference_steps: int = 50,
# 时间步列表(可选)
timesteps: List[int] = None,
# sigma 值列表(可选)
sigmas: List[float] = None,
# 去噪起始点(可选)
denoising_start: Optional[float] = None,
# 去噪结束点(可选)
denoising_end: Optional[float] = None,
# 指导比例
guidance_scale: float = 7.5,
# 负提示字符串或字符串列表(可选)
negative_prompt: Optional[Union[str, List[str]]] = None,
# 第二个负提示字符串或字符串列表(可选)
negative_prompt_2: Optional[Union[str, List[str]]] = None,
# 每个提示生成的图像数量(可选)
num_images_per_prompt: Optional[int] = 1,
# eta 值
eta: float = 0.0,
# 随机生成器(可选)
generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
# 潜在张量(可选)
latents: Optional[torch.Tensor] = None,
# 提示嵌入(可选)
prompt_embeds: Optional[torch.Tensor] = None,
# 负提示嵌入(可选)
negative_prompt_embeds: Optional[torch.Tensor] = None,
# 池化的提示嵌入(可选)
pooled_prompt_embeds: Optional[torch.Tensor] = None,
# 负池化的提示嵌入(可选)
negative_pooled_prompt_embeds: Optional[torch.Tensor] = None,
# IP 适配器图像(可选)
ip_adapter_image: Optional[PipelineImageInput] = None,
# IP 适配器图像嵌入(可选)
ip_adapter_image_embeds: Optional[List[torch.Tensor]] = None,
# 输出类型(可选,默认是 "pil")
output_type: Optional[str] = "pil",
# 是否返回字典形式的结果(默认是 True)
return_dict: bool = True,
# 交叉注意力的关键字参数(可选)
cross_attention_kwargs: Optional[Dict[str, Any]] = None,
# 指导重新缩放
guidance_rescale: float = 0.0,
# 原始图像大小(可选)
original_size: Tuple[int, int] = None,
# 裁剪的左上角坐标(默认是 (0, 0))
crops_coords_top_left: Tuple[int, int] = (0, 0),
# 目标图像大小(可选)
target_size: Tuple[int, int] = None,
# 负原始图像大小(可选)
negative_original_size: Optional[Tuple[int, int]] = None,
# 负裁剪的左上角坐标(默认是 (0, 0))
negative_crops_coords_top_left: Tuple[int, int] = (0, 0),
# 负目标图像大小(可选)
negative_target_size: Optional[Tuple[int, int]] = None,
# 美学评分
aesthetic_score: float = 6.0,
# 负美学评分
negative_aesthetic_score: float = 2.5,
# 跳过的剪辑数(可选)
clip_skip: Optional[int] = None,
# 步骤结束时的回调函数(可选)
callback_on_step_end: Optional[
Union[Callable[[int, int, Dict], None], PipelineCallback, MultiPipelineCallbacks]
] = None,
# 结束步骤时的张量输入名称列表(默认是 ["latents"])
callback_on_step_end_tensor_inputs: List[str] = ["latents"],
# PAG 缩放因子
pag_scale: float = 3.0,
# PAG 自适应缩放因子
pag_adaptive_scale: float = 0.0,