diffusers 源码解析(四十九)
.\diffusers\pipelines\stable_diffusion_diffedit\__init__.py
# 导入类型检查常量
from typing import TYPE_CHECKING
# 从 utils 模块导入所需的功能和常量
from ...utils import (
DIFFUSERS_SLOW_IMPORT, # 速度慢的导入标志
OptionalDependencyNotAvailable, # 可选依赖不可用异常
_LazyModule, # 懒加载模块
get_objects_from_module, # 从模块获取对象的函数
is_torch_available, # 检查 PyTorch 是否可用
is_transformers_available, # 检查 Transformers 是否可用
)
# 初始化一个空字典,用于存储虚拟对象
_dummy_objects = {}
# 初始化一个空字典,用于存储导入结构
_import_structure = {}
# 尝试检查依赖库的可用性
try:
if not (is_transformers_available() and is_torch_available()): # 检查是否同时可用
raise OptionalDependencyNotAvailable() # 抛出异常
except OptionalDependencyNotAvailable: # 捕获可选依赖不可用的异常
from ...utils import dummy_torch_and_transformers_objects # 导入虚拟对象
# 更新虚拟对象字典
_dummy_objects.update(get_objects_from_module(dummy_torch_and_transformers_objects))
else:
# 如果可用,则更新导入结构以包含特定管道
_import_structure["pipeline_stable_diffusion_diffedit"] = ["StableDiffusionDiffEditPipeline"]
# 根据类型检查或慢导入标志执行以下代码
if TYPE_CHECKING or DIFFUSERS_SLOW_IMPORT:
try:
if not (is_transformers_available() and is_torch_available()): # 再次检查依赖
raise OptionalDependencyNotAvailable() # 抛出异常
except OptionalDependencyNotAvailable: # 捕获异常
from ...utils.dummy_torch_and_transformers_objects import * # 导入虚拟对象
else:
from .pipeline_stable_diffusion_diffedit import StableDiffusionDiffEditPipeline # 导入实际管道
else:
import sys # 导入系统模块
# 使用懒加载模块创建当前模块
sys.modules[__name__] = _LazyModule(
__name__,
globals()["__file__"],
_import_structure, # 传递导入结构
module_spec=__spec__, # 传递模块规格
)
# 将虚拟对象添加到当前模块
for name, value in _dummy_objects.items():
setattr(sys.modules[__name__], name, value)
.\diffusers\pipelines\stable_diffusion_gligen\pipeline_stable_diffusion_gligen.py
# 版权所有 2024 GLIGEN 作者和 HuggingFace 团队,保留所有权利。
#
# 根据 Apache 许可证第 2.0 版(“许可证”)进行许可;
# 除非遵守许可证,否则您不得使用此文件。
# 您可以在以下地址获得许可证的副本:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律要求或书面协议同意,软件
# 根据许可证分发是按“原样”基础,
# 不提供任何形式的保证或条件,无论是明示还是暗示。
# 有关许可证具体权限和
# 限制的详细信息,请参阅许可证。
import inspect # 导入 inspect 模块,用于获取信息和检查对象
import warnings # 导入 warnings 模块,用于发出警告信息
from typing import Any, Callable, Dict, List, Optional, Union # 导入类型提示相关的类
import PIL.Image # 导入 PIL.Image 模块,用于图像处理
import torch # 导入 PyTorch 库,用于深度学习
from transformers import CLIPImageProcessor, CLIPTextModel, CLIPTokenizer # 从 transformers 导入相关模型和处理器
from ...image_processor import VaeImageProcessor # 从相对路径导入 VaeImageProcessor
from ...loaders import StableDiffusionLoraLoaderMixin, TextualInversionLoaderMixin # 导入加载器混合类
from ...models import AutoencoderKL, UNet2DConditionModel # 导入相关模型
from ...models.attention import GatedSelfAttentionDense # 导入自定义的注意力模块
from ...models.lora import adjust_lora_scale_text_encoder # 导入调整 Lora 模型的函数
from ...schedulers import KarrasDiffusionSchedulers # 导入调度器类
from ...utils import ( # 导入实用工具函数和常量
USE_PEFT_BACKEND, # 导入用于 PEFT 后端的常量
deprecate, # 导入用于标记过时功能的装饰器
logging, # 导入日志记录模块
replace_example_docstring, # 导入替换示例文档字符串的函数
scale_lora_layers, # 导入用于缩放 Lora 层的函数
unscale_lora_layers, # 导入用于取消缩放 Lora 层的函数
)
from ...utils.torch_utils import randn_tensor # 从工具模块导入生成随机张量的函数
from ..pipeline_utils import DiffusionPipeline, StableDiffusionMixin # 从管道工具模块导入相关类
from ..stable_diffusion import StableDiffusionPipelineOutput # 从稳定扩散模块导入输出类
from ..stable_diffusion.safety_checker import StableDiffusionSafetyChecker # 导入安全检查器
logger = logging.get_logger(__name__) # 获取当前模块的日志记录器,使用 pylint 禁用无效名称警告
EXAMPLE_DOC_STRING = """ # 定义示例文档字符串的多行字符串
Examples:
```py
>>> import torch # 导入 PyTorch 库
>>> from diffusers import StableDiffusionGLIGENPipeline # 从 diffusers 库导入 StableDiffusionGLIGENPipeline 类
>>> from diffusers.utils import load_image # 从 diffusers.utils 导入 load_image 函数
>>> # 在由边界框定义的区域插入由文本描述的对象
>>> pipe = StableDiffusionGLIGENPipeline.from_pretrained( # 从预训练模型加载 StableDiffusionGLIGENPipeline
... "masterful/gligen-1-4-inpainting-text-box", variant="fp16", torch_dtype=torch.float16 # 指定模型名称和数据类型
... )
>>> pipe = pipe.to("cuda") # 将模型移动到 GPU 上
>>> input_image = load_image( # 从指定 URL 加载输入图像
... "https://hf.co/datasets/huggingface/documentation-images/resolve/main/diffusers/gligen/livingroom_modern.png" # 图像 URL
... )
>>> prompt = "a birthday cake" # 定义生成图像的文本提示
>>> boxes = [[0.2676, 0.6088, 0.4773, 0.7183]] # 定义边界框的位置
>>> phrases = ["a birthday cake"] # 定义要插入的对象描述
>>> images = pipe( # 调用管道生成图像
... prompt=prompt, # 传入文本提示
... gligen_phrases=phrases, # 传入要插入的描述
... gligen_inpaint_image=input_image, # 传入需要修复的图像
... gligen_boxes=boxes, # 传入边界框
... gligen_scheduled_sampling_beta=1, # 设定计划采样的 beta 值
... output_type="pil", # 输出类型设为 PIL 图像
... num_inference_steps=50, # 设定推理步骤数量
... ).images # 获取生成的图像列表
>>> images[0].save("./gligen-1-4-inpainting-text-box.jpg") # 将生成的第一张图像保存为 JPEG 文件
>>> # 生成由提示描述的图像,并在由边界框定义的区域插入由文本描述的对象
>>> pipe = StableDiffusionGLIGENPipeline.from_pretrained( # 从预训练模型加载另一个 StableDiffusionGLIGENPipeline
... "masterful/gligen-1-4-generation-text-box", variant="fp16", torch_dtype=torch.float16 # 指定新模型名称和数据类型
... )
>>> pipe = pipe.to("cuda") # 将模型移动到 GPU 上
>>> prompt = "a waterfall and a modern high speed train running through the tunnel in a beautiful forest with fall foliage" # 定义新的生成图像的文本提示
>>> boxes = [[0.1387, 0.2051, 0.4277, 0.7090], [0.4980, 0.4355, 0.8516, 0.7266]] # 定义多个边界框的位置
>>> phrases = ["a waterfall", "a modern high speed train running through the tunnel"] # 定义要插入的多个对象描述
>>> images = pipe( # 调用管道生成图像
... prompt=prompt, # 传入新的文本提示
... gligen_phrases=phrases, # 传入新的要插入的描述
... gligen_boxes=boxes, # 传入新的边界框
... gligen_scheduled_sampling_beta=1, # 设定计划采样的 beta 值
... output_type="pil", # 输出类型设为 PIL 图像
... num_inference_steps=50, # 设定推理步骤数量
... ).images # 获取生成的图像列表
>>> images[0].save("./gligen-1-4-generation-text-box.jpg") # 将生成的第一张图像保存为 JPEG 文件
```py
# 定义一个名为 StableDiffusionGLIGENPipeline 的类,继承自 DiffusionPipeline 和 StableDiffusionMixin
class StableDiffusionGLIGENPipeline(DiffusionPipeline, StableDiffusionMixin):
r"""
用于使用 Stable Diffusion 和基于语言的图像生成 (GLIGEN) 的文本到图像生成管道。
该模型从 [`DiffusionPipeline`] 继承。有关库为所有管道实现的通用方法的文档,请检查超类文档(例如下载或保存、在特定设备上运行等)。
参数:
vae ([`AutoencoderKL`]):
用于将图像编码和解码为潜在表示的变分自编码器 (VAE) 模型。
text_encoder ([`~transformers.CLIPTextModel`]):
冻结的文本编码器 ([clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14))。
tokenizer ([`~transformers.CLIPTokenizer`]):
用于对文本进行标记的 `CLIPTokenizer`。
unet ([`UNet2DConditionModel`]):
用于去噪编码图像潜在值的 `UNet2DConditionModel`。
scheduler ([`SchedulerMixin`]):
与 `unet` 结合使用的调度器,用于去噪编码图像潜在值。可以是
[`DDIMScheduler`], [`LMSDiscreteScheduler`] 或 [`PNDMScheduler`] 之一。
safety_checker ([`StableDiffusionSafetyChecker`]):
分类模块,估计生成的图像是否可能被认为是冒犯性或有害的。
有关模型潜在危害的更多详细信息,请参考 [model card](https://huggingface.co/runwayml/stable-diffusion-v1-5)。
feature_extractor ([`~transformers.CLIPImageProcessor`]):
用于从生成图像中提取特征的 `CLIPImageProcessor`;作为 `safety_checker` 的输入。
"""
# 定义可选组件列表,包括安全检查器和特征提取器
_optional_components = ["safety_checker", "feature_extractor"]
# 定义模型在 CPU 上卸载的顺序
model_cpu_offload_seq = "text_encoder->unet->vae"
# 定义从 CPU 卸载中排除的组件,安全检查器不被卸载
_exclude_from_cpu_offload = ["safety_checker"]
# 初始化方法,接受多个参数以配置管道
def __init__(
# VAE 模型,负责图像的编码和解码
vae: AutoencoderKL,
# 文本编码器,用于处理输入文本
text_encoder: CLIPTextModel,
# 用于对文本进行标记的分词器
tokenizer: CLIPTokenizer,
# 用于去噪图像的 UNet 模型
unet: UNet2DConditionModel,
# 调度器,控制去噪过程
scheduler: KarrasDiffusionSchedulers,
# 安全检查器,评估生成图像的潜在危害
safety_checker: StableDiffusionSafetyChecker,
# 特征提取器,处理生成的图像
feature_extractor: CLIPImageProcessor,
# 是否需要安全检查器的标志,默认为真
requires_safety_checker: bool = True,
# 初始化父类
):
super().__init__()
# 检查是否禁用安全检查器并且需要安全检查器时,记录警告信息
if safety_checker is None and requires_safety_checker:
logger.warning(
f"You have disabled the safety checker for {self.__class__} by passing `safety_checker=None`. Ensure"
" that you abide to the conditions of the Stable Diffusion license and do not expose unfiltered"
" results in services or applications open to the public. Both the diffusers team and Hugging Face"
" strongly recommend to keep the safety filter enabled in all public facing circumstances, disabling"
" it only for use-cases that involve analyzing network behavior or auditing its results. For more"
" information, please have a look at https://github.com/huggingface/diffusers/pull/254 ."
)
# 检查是否定义特征提取器以便使用安全检查器,如果没有则抛出错误
if safety_checker is not None and feature_extractor is None:
raise ValueError(
"Make sure to define a feature extractor when loading {self.__class__} if you want to use the safety"
" checker. If you do not want to use the safety checker, you can pass `'safety_checker=None'` instead."
)
# 注册模型模块,包括 VAE、文本编码器、分词器、UNet、调度器、安全检查器和特征提取器
self.register_modules(
vae=vae,
text_encoder=text_encoder,
tokenizer=tokenizer,
unet=unet,
scheduler=scheduler,
safety_checker=safety_checker,
feature_extractor=feature_extractor,
)
# 计算 VAE 的缩放因子
self.vae_scale_factor = 2 ** (len(self.vae.config.block_out_channels) - 1)
# 创建图像处理器,设置为转换 RGB
self.image_processor = VaeImageProcessor(vae_scale_factor=self.vae_scale_factor, do_convert_rgb=True)
# 将配置中的安全检查器要求注册到对象
self.register_to_config(requires_safety_checker=requires_safety_checker)
# 从 StableDiffusionPipeline 中复制的编码提示函数
def _encode_prompt(
self,
prompt,
device,
num_images_per_prompt,
do_classifier_free_guidance,
negative_prompt=None,
prompt_embeds: Optional[torch.Tensor] = None,
negative_prompt_embeds: Optional[torch.Tensor] = None,
lora_scale: Optional[float] = None,
**kwargs,
):
# 定义一个弃用消息,提示用户该方法将来会被移除,并建议使用新方法
deprecation_message = "`_encode_prompt()` is deprecated and it will be removed in a future version. Use `encode_prompt()` instead. Also, be aware that the output format changed from a concatenated tensor to a tuple."
# 调用 deprecate 函数,记录该方法的弃用信息
deprecate("_encode_prompt()", "1.0.0", deprecation_message, standard_warn=False)
# 调用 encode_prompt 方法,获取与提示相关的嵌入元组
prompt_embeds_tuple = self.encode_prompt(
prompt=prompt, # 输入提示
device=device, # 设备类型(如 CPU 或 GPU)
num_images_per_prompt=num_images_per_prompt, # 每个提示生成的图像数量
do_classifier_free_guidance=do_classifier_free_guidance, # 是否使用无分类器的引导
negative_prompt=negative_prompt, # 负面提示
prompt_embeds=prompt_embeds, # 提示嵌入
negative_prompt_embeds=negative_prompt_embeds, # 负面提示嵌入
lora_scale=lora_scale, # Lora 缩放因子
**kwargs, # 其他可选参数
)
# 将嵌入元组中的两个张量连接起来,便于后续兼容
prompt_embeds = torch.cat([prompt_embeds_tuple[1], prompt_embeds_tuple[0]])
# 返回连接后的提示嵌入
return prompt_embeds
# 从 StableDiffusionPipeline 复制的 encode_prompt 方法
def encode_prompt(
self,
prompt, # 输入提示
device, # 设备类型(如 CPU 或 GPU)
num_images_per_prompt, # 每个提示生成的图像数量
do_classifier_free_guidance, # 是否使用无分类器的引导
negative_prompt=None, # 负面提示(可选)
prompt_embeds: Optional[torch.Tensor] = None, # 提示嵌入(可选)
negative_prompt_embeds: Optional[torch.Tensor] = None, # 负面提示嵌入(可选)
lora_scale: Optional[float] = None, # Lora 缩放因子(可选)
clip_skip: Optional[int] = None, # 跳过剪辑层(可选)
# 从 StableDiffusionPipeline 复制的 run_safety_checker 方法
def run_safety_checker(self, image, device, dtype):
# 检查是否存在安全检查器
if self.safety_checker is None:
has_nsfw_concept = None # 如果没有,则设置无敏感内容标志为 None
else:
# 如果输入是张量,则处理图像为 PIL 格式
if torch.is_tensor(image):
feature_extractor_input = self.image_processor.postprocess(image, output_type="pil")
else:
# 否则将输入转换为 PIL 格式
feature_extractor_input = self.image_processor.numpy_to_pil(image)
# 提取特征并将其转换为指定设备的张量
safety_checker_input = self.feature_extractor(feature_extractor_input, return_tensors="pt").to(device)
# 运行安全检查器,检查图像是否含有 NSFW 内容
image, has_nsfw_concept = self.safety_checker(
images=image, clip_input=safety_checker_input.pixel_values.to(dtype) # 提供图像和特征输入
)
# 返回处理后的图像及其 NSFW 内容标志
return image, has_nsfw_concept
# 定义一个方法,用于准备调度器步骤的额外参数
def prepare_extra_step_kwargs(self, generator, eta):
# 准备调度器步骤的额外参数,因为不同的调度器具有不同的参数签名
# eta (η) 仅在 DDIMScheduler 中使用,对于其他调度器将被忽略
# eta 对应于 DDIM 论文中的 η: https://arxiv.org/abs/2010.02502
# 应在 [0, 1] 范围内
# 检查调度器的步骤方法是否接受 eta 参数
accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())
# 初始化一个空字典,用于存放额外的步骤参数
extra_step_kwargs = {}
# 如果调度器接受 eta 参数,则将其添加到额外参数字典中
if accepts_eta:
extra_step_kwargs["eta"] = eta
# 检查调度器的步骤方法是否接受 generator 参数
accepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys())
# 如果调度器接受 generator 参数,则将其添加到额外参数字典中
if accepts_generator:
extra_step_kwargs["generator"] = generator
# 返回准备好的额外参数字典
return extra_step_kwargs
# 定义一个方法,用于检查输入参数的有效性
def check_inputs(
self,
prompt, # 文本提示,用于生成内容
height, # 生成内容的高度
width, # 生成内容的宽度
callback_steps, # 回调步骤的频率
gligen_phrases, # 用于生成的短语
gligen_boxes, # 用于生成的框
negative_prompt=None, # 可选的负面提示,用于生成的限制
prompt_embeds=None, # 可选的提示嵌入,提前计算的文本表示
negative_prompt_embeds=None, # 可选的负面提示嵌入,提前计算的负面文本表示
):
# 检查高度和宽度是否都是 8 的倍数,如果不是,则抛出值错误
if height % 8 != 0 or width % 8 != 0:
raise ValueError(f"`height` and `width` have to be divisible by 8 but are {height} and {width}.")
# 检查 callback_steps 是否为正整数,若条件不满足则抛出值错误
if (callback_steps is None) or (
callback_steps is not None and (not isinstance(callback_steps, int) or callback_steps <= 0)
):
raise ValueError(
f"`callback_steps` has to be a positive integer but is {callback_steps} of type"
f" {type(callback_steps)}."
)
# 检查 prompt 和 prompt_embeds 是否同时被定义,如果是则抛出值错误
if prompt is not None and prompt_embeds is not None:
raise ValueError(
f"Cannot forward both `prompt`: {prompt} and `prompt_embeds`: {prompt_embeds}. Please make sure to"
" only forward one of the two."
)
# 检查 prompt 和 prompt_embeds 是否都未定义,如果是则抛出值错误
elif prompt is None and prompt_embeds is None:
raise ValueError(
"Provide either `prompt` or `prompt_embeds`. Cannot leave both `prompt` and `prompt_embeds` undefined."
)
# 检查 prompt 是否为字符串或列表类型,如果不是则抛出值错误
elif prompt is not None and (not isinstance(prompt, str) and not isinstance(prompt, list)):
raise ValueError(f"`prompt` has to be of type `str` or `list` but is {type(prompt)}")
# 检查 negative_prompt 和 negative_prompt_embeds 是否同时被定义,如果是则抛出值错误
if negative_prompt is not None and negative_prompt_embeds is not None:
raise ValueError(
f"Cannot forward both `negative_prompt`: {negative_prompt} and `negative_prompt_embeds`:"
f" {negative_prompt_embeds}. Please make sure to only forward one of the two."
)
# 检查如果提供了 prompt_embeds 和 negative_prompt_embeds,则它们的形状是否相同,如果不同则抛出值错误
if prompt_embeds is not None and negative_prompt_embeds is not None:
if prompt_embeds.shape != negative_prompt_embeds.shape:
raise ValueError(
"`prompt_embeds` and `negative_prompt_embeds` must have the same shape when passed directly, but"
f" got: `prompt_embeds` {prompt_embeds.shape} != `negative_prompt_embeds`"
f" {negative_prompt_embeds.shape}."
)
# 检查 gligen_phrases 和 gligen_boxes 的长度是否相同,如果不同则抛出值错误
if len(gligen_phrases) != len(gligen_boxes):
raise ValueError(
"length of `gligen_phrases` and `gligen_boxes` has to be same, but"
f" got: `gligen_phrases` {len(gligen_phrases)} != `gligen_boxes` {len(gligen_boxes)}"
)
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_latents 复制的代码
# 准备潜变量,创建指定形状的随机噪声或处理已给定的潜变量
def prepare_latents(self, batch_size, num_channels_latents, height, width, dtype, device, generator, latents=None):
# 定义潜变量的形状,包括批大小、通道数和经过 VAE 缩放因子后的高度和宽度
shape = (
batch_size,
num_channels_latents,
int(height) // self.vae_scale_factor,
int(width) // self.vae_scale_factor,
)
# 检查生成器的类型,如果是列表且长度与批大小不匹配则抛出错误
if isinstance(generator, list) and len(generator) != batch_size:
raise ValueError(
f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
f" size of {batch_size}. Make sure the batch size matches the length of the generators."
)
# 如果没有提供潜变量,则通过随机生成创建新的潜变量
if latents is None:
latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
else:
# 如果提供了潜变量,将其转移到指定设备上
latents = latents.to(device)
# 按调度器所需的标准差缩放初始噪声
latents = latents * self.scheduler.init_noise_sigma
# 返回处理后的潜变量
return latents
# 启用或禁用自注意力模块的融合
def enable_fuser(self, enabled=True):
# 遍历 UNet 模块
for module in self.unet.modules():
# 如果模块是 GatedSelfAttentionDense 类型,则设置其启用状态
if type(module) is GatedSelfAttentionDense:
module.enabled = enabled
# 从给定的框列表生成修复掩码
def draw_inpaint_mask_from_boxes(self, boxes, size):
# 创建一个全为 1 的掩码,大小与输入图像一致
inpaint_mask = torch.ones(size[0], size[1])
# 遍历每个框,更新掩码中的相应区域为 0
for box in boxes:
x0, x1 = box[0] * size[0], box[2] * size[0] # 计算框的左和右边界
y0, y1 = box[1] * size[1], box[3] * size[1] # 计算框的上和下边界
inpaint_mask[int(y0) : int(y1), int(x0) : int(x1)] = 0 # 将框内区域设置为 0
# 返回修复掩码
return inpaint_mask
# 裁剪图像到指定的新宽度和高度
def crop(self, im, new_width, new_height):
# 获取图像的当前宽度和高度
width, height = im.size
# 计算裁剪区域的左、上、右、下边界
left = (width - new_width) / 2
top = (height - new_height) / 2
right = (width + new_width) / 2
bottom = (height + new_height) / 2
# 返回裁剪后的图像
return im.crop((left, top, right, bottom))
# 根据目标尺寸对图像进行中心裁剪
def target_size_center_crop(self, im, new_hw):
# 获取图像的当前宽度和高度
width, height = im.size
# 如果宽度和高度不相等,进行中心裁剪
if width != height:
im = self.crop(im, min(height, width), min(height, width))
# 将图像调整为新的宽高,并使用高质量的重采样方法
return im.resize((new_hw, new_hw), PIL.Image.LANCZOS)
# 装饰器,禁用梯度计算以节省内存
@torch.no_grad()
# 替换示例文档字符串
@replace_example_docstring(EXAMPLE_DOC_STRING)
# 定义可调用对象的 __call__ 方法,允许实例像函数一样被调用
def __call__(
# 提示信息,可以是单个字符串或字符串列表
self,
prompt: Union[str, List[str]] = None,
# 图像高度,可选参数
height: Optional[int] = None,
# 图像宽度,可选参数
width: Optional[int] = None,
# 推理步骤的数量,默认为50
num_inference_steps: int = 50,
# 引导强度,默认为7.5
guidance_scale: float = 7.5,
# Gligen 调度采样的 beta 值,默认为0.3
gligen_scheduled_sampling_beta: float = 0.3,
# Gligen 相关短语,可选字符串列表
gligen_phrases: List[str] = None,
# Gligen 边界框,列表中包含浮点数列表,可选
gligen_boxes: List[List[float]] = None,
# Gligen 使用的图像,PIL.Image.Image 对象,可选
gligen_inpaint_image: Optional[PIL.Image.Image] = None,
# 负提示信息,可选字符串或字符串列表
negative_prompt: Optional[Union[str, List[str]]] = None,
# 每个提示生成的图像数量,默认为1
num_images_per_prompt: Optional[int] = 1,
# 采样的 ETA 值,默认为0.0
eta: float = 0.0,
# 随机数生成器,可选,可以是单个或多个生成器
generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
# 潜在张量,可选
latents: Optional[torch.Tensor] = None,
# 提示嵌入张量,可选
prompt_embeds: Optional[torch.Tensor] = None,
# 负提示嵌入张量,可选
negative_prompt_embeds: Optional[torch.Tensor] = None,
# 输出类型,默认为“pil”
output_type: Optional[str] = "pil",
# 是否返回字典格式的结果,默认为 True
return_dict: bool = True,
# 可选回调函数,用于在每个步骤执行
callback: Optional[Callable[[int, int, torch.Tensor], None]] = None,
# 每隔多少步骤调用回调,默认为1
callback_steps: int = 1,
# 跨注意力的额外参数,可选
cross_attention_kwargs: Optional[Dict[str, Any]] = None,
# 可选的跳过剪辑参数
clip_skip: Optional[int] = None,
.\diffusers\pipelines\stable_diffusion_gligen\pipeline_stable_diffusion_gligen_text_image.py
# 版权所有 2024 GLIGEN 作者及 HuggingFace 团队。保留所有权利。
#
# 根据 Apache 许可证第 2.0 版(“许可证”)授权;
# 除非符合许可证,否则您不得使用此文件。
# 您可以在以下网址获取许可证的副本:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律要求或书面协议另有规定,
# 否则根据许可证分发的软件是在“按现状”基础上分发的,
# 不提供任何形式的明示或暗示的担保或条件。
# 有关许可证所涵盖的特定权限和限制,请参阅许可证。
import inspect # 导入 inspect 模块,用于获取对象的信息
import warnings # 导入 warnings 模块,用于发出警告
from typing import Any, Callable, Dict, List, Optional, Union # 导入类型提示所需的类型
import PIL.Image # 导入 PIL.Image 模块,用于图像处理
import torch # 导入 PyTorch 库,用于张量运算
from transformers import ( # 从 transformers 库中导入多个类和函数
CLIPImageProcessor, # 图像处理器,用于 CLIP 模型
CLIPProcessor, # 通用 CLIP 处理器
CLIPTextModel, # CLIP 文本模型
CLIPTokenizer, # CLIP 令牌化器
CLIPVisionModelWithProjection, # CLIP 视觉模型,包含投影功能
)
from ...image_processor import VaeImageProcessor # 导入 VAE 图像处理器
from ...loaders import StableDiffusionLoraLoaderMixin, TextualInversionLoaderMixin # 导入用于加载的混合类
from ...models import AutoencoderKL, UNet2DConditionModel # 导入模型类
from ...models.attention import GatedSelfAttentionDense # 导入密集门控自注意力类
from ...models.lora import adjust_lora_scale_text_encoder # 导入调整 LORA 缩放的函数
from ...schedulers import KarrasDiffusionSchedulers # 导入 Karras 扩散调度器
from ...utils import USE_PEFT_BACKEND, logging, replace_example_docstring, scale_lora_layers, unscale_lora_layers # 导入工具函数和常量
from ...utils.torch_utils import randn_tensor # 导入生成随机张量的函数
from ..pipeline_utils import DiffusionPipeline, StableDiffusionMixin # 导入扩散管道和稳定扩散混合类
from ..stable_diffusion import StableDiffusionPipelineOutput # 导入稳定扩散管道输出类
from ..stable_diffusion.clip_image_project_model import CLIPImageProjection # 导入 CLIP 图像投影模型
from ..stable_diffusion.safety_checker import StableDiffusionSafetyChecker # 导入稳定扩散安全检查器
logger = logging.get_logger(__name__) # 获取当前模块的日志记录器,使用 pylint 禁用无效名称警告
EXAMPLE_DOC_STRING = """ # 示例文档字符串,可能用于说明使用方式或示例
"""
class StableDiffusionGLIGENTextImagePipeline(DiffusionPipeline, StableDiffusionMixin): # 定义一个类,继承自 DiffusionPipeline 和 StableDiffusionMixin
r""" # 类的文档字符串,说明该类的功能
使用 Stable Diffusion 进行文本到图像生成的管道,结合基于语言的图像生成(GLIGEN)。
此模型继承自 [`DiffusionPipeline`]。有关库为所有管道实现的通用方法(例如下载或保存、在特定设备上运行等),请查看超类文档。
# 定义参数列表及其说明
Args:
vae ([`AutoencoderKL`]):
用于将图像编码和解码为潜在表示的变分自编码器 (VAE) 模型。
text_encoder ([`~transformers.CLIPTextModel`]):
冻结的文本编码器 ([clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14))。
tokenizer ([`~transformers.CLIPTokenizer`]):
用于对文本进行分词的 `CLIPTokenizer`。
processor ([`~transformers.CLIPProcessor`]):
用于处理参考图像的 `CLIPProcessor`。
image_encoder ([`transformers.CLIPVisionModelWithProjection`]):
冻结的图像编码器 ([clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14))。
image_project ([`CLIPImageProjection`]):
用于将图像嵌入投影到短语嵌入空间的 `CLIPImageProjection`。
unet ([`UNet2DConditionModel`]):
用于去噪编码图像潜在表示的 `UNet2DConditionModel`。
scheduler ([`SchedulerMixin`]):
与 `unet` 结合使用以去噪编码图像潜在表示的调度器。可以是
[`DDIMScheduler`], [`LMSDiscreteScheduler`] 或 [`PNDMScheduler`] 中的一个。
safety_checker ([`StableDiffusionSafetyChecker`]):
分类模块,用于评估生成的图像是否可能被视为冒犯或有害。
请参阅 [模型卡](https://huggingface.co/runwayml/stable-diffusion-v1-5) 获取有关模型潜在危害的更多细节。
feature_extractor ([`~transformers.CLIPImageProcessor`]):
用于从生成图像中提取特征的 `CLIPImageProcessor`;用于作为 `safety_checker` 的输入。
"""
# 定义模型在 CPU 上的卸载顺序
model_cpu_offload_seq = "text_encoder->unet->vae"
# 定义可选组件列表
_optional_components = ["safety_checker", "feature_extractor"]
# 定义不包含在 CPU 卸载中的组件
_exclude_from_cpu_offload = ["safety_checker"]
# 初始化方法
def __init__(
# 接收变分自编码器
self,
vae: AutoencoderKL,
# 接收文本编码器
text_encoder: CLIPTextModel,
# 接收分词器
tokenizer: CLIPTokenizer,
# 接收图像处理器
processor: CLIPProcessor,
# 接收图像编码器
image_encoder: CLIPVisionModelWithProjection,
# 接收图像投影器
image_project: CLIPImageProjection,
# 接收 U-Net 模型
unet: UNet2DConditionModel,
# 接收调度器
scheduler: KarrasDiffusionSchedulers,
# 接收安全检查器
safety_checker: StableDiffusionSafetyChecker,
# 接收特征提取器
feature_extractor: CLIPImageProcessor,
# 是否需要安全检查器的布尔值
requires_safety_checker: bool = True,
# 初始化父类
):
super().__init__()
# 如果未提供安全检查器且需要安全检查器,记录警告信息
if safety_checker is None and requires_safety_checker:
logger.warning(
# 生成警告内容,提醒用户安全检查器已禁用,并提供相关指导信息
f"You have disabled the safety checker for {self.__class__} by passing `safety_checker=None`. Ensure"
" that you abide to the conditions of the Stable Diffusion license and do not expose unfiltered"
" results in services or applications open to the public. Both the diffusers team and Hugging Face"
" strongly recommend to keep the safety filter enabled in all public facing circumstances, disabling"
" it only for use-cases that involve analyzing network behavior or auditing its results. For more"
" information, please have a look at https://github.com/huggingface/diffusers/pull/254 ."
)
# 如果提供了安全检查器但未提供特征提取器,抛出错误
if safety_checker is not None and feature_extractor is None:
raise ValueError(
# 提示用户在加载该类时需要定义特征提取器
"Make sure to define a feature extractor when loading {self.__class__} if you want to use the safety"
" checker. If you do not want to use the safety checker, you can pass `'safety_checker=None'` instead."
)
# 注册各个模块,初始化相关组件
self.register_modules(
vae=vae,
text_encoder=text_encoder,
tokenizer=tokenizer,
image_encoder=image_encoder,
processor=processor,
image_project=image_project,
unet=unet,
scheduler=scheduler,
safety_checker=safety_checker,
feature_extractor=feature_extractor,
)
# 计算 VAE 的缩放因子
self.vae_scale_factor = 2 ** (len(self.vae.config.block_out_channels) - 1)
# 创建 VAE 图像处理器,设置为 RGB 转换
self.image_processor = VaeImageProcessor(vae_scale_factor=self.vae_scale_factor, do_convert_rgb=True)
# 将所需的安全检查器信息注册到配置中
self.register_to_config(requires_safety_checker=requires_safety_checker)
# 从 StableDiffusionPipeline 复制的 encode_prompt 方法
def encode_prompt(
self,
prompt,
device,
num_images_per_prompt,
do_classifier_free_guidance,
negative_prompt=None,
prompt_embeds: Optional[torch.Tensor] = None,
negative_prompt_embeds: Optional[torch.Tensor] = None,
lora_scale: Optional[float] = None,
clip_skip: Optional[int] = None,
# 从 StableDiffusionPipeline 复制的 run_safety_checker 方法
# 执行安全检查器,检查输入图像是否符合安全标准
def run_safety_checker(self, image, device, dtype):
# 如果安全检查器未定义,则初始化为 None
if self.safety_checker is None:
has_nsfw_concept = None
else:
# 如果输入是张量格式,则进行后处理为 PIL 格式
if torch.is_tensor(image):
feature_extractor_input = self.image_processor.postprocess(image, output_type="pil")
else:
# 如果输入是 NumPy 数组,则转换为 PIL 格式
feature_extractor_input = self.image_processor.numpy_to_pil(image)
# 提取特征并将输入转移到指定设备
safety_checker_input = self.feature_extractor(feature_extractor_input, return_tensors="pt").to(device)
# 使用安全检查器处理图像,并获取是否存在不当内容的概念
image, has_nsfw_concept = self.safety_checker(
images=image, clip_input=safety_checker_input.pixel_values.to(dtype)
)
# 返回处理后的图像和不当内容概念
return image, has_nsfw_concept
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_extra_step_kwargs 复制
# 准备调度步骤的额外参数,因为并非所有调度器的参数签名相同
def prepare_extra_step_kwargs(self, generator, eta):
# eta(η)仅在 DDIMScheduler 中使用,对于其他调度器将被忽略
# eta 对应于 DDIM 论文中的 η: https://arxiv.org/abs/2010.02502
# eta 的取值范围应在 [0, 1] 之间
# 检查调度器是否接受 eta 参数
accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())
extra_step_kwargs = {}
if accepts_eta:
# 如果接受 eta,则将其添加到额外参数中
extra_step_kwargs["eta"] = eta
# 检查调度器是否接受生成器参数
accepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys())
if accepts_generator:
# 如果接受生成器,则将其添加到额外参数中
extra_step_kwargs["generator"] = generator
# 返回准备好的额外参数字典
return extra_step_kwargs
# 从 diffusers.pipelines.stable_diffusion_k_diffusion.pipeline_stable_diffusion_k_diffusion.StableDiffusionKDiffusionPipeline.check_inputs 复制
# 检查输入参数的有效性和一致性
def check_inputs(
self,
prompt,
height,
width,
callback_steps,
negative_prompt=None,
prompt_embeds=None,
negative_prompt_embeds=None,
callback_on_step_end_tensor_inputs=None,
):
# 检查高度和宽度是否都能被 8 整除
if height % 8 != 0 or width % 8 != 0:
# 如果不能整除,抛出一个值错误,说明高度和宽度不符合要求
raise ValueError(f"`height` and `width` have to be divisible by 8 but are {height} and {width}.")
# 检查回调步骤是否被设置,并且检查它是否为正整数
if callback_steps is not None and (not isinstance(callback_steps, int) or callback_steps <= 0):
# 如果条件不满足,抛出值错误,说明回调步骤无效
raise ValueError(
f"`callback_steps` has to be a positive integer but is {callback_steps} of type"
f" {type(callback_steps)}."
)
# 检查回调结束时的张量输入是否有效
if callback_on_step_end_tensor_inputs is not None and not all(
k in self._callback_tensor_inputs for k in callback_on_step_end_tensor_inputs
):
# 如果有无效的输入,抛出值错误,说明输入不在允许的范围内
raise ValueError(
f"`callback_on_step_end_tensor_inputs` has to be in {self._callback_tensor_inputs}, but found {[k for k in callback_on_step_end_tensor_inputs if k not in self._callback_tensor_inputs]}"
)
# 检查是否同时提供了提示和提示嵌入
if prompt is not None and prompt_embeds is not None:
# 如果同时提供,抛出值错误,说明只能提供其中一个
raise ValueError(
f"Cannot forward both `prompt`: {prompt} and `prompt_embeds`: {prompt_embeds}. Please make sure to"
" only forward one of the two."
)
elif prompt is None and prompt_embeds is None:
# 如果都没有提供,抛出值错误,说明至少需要提供一个
raise ValueError(
"Provide either `prompt` or `prompt_embeds`. Cannot leave both `prompt` and `prompt_embeds` undefined."
)
elif prompt is not None and (not isinstance(prompt, str) and not isinstance(prompt, list)):
# 检查提示的类型是否有效
raise ValueError(f"`prompt` has to be of type `str` or `list` but is {type(prompt)}")
# 检查是否同时提供了负提示和负提示嵌入
if negative_prompt is not None and negative_prompt_embeds is not None:
# 如果同时提供,抛出值错误,说明只能提供其中一个
raise ValueError(
f"Cannot forward both `negative_prompt`: {negative_prompt} and `negative_prompt_embeds`:"
f" {negative_prompt_embeds}. Please make sure to only forward one of the two."
)
# 检查提示嵌入和负提示嵌入的形状是否一致
if prompt_embeds is not None and negative_prompt_embeds is not None:
if prompt_embeds.shape != negative_prompt_embeds.shape:
# 如果形状不一致,抛出值错误,说明形状必须相同
raise ValueError(
"`prompt_embeds` and `negative_prompt_embeds` must have the same shape when passed directly, but"
f" got: `prompt_embeds` {prompt_embeds.shape} != `negative_prompt_embeds`"
f" {negative_prompt_embeds.shape}."
)
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_latents 复制的代码
# 准备潜在变量的形状和初始值
def prepare_latents(self, batch_size, num_channels_latents, height, width, dtype, device, generator, latents=None):
# 定义潜在变量的形状,包括批量大小和通道数
shape = (
batch_size,
num_channels_latents,
int(height) // self.vae_scale_factor,
int(width) // self.vae_scale_factor,
)
# 检查生成器列表的长度是否与批量大小匹配
if isinstance(generator, list) and len(generator) != batch_size:
raise ValueError(
f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
f" size of {batch_size}. Make sure the batch size matches the length of the generators."
)
# 如果没有提供潜在变量,则生成随机张量
if latents is None:
latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
else:
# 如果提供了潜在变量,则将其移动到指定设备
latents = latents.to(device)
# 将初始噪声按调度器要求的标准差进行缩放
latents = latents * self.scheduler.init_noise_sigma
# 返回准备好的潜在变量
return latents
# 启用或禁用门控自注意力模块
def enable_fuser(self, enabled=True):
# 遍历 UNet 模块
for module in self.unet.modules():
# 检查模块类型是否为 GatedSelfAttentionDense
if type(module) is GatedSelfAttentionDense:
# 设置模块的启用状态
module.enabled = enabled
# 根据给定的框创建修复掩码
def draw_inpaint_mask_from_boxes(self, boxes, size):
"""
Create an inpainting mask based on given boxes. This function generates an inpainting mask using the provided
boxes to mark regions that need to be inpainted.
"""
# 创建一个全白的修复掩码
inpaint_mask = torch.ones(size[0], size[1])
# 遍历每个框
for box in boxes:
# 根据框计算对应的像素坐标
x0, x1 = box[0] * size[0], box[2] * size[0]
y0, y1 = box[1] * size[1], box[3] * size[1]
# 在掩码上标记需要修复的区域
inpaint_mask[int(y0) : int(y1), int(x0) : int(x1)] = 0
# 返回修复掩码
return inpaint_mask
# 裁剪输入图像到指定尺寸
def crop(self, im, new_width, new_height):
"""
Crop the input image to the specified dimensions.
"""
# 获取原始图像的宽度和高度
width, height = im.size
# 计算裁剪区域的左、上、右、下边界
left = (width - new_width) / 2
top = (height - new_height) / 2
right = (width + new_width) / 2
bottom = (height + new_height) / 2
# 返回裁剪后的图像
return im.crop((left, top, right, bottom))
# 裁剪并调整图像到目标尺寸,保持中心
def target_size_center_crop(self, im, new_hw):
"""
Crop and resize the image to the target size while keeping the center.
"""
# 获取图像的宽度和高度
width, height = im.size
# 如果宽高不相等,进行中心裁剪
if width != height:
im = self.crop(im, min(height, width), min(height, width))
# 返回调整后的图像
return im.resize((new_hw, new_hw), PIL.Image.LANCZOS)
# 根据输入的掩码值(0或1)为每个短语和图像掩蔽特征
def complete_mask(self, has_mask, max_objs, device):
# 创建一个全1的掩码,形状为(1, max_objs),数据类型与文本编码器一致,转移到指定设备
mask = torch.ones(1, max_objs).type(self.text_encoder.dtype).to(device)
# 如果没有掩码,则返回全1的掩码
if has_mask is None:
return mask
# 如果掩码是一个整数,则返回乘以该整数的掩码
if isinstance(has_mask, int):
return mask * has_mask
else:
# 遍历掩码列表,将值填入掩码中
for idx, value in enumerate(has_mask):
mask[0, idx] = value
# 返回填充后的掩码
return mask
# 使用 CLIP 预训练模型获取图像和短语的嵌入
def get_clip_feature(self, input, normalize_constant, device, is_image=False):
# 如果处理的是图像
if is_image:
# 如果输入为 None,返回 None
if input is None:
return None
# 处理图像输入,转换为张量并转移到设备
inputs = self.processor(images=[input], return_tensors="pt").to(device)
# 将像素值转换为图像编码器的数据类型
inputs["pixel_values"] = inputs["pixel_values"].to(self.image_encoder.dtype)
# 使用图像编码器获取嵌入输出
outputs = self.image_encoder(**inputs)
# 提取图像嵌入
feature = outputs.image_embeds
# 通过投影将特征转化并压缩维度
feature = self.image_project(feature).squeeze(0)
# 归一化特征并乘以归一化常数
feature = (feature / feature.norm()) * normalize_constant
# 添加维度以符合输出要求
feature = feature.unsqueeze(0)
else:
# 如果处理的是文本
if input is None:
return None
# 将文本输入转换为张量并转移到设备
inputs = self.tokenizer(input, return_tensors="pt", padding=True).to(device)
# 使用文本编码器获取嵌入输出
outputs = self.text_encoder(**inputs)
# 提取池化输出作为特征
feature = outputs.pooler_output
# 返回提取的特征
return feature
# 定义获取带有基础的交叉注意力参数的方法
def get_cross_attention_kwargs_with_grounded(
self,
hidden_size,
gligen_phrases,
gligen_images,
gligen_boxes,
input_phrases_mask,
input_images_mask,
repeat_batch,
normalize_constant,
max_objs,
device,
):
"""
准备交叉注意力的关键字参数,包含有关基础输入的信息(框,掩码,图像嵌入,短语嵌入)。
"""
# 将输入的短语和图像分别赋值给变量
phrases, images = gligen_phrases, gligen_images
# 如果图像为 None,则为每个短语创建一个 None 列表
images = [None] * len(phrases) if images is None else images
# 如果短语为 None,则为每个图像创建一个 None 列表
phrases = [None] * len(images) if phrases is None else phrases
# 创建一个张量用于存储每个对象的框(四个坐标)
boxes = torch.zeros(max_objs, 4, device=device, dtype=self.text_encoder.dtype)
# 创建一个张量用于存储每个对象的掩码
masks = torch.zeros(max_objs, device=device, dtype=self.text_encoder.dtype)
# 创建一个张量用于存储每个短语的掩码
phrases_masks = torch.zeros(max_objs, device=device, dtype=self.text_encoder.dtype)
# 创建一个张量用于存储每个图像的掩码
image_masks = torch.zeros(max_objs, device=device, dtype=self.text_encoder.dtype)
# 创建一个张量用于存储每个短语的嵌入
phrases_embeddings = torch.zeros(max_objs, hidden_size, device=device, dtype=self.text_encoder.dtype)
# 创建一个张量用于存储每个图像的嵌入
image_embeddings = torch.zeros(max_objs, hidden_size, device=device, dtype=self.text_encoder.dtype)
# 初始化存储文本特征和图像特征的列表
text_features = []
image_features = []
# 遍历短语和图像,获取特征
for phrase, image in zip(phrases, images):
# 获取短语的特征并添加到列表
text_features.append(self.get_clip_feature(phrase, normalize_constant, device, is_image=False))
# 获取图像的特征并添加到列表
image_features.append(self.get_clip_feature(image, normalize_constant, device, is_image=True))
# 遍历框、文本特征和图像特征,填充相应的张量
for idx, (box, text_feature, image_feature) in enumerate(zip(gligen_boxes, text_features, image_features)):
# 将框转换为张量并赋值
boxes[idx] = torch.tensor(box)
# 设置掩码为 1
masks[idx] = 1
# 如果文本特征不为空,则赋值并设置掩码
if text_feature is not None:
phrases_embeddings[idx] = text_feature
phrases_masks[idx] = 1
# 如果图像特征不为空,则赋值并设置掩码
if image_feature is not None:
image_embeddings[idx] = image_feature
image_masks[idx] = 1
# 完成输入短语的掩码
input_phrases_mask = self.complete_mask(input_phrases_mask, max_objs, device)
# 通过重复输入短语的掩码来扩展短语掩码
phrases_masks = phrases_masks.unsqueeze(0).repeat(repeat_batch, 1) * input_phrases_mask
# 完成输入图像的掩码
input_images_mask = self.complete_mask(input_images_mask, max_objs, device)
# 通过重复输入图像的掩码来扩展图像掩码
image_masks = image_masks.unsqueeze(0).repeat(repeat_batch, 1) * input_images_mask
# 通过重复来扩展框的维度
boxes = boxes.unsqueeze(0).repeat(repeat_batch, 1, 1)
# 通过重复来扩展掩码的维度
masks = masks.unsqueeze(0).repeat(repeat_batch, 1)
# 通过重复来扩展短语嵌入的维度
phrases_embeddings = phrases_embeddings.unsqueeze(0).repeat(repeat_batch, 1, 1)
# 通过重复来扩展图像嵌入的维度
image_embeddings = image_embeddings.unsqueeze(0).repeat(repeat_batch, 1, 1)
# 将所有处理后的数据组织成字典
out = {
"boxes": boxes,
"masks": masks,
"phrases_masks": phrases_masks,
"image_masks": image_masks,
"phrases_embeddings": phrases_embeddings,
"image_embeddings": image_embeddings,
}
# 返回包含所有信息的字典
return out
# 定义一个方法,用于获取无基于输入信息的交叉注意力参数
def get_cross_attention_kwargs_without_grounded(self, hidden_size, repeat_batch, max_objs, device):
"""
准备无关于基础输入(框、掩码、图像嵌入、短语嵌入)信息的交叉注意力参数(均为零张量)。
"""
# 创建一个形状为 (max_objs, 4) 的全零张量,用于表示物体框
boxes = torch.zeros(max_objs, 4, device=device, dtype=self.text_encoder.dtype)
# 创建一个形状为 (max_objs,) 的全零张量,用于表示物体掩码
masks = torch.zeros(max_objs, device=device, dtype=self.text_encoder.dtype)
# 创建一个形状为 (max_objs,) 的全零张量,用于表示短语掩码
phrases_masks = torch.zeros(max_objs, device=device, dtype=self.text_encoder.dtype)
# 创建一个形状为 (max_objs,) 的全零张量,用于表示图像掩码
image_masks = torch.zeros(max_objs, device=device, dtype=self.text_encoder.dtype)
# 创建一个形状为 (max_objs, hidden_size) 的全零张量,用于表示短语嵌入
phrases_embeddings = torch.zeros(max_objs, hidden_size, device=device, dtype=self.text_encoder.dtype)
# 创建一个形状为 (max_objs, hidden_size) 的全零张量,用于表示图像嵌入
image_embeddings = torch.zeros(max_objs, hidden_size, device=device, dtype=self.text_encoder.dtype)
# 创建一个字典,包含多个张量,均为扩展并重复的零张量
out = {
# 扩展 boxes 张量并重复,生成形状为 (repeat_batch, max_objs, 4)
"boxes": boxes.unsqueeze(0).repeat(repeat_batch, 1, 1),
# 扩展 masks 张量并重复,生成形状为 (repeat_batch, max_objs)
"masks": masks.unsqueeze(0).repeat(repeat_batch, 1),
# 扩展 phrases_masks 张量并重复,生成形状为 (repeat_batch, max_objs)
"phrases_masks": phrases_masks.unsqueeze(0).repeat(repeat_batch, 1),
# 扩展 image_masks 张量并重复,生成形状为 (repeat_batch, max_objs)
"image_masks": image_masks.unsqueeze(0).repeat(repeat_batch, 1),
# 扩展 phrases_embeddings 张量并重复,生成形状为 (repeat_batch, max_objs, hidden_size)
"phrases_embeddings": phrases_embeddings.unsqueeze(0).repeat(repeat_batch, 1, 1),
# 扩展 image_embeddings 张量并重复,生成形状为 (repeat_batch, max_objs, hidden_size)
"image_embeddings": image_embeddings.unsqueeze(0).repeat(repeat_batch, 1, 1),
}
# 返回包含交叉注意力参数的字典
return out
# 装饰器,禁用梯度计算以减少内存使用
@torch.no_grad()
# 装饰器,用于替换示例文档字符串
@replace_example_docstring(EXAMPLE_DOC_STRING)
# 定义调用方法,支持多种参数
def __call__(
# 可选的字符串或字符串列表,作为提示输入
prompt: Union[str, List[str]] = None,
# 可选的整数,指定生成图像的高度
height: Optional[int] = None,
# 可选的整数,指定生成图像的宽度
width: Optional[int] = None,
# 指定推理步骤的数量,默认为 50
num_inference_steps: int = 50,
# 指定引导比例,默认为 7.5
guidance_scale: float = 7.5,
# 指定 Gligen 计划采样的 beta 值,默认为 0.3
gligen_scheduled_sampling_beta: float = 0.3,
# 可选的短语列表,用于 Gligen
gligen_phrases: List[str] = None,
# 可选的图像列表,用于 Gligen
gligen_images: List[PIL.Image.Image] = None,
# 可选的短语掩码,单个整数或整数列表
input_phrases_mask: Union[int, List[int]] = None,
# 可选的图像掩码,单个整数或整数列表
input_images_mask: Union[int, List[int]] = None,
# 可选的边框列表,用于 Gligen
gligen_boxes: List[List[float]] = None,
# 可选的图像,用于填充,Gligen
gligen_inpaint_image: Optional[PIL.Image.Image] = None,
# 可选的负提示,单个字符串或字符串列表
negative_prompt: Optional[Union[str, List[str]]] = None,
# 可选的整数,指定每个提示生成的图像数量,默认为 1
num_images_per_prompt: Optional[int] = 1,
# 指定噪声比例,默认为 0.0
eta: float = 0.0,
# 可选的生成器,用于随机数生成
generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
# 可选的潜在张量
latents: Optional[torch.Tensor] = None,
# 可选的提示嵌入张量
prompt_embeds: Optional[torch.Tensor] = None,
# 可选的负提示嵌入张量
negative_prompt_embeds: Optional[torch.Tensor] = None,
# 可选的输出类型,默认为 "pil"
output_type: Optional[str] = "pil",
# 可选的布尔值,指定是否返回字典格式
return_dict: bool = True,
# 可选的回调函数,用于处理中间结果
callback: Optional[Callable[[int, int, torch.Tensor], None]] = None,
# 指定回调函数调用的步长,默认为 1
callback_steps: int = 1,
# 可选的字典,包含交叉注意力的参数
cross_attention_kwargs: Optional[Dict[str, Any]] = None,
# Gligen 正常化常量,默认为 28.7
gligen_normalize_constant: float = 28.7,
# 可选的整数,指定跳过的剪辑步骤
clip_skip: int = None,
.\diffusers\pipelines\stable_diffusion_gligen\__init__.py
# 导入类型检查常量
from typing import TYPE_CHECKING
# 从上级模块导入必要的工具和依赖
from ...utils import (
DIFFUSERS_SLOW_IMPORT, # 慢导入标志
OptionalDependencyNotAvailable, # 可选依赖未找到异常
_LazyModule, # 懒加载模块的工具
get_objects_from_module, # 从模块中获取对象的工具
is_torch_available, # 检查 PyTorch 是否可用的函数
is_transformers_available, # 检查 Transformers 是否可用的函数
)
# 初始化一个空字典用于存放虚拟对象
_dummy_objects = {}
# 初始化一个空字典用于存放导入结构
_import_structure = {}
# 尝试检查依赖是否可用
try:
# 如果 Transformers 和 PyTorch 不可用,抛出异常
if not (is_transformers_available() and is_torch_available()):
raise OptionalDependencyNotAvailable()
# 捕获可选依赖未找到异常
except OptionalDependencyNotAvailable:
# 从工具中导入虚拟对象以防止错误
from ...utils import dummy_torch_and_transformers_objects # noqa F403
# 更新虚拟对象字典,获取虚拟对象
_dummy_objects.update(get_objects_from_module(dummy_torch_and_transformers_objects))
else:
# 如果依赖可用,添加稳定扩散管道到导入结构
_import_structure["pipeline_stable_diffusion_gligen"] = ["StableDiffusionGLIGENPipeline"]
# 添加文本到图像的稳定扩散管道到导入结构
_import_structure["pipeline_stable_diffusion_gligen_text_image"] = ["StableDiffusionGLIGENTextImagePipeline"]
# 检查类型标志或慢导入标志
if TYPE_CHECKING or DIFFUSERS_SLOW_IMPORT:
try:
# 如果 Transformers 和 PyTorch 不可用,抛出异常
if not (is_transformers_available() and is_torch_available()):
raise OptionalDependencyNotAvailable()
# 捕获可选依赖未找到异常
except OptionalDependencyNotAvailable:
# 从工具中导入虚拟对象
from ...utils.dummy_torch_and_transformers_objects import *
else:
# 从稳定扩散管道中导入 StableDiffusionGLIGENPipeline
from .pipeline_stable_diffusion_gligen import StableDiffusionGLIGENPipeline
# 从稳定扩散管道中导入 StableDiffusionGLIGENTextImagePipeline
from .pipeline_stable_diffusion_gligen_text_image import StableDiffusionGLIGENTextImagePipeline
# 如果不是类型检查或慢导入
else:
# 导入系统模块
import sys
# 将当前模块替换为懒加载模块
sys.modules[__name__] = _LazyModule(
__name__, # 模块名称
globals()["__file__"], # 当前文件路径
_import_structure, # 导入结构
module_spec=__spec__, # 模块规格
)
# 为当前模块设置虚拟对象
for name, value in _dummy_objects.items():
setattr(sys.modules[__name__], name, value)
.\diffusers\pipelines\stable_diffusion_k_diffusion\pipeline_stable_diffusion_k_diffusion.py
# 版权声明,表示该文件由 HuggingFace 团队所有,所有权利保留
#
# 根据 Apache 许可证第 2.0 版(“许可证”)进行授权;
# 除非遵守许可证,否则不得使用此文件。
# 可以在以下网址获取许可证副本:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律或书面协议另有约定,软件
# 按“原样”分发,没有任何形式的明示或暗示的担保或条件。
# 有关许可证所涵盖的特定权限和
# 限制,请参阅许可证。
import importlib # 导入模块以动态导入其他模块
import inspect # 导入用于检查对象的模块
from typing import Callable, List, Optional, Union # 导入类型注解
import torch # 导入 PyTorch 库
from k_diffusion.external import CompVisDenoiser, CompVisVDenoiser # 从 k_diffusion 导入去噪模型
from k_diffusion.sampling import BrownianTreeNoiseSampler, get_sigmas_karras # 导入采样相关的函数和类
from ...image_processor import VaeImageProcessor # 从相对路径导入 VAE 图像处理器
from ...loaders import StableDiffusionLoraLoaderMixin, TextualInversionLoaderMixin # 导入加载器混合类
from ...models.lora import adjust_lora_scale_text_encoder # 导入调整 LoRA 缩放的函数
from ...schedulers import LMSDiscreteScheduler # 导入 LMS 离散调度器
from ...utils import USE_PEFT_BACKEND, deprecate, logging, scale_lora_layers, unscale_lora_layers # 导入工具函数
from ...utils.torch_utils import randn_tensor # 从工具模块导入随机张量生成函数
from ..pipeline_utils import DiffusionPipeline, StableDiffusionMixin # 导入扩散管道和混合类
from ..stable_diffusion import StableDiffusionPipelineOutput # 导入稳定扩散管道输出类
logger = logging.get_logger(__name__) # 获取当前模块的日志记录器实例
class ModelWrapper: # 定义模型包装类
def __init__(self, model, alphas_cumprod): # 初始化模型和累积 alpha 参数
self.model = model # 将传入的模型赋值给实例变量
self.alphas_cumprod = alphas_cumprod # 将传入的累积 alpha 参数赋值给实例变量
def apply_model(self, *args, **kwargs): # 定义应用模型的方法,接受可变参数
if len(args) == 3: # 如果参数数量为 3
encoder_hidden_states = args[-1] # 将最后一个参数作为编码器隐藏状态
args = args[:2] # 保留前两个参数
if kwargs.get("cond", None) is not None: # 如果关键字参数中有 "cond"
encoder_hidden_states = kwargs.pop("cond") # 从关键字参数中移除并赋值给编码器隐藏状态
return self.model(*args, encoder_hidden_states=encoder_hidden_states, **kwargs).sample # 调用模型并返回样本
class StableDiffusionKDiffusionPipeline( # 定义稳定扩散 K 扩散管道类
DiffusionPipeline, StableDiffusionMixin, TextualInversionLoaderMixin, StableDiffusionLoraLoaderMixin # 继承多个基类
):
r""" # 文档字符串,描述此类的功能
用于文本到图像生成的管道,使用稳定扩散模型。
该模型继承自 [`DiffusionPipeline`]. 查看超类文档以获取库为所有管道实现的通用方法(例如下载或保存、在特定设备上运行等)。
此管道还继承以下加载方法:
- [`~loaders.TextualInversionLoaderMixin.load_textual_inversion`] 用于加载文本反演嵌入
- [`~loaders.StableDiffusionLoraLoaderMixin.load_lora_weights`] 用于加载 LoRA 权重
- [`~loaders.StableDiffusionLoraLoaderMixin.save_lora_weights`] 用于保存 LoRA 权重
<Tip warning={true}>
这是一个实验性管道,未来可能会发生变化。
</Tip>
# 文档字符串,描述参数的含义
Args:
vae ([`AutoencoderKL`]): # 变分自编码器模型,用于对图像进行编码和解码,转换为潜在表示
Variational Auto-Encoder (VAE) Model to encode and decode images to and from latent representations.
text_encoder ([`CLIPTextModel`]): # 冻结的文本编码器,稳定扩散使用 CLIP 的文本部分
Frozen text-encoder. Stable Diffusion uses the text portion of
[CLIP](https://huggingface.co/docs/transformers/model_doc/clip#transformers.CLIPTextModel),具体为
[clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14) 变体。
tokenizer (`CLIPTokenizer`): # CLIP 的分词器,负责将文本转换为模型可接受的输入格式
Tokenizer of class
[CLIPTokenizer](https://huggingface.co/docs/transformers/v4.21.0/en/model_doc/clip#transformers.CLIPTokenizer).
unet ([`UNet2DConditionModel`]): # 条件 U-Net 结构,用于对编码的图像潜在表示去噪
Conditional U-Net architecture to denoise the encoded image latents.
scheduler ([`SchedulerMixin`]): # 用于与 U-Net 结合的调度器,帮助去噪图像潜在表示
A scheduler to be used in combination with `unet` to denoise the encoded image latents. Can be one of
[`DDIMScheduler`], [`LMSDiscreteScheduler`], or [`PNDMScheduler`].
safety_checker ([`StableDiffusionSafetyChecker`]): # 分类模块,评估生成的图像是否可能具有攻击性或有害
Classification module that estimates whether generated images could be considered offensive or harmful.
Please, refer to the [model card](https://huggingface.co/runwayml/stable-diffusion-v1-5) for details.
feature_extractor ([`CLIPImageProcessor`]): # 模型从生成的图像中提取特征,以作为 `safety_checker` 的输入
Model that extracts features from generated images to be used as inputs for the `safety_checker`.
"""
# 定义模型在 CPU 上的卸载顺序,从文本编码器到 U-Net 再到 VAE
model_cpu_offload_seq = "text_encoder->unet->vae"
# 定义可选组件,包含安全检查器和特征提取器
_optional_components = ["safety_checker", "feature_extractor"]
# 定义不包括在 CPU 卸载中的组件,特定为安全检查器
_exclude_from_cpu_offload = ["safety_checker"]
# 初始化方法,接受多个组件作为参数
def __init__(
self,
vae, # 传入变分自编码器实例
text_encoder, # 传入文本编码器实例
tokenizer, # 传入分词器实例
unet, # 传入条件 U-Net 实例
scheduler, # 传入调度器实例
safety_checker, # 传入安全检查器实例
feature_extractor, # 传入特征提取器实例
requires_safety_checker: bool = True, # 指示是否需要安全检查器的布尔参数,默认为 True
):
# 调用父类的初始化方法
super().__init__()
# 记录当前类是实验性管道,可能会在未来发生变化的信息
logger.info(
f"{self.__class__} is an experimntal pipeline and is likely to change in the future. We recommend to use"
" this pipeline for fast experimentation / iteration if needed, but advice to rely on existing pipelines"
" as defined in https://huggingface.co/docs/diffusers/api/schedulers#implemented-schedulers for"
" production settings."
)
# 从 LMS 配置中获取正确的 sigmas
scheduler = LMSDiscreteScheduler.from_config(scheduler.config)
# 注册模型组件,包括变分自编码器、文本编码器、分词器、UNet、调度器、安全检查器和特征提取器
self.register_modules(
vae=vae,
text_encoder=text_encoder,
tokenizer=tokenizer,
unet=unet,
scheduler=scheduler,
safety_checker=safety_checker,
feature_extractor=feature_extractor,
)
# 将安全检查器的要求注册到配置中
self.register_to_config(requires_safety_checker=requires_safety_checker)
# 计算 VAE 的缩放因子
self.vae_scale_factor = 2 ** (len(self.vae.config.block_out_channels) - 1)
# 创建 VAE 图像处理器,使用计算出的缩放因子
self.image_processor = VaeImageProcessor(vae_scale_factor=self.vae_scale_factor)
# 封装模型,将 UNet 和调度器的累积 alpha 传入模型包装器
model = ModelWrapper(unet, scheduler.alphas_cumprod)
# 根据预测类型选择合适的去噪模型
if scheduler.config.prediction_type == "v_prediction":
self.k_diffusion_model = CompVisVDenoiser(model)
else:
self.k_diffusion_model = CompVisDenoiser(model)
# 设置调度器类型的方法
def set_scheduler(self, scheduler_type: str):
# 动态导入 k_diffusion 库
library = importlib.import_module("k_diffusion")
# 获取采样模块
sampling = getattr(library, "sampling")
try:
# 尝试获取指定的采样器
self.sampler = getattr(sampling, scheduler_type)
except Exception:
# 如果发生异常,收集有效的采样器名称
valid_samplers = []
for s in dir(sampling):
if "sample_" in s:
valid_samplers.append(s)
# 抛出无效调度器类型的异常,并提供有效的采样器列表
raise ValueError(f"Invalid scheduler type {scheduler_type}. Please choose one of {valid_samplers}.")
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline._encode_prompt 复制的方法
def _encode_prompt(
self,
prompt,
device,
num_images_per_prompt,
do_classifier_free_guidance,
negative_prompt=None,
# 可选的提示嵌入,默认为 None
prompt_embeds: Optional[torch.Tensor] = None,
# 可选的负面提示嵌入,默认为 None
negative_prompt_embeds: Optional[torch.Tensor] = None,
# 可选的 LORA 缩放因子,默认为 None
lora_scale: Optional[float] = None,
# 额外的关键字参数
**kwargs,
):
# 定义一个警告信息,提示 `_encode_prompt()` 已被弃用,并将在未来版本中移除,建议使用 `encode_prompt()` 替代
deprecation_message = "`_encode_prompt()` is deprecated and it will be removed in a future version. Use `encode_prompt()` instead. Also, be aware that the output format changed from a concatenated tensor to a tuple."
# 调用 deprecate 函数,记录弃用信息,设置标准警告为 False
deprecate("_encode_prompt()", "1.0.0", deprecation_message, standard_warn=False)
# 调用 encode_prompt 方法,生成提示嵌入的元组
prompt_embeds_tuple = self.encode_prompt(
# 提供必要的参数给 encode_prompt 方法
prompt=prompt,
device=device,
num_images_per_prompt=num_images_per_prompt,
do_classifier_free_guidance=do_classifier_free_guidance,
negative_prompt=negative_prompt,
prompt_embeds=prompt_embeds,
negative_prompt_embeds=negative_prompt_embeds,
lora_scale=lora_scale,
**kwargs,
)
# 连接提示嵌入元组中的两个部分,方便向后兼容
prompt_embeds = torch.cat([prompt_embeds_tuple[1], prompt_embeds_tuple[0]])
# 返回合并后的提示嵌入
return prompt_embeds
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.encode_prompt 复制的部分
def encode_prompt(
self,
# 定义参数,分别用于处理提示信息和设备设置等
prompt,
device,
num_images_per_prompt,
do_classifier_free_guidance,
negative_prompt=None,
prompt_embeds: Optional[torch.Tensor] = None,
negative_prompt_embeds: Optional[torch.Tensor] = None,
lora_scale: Optional[float] = None,
clip_skip: Optional[int] = None,
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.run_safety_checker 复制的部分
def run_safety_checker(self, image, device, dtype):
# 如果安全检查器未定义,则设置 NSFW 概念为 None
if self.safety_checker is None:
has_nsfw_concept = None
else:
# 如果输入图像是张量格式,进行后处理并转换为 PIL 格式
if torch.is_tensor(image):
feature_extractor_input = self.image_processor.postprocess(image, output_type="pil")
else:
# 如果输入图像不是张量,则将其转换为 PIL 格式
feature_extractor_input = self.image_processor.numpy_to_pil(image)
# 提取特征,并将其转换为适合设备的张量格式
safety_checker_input = self.feature_extractor(feature_extractor_input, return_tensors="pt").to(device)
# 使用安全检查器处理图像,并返回处理后的图像及 NSFW 概念
image, has_nsfw_concept = self.safety_checker(
images=image, clip_input=safety_checker_input.pixel_values.to(dtype)
)
# 返回处理后的图像和 NSFW 概念
return image, has_nsfw_concept
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.decode_latents 复制的部分
# 解码潜在变量的方法
def decode_latents(self, latents):
# 设置弃用警告信息,提示用户此方法将在1.0.0版本中移除
deprecation_message = "The decode_latents method is deprecated and will be removed in 1.0.0. Please use VaeImageProcessor.postprocess(...) instead"
# 调用弃用函数,记录弃用信息
deprecate("decode_latents", "1.0.0", deprecation_message, standard_warn=False)
# 根据缩放因子调整潜在变量
latents = 1 / self.vae.config.scaling_factor * latents
# 解码潜在变量,返回的第一个元素是解码后的图像
image = self.vae.decode(latents, return_dict=False)[0]
# 将图像数据缩放到[0, 1]范围
image = (image / 2 + 0.5).clamp(0, 1)
# 将图像从GPU移动到CPU,并调整维度顺序,转换为float32类型以保持兼容性
image = image.cpu().permute(0, 2, 3, 1).float().numpy()
# 返回解码后的图像
return image
# 检查输入参数的方法
def check_inputs(
self,
# 提示文本
prompt,
# 图像高度
height,
# 图像宽度
width,
# 回调步骤
callback_steps,
# 可选的负提示文本
negative_prompt=None,
# 可选的提示嵌入
prompt_embeds=None,
# 可选的负提示嵌入
negative_prompt_embeds=None,
# 可选的在步骤结束时回调的张量输入
callback_on_step_end_tensor_inputs=None,
):
# 检查高度和宽度是否能被 8 整除,若不能则抛出错误
if height % 8 != 0 or width % 8 != 0:
raise ValueError(f"`height` and `width` have to be divisible by 8 but are {height} and {width}.")
# 检查回调步骤是否为正整数,若不是则抛出错误
if callback_steps is not None and (not isinstance(callback_steps, int) or callback_steps <= 0):
raise ValueError(
f"`callback_steps` has to be a positive integer but is {callback_steps} of type"
f" {type(callback_steps)}."
)
# 检查回调结束时的张量输入是否有效,若有无效项则抛出错误
if callback_on_step_end_tensor_inputs is not None and not all(
k in self._callback_tensor_inputs for k in callback_on_step_end_tensor_inputs
):
raise ValueError(
f"`callback_on_step_end_tensor_inputs` has to be in {self._callback_tensor_inputs}, but found {[k for k in callback_on_step_end_tensor_inputs if k not in self._callback_tensor_inputs]}"
)
# 检查同时提供了 prompt 和 prompt_embeds,若同时提供则抛出错误
if prompt is not None and prompt_embeds is not None:
raise ValueError(
f"Cannot forward both `prompt`: {prompt} and `prompt_embeds`: {prompt_embeds}. Please make sure to"
" only forward one of the two."
)
# 检查 prompt 和 prompt_embeds 是否都未提供,若都未提供则抛出错误
elif prompt is None and prompt_embeds is None:
raise ValueError(
"Provide either `prompt` or `prompt_embeds`. Cannot leave both `prompt` and `prompt_embeds` undefined."
)
# 检查 prompt 的类型是否为 str 或 list,若不是则抛出错误
elif prompt is not None and (not isinstance(prompt, str) and not isinstance(prompt, list)):
raise ValueError(f"`prompt` has to be of type `str` or `list` but is {type(prompt)}")
# 检查同时提供了 negative_prompt 和 negative_prompt_embeds,若同时提供则抛出错误
if negative_prompt is not None and negative_prompt_embeds is not None:
raise ValueError(
f"Cannot forward both `negative_prompt`: {negative_prompt} and `negative_prompt_embeds`:"
f" {negative_prompt_embeds}. Please make sure to only forward one of the two."
)
# 检查 prompt_embeds 和 negative_prompt_embeds 的形状是否一致,若不一致则抛出错误
if prompt_embeds is not None and negative_prompt_embeds is not None:
if prompt_embeds.shape != negative_prompt_embeds.shape:
raise ValueError(
"`prompt_embeds` and `negative_prompt_embeds` must have the same shape when passed directly, but"
f" got: `prompt_embeds` {prompt_embeds.shape} != `negative_prompt_embeds`"
f" {negative_prompt_embeds.shape}."
)
# 准备潜在向量的函数,输入参数包括批大小、通道数、高度、宽度等
def prepare_latents(self, batch_size, num_channels_latents, height, width, dtype, device, generator, latents=None):
# 根据输入参数计算潜在向量的形状
shape = (
batch_size, # 批处理的大小
num_channels_latents, # 潜在向量的通道数
int(height) // self.vae_scale_factor, # 高度经过缩放因子调整
int(width) // self.vae_scale_factor, # 宽度经过缩放因子调整
)
# 如果没有提供潜在向量,则随机生成一个
if latents is None:
latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
else:
# 如果提供的潜在向量形状不匹配,则抛出错误
if latents.shape != shape:
raise ValueError(f"Unexpected latents shape, got {latents.shape}, expected {shape}")
# 将潜在向量移动到指定设备
latents = latents.to(device)
# 根据调度器所需的标准差缩放初始噪声
return latents
# 该方法用于模型调用,且不计算梯度
@torch.no_grad()
def __call__(
# 输入提示,可能是单个字符串或字符串列表
prompt: Union[str, List[str]] = None,
# 图像的高度,默认为 None
height: Optional[int] = None,
# 图像的宽度,默认为 None
width: Optional[int] = None,
# 推理步骤的数量,默认为 50
num_inference_steps: int = 50,
# 引导尺度,默认为 7.5
guidance_scale: float = 7.5,
# 负提示,可能是单个字符串或字符串列表
negative_prompt: Optional[Union[str, List[str]]] = None,
# 每个提示生成的图像数量,默认为 1
num_images_per_prompt: Optional[int] = 1,
# 额外的噪声,默认为 0.0
eta: float = 0.0,
# 随机数生成器,默认为 None
generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
# 预先生成的潜在向量,默认为 None
latents: Optional[torch.Tensor] = None,
# 提示的嵌入,默认为 None
prompt_embeds: Optional[torch.Tensor] = None,
# 负提示的嵌入,默认为 None
negative_prompt_embeds: Optional[torch.Tensor] = None,
# 输出类型,默认为 "pil"
output_type: Optional[str] = "pil",
# 是否返回字典格式,默认为 True
return_dict: bool = True,
# 回调函数,默认为 None
callback: Optional[Callable[[int, int, torch.Tensor], None]] = None,
# 回调步骤,默认为 1
callback_steps: int = 1,
# 是否使用 Karras Sigma,默认为 False
use_karras_sigmas: Optional[bool] = False,
# 噪声采样器的种子,默认为 None
noise_sampler_seed: Optional[int] = None,
# 跳过的剪辑数量,默认为 None
clip_skip: int = None,
.\diffusers\pipelines\stable_diffusion_k_diffusion\pipeline_stable_diffusion_xl_k_diffusion.py
# 版权所有 2024 HuggingFace 团队。所有权利保留。
#
# 根据 Apache 许可证第 2.0 版(“许可证”)授权;
# 除非符合许可证,否则不得使用此文件。
# 可以在以下网址获取许可证的副本:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律或书面协议另有约定,按“原样”分发的软件不提供任何形式的保证或条件,
# 无论是明示或暗示的。有关许可证下的特定权限和限制,请参阅许可证。
import importlib # 导入用于动态导入模块的库
import inspect # 导入用于检查对象的库
from typing import List, Optional, Tuple, Union # 导入类型注解
import torch # 导入 PyTorch 库
from k_diffusion.external import CompVisDenoiser, CompVisVDenoiser # 导入外部去噪模型
from k_diffusion.sampling import BrownianTreeNoiseSampler, get_sigmas_karras # 导入采样器和获取函数
from transformers import ( # 导入 Transformers 库中的模型和分词器
CLIPTextModel,
CLIPTextModelWithProjection,
CLIPTokenizer,
)
from ...image_processor import VaeImageProcessor # 导入变分自编码器图像处理器
from ...loaders import ( # 导入不同加载器的混合器
FromSingleFileMixin,
IPAdapterMixin,
StableDiffusionXLLoraLoaderMixin,
TextualInversionLoaderMixin,
)
from ...models import AutoencoderKL, UNet2DConditionModel # 导入模型类
from ...models.attention_processor import ( # 导入注意力处理器
AttnProcessor2_0,
FusedAttnProcessor2_0,
XFormersAttnProcessor,
)
from ...models.lora import adjust_lora_scale_text_encoder # 导入调整 Lora 规模的函数
from ...schedulers import KarrasDiffusionSchedulers, LMSDiscreteScheduler # 导入调度器
from ...utils import ( # 导入实用工具
USE_PEFT_BACKEND,
logging,
replace_example_docstring,
scale_lora_layers,
unscale_lora_layers,
)
from ...utils.torch_utils import randn_tensor # 导入生成随机张量的工具
from ..pipeline_utils import DiffusionPipeline, StableDiffusionMixin # 导入扩散管道和混合器
from ..stable_diffusion_xl.pipeline_output import StableDiffusionXLPipelineOutput # 导入管道输出类
logger = logging.get_logger(__name__) # 初始化日志记录器,使用模块名
EXAMPLE_DOC_STRING = """ # 示例文档字符串,演示用法
Examples:
```py
>>> import torch # 导入 PyTorch 库
>>> from diffusers import StableDiffusionXLKDiffusionPipeline # 导入扩散管道
>>> pipe = StableDiffusionXLKDiffusionPipeline.from_pretrained( # 从预训练模型加载管道
... "stabilityai/stable-diffusion-xl-base-1.0", torch_dtype=torch.float16 # 设置模型和数据类型
... )
>>> pipe = pipe.to("cuda") # 将管道移动到 GPU
>>> pipe.set_scheduler("sample_dpmpp_2m_sde") # 设置调度器
>>> prompt = "a photo of an astronaut riding a horse on mars" # 定义生成图像的提示
>>> image = pipe(prompt).images[0] # 生成图像并获取第一张
```py
"""
# 从 diffusers.pipelines.stable_diffusion_k_diffusion.pipeline_stable_diffusion_k_diffusion.ModelWrapper 复制的类
class ModelWrapper: # 定义模型包装器类
def __init__(self, model, alphas_cumprod): # 初始化方法,接受模型和累积 alpha 值
self.model = model # 保存模型
self.alphas_cumprod = alphas_cumprod # 保存累积 alpha 值
def apply_model(self, *args, **kwargs): # 应用模型的方法
if len(args) == 3: # 如果传入三个位置参数
encoder_hidden_states = args[-1] # 获取最后一个参数作为编码器隐藏状态
args = args[:2] # 保留前两个参数
if kwargs.get("cond", None) is not None: # 如果关键字参数中有“cond”
encoder_hidden_states = kwargs.pop("cond") # 从关键字参数中提取并删除“cond”
return self.model(*args, encoder_hidden_states=encoder_hidden_states, **kwargs).sample # 调用模型并返回样本
# 定义一个类 StableDiffusionXLKDiffusionPipeline,继承自多个基类
class StableDiffusionXLKDiffusionPipeline(
# 继承自 DiffusionPipeline 基类,提供扩散模型功能
DiffusionPipeline,
# 继承自 StableDiffusionMixin,提供稳定扩散特性
StableDiffusionMixin,
# 继承自 FromSingleFileMixin,支持从单个文件加载
FromSingleFileMixin,
# 继承自 StableDiffusionXLLoraLoaderMixin,支持加载 LoRA 权重
StableDiffusionXLLoraLoaderMixin,
# 继承自 TextualInversionLoaderMixin,支持加载文本反转嵌入
TextualInversionLoaderMixin,
# 继承自 IPAdapterMixin,支持加载 IP 适配器
IPAdapterMixin,
):
# 文档字符串,描述该管道的功能和用途
r"""
Pipeline for text-to-image generation using Stable Diffusion XL and k-diffusion.
# 该模型继承自 `DiffusionPipeline`。请查看超类文档以了解库为所有管道实现的通用方法
# (例如下载或保存、在特定设备上运行等)
# 该管道还继承了以下加载方法:
# [`~loaders.TextualInversionLoaderMixin.load_textual_inversion`] 用于加载文本反转嵌入
# [`~loaders.FromSingleFileMixin.from_single_file`] 用于加载 `.ckpt` 文件
# [`~loaders.StableDiffusionXLLoraLoaderMixin.load_lora_weights`] 用于加载 LoRA 权重
# [`~loaders.StableDiffusionXLLoraLoaderMixin.save_lora_weights`] 用于保存 LoRA 权重
# [`~loaders.IPAdapterMixin.load_ip_adapter`] 用于加载 IP 适配器
# 参数:
# vae ([`AutoencoderKL`] ):
# 变分自编码器 (VAE) 模型,用于编码和解码图像到潜在表示。
vae ([`AutoencoderKL`]):
Variational Auto-Encoder (VAE) Model to encode and decode images to and from latent representations.
# text_encoder ([`CLIPTextModel`]):
# 冻结的文本编码器。Stable Diffusion XL 使用 CLIP 的文本部分
# [CLIP](https://huggingface.co/docs/transformers/model_doc/clip#transformers.CLIPTextModel),具体为
# [clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14) 变体。
text_encoder ([`CLIPTextModel`]):
Frozen text-encoder. Stable Diffusion XL uses the text portion of
[CLIP](https://huggingface.co/docs/transformers/model_doc/clip#transformers.CLIPTextModel), specifically
the [clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14) variant.
# text_encoder_2 ([` CLIPTextModelWithProjection`]):
# 第二个冻结文本编码器。Stable Diffusion XL 使用 CLIP 的文本和池部分
# [CLIP](https://huggingface.co/docs/transformers/model_doc/clip#transformers.CLIPTextModelWithProjection),
# 具体为
# [laion/CLIP-ViT-bigG-14-laion2B-39B-b160k](https://huggingface.co/laion/CLIP-ViT-bigG-14-laion2B-39B-b160k)
# 变体。
text_encoder_2 ([` CLIPTextModelWithProjection`]):
Second frozen text-encoder. Stable Diffusion XL uses the text and pool portion of
[CLIP](https://huggingface.co/docs/transformers/model_doc/clip#transformers.CLIPTextModelWithProjection),
specifically the
[laion/CLIP-ViT-bigG-14-laion2B-39B-b160k](https://huggingface.co/laion/CLIP-ViT-bigG-14-laion2B-39B-b160k)
variant.
# tokenizer (`CLIPTokenizer`):
# CLIP 的分词器
# Tokenizer of class
# [CLIPTokenizer](https://huggingface.co/docs/transformers/v4.21.0/en/model_doc/clip#transformers.CLIPTokenizer).
tokenizer (`CLIPTokenizer`):
Tokenizer of class
[CLIPTokenizer](https://huggingface.co/docs/transformers/v4.21.0/en/model_doc/clip#transformers.CLIPTokenizer).
# tokenizer_2 (`CLIPTokenizer`):
# 第二个 CLIP 分词器
# Second Tokenizer of class
# [CLIPTokenizer](https://huggingface.co/docs/transformers/v4.21.0/en/model_doc/clip#transformers.CLIPTokenizer).
tokenizer_2 (`CLIPTokenizer`):
Second Tokenizer of class
[CLIPTokenizer](https://huggingface.co/docs/transformers/v4.21.0/en/model_doc/clip#transformers.CLIPTokenizer).
# unet ([`UNet2DConditionModel`]): 条件 U-Net 架构,用于去噪编码后的图像潜变量。
# Conditional U-Net architecture to denoise the encoded image latents.
unet ([`UNet2DConditionModel`]):
Conditional U-Net architecture to denoise the encoded image latents.
# scheduler ([`SchedulerMixin`]):
# 用于与 `unet` 结合使用的调度器,以去噪编码的图像潜变量。可以是
# [`DDIMScheduler`], [`LMSDiscreteScheduler`] 或 [`PNDMScheduler`] 中的一个。
scheduler ([`SchedulerMixin`]):
A scheduler to be used in combination with `unet` to denoise the encoded image latents. Can be one of
[`DDIMScheduler`], [`LMSDiscreteScheduler`], or [`PNDMScheduler`].
# force_zeros_for_empty_prompt (`bool`, *optional*, defaults to `"True"`):
# 是否将负提示嵌入强制设置为 0。还请参见
# `stabilityai/stable-diffusion-xl-base-1-0` 的配置。
force_zeros_for_empty_prompt (`bool`, *optional*, defaults to `"True"`):
Whether the negative prompt embeddings shall be forced to always be set to 0. Also see the config of
`stabilityai/stable-diffusion-xl-base-1-0`.
"""
# 文档字符串,通常用于描述类或方法的功能
# 定义模型中 CPU 卸载的顺序
model_cpu_offload_seq = "text_encoder->text_encoder_2->unet->vae"
# 定义可选组件列表
_optional_components = [
"tokenizer", # 词元化器
"tokenizer_2", # 第二个词元化器
"text_encoder", # 文本编码器
"text_encoder_2", # 第二个文本编码器
"feature_extractor", # 特征提取器
]
# 初始化方法,接收多个参数
def __init__(
self,
vae: AutoencoderKL, # 变分自编码器
text_encoder: CLIPTextModel, # 文本编码器
text_encoder_2: CLIPTextModelWithProjection, # 带投影的第二文本编码器
tokenizer: CLIPTokenizer, # 词元化器
tokenizer_2: CLIPTokenizer, # 第二个词元化器
unet: UNet2DConditionModel, # UNet 模型
scheduler: KarrasDiffusionSchedulers, # 调度器
force_zeros_for_empty_prompt: bool = True, # 空提示时强制使用零
):
super().__init__() # 调用父类初始化方法
# 从 LMS 获取正确的 sigma 值
scheduler = LMSDiscreteScheduler.from_config(scheduler.config) # 根据配置创建调度器
# 注册模块到类中
self.register_modules(
vae=vae, # 注册变分自编码器
text_encoder=text_encoder, # 注册文本编码器
text_encoder_2=text_encoder_2, # 注册第二文本编码器
tokenizer=tokenizer, # 注册词元化器
tokenizer_2=tokenizer_2, # 注册第二个词元化器
unet=unet, # 注册 UNet 模型
scheduler=scheduler, # 注册调度器
)
# 将配置中的参数注册到类中
self.register_to_config(force_zeros_for_empty_prompt=force_zeros_for_empty_prompt)
# 计算 VAE 缩放因子
self.vae_scale_factor = 2 ** (len(self.vae.config.block_out_channels) - 1)
# 创建 VAE 图像处理器
self.image_processor = VaeImageProcessor(vae_scale_factor=self.vae_scale_factor)
# 获取 UNet 的默认样本大小
self.default_sample_size = self.unet.config.sample_size
# 创建模型包装器
model = ModelWrapper(unet, scheduler.alphas_cumprod)
# 根据调度器配置选择 K Diffusion 模型
if scheduler.config.prediction_type == "v_prediction":
self.k_diffusion_model = CompVisVDenoiser(model) # 使用 V 预测模型
else:
self.k_diffusion_model = CompVisDenoiser(model) # 使用普通去噪模型
# 从 StableDiffusionKDiffusionPipeline 复制的方法,设置调度器
def set_scheduler(self, scheduler_type: str):
library = importlib.import_module("k_diffusion") # 动态导入 k_diffusion 库
sampling = getattr(library, "sampling") # 获取 sampling 模块
try:
# 根据调度器类型设置采样器
self.sampler = getattr(sampling, scheduler_type)
except Exception:
valid_samplers = [] # 初始化有效采样器列表
# 遍历 sampling 模块中的属性,查找有效采样器
for s in dir(sampling):
if "sample_" in s:
valid_samplers.append(s)
# 抛出无效调度器类型的异常,并提供有效选择
raise ValueError(f"Invalid scheduler type {scheduler_type}. Please choose one of {valid_samplers}.")
# 从 StableDiffusionXLPipeline 复制的方法,编码提示
# 定义一个编码提示的函数,接受多个参数以生成图像
def encode_prompt(
self,
prompt: str, # 主提示字符串
prompt_2: Optional[str] = None, # 可选的第二个提示字符串
device: Optional[torch.device] = None, # 可选的设备信息
num_images_per_prompt: int = 1, # 每个提示生成的图像数量
do_classifier_free_guidance: bool = True, # 是否执行无分类器引导
negative_prompt: Optional[str] = None, # 可选的负提示字符串
negative_prompt_2: Optional[str] = None, # 可选的第二个负提示字符串
prompt_embeds: Optional[torch.Tensor] = None, # 可选的提示嵌入张量
negative_prompt_embeds: Optional[torch.Tensor] = None, # 可选的负提示嵌入张量
pooled_prompt_embeds: Optional[torch.Tensor] = None, # 可选的池化提示嵌入张量
negative_pooled_prompt_embeds: Optional[torch.Tensor] = None, # 可选的池化负提示嵌入张量
lora_scale: Optional[float] = None, # 可选的 LoRA 缩放因子
clip_skip: Optional[int] = None, # 可选的剪切跳过参数
# 定义一个检查输入的函数,确保所有必要参数有效
def check_inputs(
self,
prompt, # 主提示
prompt_2, # 第二个提示
height, # 高度参数
width, # 宽度参数
negative_prompt=None, # 负提示
negative_prompt_2=None, # 第二个负提示
prompt_embeds=None, # 提示嵌入
negative_prompt_embeds=None, # 负提示嵌入
pooled_prompt_embeds=None, # 池化提示嵌入
negative_pooled_prompt_embeds=None, # 池化负提示嵌入
# 准备潜在变量的函数,根据输入生成张量
def prepare_latents(self, batch_size, num_channels_latents, height, width, dtype, device, generator, latents=None):
# 根据批量大小、通道数、高度和宽度定义形状
shape = (
batch_size,
num_channels_latents,
int(height) // self.vae_scale_factor, # 计算缩放后的高度
int(width) // self.vae_scale_factor, # 计算缩放后的宽度
)
# 检查生成器列表长度是否与批量大小匹配
if isinstance(generator, list) and len(generator) != batch_size:
raise ValueError(
f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
f" size of {batch_size}. Make sure the batch size matches the length of the generators."
)
# 如果潜在变量为空,则生成随机张量
if latents is None:
latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
else: # 否则,将潜在变量转移到指定设备
latents = latents.to(device)
return latents # 返回生成或处理后的潜在变量
# 从 StableDiffusionXLPipeline 复制的函数,用于获取添加时间ID
def _get_add_time_ids(
self, original_size, crops_coords_top_left, target_size, dtype, text_encoder_projection_dim=None
):
# 创建添加时间ID列表,由原始大小、裁剪坐标和目标大小组成
add_time_ids = list(original_size + crops_coords_top_left + target_size)
# 计算实际添加嵌入维度
passed_add_embed_dim = (
self.unet.config.addition_time_embed_dim * len(add_time_ids) + text_encoder_projection_dim
)
# 获取期望的添加嵌入维度
expected_add_embed_dim = self.unet.add_embedding.linear_1.in_features
# 检查实际和期望的维度是否匹配
if expected_add_embed_dim != passed_add_embed_dim:
raise ValueError(
f"Model expects an added time embedding vector of length {expected_add_embed_dim}, but a vector of {passed_add_embed_dim} was created. The model has an incorrect config. Please check `unet.config.time_embedding_type` and `text_encoder_2.config.projection_dim`."
)
# 将添加时间ID转换为张量并返回
add_time_ids = torch.tensor([add_time_ids], dtype=dtype)
return add_time_ids
# 从 StableDiffusionXLPipeline 复制的函数,用于上采样 VAE
# 定义一个方法,用于将 VAE 的数据类型提升到 float32
def upcast_vae(self):
# 获取当前 VAE 的数据类型
dtype = self.vae.dtype
# 将 VAE 转换为 float32 数据类型
self.vae.to(dtype=torch.float32)
# 检查当前使用的是否是 Torch 2.0 或 XFormers 的注意力处理器
use_torch_2_0_or_xformers = isinstance(
self.vae.decoder.mid_block.attentions[0].processor,
(
AttnProcessor2_0,
XFormersAttnProcessor,
FusedAttnProcessor2_0,
),
)
# 如果使用的是 XFormers 或 Torch 2.0,注意力块不需要为 float32,这样可以节省大量内存
if use_torch_2_0_or_xformers:
# 将后量化卷积层转换为原始数据类型
self.vae.post_quant_conv.to(dtype)
# 将输入卷积层转换为原始数据类型
self.vae.decoder.conv_in.to(dtype)
# 将中间块转换为原始数据类型
self.vae.decoder.mid_block.to(dtype)
# 定义一个属性,用于获取引导缩放因子
@property
def guidance_scale(self):
# 返回引导缩放因子的值
return self._guidance_scale
# 定义一个属性,用于获取剪切跳过的值
@property
def clip_skip(self):
# 返回剪切跳过的值
return self._clip_skip
# 定义一个属性,指示是否执行无分类器引导
# 该引导等同于 Imagen 论文中的指导权重 w
@property
def do_classifier_free_guidance(self):
# 判断引导缩放因子是否大于 1 且时间条件投影维度为 None
return self._guidance_scale > 1 and self.unet.config.time_cond_proj_dim is None
# 装饰器,表示在推理过程中不计算梯度
@torch.no_grad()
# 替换示例文档字符串
@replace_example_docstring(EXAMPLE_DOC_STRING)
# 定义可调用方法,接收多种参数
def __call__(
# 接收一个或多个提示文本
prompt: Union[str, List[str]] = None,
# 接收第二个提示文本
prompt_2: Optional[Union[str, List[str]]] = None,
# 可选的图像高度
height: Optional[int] = None,
# 可选的图像宽度
width: Optional[int] = None,
# 指定推理步骤的数量,默认为 50
num_inference_steps: int = 50,
# 指定引导缩放因子的值,默认为 5.0
guidance_scale: float = 5.0,
# 可选的负提示文本
negative_prompt: Optional[Union[str, List[str]]] = None,
# 可选的第二个负提示文本
negative_prompt_2: Optional[Union[str, List[str]]] = None,
# 每个提示生成的图像数量,默认为 1
num_images_per_prompt: Optional[int] = 1,
# 可选的随机数生成器
generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
# 可选的潜在变量张量
latents: Optional[torch.Tensor] = None,
# 可选的提示嵌入张量
prompt_embeds: Optional[torch.Tensor] = None,
# 可选的负提示嵌入张量
negative_prompt_embeds: Optional[torch.Tensor] = None,
# 可选的聚合提示嵌入张量
pooled_prompt_embeds: Optional[torch.Tensor] = None,
# 可选的负聚合提示嵌入张量
negative_pooled_prompt_embeds: Optional[torch.Tensor] = None,
# 可选的输出类型,默认为 "pil"
output_type: Optional[str] = "pil",
# 指示是否返回字典形式的结果,默认为 True
return_dict: bool = True,
# 可选的原始图像尺寸
original_size: Optional[Tuple[int, int]] = None,
# 默认的裁剪坐标,默认为 (0, 0)
crops_coords_top_left: Tuple[int, int] = (0, 0),
# 可选的目标尺寸
target_size: Optional[Tuple[int, int]] = None,
# 可选的负原始尺寸
negative_original_size: Optional[Tuple[int, int]] = None,
# 可选的负裁剪坐标
negative_crops_coords_top_left: Tuple[int, int] = (0, 0),
# 可选的负目标尺寸
negative_target_size: Optional[Tuple[int, int]] = None,
# 可选的 Karras sigma 使用标志
use_karras_sigmas: Optional[bool] = False,
# 可选的噪声采样器种子
noise_sampler_seed: Optional[int] = None,
# 可选的剪切跳过值
clip_skip: Optional[int] = None,
.\diffusers\pipelines\stable_diffusion_k_diffusion\__init__.py
# 导入类型检查支持
from typing import TYPE_CHECKING
# 从工具模块中导入必要的工具和依赖
from ...utils import (
DIFFUSERS_SLOW_IMPORT, # 慢导入标志
OptionalDependencyNotAvailable, # 可选依赖不可用异常
_LazyModule, # 懒加载模块
get_objects_from_module, # 从模块获取对象的函数
is_k_diffusion_available, # 检查 k_diffusion 是否可用
is_k_diffusion_version, # 检查 k_diffusion 的版本
is_torch_available, # 检查 PyTorch 是否可用
is_transformers_available, # 检查 transformers 是否可用
)
# 初始化一个空字典以存储虚拟对象
_dummy_objects = {}
# 初始化一个空字典以定义导入结构
_import_structure = {}
# 尝试块,检查依赖是否可用
try:
if not (
is_transformers_available() # 检查 transformers 可用性
and is_torch_available() # 检查 PyTorch 可用性
and is_k_diffusion_available() # 检查 k_diffusion 可用性
and is_k_diffusion_version(">=", "0.0.12") # 检查 k_diffusion 版本
):
raise OptionalDependencyNotAvailable() # 抛出可选依赖不可用异常
except OptionalDependencyNotAvailable:
# 如果依赖不可用,导入虚拟对象以避免错误
from ...utils import dummy_torch_and_transformers_and_k_diffusion_objects # noqa F403
# 更新虚拟对象字典
_dummy_objects.update(get_objects_from_module(dummy_torch_and_transformers_and_k_diffusion_objects))
else:
# 如果依赖可用,定义导入结构
_import_structure["pipeline_stable_diffusion_k_diffusion"] = ["StableDiffusionKDiffusionPipeline"]
_import_structure["pipeline_stable_diffusion_xl_k_diffusion"] = ["StableDiffusionXLKDiffusionPipeline"]
# 根据类型检查或慢导入标志,进行进一步处理
if TYPE_CHECKING or DIFFUSERS_SLOW_IMPORT:
try:
if not (
is_transformers_available() # 检查 transformers 可用性
and is_torch_available() # 检查 PyTorch 可用性
and is_k_diffusion_available() # 检查 k_diffusion 可用性
and is_k_diffusion_version(">=", "0.0.12") # 检查 k_diffusion 版本
):
raise OptionalDependencyNotAvailable() # 抛出可选依赖不可用异常
except OptionalDependencyNotAvailable:
# 如果依赖不可用,导入虚拟对象以避免错误
from ...utils.dummy_torch_and_transformers_and_k_diffusion_objects import *
else:
# 如果依赖可用,导入相关管道
from .pipeline_stable_diffusion_k_diffusion import StableDiffusionKDiffusionPipeline
from .pipeline_stable_diffusion_xl_k_diffusion import StableDiffusionXLKDiffusionPipeline
else:
# 如果不是类型检查,设置懒加载模块
import sys
sys.modules[__name__] = _LazyModule(
__name__, # 当前模块名
globals()["__file__"], # 当前文件路径
_import_structure, # 导入结构
module_spec=__spec__, # 模块规范
)
# 将虚拟对象设置到当前模块
for name, value in _dummy_objects.items():
setattr(sys.modules[__name__], name, value)
.\diffusers\pipelines\stable_diffusion_ldm3d\pipeline_stable_diffusion_ldm3d.py
# 版权声明,说明版权所有者及相关团队
# Copyright 2024 The Intel Labs Team Authors and the HuggingFace Team. All rights reserved.
#
# 根据 Apache License, Version 2.0(“许可证”)许可;
# 除非符合许可证,否则不得使用此文件。
# 可以在以下网址获取许可证副本:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律或书面协议另有规定,
# 否则根据许可证分发的软件在“原样”基础上提供,
# 不提供任何形式的担保或条件。
# 请参见许可证,以了解有关权限和限制的具体内容。
import inspect # 导入inspect模块以进行对象获取和检查
from dataclasses import dataclass # 从dataclasses模块导入dataclass装饰器
from typing import Any, Callable, Dict, List, Optional, Union # 导入常用类型提示
import numpy as np # 导入NumPy库以进行数值计算
import PIL.Image # 导入PIL库中的Image模块以处理图像
import torch # 导入PyTorch库以进行深度学习操作
from transformers import CLIPImageProcessor, CLIPTextModel, CLIPTokenizer, CLIPVisionModelWithProjection # 导入Transformers库中的CLIP相关模型和处理器
from ...image_processor import PipelineImageInput, VaeImageProcessorLDM3D # 从相对路径导入图像处理相关类
from ...loaders import FromSingleFileMixin, IPAdapterMixin, StableDiffusionLoraLoaderMixin, TextualInversionLoaderMixin # 导入不同类型的加载器混合类
from ...models import AutoencoderKL, ImageProjection, UNet2DConditionModel # 导入不同的模型类
from ...models.lora import adjust_lora_scale_text_encoder # 导入调整Lora文本编码器的函数
from ...schedulers import KarrasDiffusionSchedulers # 导入Karras扩散调度器
from ...utils import ( # 从utils模块导入多个工具函数和常量
USE_PEFT_BACKEND,
BaseOutput,
deprecate,
logging,
replace_example_docstring,
scale_lora_layers,
unscale_lora_layers,
)
from ...utils.torch_utils import randn_tensor # 从torch_utils模块导入随机张量生成函数
from ..pipeline_utils import DiffusionPipeline, StableDiffusionMixin # 从pipeline_utils模块导入扩散管道和稳定扩散混合类
from ..stable_diffusion.safety_checker import StableDiffusionSafetyChecker # 从stable_diffusion模块导入安全检查器
logger = logging.get_logger(__name__) # 创建一个记录器,用于记录模块中的日志信息,禁用pylint对名称无效的警告
EXAMPLE_DOC_STRING = """ # 定义示例文档字符串,用于展示如何使用StableDiffusionLDM3DPipeline
Examples:
```python
>>> from diffusers import StableDiffusionLDM3DPipeline # 从diffusers模块导入StableDiffusionLDM3DPipeline
>>> pipe = StableDiffusionLDM3DPipeline.from_pretrained("Intel/ldm3d-4c") # 从预训练模型加载管道
>>> pipe = pipe.to("cuda") # 将管道移动到GPU设备上
>>> prompt = "a photo of an astronaut riding a horse on mars" # 定义生成图像的提示
>>> output = pipe(prompt) # 使用提示生成图像输出
>>> rgb_image, depth_image = output.rgb, output.depth # 从输出中提取RGB图像和深度图像
>>> rgb_image[0].save("astronaut_ldm3d_rgb.jpg") # 保存RGB图像
>>> depth_image[0].save("astronaut_ldm3d_depth.png") # 保存深度图像
```py
"""
# 从diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion模块复制的函数
def rescale_noise_cfg(noise_cfg, noise_pred_text, guidance_rescale=0.0): # 定义一个函数,用于根据guidance_rescale重缩放噪声配置
"""
根据`guidance_rescale`重缩放`noise_cfg`。基于[Common Diffusion Noise Schedules and
Sample Steps are Flawed](https://arxiv.org/pdf/2305.08891.pdf)中的发现。见第3.4节
"""
std_text = noise_pred_text.std(dim=list(range(1, noise_pred_text.ndim)), keepdim=True) # 计算文本预测噪声的标准差
std_cfg = noise_cfg.std(dim=list(range(1, noise_cfg.ndim)), keepdim=True) # 计算噪声配置的标准差
# 根据引导结果重缩放噪声(修复过度曝光问题)
noise_pred_rescaled = noise_cfg * (std_text / std_cfg) # 通过标准差比例重缩放噪声配置
# 将原始结果与通过因子 guidance_rescale 指导的结果混合,以避免“平淡无奇”的图像
noise_cfg = guidance_rescale * noise_pred_rescaled + (1 - guidance_rescale) * noise_cfg
# 返回混合后的噪声配置
return noise_cfg
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion 复制的函数
def retrieve_timesteps(
# 调度器对象,用于获取时间步
scheduler,
# 推理步骤的数量,可选参数,默认为 None
num_inference_steps: Optional[int] = None,
# 设备类型,可选参数,默认为 None
device: Optional[Union[str, torch.device]] = None,
# 自定义时间步,可选参数,默认为 None
timesteps: Optional[List[int]] = None,
# 自定义 sigma 值,可选参数,默认为 None
sigmas: Optional[List[float]] = None,
# 其他关键字参数
**kwargs,
):
"""
调用调度器的 `set_timesteps` 方法,并在调用后从调度器检索时间步。处理自定义时间步。
任何 kwargs 将被传递给 `scheduler.set_timesteps`。
参数:
scheduler (`SchedulerMixin`):
获取时间步的调度器。
num_inference_steps (`int`):
生成样本时使用的扩散步骤数。如果使用,`timesteps` 必须为 `None`。
device (`str` 或 `torch.device`, *可选*):
时间步要移动到的设备。如果为 `None`,时间步将不被移动。
timesteps (`List[int]`, *可选*):
自定义时间步,用于覆盖调度器的时间步间隔策略。如果传递了 `timesteps`,
则 `num_inference_steps` 和 `sigmas` 必须为 `None`。
sigmas (`List[float]`, *可选*):
自定义 sigma,用于覆盖调度器的时间步间隔策略。如果传递了 `sigmas`,
则 `num_inference_steps` 和 `timesteps` 必须为 `None`。
返回:
`Tuple[torch.Tensor, int]`: 一个元组,第一个元素是来自调度器的时间步调度,第二个元素是推理步骤的数量。
"""
# 检查是否同时传递了自定义时间步和自定义 sigma
if timesteps is not None and sigmas is not None:
raise ValueError("Only one of `timesteps` or `sigmas` can be passed. Please choose one to set custom values")
# 如果传递了自定义时间步
if timesteps is not None:
# 检查调度器的 set_timesteps 方法是否接受自定义时间步
accepts_timesteps = "timesteps" in set(inspect.signature(scheduler.set_timesteps).parameters.keys())
# 如果不接受,则抛出错误
if not accepts_timesteps:
raise ValueError(
f"The current scheduler class {scheduler.__class__}'s `set_timesteps` does not support custom"
f" timestep schedules. Please check whether you are using the correct scheduler."
)
# 设置自定义时间步
scheduler.set_timesteps(timesteps=timesteps, device=device, **kwargs)
# 从调度器获取当前的时间步
timesteps = scheduler.timesteps
# 计算推理步骤的数量
num_inference_steps = len(timesteps)
# 如果传递了自定义 sigma
elif sigmas is not None:
# 检查调度器的 set_timesteps 方法是否接受自定义 sigma
accept_sigmas = "sigmas" in set(inspect.signature(scheduler.set_timesteps).parameters.keys())
# 如果不接受,则抛出错误
if not accept_sigmas:
raise ValueError(
f"The current scheduler class {scheduler.__class__}'s `set_timesteps` does not support custom"
f" sigmas schedules. Please check whether you are using the correct scheduler."
)
# 设置自定义 sigma
scheduler.set_timesteps(sigmas=sigmas, device=device, **kwargs)
# 从调度器获取当前的时间步
timesteps = scheduler.timesteps
# 计算推理步骤的数量
num_inference_steps = len(timesteps)
else: # 如果前面的条件不满足,则执行以下代码
# 设置调度器的时间步长,传入推理步骤数量和设备参数
scheduler.set_timesteps(num_inference_steps, device=device, **kwargs)
# 获取当前调度器的时间步长
timesteps = scheduler.timesteps
# 返回时间步长和推理步骤数量
return timesteps, num_inference_steps
# 定义 LDM3D 输出数据类,继承自 BaseOutput
@dataclass
class LDM3DPipelineOutput(BaseOutput):
"""
输出类,用于稳定扩散管道。
参数:
rgb (`List[PIL.Image.Image]` 或 `np.ndarray`)
表示去噪后的 PIL 图像列表,长度为 `batch_size` 或形状为 `(batch_size, height, width,
num_channels)` 的 NumPy 数组。
depth (`List[PIL.Image.Image]` 或 `np.ndarray`)
表示去噪后的 PIL 图像列表,长度为 `batch_size` 或形状为 `(batch_size, height, width,
num_channels)` 的 NumPy 数组。
nsfw_content_detected (`List[bool]`)
表示相应生成图像是否包含“不适合工作” (nsfw) 内容的列表,如果无法执行安全检查则为 `None`。
"""
# 定义 rgb 属性,可以是 PIL 图像列表或 NumPy 数组
rgb: Union[List[PIL.Image.Image], np.ndarray]
# 定义 depth 属性,可以是 PIL 图像列表或 NumPy 数组
depth: Union[List[PIL.Image.Image], np.ndarray]
# 定义 nsfw_content_detected 属性,表示每个图像的安全性检测结果
nsfw_content_detected: Optional[List[bool]]
# 定义稳定扩散 LDM3D 管道类,继承多个混合类
class StableDiffusionLDM3DPipeline(
DiffusionPipeline,
StableDiffusionMixin,
TextualInversionLoaderMixin,
IPAdapterMixin,
StableDiffusionLoraLoaderMixin,
FromSingleFileMixin,
):
r"""
用于文本到图像和 3D 生成的管道,使用 LDM3D。
此模型继承自 [`DiffusionPipeline`]。请查看超类文档以获取所有管道的通用方法(下载、保存、在特定设备上运行等)。
此管道还继承以下加载方法:
- [`~loaders.TextualInversionLoaderMixin.load_textual_inversion`] 用于加载文本反转嵌入
- [`~loaders.StableDiffusionLoraLoaderMixin.load_lora_weights`] 用于加载 LoRA 权重
- [`~loaders.StableDiffusionLoraLoaderMixin.save_lora_weights`] 用于保存 LoRA 权重
- [`~loaders.FromSingleFileMixin.from_single_file`] 用于加载 `.ckpt` 文件
- [`~loaders.IPAdapterMixin.load_ip_adapter`] 用于加载 IP 适配器
# 定义参数说明部分
Args:
vae ([`AutoencoderKL`]):
Variational Auto-Encoder (VAE) 模型,用于编码和解码图像到潜在表示。
text_encoder ([`~transformers.CLIPTextModel`]):
冻结的文本编码器([clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14))。
tokenizer ([`~transformers.CLIPTokenizer`]):
用于将文本标记化的 `CLIPTokenizer`。
unet ([`UNet2DConditionModel`]):
用于对编码的图像潜在表示进行去噪的 `UNet2DConditionModel`。
scheduler ([`SchedulerMixin`]):
与 `unet` 结合使用的调度器,用于对编码的图像潜在表示进行去噪。可以是
[`DDIMScheduler`], [`LMSDiscreteScheduler`] 或 [`PNDMScheduler`] 之一。
safety_checker ([`StableDiffusionSafetyChecker`]):
分类模块,用于评估生成的图像是否可能被视为冒犯或有害。
有关模型潜在危害的更多详细信息,请参阅 [model card](https://huggingface.co/runwayml/stable-diffusion-v1-5)。
feature_extractor ([`~transformers.CLIPImageProcessor`]):
用于从生成图像中提取特征的 `CLIPImageProcessor`;作为输入用于 `safety_checker`。
"""
# 定义模型在 CPU 上的卸载顺序
model_cpu_offload_seq = "text_encoder->unet->vae"
# 定义可选组件的列表
_optional_components = ["safety_checker", "feature_extractor", "image_encoder"]
# 定义在 CPU 卸载时排除的组件
_exclude_from_cpu_offload = ["safety_checker"]
# 定义回调张量输入的列表
_callback_tensor_inputs = ["latents", "prompt_embeds", "negative_prompt_embeds"]
# 初始化方法定义
def __init__(
# 初始化所需的 VAE 模型
vae: AutoencoderKL,
# 初始化所需的文本编码器
text_encoder: CLIPTextModel,
# 初始化所需的标记器
tokenizer: CLIPTokenizer,
# 初始化所需的 UNet 模型
unet: UNet2DConditionModel,
# 初始化所需的调度器
scheduler: KarrasDiffusionSchedulers,
# 初始化所需的安全检查器
safety_checker: StableDiffusionSafetyChecker,
# 初始化所需的特征提取器
feature_extractor: CLIPImageProcessor,
# 可选的图像编码器
image_encoder: Optional[CLIPVisionModelWithProjection],
# 指示是否需要安全检查器的布尔值
requires_safety_checker: bool = True,
# 初始化父类
):
super().__init__()
# 检查是否禁用安全检查器且需要安全检查器
if safety_checker is None and requires_safety_checker:
# 记录警告信息,提醒用户有关安全检查器的使用建议
logger.warning(
f"You have disabled the safety checker for {self.__class__} by passing `safety_checker=None`. Ensure"
" that you abide to the conditions of the Stable Diffusion license and do not expose unfiltered"
" results in services or applications open to the public. Both the diffusers team and Hugging Face"
" strongly recommend to keep the safety filter enabled in all public facing circumstances, disabling"
" it only for use-cases that involve analyzing network behavior or auditing its results. For more"
" information, please have a look at https://github.com/huggingface/diffusers/pull/254 ."
)
# 检查是否提供了安全检查器但未提供特征提取器
if safety_checker is not None and feature_extractor is None:
# 引发错误,提示用户需定义特征提取器
raise ValueError(
"Make sure to define a feature extractor when loading {self.__class__} if you want to use the safety"
" checker. If you do not want to use the safety checker, you can pass `'safety_checker=None'` instead."
)
# 注册各个模块到当前对象
self.register_modules(
vae=vae,
text_encoder=text_encoder,
tokenizer=tokenizer,
unet=unet,
scheduler=scheduler,
safety_checker=safety_checker,
feature_extractor=feature_extractor,
image_encoder=image_encoder,
)
# 计算 VAE 的缩放因子
self.vae_scale_factor = 2 ** (len(self.vae.config.block_out_channels) - 1)
# 创建图像处理器,使用 VAE 缩放因子
self.image_processor = VaeImageProcessorLDM3D(vae_scale_factor=self.vae_scale_factor)
# 将需要的配置注册到当前对象
self.register_to_config(requires_safety_checker=requires_safety_checker)
# 从 StableDiffusionPipeline 复制的方法,用于编码提示
def _encode_prompt(
self,
prompt,
device,
num_images_per_prompt,
do_classifier_free_guidance,
negative_prompt=None,
prompt_embeds: Optional[torch.Tensor] = None,
negative_prompt_embeds: Optional[torch.Tensor] = None,
lora_scale: Optional[float] = None,
**kwargs,
# 定义一个方法,可能是类中的一个部分
):
# 创建一个关于 `_encode_prompt()` 方法被弃用的提示信息
deprecation_message = "`_encode_prompt()` is deprecated and it will be removed in a future version. Use `encode_prompt()` instead. Also, be aware that the output format changed from a concatenated tensor to a tuple."
# 调用 deprecate 函数,记录弃用信息和版本
deprecate("_encode_prompt()", "1.0.0", deprecation_message, standard_warn=False)
# 调用 encode_prompt 方法,传入一系列参数,获取提示嵌入元组
prompt_embeds_tuple = self.encode_prompt(
prompt=prompt, # 输入提示
device=device, # 设备类型(CPU或GPU)
num_images_per_prompt=num_images_per_prompt, # 每个提示生成的图像数量
do_classifier_free_guidance=do_classifier_free_guidance, # 是否使用无分类器引导
negative_prompt=negative_prompt, # 负面提示
prompt_embeds=prompt_embeds, # 提示嵌入
negative_prompt_embeds=negative_prompt_embeds, # 负面提示嵌入
lora_scale=lora_scale, # Lora缩放因子
**kwargs, # 其他关键字参数
)
# 将提示嵌入元组中的两个元素连接成一个张量,用于向后兼容
prompt_embeds = torch.cat([prompt_embeds_tuple[1], prompt_embeds_tuple[0]])
# 返回合并后的提示嵌入
return prompt_embeds
# 从 StableDiffusionPipeline 复制的 encode_prompt 方法定义
def encode_prompt(
self,
prompt, # 输入的提示文本
device, # 设备类型(CPU或GPU)
num_images_per_prompt, # 每个提示生成的图像数量
do_classifier_free_guidance, # 是否使用无分类器引导
negative_prompt=None, # 负面提示,默认为 None
prompt_embeds: Optional[torch.Tensor] = None, # 可选的提示嵌入
negative_prompt_embeds: Optional[torch.Tensor] = None, # 可选的负面提示嵌入
lora_scale: Optional[float] = None, # 可选的 Lora 缩放因子
clip_skip: Optional[int] = None, # 可选的剪辑跳过参数
# 从 StableDiffusionPipeline 复制的 encode_image 方法定义
def encode_image(self, image, device, num_images_per_prompt, output_hidden_states=None):
# 获取图像编码器参数的数据类型
dtype = next(self.image_encoder.parameters()).dtype
# 检查输入图像是否为张量,如果不是则进行转换
if not isinstance(image, torch.Tensor):
image = self.feature_extractor(image, return_tensors="pt").pixel_values
# 将图像数据移动到指定设备并转换为指定类型
image = image.to(device=device, dtype=dtype)
# 如果需要输出隐藏状态,则处理隐藏状态
if output_hidden_states:
# 获取编码后的图像隐藏状态,并重复图像以适应生成数量
image_enc_hidden_states = self.image_encoder(image, output_hidden_states=True).hidden_states[-2]
image_enc_hidden_states = image_enc_hidden_states.repeat_interleave(num_images_per_prompt, dim=0)
# 获取无条件图像的编码隐藏状态
uncond_image_enc_hidden_states = self.image_encoder(
torch.zeros_like(image), output_hidden_states=True
).hidden_states[-2]
# 重复无条件图像隐藏状态
uncond_image_enc_hidden_states = uncond_image_enc_hidden_states.repeat_interleave(
num_images_per_prompt, dim=0
)
# 返回图像和无条件图像的编码隐藏状态
return image_enc_hidden_states, uncond_image_enc_hidden_states
else:
# 获取编码后的图像嵌入
image_embeds = self.image_encoder(image).image_embeds
# 重复图像嵌入以适应生成数量
image_embeds = image_embeds.repeat_interleave(num_images_per_prompt, dim=0)
# 创建一个与图像嵌入大小相同的零张量作为无条件图像嵌入
uncond_image_embeds = torch.zeros_like(image_embeds)
# 返回图像嵌入和无条件图像嵌入
return image_embeds, uncond_image_embeds
# 从 StableDiffusionPipeline 复制的 prepare_ip_adapter_image_embeds 方法定义
# 准备 IP 适配器图像的嵌入表示
def prepare_ip_adapter_image_embeds(
# 定义方法的参数,包括图像、图像嵌入、设备、每个提示的图像数量和分类自由引导标志
self, ip_adapter_image, ip_adapter_image_embeds, device, num_images_per_prompt, do_classifier_free_guidance
):
# 初始化图像嵌入列表
image_embeds = []
# 如果启用分类自由引导
if do_classifier_free_guidance:
# 初始化负图像嵌入列表
negative_image_embeds = []
# 如果图像嵌入为空
if ip_adapter_image_embeds is None:
# 如果传入的图像不是列表,将其转为列表
if not isinstance(ip_adapter_image, list):
ip_adapter_image = [ip_adapter_image]
# 检查图像数量与 IP 适配器的数量是否一致
if len(ip_adapter_image) != len(self.unet.encoder_hid_proj.image_projection_layers):
# 如果不一致,抛出值错误
raise ValueError(
f"`ip_adapter_image` must have same length as the number of IP Adapters. Got {len(ip_adapter_image)} images and {len(self.unet.encoder_hid_proj.image_projection_layers)} IP Adapters."
)
# 遍历图像和相应的图像投影层
for single_ip_adapter_image, image_proj_layer in zip(
ip_adapter_image, self.unet.encoder_hid_proj.image_projection_layers
):
# 检查图像投影层是否为 ImageProjection 类型
output_hidden_state = not isinstance(image_proj_layer, ImageProjection)
# 编码单个图像,返回图像嵌入和负图像嵌入
single_image_embeds, single_negative_image_embeds = self.encode_image(
single_ip_adapter_image, device, 1, output_hidden_state
)
# 将单个图像嵌入添加到嵌入列表中
image_embeds.append(single_image_embeds[None, :])
# 如果启用分类自由引导,将负图像嵌入添加到列表中
if do_classifier_free_guidance:
negative_image_embeds.append(single_negative_image_embeds[None, :])
else:
# 如果已提供图像嵌入
for single_image_embeds in ip_adapter_image_embeds:
# 如果启用分类自由引导,分离负图像嵌入和图像嵌入
if do_classifier_free_guidance:
single_negative_image_embeds, single_image_embeds = single_image_embeds.chunk(2)
# 将负图像嵌入添加到列表中
negative_image_embeds.append(single_negative_image_embeds)
# 将图像嵌入添加到列表中
image_embeds.append(single_image_embeds)
# 初始化 IP 适配器图像嵌入列表
ip_adapter_image_embeds = []
# 遍历图像嵌入列表
for i, single_image_embeds in enumerate(image_embeds):
# 复制嵌入以生成每个提示的图像数量
single_image_embeds = torch.cat([single_image_embeds] * num_images_per_prompt, dim=0)
# 如果启用分类自由引导
if do_classifier_free_guidance:
# 复制负图像嵌入以生成每个提示的图像数量
single_negative_image_embeds = torch.cat([negative_image_embeds[i]] * num_images_per_prompt, dim=0)
# 将负图像嵌入和图像嵌入拼接在一起
single_image_embeds = torch.cat([single_negative_image_embeds, single_image_embeds], dim=0)
# 将图像嵌入转移到指定设备上
single_image_embeds = single_image_embeds.to(device=device)
# 将处理后的图像嵌入添加到结果列表中
ip_adapter_image_embeds.append(single_image_embeds)
# 返回 IP 适配器图像嵌入列表
return ip_adapter_image_embeds
# 定义运行安全检查器的方法,接收图像、设备和数据类型作为参数
def run_safety_checker(self, image, device, dtype):
# 如果安全检查器未定义,则没有 NSFW 概念
if self.safety_checker is None:
has_nsfw_concept = None
else:
# 如果图像是一个张量,则进行后处理以转换为 PIL 图像
if torch.is_tensor(image):
feature_extractor_input = self.image_processor.postprocess(image, output_type="pil")
else:
# 如果图像不是张量,则将 NumPy 数组转换为 PIL 图像
feature_extractor_input = self.image_processor.numpy_to_pil(image)
# 提取第一个图像作为 RGB 特征输入
rgb_feature_extractor_input = feature_extractor_input[0]
# 使用特征提取器将 RGB 输入转换为张量并移动到指定设备
safety_checker_input = self.feature_extractor(rgb_feature_extractor_input, return_tensors="pt").to(device)
# 使用安全检查器检查图像,返回处理后的图像和 NSFW 概念的标识
image, has_nsfw_concept = self.safety_checker(
images=image, clip_input=safety_checker_input.pixel_values.to(dtype)
)
# 返回处理后的图像和 NSFW 概念的标识
return image, has_nsfw_concept
# 从 StableDiffusionPipeline 复制的准备额外步骤参数的方法
def prepare_extra_step_kwargs(self, generator, eta):
# 为调度器步骤准备额外参数,因为不是所有调度器具有相同的签名
# eta(η)仅在 DDIMScheduler 中使用,对于其他调度器将被忽略
# eta 对应于 DDIM 论文中的 η: https://arxiv.org/abs/2010.02502
# 应该在 [0, 1] 之间
# 检查调度器的步骤方法是否接受 eta 参数
accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())
extra_step_kwargs = {}
# 如果接受 eta,则将其添加到额外步骤参数字典中
if accepts_eta:
extra_step_kwargs["eta"] = eta
# 检查调度器的步骤方法是否接受 generator 参数
accepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys())
# 如果接受 generator,则将其添加到额外步骤参数字典中
if accepts_generator:
extra_step_kwargs["generator"] = generator
# 返回准备好的额外步骤参数字典
return extra_step_kwargs
# 从 StableDiffusionPipeline 复制的检查输入的方法
def check_inputs(
self,
prompt,
height,
width,
callback_steps,
negative_prompt=None,
prompt_embeds=None,
negative_prompt_embeds=None,
ip_adapter_image=None,
ip_adapter_image_embeds=None,
callback_on_step_end_tensor_inputs=None,
# 准备潜在变量,用于生成模型的输入
def prepare_latents(self, batch_size, num_channels_latents, height, width, dtype, device, generator, latents=None):
# 定义潜在变量的形状,依据批量大小和通道数
shape = (
batch_size,
num_channels_latents,
int(height) // self.vae_scale_factor,
int(width) // self.vae_scale_factor,
)
# 检查生成器是否为列表且其长度与批量大小一致
if isinstance(generator, list) and len(generator) != batch_size:
# 抛出值错误,提示生成器长度与批量大小不匹配
raise ValueError(
f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
f" size of {batch_size}. Make sure the batch size matches the length of the generators."
)
# 如果未提供潜在变量,则生成随机潜在变量
if latents is None:
latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
else:
# 将提供的潜在变量转移到指定设备
latents = latents.to(device)
# 将初始噪声按调度器要求的标准差进行缩放
latents = latents * self.scheduler.init_noise_sigma
# 返回处理后的潜在变量
return latents
# 从 diffusers.pipelines.latent_consistency_models.pipeline_latent_consistency_text2img 的 LatentConsistencyModelPipeline 复制
def get_guidance_scale_embedding(
self, w: torch.Tensor, embedding_dim: int = 512, dtype: torch.dtype = torch.float32
) -> torch.Tensor:
"""
查看链接 https://github.com/google-research/vdm/blob/dc27b98a554f65cdc654b800da5aa1846545d41b/model_vdm.py#L298
参数:
w (`torch.Tensor`):
生成带有指定引导缩放的嵌入向量,以丰富时间步嵌入。
embedding_dim (`int`, *可选*, 默认为 512):
要生成的嵌入维度。
dtype (`torch.dtype`, *可选*, 默认为 `torch.float32`):
生成的嵌入数据类型。
返回:
`torch.Tensor`: 嵌入向量,形状为 `(len(w), embedding_dim)`。
"""
# 确保输入张量 w 是一维的
assert len(w.shape) == 1
# 将 w 乘以 1000.0
w = w * 1000.0
# 计算半维度
half_dim = embedding_dim // 2
# 计算嵌入的缩放因子
emb = torch.log(torch.tensor(10000.0)) / (half_dim - 1)
# 生成指数衰减的嵌入
emb = torch.exp(torch.arange(half_dim, dtype=dtype) * -emb)
# 计算最终嵌入,按 w 进行缩放
emb = w.to(dtype)[:, None] * emb[None, :]
# 连接正弦和余弦变换的嵌入
emb = torch.cat([torch.sin(emb), torch.cos(emb)], dim=1)
# 如果嵌入维度为奇数,则进行零填充
if embedding_dim % 2 == 1: # zero pad
emb = torch.nn.functional.pad(emb, (0, 1))
# 确保生成的嵌入形状正确
assert emb.shape == (w.shape[0], embedding_dim)
# 返回生成的嵌入
return emb
# 定义属性,返回引导缩放因子
@property
def guidance_scale(self):
return self._guidance_scale
# 定义属性,返回引导重缩放因子
@property
def guidance_rescale(self):
return self._guidance_rescale
# 定义属性,返回跳过剪辑的标志
@property
def clip_skip(self):
return self._clip_skip
# 这里的 `guidance_scale` 定义类似于 Imagen 论文中方程 (2) 的引导权重 `w`
# https://arxiv.org/pdf/2205.11487.pdf . `guidance_scale = 1`
# 对应于不进行分类器自由引导。
@property
# 定义一个方法,用于判断是否进行无分类器引导
def do_classifier_free_guidance(self):
# 判断引导比例是否大于1并且时间条件投影维度是否为None
return self._guidance_scale > 1 and self.unet.config.time_cond_proj_dim is None
# 定义一个属性,返回交叉注意力的关键字参数
@property
def cross_attention_kwargs(self):
return self._cross_attention_kwargs
# 定义一个属性,返回时间步数
@property
def num_timesteps(self):
return self._num_timesteps
# 定义一个属性,返回中断状态
@property
def interrupt(self):
return self._interrupt
# 使用无梯度计算装饰器,定义调用方法
@torch.no_grad()
@replace_example_docstring(EXAMPLE_DOC_STRING)
def __call__(
# 定义调用方法的参数,prompt可以是字符串或字符串列表
prompt: Union[str, List[str]] = None,
# 定义可选的高度参数
height: Optional[int] = None,
# 定义可选的宽度参数
width: Optional[int] = None,
# 设置推理步骤的默认值为49
num_inference_steps: int = 49,
# 定义可选的时间步数列表
timesteps: List[int] = None,
# 定义可选的sigma值列表
sigmas: List[float] = None,
# 设置引导比例的默认值为5.0
guidance_scale: float = 5.0,
# 定义可选的负面提示,可以是字符串或字符串列表
negative_prompt: Optional[Union[str, List[str]]] = None,
# 定义每个提示生成的图像数量,默认为1
num_images_per_prompt: Optional[int] = 1,
# 设置eta的默认值为0.0
eta: float = 0.0,
# 定义可选的生成器,可以是单个或多个torch.Generator
generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
# 定义可选的潜在向量
latents: Optional[torch.Tensor] = None,
# 定义可选的提示嵌入
prompt_embeds: Optional[torch.Tensor] = None,
# 定义可选的负面提示嵌入
negative_prompt_embeds: Optional[torch.Tensor] = None,
# 定义可选的图像输入适配器
ip_adapter_image: Optional[PipelineImageInput] = None,
# 定义可选的图像适配器嵌入列表
ip_adapter_image_embeds: Optional[List[torch.Tensor]] = None,
# 设置输出类型的默认值为"pil"
output_type: Optional[str] = "pil",
# 设置返回字典的默认值为True
return_dict: bool = True,
# 定义可选的交叉注意力关键字参数
cross_attention_kwargs: Optional[Dict[str, Any]] = None,
# 设置引导重标定的默认值为0.0
guidance_rescale: float = 0.0,
# 定义可选的剪切跳过参数
clip_skip: Optional[int] = None,
# 定义可选的步骤结束回调函数
callback_on_step_end: Optional[Callable[[int, int, Dict], None]] = None,
# 定义步骤结束时的张量输入回调参数,默认包括"latents"
callback_on_step_end_tensor_inputs: List[str] = ["latents"],
# 接收其他关键字参数
**kwargs,