diffusers 源码解析(四十六)
.\diffusers\pipelines\stable_diffusion\pipeline_stable_diffusion_depth2img.py
# 版权信息,声明该文件的所有权归 HuggingFace 团队所有
# 许可信息,指明该文件遵循 Apache License 2.0
# 说明在使用该文件时需遵循该许可证的条款
# 可通过下面的链接获取许可证
# http://www.apache.org/licenses/LICENSE-2.0
# 除非适用法律要求或书面同意,否则该软件按“原样”提供,不附带任何明示或暗示的担保或条件
# 查看许可证以获取特定语言管理权限和限制的详细信息
import contextlib # 导入上下文管理库,用于处理上下文
import inspect # 导入检查库,用于获取对象的信息
from typing import Any, Callable, Dict, List, Optional, Union # 导入类型提示相关工具
import numpy as np # 导入 NumPy 库,用于数组和数值计算
import PIL.Image # 导入 PIL 库中的 Image 模块,用于图像处理
import torch # 导入 PyTorch 库,用于深度学习计算
from packaging import version # 导入版本控制工具,用于处理版本信息
from transformers import CLIPTextModel, CLIPTokenizer, DPTForDepthEstimation, DPTImageProcessor # 导入 Transformers 库中的模型和处理器
from ...configuration_utils import FrozenDict # 从配置工具中导入 FrozenDict,用于不可变字典
from ...image_processor import PipelineImageInput, VaeImageProcessor # 导入图像处理相关工具
from ...loaders import StableDiffusionLoraLoaderMixin, TextualInversionLoaderMixin # 导入加载器工具,用于模型加载
from ...models import AutoencoderKL, UNet2DConditionModel # 导入模型类
from ...models.lora import adjust_lora_scale_text_encoder # 导入 LoRA 调整工具
from ...schedulers import KarrasDiffusionSchedulers # 导入调度器工具
from ...utils import PIL_INTERPOLATION, USE_PEFT_BACKEND, deprecate, logging, scale_lora_layers, unscale_lora_layers # 导入实用工具
from ...utils.torch_utils import randn_tensor # 从 PyTorch 工具导入随机张量生成函数
from ..pipeline_utils import DiffusionPipeline, ImagePipelineOutput # 导入扩散管道和图像输出工具
logger = logging.get_logger(__name__) # 获取当前模块的日志记录器
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion_img2img.retrieve_latents 复制的函数
def retrieve_latents(
encoder_output: torch.Tensor, generator: Optional[torch.Generator] = None, sample_mode: str = "sample"
):
# 检查 encoder_output 是否具有 latent_dist 属性且采样模式为 "sample"
if hasattr(encoder_output, "latent_dist") and sample_mode == "sample":
return encoder_output.latent_dist.sample(generator) # 从潜在分布中采样
# 检查 encoder_output 是否具有 latent_dist 属性且采样模式为 "argmax"
elif hasattr(encoder_output, "latent_dist") and sample_mode == "argmax":
return encoder_output.latent_dist.mode() # 返回潜在分布的众数
# 检查 encoder_output 是否具有 latents 属性
elif hasattr(encoder_output, "latents"):
return encoder_output.latents # 返回潜在表示
else:
raise AttributeError("Could not access latents of provided encoder_output") # 如果没有访问到潜在表示,抛出异常
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion_img2img.preprocess 复制的函数
def preprocess(image):
# 设置弃用提示信息
deprecation_message = "The preprocess method is deprecated and will be removed in diffusers 1.0.0. Please use VaeImageProcessor.preprocess(...) instead"
# 调用弃用函数记录弃用信息
deprecate("preprocess", "1.0.0", deprecation_message, standard_warn=False)
# 检查输入是否为 PyTorch 张量
if isinstance(image, torch.Tensor):
return image # 如果是张量,则直接返回
# 检查输入是否为 PIL 图像
elif isinstance(image, PIL.Image.Image):
image = [image] # 如果是图像,则将其封装为列表
# 检查 image 列表的第一个元素是否为 PIL 的图像对象
if isinstance(image[0], PIL.Image.Image):
# 获取图像的宽度和高度
w, h = image[0].size
# 将宽度和高度调整为 8 的整数倍
w, h = (x - x % 8 for x in (w, h)) # resize to integer multiple of 8
# 将每个图像调整为新的宽高,并转换为 NumPy 数组,增加一个新的维度
image = [np.array(i.resize((w, h), resample=PIL_INTERPOLATION["lanczos"]))[None, :] for i in image]
# 将所有图像沿第 0 维连接成一个大数组
image = np.concatenate(image, axis=0)
# 将数组转换为浮点数并归一化到 [0, 1] 范围
image = np.array(image).astype(np.float32) / 255.0
# 调整数组的维度顺序,从 (N, H, W, C) 转换为 (N, C, H, W)
image = image.transpose(0, 3, 1, 2)
# 将图像数据的值范围从 [0, 1] 变换到 [-1, 1]
image = 2.0 * image - 1.0
# 将 NumPy 数组转换为 PyTorch 张量
image = torch.from_numpy(image)
# 检查 image 列表的第一个元素是否为 PyTorch 张量
elif isinstance(image[0], torch.Tensor):
# 沿第 0 维连接所有 PyTorch 张量
image = torch.cat(image, dim=0)
# 返回处理后的图像
return image
# 定义一个名为 StableDiffusionDepth2ImgPipeline 的类,继承多个混合类
class StableDiffusionDepth2ImgPipeline(DiffusionPipeline, TextualInversionLoaderMixin, StableDiffusionLoraLoaderMixin):
r"""
使用稳定扩散进行基于深度的图像生成的管道,支持文本引导。
该模型继承自 [`DiffusionPipeline`],可查看超类文档以了解所有管道实现的通用方法
(下载、保存、在特定设备上运行等)。
此管道还继承以下加载方法:
- [`~loaders.TextualInversionLoaderMixin.load_textual_inversion`] 用于加载文本反转嵌入
- [`~loaders.StableDiffusionLoraLoaderMixin.load_lora_weights`] 用于加载 LoRA 权重
- [`~loaders.StableDiffusionLoraLoaderMixin.save_lora_weights`] 用于保存 LoRA 权重
参数:
vae ([`AutoencoderKL`]):
用于将图像编码和解码为潜在表示的变分自编码器 (VAE) 模型。
text_encoder ([`~transformers.CLIPTextModel`]):
冻结的文本编码器 ([clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14))。
tokenizer ([`~transformers.CLIPTokenizer`]):
用于对文本进行标记化的 `CLIPTokenizer`。
unet ([`UNet2DConditionModel`]):
用于去噪编码图像潜在表示的 `UNet2DConditionModel`。
scheduler ([`SchedulerMixin`]):
用于与 `unet` 结合使用的调度器,以去噪编码图像潜在表示。可以是
[`DDIMScheduler`], [`LMSDiscreteScheduler`] 或 [`PNDMScheduler`]。
"""
# 定义模型 CPU 卸载顺序
model_cpu_offload_seq = "text_encoder->unet->vae"
# 定义需要作为回调的张量输入
_callback_tensor_inputs = ["latents", "prompt_embeds", "negative_prompt_embeds", "depth_mask"]
# 初始化方法,接受多个参数以配置管道
def __init__(
self,
vae: AutoencoderKL, # 变分自编码器模型
text_encoder: CLIPTextModel, # 文本编码器
tokenizer: CLIPTokenizer, # 文本标记化工具
unet: UNet2DConditionModel, # 去噪模型
scheduler: KarrasDiffusionSchedulers, # 调度器
depth_estimator: DPTForDepthEstimation, # 深度估计模型
feature_extractor: DPTImageProcessor, # 特征提取器
# 定义构造函数
):
# 调用父类的构造函数
super().__init__()
# 检查 unet 配置是否有 diffusers 版本属性,并判断其是否小于 0.9.0
is_unet_version_less_0_9_0 = hasattr(unet.config, "_diffusers_version") and version.parse(
version.parse(unet.config._diffusers_version).base_version
) < version.parse("0.9.0.dev0")
# 检查 unet 配置的样本大小是否小于 64
is_unet_sample_size_less_64 = hasattr(unet.config, "sample_size") and unet.config.sample_size < 64
# 如果 unet 版本小于 0.9.0 且样本大小小于 64,则给出弃用警告
if is_unet_version_less_0_9_0 and is_unet_sample_size_less_64:
# 创建弃用消息,提示用户更新配置文件
deprecation_message = (
"The configuration file of the unet has set the default `sample_size` to smaller than"
" 64 which seems highly unlikely .If you're checkpoint is a fine-tuned version of any of the"
" following: \n- CompVis/stable-diffusion-v1-4 \n- CompVis/stable-diffusion-v1-3 \n-"
" CompVis/stable-diffusion-v1-2 \n- CompVis/stable-diffusion-v1-1 \n- runwayml/stable-diffusion-v1-5"
" \n- runwayml/stable-diffusion-inpainting \n you should change 'sample_size' to 64 in the"
" configuration file. Please make sure to update the config accordingly as leaving `sample_size=32`"
" in the config might lead to incorrect results in future versions. If you have downloaded this"
" checkpoint from the Hugging Face Hub, it would be very nice if you could open a Pull request for"
" the `unet/config.json` file"
)
# 调用弃用函数,记录样本大小小于 64 的警告
deprecate("sample_size<64", "1.0.0", deprecation_message, standard_warn=False)
# 创建新的配置字典,修改样本大小为 64
new_config = dict(unet.config)
new_config["sample_size"] = 64
# 更新 unet 的内部字典
unet._internal_dict = FrozenDict(new_config)
# 注册各个模块
self.register_modules(
vae=vae,
text_encoder=text_encoder,
tokenizer=tokenizer,
unet=unet,
scheduler=scheduler,
depth_estimator=depth_estimator,
feature_extractor=feature_extractor,
)
# 计算 VAE 的缩放因子
self.vae_scale_factor = 2 ** (len(self.vae.config.block_out_channels) - 1)
# 创建 VAE 图像处理器实例
self.image_processor = VaeImageProcessor(vae_scale_factor=self.vae_scale_factor)
# 从 StableDiffusionPipeline 的 _encode_prompt 方法复制
def _encode_prompt(
self,
prompt,
device,
num_images_per_prompt,
do_classifier_free_guidance,
negative_prompt=None,
# 可选参数,用于嵌入提示和负面提示
prompt_embeds: Optional[torch.Tensor] = None,
negative_prompt_embeds: Optional[torch.Tensor] = None,
lora_scale: Optional[float] = None,
# 接收额外的关键字参数
**kwargs,
# 结束括号,表示函数参数列表的结束
):
# 警告信息,说明 `_encode_prompt()` 已被弃用,未来版本中将移除,建议使用 `encode_prompt()`
deprecation_message = "`_encode_prompt()` is deprecated and it will be removed in a future version. Use `encode_prompt()` instead. Also, be aware that the output format changed from a concatenated tensor to a tuple."
# 调用 deprecate 函数记录弃用信息,指定版本和自定义警告选项
deprecate("_encode_prompt()", "1.0.0", deprecation_message, standard_warn=False)
# 调用 encode_prompt 方法生成提示嵌入元组,传入多个参数
prompt_embeds_tuple = self.encode_prompt(
prompt=prompt, # 输入提示
device=device, # 设备信息
num_images_per_prompt=num_images_per_prompt, # 每个提示的图像数量
do_classifier_free_guidance=do_classifier_free_guidance, # 是否进行无分类器引导
negative_prompt=negative_prompt, # 负提示内容
prompt_embeds=prompt_embeds, # 提示嵌入
negative_prompt_embeds=negative_prompt_embeds, # 负提示嵌入
lora_scale=lora_scale, # Lora 缩放因子
**kwargs, # 其他关键字参数
)
# 连接嵌入元组中的两个部分,以支持向后兼容
prompt_embeds = torch.cat([prompt_embeds_tuple[1], prompt_embeds_tuple[0]])
# 返回组合后的提示嵌入
return prompt_embeds
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.encode_prompt 复制的函数
def encode_prompt(
self,
prompt, # 输入提示
device, # 设备信息
num_images_per_prompt, # 每个提示的图像数量
do_classifier_free_guidance, # 是否进行无分类器引导
negative_prompt=None, # 负提示内容,默认为 None
prompt_embeds: Optional[torch.Tensor] = None, # 可选的提示嵌入
negative_prompt_embeds: Optional[torch.Tensor] = None, # 可选的负提示嵌入
lora_scale: Optional[float] = None, # 可选的 Lora 缩放因子
clip_skip: Optional[int] = None, # 可选的剪裁跳过参数
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.run_safety_checker 复制的函数
def run_safety_checker(self, image, device, dtype): # 安全检查器函数,检查输入图像的安全性
# 检查安全检查器是否存在,如果不存在则将 nsfw 概念标记为 None
if self.safety_checker is None:
has_nsfw_concept = None
else:
# 如果图像是张量,使用图像处理器进行后处理并转换为 PIL 格式
if torch.is_tensor(image):
feature_extractor_input = self.image_processor.postprocess(image, output_type="pil")
else:
# 如果不是张量,则将 NumPy 数组转换为 PIL 图像
feature_extractor_input = self.image_processor.numpy_to_pil(image)
# 使用特征提取器处理图像输入,并将其转换为指定设备的张量
safety_checker_input = self.feature_extractor(feature_extractor_input, return_tensors="pt").to(device)
# 调用安全检查器,检查图像并返回处理后的图像和 nsfw 概念标记
image, has_nsfw_concept = self.safety_checker(
images=image, clip_input=safety_checker_input.pixel_values.to(dtype)
)
# 返回检查后的图像和 nsfw 概念标记
return image, has_nsfw_concept
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.decode_latents 复制的函数
# 解码潜在变量的函数
def decode_latents(self, latents):
# 定义一个弃用警告信息,提示用户该方法将在未来版本中移除
deprecation_message = "The decode_latents method is deprecated and will be removed in 1.0.0. Please use VaeImageProcessor.postprocess(...) instead"
# 调用弃用函数,发出警告,指明该方法弃用的版本
deprecate("decode_latents", "1.0.0", deprecation_message, standard_warn=False)
# 根据 VAE 的缩放因子调整潜在变量
latents = 1 / self.vae.config.scaling_factor * latents
# 解码潜在变量,获取解码后的图像,返回的结果是一个元组,取第一个元素
image = self.vae.decode(latents, return_dict=False)[0]
# 对图像进行归一化处理,将值范围调整到 [0, 1]
image = (image / 2 + 0.5).clamp(0, 1)
# 将图像数据转为 float32 类型,便于与 bfloat16 兼容,且不会造成显著开销
image = image.cpu().permute(0, 2, 3, 1).float().numpy()
# 返回处理后的图像数据
return image
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_extra_step_kwargs 复制的函数
def prepare_extra_step_kwargs(self, generator, eta):
# 为调度器步骤准备额外的参数,因为并非所有调度器的参数签名相同
# eta (η) 仅用于 DDIMScheduler,其他调度器将忽略此参数
# eta 对应于 DDIM 论文中的 η:https://arxiv.org/abs/2010.02502
# eta 的取值应在 [0, 1] 之间
# 检查调度器的 step 方法是否接受 eta 参数
accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())
# 创建一个字典以存放额外的步骤参数
extra_step_kwargs = {}
# 如果接受 eta,则将其添加到额外参数字典中
if accepts_eta:
extra_step_kwargs["eta"] = eta
# 检查调度器的 step 方法是否接受 generator 参数
accepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys())
# 如果接受 generator,则将其添加到额外参数字典中
if accepts_generator:
extra_step_kwargs["generator"] = generator
# 返回包含额外参数的字典
return extra_step_kwargs
# 检查输入参数的函数
def check_inputs(
self,
prompt, # 输入的提示文本
strength, # 强度参数
callback_steps, # 回调步骤
negative_prompt=None, # 可选的负面提示文本
prompt_embeds=None, # 可选的提示嵌入
negative_prompt_embeds=None, # 可选的负面提示嵌入
callback_on_step_end_tensor_inputs=None, # 可选的回调输入
):
# 检查 strength 是否在有效范围内 [0.0, 1.0]
if strength < 0 or strength > 1:
# 如果不在范围内,抛出值错误
raise ValueError(f"The value of strength should in [0.0, 1.0] but is {strength}")
# 检查 callback_steps 是否为正整数
if callback_steps is not None and (not isinstance(callback_steps, int) or callback_steps <= 0):
# 如果不是正整数,抛出值错误
raise ValueError(
f"`callback_steps` has to be a positive integer but is {callback_steps} of type"
f" {type(callback_steps)}."
)
# 检查 callback_on_step_end_tensor_inputs 是否在允许的回调张量输入中
if callback_on_step_end_tensor_inputs is not None and not all(
k in self._callback_tensor_inputs for k in callback_on_step_end_tensor_inputs
):
# 如果有不在允许输入中的项,抛出值错误
raise ValueError(
f"`callback_on_step_end_tensor_inputs` has to be in {self._callback_tensor_inputs}, but found {[k for k in callback_on_step_end_tensor_inputs if k not in self._callback_tensor_inputs]}"
)
# 检查同时提供 prompt 和 prompt_embeds 是否有效
if prompt is not None and prompt_embeds is not None:
# 抛出值错误,提醒只能提供其中一个
raise ValueError(
f"Cannot forward both `prompt`: {prompt} and `prompt_embeds`: {prompt_embeds}. Please make sure to"
" only forward one of the two."
)
elif prompt is None and prompt_embeds is None:
# 抛出值错误,提醒必须提供其中一个
raise ValueError(
"Provide either `prompt` or `prompt_embeds`. Cannot leave both `prompt` and `prompt_embeds` undefined."
)
elif prompt is not None and (not isinstance(prompt, str) and not isinstance(prompt, list)):
# 检查 prompt 的类型是否有效
raise ValueError(f"`prompt` has to be of type `str` or `list` but is {type(prompt)}")
# 检查同时提供 negative_prompt 和 negative_prompt_embeds 是否有效
if negative_prompt is not None and negative_prompt_embeds is not None:
# 抛出值错误,提醒只能提供其中一个
raise ValueError(
f"Cannot forward both `negative_prompt`: {negative_prompt} and `negative_prompt_embeds`:"
f" {negative_prompt_embeds}. Please make sure to only forward one of the two."
)
# 检查 prompt_embeds 和 negative_prompt_embeds 的形状是否一致
if prompt_embeds is not None and negative_prompt_embeds is not None:
if prompt_embeds.shape != negative_prompt_embeds.shape:
# 抛出值错误,提醒两者形状必须一致
raise ValueError(
"`prompt_embeds` and `negative_prompt_embeds` must have the same shape when passed directly, but"
f" got: `prompt_embeds` {prompt_embeds.shape} != `negative_prompt_embeds`"
f" {negative_prompt_embeds.shape}."
)
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion_img2img.StableDiffusionImg2ImgPipeline.get_timesteps 复制的代码
# 获取时间步,进行推理步骤的处理
def get_timesteps(self, num_inference_steps, strength, device):
# 计算初始时间步,取 num_inference_steps 与 strength 的乘积和 num_inference_steps 的最小值
init_timestep = min(int(num_inference_steps * strength), num_inference_steps)
# 计算开始时间步,确保不小于0
t_start = max(num_inference_steps - init_timestep, 0)
# 从调度器中获取相应时间步的切片
timesteps = self.scheduler.timesteps[t_start * self.scheduler.order :]
# 如果调度器有设置开始索引的方法,则调用它
if hasattr(self.scheduler, "set_begin_index"):
self.scheduler.set_begin_index(t_start * self.scheduler.order)
# 返回时间步和剩余的推理步骤数
return timesteps, num_inference_steps - t_start
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion_img2img.StableDiffusionImg2ImgPipeline.prepare_latents 中复制的
# 准备深度图,处理输入图像和深度图,适应批量大小及其他参数
def prepare_depth_map(self, image, depth_map, batch_size, do_classifier_free_guidance, dtype, device):
# 如果输入的图像是单个 PIL 图像,则将其转换为列表
if isinstance(image, PIL.Image.Image):
image = [image]
else:
# 如果输入是多个图像,则将其转换为列表
image = list(image)
# 检查图像的类型并获取宽度和高度
if isinstance(image[0], PIL.Image.Image):
width, height = image[0].size # 从 PIL 图像中获取宽高
elif isinstance(image[0], np.ndarray):
width, height = image[0].shape[:-1] # 从 numpy 数组中获取宽高
else:
height, width = image[0].shape[-2:] # 从其他格式中获取宽高
# 如果没有提供深度图,则计算深度图
if depth_map is None:
# 使用特征提取器提取图像的像素值,并将其转换为张量
pixel_values = self.feature_extractor(images=image, return_tensors="pt").pixel_values
# 将像素值移动到指定的设备并转换为指定的数据类型
pixel_values = pixel_values.to(device=device, dtype=dtype)
# DPT-Hybrid 模型使用批量归一化层,不支持 fp16,因此使用自动混合精度
if torch.backends.mps.is_available():
autocast_ctx = contextlib.nullcontext() # 创建一个空上下文
logger.warning(
"The DPT-Hybrid model uses batch-norm layers which are not compatible with fp16, but autocast is not yet supported on MPS."
) # 记录警告
else:
# 在支持的设备上创建自动混合精度上下文
autocast_ctx = torch.autocast(device.type, dtype=dtype)
with autocast_ctx: # 进入自动混合精度上下文
# 使用深度估计器计算深度图
depth_map = self.depth_estimator(pixel_values).predicted_depth
else:
# 如果提供了深度图,则将其移动到指定的设备和数据类型
depth_map = depth_map.to(device=device, dtype=dtype)
# 调整深度图的大小以适应 VAE 的缩放因子
depth_map = torch.nn.functional.interpolate(
depth_map.unsqueeze(1), # 增加一个维度以适应插值操作
size=(height // self.vae_scale_factor, width // self.vae_scale_factor), # 目标大小
mode="bicubic", # 使用双三次插值
align_corners=False, # 不对齐角点
)
# 计算深度图的最小值和最大值
depth_min = torch.amin(depth_map, dim=[1, 2, 3], keepdim=True) # 获取深度图的最小值
depth_max = torch.amax(depth_map, dim=[1, 2, 3], keepdim=True) # 获取深度图的最大值
# 将深度图归一化到 [-1, 1] 的范围
depth_map = 2.0 * (depth_map - depth_min) / (depth_max - depth_min) - 1.0
# 将深度图转换为指定的数据类型
depth_map = depth_map.to(dtype)
# 如果深度图的批量大小小于给定的批量大小,则重复深度图以匹配批量大小
if depth_map.shape[0] < batch_size:
repeat_by = batch_size // depth_map.shape[0] # 计算重复次数
depth_map = depth_map.repeat(repeat_by, 1, 1, 1) # 重复深度图
# 根据是否使用无分类器引导来调整深度图
depth_map = torch.cat([depth_map] * 2) if do_classifier_free_guidance else depth_map
# 返回处理后的深度图
return depth_map
# 返回指导缩放因子
@property
def guidance_scale(self):
return self._guidance_scale
# 返回剪辑跳过的参数
@property
def clip_skip(self):
return self._clip_skip
# 这里的 `guidance_scale` 定义类似于 Imagen 论文中公式 (2) 的指导权重 `w`
# `guidance_scale = 1` 表示不使用无分类器引导
@property
def do_classifier_free_guidance(self):
return self._guidance_scale > 1 # 判断是否使用无分类器引导
# 返回交叉注意力的参数
@property
def cross_attention_kwargs(self):
return self._cross_attention_kwargs
# 返回时间步数
@property
def num_timesteps(self):
return self._num_timesteps
# 使用装饰器禁用梯度计算,以节省内存和计算资源
@torch.no_grad()
# 定义可调用方法,接受多个参数以生成图像
def __call__(
# 输入提示,字符串或字符串列表,决定生成内容
self,
prompt: Union[str, List[str]] = None,
# 输入图像,类型为 PipelineImageInput,用于图像生成
image: PipelineImageInput = None,
# 深度图,类型为可选的 torch.Tensor,用于提供深度信息
depth_map: Optional[torch.Tensor] = None,
# 强度参数,决定生成的图像变化程度,默认为 0.8
strength: float = 0.8,
# 推理步骤数,决定生成过程的迭代次数,默认为 50
num_inference_steps: Optional[int] = 50,
# 指导比例,用于调整生成图像与提示的一致性,默认为 7.5
guidance_scale: Optional[float] = 7.5,
# 负向提示,字符串或字符串列表,提供生成限制条件
negative_prompt: Optional[Union[str, List[str]]] = None,
# 每个提示生成的图像数量,默认为 1
num_images_per_prompt: Optional[int] = 1,
# 随机性参数,控制生成过程中的随机性,默认为 0.0
eta: Optional[float] = 0.0,
# 生成器,用于控制随机数生成的可选参数
generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
# 提示的嵌入,类型为可选的 torch.Tensor,提供编码后的提示信息
prompt_embeds: Optional[torch.Tensor] = None,
# 负向提示的嵌入,类型为可选的 torch.Tensor,提供编码后的负向提示信息
negative_prompt_embeds: Optional[torch.Tensor] = None,
# 输出类型,默认为 "pil",指示生成结果的格式
output_type: Optional[str] = "pil",
# 返回字典标志,决定是否以字典形式返回结果,默认为 True
return_dict: bool = True,
# 交叉注意力的额外参数,可选字典类型
cross_attention_kwargs: Optional[Dict[str, Any]] = None,
# 跳过的剪辑层数,可选整数,控制模型层的使用
clip_skip: Optional[int] = None,
# 每一步结束时的回调函数,接受步数、总步数和字典作为参数
callback_on_step_end: Optional[Callable[[int, int, Dict], None]] = None,
# 在回调函数中包含的张量输入名称,默认为 ["latents"]
callback_on_step_end_tensor_inputs: List[str] = ["latents"],
# 额外的关键字参数,允许用户自定义输入
**kwargs,
.\diffusers\pipelines\stable_diffusion\pipeline_stable_diffusion_image_variation.py
# 版权声明,标识文件的版权所有者和相关条款
# Copyright 2024 The HuggingFace Team. All rights reserved.
#
# 根据 Apache 许可证第 2.0 版(“许可证”)进行许可;
# 除非遵循许可证,否则您不得使用此文件。
# 您可以在以下网址获取许可证副本:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律或书面协议另有约定,软件按“原样”提供,
# 不附带任何明示或暗示的担保或条件。
# 请参阅许可证以了解适用的权限和限制。
#
# 导入 inspect 模块,用于检查函数签名和源代码
import inspect
# 从 typing 模块导入类型提示所需的类
from typing import Callable, List, Optional, Union
# 导入 PIL.Image 模块,用于处理图像
import PIL.Image
# 导入 PyTorch 库
import torch
# 导入 version 模块用于处理版本信息
from packaging import version
# 导入 CLIP 相关的图像处理器和模型
from transformers import CLIPImageProcessor, CLIPVisionModelWithProjection
# 从相对路径导入 FrozenDict 配置类
from ...configuration_utils import FrozenDict
# 从相对路径导入图像处理器
from ...image_processor import VaeImageProcessor
# 从相对路径导入自动编码器和 UNet 模型
from ...models import AutoencoderKL, UNet2DConditionModel
# 从相对路径导入 Karras 扩散调度器
from ...schedulers import KarrasDiffusionSchedulers
# 从相对路径导入工具函数
from ...utils import deprecate, logging
# 从工具模块导入随机张量生成函数
from ...utils.torch_utils import randn_tensor
# 从管道工具模块导入扩散管道和稳定扩散混合类
from ..pipeline_utils import DiffusionPipeline, StableDiffusionMixin
# 从当前目录导入稳定扩散管道输出类
from . import StableDiffusionPipelineOutput
# 从当前目录导入安全检查器
from .safety_checker import StableDiffusionSafetyChecker
# 创建日志记录器,便于记录调试信息和警告
logger = logging.get_logger(__name__) # pylint: disable=invalid-name
# 定义一个类,用于生成图像变体,继承自扩散管道和稳定扩散混合类
class StableDiffusionImageVariationPipeline(DiffusionPipeline, StableDiffusionMixin):
r"""
管道用于从输入图像生成图像变体,使用稳定扩散模型。
该模型继承自 [`DiffusionPipeline`]。请查看超类文档以了解所有管道的通用方法
(下载、保存、在特定设备上运行等)。
# 函数参数说明
Args:
vae ([`AutoencoderKL`]): # 变分自编码器(VAE)模型,用于将图像编码为潜在表示,并从中解码图像。
Variational Auto-Encoder (VAE) model to encode and decode images to and from latent representations.
image_encoder ([`~transformers.CLIPVisionModelWithProjection`] ): # 冻结的 CLIP 图像编码器,具体为 clip-vit-large-patch14。
Frozen CLIP image-encoder ([clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14)).
text_encoder ([`~transformers.CLIPTextModel`]): # 冻结的文本编码器,具体为 clip-vit-large-patch14。
Frozen text-encoder ([clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14)).
tokenizer ([`~transformers.CLIPTokenizer`]): # 用于对文本进行分词的 CLIP 分词器。
A `CLIPTokenizer` to tokenize text.
unet ([`UNet2DConditionModel`]): # 用于去噪已编码图像潜在表示的 UNet 模型。
A `UNet2DConditionModel` to denoise the encoded image latents.
scheduler ([`SchedulerMixin`]): # 用于与 UNet 结合使用以去噪已编码图像潜在表示的调度器,可以是 DDIMScheduler、LMSDiscreteScheduler 或 PNDMScheduler。
A scheduler to be used in combination with `unet` to denoise the encoded image latents. Can be one of
[`DDIMScheduler`], [`LMSDiscreteScheduler`], or [`PNDMScheduler`].
safety_checker ([`StableDiffusionSafetyChecker`]): # 分类模块,用于评估生成的图像是否可能被认为是冒犯性或有害的。
Classification module that estimates whether generated images could be considered offensive or harmful.
Please refer to the [model card](https://huggingface.co/runwayml/stable-diffusion-v1-5) for more details
about a model's potential harms.
feature_extractor ([`~transformers.CLIPImageProcessor`]): # CLIP 图像处理器,用于从生成的图像中提取特征;作为安全检查器的输入。
A `CLIPImageProcessor` to extract features from generated images; used as inputs to the `safety_checker`.
"""
# TODO: feature_extractor 是必需的,以便编码图像(如果它们是 PIL 格式),
# 如果管道没有 feature_extractor,我们应该给出描述性消息。
_optional_components = ["safety_checker"] # 可选组件列表,包含安全检查器。
model_cpu_offload_seq = "image_encoder->unet->vae" # 模型在 CPU 卸载时的顺序。
_exclude_from_cpu_offload = ["safety_checker"] # 在 CPU 卸载时排除的组件,安全检查器不会被卸载。
def __init__( # 初始化方法,定义类的构造函数。
self,
vae: AutoencoderKL, # 传入变分自编码器实例。
image_encoder: CLIPVisionModelWithProjection, # 传入图像编码器实例。
unet: UNet2DConditionModel, # 传入 UNet 实例。
scheduler: KarrasDiffusionSchedulers, # 传入调度器实例。
safety_checker: StableDiffusionSafetyChecker, # 传入安全检查器实例。
feature_extractor: CLIPImageProcessor, # 传入图像处理器实例。
requires_safety_checker: bool = True, # 是否需要安全检查器的标志,默认值为 True。
# 定义一个私有方法用于编码图像,接收多个参数
def _encode_image(self, image, device, num_images_per_prompt, do_classifier_free_guidance):
# 获取图像编码器参数的数据类型
dtype = next(self.image_encoder.parameters()).dtype
# 检查输入是否为张量,如果不是,则使用特征提取器处理图像
if not isinstance(image, torch.Tensor):
image = self.feature_extractor(images=image, return_tensors="pt").pixel_values
# 将图像转移到指定设备并转换为所需数据类型
image = image.to(device=device, dtype=dtype)
# 通过图像编码器生成图像嵌入
image_embeddings = self.image_encoder(image).image_embeds
# 增加一个维度以便于后续处理
image_embeddings = image_embeddings.unsqueeze(1)
# 针对每个提示生成图像嵌入的副本,使用适合 MPS 的方法
bs_embed, seq_len, _ = image_embeddings.shape
# 重复图像嵌入以匹配每个提示生成的图像数量
image_embeddings = image_embeddings.repeat(1, num_images_per_prompt, 1)
# 重新调整图像嵌入的形状
image_embeddings = image_embeddings.view(bs_embed * num_images_per_prompt, seq_len, -1)
# 如果需要无分类器引导,则创建零向量的负提示嵌入
if do_classifier_free_guidance:
negative_prompt_embeds = torch.zeros_like(image_embeddings)
# 对于无分类器引导,我们需要进行两次前向传递
# 这里将无条件和文本嵌入拼接到一个批次中,以避免两次前向传递
image_embeddings = torch.cat([negative_prompt_embeds, image_embeddings])
# 返回最终的图像嵌入
return image_embeddings
# 从 StableDiffusionPipeline 复制的方法,用于运行安全检查器
def run_safety_checker(self, image, device, dtype):
# 如果安全检查器未定义,则将标记设为 None
if self.safety_checker is None:
has_nsfw_concept = None
else:
# 检查图像是否为张量,如果是则处理为 PIL 格式
if torch.is_tensor(image):
feature_extractor_input = self.image_processor.postprocess(image, output_type="pil")
else:
feature_extractor_input = self.image_processor.numpy_to_pil(image)
# 使用特征提取器处理图像并转移到指定设备
safety_checker_input = self.feature_extractor(feature_extractor_input, return_tensors="pt").to(device)
# 运行安全检查器,返回处理后的图像和 NSFW 概念标记
image, has_nsfw_concept = self.safety_checker(
images=image, clip_input=safety_checker_input.pixel_values.to(dtype)
)
# 返回处理后的图像及 NSFW 概念标记
return image, has_nsfw_concept
# 从 StableDiffusionPipeline 复制的方法,用于解码潜在变量
def decode_latents(self, latents):
# 显示弃用提示,告知用户该方法将在未来版本中移除
deprecation_message = "The decode_latents method is deprecated and will be removed in 1.0.0. Please use VaeImageProcessor.postprocess(...) instead"
deprecate("decode_latents", "1.0.0", deprecation_message, standard_warn=False)
# 按照配置的缩放因子调整潜在变量
latents = 1 / self.vae.config.scaling_factor * latents
# 解码潜在变量以生成图像
image = self.vae.decode(latents, return_dict=False)[0]
# 归一化图像数据并限制其值在 0 到 1 之间
image = (image / 2 + 0.5).clamp(0, 1)
# 将图像转换为 float32 格式,以确保兼容性并避免显著开销
image = image.cpu().permute(0, 2, 3, 1).float().numpy()
# 返回解码后的图像
return image
# 从 StableDiffusionPipeline 复制的方法,用于准备额外步骤的关键字参数
# 准备额外参数以便于调度器步骤,因不同调度器的签名可能不同
def prepare_extra_step_kwargs(self, generator, eta):
# 检查调度器步骤是否接受 eta 参数,eta 仅在 DDIMScheduler 中使用
# eta 对应于 DDIM 论文中的 η: https://arxiv.org/abs/2010.02502
# eta 应该在 [0, 1] 范围内
# 判断调度器步骤是否接受 eta 参数
accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())
# 初始化额外参数字典
extra_step_kwargs = {}
# 如果调度器接受 eta,添加到额外参数中
if accepts_eta:
extra_step_kwargs["eta"] = eta
# 检查调度器是否接受 generator 参数
accepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys())
# 如果调度器接受 generator,添加到额外参数中
if accepts_generator:
extra_step_kwargs["generator"] = generator
# 返回额外参数字典
return extra_step_kwargs
# 检查输入的有效性,包括图像、高度、宽度和回调步数
def check_inputs(self, image, height, width, callback_steps):
# 确保图像类型为 torch.Tensor 或 PIL.Image.Image 或图像列表
if (
not isinstance(image, torch.Tensor)
and not isinstance(image, PIL.Image.Image)
and not isinstance(image, list)
):
raise ValueError(
"`image` has to be of type `torch.Tensor` or `PIL.Image.Image` or `List[PIL.Image.Image]` but is"
f" {type(image)}"
)
# 确保高度和宽度都是8的倍数
if height % 8 != 0 or width % 8 != 0:
raise ValueError(f"`height` and `width` have to be divisible by 8 but are {height} and {width}.")
# 确保回调步骤是正整数
if (callback_steps is None) or (
callback_steps is not None and (not isinstance(callback_steps, int) or callback_steps <= 0)
):
raise ValueError(
f"`callback_steps` has to be a positive integer but is {callback_steps} of type"
f" {type(callback_steps)}."
)
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_latents 复制
def prepare_latents(self, batch_size, num_channels_latents, height, width, dtype, device, generator, latents=None):
# 定义潜在变量的形状
shape = (
batch_size,
num_channels_latents,
int(height) // self.vae_scale_factor,
int(width) // self.vae_scale_factor,
)
# 如果传入的生成器列表长度与批量大小不匹配,抛出异常
if isinstance(generator, list) and len(generator) != batch_size:
raise ValueError(
f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
f" size of {batch_size}. Make sure the batch size matches the length of the generators."
)
# 如果没有提供潜在变量,生成随机潜在变量
if latents is None:
latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
else:
# 如果提供了潜在变量,将其移动到指定设备
latents = latents.to(device)
# 根据调度器要求的标准差缩放初始噪声
latents = latents * self.scheduler.init_noise_sigma
# 返回处理后的潜在变量
return latents
# 禁用梯度计算,以节省内存
@torch.no_grad()
# 定义一个可调用的方法,用于处理图像输入
def __call__(
self,
# 输入图像,可以是单个 PIL 图片、图片列表或 PyTorch 张量
image: Union[PIL.Image.Image, List[PIL.Image.Image], torch.Tensor],
# 目标高度,可选参数
height: Optional[int] = None,
# 目标宽度,可选参数
width: Optional[int] = None,
# 推理步骤的数量,默认为 50
num_inference_steps: int = 50,
# 引导缩放因子,默认为 7.5
guidance_scale: float = 7.5,
# 每个提示生成的图像数量,默认为 1
num_images_per_prompt: Optional[int] = 1,
# 噪声控制参数,默认为 0.0
eta: float = 0.0,
# 随机数生成器,默认为 None,可以是单个生成器或生成器列表
generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
# 预定义的潜在张量,默认为 None
latents: Optional[torch.Tensor] = None,
# 输出类型,默认为 "pil"
output_type: Optional[str] = "pil",
# 是否返回字典格式的结果,默认为 True
return_dict: bool = True,
# 可选的回调函数,接收步骤、图像索引和张量
callback: Optional[Callable[[int, int, torch.Tensor], None]] = None,
# 回调函数调用的步数,默认为 1
callback_steps: int = 1,
.\diffusers\pipelines\stable_diffusion\pipeline_stable_diffusion_img2img.py
# 版权所有 2024 HuggingFace 团队。保留所有权利。
#
# 根据 Apache 许可证,版本 2.0(“许可证”)授权;
# 除非遵循许可证,否则您不得使用此文件。
# 您可以在以下位置获取许可证副本:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律或书面同意,否则根据许可证分发的软件是按“原样”提供的,
# 不提供任何明示或暗示的担保或条件。
# 有关许可证下权限和限制的具体语言,请参阅许可证。
# 导入 inspect 模块,用于获取对象的签名和其他信息
import inspect
# 从 typing 模块导入类型注解,方便类型提示
from typing import Any, Callable, Dict, List, Optional, Union
# 导入 numpy 库,用于数值计算
import numpy as np
# 导入 PIL 库,用于图像处理
import PIL.Image
# 导入 PyTorch 库,用于深度学习
import torch
# 导入版本管理工具,用于版本比较
from packaging import version
# 从 transformers 库导入 CLIP 相关模型和处理器
from transformers import CLIPImageProcessor, CLIPTextModel, CLIPTokenizer, CLIPVisionModelWithProjection
# 导入回调函数相关的类
from ...callbacks import MultiPipelineCallbacks, PipelineCallback
# 导入配置相关的工具类
from ...configuration_utils import FrozenDict
# 导入图像处理相关的输入类
from ...image_processor import PipelineImageInput, VaeImageProcessor
# 导入多种加载器混合类
from ...loaders import FromSingleFileMixin, IPAdapterMixin, StableDiffusionLoraLoaderMixin, TextualInversionLoaderMixin
# 导入模型相关的类
from ...models import AutoencoderKL, ImageProjection, UNet2DConditionModel
# 导入 Lora 调整函数
from ...models.lora import adjust_lora_scale_text_encoder
# 导入调度器
from ...schedulers import KarrasDiffusionSchedulers
# 导入实用工具函数
from ...utils import (
PIL_INTERPOLATION, # PIL 图像插值方法
USE_PEFT_BACKEND, # 是否使用 PEFT 后端
deprecate, # 用于标记弃用功能
logging, # 日志记录工具
replace_example_docstring, # 替换示例文档字符串的函数
scale_lora_layers, # 调整 Lora 层的比例
unscale_lora_layers, # 取消 Lora 层的比例
)
# 导入随机张量生成工具
from ...utils.torch_utils import randn_tensor
# 导入管道相关工具
from ..pipeline_utils import DiffusionPipeline, StableDiffusionMixin
# 导入输出类
from . import StableDiffusionPipelineOutput
# 导入安全检查器
from .safety_checker import StableDiffusionSafetyChecker
# 创建日志记录器实例,用于记录日志
logger = logging.get_logger(__name__) # pylint: disable=invalid-name
# 示例文档字符串,展示用法示例
EXAMPLE_DOC_STRING = """
Examples:
```py
>>> import requests # 导入 requests 库,用于发送 HTTP 请求
>>> import torch # 导入 PyTorch 库
>>> from PIL import Image # 从 PIL 导入图像处理类
>>> from io import BytesIO # 从 io 导入字节流处理类
>>> from diffusers import StableDiffusionImg2ImgPipeline # 导入图像到图像的稳定扩散管道
>>> device = "cuda" # 指定使用的设备为 GPU
>>> model_id_or_path = "runwayml/stable-diffusion-v1-5" # 指定模型 ID 或路径
>>> pipe = StableDiffusionImg2ImgPipeline.from_pretrained(model_id_or_path, torch_dtype=torch.float16) # 从预训练模型创建管道
>>> pipe = pipe.to(device) # 将管道转移到指定设备
>>> url = "https://raw.githubusercontent.com/CompVis/stable-diffusion/main/assets/stable-samples/img2img/sketch-mountains-input.jpg" # 图像 URL
>>> response = requests.get(url) # 发送 GET 请求以获取图像
>>> init_image = Image.open(BytesIO(response.content)).convert("RGB") # 打开图像并转换为 RGB 格式
>>> init_image = init_image.resize((768, 512)) # 调整图像大小
>>> prompt = "A fantasy landscape, trending on artstation" # 设置生成图像的提示文本
>>> images = pipe(prompt=prompt, image=init_image, strength=0.75, guidance_scale=7.5).images # 生成图像
>>> images[0].save("fantasy_landscape.png") # 保存生成的图像
```py
"""
# 定义一个函数以检索潜在变量
def retrieve_latents(
encoder_output: torch.Tensor, # 输入的编码器输出张量
generator: Optional[torch.Generator] = None, # 可选的随机数生成器
sample_mode: str = "sample" # 采样模式,默认为“sample”
):
# 检查 encoder_output 是否有 "latent_dist" 属性,并且采样模式为 "sample"
if hasattr(encoder_output, "latent_dist") and sample_mode == "sample":
# 从 latent_dist 中采样并返回结果
return encoder_output.latent_dist.sample(generator)
# 检查 encoder_output 是否有 "latent_dist" 属性,并且采样模式为 "argmax"
elif hasattr(encoder_output, "latent_dist") and sample_mode == "argmax":
# 返回 latent_dist 的众数作为结果
return encoder_output.latent_dist.mode()
# 检查 encoder_output 是否有 "latents" 属性
elif hasattr(encoder_output, "latents"):
# 直接返回 latents 属性的值
return encoder_output.latents
# 如果没有找到任何相关属性,抛出属性错误
else:
raise AttributeError("Could not access latents of provided encoder_output")
# 定义预处理图像的函数
def preprocess(image):
# 定义弃用警告信息,说明该方法在未来的版本中将被移除
deprecation_message = "The preprocess method is deprecated and will be removed in diffusers 1.0.0. Please use VaeImageProcessor.preprocess(...) instead"
# 调用弃用函数,传入方法名、版本号、警告信息及标准警告参数
deprecate("preprocess", "1.0.0", deprecation_message, standard_warn=False)
# 检查输入是否为 PyTorch 张量
if isinstance(image, torch.Tensor):
# 如果是张量,直接返回
return image
# 检查输入是否为 PIL 图像
elif isinstance(image, PIL.Image.Image):
# 将单个图像放入列表中
image = [image]
# 如果列表中的第一个元素是 PIL 图像
if isinstance(image[0], PIL.Image.Image):
# 获取图像的宽和高
w, h = image[0].size
# 将宽高调整为 8 的整数倍
w, h = (x - x % 8 for x in (w, h)) # resize to integer multiple of 8
# 对每个图像进行调整大小,并转为 numpy 数组,增加维度
image = [np.array(i.resize((w, h), resample=PIL_INTERPOLATION["lanczos"]))[None, :] for i in image]
# 将所有图像在第一个维度上拼接
image = np.concatenate(image, axis=0)
# 将数组转换为浮点型并归一化到 [0, 1] 区间
image = np.array(image).astype(np.float32) / 255.0
# 调整维度顺序为 (批量, 通道, 高, 宽)
image = image.transpose(0, 3, 1, 2)
# 将值映射到 [-1, 1] 区间
image = 2.0 * image - 1.0
# 将 numpy 数组转换为 PyTorch 张量
image = torch.from_numpy(image)
# 如果列表中的第一个元素是 PyTorch 张量
elif isinstance(image[0], torch.Tensor):
# 在第一个维度上拼接所有张量
image = torch.cat(image, dim=0)
# 返回处理后的图像
return image
# 定义从调度器获取时间步的函数
def retrieve_timesteps(
scheduler,
num_inference_steps: Optional[int] = None,
device: Optional[Union[str, torch.device]] = None,
timesteps: Optional[List[int]] = None,
sigmas: Optional[List[float]] = None,
**kwargs,
):
"""
调用调度器的 `set_timesteps` 方法,并在调用后从调度器检索时间步。处理自定义时间步。任何关键字参数将传递给 `scheduler.set_timesteps`。
参数:
scheduler (`SchedulerMixin`):
从中获取时间步的调度器。
num_inference_steps (`int`):
生成样本时使用的扩散步骤数量。如果使用,则 `timesteps` 必须为 `None`。
device (`str` 或 `torch.device`, *可选*):
时间步要移动到的设备。如果为 `None`,则不移动时间步。
timesteps (`List[int]`, *可选*):
自定义时间步,用于覆盖调度器的时间步间隔策略。如果传入 `timesteps`,则 `num_inference_steps` 和 `sigmas` 必须为 `None`。
sigmas (`List[float]`, *可选*):
自定义 sigma,用于覆盖调度器的时间步间隔策略。如果传入 `sigmas`,则 `num_inference_steps` 和 `timesteps` 必须为 `None`。
返回:
`Tuple[torch.Tensor, int]`: 一个元组,第一个元素是调度器的时间步调度,第二个元素是推理步骤的数量。
"""
# 检查是否同时传入了时间步和 sigma
if timesteps is not None and sigmas is not None:
# 如果同时传入,抛出错误
raise ValueError("Only one of `timesteps` or `sigmas` can be passed. Please choose one to set custom values")
# 检查 timesteps 是否为 None,确定是否需要使用自定义时间步
if timesteps is not None:
# 检查调度器的 set_timesteps 方法是否接受 timesteps 参数
accepts_timesteps = "timesteps" in set(inspect.signature(scheduler.set_timesteps).parameters.keys())
# 如果不接受 timesteps,则抛出 ValueError 异常
if not accepts_timesteps:
raise ValueError(
f"The current scheduler class {scheduler.__class__}'s `set_timesteps` does not support custom"
f" timestep schedules. Please check whether you are using the correct scheduler."
)
# 调用调度器的 set_timesteps 方法,设置自定义时间步
scheduler.set_timesteps(timesteps=timesteps, device=device, **kwargs)
# 获取调度器中的时间步
timesteps = scheduler.timesteps
# 计算推理步骤的数量
num_inference_steps = len(timesteps)
# 检查 sigmas 是否为 None,确定是否需要使用自定义 sigmas
elif sigmas is not None:
# 检查调度器的 set_timesteps 方法是否接受 sigmas 参数
accept_sigmas = "sigmas" in set(inspect.signature(scheduler.set_timesteps).parameters.keys())
# 如果不接受 sigmas,则抛出 ValueError 异常
if not accept_sigmas:
raise ValueError(
f"The current scheduler class {scheduler.__class__}'s `set_timesteps` does not support custom"
f" sigmas schedules. Please check whether you are using the correct scheduler."
)
# 调用调度器的 set_timesteps 方法,设置自定义 sigmas
scheduler.set_timesteps(sigmas=sigmas, device=device, **kwargs)
# 获取调度器中的时间步
timesteps = scheduler.timesteps
# 计算推理步骤的数量
num_inference_steps = len(timesteps)
# 如果 timesteps 和 sigmas 都为 None
else:
# 调用调度器的 set_timesteps 方法,使用推理步骤的数量
scheduler.set_timesteps(num_inference_steps, device=device, **kwargs)
# 获取调度器中的时间步
timesteps = scheduler.timesteps
# 返回时间步和推理步骤的数量
return timesteps, num_inference_steps
# 定义一个名为 StableDiffusionImg2ImgPipeline 的类,继承多个混合类以实现功能
class StableDiffusionImg2ImgPipeline(
# 继承自 DiffusionPipeline 类
DiffusionPipeline,
# 继承自 StableDiffusionMixin 类
StableDiffusionMixin,
# 继承自 TextualInversionLoaderMixin 类
TextualInversionLoaderMixin,
# 继承自 IPAdapterMixin 类
IPAdapterMixin,
# 继承自 StableDiffusionLoraLoaderMixin 类
StableDiffusionLoraLoaderMixin,
# 继承自 FromSingleFileMixin 类
FromSingleFileMixin,
):
# 文档字符串,描述该管道的功能和参数
r"""
Pipeline for text-guided image-to-image generation using Stable Diffusion.
# 说明该模型继承自 DiffusionPipeline,提供通用方法的文档
This model inherits from [`DiffusionPipeline`]. Check the superclass documentation for the generic methods
implemented for all pipelines (downloading, saving, running on a particular device, etc.).
# 说明该管道还继承了多个加载方法
The pipeline also inherits the following loading methods:
- [`~loaders.TextualInversionLoaderMixin.load_textual_inversion`] for loading textual inversion embeddings
- [`~loaders.StableDiffusionLoraLoaderMixin.load_lora_weights`] for loading LoRA weights
- [`~loaders.StableDiffusionLoraLoaderMixin.save_lora_weights`] for saving LoRA weights
- [`~loaders.FromSingleFileMixin.from_single_file`] for loading `.ckpt` files
- [`~loaders.IPAdapterMixin.load_ip_adapter`] for loading IP Adapters
# 定义该类的参数及其功能
Args:
vae ([`AutoencoderKL`]):
Variational Auto-Encoder (VAE) model to encode and decode images to and from latent representations.
text_encoder ([`~transformers.CLIPTextModel`]):
Frozen text-encoder ([clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14)).
tokenizer ([`~transformers.CLIPTokenizer`]):
A `CLIPTokenizer` to tokenize text.
unet ([`UNet2DConditionModel`]):
A `UNet2DConditionModel` to denoise the encoded image latents.
scheduler ([`SchedulerMixin`]):
A scheduler to be used in combination with `unet` to denoise the encoded image latents. Can be one of
[`DDIMScheduler`], [`LMSDiscreteScheduler`], or [`PNDMScheduler`].
safety_checker ([`StableDiffusionSafetyChecker`]):
Classification module that estimates whether generated images could be considered offensive or harmful.
Please refer to the [model card](https://huggingface.co/runwayml/stable-diffusion-v1-5) for more details
about a model's potential harms.
feature_extractor ([`~transformers.CLIPImageProcessor`]):
A `CLIPImageProcessor` to extract features from generated images; used as inputs to the `safety_checker`.
"""
# 定义一个字符串,指定 CPU 卸载顺序
model_cpu_offload_seq = "text_encoder->image_encoder->unet->vae"
# 定义一个可选组件列表
_optional_components = ["safety_checker", "feature_extractor", "image_encoder"]
# 定义一个不参与 CPU 卸载的组件列表
_exclude_from_cpu_offload = ["safety_checker"]
# 定义一个用于回调的张量输入列表
_callback_tensor_inputs = ["latents", "prompt_embeds", "negative_prompt_embeds"]
# 初始化类的构造函数,接收多个参数
def __init__(
self,
# 变分自编码器模型
vae: AutoencoderKL,
# 文本编码器模型
text_encoder: CLIPTextModel,
# 词汇表处理器
tokenizer: CLIPTokenizer,
# 2D 条件生成模型
unet: UNet2DConditionModel,
# Karras 扩散调度器
scheduler: KarrasDiffusionSchedulers,
# 稳定扩散安全检查器
safety_checker: StableDiffusionSafetyChecker,
# 图像处理器
feature_extractor: CLIPImageProcessor,
# 可选的图像编码器模型
image_encoder: CLIPVisionModelWithProjection = None,
# 是否需要安全检查器,默认值为 True
requires_safety_checker: bool = True,
# 从 StableDiffusionPipeline 的 _encode_prompt 方法复制
def _encode_prompt(
self,
# 输入的提示文本
prompt,
# 设备类型
device,
# 每个提示生成的图像数量
num_images_per_prompt,
# 是否使用分类器自由引导
do_classifier_free_guidance,
# 可选的负面提示文本
negative_prompt=None,
# 可选的提示嵌入
prompt_embeds: Optional[torch.Tensor] = None,
# 可选的负面提示嵌入
negative_prompt_embeds: Optional[torch.Tensor] = None,
# 可选的 LoRA 缩放因子
lora_scale: Optional[float] = None,
# 其他关键字参数
**kwargs,
):
# 过时警告信息,提醒用户方法即将被删除
deprecation_message = "`_encode_prompt()` is deprecated and it will be removed in a future version. Use `encode_prompt()` instead. Also, be aware that the output format changed from a concatenated tensor to a tuple."
# 调用过时警告函数
deprecate("_encode_prompt()", "1.0.0", deprecation_message, standard_warn=False)
# 调用 encode_prompt 方法,获取提示嵌入元组
prompt_embeds_tuple = self.encode_prompt(
# 输入的提示文本
prompt=prompt,
# 设备类型
device=device,
# 每个提示生成的图像数量
num_images_per_prompt=num_images_per_prompt,
# 是否使用分类器自由引导
do_classifier_free_guidance=do_classifier_free_guidance,
# 可选的负面提示文本
negative_prompt=negative_prompt,
# 可选的提示嵌入
prompt_embeds=prompt_embeds,
# 可选的负面提示嵌入
negative_prompt_embeds=negative_prompt_embeds,
# 可选的 LoRA 缩放因子
lora_scale=lora_scale,
# 其他关键字参数
**kwargs,
)
# 将提示嵌入元组中的两个部分连接以便向后兼容
prompt_embeds = torch.cat([prompt_embeds_tuple[1], prompt_embeds_tuple[0]])
# 返回连接后的提示嵌入
return prompt_embeds
# 从 StableDiffusionPipeline 的 encode_prompt 方法复制
def encode_prompt(
self,
# 输入的提示文本
prompt,
# 设备类型
device,
# 每个提示生成的图像数量
num_images_per_prompt,
# 是否使用分类器自由引导
do_classifier_free_guidance,
# 可选的负面提示文本
negative_prompt=None,
# 可选的提示嵌入
prompt_embeds: Optional[torch.Tensor] = None,
# 可选的负面提示嵌入
negative_prompt_embeds: Optional[torch.Tensor] = None,
# 可选的 LoRA 缩放因子
lora_scale: Optional[float] = None,
# 可选的跳过的 CLIP 层数
clip_skip: Optional[int] = None,
# 从 StableDiffusionPipeline 的 encode_image 方法复制
# 定义编码图像的函数,接受图像、设备、每个提示的图像数量和可选的隐藏状态输出
def encode_image(self, image, device, num_images_per_prompt, output_hidden_states=None):
# 获取图像编码器参数的数据类型
dtype = next(self.image_encoder.parameters()).dtype
# 检查输入的图像是否为张量类型
if not isinstance(image, torch.Tensor):
# 使用特征提取器将图像转换为张量,并返回像素值
image = self.feature_extractor(image, return_tensors="pt").pixel_values
# 将图像移动到指定设备,并转换为指定数据类型
image = image.to(device=device, dtype=dtype)
# 如果需要输出隐藏状态
if output_hidden_states:
# 获取图像编码器的隐藏状态的倒数第二层
image_enc_hidden_states = self.image_encoder(image, output_hidden_states=True).hidden_states[-2]
# 将隐藏状态按提示数量进行重复
image_enc_hidden_states = image_enc_hidden_states.repeat_interleave(num_images_per_prompt, dim=0)
# 生成与输入图像形状相同的全零张量,并获取其隐藏状态
uncond_image_enc_hidden_states = self.image_encoder(
torch.zeros_like(image), output_hidden_states=True
).hidden_states[-2]
# 将无条件隐藏状态按提示数量进行重复
uncond_image_enc_hidden_states = uncond_image_enc_hidden_states.repeat_interleave(
num_images_per_prompt, dim=0
)
# 返回图像和无条件的隐藏状态
return image_enc_hidden_states, uncond_image_enc_hidden_states
else:
# 直接获取图像编码器的图像嵌入
image_embeds = self.image_encoder(image).image_embeds
# 将图像嵌入按提示数量进行重复
image_embeds = image_embeds.repeat_interleave(num_images_per_prompt, dim=0)
# 创建与图像嵌入形状相同的全零张量作为无条件嵌入
uncond_image_embeds = torch.zeros_like(image_embeds)
# 返回图像嵌入和无条件嵌入
return image_embeds, uncond_image_embeds
# 从稳定扩散管道复制的函数,用于准备适配器图像嵌入
def prepare_ip_adapter_image_embeds(
# 定义函数参数:适配器图像、适配器图像嵌入、设备、每个提示的图像数量和是否使用分类器自由引导
self, ip_adapter_image, ip_adapter_image_embeds, device, num_images_per_prompt, do_classifier_free_guidance
):
# 初始化一个空列表,用于存储图像嵌入
image_embeds = []
# 如果启用无分类器自由引导,初始化一个空列表存储负图像嵌入
if do_classifier_free_guidance:
negative_image_embeds = []
# 如果输入适配器的图像嵌入为空
if ip_adapter_image_embeds is None:
# 检查输入适配器的图像是否为列表类型,如果不是,则转换为列表
if not isinstance(ip_adapter_image, list):
ip_adapter_image = [ip_adapter_image]
# 检查输入适配器的图像数量与 IP 适配器数量是否匹配
if len(ip_adapter_image) != len(self.unet.encoder_hid_proj.image_projection_layers):
raise ValueError(
# 如果不匹配,抛出值错误并给出相关信息
f"`ip_adapter_image` must have same length as the number of IP Adapters. Got {len(ip_adapter_image)} images and {len(self.unet.encoder_hid_proj.image_projection_layers)} IP Adapters."
)
# 遍历每个输入适配器图像和对应的图像投影层
for single_ip_adapter_image, image_proj_layer in zip(
ip_adapter_image, self.unet.encoder_hid_proj.image_projection_layers
):
# 判断输出隐藏状态是否为图像投影层的实例
output_hidden_state = not isinstance(image_proj_layer, ImageProjection)
# 编码单个图像,获取其嵌入和负嵌入
single_image_embeds, single_negative_image_embeds = self.encode_image(
single_ip_adapter_image, device, 1, output_hidden_state
)
# 将单个图像嵌入添加到列表中,增加一个维度
image_embeds.append(single_image_embeds[None, :])
# 如果启用无分类器自由引导,将负嵌入添加到列表中
if do_classifier_free_guidance:
negative_image_embeds.append(single_negative_image_embeds[None, :])
else:
# 如果输入适配器图像嵌入不为空,遍历每个嵌入
for single_image_embeds in ip_adapter_image_embeds:
# 如果启用无分类器自由引导,拆分负嵌入和图像嵌入
if do_classifier_free_guidance:
single_negative_image_embeds, single_image_embeds = single_image_embeds.chunk(2)
# 将负嵌入添加到列表中
negative_image_embeds.append(single_negative_image_embeds)
# 将图像嵌入添加到列表中
image_embeds.append(single_image_embeds)
# 初始化一个空列表,用于存储适配器图像嵌入
ip_adapter_image_embeds = []
# 遍历图像嵌入及其索引
for i, single_image_embeds in enumerate(image_embeds):
# 将单个图像嵌入重复 num_images_per_prompt 次,并在维度 0 上连接
single_image_embeds = torch.cat([single_image_embeds] * num_images_per_prompt, dim=0)
# 如果启用无分类器自由引导,处理负嵌入
if do_classifier_free_guidance:
single_negative_image_embeds = torch.cat([negative_image_embeds[i]] * num_images_per_prompt, dim=0)
# 将负嵌入和正嵌入连接在一起
single_image_embeds = torch.cat([single_negative_image_embeds, single_image_embeds], dim=0)
# 将图像嵌入移动到指定设备上
single_image_embeds = single_image_embeds.to(device=device)
# 将处理后的图像嵌入添加到列表中
ip_adapter_image_embeds.append(single_image_embeds)
# 返回适配器图像嵌入列表
return ip_adapter_image_embeds
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.run_safety_checker 复制
# 定义一个方法来运行安全检查器,接受图像、设备和数据类型作为参数
def run_safety_checker(self, image, device, dtype):
# 如果安全检查器未定义,则没有不安全内容的概念
if self.safety_checker is None:
has_nsfw_concept = None
else:
# 如果输入图像是一个张量
if torch.is_tensor(image):
# 将图像处理为 PIL 格式以供特征提取
feature_extractor_input = self.image_processor.postprocess(image, output_type="pil")
else:
# 如果图像不是张量,则将其从 NumPy 格式转换为 PIL 格式
feature_extractor_input = self.image_processor.numpy_to_pil(image)
# 提取特征,并将其转移到指定设备
safety_checker_input = self.feature_extractor(feature_extractor_input, return_tensors="pt").to(device)
# 运行安全检查器,返回处理后的图像和是否存在不安全内容的概念
image, has_nsfw_concept = self.safety_checker(
images=image, clip_input=safety_checker_input.pixel_values.to(dtype)
)
# 返回处理后的图像和不安全内容的概念
return image, has_nsfw_concept
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.decode_latents 复制
# 定义一个方法来解码潜在向量
def decode_latents(self, latents):
# 定义弃用警告信息
deprecation_message = "The decode_latents method is deprecated and will be removed in 1.0.0. Please use VaeImageProcessor.postprocess(...) instead"
# 发出弃用警告
deprecate("decode_latents", "1.0.0", deprecation_message, standard_warn=False)
# 根据 VAE 配置的缩放因子调整潜在向量
latents = 1 / self.vae.config.scaling_factor * latents
# 解码潜在向量,返回字典中的第一个元素(图像)
image = self.vae.decode(latents, return_dict=False)[0]
# 将图像的像素值归一化到 [0, 1] 范围内
image = (image / 2 + 0.5).clamp(0, 1)
# 将图像转换为 float32 格式以兼容 bfloat16
image = image.cpu().permute(0, 2, 3, 1).float().numpy()
# 返回处理后的图像
return image
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_extra_step_kwargs 复制
# 定义一个方法来准备额外的调度器步骤关键字参数
def prepare_extra_step_kwargs(self, generator, eta):
# 为调度器步骤准备额外的关键字参数,因为并非所有调度器的签名相同
# eta(η)仅在 DDIMScheduler 中使用,其他调度器将忽略该参数。
# eta 对应于 DDIM 论文中的 η,范围应在 [0, 1] 之间
# 检查调度器步骤是否接受 eta 参数
accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())
# 初始化额外的步骤关键字参数字典
extra_step_kwargs = {}
if accepts_eta:
# 如果接受 eta,则将其添加到字典中
extra_step_kwargs["eta"] = eta
# 检查调度器是否接受 generator 参数
accepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys())
if accepts_generator:
# 如果接受 generator,则将其添加到字典中
extra_step_kwargs["generator"] = generator
# 返回准备好的额外步骤关键字参数
return extra_step_kwargs
# 定义一个方法来检查输入参数的有效性
def check_inputs(
self,
prompt,
strength,
callback_steps,
negative_prompt=None,
prompt_embeds=None,
negative_prompt_embeds=None,
ip_adapter_image=None,
ip_adapter_image_embeds=None,
callback_on_step_end_tensor_inputs=None,
# 定义获取时间步的方法,参数包括推理步骤数量、强度和设备类型
def get_timesteps(self, num_inference_steps, strength, device):
# 计算初始时间步,取num_inference_steps和num_inference_steps * strength的最小值
init_timestep = min(int(num_inference_steps * strength), num_inference_steps)
# 计算时间步开始的位置,确保不小于0
t_start = max(num_inference_steps - init_timestep, 0)
# 从调度器中获取时间步,从t_start开始到结束
timesteps = self.scheduler.timesteps[t_start * self.scheduler.order :]
# 如果调度器有设置开始索引的方法,则调用它
if hasattr(self.scheduler, "set_begin_index"):
self.scheduler.set_begin_index(t_start * self.scheduler.order)
# 返回时间步和剩余推理步骤数量
return timesteps, num_inference_steps - t_start
# 从指定的文本到图像管道中复制的方法,用于获取引导比例嵌入
def get_guidance_scale_embedding(
self, w: torch.Tensor, embedding_dim: int = 512, dtype: torch.dtype = torch.float32
) -> torch.Tensor:
"""
参考文献链接,获取引导比例嵌入。
参数:
w (`torch.Tensor`):
使用指定的引导比例生成嵌入向量,以丰富时间步嵌入。
embedding_dim (`int`, *可选*, 默认为512):
生成的嵌入的维度。
dtype (`torch.dtype`, *可选*, 默认为 `torch.float32`):
生成的嵌入的数据类型。
返回:
`torch.Tensor`: 形状为`(len(w), embedding_dim)`的嵌入向量。
"""
# 确保输入张量的形状是一维的
assert len(w.shape) == 1
# 将w乘以1000.0以调整比例
w = w * 1000.0
# 计算嵌入的半维度
half_dim = embedding_dim // 2
# 计算用于嵌入的缩放因子
emb = torch.log(torch.tensor(10000.0)) / (half_dim - 1)
# 生成指数衰减的嵌入
emb = torch.exp(torch.arange(half_dim, dtype=dtype) * -emb)
# 扩展w并计算最终的嵌入
emb = w.to(dtype)[:, None] * emb[None, :]
# 将正弦和余弦嵌入连接在一起
emb = torch.cat([torch.sin(emb), torch.cos(emb)], dim=1)
# 如果嵌入维度为奇数,进行零填充
if embedding_dim % 2 == 1: # zero pad
emb = torch.nn.functional.pad(emb, (0, 1))
# 确保嵌入的形状与预期相符
assert emb.shape == (w.shape[0], embedding_dim)
# 返回生成的嵌入
return emb
# 定义引导比例属性,返回私有变量
@property
def guidance_scale(self):
return self._guidance_scale
# 定义跳过剪辑属性,返回私有变量
@property
def clip_skip(self):
return self._clip_skip
# 定义是否进行无分类器引导的属性,基于引导比例和UNet配置
@property
def do_classifier_free_guidance(self):
return self._guidance_scale > 1 and self.unet.config.time_cond_proj_dim is None
# 定义交叉注意力参数的属性,返回私有变量
@property
def cross_attention_kwargs(self):
return self._cross_attention_kwargs
# 定义时间步数属性,返回私有变量
@property
def num_timesteps(self):
return self._num_timesteps
# 定义中断属性,返回私有变量
@property
def interrupt(self):
return self._interrupt
# 该方法不计算梯度,并替换示例文档字符串
@torch.no_grad()
@replace_example_docstring(EXAMPLE_DOC_STRING)
# 定义一个可调用的类方法,用于处理生成图像的请求
def __call__(
# 提示文本,可以是单个字符串或字符串列表
self,
prompt: Union[str, List[str]] = None,
# 输入的图像,通常用于处理图像生成
image: PipelineImageInput = None,
# 生成强度的参数,默认值为0.8
strength: float = 0.8,
# 推理步骤的数量,默认值为50
num_inference_steps: Optional[int] = 50,
# 采样的时间步,通常用于控制生成过程
timesteps: List[int] = None,
# 噪声水平的列表,用于控制生成图像的随机性
sigmas: List[float] = None,
# 指导比例,控制生成图像的多样性,默认值为7.5
guidance_scale: Optional[float] = 7.5,
# 负面提示文本,可以是单个字符串或字符串列表,用于避免某些特征
negative_prompt: Optional[Union[str, List[str]]] = None,
# 每个提示生成的图像数量,默认值为1
num_images_per_prompt: Optional[int] = 1,
# 生成过程中使用的超参数,默认值为0.0
eta: Optional[float] = 0.0,
# 随机数生成器,可以是单个生成器或生成器列表
generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
# 预先计算的提示嵌入,用于加速生成
prompt_embeds: Optional[torch.Tensor] = None,
# 预先计算的负面提示嵌入
negative_prompt_embeds: Optional[torch.Tensor] = None,
# 用于图像适配器的输入图像
ip_adapter_image: Optional[PipelineImageInput] = None,
# 图像适配器的输入图像嵌入列表
ip_adapter_image_embeds: Optional[List[torch.Tensor]] = None,
# 输出类型,默认值为"pil",表示返回PIL图像
output_type: Optional[str] = "pil",
# 是否返回字典格式的结果,默认值为True
return_dict: bool = True,
# 交叉注意力的额外参数字典
cross_attention_kwargs: Optional[Dict[str, Any]] = None,
# 跳过的clip层数,用于调整模型的特征提取
clip_skip: int = None,
# 结束步骤时的回调函数,可以是单个函数或多个回调的组合
callback_on_step_end: Optional[
Union[Callable[[int, int, Dict], None], PipelineCallback, MultiPipelineCallbacks]
] = None,
# 结束步骤时的张量输入的列表,默认值为["latents"]
callback_on_step_end_tensor_inputs: List[str] = ["latents"],
# 其他额外的参数
**kwargs,
.\diffusers\pipelines\stable_diffusion\pipeline_stable_diffusion_inpaint.py
# 版权信息,声明该代码的所有权和许可信息
# Copyright 2024 The HuggingFace Team. All rights reserved.
#
# 根据 Apache 许可证第 2.0 版(“许可证”)许可;
# 除非符合许可证,否则您不得使用此文件。
# 您可以在以下网址获取许可证副本:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用的法律要求或书面同意,否则根据许可证分发的软件是按“原样”基础分发的,
# 不附有任何形式的明示或暗示的担保或条件。
# 有关许可证的具体条款和权限限制,请参阅许可证。
# 导入 inspect 模块,用于获取对象的签名和信息
import inspect
# 导入类型相关的类,用于类型注解
from typing import Any, Callable, Dict, List, Optional, Union
# 导入图像处理库 PIL
import PIL.Image
# 导入 PyTorch 库
import torch
# 导入版本管理工具
from packaging import version
# 导入 Hugging Face Transformers 中的相关类
from transformers import CLIPImageProcessor, CLIPTextModel, CLIPTokenizer, CLIPVisionModelWithProjection
# 导入其他模块和类,涉及回调、配置、图像处理等
from ...callbacks import MultiPipelineCallbacks, PipelineCallback
from ...configuration_utils import FrozenDict
from ...image_processor import PipelineImageInput, VaeImageProcessor
from ...loaders import FromSingleFileMixin, IPAdapterMixin, StableDiffusionLoraLoaderMixin, TextualInversionLoaderMixin
from ...models import AsymmetricAutoencoderKL, AutoencoderKL, ImageProjection, UNet2DConditionModel
from ...models.lora import adjust_lora_scale_text_encoder
from ...schedulers import KarrasDiffusionSchedulers
from ...utils import USE_PEFT_BACKEND, deprecate, logging, scale_lora_layers, unscale_lora_layers
from ...utils.torch_utils import randn_tensor
from ..pipeline_utils import DiffusionPipeline, StableDiffusionMixin
from . import StableDiffusionPipelineOutput
from .safety_checker import StableDiffusionSafetyChecker
# 初始化日志记录器,以当前模块的名称为标识
logger = logging.get_logger(__name__) # pylint: disable=invalid-name
# 定义函数,从编码器输出中检索潜在变量
# Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion_img2img.retrieve_latents
def retrieve_latents(
encoder_output: torch.Tensor, # 输入的编码器输出,类型为张量
generator: Optional[torch.Generator] = None, # 可选的随机数生成器
sample_mode: str = "sample" # 采样模式,默认为 "sample"
):
# 如果编码器输出有潜在分布且采样模式为 "sample"
if hasattr(encoder_output, "latent_dist") and sample_mode == "sample":
# 从潜在分布中采样并返回结果
return encoder_output.latent_dist.sample(generator)
# 如果编码器输出有潜在分布且采样模式为 "argmax"
elif hasattr(encoder_output, "latent_dist") and sample_mode == "argmax":
# 返回潜在分布的众数
return encoder_output.latent_dist.mode()
# 如果编码器输出有潜在变量
elif hasattr(encoder_output, "latents"):
# 直接返回潜在变量
return encoder_output.latents
# 如果以上条件都不满足,抛出属性错误
else:
raise AttributeError("Could not access latents of provided encoder_output")
# 定义函数,从调度器中检索时间步
# Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.retrieve_timesteps
def retrieve_timesteps(
scheduler, # 调度器对象
num_inference_steps: Optional[int] = None, # 可选的推理步骤数量
device: Optional[Union[str, torch.device]] = None, # 可选的设备信息
timesteps: Optional[List[int]] = None, # 可选的时间步列表
sigmas: Optional[List[float]] = None, # 可选的 sigma 值列表
**kwargs, # 其他可选参数
):
"""
调用调度器的 `set_timesteps` 方法,并在调用后从调度器中检索时间步。
处理自定义时间步。任何其他参数都将传递给 `scheduler.set_timesteps`。
# 定义函数参数的文档字符串
Args:
scheduler (`SchedulerMixin`):
# 调度器,用于获取时间步
The scheduler to get timesteps from.
num_inference_steps (`int`):
# 生成样本时使用的扩散步骤数量,如果使用此参数,则 `timesteps` 必须为 `None`
The number of diffusion steps used when generating samples with a pre-trained model. If used, `timesteps`
must be `None`.
device (`str` or `torch.device`, *optional*):
# 要将时间步移动到的设备,如果为 `None`,则时间步不移动
The device to which the timesteps should be moved to. If `None`, the timesteps are not moved.
timesteps (`List[int]`, *optional*):
# 自定义时间步,用于覆盖调度器的时间步间隔策略,如果提供了 `timesteps`,则 `num_inference_steps` 和 `sigmas` 必须为 `None`
Custom timesteps used to override the timestep spacing strategy of the scheduler. If `timesteps` is passed,
`num_inference_steps` and `sigmas` must be `None`.
sigmas (`List[float]`, *optional*):
# 自定义 sigma 值,用于覆盖调度器的时间步间隔策略,如果提供了 `sigmas`,则 `num_inference_steps` 和 `timesteps` 必须为 `None`
Custom sigmas used to override the timestep spacing strategy of the scheduler. If `sigmas` is passed,
`num_inference_steps` and `timesteps` must be `None`.
Returns:
# 返回一个元组,第一个元素是来自调度器的时间步调度,第二个元素是推理步骤的数量
`Tuple[torch.Tensor, int]`: A tuple where the first element is the timestep schedule from the scheduler and the
second element is the number of inference steps.
"""
# 检查是否同时传入了 `timesteps` 和 `sigmas`
if timesteps is not None and sigmas is not None:
# 如果同时存在,则引发错误
raise ValueError("Only one of `timesteps` or `sigmas` can be passed. Please choose one to set custom values")
# 检查是否传入了 `timesteps`
if timesteps is not None:
# 检查调度器是否接受自定义时间步
accepts_timesteps = "timesteps" in set(inspect.signature(scheduler.set_timesteps).parameters.keys())
# 如果不支持,则引发错误
if not accepts_timesteps:
raise ValueError(
f"The current scheduler class {scheduler.__class__}'s `set_timesteps` does not support custom"
f" timestep schedules. Please check whether you are using the correct scheduler."
)
# 设置调度器的时间步
scheduler.set_timesteps(timesteps=timesteps, device=device, **kwargs)
# 获取调度器的时间步
timesteps = scheduler.timesteps
# 计算推理步骤的数量
num_inference_steps = len(timesteps)
# 检查是否传入了 `sigmas`
elif sigmas is not None:
# 检查调度器是否接受自定义 sigma 值
accept_sigmas = "sigmas" in set(inspect.signature(scheduler.set_timesteps).parameters.keys())
# 如果不支持,则引发错误
if not accept_sigmas:
raise ValueError(
f"The current scheduler class {scheduler.__class__}'s `set_timesteps` does not support custom"
f" sigmas schedules. Please check whether you are using the correct scheduler."
)
# 设置调度器的 sigma 值
scheduler.set_timesteps(sigmas=sigmas, device=device, **kwargs)
# 获取调度器的时间步
timesteps = scheduler.timesteps
# 计算推理步骤的数量
num_inference_steps = len(timesteps)
# 如果都没有传入,则使用默认推理步骤设置时间步
else:
scheduler.set_timesteps(num_inference_steps, device=device, **kwargs)
# 获取调度器的时间步
timesteps = scheduler.timesteps
# 返回时间步和推理步骤数量
return timesteps, num_inference_steps
# 定义一个用于文本引导图像修复的管道类,继承自多个基类
class StableDiffusionInpaintPipeline(
# 继承自 DiffusionPipeline,提供通用的管道功能
DiffusionPipeline,
# 继承自 StableDiffusionMixin,增加稳定扩散特性
StableDiffusionMixin,
# 继承自 TextualInversionLoaderMixin,支持文本反转加载
TextualInversionLoaderMixin,
# 继承自 IPAdapterMixin,支持 IP 适配器加载
IPAdapterMixin,
# 继承自 StableDiffusionLoraLoaderMixin,支持 LoRA 权重加载
StableDiffusionLoraLoaderMixin,
# 继承自 FromSingleFileMixin,支持从单个文件加载
FromSingleFileMixin,
):
# 文档字符串,描述管道的功能和参数
r"""
Pipeline for text-guided image inpainting using Stable Diffusion.
This model inherits from [`DiffusionPipeline`]. Check the superclass documentation for the generic methods
implemented for all pipelines (downloading, saving, running on a particular device, etc.).
The pipeline also inherits the following loading methods:
- [`~loaders.TextualInversionLoaderMixin.load_textual_inversion`] for loading textual inversion embeddings
- [`~loaders.StableDiffusionLoraLoaderMixin.load_lora_weights`] for loading LoRA weights
- [`~loaders.StableDiffusionLoraLoaderMixin.save_lora_weights`] for saving LoRA weights
- [`~loaders.IPAdapterMixin.load_ip_adapter`] for loading IP Adapters
- [`~loaders.FromSingleFileMixin.from_single_file`] for loading `.ckpt` files
Args:
vae ([`AutoencoderKL`, `AsymmetricAutoencoderKL`]):
Variational Auto-Encoder (VAE) Model to encode and decode images to and from latent representations.
text_encoder ([`CLIPTextModel`]):
Frozen text-encoder ([clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14)).
tokenizer ([`~transformers.CLIPTokenizer`]):
A `CLIPTokenizer` to tokenize text.
unet ([`UNet2DConditionModel`]):
A `UNet2DConditionModel` to denoise the encoded image latents.
scheduler ([`SchedulerMixin`]):
A scheduler to be used in combination with `unet` to denoise the encoded image latents. Can be one of
[`DDIMScheduler`], [`LMSDiscreteScheduler`], or [`PNDMScheduler`].
safety_checker ([`StableDiffusionSafetyChecker`]):
Classification module that estimates whether generated images could be considered offensive or harmful.
Please refer to the [model card](https://huggingface.co/runwayml/stable-diffusion-v1-5) for more details
about a model's potential harms.
feature_extractor ([`~transformers.CLIPImageProcessor`]):
A `CLIPImageProcessor` to extract features from generated images; used as inputs to the `safety_checker`.
"""
# 定义模型的 CPU 卸载顺序,指定组件的处理顺序
model_cpu_offload_seq = "text_encoder->image_encoder->unet->vae"
# 可选组件列表,包含安全检查器和特征提取器
_optional_components = ["safety_checker", "feature_extractor", "image_encoder"]
# 排除 CPU 卸载的组件,安全检查器不会被卸载
_exclude_from_cpu_offload = ["safety_checker"]
# 定义需要回调的张量输入列表,包含潜在变量和提示嵌入
_callback_tensor_inputs = ["latents", "prompt_embeds", "negative_prompt_embeds", "mask", "masked_image_latents"]
# 初始化方法,用于创建类的实例
def __init__(
self,
vae: Union[AutoencoderKL, AsymmetricAutoencoderKL], # VAE模型,支持两种类型
text_encoder: CLIPTextModel, # 文本编码器,处理文本输入
tokenizer: CLIPTokenizer, # 分词器,将文本转换为标记
unet: UNet2DConditionModel, # UNet模型,处理生成任务
scheduler: KarrasDiffusionSchedulers, # 调度器,用于调整生成过程
safety_checker: StableDiffusionSafetyChecker, # 安全检查器,确保生成内容的安全性
feature_extractor: CLIPImageProcessor, # 特征提取器,处理图像输入
image_encoder: CLIPVisionModelWithProjection = None, # 可选图像编码器,用于图像的额外处理
requires_safety_checker: bool = True, # 是否需要安全检查器的标志
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline._encode_prompt 复制
def _encode_prompt(
self,
prompt, # 输入的提示文本
device, # 设备信息(CPU或GPU)
num_images_per_prompt, # 每个提示生成的图像数量
do_classifier_free_guidance, # 是否执行无分类器引导
negative_prompt=None, # 可选的负面提示文本
prompt_embeds: Optional[torch.Tensor] = None, # 可选的提示嵌入
negative_prompt_embeds: Optional[torch.Tensor] = None, # 可选的负面提示嵌入
lora_scale: Optional[float] = None, # 可选的LORA缩放因子
**kwargs, # 其他可选参数
):
# 弃用消息,提示用户该方法将来会被移除
deprecation_message = "`_encode_prompt()` is deprecated and it will be removed in a future version. Use `encode_prompt()` instead. Also, be aware that the output format changed from a concatenated tensor to a tuple."
# 发出弃用警告
deprecate("_encode_prompt()", "1.0.0", deprecation_message, standard_warn=False)
# 调用 encode_prompt 方法并获取嵌入元组
prompt_embeds_tuple = self.encode_prompt(
prompt=prompt, # 提示文本
device=device, # 设备信息
num_images_per_prompt=num_images_per_prompt, # 每个提示生成的图像数量
do_classifier_free_guidance=do_classifier_free_guidance, # 无分类器引导标志
negative_prompt=negative_prompt, # 负面提示文本
prompt_embeds=prompt_embeds, # 提示嵌入
negative_prompt_embeds=negative_prompt_embeds, # 负面提示嵌入
lora_scale=lora_scale, # LORA缩放因子
**kwargs, # 其他参数
)
# 将嵌入元组的内容拼接以兼容旧版
prompt_embeds = torch.cat([prompt_embeds_tuple[1], prompt_embeds_tuple[0]])
# 返回拼接后的提示嵌入
return prompt_embeds
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.encode_prompt 复制
def encode_prompt(
self,
prompt, # 输入的提示文本
device, # 设备信息(CPU或GPU)
num_images_per_prompt, # 每个提示生成的图像数量
do_classifier_free_guidance, # 是否执行无分类器引导
negative_prompt=None, # 可选的负面提示文本
prompt_embeds: Optional[torch.Tensor] = None, # 可选的提示嵌入
negative_prompt_embeds: Optional[torch.Tensor] = None, # 可选的负面提示嵌入
lora_scale: Optional[float] = None, # 可选的LORA缩放因子
clip_skip: Optional[int] = None, # 可选的剪切参数
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.encode_image 复制
# 定义一个方法用于编码图像,接收图像、设备、每个提示的图像数量和可选的隐藏状态输出
def encode_image(self, image, device, num_images_per_prompt, output_hidden_states=None):
# 获取图像编码器参数的数据类型
dtype = next(self.image_encoder.parameters()).dtype
# 如果输入的图像不是张量类型,则使用特征提取器处理图像并返回张量格式
if not isinstance(image, torch.Tensor):
image = self.feature_extractor(image, return_tensors="pt").pixel_values
# 将图像转移到指定设备,并转换为正确的数据类型
image = image.to(device=device, dtype=dtype)
# 如果需要输出隐藏状态
if output_hidden_states:
# 编码图像并获取倒数第二层的隐藏状态
image_enc_hidden_states = self.image_encoder(image, output_hidden_states=True).hidden_states[-2]
# 将隐藏状态在第0维上重复,数量为每个提示的图像数量
image_enc_hidden_states = image_enc_hidden_states.repeat_interleave(num_images_per_prompt, dim=0)
# 对一个全零的图像进行编码以获取无条件图像的隐藏状态
uncond_image_enc_hidden_states = self.image_encoder(
torch.zeros_like(image), output_hidden_states=True
).hidden_states[-2]
# 同样在第0维上重复无条件图像的隐藏状态
uncond_image_enc_hidden_states = uncond_image_enc_hidden_states.repeat_interleave(
num_images_per_prompt, dim=0
)
# 返回有条件和无条件的隐藏状态
return image_enc_hidden_states, uncond_image_enc_hidden_states
else:
# 如果不需要输出隐藏状态,直接编码图像并获取图像嵌入
image_embeds = self.image_encoder(image).image_embeds
# 在第0维上重复图像嵌入,数量为每个提示的图像数量
image_embeds = image_embeds.repeat_interleave(num_images_per_prompt, dim=0)
# 创建与图像嵌入相同形状的全零张量作为无条件图像嵌入
uncond_image_embeds = torch.zeros_like(image_embeds)
# 返回有条件和无条件的图像嵌入
return image_embeds, uncond_image_embeds
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline 中复制的方法,用于准备图像嵌入
def prepare_ip_adapter_image_embeds(
# 接收输入适配器图像、适配器图像嵌入、设备、每个提示的图像数量和分类器自由引导的开关
self, ip_adapter_image, ip_adapter_image_embeds, device, num_images_per_prompt, do_classifier_free_guidance
# 定义函数体的结束
):
# 初始化一个空列表以存储图像嵌入
image_embeds = []
# 如果启用无分类器自由引导,则初始化负图像嵌入列表
if do_classifier_free_guidance:
negative_image_embeds = []
# 如果输入适配器图像嵌入为空
if ip_adapter_image_embeds is None:
# 检查输入适配器图像是否为列表,如果不是则转换为列表
if not isinstance(ip_adapter_image, list):
ip_adapter_image = [ip_adapter_image]
# 检查输入适配器图像与 IP 适配器数量是否匹配
if len(ip_adapter_image) != len(self.unet.encoder_hid_proj.image_projection_layers):
# 如果不匹配,则抛出值错误
raise ValueError(
f"`ip_adapter_image` must have same length as the number of IP Adapters. Got {len(ip_adapter_image)} images and {len(self.unet.encoder_hid_proj.image_projection_layers)} IP Adapters."
)
# 遍历输入适配器图像与图像投影层的组合
for single_ip_adapter_image, image_proj_layer in zip(
ip_adapter_image, self.unet.encoder_hid_proj.image_projection_layers
):
# 判断输出隐藏状态是否为 True
output_hidden_state = not isinstance(image_proj_layer, ImageProjection)
# 编码单个图像以获取图像嵌入和负图像嵌入
single_image_embeds, single_negative_image_embeds = self.encode_image(
single_ip_adapter_image, device, 1, output_hidden_state
)
# 将图像嵌入添加到列表中
image_embeds.append(single_image_embeds[None, :])
# 如果启用无分类器自由引导,则添加负图像嵌入
if do_classifier_free_guidance:
negative_image_embeds.append(single_negative_image_embeds[None, :])
else:
# 遍历现有的输入适配器图像嵌入
for single_image_embeds in ip_adapter_image_embeds:
# 如果启用无分类器自由引导,则分割嵌入
if do_classifier_free_guidance:
single_negative_image_embeds, single_image_embeds = single_image_embeds.chunk(2)
# 将负图像嵌入添加到列表中
negative_image_embeds.append(single_negative_image_embeds)
# 将图像嵌入添加到列表中
image_embeds.append(single_image_embeds)
# 初始化一个空列表以存储 IP 适配器图像嵌入
ip_adapter_image_embeds = []
# 遍历图像嵌入和它们的索引
for i, single_image_embeds in enumerate(image_embeds):
# 将单个图像嵌入按数量扩展
single_image_embeds = torch.cat([single_image_embeds] * num_images_per_prompt, dim=0)
# 如果启用无分类器自由引导,则扩展负图像嵌入
if do_classifier_free_guidance:
single_negative_image_embeds = torch.cat([negative_image_embeds[i]] * num_images_per_prompt, dim=0)
# 将负图像嵌入与正图像嵌入合并
single_image_embeds = torch.cat([single_negative_image_embeds, single_image_embeds], dim=0)
# 将图像嵌入移动到指定设备
single_image_embeds = single_image_embeds.to(device=device)
# 将处理后的图像嵌入添加到列表中
ip_adapter_image_embeds.append(single_image_embeds)
# 返回 IP 适配器图像嵌入列表
return ip_adapter_image_embeds
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.run_safety_checker 复制的代码
# 运行安全检查器以确保图像符合安全标准
def run_safety_checker(self, image, device, dtype):
# 检查安全检查器是否存在
if self.safety_checker is None:
# 如果没有安全检查器,设置NSFW概念为None
has_nsfw_concept = None
else:
# 如果输入是张量,则处理为PIL格式
if torch.is_tensor(image):
feature_extractor_input = self.image_processor.postprocess(image, output_type="pil")
else:
# 如果输入是numpy数组,将其转换为PIL格式
feature_extractor_input = self.image_processor.numpy_to_pil(image)
# 提取特征并将其传输到指定设备
safety_checker_input = self.feature_extractor(feature_extractor_input, return_tensors="pt").to(device)
# 运行安全检查器,并返回处理后的图像和NSFW概念
image, has_nsfw_concept = self.safety_checker(
images=image, clip_input=safety_checker_input.pixel_values.to(dtype)
)
# 返回处理后的图像和NSFW概念
return image, has_nsfw_concept
# 从StableDiffusionPipeline复制的函数,准备额外的调度器步骤参数
def prepare_extra_step_kwargs(self, generator, eta):
# 为调度器步骤准备额外的关键字参数,因为并非所有调度器具有相同的签名
# eta仅在DDIM调度器中使用,其他调度器会忽略
# eta对应于DDIM论文中的η,应在[0, 1]之间
# 检查调度器是否接受eta参数
accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())
# 创建额外步骤参数的字典
extra_step_kwargs = {}
if accepts_eta:
# 如果接受eta,则将其添加到字典中
extra_step_kwargs["eta"] = eta
# 检查调度器是否接受generator参数
accepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys())
if accepts_generator:
# 如果接受generator,则将其添加到字典中
extra_step_kwargs["generator"] = generator
# 返回额外步骤参数字典
return extra_step_kwargs
# 检查输入参数的有效性和完整性
def check_inputs(
self,
prompt,
image,
mask_image,
height,
width,
strength,
callback_steps,
output_type,
negative_prompt=None,
prompt_embeds=None,
negative_prompt_embeds=None,
ip_adapter_image=None,
ip_adapter_image_embeds=None,
callback_on_step_end_tensor_inputs=None,
padding_mask_crop=None,
# 准备潜在变量的函数
def prepare_latents(
self,
batch_size,
num_channels_latents,
height,
width,
dtype,
device,
generator,
latents=None,
image=None,
timestep=None,
is_strength_max=True,
return_noise=False,
return_image_latents=False,
):
# 定义形状,包含批次大小、通道数、调整后的高度和宽度
shape = (
batch_size,
num_channels_latents,
int(height) // self.vae_scale_factor,
int(width) // self.vae_scale_factor,
)
# 检查生成器列表的长度是否与批次大小匹配
if isinstance(generator, list) and len(generator) != batch_size:
raise ValueError(
f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
f" size of {batch_size}. Make sure the batch size matches the length of the generators."
)
# 验证图像和时间步是否提供,且强度不为最大值
if (image is None or timestep is None) and not is_strength_max:
raise ValueError(
"Since strength < 1. initial latents are to be initialised as a combination of Image + Noise."
"However, either the image or the noise timestep has not been provided."
)
# 根据条件处理图像潜在变量
if return_image_latents or (latents is None and not is_strength_max):
# 将图像移动到指定设备和数据类型
image = image.to(device=device, dtype=dtype)
# 如果图像有4个通道,则直接使用图像潜在变量
if image.shape[1] == 4:
image_latents = image
else:
# 使用 VAE 编码图像以获取潜在变量
image_latents = self._encode_vae_image(image=image, generator=generator)
# 根据批次大小重复潜在变量
image_latents = image_latents.repeat(batch_size // image_latents.shape[0], 1, 1, 1)
# 如果潜在变量为空,则生成噪声
if latents is None:
noise = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
# 根据强度初始化潜在变量
latents = noise if is_strength_max else self.scheduler.add_noise(image_latents, noise, timestep)
# 如果强度为最大值,则按调度器的初始 sigma 缩放潜在变量
latents = latents * self.scheduler.init_noise_sigma if is_strength_max else latents
else:
# 将现有潜在变量转换到设备上
noise = latents.to(device)
# 按调度器的初始 sigma 缩放潜在变量
latents = noise * self.scheduler.init_noise_sigma
# 输出结果,包括潜在变量
outputs = (latents,)
# 如果需要返回噪声,则添加噪声到输出
if return_noise:
outputs += (noise,)
# 如果需要返回图像潜在变量,则添加到输出
if return_image_latents:
outputs += (image_latents,)
# 返回最终输出
return outputs
# 编码 VAE 图像以获取潜在变量
def _encode_vae_image(self, image: torch.Tensor, generator: torch.Generator):
# 检查生成器是否为列表
if isinstance(generator, list):
# 为每个图像批次编码潜在变量并检索
image_latents = [
retrieve_latents(self.vae.encode(image[i : i + 1]), generator=generator[i])
for i in range(image.shape[0])
]
# 将所有潜在变量合并为一个张量
image_latents = torch.cat(image_latents, dim=0)
else:
# 使用单个生成器编码图像并检索潜在变量
image_latents = retrieve_latents(self.vae.encode(image), generator=generator)
# 按配置的缩放因子缩放潜在变量
image_latents = self.vae.config.scaling_factor * image_latents
# 返回缩放后的潜在变量
return image_latents
# 准备掩膜潜在变量的方法
def prepare_mask_latents(
self, mask, masked_image, batch_size, height, width, dtype, device, generator, do_classifier_free_guidance
):
# 将掩码调整为与潜在特征图的形状相同,以便将掩码与潜在特征图拼接
# 在转换数据类型之前进行此操作,以避免在使用 cpu_offload 和半精度时出现问题
mask = torch.nn.functional.interpolate(
# 使用插值方法调整掩码的大小,目标尺寸为根据 VAE 缩放因子调整后的高度和宽度
mask, size=(height // self.vae_scale_factor, width // self.vae_scale_factor)
)
# 将掩码移动到指定设备,并转换为指定数据类型
mask = mask.to(device=device, dtype=dtype)
# 将掩码图像移动到指定设备,并转换为指定数据类型
masked_image = masked_image.to(device=device, dtype=dtype)
# 检查掩码图像的通道数是否为4
if masked_image.shape[1] == 4:
# 如果是4通道,则将其直接赋值给潜在特征图
masked_image_latents = masked_image
else:
# 否则,使用 VAE 编码掩码图像以获取潜在特征图
masked_image_latents = self._encode_vae_image(masked_image, generator=generator)
# 针对每个提示生成,重复掩码和潜在特征图以适应批量大小,使用对 MPS 友好的方法
if mask.shape[0] < batch_size:
# 如果掩码的数量小于批量大小,则检查批量大小是否能被掩码数量整除
if not batch_size % mask.shape[0] == 0:
# 如果不能整除,抛出值错误
raise ValueError(
"传入的掩码与所需的批量大小不匹配。掩码应复制到"
f" 总批量大小 {batch_size},但传入了 {mask.shape[0]} 个掩码。请确保传入的掩码数量"
" 能被所请求的总批量大小整除。"
)
# 通过重复掩码,调整其数量以匹配批量大小
mask = mask.repeat(batch_size // mask.shape[0], 1, 1, 1)
# 同样检查潜在特征图的数量
if masked_image_latents.shape[0] < batch_size:
# 检查批量大小是否能被潜在特征图数量整除
if not batch_size % masked_image_latents.shape[0] == 0:
# 如果不能整除,抛出值错误
raise ValueError(
"传入的图像与所需的批量大小不匹配。图像应复制到"
f" 总批量大小 {batch_size},但传入了 {masked_image_latents.shape[0]} 个图像。"
" 请确保传入的图像数量能被所请求的总批量大小整除。"
)
# 通过重复潜在特征图,调整其数量以匹配批量大小
masked_image_latents = masked_image_latents.repeat(batch_size // masked_image_latents.shape[0], 1, 1, 1)
# 如果启用分类器自由引导,则重复掩码以进行拼接
mask = torch.cat([mask] * 2) if do_classifier_free_guidance else mask
# 如果启用分类器自由引导,则重复潜在特征图以进行拼接
masked_image_latents = (
torch.cat([masked_image_latents] * 2) if do_classifier_free_guidance else masked_image_latents
)
# 确保潜在特征图的设备与潜在模型输入一致,以避免设备错误
masked_image_latents = masked_image_latents.to(device=device, dtype=dtype)
# 返回掩码和潜在特征图
return mask, masked_image_latents
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion_img2img.StableDiffusionImg2ImgPipeline.get_timesteps 复制
# 定义获取时间步长的函数,接受推理步数、强度和设备作为参数
def get_timesteps(self, num_inference_steps, strength, device):
# 根据给定的推理步数和强度计算初始时间步
init_timestep = min(int(num_inference_steps * strength), num_inference_steps)
# 计算时间步开始的索引,确保不小于零
t_start = max(num_inference_steps - init_timestep, 0)
# 从调度器中获取时间步长,从 t_start 开始
timesteps = self.scheduler.timesteps[t_start * self.scheduler.order :]
# 如果调度器具有设置开始索引的方法,则调用该方法
if hasattr(self.scheduler, "set_begin_index"):
self.scheduler.set_begin_index(t_start * self.scheduler.order)
# 返回时间步长和剩余的推理步数
return timesteps, num_inference_steps - t_start
# 从 diffusers.pipelines.latent_consistency_models.pipeline_latent_consistency_text2img 中复制的函数
def get_guidance_scale_embedding(
self, w: torch.Tensor, embedding_dim: int = 512, dtype: torch.dtype = torch.float32
) -> torch.Tensor:
"""
参考链接: https://github.com/google-research/vdm/blob/dc27b98a554f65cdc654b800da5aa1846545d41b/model_vdm.py#L298
参数:
w (`torch.Tensor`):
生成带有指定引导尺度的嵌入向量,以后丰富时间步嵌入。
embedding_dim (`int`, *可选*, 默认为 512):
要生成的嵌入维度。
dtype (`torch.dtype`, *可选*, 默认为 `torch.float32`):
生成的嵌入的数据类型。
返回:
`torch.Tensor`: 嵌入向量,形状为 `(len(w), embedding_dim)`。
"""
# 确保输入的张量 w 是一维的
assert len(w.shape) == 1
# 将 w 扩大 1000 倍
w = w * 1000.0
# 计算嵌入维度的一半
half_dim = embedding_dim // 2
# 计算嵌入的基数
emb = torch.log(torch.tensor(10000.0)) / (half_dim - 1)
# 计算指数衰减的嵌入值
emb = torch.exp(torch.arange(half_dim, dtype=dtype) * -emb)
# 将 w 转换为目标数据类型,并与嵌入值相乘
emb = w.to(dtype)[:, None] * emb[None, :]
# 将正弦和余弦值连接成最终嵌入
emb = torch.cat([torch.sin(emb), torch.cos(emb)], dim=1)
# 如果嵌入维度是奇数,则在最后填充一个零
if embedding_dim % 2 == 1: # zero pad
emb = torch.nn.functional.pad(emb, (0, 1))
# 确保最终的嵌入形状是正确的
assert emb.shape == (w.shape[0], embedding_dim)
# 返回计算得到的嵌入
return emb
# 获取引导尺度的属性
@property
def guidance_scale(self):
return self._guidance_scale
# 获取剪辑跳过的属性
@property
def clip_skip(self):
return self._clip_skip
# 判断是否进行无分类器引导的属性
# 这里的 `guidance_scale` 类似于公式 (2) 中的引导权重 `w`
# 参见 Imagen 论文: https://arxiv.org/pdf/2205.11487.pdf 。`guidance_scale = 1`
# 表示不进行分类器自由引导。
@property
def do_classifier_free_guidance(self):
return self._guidance_scale > 1 and self.unet.config.time_cond_proj_dim is None
# 获取交叉注意力的关键字参数
@property
def cross_attention_kwargs(self):
return self._cross_attention_kwargs
# 获取时间步数的属性
@property
def num_timesteps(self):
return self._num_timesteps
# 获取中断状态的属性
@property
def interrupt(self):
return self._interrupt
# 在计算梯度时不追踪
@torch.no_grad()
# 定义一个可调用的类方法,允许传入多个参数
def __call__(
# 提示信息,可以是字符串或字符串列表
self,
prompt: Union[str, List[str]] = None,
# 输入图像,用于处理的管道图像
image: PipelineImageInput = None,
# 用于掩蔽的图像
mask_image: PipelineImageInput = None,
# 掩蔽图像的潜在表示,Tensor 类型
masked_image_latents: torch.Tensor = None,
# 输出图像的高度,默认为 None
height: Optional[int] = None,
# 输出图像的宽度,默认为 None
width: Optional[int] = None,
# 填充掩码裁剪的大小,默认为 None
padding_mask_crop: Optional[int] = None,
# 强度参数,默认为 1.0
strength: float = 1.0,
# 推理步骤的数量,默认为 50
num_inference_steps: int = 50,
# 预定义的时间步列表,默认为 None
timesteps: List[int] = None,
# sigma 值列表,默认为 None
sigmas: List[float] = None,
# 指导缩放因子,默认为 7.5
guidance_scale: float = 7.5,
# 负提示信息,可以是字符串或字符串列表,默认为 None
negative_prompt: Optional[Union[str, List[str]]] = None,
# 每个提示生成的图像数量,默认为 1
num_images_per_prompt: Optional[int] = 1,
# eta 参数,默认为 0.0
eta: float = 0.0,
# 随机数生成器,可以是单个或列表,默认为 None
generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
# 潜在表示,默认为 None
latents: Optional[torch.Tensor] = None,
# 提示的嵌入表示,默认为 None
prompt_embeds: Optional[torch.Tensor] = None,
# 负提示的嵌入表示,默认为 None
negative_prompt_embeds: Optional[torch.Tensor] = None,
# 输入适配器图像,默认为 None
ip_adapter_image: Optional[PipelineImageInput] = None,
# 输入适配器图像的嵌入列表,默认为 None
ip_adapter_image_embeds: Optional[List[torch.Tensor]] = None,
# 输出类型,默认为 "pil"
output_type: Optional[str] = "pil",
# 是否返回字典格式的结果,默认为 True
return_dict: bool = True,
# 交叉注意力的参数字典,默认为 None
cross_attention_kwargs: Optional[Dict[str, Any]] = None,
# 跳过的剪辑步骤数,默认为 None
clip_skip: int = None,
# 在每个步骤结束时的回调函数,默认为 None
callback_on_step_end: Optional[
Union[Callable[[int, int, Dict], None], PipelineCallback, MultiPipelineCallbacks]
] = None,
# 回调时输入的张量名称列表,默认为 ["latents"]
callback_on_step_end_tensor_inputs: List[str] = ["latents"],
# 额外的关键字参数
**kwargs,
.\diffusers\pipelines\stable_diffusion\pipeline_stable_diffusion_instruct_pix2pix.py
# 版权声明,包含版权信息和许可证使用条款
# Copyright 2024 The InstructPix2Pix Authors and The HuggingFace Team. All rights reserved.
#
# 根据 Apache 许可证第 2.0 版授权
# 您不得在不遵守许可证的情况下使用此文件。
# 您可以在以下网址获取许可证的副本:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律或书面协议另有约定,软件在许可证下分发
# 是基于“按现状”原则,没有任何明示或暗示的担保或条件。
# 查看许可证以获取特定的权限和限制。
import inspect # 导入 inspect 模块,用于获取有关活跃对象的信息
from typing import Any, Callable, Dict, List, Optional, Union # 导入类型提示相关的类
import numpy as np # 导入 numpy,常用的数值计算库
import PIL.Image # 导入 PIL.Image,处理图像的库
import torch # 导入 PyTorch,深度学习框架
from transformers import CLIPImageProcessor, CLIPTextModel, CLIPTokenizer, CLIPVisionModelWithProjection # 导入 transformers 库中的 CLIP 相关类
from ...callbacks import MultiPipelineCallbacks, PipelineCallback # 导入回调类
from ...image_processor import PipelineImageInput, VaeImageProcessor # 导入图像处理相关类
from ...loaders import IPAdapterMixin, StableDiffusionLoraLoaderMixin, TextualInversionLoaderMixin # 导入加载器混合类
from ...models import AutoencoderKL, ImageProjection, UNet2DConditionModel # 导入模型类
from ...schedulers import KarrasDiffusionSchedulers # 导入调度器类
from ...utils import PIL_INTERPOLATION, deprecate, logging # 导入实用工具类
from ...utils.torch_utils import randn_tensor # 导入随机张量生成工具
from ..pipeline_utils import DiffusionPipeline, StableDiffusionMixin # 导入管道工具类
from . import StableDiffusionPipelineOutput # 导入管道输出类
from .safety_checker import StableDiffusionSafetyChecker # 导入安全检查器类
logger = logging.get_logger(__name__) # 创建一个日志记录器,用于记录信息,禁用 pylint 名称无效警告
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion_img2img.preprocess 复制的函数
def preprocess(image):
# 设置弃用警告消息,指示预处理方法已弃用
deprecation_message = "The preprocess method is deprecated and will be removed in diffusers 1.0.0. Please use VaeImageProcessor.preprocess(...) instead"
# 调用弃用函数,显示警告
deprecate("preprocess", "1.0.0", deprecation_message, standard_warn=False)
# 如果输入是张量,则直接返回
if isinstance(image, torch.Tensor):
return image
# 如果输入是 PIL 图像,则将其封装在列表中
elif isinstance(image, PIL.Image.Image):
image = [image]
# 如果列表中的第一个元素是 PIL 图像
if isinstance(image[0], PIL.Image.Image):
w, h = image[0].size # 获取图像的宽和高
# 将宽高调整为 8 的整数倍
w, h = (x - x % 8 for x in (w, h)) # resize to integer multiple of 8
# 将图像调整为新的宽高,并转换为 NumPy 数组,增加一个维度
image = [np.array(i.resize((w, h), resample=PIL_INTERPOLATION["lanczos"]))[None, :] for i in image]
# 将所有图像沿第0维合并
image = np.concatenate(image, axis=0)
# 将数组转换为 float32 类型并归一化到 [0, 1] 范围
image = np.array(image).astype(np.float32) / 255.0
# 调整数组维度顺序,从 (N, H, W, C) 转为 (N, C, H, W)
image = image.transpose(0, 3, 1, 2)
# 将像素值范围调整到 [-1, 1]
image = 2.0 * image - 1.0
# 将 NumPy 数组转换为 PyTorch 张量
image = torch.from_numpy(image)
# 如果输入是张量列表
elif isinstance(image[0], torch.Tensor):
# 将多个张量沿第0维合并
image = torch.cat(image, dim=0)
# 返回处理后的图像
return image
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion_img2img.retrieve_latents 复制的函数
def retrieve_latents(
encoder_output: torch.Tensor, generator: Optional[torch.Generator] = None, sample_mode: str = "sample"
):
# 检查 encoder_output 是否具有 "latent_dist" 属性,并且采样模式为 "sample"
if hasattr(encoder_output, "latent_dist") and sample_mode == "sample":
# 从 latent_dist 中进行采样,使用指定的生成器
return encoder_output.latent_dist.sample(generator)
# 检查 encoder_output 是否具有 "latent_dist" 属性,并且采样模式为 "argmax"
elif hasattr(encoder_output, "latent_dist") and sample_mode == "argmax":
# 返回 latent_dist 的众数
return encoder_output.latent_dist.mode()
# 检查 encoder_output 是否具有 "latents" 属性
elif hasattr(encoder_output, "latents"):
# 返回 latents 属性的值
return encoder_output.latents
# 如果以上条件都不满足,则抛出 AttributeError
else:
raise AttributeError("Could not access latents of provided encoder_output")
# 定义一个用于像素级图像编辑的管道类,继承多个混合类以实现功能
class StableDiffusionInstructPix2PixPipeline(
# 继承 DiffusionPipeline 类以获得基础功能
DiffusionPipeline,
# 继承 StableDiffusionMixin 以获取稳定扩散相关功能
StableDiffusionMixin,
# 继承 TextualInversionLoaderMixin 以支持文本反转加载
TextualInversionLoaderMixin,
# 继承 StableDiffusionLoraLoaderMixin 以支持 LoRA 权重加载和保存
StableDiffusionLoraLoaderMixin,
# 继承 IPAdapterMixin 以支持 IP 适配器加载
IPAdapterMixin,
):
r"""
管道用于通过遵循文本指令进行像素级图像编辑(基于稳定扩散)。
该模型从 [`DiffusionPipeline`] 继承。有关所有管道实现的通用方法(下载、保存、在特定设备上运行等)的文档,请查看超类文档。
此管道还继承以下加载方法:
- [`~loaders.TextualInversionLoaderMixin.load_textual_inversion`] 用于加载文本反转嵌入
- [`~loaders.StableDiffusionLoraLoaderMixin.load_lora_weights`] 用于加载 LoRA 权重
- [`~loaders.StableDiffusionLoraLoaderMixin.save_lora_weights`] 用于保存 LoRA 权重
- [`~loaders.IPAdapterMixin.load_ip_adapter`] 用于加载 IP 适配器
参数:
vae ([`AutoencoderKL`]):
用于将图像编码和解码为潜在表示的变分自编码器(VAE)模型。
text_encoder ([`~transformers.CLIPTextModel`]):
冻结的文本编码器([clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14))。
tokenizer ([`~transformers.CLIPTokenizer`]):
用于对文本进行分词的 `CLIPTokenizer`。
unet ([`UNet2DConditionModel`]):
用于去噪编码图像潜在表示的 `UNet2DConditionModel`。
scheduler ([`SchedulerMixin`]):
与 `unet` 结合使用的调度器,用于去噪编码图像潜在表示。可以是
[`DDIMScheduler`], [`LMSDiscreteScheduler`] 或 [`PNDMScheduler`] 之一。
safety_checker ([`StableDiffusionSafetyChecker`]):
分类模块,估计生成的图像是否可能被认为是冒犯性或有害的。
有关模型潜在危害的更多详细信息,请参阅 [模型卡](https://huggingface.co/runwayml/stable-diffusion-v1-5)。
feature_extractor ([`~transformers.CLIPImageProcessor`]):
用于从生成的图像中提取特征的 `CLIPImageProcessor`;作为输入用于 `safety_checker`。
"""
# 定义模型在 CPU 上的卸载顺序
model_cpu_offload_seq = "text_encoder->unet->vae"
# 定义可选组件列表
_optional_components = ["safety_checker", "feature_extractor", "image_encoder"]
# 定义从 CPU 卸载时排除的组件
_exclude_from_cpu_offload = ["safety_checker"]
# 定义回调张量输入
_callback_tensor_inputs = ["latents", "prompt_embeds", "image_latents"]
# 构造函数,初始化管道所需的各个组件
def __init__(
# 初始化变分自编码器(VAE)
self,
vae: AutoencoderKL,
# 初始化文本编码器
text_encoder: CLIPTextModel,
# 初始化分词器
tokenizer: CLIPTokenizer,
# 初始化去噪模型
unet: UNet2DConditionModel,
# 初始化调度器
scheduler: KarrasDiffusionSchedulers,
# 初始化安全检查器
safety_checker: StableDiffusionSafetyChecker,
# 初始化特征提取器
feature_extractor: CLIPImageProcessor,
# 可选图像编码器
image_encoder: Optional[CLIPVisionModelWithProjection] = None,
# 是否需要安全检查器的标志
requires_safety_checker: bool = True,
# 初始化父类
):
super().__init__()
# 检查安全检查器是否为 None,并且需要安全检查器时发出警告
if safety_checker is None and requires_safety_checker:
logger.warning(
# 警告信息,提醒用户禁用安全检查器的风险和使用条款
f"You have disabled the safety checker for {self.__class__} by passing `safety_checker=None`. Ensure"
" that you abide to the conditions of the Stable Diffusion license and do not expose unfiltered"
" results in services or applications open to the public. Both the diffusers team and Hugging Face"
" strongly recommend to keep the safety filter enabled in all public facing circumstances, disabling"
" it only for use-cases that involve analyzing network behavior or auditing its results. For more"
" information, please have a look at https://github.com/huggingface/diffusers/pull/254 ."
)
# 检查安全检查器不为 None 时,特征提取器必须定义
if safety_checker is not None and feature_extractor is None:
raise ValueError(
# 抛出异常,提示用户需要定义特征提取器
"Make sure to define a feature extractor when loading {self.__class__} if you want to use the safety"
" checker. If you do not want to use the safety checker, you can pass `'safety_checker=None'` instead."
)
# 注册各个模块,以便后续使用
self.register_modules(
vae=vae,
text_encoder=text_encoder,
tokenizer=tokenizer,
unet=unet,
scheduler=scheduler,
safety_checker=safety_checker,
feature_extractor=feature_extractor,
image_encoder=image_encoder,
)
# 计算 VAE 的缩放因子
self.vae_scale_factor = 2 ** (len(self.vae.config.block_out_channels) - 1)
# 创建图像处理器
self.image_processor = VaeImageProcessor(vae_scale_factor=self.vae_scale_factor)
# 将配置注册到类中,指明是否需要安全检查器
self.register_to_config(requires_safety_checker=requires_safety_checker)
# 装饰器,指示该函数不需要计算梯度
@torch.no_grad()
def __call__(
# 输入参数,包括提示文本、图像、推理步骤等
prompt: Union[str, List[str]] = None,
image: PipelineImageInput = None,
num_inference_steps: int = 100,
guidance_scale: float = 7.5,
image_guidance_scale: float = 1.5,
negative_prompt: Optional[Union[str, List[str]]] = None,
num_images_per_prompt: Optional[int] = 1,
eta: float = 0.0,
generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
latents: Optional[torch.Tensor] = None,
prompt_embeds: Optional[torch.Tensor] = None,
negative_prompt_embeds: Optional[torch.Tensor] = None,
ip_adapter_image: Optional[PipelineImageInput] = None,
ip_adapter_image_embeds: Optional[List[torch.Tensor]] = None,
output_type: Optional[str] = "pil",
return_dict: bool = True,
# 回调函数定义,处理步骤结束时的操作
callback_on_step_end: Optional[
Union[Callable[[int, int, Dict], None], PipelineCallback, MultiPipelineCallbacks]
] = None,
# 定义步骤结束时的张量输入
callback_on_step_end_tensor_inputs: List[str] = ["latents"],
# 交叉注意力的额外参数
cross_attention_kwargs: Optional[Dict[str, Any]] = None,
# 接收额外参数
**kwargs,
# 定义一个编码提示的私有方法
def _encode_prompt(
self, # 方法的第一个参数,表示调用该方法时传入的提示文本
prompt, # 提示文本
device, # 设备类型(如 CPU 或 GPU)
num_images_per_prompt, # 每个提示生成的图像数量
do_classifier_free_guidance, # 是否使用无分类器的引导
negative_prompt=None, # 可选的负提示文本
prompt_embeds: Optional[torch.Tensor] = None, # 可选的提示嵌入,类型为 Torch 张量
negative_prompt_embeds: Optional[torch.Tensor] = None, # 可选的负提示嵌入,类型为 Torch 张量
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.encode_image 复制而来
def encode_image(self, image, device, num_images_per_prompt, output_hidden_states=None): # 定义一个编码图像的方法
dtype = next(self.image_encoder.parameters()).dtype # 获取图像编码器参数的数据类型
if not isinstance(image, torch.Tensor): # 如果图像不是 Torch 张量
image = self.feature_extractor(image, return_tensors="pt").pixel_values # 使用特征提取器将图像转换为张量
image = image.to(device=device, dtype=dtype) # 将图像移动到指定设备并转换为相应的数据类型
if output_hidden_states: # 如果需要输出隐藏状态
image_enc_hidden_states = self.image_encoder(image, output_hidden_states=True).hidden_states[-2] # 编码图像并获取倒数第二层的隐藏状态
image_enc_hidden_states = image_enc_hidden_states.repeat_interleave(num_images_per_prompt, dim=0) # 根据每个提示的图像数量重复隐藏状态
uncond_image_enc_hidden_states = self.image_encoder( # 编码零图像以获取无条件隐藏状态
torch.zeros_like(image), output_hidden_states=True
).hidden_states[-2] # 获取倒数第二层的隐藏状态
uncond_image_enc_hidden_states = uncond_image_enc_hidden_states.repeat_interleave( # 根据每个提示的图像数量重复无条件隐藏状态
num_images_per_prompt, dim=0
)
return image_enc_hidden_states, uncond_image_enc_hidden_states # 返回编码后的隐藏状态
else: # 如果不需要输出隐藏状态
image_embeds = self.image_encoder(image).image_embeds # 编码图像并获取图像嵌入
image_embeds = image_embeds.repeat_interleave(num_images_per_prompt, dim=0) # 根据每个提示的图像数量重复图像嵌入
uncond_image_embeds = torch.zeros_like(image_embeds) # 创建与图像嵌入相同形状的零张量作为无条件嵌入
return image_embeds, uncond_image_embeds # 返回编码后的图像嵌入和无条件嵌入
# 定义一个准备图像嵌入的适配器方法
def prepare_ip_adapter_image_embeds(
self, ip_adapter_image, ip_adapter_image_embeds, device, num_images_per_prompt, do_classifier_free_guidance
):
# 检查 ip_adapter_image_embeds 是否为 None
if ip_adapter_image_embeds is None:
# 如果 ip_adapter_image 不是列表,则将其转换为列表
if not isinstance(ip_adapter_image, list):
ip_adapter_image = [ip_adapter_image]
# 确保 ip_adapter_image 的长度与 IP Adapters 的数量相同
if len(ip_adapter_image) != len(self.unet.encoder_hid_proj.image_projection_layers):
raise ValueError(
# 抛出错误提示,指出图像数量与 IP Adapters 数量不匹配
f"`ip_adapter_image` must have same length as the number of IP Adapters. Got {len(ip_adapter_image)} images and {len(self.unet.encoder_hid_proj.image_projection_layers)} IP Adapters."
)
# 初始化图像嵌入列表
image_embeds = []
# 遍历每个单独的图像和对应的投影层
for single_ip_adapter_image, image_proj_layer in zip(
ip_adapter_image, self.unet.encoder_hid_proj.image_projection_layers
):
# 确定是否需要输出隐藏状态
output_hidden_state = not isinstance(image_proj_layer, ImageProjection)
# 编码图像,获取正向和负向图像嵌入
single_image_embeds, single_negative_image_embeds = self.encode_image(
single_ip_adapter_image, device, 1, output_hidden_state
)
# 将正向图像嵌入重复 num_images_per_prompt 次
single_image_embeds = torch.stack([single_image_embeds] * num_images_per_prompt, dim=0)
# 将负向图像嵌入重复 num_images_per_prompt 次
single_negative_image_embeds = torch.stack(
[single_negative_image_embeds] * num_images_per_prompt, dim=0
)
# 如果启用了分类器自由引导
if do_classifier_free_guidance:
# 将正向和负向图像嵌入拼接在一起
single_image_embeds = torch.cat(
[single_image_embeds, single_negative_image_embeds, single_negative_image_embeds]
)
# 将图像嵌入转移到指定设备
single_image_embeds = single_image_embeds.to(device)
# 将单个图像嵌入添加到图像嵌入列表中
image_embeds.append(single_image_embeds)
else:
# 定义重复维度
repeat_dims = [1]
# 初始化图像嵌入列表
image_embeds = []
# 遍历已有的图像嵌入
for single_image_embeds in ip_adapter_image_embeds:
# 如果启用了分类器自由引导
if do_classifier_free_guidance:
# 将图像嵌入分割为正向和负向嵌入
(
single_image_embeds,
single_negative_image_embeds,
single_negative_image_embeds,
) = single_image_embeds.chunk(3)
# 重复正向图像嵌入
single_image_embeds = single_image_embeds.repeat(
num_images_per_prompt, *(repeat_dims * len(single_image_embeds.shape[1:]))
)
# 重复负向图像嵌入
single_negative_image_embeds = single_negative_image_embeds.repeat(
num_images_per_prompt, *(repeat_dims * len(single_negative_image_embeds.shape[1:]))
)
# 将正向和负向图像嵌入拼接在一起
single_image_embeds = torch.cat(
[single_image_embeds, single_negative_image_embeds, single_negative_image_embeds]
)
else:
# 重复单个图像嵌入
single_image_embeds = single_image_embeds.repeat(
num_images_per_prompt, *(repeat_dims * len(single_image_embeds.shape[1:]))
)
# 将单个图像嵌入添加到图像嵌入列表中
image_embeds.append(single_image_embeds)
# 返回最终的图像嵌入列表
return image_embeds
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.run_safety_checker 复制
def run_safety_checker(self, image, device, dtype):
# 如果安全检查器未定义,设置 nsfw 概念为 None
if self.safety_checker is None:
has_nsfw_concept = None
else:
# 如果输入为张量,使用图像处理器将其后处理为 PIL 格式
if torch.is_tensor(image):
feature_extractor_input = self.image_processor.postprocess(image, output_type="pil")
else:
# 如果输入为 NumPy 数组,转换为 PIL 格式
feature_extractor_input = self.image_processor.numpy_to_pil(image)
# 使用特征提取器处理输入,返回张量并移动到指定设备
safety_checker_input = self.feature_extractor(feature_extractor_input, return_tensors="pt").to(device)
# 调用安全检查器,返回处理后的图像和 nsfw 概念判断结果
image, has_nsfw_concept = self.safety_checker(
images=image, clip_input=safety_checker_input.pixel_values.to(dtype)
)
# 返回处理后的图像和 nsfw 概念判断结果
return image, has_nsfw_concept
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_extra_step_kwargs 复制
def prepare_extra_step_kwargs(self, generator, eta):
# 为调度器步骤准备额外的关键字参数,因为并非所有调度器都有相同的签名
# eta (η) 仅在 DDIMScheduler 中使用,其他调度器会忽略该参数。
# eta 对应于 DDIM 论文中的 η: https://arxiv.org/abs/2010.02502
# 值应在 [0, 1] 之间
# 检查调度器的步骤是否接受 eta 参数
accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())
extra_step_kwargs = {}
# 如果接受 eta,将其添加到额外参数中
if accepts_eta:
extra_step_kwargs["eta"] = eta
# 检查调度器的步骤是否接受 generator 参数
accepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys())
# 如果接受 generator,将其添加到额外参数中
if accepts_generator:
extra_step_kwargs["generator"] = generator
# 返回准备好的额外参数
return extra_step_kwargs
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.decode_latents 复制
def decode_latents(self, latents):
# 警告信息,表示该方法已弃用,将在 1.0.0 中移除,建议使用 VaeImageProcessor.postprocess(...)
deprecation_message = "The decode_latents method is deprecated and will be removed in 1.0.0. Please use VaeImageProcessor.postprocess(...) instead"
deprecate("decode_latents", "1.0.0", deprecation_message, standard_warn=False)
# 按比例缩放 latents
latents = 1 / self.vae.config.scaling_factor * latents
# 解码 latents,获取图像
image = self.vae.decode(latents, return_dict=False)[0]
# 将图像像素值规范化到 [0, 1] 范围
image = (image / 2 + 0.5).clamp(0, 1)
# 始终转换为 float32,确保与 bfloat16 兼容且不会造成显著开销
image = image.cpu().permute(0, 2, 3, 1).float().numpy()
# 返回处理后的图像
return image
# 定义检查输入参数的方法
def check_inputs(
self,
prompt,
callback_steps,
negative_prompt=None,
prompt_embeds=None,
negative_prompt_embeds=None,
ip_adapter_image=None,
ip_adapter_image_embeds=None,
callback_on_step_end_tensor_inputs=None,
):
# 检查 callback_steps 是否为正整数,如果不是则引发 ValueError
if callback_steps is not None and (not isinstance(callback_steps, int) or callback_steps <= 0):
raise ValueError(
# 报告 callback_steps 不是正整数的错误信息
f"`callback_steps` has to be a positive integer but is {callback_steps} of type"
f" {type(callback_steps)}."
)
# 检查 callback_on_step_end_tensor_inputs 是否为 None,且是否包含在 _callback_tensor_inputs 中
if callback_on_step_end_tensor_inputs is not None and not all(
k in self._callback_tensor_inputs for k in callback_on_step_end_tensor_inputs
):
raise ValueError(
# 报告 callback_on_step_end_tensor_inputs 中的某些元素不在 _callback_tensor_inputs 的错误信息
f"`callback_on_step_end_tensor_inputs` has to be in {self._callback_tensor_inputs}, but found {[k for k in callback_on_step_end_tensor_inputs if k not in self._callback_tensor_inputs]}"
)
# 检查 prompt 和 prompt_embeds 是否同时存在
if prompt is not None and prompt_embeds is not None:
raise ValueError(
# 报告同时提供 prompt 和 prompt_embeds 的错误信息
f"Cannot forward both `prompt`: {prompt} and `prompt_embeds`: {prompt_embeds}. Please make sure to"
" only forward one of the two."
)
# 检查 prompt 和 prompt_embeds 是否都为 None
elif prompt is None and prompt_embeds is None:
raise ValueError(
# 报告需要提供 prompt 或 prompt_embeds 之一的错误信息
"Provide either `prompt` or `prompt_embeds`. Cannot leave both `prompt` and `prompt_embeds` undefined."
)
# 检查 prompt 的类型是否为 str 或 list
elif prompt is not None and (not isinstance(prompt, str) and not isinstance(prompt, list)):
raise ValueError(f"`prompt` has to be of type `str` or `list` but is {type(prompt)}")
# 检查 negative_prompt 和 negative_prompt_embeds 是否同时存在
if negative_prompt is not None and negative_prompt_embeds is not None:
raise ValueError(
# 报告同时提供 negative_prompt 和 negative_prompt_embeds 的错误信息
f"Cannot forward both `negative_prompt`: {negative_prompt} and `negative_prompt_embeds`:"
f" {negative_prompt_embeds}. Please make sure to only forward one of the two."
)
# 检查 prompt_embeds 和 negative_prompt_embeds 是否同时存在且形状一致
if prompt_embeds is not None and negative_prompt_embeds is not None:
if prompt_embeds.shape != negative_prompt_embeds.shape:
raise ValueError(
# 报告 prompt_embeds 和 negative_prompt_embeds 形状不一致的错误信息
"`prompt_embeds` and `negative_prompt_embeds` must have the same shape when passed directly, but"
f" got: `prompt_embeds` {prompt_embeds.shape} != `negative_prompt_embeds`"
f" {negative_prompt_embeds.shape}."
)
# 检查 ip_adapter_image 和 ip_adapter_image_embeds 是否同时存在
if ip_adapter_image is not None and ip_adapter_image_embeds is not None:
raise ValueError(
# 报告需要提供 ip_adapter_image 或 ip_adapter_image_embeds 之一的错误信息
"Provide either `ip_adapter_image` or `ip_adapter_image_embeds`. Cannot leave both `ip_adapter_image` and `ip_adapter_image_embeds` defined."
)
# 检查 ip_adapter_image_embeds 是否存在且类型为 list
if ip_adapter_image_embeds is not None:
if not isinstance(ip_adapter_image_embeds, list):
raise ValueError(
# 报告 ip_adapter_image_embeds 不是 list 类型的错误信息
f"`ip_adapter_image_embeds` has to be of type `list` but is {type(ip_adapter_image_embeds)}"
)
# 检查 ip_adapter_image_embeds 中第一个元素的维度是否为 3D 或 4D
elif ip_adapter_image_embeds[0].ndim not in [3, 4]:
raise ValueError(
# 报告 ip_adapter_image_embeds 中的张量维度不正确的错误信息
f"`ip_adapter_image_embeds` has to be a list of 3D or 4D tensors but is {ip_adapter_image_embeds[0].ndim}D"
)
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_latents 复制的代码
def prepare_latents(self, batch_size, num_channels_latents, height, width, dtype, device, generator, latents=None):
# 定义形状,包含批大小、通道数和缩放后的高度和宽度
shape = (
batch_size,
num_channels_latents,
int(height) // self.vae_scale_factor, # 按 VAE 缩放因子调整高度
int(width) // self.vae_scale_factor, # 按 VAE 缩放因子调整宽度
)
# 检查生成器是否为列表且长度与批大小不匹配
if isinstance(generator, list) and len(generator) != batch_size:
# 抛出值错误,提示生成器长度与批大小不匹配
raise ValueError(
f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
f" size of {batch_size}. Make sure the batch size matches the length of the generators."
)
# 如果没有传入潜在变量
if latents is None:
# 生成随机张量作为潜在变量,使用指定的生成器、设备和数据类型
latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
else:
# 如果已传入潜在变量,将其转移到指定的设备
latents = latents.to(device)
# 按调度器所需的标准差缩放初始噪声
latents = latents * self.scheduler.init_noise_sigma
# 返回处理后的潜在变量
return latents
def prepare_image_latents(
# 准备图像潜在变量的方法,接受图像及相关参数
self, image, batch_size, num_images_per_prompt, dtype, device, do_classifier_free_guidance, generator=None
):
# 检查输入的图像类型是否为 torch.Tensor、PIL.Image.Image 或列表
if not isinstance(image, (torch.Tensor, PIL.Image.Image, list)):
# 如果类型不匹配,则抛出错误并显示当前类型
raise ValueError(
f"`image` has to be of type `torch.Tensor`, `PIL.Image.Image` or list but is {type(image)}"
)
# 将图像转换到指定的设备和数据类型
image = image.to(device=device, dtype=dtype)
# 根据提示数量调整批处理大小
batch_size = batch_size * num_images_per_prompt
# 如果图像有4个通道,则直接使用它
if image.shape[1] == 4:
image_latents = image
else:
# 编码图像并以 "argmax" 模式检索潜在表示
image_latents = retrieve_latents(self.vae.encode(image), sample_mode="argmax")
# 如果批处理大小大于潜在表示的数量且可以整除
if batch_size > image_latents.shape[0] and batch_size % image_latents.shape[0] == 0:
# 生成警告消息
deprecation_message = (
f"You have passed {batch_size} text prompts (`prompt`), but only {image_latents.shape[0]} initial"
" images (`image`). Initial images are now duplicating to match the number of text prompts. Note"
" that this behavior is deprecated and will be removed in a version 1.0.0. Please make sure to update"
" your script to pass as many initial images as text prompts to suppress this warning."
)
# 发出弃用警告
deprecate("len(prompt) != len(image)", "1.0.0", deprecation_message, standard_warn=False)
# 计算每个提示需要额外的图像数量
additional_image_per_prompt = batch_size // image_latents.shape[0]
# 扩展潜在表示以匹配批处理大小
image_latents = torch.cat([image_latents] * additional_image_per_prompt, dim=0)
# 如果批处理大小大于潜在表示数量但不能整除
elif batch_size > image_latents.shape[0] and batch_size % image_latents.shape[0] != 0:
# 抛出错误,表示无法复制图像以匹配文本提示
raise ValueError(
f"Cannot duplicate `image` of batch size {image_latents.shape[0]} to {batch_size} text prompts."
)
else:
# 将潜在表示展开为一个批次
image_latents = torch.cat([image_latents], dim=0)
# 如果启用分类器自由引导
if do_classifier_free_guidance:
# 创建与潜在表示形状相同的零张量
uncond_image_latents = torch.zeros_like(image_latents)
# 将潜在表示和未条件潜在表示合并
image_latents = torch.cat([image_latents, image_latents, uncond_image_latents], dim=0)
# 返回处理后的潜在表示
return image_latents
# 定义属性:引导比例
@property
def guidance_scale(self):
return self._guidance_scale
# 定义属性:图像引导比例
@property
def image_guidance_scale(self):
return self._image_guidance_scale
# 定义属性:时间步数
@property
def num_timesteps(self):
return self._num_timesteps
# 此处的 `guidance_scale` 定义类似于 Imagen 论文中方程 (2) 的引导权重 `w`
# `guidance_scale = 1` 表示不使用分类器自由引导。
@property
def do_classifier_free_guidance(self):
# 根据引导比例和图像引导比例决定是否使用分类器自由引导
return self.guidance_scale > 1.0 and self.image_guidance_scale >= 1.0