diffusers 源码解析(五十)
.\diffusers\pipelines\stable_diffusion_ldm3d\__init__.py
# 导入类型检查的常量
from typing import TYPE_CHECKING
# 从 utils 模块导入所需的工具和常量
from ...utils import (
DIFFUSERS_SLOW_IMPORT, # 慢导入标志
OptionalDependencyNotAvailable, # 可选依赖未找到异常
_LazyModule, # 延迟加载模块的类
get_objects_from_module, # 从模块获取对象的函数
is_torch_available, # 检查 PyTorch 是否可用的函数
is_transformers_available, # 检查 Transformers 是否可用的函数
)
# 初始化一个空字典用于存储虚拟对象
_dummy_objects = {}
# 初始化一个空字典用于存储导入结构
_import_structure = {}
# 尝试检查依赖项可用性
try:
# 如果 Transformers 和 PyTorch 都不可用,则引发异常
if not (is_transformers_available() and is_torch_available()):
raise OptionalDependencyNotAvailable()
# 捕获可选依赖未找到的异常
except OptionalDependencyNotAvailable:
from ...utils import dummy_torch_and_transformers_objects # noqa F403 # 导入虚拟对象的模块
# 更新虚拟对象字典,填充 dummy 对象
_dummy_objects.update(get_objects_from_module(dummy_torch_and_transformers_objects))
# 如果依赖项可用,则更新导入结构
else:
_import_structure["pipeline_stable_diffusion_ldm3d"] = ["StableDiffusionLDM3DPipeline"]
# 如果在类型检查或慢导入模式下
if TYPE_CHECKING or DIFFUSERS_SLOW_IMPORT:
try:
# 再次检查依赖项可用性
if not (is_transformers_available() and is_torch_available()):
raise OptionalDependencyNotAvailable()
# 捕获可选依赖未找到的异常
except OptionalDependencyNotAvailable:
from ...utils.dummy_torch_and_transformers_objects import * # 导入虚拟对象
else:
# 从稳定扩散管道模块导入管道类
from .pipeline_stable_diffusion_ldm3d import StableDiffusionLDM3DPipeline
# 否则,进行懒加载
else:
import sys # 导入 sys 模块
# 将当前模块替换为一个懒加载模块
sys.modules[__name__] = _LazyModule(
__name__,
globals()["__file__"],
_import_structure, # 传入导入结构
module_spec=__spec__, # 模块规格
)
# 将虚拟对象设置到当前模块
for name, value in _dummy_objects.items():
setattr(sys.modules[__name__], name, value)
.\diffusers\pipelines\stable_diffusion_panorama\pipeline_stable_diffusion_panorama.py
# 版权声明,说明该文件的版权所有者及相关信息
# Copyright 2024 MultiDiffusion Authors and The HuggingFace Team. All rights reserved."
# 根据 Apache 许可证第 2.0 版(“许可证”)进行授权;
# 您不得在未遵守许可证的情况下使用此文件。
# 您可以在以下位置获取许可证副本:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律要求或书面同意,软件
# 按照“按现状”基础分发,没有任何形式的保证或条件,
# 明示或暗示。
# 请参阅许可证以获取有关权限和
# 限制的具体说明。
# 导入用于复制对象的库
import copy
# 导入用于检查对象的库
import inspect
# 导入用于类型提示的库
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
# 导入 PyTorch 库
import torch
# 导入 CLIP 模型相关的处理器和模型
from transformers import CLIPImageProcessor, CLIPTextModel, CLIPTokenizer, CLIPVisionModelWithProjection
# 从本地模块导入图像处理和加载器相关的类
from ...image_processor import PipelineImageInput, VaeImageProcessor
from ...loaders import IPAdapterMixin, StableDiffusionLoraLoaderMixin, TextualInversionLoaderMixin
# 从本地模块导入模型相关的类
from ...models import AutoencoderKL, ImageProjection, UNet2DConditionModel
from ...models.lora import adjust_lora_scale_text_encoder
# 导入调度器
from ...schedulers import DDIMScheduler
# 导入一些实用工具
from ...utils import (
USE_PEFT_BACKEND,
deprecate,
logging,
replace_example_docstring,
scale_lora_layers,
unscale_lora_layers,
)
# 从实用工具模块导入随机张量生成函数
from ...utils.torch_utils import randn_tensor
# 从管道工具模块导入扩散管道和稳定扩散相关的混合类
from ..pipeline_utils import DiffusionPipeline, StableDiffusionMixin
# 从稳定扩散模块导入输出类
from ..stable_diffusion import StableDiffusionPipelineOutput
# 从安全检查器模块导入稳定扩散安全检查器
from ..stable_diffusion.safety_checker import StableDiffusionSafetyChecker
# 初始化日志记录器,指定当前模块的名称
logger = logging.get_logger(__name__) # pylint: disable=invalid-name
# 示例文档字符串,提供如何使用该模块的示例
EXAMPLE_DOC_STRING = """
Examples:
```py
>>> import torch
>>> from diffusers import StableDiffusionPanoramaPipeline, DDIMScheduler
>>> model_ckpt = "stabilityai/stable-diffusion-2-base"
>>> scheduler = DDIMScheduler.from_pretrained(model_ckpt, subfolder="scheduler")
>>> pipe = StableDiffusionPanoramaPipeline.from_pretrained(
... model_ckpt, scheduler=scheduler, torch_dtype=torch.float16
... )
>>> pipe = pipe.to("cuda")
>>> prompt = "a photo of the dolomites"
>>> image = pipe(prompt).images[0]
```py
"""
# 从稳定扩散的管道中复制的函数,用于重新缩放噪声配置
def rescale_noise_cfg(noise_cfg, noise_pred_text, guidance_rescale=0.0):
"""
根据 `guidance_rescale` 重新缩放 `noise_cfg`。基于 [Common Diffusion Noise Schedules and
Sample Steps are Flawed](https://arxiv.org/pdf/2305.08891.pdf) 中的发现。见第 3.4 节
"""
# 计算噪声预测文本的标准差,沿指定维度保持维度
std_text = noise_pred_text.std(dim=list(range(1, noise_pred_text.ndim)), keepdim=True)
# 计算噪声配置的标准差,沿指定维度保持维度
std_cfg = noise_cfg.std(dim=list(range(1, noise_cfg.ndim)), keepdim=True)
# 根据文本标准差和配置标准差重新缩放噪声配置,以修正过曝问题
noise_pred_rescaled = noise_cfg * (std_text / std_cfg)
# 将原始结果与通过因子 guidance_rescale 指导的结果混合,以避免图像“过于平淡”
noise_cfg = guidance_rescale * noise_pred_rescaled + (1 - guidance_rescale) * noise_cfg
# 返回混合后的噪声配置
return noise_cfg
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion 复制而来,用于检索时间步
def retrieve_timesteps(
scheduler, # 调度器,用于获取时间步
num_inference_steps: Optional[int] = None, # 生成样本时使用的扩散步骤数,可选
device: Optional[Union[str, torch.device]] = None, # 指定将时间步移动到的设备,可选
timesteps: Optional[List[int]] = None, # 自定义时间步,用于覆盖调度器的时间步策略,可选
sigmas: Optional[List[float]] = None, # 自定义 sigma,用于覆盖调度器的时间步策略,可选
**kwargs, # 其他关键字参数,将传递给 `scheduler.set_timesteps`
):
"""
调用调度器的 `set_timesteps` 方法,并在调用后从调度器中检索时间步。处理自定义时间步。
任何 kwargs 将被传递到 `scheduler.set_timesteps`。
Args:
scheduler (`SchedulerMixin`):
用于获取时间步的调度器。
num_inference_steps (`int`):
生成样本时使用的扩散步骤数。如果使用,则 `timesteps` 必须为 `None`。
device (`str` or `torch.device`, *optional*):
将时间步移动到的设备。如果为 `None`,则不移动时间步。
timesteps (`List[int]`, *optional*):
自定义时间步,用于覆盖调度器的时间步策略。如果传递 `timesteps`,则 `num_inference_steps` 和 `sigmas` 必须为 `None`。
sigmas (`List[float]`, *optional*):
自定义 sigma,用于覆盖调度器的时间步策略。如果传递 `sigmas`,则 `num_inference_steps` 和 `timesteps` 必须为 `None`。
Returns:
`Tuple[torch.Tensor, int]`: 一个元组,第一元素为调度器的时间步调度,第二元素为推理步骤数。
"""
# 检查是否同时传递了自定义时间步和自定义 sigma
if timesteps is not None and sigmas is not None:
raise ValueError("Only one of `timesteps` or `sigmas` can be passed. Please choose one to set custom values")
# 如果传递了自定义时间步
if timesteps is not None:
# 检查当前调度器是否接受自定义时间步
accepts_timesteps = "timesteps" in set(inspect.signature(scheduler.set_timesteps).parameters.keys())
# 如果不接受,抛出错误
if not accepts_timesteps:
raise ValueError(
f"The current scheduler class {scheduler.__class__}'s `set_timesteps` does not support custom"
f" timestep schedules. Please check whether you are using the correct scheduler."
)
# 设置自定义时间步
scheduler.set_timesteps(timesteps=timesteps, device=device, **kwargs)
# 从调度器中获取当前时间步
timesteps = scheduler.timesteps
# 计算推理步骤数
num_inference_steps = len(timesteps)
# 如果传递了自定义 sigma
elif sigmas is not None:
# 检查当前调度器是否接受自定义 sigma
accept_sigmas = "sigmas" in set(inspect.signature(scheduler.set_timesteps).parameters.keys())
# 如果不接受,抛出错误
if not accept_sigmas:
raise ValueError(
f"The current scheduler class {scheduler.__class__}'s `set_timesteps` does not support custom"
f" sigmas schedules. Please check whether you are using the correct scheduler."
)
# 设置自定义 sigma
scheduler.set_timesteps(sigmas=sigmas, device=device, **kwargs)
# 从调度器中获取当前时间步
timesteps = scheduler.timesteps
# 计算推理步骤数
num_inference_steps = len(timesteps)
# 否则分支处理
else:
# 设置调度器的推理步数,指定设备,并传递额外参数
scheduler.set_timesteps(num_inference_steps, device=device, **kwargs)
# 获取调度器的时间步
timesteps = scheduler.timesteps
# 返回时间步和推理步数
return timesteps, num_inference_steps
# 定义一个名为 StableDiffusionPanoramaPipeline 的类,继承自多个基类
class StableDiffusionPanoramaPipeline(
# 继承 DiffusionPipeline 类
DiffusionPipeline,
# 继承 StableDiffusionMixin 类
StableDiffusionMixin,
# 继承 TextualInversionLoaderMixin 类
TextualInversionLoaderMixin,
# 继承 StableDiffusionLoraLoaderMixin 类
StableDiffusionLoraLoaderMixin,
# 继承 IPAdapterMixin 类
IPAdapterMixin,
):
# 文档字符串,描述该管道的用途和相关信息
r"""
用于通过 MultiDiffusion 生成文本到图像的管道。
该模型继承自 [`DiffusionPipeline`]。有关所有管道的通用方法(下载、保存、在特定设备上运行等)的文档,请查看父类文档。
该管道还继承以下加载方法:
- [`~loaders.TextualInversionLoaderMixin.load_textual_inversion`] 用于加载文本反演嵌入
- [`~loaders.StableDiffusionLoraLoaderMixin.load_lora_weights`] 用于加载 LoRA 权重
- [`~loaders.StableDiffusionLoraLoaderMixin.save_lora_weights`] 用于保存 LoRA 权重
- [`~loaders.IPAdapterMixin.load_ip_adapter`] 用于加载 IP 适配器
参数:
vae ([`AutoencoderKL`]):
用于将图像编码和解码为潜在表示的变分自编码器(VAE)模型。
text_encoder ([`~transformers.CLIPTextModel`]):
冻结的文本编码器([clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14))。
tokenizer ([`~transformers.CLIPTokenizer`]):
用于对文本进行标记化的 `CLIPTokenizer`。
unet ([`UNet2DConditionModel`]):
用于去噪编码后的图像潜在值的 `UNet2DConditionModel`。
scheduler ([`SchedulerMixin`]):
结合 `unet` 用于去噪编码后的图像潜在值的调度器。可以是 [`DDIMScheduler`], [`LMSDiscreteScheduler`] 或 [`PNDMScheduler`]。
safety_checker ([`StableDiffusionSafetyChecker`]):
分类模块,用于估计生成的图像是否可能被认为是冒犯性或有害的。
请参考 [模型卡](https://huggingface.co/runwayml/stable-diffusion-v1-5) 了解有关模型潜在危害的更多详细信息。
feature_extractor ([`~transformers.CLIPImageProcessor`]):
用于从生成图像中提取特征的 `CLIPImageProcessor`;作为 `safety_checker` 的输入。
"""
# 定义模型的 CPU 卸载顺序
model_cpu_offload_seq = "text_encoder->unet->vae"
# 定义可选组件列表
_optional_components = ["safety_checker", "feature_extractor", "image_encoder"]
# 定义不允许 CPU 卸载的组件
_exclude_from_cpu_offload = ["safety_checker"]
# 定义回调张量输入列表
_callback_tensor_inputs = ["latents", "prompt_embeds", "negative_prompt_embeds"]
# 初始化方法,接受多个参数以初始化类
def __init__(
# VAE 模型参数,类型为 AutoencoderKL
self,
vae: AutoencoderKL,
# 文本编码器参数,类型为 CLIPTextModel
text_encoder: CLIPTextModel,
# 标记化器参数,类型为 CLIPTokenizer
tokenizer: CLIPTokenizer,
# UNet 模型参数,类型为 UNet2DConditionModel
unet: UNet2DConditionModel,
# 调度器参数,类型为 DDIMScheduler
scheduler: DDIMScheduler,
# 安全检查器参数,类型为 StableDiffusionSafetyChecker
safety_checker: StableDiffusionSafetyChecker,
# 特征提取器参数,类型为 CLIPImageProcessor
feature_extractor: CLIPImageProcessor,
# 可选参数,图像编码器,类型为 CLIPVisionModelWithProjection 或 None
image_encoder: Optional[CLIPVisionModelWithProjection] = None,
# 可选参数,指示是否需要安全检查器,默认为 True
requires_safety_checker: bool = True,
):
# 调用父类的构造函数,初始化基类
super().__init__()
# 检查安全检查器是否为 None,且要求使用安全检查器
if safety_checker is None and requires_safety_checker:
# 记录警告,提示用户禁用了安全检查器,并强调遵守相关许可证条件
logger.warning(
f"You have disabled the safety checker for {self.__class__} by passing `safety_checker=None`. Ensure"
" that you abide to the conditions of the Stable Diffusion license and do not expose unfiltered"
" results in services or applications open to the public. Both the diffusers team and Hugging Face"
" strongly recommend to keep the safety filter enabled in all public facing circumstances, disabling"
" it only for use-cases that involve analyzing network behavior or auditing its results. For more"
" information, please have a look at https://github.com/huggingface/diffusers/pull/254 ."
)
# 检查安全检查器不为 None,且特征提取器为 None
if safety_checker is not None and feature_extractor is None:
# 抛出错误,提示用户必须定义特征提取器以使用安全检查器
raise ValueError(
"Make sure to define a feature extractor when loading {self.__class__} if you want to use the safety"
" checker. If you do not want to use the safety checker, you can pass `'safety_checker=None'` instead."
)
# 注册各个模块,便于后续使用
self.register_modules(
vae=vae, # 注册变分自编码器
text_encoder=text_encoder, # 注册文本编码器
tokenizer=tokenizer, # 注册分词器
unet=unet, # 注册 UNet 模型
scheduler=scheduler, # 注册调度器
safety_checker=safety_checker, # 注册安全检查器
feature_extractor=feature_extractor, # 注册特征提取器
image_encoder=image_encoder, # 注册图像编码器
)
# 计算 VAE 的缩放因子,基于其输出通道数量
self.vae_scale_factor = 2 ** (len(self.vae.config.block_out_channels) - 1)
# 初始化图像处理器,使用计算出的缩放因子
self.image_processor = VaeImageProcessor(vae_scale_factor=self.vae_scale_factor)
# 将安全检查器的需求注册到配置中
self.register_to_config(requires_safety_checker=requires_safety_checker)
# 从 StableDiffusionPipeline 类复制的编码提示的方法
def _encode_prompt(
self,
prompt, # 输入的提示文本
device, # 设备类型,例如 'cpu' 或 'cuda'
num_images_per_prompt, # 每个提示生成的图像数量
do_classifier_free_guidance, # 是否使用无分类器引导
negative_prompt=None, # 负提示文本,表示不希望生成的内容
prompt_embeds: Optional[torch.Tensor] = None, # 可选的提示嵌入
negative_prompt_embeds: Optional[torch.Tensor] = None, # 可选的负提示嵌入
lora_scale: Optional[float] = None, # 可选的 LoRA 缩放因子
**kwargs, # 其他可选参数
):
# 定义弃用消息,提醒用户该函数已弃用并将来会被移除,建议使用新的函数
deprecation_message = "`_encode_prompt()` is deprecated and it will be removed in a future version. Use `encode_prompt()` instead. Also, be aware that the output format changed from a concatenated tensor to a tuple."
# 调用弃用警告函数,传入函数名、版本号和警告消息
deprecate("_encode_prompt()", "1.0.0", deprecation_message, standard_warn=False)
# 调用 encode_prompt 方法,传递多个参数并获取返回的元组
prompt_embeds_tuple = self.encode_prompt(
prompt=prompt, # 提供的提示文本
device=device, # 设备(CPU或GPU)
num_images_per_prompt=num_images_per_prompt, # 每个提示生成的图像数量
do_classifier_free_guidance=do_classifier_free_guidance, # 是否使用无分类器引导
negative_prompt=negative_prompt, # 负面提示文本
prompt_embeds=prompt_embeds, # 已编码的提示张量
negative_prompt_embeds=negative_prompt_embeds, # 已编码的负面提示张量
lora_scale=lora_scale, # Lora缩放因子
**kwargs, # 其他可选参数
)
# 将返回的元组中的两个张量连接起来,以兼容旧版
prompt_embeds = torch.cat([prompt_embeds_tuple[1], prompt_embeds_tuple[0]])
# 返回连接后的提示嵌入
return prompt_embeds
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.encode_prompt 复制的方法
def encode_prompt(
self,
prompt, # 提示文本
device, # 设备(CPU或GPU)
num_images_per_prompt, # 每个提示生成的图像数量
do_classifier_free_guidance, # 是否使用无分类器引导
negative_prompt=None, # 负面提示文本(可选)
prompt_embeds: Optional[torch.Tensor] = None, # 已编码的提示张量(可选)
negative_prompt_embeds: Optional[torch.Tensor] = None, # 已编码的负面提示张量(可选)
lora_scale: Optional[float] = None, # Lora缩放因子(可选)
clip_skip: Optional[int] = None, # 剪辑跳过(可选)
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.encode_image 复制的方法
def encode_image(self, image, device, num_images_per_prompt, output_hidden_states=None):
# 获取图像编码器参数的数据类型
dtype = next(self.image_encoder.parameters()).dtype
# 如果输入图像不是张量,则使用特征提取器将其转换为张量
if not isinstance(image, torch.Tensor):
image = self.feature_extractor(image, return_tensors="pt").pixel_values
# 将图像移动到指定设备,并转换为相应的数据类型
image = image.to(device=device, dtype=dtype)
# 如果需要输出隐藏状态
if output_hidden_states:
# 获取图像编码的隐藏状态,取倒数第二个
image_enc_hidden_states = self.image_encoder(image, output_hidden_states=True).hidden_states[-2]
# 重复以生成每个提示相应数量的隐藏状态
image_enc_hidden_states = image_enc_hidden_states.repeat_interleave(num_images_per_prompt, dim=0)
# 获取无条件图像的隐藏状态,使用全零图像
uncond_image_enc_hidden_states = self.image_encoder(
torch.zeros_like(image), output_hidden_states=True
).hidden_states[-2]
# 重复以生成每个提示相应数量的无条件隐藏状态
uncond_image_enc_hidden_states = uncond_image_enc_hidden_states.repeat_interleave(
num_images_per_prompt, dim=0
)
# 返回有条件和无条件的图像编码隐藏状态
return image_enc_hidden_states, uncond_image_enc_hidden_states
else:
# 如果不需要隐藏状态,获取图像编码的嵌入
image_embeds = self.image_encoder(image).image_embeds
# 重复以生成每个提示相应数量的图像嵌入
image_embeds = image_embeds.repeat_interleave(num_images_per_prompt, dim=0)
# 创建与图像嵌入相同形状的全零无条件图像嵌入
uncond_image_embeds = torch.zeros_like(image_embeds)
# 返回有条件和无条件的图像嵌入
return image_embeds, uncond_image_embeds
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_ip_adapter_image_embeds 复制的方法
# 准备图像适配器图像嵌入的函数
def prepare_ip_adapter_image_embeds(
# 输入参数:图像适配器图像、图像嵌入、设备、每个提示的图像数量、是否进行无分类器自由引导
self, ip_adapter_image, ip_adapter_image_embeds, device, num_images_per_prompt, do_classifier_free_guidance
):
# 初始化图像嵌入列表
image_embeds = []
# 如果启用无分类器自由引导,初始化负图像嵌入列表
if do_classifier_free_guidance:
negative_image_embeds = []
# 如果图像嵌入为 None,则进行处理
if ip_adapter_image_embeds is None:
# 如果输入的图像不是列表,则转换为列表
if not isinstance(ip_adapter_image, list):
ip_adapter_image = [ip_adapter_image]
# 检查图像数量与适配器数量是否匹配
if len(ip_adapter_image) != len(self.unet.encoder_hid_proj.image_projection_layers):
raise ValueError(
# 抛出错误,提示图像数量与适配器数量不匹配
f"`ip_adapter_image` must have same length as the number of IP Adapters. Got {len(ip_adapter_image)} images and {len(self.unet.encoder_hid_proj.image_projection_layers)} IP Adapters."
)
# 遍历图像和适配器层
for single_ip_adapter_image, image_proj_layer in zip(
ip_adapter_image, self.unet.encoder_hid_proj.image_projection_layers
):
# 输出隐藏状态的标志
output_hidden_state = not isinstance(image_proj_layer, ImageProjection)
# 编码单个图像,获取嵌入和负嵌入
single_image_embeds, single_negative_image_embeds = self.encode_image(
single_ip_adapter_image, device, 1, output_hidden_state
)
# 将单个图像嵌入添加到列表
image_embeds.append(single_image_embeds[None, :])
# 如果启用无分类器自由引导,添加负图像嵌入
if do_classifier_free_guidance:
negative_image_embeds.append(single_negative_image_embeds[None, :])
else:
# 如果已有图像嵌入,遍历每个嵌入
for single_image_embeds in ip_adapter_image_embeds:
# 如果启用无分类器自由引导,分割负嵌入和图像嵌入
if do_classifier_free_guidance:
single_negative_image_embeds, single_image_embeds = single_image_embeds.chunk(2)
negative_image_embeds.append(single_negative_image_embeds)
# 添加图像嵌入到列表
image_embeds.append(single_image_embeds)
# 初始化最终图像嵌入列表
ip_adapter_image_embeds = []
# 遍历图像嵌入
for i, single_image_embeds in enumerate(image_embeds):
# 将单个图像嵌入重复指定次数
single_image_embeds = torch.cat([single_image_embeds] * num_images_per_prompt, dim=0)
# 如果启用无分类器自由引导,重复负嵌入
if do_classifier_free_guidance:
single_negative_image_embeds = torch.cat([negative_image_embeds[i]] * num_images_per_prompt, dim=0)
# 连接负嵌入和图像嵌入
single_image_embeds = torch.cat([single_negative_image_embeds, single_image_embeds], dim=0)
# 将图像嵌入转移到指定设备
single_image_embeds = single_image_embeds.to(device=device)
# 将结果添加到最终列表
ip_adapter_image_embeds.append(single_image_embeds)
# 返回最终的图像适配器图像嵌入列表
return ip_adapter_image_embeds
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.run_safety_checker 复制的代码
# 运行安全检查器,检测给定图像的安全性
def run_safety_checker(self, image, device, dtype):
# 如果安全检查器未定义,则设置 NSFW 概念为 None
if self.safety_checker is None:
has_nsfw_concept = None
else:
# 如果输入图像是张量,处理为 PIL 格式
if torch.is_tensor(image):
feature_extractor_input = self.image_processor.postprocess(image, output_type="pil")
else:
# 如果输入图像是 NumPy 数组,转换为 PIL 格式
feature_extractor_input = self.image_processor.numpy_to_pil(image)
# 提取特征并将其转移到指定设备上
safety_checker_input = self.feature_extractor(feature_extractor_input, return_tensors="pt").to(device)
# 运行安全检查器,并返回处理后的图像和 NSFW 概念
image, has_nsfw_concept = self.safety_checker(
images=image, clip_input=safety_checker_input.pixel_values.to(dtype)
)
# 返回处理后的图像和 NSFW 概念
return image, has_nsfw_concept
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline 复制的解码潜在变量的方法
def decode_latents(self, latents):
# 定义弃用提示信息,告知用户该方法将在未来版本中移除
deprecation_message = "The decode_latents method is deprecated and will be removed in 1.0.0. Please use VaeImageProcessor.postprocess(...) instead"
# 发出弃用警告
deprecate("decode_latents", "1.0.0", deprecation_message, standard_warn=False)
# 根据缩放因子调整潜在变量
latents = 1 / self.vae.config.scaling_factor * latents
# 解码潜在变量,获取生成的图像
image = self.vae.decode(latents, return_dict=False)[0]
# 对图像进行归一化处理,并限制其值在 0 到 1 之间
image = (image / 2 + 0.5).clamp(0, 1)
# 将图像转换为 float32 格式,以兼容 bfloat16
image = image.cpu().permute(0, 2, 3, 1).float().numpy()
# 返回解码后的图像
return image
# 解码潜在变量并添加填充,以便进行循环推理
def decode_latents_with_padding(self, latents: torch.Tensor, padding: int = 8) -> torch.Tensor:
"""
Decode the given latents with padding for circular inference.
Args:
latents (torch.Tensor): The input latents to decode.
padding (int, optional): The number of latents to add on each side for padding. Defaults to 8.
Returns:
torch.Tensor: The decoded image with padding removed.
Notes:
- The padding is added to remove boundary artifacts and improve the output quality.
- This would slightly increase the memory usage.
- The padding pixels are then removed from the decoded image.
"""
# 根据缩放因子调整潜在变量
latents = 1 / self.vae.config.scaling_factor * latents
# 获取潜在变量的左侧填充部分
latents_left = latents[..., :padding]
# 获取潜在变量的右侧填充部分
latents_right = latents[..., -padding:]
# 将填充部分与原始潜在变量合并
latents = torch.cat((latents_right, latents, latents_left), axis=-1)
# 解码合并后的潜在变量,获取生成的图像
image = self.vae.decode(latents, return_dict=False)[0]
# 计算去除填充后图像的边界像素
padding_pix = self.vae_scale_factor * padding
# 去除填充像素,返回最终图像
image = image[..., padding_pix:-padding_pix]
return image
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline 复制的准备额外步骤参数的方法
# 准备调度器步骤的额外参数,因为并非所有调度器都有相同的参数签名
def prepare_extra_step_kwargs(self, generator, eta):
# eta(η)仅在 DDIMScheduler 中使用,对于其他调度器将被忽略
# eta 对应于 DDIM 论文中的 η: https://arxiv.org/abs/2010.02502
# eta 的值应在 [0, 1] 之间
# 检查调度器的步骤方法是否接受 eta 参数
accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())
# 初始化额外参数字典
extra_step_kwargs = {}
# 如果接受 eta 参数,则将其添加到额外参数字典中
if accepts_eta:
extra_step_kwargs["eta"] = eta
# 检查调度器的步骤方法是否接受 generator 参数
accepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys())
# 如果接受 generator 参数,则将其添加到额外参数字典中
if accepts_generator:
extra_step_kwargs["generator"] = generator
# 返回包含额外参数的字典
return extra_step_kwargs
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.check_inputs 复制
def check_inputs(
self,
prompt,
height,
width,
callback_steps,
negative_prompt=None,
prompt_embeds=None,
negative_prompt_embeds=None,
ip_adapter_image=None,
ip_adapter_image_embeds=None,
callback_on_step_end_tensor_inputs=None,
):
# 检查输入的有效性,确保参数满足要求
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_latents 复制
def prepare_latents(self, batch_size, num_channels_latents, height, width, dtype, device, generator, latents=None):
# 定义生成潜在变量的形状
shape = (
batch_size,
num_channels_latents,
int(height) // self.vae_scale_factor,
int(width) // self.vae_scale_factor,
)
# 如果传入的生成器是列表且长度与批量大小不匹配,则抛出错误
if isinstance(generator, list) and len(generator) != batch_size:
raise ValueError(
f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
f" size of {batch_size}. Make sure the batch size matches the length of the generators."
)
# 如果未提供潜在变量,则随机生成
if latents is None:
latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
else:
# 如果提供潜在变量,将其移动到指定设备
latents = latents.to(device)
# 根据调度器要求的标准差缩放初始噪声
latents = latents * self.scheduler.init_noise_sigma
# 返回生成的潜在变量
return latents
# 从 diffusers.pipelines.latent_consistency_models.pipeline_latent_consistency_text2img.LatentConsistencyModelPipeline.get_guidance_scale_embedding 复制
def get_guidance_scale_embedding(
self, w: torch.Tensor, embedding_dim: int = 512, dtype: torch.dtype = torch.float32
):
# 获取引导缩放嵌入的函数,接受张量和嵌入维度
# 该函数返回一个形状为 (len(w), embedding_dim) 的嵌入向量
) -> torch.Tensor:
"""
# 函数文档字符串,提供函数的详细信息和参数说明
See https://github.com/google-research/vdm/blob/dc27b98a554f65cdc654b800da5aa1846545d41b/model_vdm.py#L298
Args:
w (`torch.Tensor`):
# 输入张量,用于生成带有指定引导比例的嵌入向量,以丰富时间步嵌入
Generate embedding vectors with a specified guidance scale to subsequently enrich timestep embeddings.
embedding_dim (`int`, *optional*, defaults to 512):
# 生成的嵌入维度,默认为 512
Dimension of the embeddings to generate.
dtype (`torch.dtype`, *optional*, defaults to `torch.float32`):
# 生成的嵌入数据类型,默认为 torch.float32
Data type of the generated embeddings.
Returns:
`torch.Tensor`: Embedding vectors with shape `(len(w), embedding_dim)`.
"""
# 确保输入张量 w 只有一个维度
assert len(w.shape) == 1
# 将 w 的值放大 1000.0
w = w * 1000.0
# 计算嵌入维度的一半
half_dim = embedding_dim // 2
# 计算缩放因子,用于后续的嵌入计算
emb = torch.log(torch.tensor(10000.0)) / (half_dim - 1)
# 计算每个位置的嵌入值的指数
emb = torch.exp(torch.arange(half_dim, dtype=dtype) * -emb)
# 将 w 转换为指定 dtype 并与嵌入值相乘
emb = w.to(dtype)[:, None] * emb[None, :]
# 将正弦和余弦值拼接在一起,形成最终的嵌入
emb = torch.cat([torch.sin(emb), torch.cos(emb)], dim=1)
# 如果嵌入维度为奇数,则在最后进行零填充
if embedding_dim % 2 == 1: # zero pad
emb = torch.nn.functional.pad(emb, (0, 1))
# 确保最终嵌入的形状与预期一致
assert emb.shape == (w.shape[0], embedding_dim)
# 返回生成的嵌入
return emb
# 定义获取视图的函数,接收全景图的高度和宽度
def get_views(
self,
panorama_height: int,
panorama_width: int,
window_size: int = 64,
stride: int = 8,
circular_padding: bool = False,
# 返回视图坐标列表的函数签名,返回值类型为包含四个整数的元组列表
) -> List[Tuple[int, int, int, int]]:
# 生成视图的文档字符串,说明函数的参数和返回值
"""
# 将全景图的高度除以 8,得到缩放后的高度
panorama_height /= 8
# 将全景图的宽度除以 8,得到缩放后的宽度
panorama_width /= 8
# 计算块的高度,如果全景图高度大于窗口大小,则计算块数,否则返回 1
num_blocks_height = (panorama_height - window_size) // stride + 1 if panorama_height > window_size else 1
# 如果应用循环填充,计算块的宽度
if circular_padding:
num_blocks_width = panorama_width // stride if panorama_width > window_size else 1
# 否则,根据宽度计算块的数量
else:
num_blocks_width = (panorama_width - window_size) // stride + 1 if panorama_width > window_size else 1
# 计算总块数
total_num_blocks = int(num_blocks_height * num_blocks_width)
# 初始化视图列表
views = []
# 遍历每个块,计算其起始和结束坐标
for i in range(total_num_blocks):
# 计算当前块的高度起始坐标
h_start = int((i // num_blocks_width) * stride)
# 计算当前块的高度结束坐标
h_end = h_start + window_size
# 计算当前块的宽度起始坐标
w_start = int((i % num_blocks_width) * stride)
# 计算当前块的宽度结束坐标
w_end = w_start + window_size
# 将当前块的坐标添加到视图列表中
views.append((h_start, h_end, w_start, w_end))
# 返回所有视图的坐标列表
return views
# 定义一个属性,用于获取引导比例
@property
def guidance_scale(self):
# 返回内部存储的引导比例值
return self._guidance_scale
# 定义一个属性,用于获取引导重标定值
@property
def guidance_rescale(self):
# 返回内部存储的引导重标定值
return self._guidance_rescale
# 定义一个属性,用于获取交叉注意力的关键字参数
@property
def cross_attention_kwargs(self):
# 返回内部存储的交叉注意力关键字参数
return self._cross_attention_kwargs
# 定义一个属性,用于获取剪辑跳过的值
@property
def clip_skip(self):
# 返回内部存储的剪辑跳过值
return self._clip_skip
# 定义一个属性,用于判断是否进行分类器自由引导
@property
def do_classifier_free_guidance(self):
# 始终返回 False,表示不进行分类器自由引导
return False
# 定义一个属性,用于获取时间步数
@property
def num_timesteps(self):
# 返回内部存储的时间步数值
return self._num_timesteps
# 定义一个属性,用于获取中断标志
@property
def interrupt(self):
# 返回内部存储的中断标志
return self._interrupt
# 取消梯度计算的上下文装饰器,优化内存使用
@torch.no_grad()
# 替换文档字符串的装饰器,提供示例文档
@replace_example_docstring(EXAMPLE_DOC_STRING)
# 定义可调用的方法,允许对象像函数一样被调用
def __call__(
# 输入提示,可以是字符串或字符串列表
self,
prompt: Union[str, List[str]] = None,
# 输出图像的高度,默认为 512
height: Optional[int] = 512,
# 输出图像的宽度,默认为 2048
width: Optional[int] = 2048,
# 推理步骤的数量,默认为 50
num_inference_steps: int = 50,
# 时间步的列表,可以用于控制推理过程
timesteps: List[int] = None,
# 指导比例,默认为 7.5
guidance_scale: float = 7.5,
# 视图批量大小,默认为 1
view_batch_size: int = 1,
# 负向提示,可以是字符串或字符串列表
negative_prompt: Optional[Union[str, List[str]]] = None,
# 每个提示生成的图像数量,默认为 1
num_images_per_prompt: Optional[int] = 1,
# 额外参数,用于控制生成过程的随机性,默认为 0.0
eta: float = 0.0,
# 生成器,可以是单个生成器或生成器列表
generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
# 潜在变量,可以用于控制生成图像的特征
latents: Optional[torch.Tensor] = None,
# 提示的嵌入表示,可以用于优化生成过程
prompt_embeds: Optional[torch.Tensor] = None,
# 负向提示的嵌入表示
negative_prompt_embeds: Optional[torch.Tensor] = None,
# 图像输入,可能用于适配器
ip_adapter_image: Optional[PipelineImageInput] = None,
# 图像嵌入的列表,可能用于适配器
ip_adapter_image_embeds: Optional[List[torch.Tensor]] = None,
# 输出类型,默认为 "pil" 图像格式
output_type: Optional[str] = "pil",
# 是否返回字典格式的输出,默认为 True
return_dict: bool = True,
# 交叉注意力的额外参数字典
cross_attention_kwargs: Optional[Dict[str, Any]] = None,
# 指导再缩放的值,默认为 0.0
guidance_rescale: float = 0.0,
# 是否使用循环填充,默认为 False
circular_padding: bool = False,
# 可选的剪切跳过参数
clip_skip: Optional[int] = None,
# 步骤结束时的回调函数
callback_on_step_end: Optional[Callable[[int, int, Dict], None]] = None,
# 步骤结束时的张量输入列表,默认为包含 "latents"
callback_on_step_end_tensor_inputs: List[str] = ["latents"],
# 其他任意关键字参数
**kwargs: Any,
.\diffusers\pipelines\stable_diffusion_panorama\__init__.py
# 从 typing 模块导入 TYPE_CHECKING,用于类型检查时的条件导入
from typing import TYPE_CHECKING
# 从上层模块的 utils 导入多个工具函数和常量
from ...utils import (
DIFFUSERS_SLOW_IMPORT, # 导入慢速导入的标志
OptionalDependencyNotAvailable, # 导入可选依赖不可用异常
_LazyModule, # 导入延迟加载模块的工具
get_objects_from_module, # 导入从模块获取对象的工具函数
is_torch_available, # 导入检查 PyTorch 是否可用的函数
is_transformers_available, # 导入检查 Transformers 是否可用的函数
)
# 初始化一个空字典,用于存储占位对象
_dummy_objects = {}
# 初始化一个空字典,用于存储模块导入结构
_import_structure = {}
# 尝试检查 Transformers 和 PyTorch 是否可用
try:
if not (is_transformers_available() and is_torch_available()): # 如果两个库都不可用
raise OptionalDependencyNotAvailable() # 抛出可选依赖不可用异常
except OptionalDependencyNotAvailable: # 捕获可选依赖不可用异常
# 从 utils 导入 dummy_torch_and_transformers_objects,用于占位
from ...utils import dummy_torch_and_transformers_objects # noqa F403
# 更新 _dummy_objects 字典,获取占位对象
_dummy_objects.update(get_objects_from_module(dummy_torch_and_transformers_objects))
else:
# 如果库可用,更新导入结构,添加 StableDiffusionPanoramaPipeline
_import_structure["pipeline_stable_diffusion_panorama"] = ["StableDiffusionPanoramaPipeline"]
# 检查是否在类型检查阶段或慢速导入标志为真
if TYPE_CHECKING or DIFFUSERS_SLOW_IMPORT:
try:
# 再次检查 Transformers 和 PyTorch 是否可用
if not (is_transformers_available() and is_torch_available()): # 如果不可用
raise OptionalDependencyNotAvailable() # 抛出异常
except OptionalDependencyNotAvailable: # 捕获异常
# 从 utils 导入占位对象,使用通配符导入所有对象
from ...utils.dummy_torch_and_transformers_objects import *
else:
# 如果库可用,从当前模块导入 StableDiffusionPanoramaPipeline
from .pipeline_stable_diffusion_panorama import StableDiffusionPanoramaPipeline
else: # 如果不是类型检查阶段且不是慢速导入
import sys # 导入 sys 模块
# 使用 LazyModule 创建一个延迟加载的模块
sys.modules[__name__] = _LazyModule(
__name__, # 模块名称
globals()["__file__"], # 模块文件路径
_import_structure, # 导入结构
module_spec=__spec__, # 模块的规范
)
# 将占位对象设置到当前模块中
for name, value in _dummy_objects.items():
setattr(sys.modules[__name__], name, value) # 为当前模块设置属性
.\diffusers\pipelines\stable_diffusion_safe\pipeline_output.py
# 从 dataclasses 模块导入 dataclass 装饰器,用于简化类的定义
from dataclasses import dataclass
# 从 typing 模块导入 List、Optional 和 Union 类型提示,用于类型注解
from typing import List, Optional, Union
# 导入 numpy 库,常用于数值计算和处理数组
import numpy as np
# 导入 PIL.Image 模块,用于图像处理
import PIL.Image
# 从相对路径的 utils 模块中导入 BaseOutput 类,作为输出类的基类
from ...utils import (
BaseOutput,
)
# 定义 StableDiffusionSafePipelineOutput 类,继承自 BaseOutput
@dataclass
class StableDiffusionSafePipelineOutput(BaseOutput):
"""
Safe Stable Diffusion 管道的输出类。
参数:
images (`List[PIL.Image.Image]` 或 `np.ndarray`)
长度为 `batch_size` 的去噪 PIL 图像列表,或形状为 `(batch_size, height, width,
num_channels)` 的 numpy 数组。PIL 图像或 numpy 数组表示扩散管道的去噪图像。
nsfw_content_detected (`List[bool]`)
标志列表,表示对应生成的图像是否可能代表“成人内容”
(nsfw) 的内容,如果无法执行安全检查,则为 `None`。
unsafe_images (`List[PIL.Image.Image]` 或 `np.ndarray`)
被安全检查器标记的去噪 PIL 图像列表,可能包含“成人内容”
(nsfw) 的图像,或如果未执行安全检查或未标记图像,则为 `None`。
applied_safety_concept (`str`)
应用的安全概念,用于安全指导,如果禁用安全指导,则为 `None`
"""
# 定义类属性,images 可以是 PIL 图像列表或 numpy 数组
images: Union[List[PIL.Image.Image], np.ndarray]
# 定义可选的 nsfw_content_detected 属性,表示安全检查结果
nsfw_content_detected: Optional[List[bool]]
# 定义可选的 unsafe_images 属性,表示被标记为不安全的图像
unsafe_images: Optional[Union[List[PIL.Image.Image], np.ndarray]]
# 定义可选的 applied_safety_concept 属性,表示应用的安全概念
applied_safety_concept: Optional[str]
.\diffusers\pipelines\stable_diffusion_safe\pipeline_stable_diffusion_safe.py
# 导入 inspect 模块用于检查活跃对象的来源
import inspect
# 导入 warnings 模块以发出警告信息
import warnings
# 导入类型提示相关的类型
from typing import Callable, List, Optional, Union
# 导入 numpy 库以进行数值计算
import numpy as np
# 导入 torch 库用于深度学习
import torch
# 导入版本管理工具
from packaging import version
# 从 transformers 库导入必要的类用于处理图像和文本
from transformers import CLIPImageProcessor, CLIPTextModel, CLIPTokenizer, CLIPVisionModelWithProjection
# 从上级模块导入 FrozenDict 类
from ...configuration_utils import FrozenDict
# 从上级模块导入 PipelineImageInput 类
from ...image_processor import PipelineImageInput
# 从上级模块导入 IPAdapterMixin 类
from ...loaders import IPAdapterMixin
# 从上级模块导入模型类
from ...models import AutoencoderKL, ImageProjection, UNet2DConditionModel
# 从上级模块导入调度器类
from ...schedulers import KarrasDiffusionSchedulers
# 从上级模块导入工具函数
from ...utils import deprecate, logging
# 从工具模块导入随机张量生成函数
from ...utils.torch_utils import randn_tensor
# 从管道工具模块导入 DiffusionPipeline 和 StableDiffusionMixin
from ..pipeline_utils import DiffusionPipeline, StableDiffusionMixin
# 从当前模块导入输出类
from . import StableDiffusionSafePipelineOutput
# 从当前模块导入安全检查器
from .safety_checker import SafeStableDiffusionSafetyChecker
# 创建日志记录器实例以便于记录信息
logger = logging.get_logger(__name__) # pylint: disable=invalid-name
# 定义一个安全的稳定扩散管道类,继承自 DiffusionPipeline 和其他混合类
class StableDiffusionPipelineSafe(DiffusionPipeline, StableDiffusionMixin, IPAdapterMixin):
r"""
基于 [`StableDiffusionPipeline`] 的管道,用于使用安全的潜在扩散进行文本到图像生成。
此模型继承自 [`DiffusionPipeline`]。查看超类文档以获取所有管道的通用方法
(下载、保存、在特定设备上运行等)。
该管道还继承以下加载方法:
- [`~loaders.IPAdapterMixin.load_ip_adapter`] 用于加载 IP 适配器
参数:
vae ([`AutoencoderKL`]):
用于将图像编码和解码为潜在表示的变分自编码器(VAE)模型。
text_encoder ([`~transformers.CLIPTextModel`]):
冻结的文本编码器([clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14))。
tokenizer ([`~transformers.CLIPTokenizer`]):
用于对文本进行标记的 `CLIPTokenizer`。
unet ([`UNet2DConditionModel`]):
用于去噪编码图像潜在表示的 `UNet2DConditionModel`。
scheduler ([`SchedulerMixin`]):
与 `unet` 结合使用的调度器,用于去噪编码图像潜在表示。可以是
[`DDIMScheduler`], [`LMSDiscreteScheduler`] 或 [`PNDMScheduler`] 中的一个。
safety_checker ([`StableDiffusionSafetyChecker`]):
评估生成的图像是否可能被认为具有攻击性或有害的分类模块。
有关模型潜在危害的更多详细信息,请参阅 [模型卡](https://huggingface.co/runwayml/stable-diffusion-v1-5)。
feature_extractor ([`~transformers.CLIPImageProcessor`]):
用于从生成的图像中提取特征的 `CLIPImageProcessor`;作为输入传递给 `safety_checker`。
"""
# 定义模型的 CPU 卸载顺序
model_cpu_offload_seq = "text_encoder->unet->vae"
# 定义可选组件列表
_optional_components = ["safety_checker", "feature_extractor", "image_encoder"]
# 初始化方法,用于创建类的实例
def __init__(
self, # 方法参数列表的开始
vae: AutoencoderKL, # 变分自编码器实例
text_encoder: CLIPTextModel, # 文本编码器实例
tokenizer: CLIPTokenizer, # 分词器实例
unet: UNet2DConditionModel, # U-Net 条件模型实例
scheduler: KarrasDiffusionSchedulers, # Karras 扩散调度器实例
safety_checker: SafeStableDiffusionSafetyChecker, # 安全检查器实例
feature_extractor: CLIPImageProcessor, # 特征提取器实例
image_encoder: Optional[CLIPVisionModelWithProjection] = None, # 可选的图像编码器实例
requires_safety_checker: bool = True, # 是否需要安全检查器的标志
):
# 安全概念的属性获取器
@property
def safety_concept(self):
r""" # 文档字符串,描述获取器的功能
Getter method for the safety concept used with SLD # 获取安全概念的方法
Returns:
`str`: The text describing the safety concept # 返回安全概念的文本描述
"""
return self._safety_text_concept # 返回安全概念的内部文本
# 安全概念的属性设置器
@safety_concept.setter
def safety_concept(self, concept):
r""" # 文档字符串,描述设置器的功能
Setter method for the safety concept used with SLD # 设置安全概念的方法
Args:
concept (`str`): # 参数说明,新的安全概念文本
The text of the new safety concept
"""
self._safety_text_concept = concept # 设置新的安全概念文本
# 编码提示的方法
def _encode_prompt(
self, # 方法参数列表的开始
prompt, # 输入的提示文本
device, # 运行设备(CPU/GPU)
num_images_per_prompt, # 每个提示生成的图像数量
do_classifier_free_guidance, # 是否执行无分类器引导
negative_prompt, # 负提示文本
enable_safety_guidance, # 是否启用安全引导
):
# 运行安全检查器的方法
def run_safety_checker(self, image, device, dtype, enable_safety_guidance):
# 检查安全检查器是否存在
if self.safety_checker is not None:
images = image.copy() # 复制输入图像以供检查
# 提取特征并转换为张量,准备安全检查器的输入
safety_checker_input = self.feature_extractor(self.numpy_to_pil(image), return_tensors="pt").to(device)
# 运行安全检查器,检查图像是否包含 NSFW 内容
image, has_nsfw_concept = self.safety_checker(
images=image, clip_input=safety_checker_input.pixel_values.to(dtype) # 传递图像和特征输入
)
# 创建一个标记图像的数组
flagged_images = np.zeros((2, *image.shape[1:])) # 用于存放被标记的图像
# 如果检测到 NSFW 概念
if any(has_nsfw_concept):
logger.warning( # 记录警告信息
"Potential NSFW content was detected in one or more images. A black image will be returned"
" instead."
f"{'You may look at this images in the `unsafe_images` variable of the output at your own discretion.' if enable_safety_guidance else 'Try again with a different prompt and/or seed.'}"
)
# 遍历每个图像,检查是否存在 NSFW 概念
for idx, has_nsfw_concept in enumerate(has_nsfw_concept):
if has_nsfw_concept: # 如果检测到 NSFW 概念
flagged_images[idx] = images[idx] # 保存被标记的图像
image[idx] = np.zeros(image[idx].shape) # 将该图像替换为黑色图像
else: # 如果没有安全检查器
has_nsfw_concept = None # NSFW 概念为 None
flagged_images = None # 被标记的图像为 None
return image, has_nsfw_concept, flagged_images # 返回处理后的图像和概念信息
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.decode_latents 复制的代码
# 解码潜在变量以生成图像
def decode_latents(self, latents):
# 生成关于 decode_latents 方法被弃用的提示信息
deprecation_message = "The decode_latents method is deprecated and will be removed in 1.0.0. Please use VaeImageProcessor.postprocess(...) instead"
# 调用 deprecate 函数记录方法弃用信息
deprecate("decode_latents", "1.0.0", deprecation_message, standard_warn=False)
# 根据缩放因子调整潜在变量的值
latents = 1 / self.vae.config.scaling_factor * latents
# 解码潜在变量并获取生成的图像
image = self.vae.decode(latents, return_dict=False)[0]
# 将图像值缩放到 [0, 1] 范围内
image = (image / 2 + 0.5).clamp(0, 1)
# 将图像转换为 float32 格式,并进行必要的形状变换
image = image.cpu().permute(0, 2, 3, 1).float().numpy()
# 返回生成的图像
return image
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_extra_step_kwargs 复制
def prepare_extra_step_kwargs(self, generator, eta):
# 准备额外的参数以用于调度器步骤,因为并非所有调度器都有相同的签名
# eta (η) 仅在 DDIMScheduler 中使用,其他调度器将被忽略
# eta 对应 DDIM 论文中的 η,范围应在 [0, 1] 之间
# 检查调度器步骤是否接受 eta 参数
accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())
# 初始化额外步骤参数的字典
extra_step_kwargs = {}
# 如果接受 eta 参数,则将其添加到字典中
if accepts_eta:
extra_step_kwargs["eta"] = eta
# 检查调度器步骤是否接受 generator 参数
accepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys())
# 如果接受 generator 参数,则将其添加到字典中
if accepts_generator:
extra_step_kwargs["generator"] = generator
# 返回包含额外参数的字典
return extra_step_kwargs
# 从 diffusers.pipelines.stable_diffusion_k_diffusion.pipeline_stable_diffusion_k_diffusion.StableDiffusionKDiffusionPipeline.check_inputs 复制
def check_inputs(
self,
prompt,
height,
width,
callback_steps,
negative_prompt=None,
prompt_embeds=None,
negative_prompt_embeds=None,
callback_on_step_end_tensor_inputs=None,
):
# 检查高度和宽度是否都能被8整除
if height % 8 != 0 or width % 8 != 0:
# 如果不能整除,则抛出值错误,提示高度和宽度的当前值
raise ValueError(f"`height` and `width` have to be divisible by 8 but are {height} and {width}.")
# 检查回调步数是否有效
if callback_steps is not None and (not isinstance(callback_steps, int) or callback_steps <= 0):
# 如果回调步数不是正整数,则抛出值错误,提示其当前值和类型
raise ValueError(
f"`callback_steps` has to be a positive integer but is {callback_steps} of type"
f" {type(callback_steps)}."
)
# 检查给定的回调输入是否在已定义的回调张量输入中
if callback_on_step_end_tensor_inputs is not None and not all(
k in self._callback_tensor_inputs for k in callback_on_step_end_tensor_inputs
):
# 如果有不在列表中的输入,则抛出值错误,列出不符合的输入
raise ValueError(
f"`callback_on_step_end_tensor_inputs` has to be in {self._callback_tensor_inputs}, but found {[k for k in callback_on_step_end_tensor_inputs if k not in self._callback_tensor_inputs]}"
)
# 检查是否同时提供了提示和提示嵌入
if prompt is not None and prompt_embeds is not None:
# 如果两者都提供,则抛出值错误,说明只允许提供其中一个
raise ValueError(
f"Cannot forward both `prompt`: {prompt} and `prompt_embeds`: {prompt_embeds}. Please make sure to"
" only forward one of the two."
)
# 检查是否都未提供提示和提示嵌入
elif prompt is None and prompt_embeds is None:
# 如果两者都未定义,则抛出值错误,提示至少需要提供一个
raise ValueError(
"Provide either `prompt` or `prompt_embeds`. Cannot leave both `prompt` and `prompt_embeds` undefined."
)
# 检查提示是否为有效类型
elif prompt is not None and (not isinstance(prompt, str) and not isinstance(prompt, list)):
# 如果提示不是字符串或列表,则抛出值错误,提示其当前类型
raise ValueError(f"`prompt` has to be of type `str` or `list` but is {type(prompt)}")
# 检查是否同时提供了负提示和负提示嵌入
if negative_prompt is not None and negative_prompt_embeds is not None:
# 如果两者都提供,则抛出值错误,说明只允许提供其中一个
raise ValueError(
f"Cannot forward both `negative_prompt`: {negative_prompt} and `negative_prompt_embeds`:"
f" {negative_prompt_embeds}. Please make sure to only forward one of the two."
)
# 检查提示嵌入和负提示嵌入的形状是否相同
if prompt_embeds is not None and negative_prompt_embeds is not None:
if prompt_embeds.shape != negative_prompt_embeds.shape:
# 如果形状不匹配,则抛出值错误,说明两者的形状必须相同
raise ValueError(
"`prompt_embeds` and `negative_prompt_embeds` must have the same shape when passed directly, but"
f" got: `prompt_embeds` {prompt_embeds.shape} != `negative_prompt_embeds`"
f" {negative_prompt_embeds.shape}."
)
# 复制自 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_latents
# 准备潜在变量的函数,定义输入参数
def prepare_latents(self, batch_size, num_channels_latents, height, width, dtype, device, generator, latents=None):
# 定义潜在变量的形状,考虑 VAE 的缩放因子
shape = (
batch_size,
num_channels_latents,
int(height) // self.vae_scale_factor,
int(width) // self.vae_scale_factor,
)
# 检查生成器的类型及其长度是否与批次大小匹配
if isinstance(generator, list) and len(generator) != batch_size:
# 抛出值错误,提示生成器数量与批次大小不一致
raise ValueError(
f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
f" size of {batch_size}. Make sure the batch size matches the length of the generators."
)
# 如果没有提供潜在变量,则生成随机的潜在变量
if latents is None:
latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
else:
# 如果提供了潜在变量,将其转移到指定设备
latents = latents.to(device)
# 将初始噪声按调度器要求的标准差进行缩放
latents = latents * self.scheduler.init_noise_sigma
# 返回处理后的潜在变量
return latents
# 执行安全引导的函数,定义输入参数
def perform_safety_guidance(
self,
enable_safety_guidance,
safety_momentum,
noise_guidance,
noise_pred_out,
i,
sld_guidance_scale,
sld_warmup_steps,
sld_threshold,
sld_momentum_scale,
sld_mom_beta,
):
# 如果启用了安全引导
if enable_safety_guidance:
# 如果安全动量未定义,则初始化为与噪声引导相同形状的零张量
if safety_momentum is None:
safety_momentum = torch.zeros_like(noise_guidance)
# 从噪声预测输出中提取文本噪声和无条件噪声预测
noise_pred_text, noise_pred_uncond = noise_pred_out[0], noise_pred_out[1]
noise_pred_safety_concept = noise_pred_out[2]
# 计算安全引导的比例(公式6)
scale = torch.clamp(torch.abs((noise_pred_text - noise_pred_safety_concept)) * sld_guidance_scale, max=1.0)
# 根据阈值计算安全概念的比例(公式6)
safety_concept_scale = torch.where(
(noise_pred_text - noise_pred_safety_concept) >= sld_threshold, torch.zeros_like(scale), scale
)
# 计算安全噪声引导(公式4)
noise_guidance_safety = torch.mul((noise_pred_safety_concept - noise_pred_uncond), safety_concept_scale)
# 将动量加入安全噪声引导(公式7)
noise_guidance_safety = noise_guidance_safety + sld_momentum_scale * safety_momentum
# 更新安全动量(公式8)
safety_momentum = sld_mom_beta * safety_momentum + (1 - sld_mom_beta) * noise_guidance_safety
# 如果当前步骤超过暖身步骤
if i >= sld_warmup_steps: # Warmup
# 根据安全噪声引导调整总噪声引导(公式3)
noise_guidance = noise_guidance - noise_guidance_safety
# 返回调整后的噪声引导和安全动量
return noise_guidance, safety_momentum
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.encode_image 复制的内容
# 定义一个编码图像的方法,接受图像及其相关参数
def encode_image(self, image, device, num_images_per_prompt, output_hidden_states=None):
# 获取图像编码器参数的数据类型
dtype = next(self.image_encoder.parameters()).dtype
# 如果输入的图像不是张量类型
if not isinstance(image, torch.Tensor):
# 使用特征提取器将图像转换为张量,并返回张量的像素值
image = self.feature_extractor(image, return_tensors="pt").pixel_values
# 将图像移动到指定的设备,并转换为正确的数据类型
image = image.to(device=device, dtype=dtype)
# 如果需要输出隐藏状态
if output_hidden_states:
# 编码图像并获取倒数第二层的隐藏状态
image_enc_hidden_states = self.image_encoder(image, output_hidden_states=True).hidden_states[-2]
# 将隐藏状态重复指定次数,沿着第0维
image_enc_hidden_states = image_enc_hidden_states.repeat_interleave(num_images_per_prompt, dim=0)
# 对于无条件图像,创建一个与输入图像相同形状的零张量,并编码其隐藏状态
uncond_image_enc_hidden_states = self.image_encoder(
torch.zeros_like(image), output_hidden_states=True
).hidden_states[-2]
# 将无条件图像的隐藏状态重复指定次数,沿着第0维
uncond_image_enc_hidden_states = uncond_image_enc_hidden_states.repeat_interleave(
num_images_per_prompt, dim=0
)
# 返回编码后的图像和无条件图像的隐藏状态
return image_enc_hidden_states, uncond_image_enc_hidden_states
else:
# 编码图像并获取图像嵌入
image_embeds = self.image_encoder(image).image_embeds
# 将图像嵌入重复指定次数,沿着第0维
image_embeds = image_embeds.repeat_interleave(num_images_per_prompt, dim=0)
# 创建与图像嵌入相同形状的零张量作为无条件图像嵌入
uncond_image_embeds = torch.zeros_like(image_embeds)
# 返回编码后的图像嵌入和无条件图像嵌入
return image_embeds, uncond_image_embeds
# 使用装饰器禁止梯度计算
@torch.no_grad()
# 定义一个可调用的方法,接受多个参数
def __call__(
self,
prompt: Union[str, List[str]], # 输入提示,可以是字符串或字符串列表
height: Optional[int] = None, # 可选的图像高度
width: Optional[int] = None, # 可选的图像宽度
num_inference_steps: int = 50, # 推理步骤数量,默认为50
guidance_scale: float = 7.5, # 引导尺度,默认为7.5
negative_prompt: Optional[Union[str, List[str]]] = None, # 可选的负面提示
num_images_per_prompt: Optional[int] = 1, # 每个提示生成的图像数量,默认为1
eta: float = 0.0, # 额外参数,默认为0.0
generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None, # 可选的随机数生成器
latents: Optional[torch.Tensor] = None, # 可选的潜在张量
ip_adapter_image: Optional[PipelineImageInput] = None, # 可选的适配器图像输入
output_type: Optional[str] = "pil", # 输出类型,默认为"pil"
return_dict: bool = True, # 是否返回字典格式,默认为True
callback: Optional[Callable[[int, int, torch.Tensor], None]] = None, # 可选的回调函数
callback_steps: int = 1, # 回调步骤,默认为1
sld_guidance_scale: Optional[float] = 1000, # 可选的平滑引导尺度,默认为1000
sld_warmup_steps: Optional[int] = 10, # 可选的预热步骤,默认为10
sld_threshold: Optional[float] = 0.01, # 可选的阈值,默认为0.01
sld_momentum_scale: Optional[float] = 0.3, # 可选的动量尺度,默认为0.3
sld_mom_beta: Optional[float] = 0.4, # 可选的动量贝塔,默认为0.4
.\diffusers\pipelines\stable_diffusion_safe\safety_checker.py
# 版权声明,表明此文件归 HuggingFace 团队所有
# Copyright 2024 The HuggingFace Team. All rights reserved.
#
# 根据 Apache License 2.0 版许可使用此文件
# Licensed under the Apache License, Version 2.0 (the "License");
# 仅在遵守许可的情况下使用此文件
# you may not use this file except in compliance with the License.
# 可以在以下网址获取许可证的副本
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# 除非根据适用法律或书面协议另有规定,否则根据许可证分发的软件是“按原样”提供的
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# 不提供任何明示或暗示的担保或条件
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# 请参阅许可证以了解特定的权限和限制
# See the License for the specific language governing permissions and
# limitations under the License.
# 导入 PyTorch 库
import torch
# 导入 PyTorch 的神经网络模块
import torch.nn as nn
# 从 transformers 库导入 CLIP 配置和模型
from transformers import CLIPConfig, CLIPVisionModel, PreTrainedModel
# 从上级模块导入日志工具
from ...utils import logging
# 创建一个记录器,用于记录日志信息
logger = logging.get_logger(__name__)
# 定义计算余弦距离的函数
def cosine_distance(image_embeds, text_embeds):
# 对图像嵌入进行归一化处理
normalized_image_embeds = nn.functional.normalize(image_embeds)
# 对文本嵌入进行归一化处理
normalized_text_embeds = nn.functional.normalize(text_embeds)
# 返回图像嵌入与文本嵌入的余弦相似度矩阵
return torch.mm(normalized_image_embeds, normalized_text_embeds.t())
# 定义安全的稳定扩散安全检查器类,继承自预训练模型
class SafeStableDiffusionSafetyChecker(PreTrainedModel):
# 设置配置类为 CLIPConfig
config_class = CLIPConfig
# 定义不进行模型分割的模块列表
_no_split_modules = ["CLIPEncoderLayer"]
# 初始化方法,接收 CLIPConfig 配置
def __init__(self, config: CLIPConfig):
# 调用父类构造函数
super().__init__(config)
# 创建 CLIP 视觉模型实例
self.vision_model = CLIPVisionModel(config.vision_config)
# 定义线性层,用于视觉投影
self.visual_projection = nn.Linear(config.vision_config.hidden_size, config.projection_dim, bias=False)
# 定义概念嵌入参数,不需要梯度更新
self.concept_embeds = nn.Parameter(torch.ones(17, config.projection_dim), requires_grad=False)
# 定义特殊关注嵌入参数,不需要梯度更新
self.special_care_embeds = nn.Parameter(torch.ones(3, config.projection_dim), requires_grad=False)
# 定义概念嵌入权重参数,不需要梯度更新
self.concept_embeds_weights = nn.Parameter(torch.ones(17), requires_grad=False)
# 定义特殊关注嵌入权重参数,不需要梯度更新
self.special_care_embeds_weights = nn.Parameter(torch.ones(3), requires_grad=False)
# 使用装饰器,表示该方法在计算图中不需要梯度
@torch.no_grad()
# 定义前向传播方法,接受 CLIP 输入和图像
def forward(self, clip_input, images):
# 使用视觉模型处理 CLIP 输入,并提取池化输出
pooled_output = self.vision_model(clip_input)[1] # pooled_output
# 将池化输出进行视觉投影,生成图像嵌入
image_embeds = self.visual_projection(pooled_output)
# 计算图像嵌入与特殊关心嵌入之间的余弦距离,并转换为 NumPy 数组
special_cos_dist = cosine_distance(image_embeds, self.special_care_embeds).cpu().float().numpy()
# 计算图像嵌入与概念嵌入之间的余弦距离,并转换为 NumPy 数组
cos_dist = cosine_distance(image_embeds, self.concept_embeds).cpu().float().numpy()
# 初始化结果列表
result = []
# 获取当前批次的大小
batch_size = image_embeds.shape[0]
# 遍历每个图像
for i in range(batch_size):
# 初始化结果字典,存储分数和关心概念
result_img = {"special_scores": {}, "special_care": [], "concept_scores": {}, "bad_concepts": []}
# 调整值,增加此值将增强 `nfsw` 过滤器的强度
adjustment = 0.0
# 遍历特殊关心嵌入的余弦距离
for concept_idx in range(len(special_cos_dist[0])):
concept_cos = special_cos_dist[i][concept_idx] # 获取特定概念的余弦距离
concept_threshold = self.special_care_embeds_weights[concept_idx].item() # 获取对应阈值
# 计算特殊分数并进行四舍五入
result_img["special_scores"][concept_idx] = round(concept_cos - concept_threshold + adjustment, 3)
# 如果分数大于零,记录特殊关心概念及其分数
if result_img["special_scores"][concept_idx] > 0:
result_img["special_care"].append({concept_idx, result_img["special_scores"][concept_idx]})
adjustment = 0.01 # 调整值增加,防止重复
# 遍历概念嵌入的余弦距离
for concept_idx in range(len(cos_dist[0])):
concept_cos = cos_dist[i][concept_idx] # 获取特定概念的余弦距离
concept_threshold = self.concept_embeds_weights[concept_idx].item() # 获取对应阈值
# 计算概念分数并进行四舍五入
result_img["concept_scores"][concept_idx] = round(concept_cos - concept_threshold + adjustment, 3)
# 如果分数大于零,记录不良概念
if result_img["concept_scores"][concept_idx] > 0:
result_img["bad_concepts"].append(concept_idx)
# 将当前图像结果添加到总结果中
result.append(result_img)
# 检查是否存在不良概念
has_nsfw_concepts = [len(res["bad_concepts"]) > 0 for res in result]
# 返回图像和不良概念的标记
return images, has_nsfw_concepts
# 禁用梯度计算,减少内存消耗
@torch.no_grad()
# 定义前向传播方法,接受输入张量和图像张量
def forward_onnx(self, clip_input: torch.Tensor, images: torch.Tensor):
# 通过视觉模型处理输入,获取池化输出
pooled_output = self.vision_model(clip_input)[1] # pooled_output
# 将池化输出通过视觉投影生成图像嵌入
image_embeds = self.visual_projection(pooled_output)
# 计算图像嵌入与特殊关注嵌入之间的余弦距离
special_cos_dist = cosine_distance(image_embeds, self.special_care_embeds)
# 计算图像嵌入与概念嵌入之间的余弦距离
cos_dist = cosine_distance(image_embeds, self.concept_embeds)
# 增加该值以创建更强的“nsfw”过滤器
# 但可能增加误过滤无害图像的概率
adjustment = 0.0
# 计算特殊评分,调整后与特殊关注嵌入权重比较
special_scores = special_cos_dist - self.special_care_embeds_weights + adjustment
# 取整到小数点后三位(已注释)
# special_scores = special_scores.round(decimals=3)
# 判断特殊评分是否大于0,生成布尔值张量
special_care = torch.any(special_scores > 0, dim=1)
# 为特殊关注创建调整值
special_adjustment = special_care * 0.01
# 扩展调整值以匹配余弦距离的形状
special_adjustment = special_adjustment.unsqueeze(1).expand(-1, cos_dist.shape[1])
# 计算概念评分,加入特殊调整
concept_scores = (cos_dist - self.concept_embeds_weights) + special_adjustment
# 取整到小数点后三位(已注释)
# concept_scores = concept_scores.round(decimals=3)
# 判断概念评分是否大于0,生成布尔值张量
has_nsfw_concepts = torch.any(concept_scores > 0, dim=1)
# 返回图像和是否存在nsfw概念的布尔值
return images, has_nsfw_concepts
.\diffusers\pipelines\stable_diffusion_safe\__init__.py
# 导入数据类装饰器,用于定义简单的类
from dataclasses import dataclass
# 导入枚举类,用于定义常量
from enum import Enum
# 导入类型检查相关工具
from typing import TYPE_CHECKING, List, Optional, Union
# 导入 NumPy 库
import numpy as np
# 导入 PIL 库用于图像处理
import PIL
# 从 PIL 导入图像模块
from PIL import Image
# 从 utils 模块导入多个工具和常量
from ...utils import (
DIFFUSERS_SLOW_IMPORT, # 慢导入的标志
BaseOutput, # 基础输出类
OptionalDependencyNotAvailable, # 可选依赖不可用的异常
_LazyModule, # 懒加载模块的类
get_objects_from_module, # 从模块获取对象的函数
is_torch_available, # 检查 Torch 是否可用的函数
is_transformers_available, # 检查 Transformers 是否可用的函数
)
# 定义安全配置的数据类
@dataclass
class SafetyConfig(object):
# 弱安全级别配置
WEAK = {
"sld_warmup_steps": 15, # 预热步骤数
"sld_guidance_scale": 20, # 引导比例
"sld_threshold": 0.0, # 阈值
"sld_momentum_scale": 0.0, # 动量比例
"sld_mom_beta": 0.0, # 动量 beta 值
}
# 中安全级别配置
MEDIUM = {
"sld_warmup_steps": 10, # 预热步骤数
"sld_guidance_scale": 1000, # 引导比例
"sld_threshold": 0.01, # 阈值
"sld_momentum_scale": 0.3, # 动量比例
"sld_mom_beta": 0.4, # 动量 beta 值
}
# 强安全级别配置
STRONG = {
"sld_warmup_steps": 7, # 预热步骤数
"sld_guidance_scale": 2000, # 引导比例
"sld_threshold": 0.025, # 阈值
"sld_momentum_scale": 0.5, # 动量比例
"sld_mom_beta": 0.7, # 动量 beta 值
}
# 最大安全级别配置
MAX = {
"sld_warmup_steps": 0, # 预热步骤数
"sld_guidance_scale": 5000, # 引导比例
"sld_threshold": 1.0, # 阈值
"sld_momentum_scale": 0.5, # 动量比例
"sld_mom_beta": 0.7, # 动量 beta 值
}
# 初始化空字典,存储虚拟对象
_dummy_objects = {}
# 初始化空字典,存储额外导入的对象
_additional_imports = {}
# 初始化导入结构的字典
_import_structure = {}
# 更新额外导入,添加 SafetyConfig 类
_additional_imports.update({"SafetyConfig": SafetyConfig})
# 尝试检查可选依赖
try:
if not (is_transformers_available() and is_torch_available()): # 检查 Transformers 和 Torch 是否可用
raise OptionalDependencyNotAvailable() # 若不可用,抛出异常
except OptionalDependencyNotAvailable: # 捕获异常
# 从 utils 导入虚拟对象以处理依赖问题
from ...utils import dummy_torch_and_transformers_objects
# 更新虚拟对象字典
_dummy_objects.update(get_objects_from_module(dummy_torch_and_transformers_objects))
else: # 若依赖可用
# 更新导入结构字典,添加相关模块
_import_structure.update(
{
"pipeline_output": ["StableDiffusionSafePipelineOutput"], # 管道输出模块
"pipeline_stable_diffusion_safe": ["StableDiffusionPipelineSafe"], # 稳定扩散安全管道
"safety_checker": ["StableDiffusionSafetyChecker"], # 安全检查器
}
)
# 如果进行类型检查或慢导入
if TYPE_CHECKING or DIFFUSERS_SLOW_IMPORT:
try:
if not (is_transformers_available() and is_torch_available()): # 检查依赖可用性
raise OptionalDependencyNotAvailable() # 若不可用,抛出异常
except OptionalDependencyNotAvailable: # 捕获异常
# 从 utils 导入虚拟对象
from ...utils.dummy_torch_and_transformers_objects import *
else: # 若依赖可用
# 从模块导入所需类
from .pipeline_output import StableDiffusionSafePipelineOutput
from .pipeline_stable_diffusion_safe import StableDiffusionPipelineSafe
from .safety_checker import SafeStableDiffusionSafetyChecker
else: # 若不是进行类型检查或慢导入
import sys # 导入 sys 模块
# 使用懒加载模块更新当前模块
sys.modules[__name__] = _LazyModule(
__name__,
globals()["__file__"],
_import_structure,
module_spec=__spec__,
)
# 将虚拟对象添加到当前模块
for name, value in _dummy_objects.items():
setattr(sys.modules[__name__], name, value)
# 将额外导入的对象添加到当前模块
for name, value in _additional_imports.items():
setattr(sys.modules[__name__], name, value)
.\diffusers\pipelines\stable_diffusion_sag\pipeline_stable_diffusion_sag.py
# 版权声明,说明此文件的版权所有者和许可信息
# Copyright 2024 Susung Hong and The HuggingFace Team. All rights reserved.
#
# 根据 Apache 2.0 许可协议授权使用此文件
# Licensed under the Apache License, Version 2.0 (the "License");
# 使用此文件前需遵循许可协议
# you may not use this file except in compliance with the License.
# 可在以下链接获取许可的副本
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律要求或书面同意,否则软件按 "AS IS" 基础分发,不提供任何形式的担保
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# 查看许可证以了解特定的权限和限制
# See the License for the specific language governing permissions and
# limitations under the License.
import inspect # 导入 inspect 模块,用于检查对象的属性和方法
from typing import Any, Callable, Dict, List, Optional, Union # 导入类型提示所需的类型
import torch # 导入 PyTorch 库,用于深度学习
import torch.nn.functional as F # 导入 PyTorch 的函数式 API,用于常用的神经网络功能
from transformers import CLIPImageProcessor, CLIPTextModel, CLIPTokenizer, CLIPVisionModelWithProjection # 从 transformers 导入 CLIP 相关类
from ...image_processor import PipelineImageInput, VaeImageProcessor # 从图像处理模块导入输入和处理器类
from ...loaders import IPAdapterMixin, StableDiffusionLoraLoaderMixin, TextualInversionLoaderMixin # 导入加载器混合类
from ...models import AutoencoderKL, ImageProjection, UNet2DConditionModel # 导入模型类
from ...models.lora import adjust_lora_scale_text_encoder # 导入调整 Lora 的函数
from ...schedulers import KarrasDiffusionSchedulers # 导入调度器类
from ...utils import ( # 导入常用工具函数
USE_PEFT_BACKEND,
deprecate,
logging,
replace_example_docstring,
scale_lora_layers,
unscale_lora_layers,
)
from ...utils.torch_utils import randn_tensor # 导入用于生成随机张量的函数
from ..pipeline_utils import DiffusionPipeline, StableDiffusionMixin # 导入扩散管道和混合类
from ..stable_diffusion import StableDiffusionPipelineOutput # 导入稳定扩散管道的输出类
from ..stable_diffusion.safety_checker import StableDiffusionSafetyChecker # 导入安全检查器类
logger = logging.get_logger(__name__) # 初始化记录器,使用当前模块名作为标识
EXAMPLE_DOC_STRING = """ # 示例文档字符串,用于展示如何使用管道
Examples:
```py
>>> import torch # 导入 PyTorch 库
>>> from diffusers import StableDiffusionSAGPipeline # 从 diffusers 导入 StableDiffusionSAGPipeline
>>> pipe = StableDiffusionSAGPipeline.from_pretrained( # 从预训练模型创建管道
... "runwayml/stable-diffusion-v1-5", torch_dtype=torch.float16 # 指定模型和数据类型
... )
>>> pipe = pipe.to("cuda") # 将管道移动到 GPU
>>> prompt = "a photo of an astronaut riding a horse on mars" # 定义生成图像的提示
>>> image = pipe(prompt, sag_scale=0.75).images[0] # 生成图像并获取第一个图像
```py
"""
# 处理和存储注意力概率的类
class CrossAttnStoreProcessor:
def __init__(self): # 初始化方法
self.attention_probs = None # 创建一个用于存储注意力概率的属性
def __call__( # 定义可调用对象的方法
self,
attn, # 注意力张量
hidden_states, # 隐藏状态张量
encoder_hidden_states=None, # 编码器隐藏状态(可选)
attention_mask=None, # 注意力掩码(可选)
):
# 获取隐藏状态的批次大小、序列长度和特征维度
batch_size, sequence_length, _ = hidden_states.shape
# 准备注意力掩码,以便后续计算
attention_mask = attn.prepare_attention_mask(attention_mask, sequence_length, batch_size)
# 将隐藏状态转换为查询向量
query = attn.to_q(hidden_states)
# 检查编码器隐藏状态是否为 None
if encoder_hidden_states is None:
# 如果为 None,则将其设置为隐藏状态
encoder_hidden_states = hidden_states
# 如果需要进行归一化处理
elif attn.norm_cross:
# 对编码器隐藏状态进行归一化
encoder_hidden_states = attn.norm_encoder_hidden_states(encoder_hidden_states)
# 将编码器隐藏状态转换为键向量
key = attn.to_k(encoder_hidden_states)
# 将编码器隐藏状态转换为值向量
value = attn.to_v(encoder_hidden_states)
# 将查询向量从头维度转换为批次维度
query = attn.head_to_batch_dim(query)
# 将键向量从头维度转换为批次维度
key = attn.head_to_batch_dim(key)
# 将值向量从头维度转换为批次维度
value = attn.head_to_batch_dim(value)
# 计算注意力分数
self.attention_probs = attn.get_attention_scores(query, key, attention_mask)
# 使用注意力分数和值向量进行批次矩阵乘法,得到新的隐藏状态
hidden_states = torch.bmm(self.attention_probs, value)
# 将隐藏状态从批次维度转换回头维度
hidden_states = attn.batch_to_head_dim(hidden_states)
# 进行线性变换以获取最终的隐藏状态
hidden_states = attn.to_out[0](hidden_states)
# 应用 dropout 以减少过拟合
hidden_states = attn.to_out[1](hidden_states)
# 返回处理后的隐藏状态
return hidden_states
# 修改以获取自注意力引导缩放,在此论文中作为输入 (https://arxiv.org/pdf/2210.00939.pdf)
class StableDiffusionSAGPipeline(DiffusionPipeline, StableDiffusionMixin, TextualInversionLoaderMixin, IPAdapterMixin):
r"""
使用稳定扩散的文本到图像生成管道。
此模型继承自 [`DiffusionPipeline`]。查看超类文档以获取所有管道实现的通用方法
(下载、保存、在特定设备上运行等)。
此管道还继承以下加载方法:
- [`~loaders.TextualInversionLoaderMixin.load_textual_inversion`] 用于加载文本反转嵌入
- [`~loaders.IPAdapterMixin.load_ip_adapter`] 用于加载 IP 适配器
参数:
vae ([`AutoencoderKL`]):
用于编码和解码图像到潜在表示的变分自编码器 (VAE) 模型。
text_encoder ([`~transformers.CLIPTextModel`]):
冻结的文本编码器 ([clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14))。
tokenizer ([`~transformers.CLIPTokenizer`]):
一个 `CLIPTokenizer` 用于对文本进行标记化。
unet ([`UNet2DConditionModel`]):
一个 `UNet2DConditionModel` 用于去噪编码的图像潜在。
scheduler ([`SchedulerMixin`]):
用于与 `unet` 结合使用以去噪编码的图像潜在的调度器。可以是
[`DDIMScheduler`], [`LMSDiscreteScheduler`] 或 [`PNDMScheduler`] 之一。
safety_checker ([`StableDiffusionSafetyChecker`]):
估计生成的图像是否可能被视为冒犯或有害的分类模块。
请参考 [模型卡](https://huggingface.co/runwayml/stable-diffusion-v1-5) 获取更多
关于模型潜在危害的详细信息。
feature_extractor ([`~transformers.CLIPImageProcessor`]):
一个 `CLIPImageProcessor` 用于从生成的图像中提取特征;用作 `safety_checker` 的输入。
"""
# 定义模型在 CPU 上的卸载顺序
model_cpu_offload_seq = "text_encoder->unet->vae"
# 定义可选组件的列表
_optional_components = ["safety_checker", "feature_extractor", "image_encoder"]
# 定义不包括在 CPU 卸载中的组件
_exclude_from_cpu_offload = ["safety_checker"]
# 初始化方法,接受多个参数以配置管道
def __init__(
self,
vae: AutoencoderKL,
text_encoder: CLIPTextModel,
tokenizer: CLIPTokenizer,
unet: UNet2DConditionModel,
scheduler: KarrasDiffusionSchedulers,
safety_checker: StableDiffusionSafetyChecker,
feature_extractor: CLIPImageProcessor,
# 可选的图像编码器,默认为 None
image_encoder: Optional[CLIPVisionModelWithProjection] = None,
# 指示是否需要安全检查器的布尔值,默认为 True
requires_safety_checker: bool = True,
# 初始化父类
):
super().__init__()
# 注册各个模块,供后续使用
self.register_modules(
vae=vae, # 注册变分自编码器
text_encoder=text_encoder, # 注册文本编码器
tokenizer=tokenizer, # 注册分词器
unet=unet, # 注册 U-Net 模型
scheduler=scheduler, # 注册调度器
safety_checker=safety_checker, # 注册安全检查器
feature_extractor=feature_extractor, # 注册特征提取器
image_encoder=image_encoder, # 注册图像编码器
)
# 计算 VAE 的缩放因子,基于块输出通道的长度
self.vae_scale_factor = 2 ** (len(self.vae.config.block_out_channels) - 1)
# 创建图像处理器,使用 VAE 缩放因子
self.image_processor = VaeImageProcessor(vae_scale_factor=self.vae_scale_factor)
# 将配置注册到当前实例,指定是否需要安全检查器
self.register_to_config(requires_safety_checker=requires_safety_checker)
# 从 StableDiffusionPipeline 复制的编码提示方法
def _encode_prompt(
self,
prompt, # 输入提示
device, # 设备信息
num_images_per_prompt, # 每个提示生成的图像数量
do_classifier_free_guidance, # 是否使用无分类器的引导
negative_prompt=None, # 负提示(可选)
prompt_embeds: Optional[torch.Tensor] = None, # 提示嵌入(可选)
negative_prompt_embeds: Optional[torch.Tensor] = None, # 负提示嵌入(可选)
lora_scale: Optional[float] = None, # Lora 缩放因子(可选)
**kwargs, # 其他参数
):
# 设置废弃警告信息
deprecation_message = "`_encode_prompt()` is deprecated and it will be removed in a future version. Use `encode_prompt()` instead. Also, be aware that the output format changed from a concatenated tensor to a tuple."
# 发出废弃警告
deprecate("_encode_prompt()", "1.0.0", deprecation_message, standard_warn=False)
# 调用 encode_prompt 方法获取提示嵌入元组
prompt_embeds_tuple = self.encode_prompt(
prompt=prompt,
device=device,
num_images_per_prompt=num_images_per_prompt,
do_classifier_free_guidance=do_classifier_free_guidance,
negative_prompt=negative_prompt,
prompt_embeds=prompt_embeds,
negative_prompt_embeds=negative_prompt_embeds,
lora_scale=lora_scale,
**kwargs,
)
# 将嵌入元组连接以兼容旧版本
prompt_embeds = torch.cat([prompt_embeds_tuple[1], prompt_embeds_tuple[0]])
# 返回合并后的提示嵌入
return prompt_embeds
# 从 StableDiffusionPipeline 复制的编码提示方法
def encode_prompt(
self,
prompt, # 输入提示
device, # 设备信息
num_images_per_prompt, # 每个提示生成的图像数量
do_classifier_free_guidance, # 是否使用无分类器的引导
negative_prompt=None, # 负提示(可选)
prompt_embeds: Optional[torch.Tensor] = None, # 提示嵌入(可选)
negative_prompt_embeds: Optional[torch.Tensor] = None, # 负提示嵌入(可选)
lora_scale: Optional[float] = None, # Lora 缩放因子(可选)
clip_skip: Optional[int] = None, # 跳过剪辑的数量(可选)
# 从 StableDiffusionPipeline 复制的编码图像方法
# 定义一个方法,用于编码输入图像,返回图像的嵌入表示
def encode_image(self, image, device, num_images_per_prompt, output_hidden_states=None):
# 获取图像编码器参数的数据类型
dtype = next(self.image_encoder.parameters()).dtype
# 如果输入的图像不是张量,则使用特征提取器将其转换为张量
if not isinstance(image, torch.Tensor):
image = self.feature_extractor(image, return_tensors="pt").pixel_values
# 将图像移动到指定的设备上,并转换为正确的数据类型
image = image.to(device=device, dtype=dtype)
# 如果需要输出隐藏状态
if output_hidden_states:
# 编码图像,并获取倒数第二层的隐藏状态
image_enc_hidden_states = self.image_encoder(image, output_hidden_states=True).hidden_states[-2]
# 重复隐藏状态以匹配每个提示的图像数量
image_enc_hidden_states = image_enc_hidden_states.repeat_interleave(num_images_per_prompt, dim=0)
# 编码全零图像以获取无条件隐藏状态
uncond_image_enc_hidden_states = self.image_encoder(
torch.zeros_like(image), output_hidden_states=True
).hidden_states[-2]
# 重复无条件隐藏状态以匹配每个提示的图像数量
uncond_image_enc_hidden_states = uncond_image_enc_hidden_states.repeat_interleave(
num_images_per_prompt, dim=0
)
# 返回有条件和无条件的隐藏状态
return image_enc_hidden_states, uncond_image_enc_hidden_states
else:
# 编码图像以获取图像嵌入
image_embeds = self.image_encoder(image).image_embeds
# 重复图像嵌入以匹配每个提示的图像数量
image_embeds = image_embeds.repeat_interleave(num_images_per_prompt, dim=0)
# 创建与图像嵌入形状相同的全零张量作为无条件嵌入
uncond_image_embeds = torch.zeros_like(image_embeds)
# 返回有条件和无条件的图像嵌入
return image_embeds, uncond_image_embeds
# 定义一个方法,用于准备 IP 适配器的图像嵌入
def prepare_ip_adapter_image_embeds(
self, ip_adapter_image, ip_adapter_image_embeds, device, num_images_per_prompt, do_classifier_free_guidance
):
# 检查 ip_adapter_image_embeds 是否为 None
if ip_adapter_image_embeds is None:
# 检查 ip_adapter_image 是否为列表,如果不是则将其转换为列表
if not isinstance(ip_adapter_image, list):
ip_adapter_image = [ip_adapter_image]
# 检查 ip_adapter_image 的长度是否与 IP 适配器的数量一致
if len(ip_adapter_image) != len(self.unet.encoder_hid_proj.image_projection_layers):
# 如果不一致,抛出一个 ValueError 异常
raise ValueError(
f"`ip_adapter_image` must have same length as the number of IP Adapters. Got {len(ip_adapter_image)} images and {len(self.unet.encoder_hid_proj.image_projection_layers)} IP Adapters."
)
# 初始化一个空列表,用于存储图像嵌入
image_embeds = []
# 遍历每个图像适配器图像和对应的图像投影层
for single_ip_adapter_image, image_proj_layer in zip(
ip_adapter_image, self.unet.encoder_hid_proj.image_projection_layers
):
# 判断输出隐藏状态是否为 True,条件是图像投影层不是 ImageProjection 实例
output_hidden_state = not isinstance(image_proj_layer, ImageProjection)
# 编码单个图像,获取嵌入和负嵌入
single_image_embeds, single_negative_image_embeds = self.encode_image(
single_ip_adapter_image, device, 1, output_hidden_state
)
# 将单个图像嵌入复制 num_images_per_prompt 次,形成一个新维度
single_image_embeds = torch.stack([single_image_embeds] * num_images_per_prompt, dim=0)
# 同样处理负嵌入
single_negative_image_embeds = torch.stack(
[single_negative_image_embeds] * num_images_per_prompt, dim=0
)
# 如果执行无分类器引导
if do_classifier_free_guidance:
# 将负嵌入和正嵌入连接在一起
single_image_embeds = torch.cat([single_negative_image_embeds, single_image_embeds])
# 将嵌入数据移动到指定设备
single_image_embeds = single_image_embeds.to(device)
# 将处理后的图像嵌入添加到列表中
image_embeds.append(single_image_embeds)
else:
# 如果 ip_adapter_image_embeds 不是 None,则直接使用它
image_embeds = ip_adapter_image_embeds
# 返回最终的图像嵌入列表
return image_embeds
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.run_safety_checker 复制
def run_safety_checker(self, image, device, dtype):
# 检查安全检查器是否为 None
if self.safety_checker is None:
# 如果安全检查器为 None,设置 has_nsfw_concept 为 None
has_nsfw_concept = None
else:
# 检查图像是否为张量
if torch.is_tensor(image):
# 如果是张量,使用图像处理器后处理图像,输出类型为 PIL
feature_extractor_input = self.image_processor.postprocess(image, output_type="pil")
else:
# 如果不是张量,将 NumPy 数组转换为 PIL 图像
feature_extractor_input = self.image_processor.numpy_to_pil(image)
# 提取特征,返回张量并移动到指定设备
safety_checker_input = self.feature_extractor(feature_extractor_input, return_tensors="pt").to(device)
# 运行安全检查器,返回处理后的图像和 nsfw 概念的存在情况
image, has_nsfw_concept = self.safety_checker(
images=image, clip_input=safety_checker_input.pixel_values.to(dtype)
)
# 返回处理后的图像和 nsfw 概念的存在情况
return image, has_nsfw_concept
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.decode_latents 复制
# 解码潜在向量的方法
def decode_latents(self, latents):
# 设置过时警告信息,提示该方法将在 1.0.0 版本中被移除
deprecation_message = "The decode_latents method is deprecated and will be removed in 1.0.0. Please use VaeImageProcessor.postprocess(...) instead"
# 调用 deprecate 函数记录该方法的使用和警告信息
deprecate("decode_latents", "1.0.0", deprecation_message, standard_warn=False)
# 将潜在向量根据缩放因子进行缩放
latents = 1 / self.vae.config.scaling_factor * latents
# 使用 VAE 解码潜在向量,返回图像数据
image = self.vae.decode(latents, return_dict=False)[0]
# 将图像数据标准化到 [0, 1] 范围内
image = (image / 2 + 0.5).clamp(0, 1)
# 将图像转换为 float32 类型,兼容 bfloat16
image = image.cpu().permute(0, 2, 3, 1).float().numpy()
# 返回解码后的图像数据
return image
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_extra_step_kwargs 中复制
def prepare_extra_step_kwargs(self, generator, eta):
# 准备调度器步骤的额外参数,因为并不是所有调度器都有相同的签名
# eta(η)仅用于 DDIMScheduler,其他调度器将忽略它。
# eta 对应 DDIM 论文中的 η,范围应在 [0, 1] 之间
# 检查调度器步骤是否接受 eta 参数
accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())
# 初始化额外参数字典
extra_step_kwargs = {}
# 如果接受 eta,则将其添加到额外参数中
if accepts_eta:
extra_step_kwargs["eta"] = eta
# 检查调度器步骤是否接受 generator 参数
accepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys())
# 如果接受 generator,则将其添加到额外参数中
if accepts_generator:
extra_step_kwargs["generator"] = generator
# 返回准备好的额外参数
return extra_step_kwargs
# 从 diffusers.pipelines.stable_diffusion_k_diffusion.pipeline_stable_diffusion_k_diffusion.StableDiffusionKDiffusionPipeline.check_inputs 中复制
def check_inputs(
# 定义检查输入参数的方法,包含 prompt 和图像尺寸等
self,
prompt,
height,
width,
callback_steps,
# 可选的负面提示
negative_prompt=None,
# 可选的提示嵌入
prompt_embeds=None,
# 可选的负面提示嵌入
negative_prompt_embeds=None,
# 可选的回调步骤结束时的张量输入
callback_on_step_end_tensor_inputs=None,
):
# 检查高度和宽度是否能够被8整除,若不能则抛出错误
if height % 8 != 0 or width % 8 != 0:
raise ValueError(f"`height` and `width` have to be divisible by 8 but are {height} and {width}.")
# 检查回调步骤是否为正整数,若不满足条件则抛出错误
if callback_steps is not None and (not isinstance(callback_steps, int) or callback_steps <= 0):
raise ValueError(
f"`callback_steps` has to be a positive integer but is {callback_steps} of type"
f" {type(callback_steps)}."
)
# 检查在步骤结束时的回调张量输入是否在已定义的回调张量输入中
if callback_on_step_end_tensor_inputs is not None and not all(
k in self._callback_tensor_inputs for k in callback_on_step_end_tensor_inputs
):
raise ValueError(
f"`callback_on_step_end_tensor_inputs` has to be in {self._callback_tensor_inputs}, but found {[k for k in callback_on_step_end_tensor_inputs if k not in self._callback_tensor_inputs]}"
)
# 检查是否同时提供了 `prompt` 和 `prompt_embeds`
if prompt is not None and prompt_embeds is not None:
raise ValueError(
f"Cannot forward both `prompt`: {prompt} and `prompt_embeds`: {prompt_embeds}. Please make sure to"
" only forward one of the two."
)
# 检查是否同时未提供 `prompt` 和 `prompt_embeds`
elif prompt is None and prompt_embeds is None:
raise ValueError(
"Provide either `prompt` or `prompt_embeds`. Cannot leave both `prompt` and `prompt_embeds` undefined."
)
# 检查 `prompt` 是否为字符串或列表类型
elif prompt is not None and (not isinstance(prompt, str) and not isinstance(prompt, list)):
raise ValueError(f"`prompt` has to be of type `str` or `list` but is {type(prompt)}")
# 检查是否同时提供了 `negative_prompt` 和 `negative_prompt_embeds`
if negative_prompt is not None and negative_prompt_embeds is not None:
raise ValueError(
f"Cannot forward both `negative_prompt`: {negative_prompt} and `negative_prompt_embeds`:"
f" {negative_prompt_embeds}. Please make sure to only forward one of the two."
)
# 检查 `prompt_embeds` 和 `negative_prompt_embeds` 的形状是否一致
if prompt_embeds is not None and negative_prompt_embeds is not None:
if prompt_embeds.shape != negative_prompt_embeds.shape:
raise ValueError(
"`prompt_embeds` and `negative_prompt_embeds` must have the same shape when passed directly, but"
f" got: `prompt_embeds` {prompt_embeds.shape} != `negative_prompt_embeds`"
f" {negative_prompt_embeds.shape}."
)
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_latents 复制
# 准备潜在向量的函数,接收多个参数以配置潜在向量的形状和生成方式
def prepare_latents(self, batch_size, num_channels_latents, height, width, dtype, device, generator, latents=None):
# 定义潜在向量的形状,基于输入的批大小和通道数,调整高度和宽度
shape = (
batch_size,
num_channels_latents,
int(height) // self.vae_scale_factor, # 高度缩放
int(width) // self.vae_scale_factor, # 宽度缩放
)
# 检查生成器是否为列表,并确保列表长度与批大小一致
if isinstance(generator, list) and len(generator) != batch_size:
# 如果不一致,抛出值错误
raise ValueError(
f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
f" size of {batch_size}. Make sure the batch size matches the length of the generators."
)
# 如果未提供潜在向量,则生成随机潜在向量
if latents is None:
latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
else:
# 如果提供了潜在向量,则将其转移到指定设备
latents = latents.to(device)
# 按调度器所需的标准差缩放初始噪声
latents = latents * self.scheduler.init_noise_sigma
# 返回准备好的潜在向量
return latents
# 禁用梯度计算以节省内存和加快推理速度
@torch.no_grad()
# 替换示例文档字符串
@replace_example_docstring(EXAMPLE_DOC_STRING)
# 定义可调用对象,接收多个参数以执行生成过程
def __call__(
# 提示文本,可以是单个字符串或字符串列表
prompt: Union[str, List[str]] = None,
# 生成图像的高度
height: Optional[int] = None,
# 生成图像的宽度
width: Optional[int] = None,
# 推理步骤的数量
num_inference_steps: int = 50,
# 指导尺度,用于控制生成的质量
guidance_scale: float = 7.5,
# SAG尺度,用于调整特定生成效果
sag_scale: float = 0.75,
# 负面提示文本,可以是单个字符串或字符串列表
negative_prompt: Optional[Union[str, List[str]]] = None,
# 每个提示生成的图像数量
num_images_per_prompt: Optional[int] = 1,
# 用于生成的 eta 参数
eta: float = 0.0,
# 随机数生成器,可以是单个生成器或生成器列表
generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
# 潜在向量,如果有的话
latents: Optional[torch.Tensor] = None,
# 提示嵌入向量
prompt_embeds: Optional[torch.Tensor] = None,
# 负面提示嵌入向量
negative_prompt_embeds: Optional[torch.Tensor] = None,
# 输入适配器图像
ip_adapter_image: Optional[PipelineImageInput] = None,
# 输入适配器图像嵌入向量列表
ip_adapter_image_embeds: Optional[List[torch.Tensor]] = None,
# 输出类型,默认为 PIL 图像
output_type: Optional[str] = "pil",
# 是否返回字典格式的输出
return_dict: bool = True,
# 可选的回调函数,用于在推理过程中执行某些操作
callback: Optional[Callable[[int, int, torch.Tensor], None]] = None,
# 回调步骤的频率
callback_steps: Optional[int] = 1,
# 交叉注意力的额外参数
cross_attention_kwargs: Optional[Dict[str, Any]] = None,
# 跳过的 CLIP 层数
clip_skip: Optional[int] = None,
# 定义一个方法用于SAG掩膜处理
def sag_masking(self, original_latents, attn_map, map_size, t, eps):
# 按照SAG论文中的掩膜处理流程
bh, hw1, hw2 = attn_map.shape # 解包注意力图的维度
b, latent_channel, latent_h, latent_w = original_latents.shape # 解包原始潜变量的维度
h = self.unet.config.attention_head_dim # 获取注意力头的维度
if isinstance(h, list): # 如果h是列表
h = h[-1] # 取最后一个维度值
# 生成注意力掩膜
attn_map = attn_map.reshape(b, h, hw1, hw2) # 重新调整注意力图的形状
attn_mask = attn_map.mean(1, keepdim=False).sum(1, keepdim=False) > 1.0 # 计算掩膜,阈值为1.0
attn_mask = (
attn_mask.reshape(b, map_size[0], map_size[1]) # 重新调整掩膜形状以匹配map_size
.unsqueeze(1) # 增加一个维度
.repeat(1, latent_channel, 1, 1) # 在channel维度上重复掩膜
.type(attn_map.dtype) # 转换为与注意力图相同的数据类型
)
attn_mask = F.interpolate(attn_mask, (latent_h, latent_w)) # 按照潜变量的高度和宽度插值调整掩膜
# 根据自注意力掩膜进行模糊处理
degraded_latents = gaussian_blur_2d(original_latents, kernel_size=9, sigma=1.0) # 对原始潜变量进行高斯模糊
degraded_latents = degraded_latents * attn_mask + original_latents * (1 - attn_mask) # 按掩膜加权融合模糊与原始潜变量
# 重新加噪声以匹配噪声水平
degraded_latents = self.scheduler.add_noise(degraded_latents, noise=eps, timesteps=t[None]) # 添加噪声
return degraded_latents # 返回处理后的潜变量
# 从diffusers.schedulers.scheduling_ddim.DDIMScheduler.step修改而来
# 注意:有些调度器会裁剪或不返回x_0(PNDMScheduler, DDIMScheduler等)
def pred_x0(self, sample, model_output, timestep):
alpha_prod_t = self.scheduler.alphas_cumprod[timestep].to(sample.device) # 获取当前时间步的累积alpha值
beta_prod_t = 1 - alpha_prod_t # 计算beta值
if self.scheduler.config.prediction_type == "epsilon": # 如果预测类型为“epsilon”
pred_original_sample = (sample - beta_prod_t ** (0.5) * model_output) / alpha_prod_t ** (0.5) # 计算原始样本的预测值
elif self.scheduler.config.prediction_type == "sample": # 如果预测类型为“sample”
pred_original_sample = model_output # 直接使用模型输出作为预测值
elif self.scheduler.config.prediction_type == "v_prediction": # 如果预测类型为“v_prediction”
pred_original_sample = (alpha_prod_t**0.5) * sample - (beta_prod_t**0.5) * model_output # 计算预测值V
# 预测V
model_output = (alpha_prod_t**0.5) * model_output + (beta_prod_t**0.5) * sample # 更新模型输出
else: # 如果预测类型不匹配
raise ValueError( # 抛出异常
f"prediction_type given as {self.scheduler.config.prediction_type} must be one of `epsilon`, `sample`,"
" or `v_prediction`"
)
return pred_original_sample # 返回预测的原始样本
# 定义一个方法,用于根据给定样本和模型输出计算预测值
def pred_epsilon(self, sample, model_output, timestep):
# 获取当前时间步的累积 alpha 值
alpha_prod_t = self.scheduler.alphas_cumprod[timestep]
# 计算当前时间步的 beta 值
beta_prod_t = 1 - alpha_prod_t
# 根据预测类型选择对应的计算方式
if self.scheduler.config.prediction_type == "epsilon":
# 直接使用模型输出作为预测值
pred_eps = model_output
elif self.scheduler.config.prediction_type == "sample":
# 通过样本和模型输出计算预测值
pred_eps = (sample - (alpha_prod_t**0.5) * model_output) / (beta_prod_t**0.5)
elif self.scheduler.config.prediction_type == "v_prediction":
# 计算加权组合的预测值
pred_eps = (beta_prod_t**0.5) * sample + (alpha_prod_t**0.5) * model_output
else:
# 如果预测类型不合法,抛出错误
raise ValueError(
f"prediction_type given as {self.scheduler.config.prediction_type} must be one of `epsilon`, `sample`,"
" or `v_prediction`"
)
# 返回计算得到的预测值
return pred_eps
# 高斯模糊
def gaussian_blur_2d(img, kernel_size, sigma):
# 计算卷积核的一半大小
ksize_half = (kernel_size - 1) * 0.5
# 创建一个从 -ksize_half 到 ksize_half 的线性空间,步长为 kernel_size
x = torch.linspace(-ksize_half, ksize_half, steps=kernel_size)
# 计算概率密度函数(PDF),用于高斯分布
pdf = torch.exp(-0.5 * (x / sigma).pow(2))
# 将 PDF 归一化为高斯核
x_kernel = pdf / pdf.sum()
# 将高斯核转换为与输入图像相同的设备和数据类型
x_kernel = x_kernel.to(device=img.device, dtype=img.dtype)
# 生成二维高斯核,使用外积
kernel2d = torch.mm(x_kernel[:, None], x_kernel[None, :])
# 扩展高斯核以适应输入图像的通道数
kernel2d = kernel2d.expand(img.shape[-3], 1, kernel2d.shape[0], kernel2d.shape[1])
# 计算填充的大小,用于处理边界
padding = [kernel_size // 2, kernel_size // 2, kernel_size // 2, kernel_size // 2]
# 使用反射模式对图像进行填充
img = F.pad(img, padding, mode="reflect")
# 对填充后的图像应用卷积,使用高斯核
img = F.conv2d(img, kernel2d, groups=img.shape[-3])
# 返回模糊处理后的图像
return img
.\diffusers\pipelines\stable_diffusion_sag\__init__.py
# 导入类型检查相关的常量
from typing import TYPE_CHECKING
# 从父级模块导入一些工具函数和常量
from ...utils import (
DIFFUSERS_SLOW_IMPORT, # 指示慢导入的标志
OptionalDependencyNotAvailable, # 可选依赖不可用的异常
_LazyModule, # 用于延迟加载模块的类
get_objects_from_module, # 从模块获取对象的函数
is_torch_available, # 检查 PyTorch 是否可用的函数
is_transformers_available, # 检查 Transformers 是否可用的函数
)
# 用于存储虚拟对象的字典
_dummy_objects = {}
# 用于定义导入结构的字典
_import_structure = {}
# 尝试检查依赖是否可用
try:
# 如果 Transformers 或 Torch 不可用,则抛出异常
if not (is_transformers_available() and is_torch_available()):
raise OptionalDependencyNotAvailable()
# 捕获可选依赖不可用的异常
except OptionalDependencyNotAvailable:
# 从工具模块导入虚拟的 Torch 和 Transformers 对象
from ...utils import dummy_torch_and_transformers_objects # noqa F403
# 更新 _dummy_objects 字典,包含虚拟对象
_dummy_objects.update(get_objects_from_module(dummy_torch_and_transformers_objects))
# 如果依赖可用,更新导入结构
else:
_import_structure["pipeline_stable_diffusion_sag"] = ["StableDiffusionSAGPipeline"]
# 检查类型是否在检查状态或是否启用慢导入
if TYPE_CHECKING or DIFFUSERS_SLOW_IMPORT:
try:
# 如果 Transformers 或 Torch 不可用,则抛出异常
if not (is_transformers_available() and is_torch_available()):
raise OptionalDependencyNotAvailable()
# 捕获可选依赖不可用的异常
except OptionalDependencyNotAvailable:
# 从工具模块导入所有虚拟对象
from ...utils.dummy_torch_and_transformers_objects import *
# 如果依赖可用,导入真实的 StableDiffusionSAGPipeline
else:
from .pipeline_stable_diffusion_sag import StableDiffusionSAGPipeline
# 如果不在类型检查或慢导入模式下
else:
# 导入系统模块
import sys
# 将当前模块替换为懒加载模块
sys.modules[__name__] = _LazyModule(
__name__,
globals()["__file__"], # 当前文件的全局变量
_import_structure, # 导入结构
module_spec=__spec__, # 模块的规格
)
# 将虚拟对象添加到当前模块
for name, value in _dummy_objects.items():
setattr(sys.modules[__name__], name, value)
.\diffusers\pipelines\stable_diffusion_xl\pipeline_flax_stable_diffusion_xl.py
# 版权所有 2024 The HuggingFace Team. 保留所有权利。
#
# 根据 Apache 许可证第 2.0 版(“许可证”)授权;
# 除非遵守许可证,否则您不得使用此文件。
# 您可以在以下地址获得许可证副本:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律或书面协议另有规定,软件在“按现状”基础上分发,
# 不提供任何形式的明示或暗示的保证或条件。
# 请参阅许可证以获取有关权限和限制的具体语言。
# 从 functools 模块导入 partial 函数,用于部分应用函数
from functools import partial
# 从 typing 模块导入类型提示,便于类型检查
from typing import Dict, List, Optional, Union
# 导入 jax 库,用于高性能数值计算
import jax
# 导入 jax.numpy,提供类似于 NumPy 的数组操作
import jax.numpy as jnp
# 从 flax.core 导入 FrozenDict,提供不可变字典的实现
from flax.core.frozen_dict import FrozenDict
# 从 transformers 导入 CLIPTokenizer 和 FlaxCLIPTextModel,用于文本编码
from transformers import CLIPTokenizer, FlaxCLIPTextModel
# 从 diffusers.utils 导入 logging 模块,用于日志记录
from diffusers.utils import logging
# 从相对路径导入 FlaxAutoencoderKL 和 FlaxUNet2DConditionModel 模型
from ...models import FlaxAutoencoderKL, FlaxUNet2DConditionModel
# 从相对路径导入各种调度器
from ...schedulers import (
FlaxDDIMScheduler,
FlaxDPMSolverMultistepScheduler,
FlaxLMSDiscreteScheduler,
FlaxPNDMScheduler,
)
# 从相对路径导入 FlaxDiffusionPipeline 基类
from ..pipeline_flax_utils import FlaxDiffusionPipeline
# 从相对路径导入 FlaxStableDiffusionXLPipelineOutput 类
from .pipeline_output import FlaxStableDiffusionXLPipelineOutput
# 获取当前模块的日志记录器
logger = logging.get_logger(__name__) # pylint: disable=invalid-name
# 设置为 True 将使用 Python 循环而不是 jax.fori_loop,方便调试
DEBUG = False
# 定义 FlaxStableDiffusionXLPipeline 类,继承自 FlaxDiffusionPipeline
class FlaxStableDiffusionXLPipeline(FlaxDiffusionPipeline):
# 初始化方法,设置模型及其相关参数
def __init__(
self,
text_encoder: FlaxCLIPTextModel, # 文本编码器模型 1
text_encoder_2: FlaxCLIPTextModel, # 文本编码器模型 2
vae: FlaxAutoencoderKL, # 变分自编码器模型
tokenizer: CLIPTokenizer, # 文本标记器 1
tokenizer_2: CLIPTokenizer, # 文本标记器 2
unet: FlaxUNet2DConditionModel, # UNet 模型
scheduler: Union[
FlaxDDIMScheduler, FlaxPNDMScheduler, FlaxLMSDiscreteScheduler, FlaxDPMSolverMultistepScheduler
], # 调度器,可选多种类型
dtype: jnp.dtype = jnp.float32, # 数据类型,默认为 float32
):
# 调用父类的初始化方法
super().__init__()
# 设置数据类型属性
self.dtype = dtype
# 注册模型模块到管道
self.register_modules(
vae=vae,
text_encoder=text_encoder,
text_encoder_2=text_encoder_2,
tokenizer=tokenizer,
tokenizer_2=tokenizer_2,
unet=unet,
scheduler=scheduler,
)
# 计算变分自编码器的缩放因子
self.vae_scale_factor = 2 ** (len(self.vae.config.block_out_channels) - 1)
# 准备输入的方法,处理文本提示
def prepare_inputs(self, prompt: Union[str, List[str]]):
# 检查 prompt 的类型是否为字符串或列表
if not isinstance(prompt, (str, list)):
raise ValueError(f"`prompt` has to be of type `str` or `list` but is {type(prompt)}")
# 假设有两个编码器
inputs = [] # 初始化输入列表
# 遍历两个标记器
for tokenizer in [self.tokenizer, self.tokenizer_2]:
# 使用标记器处理输入提示,返回填充后的输入 ID
text_inputs = tokenizer(
prompt,
padding="max_length", # 填充到最大长度
max_length=self.tokenizer.model_max_length, # 最大长度设置为标记器的最大值
truncation=True, # 截断超出最大长度的部分
return_tensors="np", # 返回 NumPy 格式的张量
)
# 将输入 ID 添加到输入列表
inputs.append(text_inputs.input_ids)
# 将输入 ID 堆叠为一个数组,按轴 1 组合
inputs = jnp.stack(inputs, axis=1)
# 返回准备好的输入
return inputs
# 定义调用方法,接受多个参数用于生成图像
def __call__(
self,
prompt_ids: jax.Array, # 输入的提示 ID 数组
params: Union[Dict, FrozenDict], # 模型参数字典
prng_seed: jax.Array, # 随机数种子,用于生成随机数
num_inference_steps: int = 50, # 推理步骤数量,默认值为 50
guidance_scale: Union[float, jax.Array] = 7.5, # 引导比例,默认值为 7.5
height: Optional[int] = None, # 生成图像的高度,默认为 None
width: Optional[int] = None, # 生成图像的宽度,默认为 None
latents: jnp.array = None, # 潜在变量,默认为 None
neg_prompt_ids: jnp.array = None, # 负提示 ID 数组,默认为 None
return_dict: bool = True, # 是否返回字典格式,默认为 True
output_type: str = None, # 输出类型,默认为 None
jit: bool = False, # 是否启用 JIT 编译,默认为 False
):
# 0. 默认高度和宽度设置为 unet 配置的样本大小乘以 VAE 缩放因子
height = height or self.unet.config.sample_size * self.vae_scale_factor
width = width or self.unet.config.sample_size * self.vae_scale_factor
# 如果引导比例为浮点数且启用 JIT 编译
if isinstance(guidance_scale, float) and jit:
# 将引导比例转换为张量,确保每个设备都有副本
guidance_scale = jnp.array([guidance_scale] * prompt_ids.shape[0])
guidance_scale = guidance_scale[:, None] # 增加维度以便于后续计算
# 检查是否返回潜在变量
return_latents = output_type == "latent"
# 根据是否启用 JIT 调用不同的生成函数
if jit:
images = _p_generate(
self,
prompt_ids,
params,
prng_seed,
num_inference_steps,
height,
width,
guidance_scale,
latents,
neg_prompt_ids,
return_latents,
)
else:
images = self._generate(
prompt_ids,
params,
prng_seed,
num_inference_steps,
height,
width,
guidance_scale,
latents,
neg_prompt_ids,
return_latents,
)
# 如果不返回字典格式,直接返回图像
if not return_dict:
return (images,)
# 返回包含生成图像的输出对象
return FlaxStableDiffusionXLPipelineOutput(images=images)
# 定义获取嵌入的方法
def get_embeddings(self, prompt_ids: jnp.array, params):
# 假设我们有两个编码器
# bs, encoder_input, seq_length
te_1_inputs = prompt_ids[:, 0, :] # 获取第一个编码器的输入
te_2_inputs = prompt_ids[:, 1, :] # 获取第二个编码器的输入
# 使用第一个文本编码器生成嵌入,输出隐藏状态
prompt_embeds = self.text_encoder(te_1_inputs, params=params["text_encoder"], output_hidden_states=True)
prompt_embeds = prompt_embeds["hidden_states"][-2] # 获取倒数第二个隐藏状态
prompt_embeds_2_out = self.text_encoder_2(
te_2_inputs, params=params["text_encoder_2"], output_hidden_states=True
) # 使用第二个文本编码器生成嵌入
prompt_embeds_2 = prompt_embeds_2_out["hidden_states"][-2] # 获取倒数第二个隐藏状态
text_embeds = prompt_embeds_2_out["text_embeds"] # 获取文本嵌入
# 将两个嵌入沿最后一个维度拼接
prompt_embeds = jnp.concatenate([prompt_embeds, prompt_embeds_2], axis=-1)
return prompt_embeds, text_embeds # 返回拼接后的嵌入和文本嵌入
# 定义获取添加时间 ID 的方法
def _get_add_time_ids(self, original_size, crops_coords_top_left, target_size, bs, dtype):
# 将原始大小、裁剪坐标和目标大小组合成一个列表
add_time_ids = list(original_size + crops_coords_top_left + target_size)
# 将列表转换为指定数据类型的数组,并复制 bs 次
add_time_ids = jnp.array([add_time_ids] * bs, dtype=dtype)
return add_time_ids # 返回添加时间 ID 的数组
# 定义生成函数,接收多个参数
def _generate(
# 输入提示的 ID 数组
self,
prompt_ids: jnp.array,
# 模型参数,可以是字典或冷冻字典
params: Union[Dict, FrozenDict],
# 随机种子,用于生成随机数
prng_seed: jax.Array,
# 推理步骤的数量
num_inference_steps: int,
# 生成图像的高度
height: int,
# 生成图像的宽度
width: int,
# 引导比例,用于控制生成质量
guidance_scale: float,
# 可选的潜在变量数组,默认为 None
latents: Optional[jnp.array] = None,
# 可选的负提示 ID 数组,默认为 None
neg_prompt_ids: Optional[jnp.array] = None,
# 返回潜在变量的标志,默认为 False
return_latents=False,
# 静态参数是管道、推理步数、高度、宽度和返回潜在变量。任何更改都会触发重新编译。
# 非静态参数是输入张量,映射到其第一个维度(因此为 `0`)。
@partial(
jax.pmap, # 使用 JAX 的并行映射函数
in_axes=(None, 0, 0, 0, None, None, None, 0, 0, 0, None), # 指定输入参数在并行处理时的轴
static_broadcasted_argnums=(0, 4, 5, 6, 10), # 指定静态广播参数的索引
)
def _p_generate( # 定义生成函数,处理图像生成任务
pipe, # 管道对象,用于生成图像
prompt_ids, # 提示词 ID 的张量输入
params, # 生成所需的参数
prng_seed, # 随机数种子,用于生成随机性
num_inference_steps, # 推理步数,决定生成过程的细节
height, # 输出图像的高度
width, # 输出图像的宽度
guidance_scale, # 引导比例,控制生成的质量
latents, # 潜在变量,用于生成过程中的信息
neg_prompt_ids, # 负提示词 ID 的张量输入
return_latents, # 是否返回潜在变量的标志
):
return pipe._generate( # 调用管道的生成方法
prompt_ids, # 传入提示词 ID
params, # 传入生成参数
prng_seed, # 传入随机数种子
num_inference_steps, # 传入推理步数
height, # 传入输出图像高度
width, # 传入输出图像宽度
guidance_scale, # 传入引导比例
latents, # 传入潜在变量
neg_prompt_ids, # 传入负提示词 ID
return_latents, # 传入是否返回潜在变量的标志
)