GLM-4v-9B-源码解析-一-
GLM-4v-9B 源码解析(一)
Raise valuable PR / 提出有价值的PR
Caution/ 注意事项:
Users should keep the following points in mind when submitting PRs:
- The proposed PR should be about this project.
- the proposed PR should be relevant, if there are multiple ideas and optimizations, they should be assigned to different PRs.
用户在提交PR时候应该注意以下几点:
- 提出的PR应该是关于本项目的。
- 提出的PR应该具有针对性,如果具有多个不同的想法和优化方案,应该分配到不同的PR中。
不应该提出的PR / PRs that should not be proposed
If a developer proposes a PR about any of the following, it may be closed or Rejected.
- those that don't describe improvement options.
- multiple issues of different types combined in one PR.
- The proposed PR is highly duplicative of already existing PRs.
如果开发者提出关于以下方面的PR,则可能会被直接关闭或拒绝通过。
- 没有说明改进方案的。
- 多个不同类型的问题合并在一个PR中的。
- 提出的PR与已经存在的PR高度重复的。
检查您的PR
.\chatglm4-finetune\basic_demo\glm4v_server.py
# 导入垃圾回收模块
import gc
# 导入线程模块
import threading
# 导入时间模块
import time
# 导入 base64 编码模块
import base64
# 导入系统模块
import sys
# 从上下文管理器导入异步上下文管理器
from contextlib import asynccontextmanager
# 导入类型提示相关模块
from typing import List, Literal, Union, Tuple, Optional
# 导入 PyTorch 库
import torch
# 导入 uvicorn 作为 ASGI 服务器
import uvicorn
# 导入请求库
import requests
# 导入 FastAPI 框架
from fastapi import FastAPI, HTTPException
# 导入 CORS 中间件
from fastapi.middleware.cors import CORSMiddleware
# 导入 Pydantic 基础模型和字段定义
from pydantic import BaseModel, Field
# 导入 SSE 事件源响应
from sse_starlette.sse import EventSourceResponse
# 导入 Transformers 模型相关模块
from transformers import (
AutoTokenizer,
AutoModel,
TextIteratorStreamer
)
# 导入 PEFT 模型
from peft import PeftModelForCausalLM
# 导入图像处理库
from PIL import Image
# 导入字节流模块
from io import BytesIO
# 导入路径处理模块
from pathlib import Path
# 设置设备为 CUDA,如果不可用则为 CPU
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'
# 根据 GPU 能力选择适当的 PyTorch 数据类型
TORCH_TYPE = torch.bfloat16 if torch.cuda.is_available() and torch.cuda.get_device_capability()[0] >= 8 else torch.float16
# 定义异步上下文管理器以管理 FastAPI 应用的生命周期
@asynccontextmanager
async def lifespan(app: FastAPI):
"""
一个异步上下文管理器,用于管理 FastAPI 应用的生命周期。
确保在应用生命周期结束后清理 GPU 内存,这是 GPU 环境中高效资源管理的关键。
"""
yield
# 如果可用,清空 CUDA 缓存
if torch.cuda.is_available():
torch.cuda.empty_cache()
# 进行 CUDA IPC 垃圾回收
torch.cuda.ipc_collect()
# 创建 FastAPI 应用,并传入生命周期管理器
app = FastAPI(lifespan=lifespan)
# 添加 CORS 中间件,以允许跨域请求
app.add_middleware(
CORSMiddleware,
# 允许所有来源
allow_origins=["*"],
# 允许凭证
allow_credentials=True,
# 允许所有方法
allow_methods=["*"],
# 允许所有请求头
allow_headers=["*"],
)
# 定义表示模型卡的 Pydantic 模型
class ModelCard(BaseModel):
"""
表示模型卡的 Pydantic 模型,提供机器学习模型的元数据。
包括模型 ID、所有者和创建时间等字段。
"""
id: str
object: str = "model" # 模型对象类型
created: int = Field(default_factory=lambda: int(time.time())) # 创建时间戳
owned_by: str = "owner" # 所有者信息
root: Optional[str] = None # 根模型信息(可选)
parent: Optional[str] = None # 父模型信息(可选)
permission: Optional[list] = None # 权限信息(可选)
# 定义表示模型列表的 Pydantic 模型
class ModelList(BaseModel):
object: str = "list" # 对象类型为列表
data: List[ModelCard] = [] # 包含模型卡的列表
# 定义表示图像 URL 的 Pydantic 模型
class ImageUrl(BaseModel):
url: str # 图像 URL 字符串
# 定义表示文本内容的 Pydantic 模型
class TextContent(BaseModel):
type: Literal["text"] # 类型为文本
text: str # 文本内容
# 定义表示图像 URL 内容的 Pydantic 模型
class ImageUrlContent(BaseModel):
type: Literal["image_url"] # 类型为图像 URL
image_url: ImageUrl # 图像 URL 对象
# 定义内容项的联合类型
ContentItem = Union[TextContent, ImageUrlContent]
# 定义聊天消息输入的 Pydantic 模型
class ChatMessageInput(BaseModel):
role: Literal["user", "assistant", "system"] # 消息角色
content: Union[str, List[ContentItem]] # 消息内容,可以是字符串或内容项列表
name: Optional[str] = None # 消息发送者名称(可选)
# 定义聊天消息响应的 Pydantic 模型
class ChatMessageResponse(BaseModel):
role: Literal["assistant"] # 消息角色为助手
content: str = None # 消息内容
name: Optional[str] = None # 消息发送者名称(可选)
# 定义增量消息的 Pydantic 模型
class DeltaMessage(BaseModel):
role: Optional[Literal["user", "assistant", "system"]] = None # 可选角色
content: Optional[str] = None # 可选内容
# 定义聊天完成请求的 Pydantic 模型
class ChatCompletionRequest(BaseModel):
model: str # 使用的模型名称
messages: List[ChatMessageInput] # 消息输入列表
temperature: Optional[float] = 0.8 # 温度参数(控制生成随机性)
top_p: Optional[float] = 0.8 # Top-p 采样参数
max_tokens: Optional[int] = None # 最大生成标记数(可选)
stream: Optional[bool] = False # 是否流式返回结果
# 附加参数
repetition_penalty: Optional[float] = 1.0 # 重复惩罚参数
# 定义聊天完成响应选择的 Pydantic 模型
class ChatCompletionResponseChoice(BaseModel):
index: int # 选择的索引
message: ChatMessageResponse # 消息内容
# 定义一个模型类,用于表示聊天完成响应流中的选择
class ChatCompletionResponseStreamChoice(BaseModel):
# 选择的索引
index: int
# 存储差异信息的消息
delta: DeltaMessage
# 定义一个模型类,用于表示使用信息
class UsageInfo(BaseModel):
# 提示的 token 数量,默认为 0
prompt_tokens: int = 0
# 总 token 数量,默认为 0
total_tokens: int = 0
# 完成的 token 数量,默认为 0
completion_tokens: Optional[int] = 0
# 定义一个模型类,用于表示聊天完成的响应
class ChatCompletionResponse(BaseModel):
# 使用的模型名称
model: str
# 对象的字面量类型
object: Literal["chat.completion", "chat.completion.chunk"]
# 选择列表,包含聊天完成的选择
choices: List[Union[ChatCompletionResponseChoice, ChatCompletionResponseStreamChoice]]
# 创建时间,默认为当前时间
created: Optional[int] = Field(default_factory=lambda: int(time.time()))
# 使用信息,可选
usage: Optional[UsageInfo] = None
# 定义一个 GET 请求的端点,列出可用模型
@app.get("/v1/models", response_model=ModelList)
async def list_models():
"""
一个用于列出可用模型的端点。返回模型卡的列表。
这对客户端查询和了解可用模型非常有用。
"""
# 创建一个模型卡实例
model_card = ModelCard(id="GLM-4v-9b")
# 返回包含模型卡的模型列表
return ModelList(data=[model_card])
# 定义一个 POST 请求的端点,用于创建聊天完成
@app.post("/v1/chat/completions", response_model=ChatCompletionResponse)
async def create_chat_completion(request: ChatCompletionRequest):
# 声明全局变量,模型和分词器
global model, tokenizer
# 验证请求是否有效,确保消息数量大于 0 且最后一条消息不是助手
if len(request.messages) < 1 or request.messages[-1].role == "assistant":
raise HTTPException(status_code=400, detail="Invalid request")
# 设置生成参数的字典
gen_params = dict(
messages=request.messages,
temperature=request.temperature,
top_p=request.top_p,
max_tokens=request.max_tokens or 1024,
echo=False,
stream=request.stream,
repetition_penalty=request.repetition_penalty
)
# 如果请求要求流式生成
if request.stream:
# 调用预测函数生成内容
generate = predict(request.model, gen_params)
# 返回事件源响应
return EventSourceResponse(generate, media_type="text/event-stream")
# 生成聊天完成响应
response = generate_glm4v(model, tokenizer, gen_params)
# 创建使用信息的实例
usage = UsageInfo()
# 创建聊天消息响应,角色为助手
message = ChatMessageResponse(
role="assistant",
content=response["text"],
)
# 创建聊天完成响应选择数据
choice_data = ChatCompletionResponseChoice(
index=0,
message=message,
)
# 验证和累加任务使用信息
task_usage = UsageInfo.model_validate(response["usage"])
for usage_key, usage_value in task_usage.model_dump().items():
setattr(usage, usage_key, getattr(usage, usage_key) + usage_value)
# 返回聊天完成响应
return ChatCompletionResponse(model=request.model, choices=[choice_data], object="chat.completion", usage=usage)
# 定义一个预测函数,生成聊天完成的响应
def predict(model_id: str, params: dict):
# 声明全局变量,模型和分词器
global model, tokenizer
# 创建聊天完成响应流选择数据
choice_data = ChatCompletionResponseStreamChoice(
index=0,
delta=DeltaMessage(role="assistant"),
finish_reason=None
)
# 创建聊天完成响应的片段
chunk = ChatCompletionResponse(model=model_id, choices=[choice_data], object="chat.completion.chunk")
# 生成响应的 JSON 格式
yield "{}".format(chunk.model_dump_json(exclude_unset=True))
# 初始化之前的文本为空
previous_text = ""
# 遍历生成的响应流,获取每个新的响应
for new_response in generate_stream_glm4v(model, tokenizer, params):
# 从新响应中提取解码后的文本内容
decoded_unicode = new_response["text"]
# 计算与之前文本的差异部分
delta_text = decoded_unicode[len(previous_text):]
# 更新之前的文本为当前的解码文本
previous_text = decoded_unicode
# 创建一个 DeltaMessage 对象,包含差异文本和角色信息
delta = DeltaMessage(content=delta_text, role="assistant")
# 创建选择数据对象,表示响应中的一个选项
choice_data = ChatCompletionResponseStreamChoice(index=0, delta=delta)
# 创建一个聊天完成响应对象,包含模型ID和选择数据
chunk = ChatCompletionResponse(model=model_id, choices=[choice_data], object="chat.completion.chunk")
# 生成并返回 JSON 格式的聊天完成响应,排除未设置的值
yield "{}".format(chunk.model_dump_json(exclude_unset=True))
# 创建一个空的 DeltaMessage 对象,用于最后的响应
choice_data = ChatCompletionResponseStreamChoice(index=0, delta=DeltaMessage())
# 创建最后的聊天完成响应对象
chunk = ChatCompletionResponse(model=model_id, choices=[choice_data], object="chat.completion.chunk")
# 生成并返回 JSON 格式的聊天完成响应,排除未设置的值
yield "{}".format(chunk.model_dump_json(exclude_unset=True))
# 生成基于 GLM-4v-9b 模型的响应,处理聊天历史和图像数据(如果有),并调用模型生成响应
def generate_glm4v(model: AutoModel, tokenizer: AutoTokenizer, params: dict):
# 初始化响应变量为 None
response = None
# 通过生成器函数生成模型响应,使用 for 循环遍历每个生成的响应
for response in generate_stream_glm4v(model, tokenizer, params):
pass # 在此处不做任何处理,仅用于消耗生成器
# 返回生成的响应
return response
# 处理历史消息以提取文本,识别最后的用户查询,并将 base64 编码的图像 URL 转换为 PIL 图像
def process_history_and_images(messages: List[ChatMessageInput]) -> Tuple[
Optional[str], Optional[List[Tuple[str, str]]], Optional[List[Image.Image]]]:
"""
Args:
messages(List[ChatMessageInput]): ChatMessageInput 对象的列表。
return: 包含三个元素的元组:
- 最后一个用户查询的字符串。
- 格式化为模型所需的元组列表的文本历史。
- 从消息中提取的 PIL 图像对象的列表。
"""
# 初始化格式化历史、图像列表和最后用户查询
formatted_history = []
image_list = []
last_user_query = ''
# 遍历每条消息及其索引
for i, message in enumerate(messages):
role = message.role # 获取消息的角色
content = message.content # 获取消息内容
if isinstance(content, list): # 如果内容是列表,处理文本
# 将内容中的文本连接为一个字符串
text_content = ' '.join(item.text for item in content if isinstance(item, TextContent))
else:
text_content = content # 否则直接使用内容
if isinstance(content, list): # 如果内容是列表,处理图像
for item in content:
if isinstance(item, ImageUrlContent): # 如果项是图像 URL 内容
image_url = item.image_url.url # 获取图像 URL
if image_url.startswith("data:image/jpeg;base64,"): # 如果是 base64 编码图像
# 解码 base64 编码的图像数据
base64_encoded_image = image_url.split("data:image/jpeg;base64,")[1]
image_data = base64.b64decode(base64_encoded_image) # 解码
image = Image.open(BytesIO(image_data)).convert('RGB') # 转换为 RGB 图像
else: # 如果是常规 URL
response = requests.get(image_url, verify=False) # 获取图像数据
image = Image.open(BytesIO(response.content)).convert('RGB') # 转换为 RGB 图像
image_list.append(image) # 将图像添加到图像列表中
if role == 'user': # 如果角色是用户
if i == len(messages) - 1: # 如果是最后一条用户消息
last_user_query = text_content # 更新最后用户查询
else:
formatted_history.append((text_content, '')) # 将文本内容添加到格式化历史中
elif role == 'assistant': # 如果角色是助手
if formatted_history: # 如果格式化历史不为空
if formatted_history[-1][1] != '': # 检查最后的查询是否已经有回答
assert False, f"the last query is answered. answer again. {formatted_history[-1][0]}, {formatted_history[-1][1]}, {text_content}"
formatted_history[-1] = (formatted_history[-1][0], text_content) # 更新助手的回答
else:
assert False, f"assistant reply before user" # 如果助手在用户之前回复,触发错误
else:
assert False, f"unrecognized role: {role}" # 如果角色不被识别,触发错误
# 返回最后用户查询、格式化历史和图像列表
return last_user_query, formatted_history, image_list
# 使用 PyTorch 的推理模式定义生成流
@torch.inference_mode()
def generate_stream_glm4v(model: AutoModel, tokenizer: AutoTokenizer, params: dict):
# 从参数中提取消息列表
messages = params["messages"]
# 获取温度参数,默认值为1.0
temperature = float(params.get("temperature", 1.0))
# 获取重复惩罚参数,默认值为1.0
repetition_penalty = float(params.get("repetition_penalty", 1.0))
# 获取top_p参数,默认值为1.0
top_p = float(params.get("top_p", 1.0))
# 获取最大新标记数,默认值为256
max_new_tokens = int(params.get("max_tokens", 256))
# 处理历史消息和图片,返回查询、历史记录和图片列表
query, history, image_list = process_history_and_images(messages)
# 初始化输入列表
inputs = []
# 遍历历史记录,按索引和消息对进行迭代
for idx, (user_msg, model_msg) in enumerate(history):
# 如果是最后一条历史记录且没有模型消息
if idx == len(history) - 1 and not model_msg:
# 将用户消息添加到输入列表
inputs.append({"role": "user", "content": user_msg})
# 如果有图片且未上传
if image_list and not uploaded:
# 更新输入列表中的最后一条,添加图片
inputs[-1].update({"image": image_list[0]})
uploaded = True
# 跳出循环
break
# 如果有用户消息
if user_msg:
# 添加用户消息到输入列表
inputs.append({"role": "user", "content": user_msg})
# 如果有模型消息
if model_msg:
# 添加模型消息到输入列表
inputs.append({"role": "assistant", "content": model_msg})
# 将查询和图片添加到输入列表
inputs.append({"role": "user", "content": query, "image": image_list[0]})
# 使用tokenizer处理输入,应用聊天模板
model_inputs = tokenizer.apply_chat_template(
inputs,
add_generation_prompt=True,
tokenize=True,
return_tensors="pt",
return_dict=True
).to(next(model.parameters()).device) # 将模型输入移动到模型所在设备
# 计算输入回声长度
input_echo_len = len(model_inputs["input_ids"][0])
# 初始化文本流迭代器
streamer = TextIteratorStreamer(
tokenizer=tokenizer,
timeout=60.0,
skip_prompt=True,
skip_special_tokens=True
)
# 准备生成文本的参数
gen_kwargs = {
"repetition_penalty": repetition_penalty,
"max_new_tokens": max_new_tokens,
"do_sample": True if temperature > 1e-5 else False, # 根据温度判断是否采样
"top_p": top_p if temperature > 1e-5 else 0, # 根据温度调整top_p
"top_k": 1,
'streamer': streamer, # 使用文本流迭代器
}
# 如果温度大于阈值,添加温度参数
if temperature > 1e-5:
gen_kwargs["temperature"] = temperature
# 初始化生成文本的变量
generated_text = ""
# 定义生成文本的函数
def generate_text():
with torch.no_grad(): # 禁用梯度计算以节省内存
model.generate(**model_inputs, **gen_kwargs) # 调用模型生成文本
# 启动生成文本的线程
generation_thread = threading.Thread(target=generate_text)
generation_thread.start()
# 记录输入回声的总长度
total_len = input_echo_len
# 从流中获取下一个文本片段
for next_text in streamer:
# 将下一个文本片段添加到生成文本中
generated_text += next_text
# 更新总长度
total_len = len(tokenizer.encode(generated_text))
# 生成并返回当前文本和使用情况
yield {
"text": generated_text,
"usage": {
"prompt_tokens": input_echo_len, # 输入提示的标记数
"completion_tokens": total_len - input_echo_len, # 生成文本的标记数
"total_tokens": total_len, # 总标记数
},
}
# 等待生成线程完成
generation_thread.join()
# 最终返回生成的文本和使用情况
yield {
"text": generated_text,
"usage": {
"prompt_tokens": input_echo_len,
"completion_tokens": total_len - input_echo_len,
"total_tokens": total_len,
},
}
# 垃圾回收,释放未使用的内存
gc.collect()
# 清空 CUDA 缓存以释放显存
torch.cuda.empty_cache()
# 判断是否为主程序入口
if __name__ == "__main__":
# 从命令行参数获取模型路径
MODEL_PATH = sys.argv[1]
# 解析模型路径,处理用户目录符号并获取绝对路径
model_dir = Path(MODEL_PATH).expanduser().resolve()
# 检查配置文件是否存在
if (model_dir / 'adapter_config.json').exists():
# 导入 JSON 库以读取配置文件
import json
# 打开配置文件并以 UTF-8 编码读取内容
with open(model_dir / 'adapter_config.json', 'r', encoding='utf-8') as file:
# 加载 JSON 配置为字典
config = json.load(file)
# 从预训练模型中加载基础模型,配置自动选择设备
model = AutoModel.from_pretrained(
config.get('base_model_name_or_path'),
trust_remote_code=True,
device_map='auto',
torch_dtype=TORCH_TYPE
)
# 从预训练模型中加载适配模型
model = PeftModelForCausalLM.from_pretrained(
model=model,
model_id=model_dir,
trust_remote_code=True,
)
# 从预训练模型中加载分词器
tokenizer = AutoTokenizer.from_pretrained(
config.get('base_model_name_or_path'),
trust_remote_code=True,
encode_special_tokens=True
)
# 将模型设置为评估模式并移动到指定设备
model.eval().to(DEVICE)
else:
# 从模型路径加载分词器
tokenizer = AutoTokenizer.from_pretrained(
MODEL_PATH,
trust_remote_code=True,
encode_special_tokens=True
)
# 从模型路径加载预训练模型,设置数据类型和设备
model = AutoModel.from_pretrained(
MODEL_PATH,
torch_dtype=TORCH_TYPE,
trust_remote_code=True,
device_map="auto",
).eval().to(DEVICE)
# 启动 Uvicorn 服务器,监听指定的 IP 和端口
uvicorn.run(app, host='0.0.0.0', port=8000, workers=1)
.\chatglm4-finetune\basic_demo\glm_server.py
# 导入时间模块,用于时间处理
import time
# 从 asyncio 日志中导入 logger,通常用于异步日志记录
from asyncio.log import logger
# 导入正则表达式模块,用于字符串匹配和操作
import re
# 导入系统模块,提供对 Python 解释器使用或维护的一些变量和函数的访问
import sys
# 导入 Uvicorn,作为 ASGI 服务器运行 FastAPI 应用
import uvicorn
# 导入垃圾回收模块,管理内存使用
import gc
# 导入 JSON 模块,用于处理 JSON 数据
import json
# 导入 PyTorch 库,支持深度学习模型的构建与训练
import torch
# 导入随机模块,提供随机数生成
import random
# 导入字符串模块,处理字符串操作
import string
# 从 vllm 库中导入相关的参数和引擎类
from vllm import SamplingParams, AsyncEngineArgs, AsyncLLMEngine
# 从 FastAPI 导入应用程序类和异常处理
from fastapi import FastAPI, HTTPException, Response
# 导入 CORS 中间件,处理跨域资源共享
from fastapi.middleware.cors import CORSMiddleware
# 导入异步上下文管理器
from contextlib import asynccontextmanager
# 导入类型注释相关工具
from typing import List, Literal, Optional, Union
# 导入 Pydantic 基类和字段定义工具
from pydantic import BaseModel, Field
# 从 transformers 库中导入相关工具,用于处理自然语言处理模型
from transformers import AutoTokenizer, LogitsProcessor
# 从 SSE Starlette 库中导入事件源响应
from sse_starlette.sse import EventSourceResponse
# 设置默认的事件源响应心跳间隔为 1000 毫秒
EventSourceResponse.DEFAULT_PING_INTERVAL = 1000
# 定义最大模型长度为 8192
MAX_MODEL_LENGTH = 8192
# 定义异步上下文管理器,处理应用的生命周期
@asynccontextmanager
async def lifespan(app: FastAPI):
# 生成上下文,允许在此处执行初始化
yield
# 如果可用,清空 CUDA 的缓存
if torch.cuda.is_available():
torch.cuda.empty_cache()
# 执行 CUDA 进程间通信的收集
torch.cuda.ipc_collect()
# 创建 FastAPI 应用,传入生命周期管理器
app = FastAPI(lifespan=lifespan)
# 添加中间件以处理跨域请求
app.add_middleware(
CORSMiddleware,
# 允许所有来源的请求
allow_origins=["*"],
# 允许凭据
allow_credentials=True,
# 允许所有 HTTP 方法
allow_methods=["*"],
# 允许所有请求头
allow_headers=["*"],
)
# 定义生成唯一 ID 的函数,接受前缀和长度参数
def generate_id(prefix: str, k=29) -> str:
# 随机生成指定长度的后缀,由字母和数字组成
suffix = ''.join(random.choices(string.ascii_letters + string.digits, k=k))
# 返回组合的前缀和后缀
return f"{prefix}{suffix}"
# 定义模型卡类,表示模型的信息
class ModelCard(BaseModel):
# 模型 ID,默认为空字符串
id: str = ""
# 对象类型,默认为 "model"
object: str = "model"
# 创建时间,默认为当前时间戳
created: int = Field(default_factory=lambda: int(time.time()))
# 模型拥有者,默认为 "owner"
owned_by: str = "owner"
# 根模型的 ID,可选
root: Optional[str] = None
# 父模型的 ID,可选
parent: Optional[str] = None
# 权限信息,可选
permission: Optional[list] = None
# 定义模型列表类,表示多个模型的信息
class ModelList(BaseModel):
# 对象类型,默认为 "list"
object: str = "list"
# 包含的模型卡数据,默认为一个包含 "glm-4" 的列表
data: List[ModelCard] = ["glm-4"]
# 定义函数调用类,表示函数名称和参数
class FunctionCall(BaseModel):
# 函数名称,可选
name: Optional[str] = None
# 函数参数,可选
arguments: Optional[str] = None
# 定义工具调用的函数参数类
class ChoiceDeltaToolCallFunction(BaseModel):
# 函数名称,可选
name: Optional[str] = None
# 函数参数,可选
arguments: Optional[str] = None
# 定义使用信息类,记录提示和完成的令牌数量
class UsageInfo(BaseModel):
# 提示令牌数量,默认为 0
prompt_tokens: int = 0
# 总令牌数量,默认为 0
total_tokens: int = 0
# 完成令牌数量,可选,默认为 0
completion_tokens: Optional[int] = 0
# 定义聊天完成消息工具调用类
class ChatCompletionMessageToolCall(BaseModel):
# 消息索引,可选,默认为 0
index: Optional[int] = 0
# 消息 ID,可选
id: Optional[str] = None
# 函数调用对象
function: FunctionCall
# 消息类型,可选,默认为 "function"
type: Optional[Literal["function"]] = 'function'
# 定义聊天消息类,表示用户与助手之间的消息
class ChatMessage(BaseModel):
# “function” 字段解释:
# 使用较老的OpenAI API版本需要注意在这里添加 function 字段并在 process_messages函数中添加相应角色转换逻辑为 observation
# 消息角色,取值包括 "user"、"assistant"、"system" 和 "tool"
role: Literal["user", "assistant", "system", "tool"]
# 消息内容,可选
content: Optional[str] = None
# 函数调用,可选
function_call: Optional[ChoiceDeltaToolCallFunction] = None
# 工具调用列表,可选
tool_calls: Optional[List[ChatCompletionMessageToolCall]] = None
# 定义增量消息类,表示消息的变化
class DeltaMessage(BaseModel):
# 消息角色,可选,取值包括 "user"、"assistant" 和 "system"
role: Optional[Literal["user", "assistant", "system"]] = None
# 消息内容,可选
content: Optional[str] = None
# 函数调用,可选
function_call: Optional[ChoiceDeltaToolCallFunction] = None
# 工具调用列表,可选
tool_calls: Optional[List[ChatCompletionMessageToolCall]] = None
# 定义聊天完成响应选择类
class ChatCompletionResponseChoice(BaseModel):
# 消息索引
index: int
# 消息内容
message: ChatMessage
# 完成原因,取值包括 "stop"、"length" 和 "tool_calls"
finish_reason: Literal["stop", "length", "tool_calls"]
# 定义聊天完成响应流选择类
class ChatCompletionResponseStreamChoice(BaseModel):
# 增量消息
delta: DeltaMessage
# 完成原因,取值包括 "stop"、"length" 和 "tool_calls",可选
finish_reason: Optional[Literal["stop", "length", "tool_calls"]]
# 消息索引
index: int
# 定义一个聊天完成响应类,继承自 BaseModel
class ChatCompletionResponse(BaseModel):
# 模型名称
model: str
# 唯一 ID,使用默认工厂函数生成
id: Optional[str] = Field(default_factory=lambda: generate_id('chatcmpl-', 29))
# 对象类型,限制为特定字符串
object: Literal["chat.completion", "chat.completion.chunk"]
# 可选项,包含响应选择的列表
choices: List[Union[ChatCompletionResponseChoice, ChatCompletionResponseStreamChoice]]
# 创建时间戳,使用默认工厂函数生成
created: Optional[int] = Field(default_factory=lambda: int(time.time()))
# 系统指纹,使用默认工厂函数生成
system_fingerprint: Optional[str] = Field(default_factory=lambda: generate_id('fp_', 9))
# 可选的使用信息
usage: Optional[UsageInfo] = None
# 定义一个聊天完成请求类,继承自 BaseModel
class ChatCompletionRequest(BaseModel):
# 模型名称
model: str
# 消息列表
messages: List[ChatMessage]
# 温度参数,默认为 0.8
temperature: Optional[float] = 0.8
# top_p 参数,默认为 0.8
top_p: Optional[float] = 0.8
# 最大 token 数,默认为 None
max_tokens: Optional[int] = None
# 流式输出标志,默认为 False
stream: Optional[bool] = False
# 可选工具,可能是字典或字典列表
tools: Optional[Union[dict, List[dict]]] = None
# 可选工具选择,可能是字符串或字典
tool_choice: Optional[Union[str, dict]] = None
# 重复惩罚参数,默认为 1.1
repetition_penalty: Optional[float] = 1.1
# 定义一个无效得分日志处理器,继承自 LogitsProcessor
class InvalidScoreLogitsProcessor(LogitsProcessor):
# 重载调用方法以处理得分
def __call__(
self, input_ids: torch.LongTensor, scores: torch.FloatTensor
) -> torch.FloatTensor:
# 检查得分是否为 NaN 或无穷大
if torch.isnan(scores).any() or torch.isinf(scores).any():
# 将得分置为零
scores.zero_()
# 设置特定索引的得分值
scores[..., 5] = 5e4
# 返回处理后的得分
return scores
# 定义一个处理响应的函数
def process_response(output: str, tools: dict | List[dict] = None, use_tool: bool = False) -> Union[str, dict]:
# 去除输出的多余空白并按行分割
lines = output.strip().split("\n")
# 初始化 JSON 参数
arguments_json = None
# 定义特殊工具列表
special_tools = ["cogview", "simple_browser"]
# 将工具提取为集合,若工具参数为 None 则为空集合
tools = {tool['function']['name'] for tool in tools} if tools else {}
# 这是一个简单的工具比较函数,不能保证拦截所有非工具输出的结果,比如参数未对齐等特殊情况。
##TODO 如果你希望做更多判断,可以在这里进行逻辑完善。
# 检查行数是否大于等于2且第二行以"{"开头
if len(lines) >= 2 and lines[1].startswith("{"):
# 获取第一行并去除首尾空白,作为函数名
function_name = lines[0].strip()
# 将第二行及之后的内容合并为一个字符串,去除首尾空白
arguments = "\n".join(lines[1:]).strip()
# 检查函数名是否在工具或特殊工具列表中
if function_name in tools or function_name in special_tools:
try:
# 尝试将参数字符串解析为 JSON 格式
arguments_json = json.loads(arguments)
# 标记为工具调用
is_tool_call = True
except json.JSONDecodeError:
# 如果解析失败,检查函数名是否在特殊工具中
is_tool_call = function_name in special_tools
# 如果确认是工具调用且允许使用工具
if is_tool_call and use_tool:
# 创建内容字典,包含函数名和参数
content = {
"name": function_name,
"arguments": json.dumps(arguments_json if isinstance(arguments_json, dict) else arguments,
ensure_ascii=False)
}
# 特殊处理 "simple_browser" 函数
if function_name == "simple_browser":
# 定义正则表达式用于匹配搜索模式
search_pattern = re.compile(r'search\("(.+?)"\s*,\s*recency_days\s*=\s*(\d+)\)')
# 尝试在参数中匹配搜索模式
match = search_pattern.match(arguments)
if match:
# 如果匹配成功,更新内容字典中的参数
content["arguments"] = json.dumps({
"query": match.group(1),
"recency_days": int(match.group(2))
}, ensure_ascii=False)
# 特殊处理 "cogview" 函数
elif function_name == "cogview":
# 更新内容字典中的参数为提示文本
content["arguments"] = json.dumps({
"prompt": arguments
}, ensure_ascii=False)
# 返回内容字典
return content
# 返回处理后的输出,去除首尾空白
return output.strip()
# 定义一个异步函数,用于生成流式输出
@torch.inference_mode()
async def generate_stream_glm4(params):
# 从参数中提取消息
messages = params["messages"]
# 从参数中提取工具
tools = params["tools"]
# 从参数中提取工具选择
tool_choice = params["tool_choice"]
# 获取温度参数,默认为1.0
temperature = float(params.get("temperature", 1.0))
# 获取重复惩罚参数,默认为1.0
repetition_penalty = float(params.get("repetition_penalty", 1.0))
# 获取top_p参数,默认为1.0
top_p = float(params.get("top_p", 1.0))
# 获取最大新标记数,默认为8192
max_new_tokens = int(params.get("max_tokens", 8192))
# 处理消息并根据工具和选择进行调整
messages = process_messages(messages, tools=tools, tool_choice=tool_choice)
# 应用聊天模板,将消息转化为输入格式
inputs = tokenizer.apply_chat_template(messages, add_generation_prompt=True, tokenize=False)
# 创建参数字典,包含生成设置
params_dict = {
"n": 1,
"best_of": 1,
"presence_penalty": 1.0,
"frequency_penalty": 0.0,
"temperature": temperature,
"top_p": top_p,
"top_k": -1,
"repetition_penalty": repetition_penalty,
"use_beam_search": False,
"length_penalty": 1,
"early_stopping": False,
"stop_token_ids": [151329, 151336, 151338],
"ignore_eos": False,
"max_tokens": max_new_tokens,
"logprobs": None,
"prompt_logprobs": None,
"skip_special_tokens": True,
}
# 将参数字典转换为SamplingParams对象
sampling_params = SamplingParams(**params_dict)
# 异步生成输出,遍历生成的结果
async for output in engine.generate(inputs=inputs, sampling_params=sampling_params, request_id=f"{time.time()}"):
# 计算输出的标记长度
output_len = len(output.outputs[0].token_ids)
# 计算输入的标记长度
input_len = len(output.prompt_token_ids)
# 构建返回结果字典
ret = {
"text": output.outputs[0].text,
"usage": {
"prompt_tokens": input_len,
"completion_tokens": output_len,
"total_tokens": output_len + input_len
},
"finish_reason": output.outputs[0].finish_reason,
}
# 生成结果
yield ret
# 垃圾回收,释放内存
gc.collect()
# 清空CUDA缓存
torch.cuda.empty_cache()
# 定义一个函数处理消息,带有可选工具和工具选择
def process_messages(messages, tools=None, tool_choice="none"):
# 将原始消息存储到变量中
_messages = messages
# 创建一个空的处理消息列表
processed_messages = []
# 标记消息是否包含系统角色
msg_has_sys = False
# 定义一个过滤工具的函数
def filter_tools(tool_choice, tools):
# 从工具选择中获取函数名称
function_name = tool_choice.get('function', {}).get('name', None)
# 如果没有函数名称,返回空列表
if not function_name:
return []
# 过滤工具,仅保留与函数名称匹配的工具
filtered_tools = [
tool for tool in tools
if tool.get('function', {}).get('name') == function_name
]
return filtered_tools
# 如果工具选择不为“none”
if tool_choice != "none":
# 如果工具选择是字典,进行过滤
if isinstance(tool_choice, dict):
tools = filter_tools(tool_choice, tools)
# 如果有过滤后的工具,添加系统角色消息
if tools:
processed_messages.append(
{
"role": "system",
"content": None,
"tools": tools
}
)
msg_has_sys = True
# 如果工具选择是字典且存在工具,添加助手角色消息
if isinstance(tool_choice, dict) and tools:
processed_messages.append(
{
"role": "assistant",
"metadata": tool_choice["function"]["name"],
"content": ""
}
)
# 遍历消息列表 _messages 中的每条消息
for m in _messages:
# 获取消息的角色、内容和函数调用信息
role, content, func_call = m.role, m.content, m.function_call
# 获取消息中的工具调用,如果没有则为 None
tool_calls = getattr(m, 'tool_calls', None)
# 如果消息的角色是 "function"
if role == "function":
# 将处理后的观察结果添加到列表中,包含角色和内容
processed_messages.append(
{
"role": "observation", # 角色设为 "observation"
"content": content # 内容为消息的内容
}
)
# 如果消息的角色是 "tool"
elif role == "tool":
# 将处理后的观察结果添加到列表中,包含角色、内容和函数调用标志
processed_messages.append(
{
"role": "observation", # 角色设为 "observation"
"content": content, # 内容为消息的内容
"function_call": True # 表示这是一个函数调用
}
)
# 如果消息的角色是 "assistant"
elif role == "assistant":
# 如果存在工具调用
if tool_calls:
# 遍历每个工具调用
for tool_call in tool_calls:
# 将工具调用的处理结果添加到列表中
processed_messages.append(
{
"role": "assistant", # 角色设为 "assistant"
"metadata": tool_call.function.name, # 函数名作为元数据
"content": tool_call.function.arguments # 函数参数作为内容
}
)
# 如果没有工具调用
else:
# 将内容按换行符分割为多个响应
for response in content.split("\n"):
# 如果响应包含换行符
if "\n" in response:
# 将响应分为元数据和子内容,最多分割一次
metadata, sub_content = response.split("\n", maxsplit=1)
else:
# 如果没有换行符,则元数据为空,子内容为响应
metadata, sub_content = "", response
# 将处理结果添加到列表中
processed_messages.append(
{
"role": role, # 角色设为当前消息的角色
"metadata": metadata, # 元数据为解析得到的元数据
"content": sub_content.strip() # 内容为去除前后空格的子内容
}
)
# 处理其他角色
else:
# 如果角色是 "system" 且 msg_has_sys 为 True
if role == "system" and msg_has_sys:
msg_has_sys = False # 标记系统消息已处理
continue # 跳过当前循环,继续下一条消息
# 添加处理后的消息到列表中
processed_messages.append({"role": role, "content": content})
# 如果没有工具或选择的工具为 "none"
if not tools or tool_choice == "none":
# 再次遍历消息列表 _messages
for m in _messages:
# 如果消息的角色是 'system'
if m.role == 'system':
# 将系统消息插入到处理结果的开头
processed_messages.insert(0, {"role": m.role, "content": m.content})
break # 找到后跳出循环
# 返回处理后的消息列表
return processed_messages
# 定义健康检查的 HTTP GET 路由
@app.get("/health")
# 定义异步处理健康检查请求的函数,返回 Response 对象
async def health() -> Response:
"""Health check.""" # 函数的文档字符串,描述该函数的用途
# 返回状态码为 200 的响应,表示服务正常
return Response(status_code=200)
# 定义获取模型列表的 HTTP GET 路由
@app.get("/v1/models", response_model=ModelList)
# 定义异步处理获取模型列表请求的函数
async def list_models():
# 创建一个模型卡对象,ID 为 "glm-4"
model_card = ModelCard(id="glm-4")
# 返回一个包含模型卡的 ModelList 对象
return ModelList(data=[model_card])
# 定义创建聊天完成的 HTTP POST 路由
@app.post("/v1/chat/completions", response_model=ChatCompletionResponse)
# 定义异步处理聊天完成请求的函数,接受 ChatCompletionRequest 类型的请求体
async def create_chat_completion(request: ChatCompletionRequest):
# 检查消息列表是否为空或最后一条消息角色为 "assistant"
if len(request.messages) < 1 or request.messages[-1].role == "assistant":
# 如果条件不满足,抛出 HTTP 400 异常,表示请求无效
raise HTTPException(status_code=400, detail="Invalid request")
# 创建生成参数字典,包含来自请求的各项参数
gen_params = dict(
messages=request.messages, # 消息列表
temperature=request.temperature, # 温度参数
top_p=request.top_p, # top-p 采样参数
max_tokens=request.max_tokens or 1024, # 最大 token 数,默认为 1024
echo=False, # 是否回显输入
stream=request.stream, # 是否使用流式输出
repetition_penalty=request.repetition_penalty, # 重复惩罚参数
tools=request.tools, # 可用工具
tool_choice=request.tool_choice, # 工具选择
)
# 记录生成参数的调试日志
logger.debug(f"==== request ====\n{gen_params}")
# 如果请求使用流式输出
if request.stream:
# 调用 predict_stream 函数生成预测流生成器
predict_stream_generator = predict_stream(request.model, gen_params)
# 获取预测流中的第一个输出
output = await anext(predict_stream_generator)
# 如果有输出,返回事件源响应
if output:
return EventSourceResponse(predict_stream_generator, media_type="text/event-stream")
# 记录第一个结果的调试日志
logger.debug(f"First result output:\n{output}")
# 初始化函数调用变量
function_call = None
# 如果有输出并且请求使用工具
if output and request.tools:
try:
# 处理响应以获取工具调用
function_call = process_response(output, request.tools, use_tool=True)
except:
# 如果解析工具调用失败,记录警告日志
logger.warning("Failed to parse tool call")
# 如果函数调用是字典类型
if isinstance(function_call, dict):
# 将字典转换为 ChoiceDeltaToolCallFunction 对象
function_call = ChoiceDeltaToolCallFunction(**function_call)
# 解析输出文本并生成
generate = parse_output_text(request.model, output, function_call=function_call)
# 返回事件源响应
return EventSourceResponse(generate, media_type="text/event-stream")
else:
# 如果没有有效的函数调用,返回事件源响应
return EventSourceResponse(predict_stream_generator, media_type="text/event-stream")
# 初始化响应变量
response = ""
# 异步生成 GLM4 的流式输出
async for response in generate_stream_glm4(gen_params):
pass # 持续生成直到结束
# 如果响应文本以换行符开始,去掉第一个换行符
if response["text"].startswith("\n"):
response["text"] = response["text"][1:]
# 去掉响应文本的前后空白
response["text"] = response["text"].strip()
# 创建使用信息对象
usage = UsageInfo()
# 初始化函数调用和结束原因变量
function_call, finish_reason = None, "stop"
# 如果请求使用工具
tool_calls = None
if request.tools:
try:
# 处理响应以获取工具调用
function_call = process_response(response["text"], request.tools, use_tool=True)
except Exception as e:
# 如果解析工具调用失败,记录警告日志
logger.warning(f"Failed to parse tool call: {e}")
# 检查 function_call 是否为字典类型
if isinstance(function_call, dict):
# 设置完成原因为工具调用
finish_reason = "tool_calls"
# 使用提供的字典参数创建 ChoiceDeltaToolCallFunction 实例
function_call_response = ChoiceDeltaToolCallFunction(**function_call)
# 创建 FunctionCall 实例,传入函数名和参数
function_call_instance = FunctionCall(
name=function_call_response.name,
arguments=function_call_response.arguments
)
# 创建工具调用列表,包括生成的 FunctionCall 实例
tool_calls = [
ChatCompletionMessageToolCall(
id=generate_id('call_', 24), # 生成唯一 ID
function=function_call_instance, # 传入函数调用实例
type="function")]
# 创建 ChatMessage 实例,内容为响应文本或空,视 tool_calls 是否存在而定
message = ChatMessage(
role="assistant",
content=None if tool_calls else response["text"], # 如果有工具调用,内容为 None
function_call=None, # 没有函数调用
tool_calls=tool_calls, # 包含工具调用列表
)
# 记录调试信息,输出消息内容
logger.debug(f"==== message ====\n{message}")
# 创建 ChatCompletionResponseChoice 实例,包含索引、消息和完成原因
choice_data = ChatCompletionResponseChoice(
index=0,
message=message, # 传入之前创建的消息
finish_reason=finish_reason, # 传入完成原因
)
# 使用模型验证响应中的使用信息
task_usage = UsageInfo.model_validate(response["usage"])
# 遍历使用信息,将其添加到使用统计中
for usage_key, usage_value in task_usage.model_dump().items():
setattr(usage, usage_key, getattr(usage, usage_key) + usage_value)
# 返回 ChatCompletionResponse 实例,包含模型、选择和使用信息
return ChatCompletionResponse(
model=request.model, # 请求的模型
choices=[choice_data], # 选择列表包含一个选择数据
object="chat.completion", # 对象类型
usage=usage # 包含使用信息
)
# 异步函数,预测流式输出
async def predict_stream(model_id, gen_params):
# 初始化输出为空字符串
output = ""
# 标记是否为函数调用
is_function_call = False
# 标记是否已经发送第一个数据块
has_send_first_chunk = False
# 获取当前时间戳
created_time = int(time.time())
# 初始化函数名称为 None
function_name = None
# 生成响应 ID,前缀为 'chatcmpl-',长度为 29
response_id = generate_id('chatcmpl-', 29)
# 生成系统指纹,前缀为 'fp_',长度为 9
system_fingerprint = generate_id('fp_', 9)
# 从生成参数中提取工具名称,若无工具则为空集合
tools = {tool['function']['name'] for tool in gen_params['tools']} if gen_params['tools'] else {}
# 初始化增量文本为空
delta_text = ""
# 如果是函数调用,返回相应格式的响应
if is_function_call:
yield ChatCompletionResponse(
model=model_id,
id=response_id,
system_fingerprint=system_fingerprint,
choices=[
ChatCompletionResponseStreamChoice(
index=0,
delta=DeltaMessage(
content=None,
role=None,
function_call=None,
),
finish_reason="tool_calls" # 结束原因为工具调用
)],
created=created_time,
object="chat.completion.chunk", # 指定对象类型为聊天完成块
usage=None
).model_dump_json(exclude_unset=True) # 转换为 JSON 格式并排除未设置的字段
# 如果增量文本不为空,则处理增量文本
elif delta_text != "":
# 创建增量消息
message = DeltaMessage(
content="",
role="assistant",
function_call=None,
)
# 创建选择数据
choice_data = ChatCompletionResponseStreamChoice(
index=0,
delta=message,
finish_reason=None # 结束原因为空
)
# 创建响应块
chunk = ChatCompletionResponse(
model=model_id,
id=response_id,
choices=[choice_data],
created=created_time,
system_fingerprint=system_fingerprint,
object="chat.completion.chunk"
)
yield chunk.model_dump_json(exclude_unset=True) # 返回响应块的 JSON 格式
# 设置结束原因为 'stop'
finish_reason = 'stop'
# 创建增量消息,包含增量文本
message = DeltaMessage(
content=delta_text,
role="assistant",
function_call=None,
)
# 清空增量文本
delta_text = ""
# 创建新的选择数据
choice_data = ChatCompletionResponseStreamChoice(
index=0,
delta=message,
finish_reason=finish_reason # 设置结束原因
)
# 创建响应块
chunk = ChatCompletionResponse(
model=model_id,
id=response_id,
choices=[choice_data],
created=created_time,
system_fingerprint=system_fingerprint,
object="chat.completion.chunk"
)
yield chunk.model_dump_json(exclude_unset=True) # 返回响应块的 JSON 格式
yield '[DONE]' # 返回完成标识
else:
yield '[DONE]' # 如果没有增量文本,直接返回完成标识
# 异步函数,解析输出文本
async def parse_output_text(model_id: str, value: str, function_call: ChoiceDeltaToolCallFunction = None):
# 创建增量消息,角色为助手,内容为提供的值
delta = DeltaMessage(role="assistant", content=value)
# 如果提供了函数调用,则将其赋值给增量消息
if function_call is not None:
delta.function_call = function_call
# 创建选择数据
choice_data = ChatCompletionResponseStreamChoice(
index=0,
delta=delta,
finish_reason=None # 结束原因为空
)
# 创建响应块
chunk = ChatCompletionResponse(
model=model_id,
choices=[choice_data],
object="chat.completion.chunk" # 指定对象类型为聊天完成块
)
# 生成器返回 JSON 格式的字符串,包含模型数据,排除未设置的字段
yield "{}".format(chunk.model_dump_json(exclude_unset=True))
# 生成器返回 '[DONE]' 字符串,表示处理完成
yield '[DONE]'
# 当脚本作为主程序运行时执行以下代码
if __name__ == "__main__":
# 从命令行参数获取模型路径
MODEL_PATH = sys.argv[1]
# 加载预训练的分词器,信任远程代码
tokenizer = AutoTokenizer.from_pretrained(MODEL_PATH, trust_remote_code=True)
# 设置异步引擎的参数,包括模型路径和分词器路径
engine_args = AsyncEngineArgs(
model=MODEL_PATH,
tokenizer=MODEL_PATH,
# 设置张量并行的显卡数量,默认为1
tensor_parallel_size=1,
# 指定数据类型为半精度
dtype="half",
# 信任远程代码
trust_remote_code=True,
# 设置 GPU 显存占用比例,根据显卡显存大小调整
gpu_memory_utilization=0.9,
# 强制启用 eager 执行
enforce_eager=True,
# 禁用 Ray 工作线程
worker_use_ray=False,
# 禁用日志请求
disable_log_requests=True,
# 设置模型的最大长度
max_model_len=MAX_MODEL_LENGTH,
)
# 从引擎参数创建异步 LLM 引擎
engine = AsyncLLMEngine.from_engine_args(engine_args)
# 启动 Uvicorn 服务器,监听所有 IP 地址,端口为 8000,使用 1 个工作进程
uvicorn.run(app, host='0.0.0.0', port=8000, workers=1)
.\chatglm4-finetune\basic_demo\openai_api_request.py
"""
该脚本创建了一个 OpenAI 请求示例,用于与 glm-4-9b 模型进行交互,只需使用 OpenAI API 进行交互。
"""
# 从 openai 库导入 OpenAI 类
from openai import OpenAI
# 导入 base64 编码库
import base64
# 定义 API 的基本 URL
base_url = "http://127.0.0.1:8000/v1/"
# 创建 OpenAI 客户端,使用空的 API 密钥和自定义基本 URL
client = OpenAI(api_key="EMPTY", base_url=base_url)
# 定义与聊天模型交互的函数,接受是否使用流式响应的参数
def function_chat(use_stream=False):
# 定义消息列表,包含用户发送的消息
messages = [
{
"role": "user", "content": "What's the Celsius temperature in San Francisco?"
},
# 给出观察结果的注释示例
# {
# "role": "assistant",
# "content": None,
# "function_call": None,
# "tool_calls": [
# {
# "id": "call_1717912616815",
# "function": {
# "name": "get_current_weather",
# "arguments": "{\"location\": \"San Francisco, CA\", \"format\": \"celsius\"}"
# },
# "type": "function"
# }
# ]
# },
# {
# "tool_call_id": "call_1717912616815",
# "role": "tool",
# "name": "get_current_weather",
# "content": "23°C",
# }
]
# 定义工具列表,包含获取当前天气的工具
tools = [
{
"type": "function",
"function": {
"name": "get_current_weather", # 工具名称
"description": "Get the current weather", # 工具描述
"parameters": { # 工具参数定义
"type": "object",
"properties": { # 参数属性
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA", # 位置参数描述
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"], # 温度单位可选项
"description": "The temperature unit to use. Infer this from the users location.", # 温度单位描述
},
},
"required": ["location", "format"], # 必需的参数
},
}
},
]
# 所有工具示例:CogView
# messages = [{"role": "user", "content": "帮我画一张天空的画画吧"}]
# tools = [{"type": "cogview"}]
# 所有工具示例:搜索工具
# messages = [{"role": "user", "content": "今天黄金的价格"}]
# tools = [{"type": "simple_browser"}]
# 调用 OpenAI API 创建聊天响应
response = client.chat.completions.create(
model="glm-4", # 指定使用的模型
messages=messages, # 传入消息列表
tools=tools, # 传入工具列表
stream=use_stream, # 是否使用流式响应
max_tokens=256, # 设置生成响应的最大 token 数量
temperature=0.9, # 设置温度以控制生成文本的多样性
presence_penalty=1.2, # 设置存在惩罚以增加话题多样性
top_p=0.1, # 设置累积概率阈值
tool_choice="auto" # 工具选择方式
)
# 检查是否有响应
if response:
# 如果使用流式响应,则逐块打印响应
if use_stream:
for chunk in response:
print(chunk)
# 否则,直接打印完整响应
else:
print(response)
# 如果没有响应,则打印错误信息
else:
print("Error:", response.status_code)
# 定义一个简单聊天函数,接受是否使用流式响应的参数
def simple_chat(use_stream=False):
# 创建一个包含系统消息和用户消息的列表
messages = [
{
# 指定角色为系统
"role": "system",
# 系统消息内容,要求输出时带有“喵喵喵”
"content": "请在你输出的时候都带上“喵喵喵”三个字,放在开头。",
},
{
# 指定角色为用户
"role": "user",
# 用户消息内容,询问身份
"content": "你是谁"
}
]
# 调用客户端的聊天完成方法,生成响应
response = client.chat.completions.create(
# 指定使用的模型
model="glm-4",
# 传入消息列表
messages=messages,
# 是否使用流式响应
stream=use_stream,
# 设置最大令牌数
max_tokens=256,
# 设置温度参数,影响生成的随机性
temperature=0.4,
# 设置存在惩罚,控制重复性
presence_penalty=1.2,
# 设置采样的累积概率
top_p=0.8,
)
# 检查响应是否存在
if response:
# 如果使用流式响应
if use_stream:
# 遍历响应的每个块并打印
for chunk in response:
print(chunk)
# 如果不是流式响应
else:
# 打印完整响应
print(response)
# 如果响应不存在,打印错误信息
else:
print("Error:", response.status_code)
# 创建聊天补全函数,接收消息列表和是否使用流的标志
def create_chat_completion(messages, use_stream=False):
# 调用客户端的聊天补全方法,创建模型响应
response = client.chat.completions.create(
model="glm-4v", # 指定使用的模型
messages=messages, # 传入消息列表
stream=use_stream, # 指定是否使用流式响应
max_tokens=256, # 设置生成的最大令牌数
temperature=0.4, # 设置生成的温度,影响随机性
presence_penalty=1.2, # 增加新主题的惩罚系数
top_p=0.8, # 设置 nucleus 采样的阈值
)
# 如果有响应,进行处理
if response:
# 如果使用流式响应,逐块打印响应
if use_stream:
for chunk in response:
print(chunk) # 打印每个数据块
# 否则打印完整的响应
else:
print(response)
# 如果没有响应,打印错误信息
else:
print("Error:", response.status_code) # 输出错误状态码
# 编码图像文件为 base64 字符串的函数
def encode_image(image_path):
"""
将图像文件编码为 base64 字符串。
参数:
image_path (str): 图像文件的路径。
此函数打开指定的图像文件,读取其内容,并将其编码为 base64 字符串。
base64 编码用于通过 HTTP 作为文本发送图像。
"""
# 以二进制读取模式打开图像文件
with open(image_path, "rb") as image_file:
# 读取文件内容并返回 base64 编码的字符串
return base64.b64encode(image_file.read()).decode("utf-8")
# 简单图像聊天的函数,涉及到图像
def glm4v_simple_image_chat(use_stream=False, img_path=None):
"""
促进简单的图像聊天互动。
参数:
use_stream (bool): 指定是否使用流式聊天响应。
img_path (str): 要包含在聊天中的图像文件路径。
此函数编码指定的图像,并构建包含该图像的预定义对话。
然后调用 `create_chat_completion` 生成模型响应。
对话包括询问图像内容及后续问题。
"""
# 将图像路径编码为 base64 URL
img_url = f"data:image/jpeg;base64,{encode_image(img_path)}"
# 构建消息列表,包括用户和助手的对话内容
messages = [
{
"role": "user", # 用户角色
"content": [
{
"type": "text", # 文本类型消息
"text": "What’s in this image?", # 用户提问
},
{
"type": "image_url", # 图像 URL 类型消息
"image_url": {
"url": img_url # 图像的 base64 URL
},
},
],
},
{
"role": "assistant", # 助手角色
"content": "The image displays a wooden boardwalk extending through a vibrant green grassy wetland. The sky is partly cloudy with soft, wispy clouds, indicating nice weather. Vegetation is seen on either side of the boardwalk, and trees are present in the background, suggesting that this area might be a natural reserve or park designed for ecological preservation and outdoor recreation. The boardwalk allows visitors to explore the area without disturbing the natural habitat.", # 助手的回答
},
{
"role": "user", # 用户角色
"content": "Do you think this is a spring or winter photo?" # 用户的后续问题
},
]
# 调用创建聊天补全的函数,生成响应
create_chat_completion(messages=messages, use_stream=use_stream)
# 主程序入口
if __name__ == "__main__":
simple_chat(use_stream=False) # 调用简单聊天函数
# function_chat(use_stream=False) # 注释掉的函数调用
# glm4v_simple_image_chat(use_stream=False, img_path="demo.jpg") # 注释掉的图像聊天调用
.\chatglm4-finetune\basic_demo\openai_api_server.py
# 导入操作系统相关的模块
import os
# 导入用于运行子进程的模块
import subprocess
# 设置模型路径,优先从环境变量获取,如果没有则使用默认值 'THUDM/glm-4-9b-chat'
MODEL_PATH = os.environ.get('MODEL_PATH', 'THUDM/glm-4-9b-chat')
# 注释掉的代码行,用于另一个模型的路径
# MODEL_PATH = os.environ.get('MODEL_PATH', 'THUDM/glm-4v-9b')
# 检查模型路径是否包含 '4v'(不区分大小写)
if '4v' in MODEL_PATH.lower():
# 如果包含 '4v',则运行 glm4v_server.py 脚本,传入模型路径
subprocess.run(["python", "glm4v_server.py", MODEL_PATH])
else:
# 如果不包含 '4v',则运行 glm_server.py 脚本,传入模型路径
subprocess.run(["python", "glm_server.py", MODEL_PATH])
Basic Demo
Read this in English.
本 demo 中,你将体验到如何使用 GLM-4-9B 开源模型进行基本的任务。
请严格按照文档的步骤进行操作,以避免不必要的错误。
设备和依赖检查
相关推理测试数据
本文档的数据均在以下硬件环境测试,实际运行环境需求和运行占用的显存略有不同,请以实际运行环境为准。
测试硬件信息:
- OS: Ubuntu 22.04
- Memory: 512GB
- Python: 3.10.12 (推荐) / 3.12.3 均已测试
- CUDA Version: 12.3
- GPU Driver: 535.104.05
- GPU: NVIDIA A100-SXM4-80GB * 8
相关推理的压力测试数据如下:
所有测试均在单张GPU上进行测试,所有显存消耗都按照峰值左右进行测算
GLM-4-9B-Chat
精度 | 显存占用 | Prefilling | Decode Speed | Remarks |
---|---|---|---|---|
BF16 | 19 GB | 0.2s | 27.8 tokens/s | 输入长度为 1000 |
BF16 | 21 GB | 0.8s | 31.8 tokens/s | 输入长度为 8000 |
BF16 | 28 GB | 4.3s | 14.4 tokens/s | 输入长度为 32000 |
BF16 | 58 GB | 38.1s | 3.4 tokens/s | 输入长度为 128000 |
精度 | 显存占用 | Prefilling | Decode Speed | Remarks |
---|---|---|---|---|
INT4 | 8 GB | 0.2s | 23.3 tokens/s | 输入长度为 1000 |
INT4 | 10 GB | 0.8s | 23.4 tokens/s | 输入长度为 8000 |
INT4 | 17 GB | 4.3s | 14.6 tokens/s | 输入长度为 32000 |
GLM-4-9B-Chat-1M
精度 | 显存占用 | Prefilling | Decode Speed | Remarks |
---|---|---|---|---|
BF16 | 75 GB | 98.4s | 2.3 tokens/s | 输入长度为 200000 |
如果您的输入超过200K,我们建议您使用vLLM后端进行多卡推理,以获得更好的性能。
GLM-4V-9B
精度 | 显存占用 | Prefilling | Decode Speed | Remarks |
---|---|---|---|---|
BF16 | 28 GB | 0.1s | 33.4 tokens/s | 输入长度为 1000 |
BF16 | 33 GB | 0.7s | 39.2 tokens/s | 输入长度为 8000 |
精度 | 显存占用 | Prefilling | Decode Speed | Remarks |
---|---|---|---|---|
INT4 | 10 GB | 0.1s | 28.7 tokens/s | 输入长度为 1000 |
INT4 | 15 GB | 0.8s | 24.2 tokens/s | 输入长度为 8000 |
最低硬件要求
如果您希望运行官方提供的最基础代码 (transformers 后端) 您需要:
- Python >= 3.10
- 内存不少于 32 GB
如果您希望运行官方提供的本文件夹的所有代码,您还需要:
- Linux 操作系统 (Debian 系列最佳)
- 大于 8GB 显存的,支持 CUDA 或者 ROCM 并且支持
BF16
推理的 GPU 设备。(FP16
精度无法训练,推理有小概率出现问题)
安装依赖
pip install -r requirements.txt
基础功能调用
除非特殊说明,本文件夹所有 demo 并不支持 Function Call 和 All Tools 等进阶用法
使用 transformers 后端代码
- 使用命令行与 GLM-4-9B 模型进行对话。
python trans_cli_demo.py # GLM-4-9B-Chat
python trans_cli_vision_demo.py # GLM-4V-9B
- 使用 Gradio 网页端与 GLM-4-9B 模型进行对话。
python trans_web_demo.py # GLM-4-9B-Chat
python trans_web_vision_demo.py # GLM-4V-9B
- 使用 Batch 推理。
python trans_batch_demo.py
使用 vLLM 后端代码
- 使用命令行与 GLM-4-9B-Chat 模型进行对话。
python vllm_cli_demo.py
- 在 GLM-4-9B-Chat 模型上使用带有 Lora adapter 的 vLLM
# vllm_cli_demo.py
# 添加 LORA_PATH = ''
- 自行构建服务端,并使用
OpenAI API
的请求格式与 GLM-4-9B-Chat GLM-4v-9B 或者模型进行对话。本 demo 支持 Function Call 和 All Tools功能。 - 修改
open_api_server.py
中模型路径MODEL_PATH
,可选择构建 GLM-4-9B-Chat 或者 GLM-4v-9B 服务端
启动服务端:
python openai_api_server.py
客户端请求:
python openai_api_request.py
压力测试
用户可以在自己的设备上使用本代码测试模型在 transformers后端的生成速度:
python trans_stress_test.py
使用昇腾卡运行代码
用户可以在昇腾硬件环境下运行以上代码,只需将transformers修改为openmind,将device中的cuda设备修改为npu:
#from transformers import AutoModelForCausalLM, AutoTokenizer
from openmind import AutoModelForCausalLM, AutoTokenizer
#device = 'cuda'
device = 'npu'
Basic Demo
In this demo, you will experience how to use the GLM-4-9B open source model to perform basic tasks.
Please follow the steps in the document strictly to avoid unnecessary errors.
Device and dependency check
Related inference test data
The data in this document are tested in the following hardware environment. The actual operating environment
requirements and the GPU memory occupied by the operation are slightly different. Please refer to the actual operating
environment.
Test hardware information:
- OS: Ubuntu 22.04
- Memory: 512GB
- Python: 3.10.12 (recommend) / 3.12.3 have been tested
- CUDA Version: 12.3
- GPU Driver: 535.104.05
- GPU: NVIDIA A100-SXM4-80GB * 8
The stress test data of relevant inference are as follows:
All tests are performed on a single GPU, and all GPU memory consumption is calculated based on the peak value
GLM-4-9B-Chat
Dtype | GPU Memory | Prefilling | Decode Speed | Remarks |
---|---|---|---|---|
BF16 | 19 GB | 0.2s | 27.8 tokens/s | Input length is 1000 |
BF16 | 21 GB | 0.8s | 31.8 tokens/s | Input length is 8000 |
BF16 | 28 GB | 4.3s | 14.4 tokens/s | Input length is 32000 |
BF16 | 58 GB | 38.1s | 3.4 tokens/s | Input length is 128000 |
Dtype | GPU Memory | Prefilling | Decode Speed | Remarks |
---|---|---|---|---|
INT4 | 8 GB | 0.2s | 23.3 tokens/s | Input length is 1000 |
INT4 | 10 GB | 0.8s | 23.4 tokens/s | Input length is 8000 |
INT4 | 17 GB | 4.3s | 14.6 tokens/s | Input length is 32000 |
GLM-4-9B-Chat-1M
Dtype | GPU Memory | Prefilling | Decode Speed | Remarks |
---|---|---|---|---|
BF16 | 74497MiB | 98.4s | 2.3653 tokens/s | Input length is 200000 |
If your input exceeds 200K, we recommend that you use the vLLM backend with multi gpus for inference to get better
performance.
GLM-4V-9B
Dtype | GPU Memory | Prefilling | Decode Speed | Remarks |
---|---|---|---|---|
BF16 | 28 GB | 0.1s | 33.4 tokens/s | Input length is 1000 |
BF16 | 33 GB | 0.7s | 39.2 tokens/s | Input length is 8000 |
Dtype | GPU Memory | Prefilling | Decode Speed | Remarks |
---|---|---|---|---|
INT4 | 10 GB | 0.1s | 28.7 tokens/s | Input length is 1000 |
INT4 | 15 GB | 0.8s | 24.2 tokens/s | Input length is 8000 |
Minimum hardware requirements
If you want to run the most basic code provided by the official (transformers backend) you need:
- Python >= 3.10
- Memory of at least 32 GB
If you want to run all the codes in this folder provided by the official, you also need:
- Linux operating system (Debian series is best)
- GPU device with more than 8GB GPU memory, supporting CUDA or ROCM and supporting
BF16
reasoning (FP16
precision
cannot be finetuned, and there is a small probability of problems in infering)
Install dependencies
pip install -r requirements.txt
Basic function calls
**Unless otherwise specified, all demos in this folder do not support advanced usage such as Function Call and All Tools
**
Use transformers backend code
- Use the command line to communicate with the GLM-4-9B model.
python trans_cli_demo.py # GLM-4-9B-Chat
python trans_cli_vision_demo.py # GLM-4V-9B
- Use the Gradio web client to communicate with the GLM-4-9B model.
python trans_web_demo.py # GLM-4-9B-Chat
python trans_web_vision_demo.py # GLM-4V-9B
- Use Batch inference.
python trans_batch_demo.py
Use vLLM backend code
- Use the command line to communicate with the GLM-4-9B-Chat model.
python vllm_cli_demo.py
- use LoRA adapters with vLLM on GLM-4-9B-Chat model.
# vllm_cli_demo.py
# add LORA_PATH = ''
- Build the server by yourself and use the request format of
OpenAI API
to communicate with the glm-4-9b model. This
demo supports Function Call and All Tools functions. - Modify the
MODEL_PATH
inopen_api_server.py
, and you can choose to build the GLM-4-9B-Chat or GLM-4v-9B server side.
Start the server:
python openai_api_server.py
Client request:
python openai_api_request.py
Stress test
Users can use this code to test the generation speed of the model on the transformers backend on their own devices:
python trans_stress_test.py
Use Ascend card to run code
Users can run the above code in the Ascend hardware environment. They only need to change the transformers to openmind and the cuda device in device to npu.
#from transformers import AutoModelForCausalLM, AutoTokenizer
from openmind import AutoModelForCausalLM, AutoTokenizer
#device = 'cuda'
device = 'npu'
.\chatglm4-finetune\basic_demo\trans_batch_demo.py
"""
# 示例文档,说明如何使用批量请求 glm-4-9b,
# 这里需要自己构建对话格式,然后调用批量函数以进行批量请求。
# 请注意,此演示中内存消耗显著增加。
"""
# 导入所需的类型和库
from typing import Optional, Union
from transformers import AutoModel, AutoTokenizer, LogitsProcessorList
# 定义模型的路径
MODEL_PATH = 'THUDM/glm-4-9b-chat'
# 从预训练模型加载分词器
tokenizer = AutoTokenizer.from_pretrained(
MODEL_PATH,
trust_remote_code=True, # 信任远程代码
encode_special_tokens=True # 编码特殊标记
)
# 从预训练模型加载模型,并设置为评估模式
model = AutoModel.from_pretrained(MODEL_PATH, trust_remote_code=True, device_map="auto").eval()
# 定义处理模型输出的函数
def process_model_outputs(inputs, outputs, tokenizer):
responses = [] # 存储响应列表
# 遍历输入 ID 和输出 ID
for input_ids, output_ids in zip(inputs.input_ids, outputs):
# 解码输出 ID 为响应文本,去掉特殊标记
response = tokenizer.decode(output_ids[len(input_ids):], skip_special_tokens=True).strip()
responses.append(response) # 添加响应到列表
return responses # 返回所有响应
# 定义批量处理函数
def batch(
model,
tokenizer,
messages: Union[str, list[str]], # 输入消息可以是字符串或字符串列表
max_input_tokens: int = 8192, # 最大输入标记数
max_new_tokens: int = 8192, # 最大生成的新标记数
num_beams: int = 1, # 光束搜索的数量
do_sample: bool = True, # 是否进行采样
top_p: float = 0.8, # Top-p 采样的阈值
temperature: float = 0.8, # 温度控制生成的多样性
logits_processor: Optional[LogitsProcessorList] = LogitsProcessorList(), # 日志处理器
):
# 将字符串消息转换为列表格式
messages = [messages] if isinstance(messages, str) else messages
# 使用分词器编码消息,并返回张量,填充到最大长度
batched_inputs = tokenizer(messages, return_tensors="pt", padding="max_length", truncation=True,
max_length=max_input_tokens).to(model.device)
# 定义生成的参数
gen_kwargs = {
"max_new_tokens": max_new_tokens,
"num_beams": num_beams,
"do_sample": do_sample,
"top_p": top_p,
"temperature": temperature,
"logits_processor": logits_processor,
"eos_token_id": model.config.eos_token_id # 获取模型的结束标记 ID
}
# 生成模型输出
batched_outputs = model.generate(**batched_inputs, **gen_kwargs)
# 处理模型输出以获取响应
batched_response = process_model_outputs(batched_inputs, batched_outputs, tokenizer)
return batched_response # 返回批量响应
# 主程序入口
if __name__ == "__main__":
# 定义批量消息的示例
batch_message = [
[
{"role": "user", "content": "我的爸爸和妈妈结婚为什么不能带我去"}, # 用户提问
{"role": "assistant", "content": "因为他们结婚时你还没有出生"}, # 助手回答
{"role": "user", "content": "我刚才的提问是"} # 用户提问
],
[
{"role": "user", "content": "你好,你是谁"} # 用户提问
]
]
batch_inputs = [] # 存储批量输入
max_input_tokens = 1024 # 初始化最大输入标记数
# 遍历批量消息,构建输入
for i, messages in enumerate(batch_message):
# 使用分词器应用聊天模板,添加生成提示
new_batch_input = tokenizer.apply_chat_template(messages, add_generation_prompt=True, tokenize=False)
# 更新最大输入标记数
max_input_tokens = max(max_input_tokens, len(new_batch_input))
batch_inputs.append(new_batch_input) # 添加到批量输入列表
# 定义生成的参数
gen_kwargs = {
"max_input_tokens": max_input_tokens,
"max_new_tokens": 8192,
"do_sample": True,
"top_p": 0.8,
"temperature": 0.8,
"num_beams": 1,
}
# 调用批量处理函数,生成响应
batch_responses = batch(model, tokenizer, batch_inputs, **gen_kwargs)
# 遍历批量响应列表中的每个响应
for response in batch_responses:
# 打印十个等号,作为分隔符
print("=" * 10)
# 打印当前响应的内容
print(response)
.\chatglm4-finetune\basic_demo\trans_cli_demo.py
"""
# 该脚本创建一个 CLI 演示,使用 transformers 后端的 glm-4-9b 模型,
# 允许用户通过命令行接口与模型进行交互。
# 用法:
# - 运行脚本以启动 CLI 演示。
# - 通过输入问题与模型进行互动,并接收响应。
# 注意:该脚本包含处理 markdown 到纯文本转换的修改,
# 确保 CLI 接口正确显示格式化文本。
# 如果使用闪存注意力,您应该安装 flash-attn 并在模型加载中添加 attn_implementation="flash_attention_2"。
"""
# 导入操作系统模块
import os
# 导入 PyTorch 库
import torch
# 从 threading 模块导入 Thread 类
from threading import Thread
# 从 transformers 模块导入相关类
from transformers import AutoTokenizer, StoppingCriteria, StoppingCriteriaList, TextIteratorStreamer, AutoModel
# 从环境变量获取模型路径,默认值为 'THUDM/glm-4-9b-chat'
MODEL_PATH = os.environ.get('MODEL_PATH', 'THUDM/glm-4-9b-chat')
## 如果使用 peft 模型。
# def load_model_and_tokenizer(model_dir, trust_remote_code: bool = True):
# # 检查适配器配置文件是否存在
# if (model_dir / 'adapter_config.json').exists():
# # 从预训练模型加载模型,设置为自动设备映射
# model = AutoModel.from_pretrained(
# model_dir, trust_remote_code=trust_remote_code, device_map='auto'
# )
# # 获取基本模型名称
# tokenizer_dir = model.peft_config['default'].base_model_name_or_path
# else:
# # 从预训练模型加载模型,设置为自动设备映射
# model = AutoModel.from_pretrained(
# model_dir, trust_remote_code=trust_remote_code, device_map='auto'
# )
# # 将模型目录作为 tokenizer 目录
# tokenizer_dir = model_dir
# # 从预训练 tokenizer 目录加载 tokenizer
# tokenizer = AutoTokenizer.from_pretrained(
# tokenizer_dir, trust_remote_code=trust_remote_code, use_fast=False
# )
# # 返回模型和 tokenizer
# return model, tokenizer
# 从预训练模型路径加载 tokenizer,允许远程代码
tokenizer = AutoTokenizer.from_pretrained(MODEL_PATH, trust_remote_code=True)
# 从预训练模型路径加载模型,允许远程代码,设置为自动设备映射并评估模式
model = AutoModel.from_pretrained(
MODEL_PATH,
trust_remote_code=True,
# attn_implementation="flash_attention_2", # 使用闪存注意力
# torch_dtype=torch.bfloat16, # 使用闪存注意力必须使用 bfloat16 或 float16
device_map="auto").eval()
# 定义 StopOnTokens 类,继承自 StoppingCriteria
class StopOnTokens(StoppingCriteria):
# 定义调用方法,检查是否满足停止条件
def __call__(self, input_ids: torch.LongTensor, scores: torch.FloatTensor, **kwargs) -> bool:
# 获取模型的结束标记 ID
stop_ids = model.config.eos_token_id
# 遍历停止 ID 列表
for stop_id in stop_ids:
# 如果最后一个输入 ID 等于停止 ID,则返回 True
if input_ids[0][-1] == stop_id:
return True
# 否则返回 False
return False
# 如果该脚本是主程序,则执行以下代码
if __name__ == "__main__":
# 初始化历史记录列表
history = []
# 设置最大输入长度
max_length = 8192
# 设置 top_p 参数
top_p = 0.8
# 设置温度参数
temperature = 0.6
# 实例化停止条件类
stop = StopOnTokens()
# 打印欢迎信息
print("Welcome to the GLM-4-9B CLI chat. Type your messages below.")
# 无限循环,直到用户输入退出指令
while True:
# 获取用户输入
user_input = input("\nYou: ")
# 如果用户输入为“退出”或“结束”,则跳出循环
if user_input.lower() in ["exit", "quit"]:
break
# 将用户输入添加到历史记录中,模型回复先留空
history.append([user_input, ""])
# 初始化消息列表
messages = []
# 遍历历史记录,获取用户和模型消息
for idx, (user_msg, model_msg) in enumerate(history):
# 如果是最后一条消息且模型回复为空,添加用户消息
if idx == len(history) - 1 and not model_msg:
messages.append({"role": "user", "content": user_msg})
break
# 如果用户消息存在,添加到消息列表
if user_msg:
messages.append({"role": "user", "content": user_msg})
# 如果模型消息存在,添加到消息列表
if model_msg:
messages.append({"role": "assistant", "content": model_msg})
# 将消息列表转换为模型输入格式
model_inputs = tokenizer.apply_chat_template(
messages,
add_generation_prompt=True, # 添加生成提示
tokenize=True, # 启用分词
return_tensors="pt" # 返回 PyTorch 张量格式
).to(model.device) # 移动到模型设备
# 创建文本流处理器
streamer = TextIteratorStreamer(
tokenizer=tokenizer, # 使用的分词器
timeout=60, # 设置超时时间
skip_prompt=True, # 跳过提示
skip_special_tokens=True # 跳过特殊标记
)
# 设置生成参数
generate_kwargs = {
"input_ids": model_inputs, # 模型输入 ID
"streamer": streamer, # 文本流处理器
"max_new_tokens": max_length, # 生成的最大新 token 数量
"do_sample": True, # 启用采样
"top_p": top_p, # 采样阈值
"temperature": temperature, # 温度参数
"stopping_criteria": StoppingCriteriaList([stop]), # 停止标准
"repetition_penalty": 1.2, # 重复惩罚
"eos_token_id": model.config.eos_token_id, # 结束 token ID
}
# 创建一个新线程来生成模型输出
t = Thread(target=model.generate, kwargs=generate_kwargs)
t.start() # 启动线程
# 打印模型输出提示
print("GLM-4:", end="", flush=True)
# 从流处理器中获取新生成的 token
for new_token in streamer:
if new_token:
# 打印生成的 token
print(new_token, end="", flush=True)
# 将新 token 添加到最后一条历史记录的模型回复中
history[-1][1] += new_token
# 清理最后一条模型回复的前后空格
history[-1][1] = history[-1][1].strip()
.\chatglm4-finetune\basic_demo\trans_cli_vision_demo.py
"""
# 此脚本创建一个命令行界面(CLI)演示,使用 transformers 后端,针对 glm-4v-9b 模型,
# 允许用户通过命令行界面与模型进行交互。
# 用法:
# - 运行脚本以启动 CLI 演示。
# - 通过输入问题与模型互动并接收响应。
# 注意:脚本包括一个处理 markdown 转换为纯文本的修改,
# 确保 CLI 界面正确显示格式化文本。
"""
# 导入操作系统模块
import os
# 导入 PyTorch 库
import torch
# 从 threading 模块导入 Thread 类
from threading import Thread
# 从 transformers 库导入相关类和函数
from transformers import (
AutoTokenizer, # 自动加载预训练的分词器
StoppingCriteria, # 停止标准类
StoppingCriteriaList, # 停止标准列表类
TextIteratorStreamer, # 文本迭代流处理器
AutoModel, # 自动加载预训练模型
BitsAndBytesConfig # 位和字节配置类
)
# 从 PIL 导入图像处理库
from PIL import Image
# 从环境变量中获取模型路径,默认值为 'THUDM/glm-4v-9b'
MODEL_PATH = os.environ.get('MODEL_PATH', 'THUDM/glm-4v-9b')
# 从预训练模型加载分词器
tokenizer = AutoTokenizer.from_pretrained(
MODEL_PATH, # 指定模型路径
trust_remote_code=True, # 信任远程代码
encode_special_tokens=True # 编码特殊标记
)
# 从预训练模型加载模型,并配置相关参数
model = AutoModel.from_pretrained(
MODEL_PATH, # 指定模型路径
trust_remote_code=True, # 信任远程代码
# attn_implementation="flash_attention_2", # 使用闪电注意力(被注释掉)
torch_dtype=torch.bfloat16, # 设置张量数据类型为 bfloat16
device_map="auto", # 自动选择设备映射
).eval() # 将模型设置为评估模式
## 针对 INT4 推理的配置
# model = AutoModel.from_pretrained(
# MODEL_PATH, # 指定模型路径
# trust_remote_code=True, # 信任远程代码
# quantization_config=BitsAndBytesConfig(load_in_4bit=True), # 配置为 4 位量化
# torch_dtype=torch.bfloat16, # 设置张量数据类型为 bfloat16
# low_cpu_mem_usage=True # 低 CPU 内存使用模式
# ).eval() # 将模型设置为评估模式
# 定义停止标准类
class StopOnTokens(StoppingCriteria):
# 重写 __call__ 方法以定义停止条件
def __call__(self, input_ids: torch.LongTensor, scores: torch.FloatTensor, **kwargs) -> bool:
stop_ids = model.config.eos_token_id # 获取模型的结束标记 ID
for stop_id in stop_ids: # 遍历结束标记 ID
if input_ids[0][-1] == stop_id: # 检查输入的最后一个 ID 是否为结束标记
return True # 如果是,返回 True 表示停止
return False # 否则返回 False 表示继续
# 主程序入口
if __name__ == "__main__":
history = [] # 初始化对话历史列表
max_length = 1024 # 设置最大输入长度
top_p = 0.8 # 设置 top-p 采样值
temperature = 0.6 # 设置温度控制生成文本的随机性
stop = StopOnTokens() # 创建停止标准实例
uploaded = False # 初始化上传状态
image = None # 初始化图像变量
print("Welcome to the GLM-4-9B CLI chat. Type your messages below.") # 打印欢迎信息
image_path = input("Image Path:") # 提示用户输入图像路径
try:
image = Image.open(image_path).convert("RGB") # 尝试打开图像并转换为 RGB 模式
except: # 捕获任何异常
print("Invalid image path. Continuing with text conversation.") # 打印错误信息并继续文本对话
# 无限循环,直到用户选择退出
while True:
# 获取用户输入
user_input = input("\nYou: ")
# 如果用户输入是 "exit" 或 "quit",则退出循环
if user_input.lower() in ["exit", "quit"]:
break
# 将用户输入添加到历史记录中,助手消息暂时为空
history.append([user_input, ""])
# 初始化消息列表
messages = []
# 遍历历史记录,获取用户和助手的消息
for idx, (user_msg, model_msg) in enumerate(history):
# 如果是最后一条用户消息且助手消息为空
if idx == len(history) - 1 and not model_msg:
# 添加用户消息到消息列表
messages.append({"role": "user", "content": user_msg})
# 如果有图片且尚未上传,则将图片添加到消息中
if image and not uploaded:
messages[-1].update({"image": image})
# 标记图片已上传
uploaded = True
# 结束当前循环
break
# 如果用户消息存在,添加到消息列表
if user_msg:
messages.append({"role": "user", "content": user_msg})
# 如果助手消息存在,添加到消息列表
if model_msg:
messages.append({"role": "assistant", "content": model_msg})
# 使用 tokenizer 处理消息,准备模型输入
model_inputs = tokenizer.apply_chat_template(
messages,
# 添加生成提示
add_generation_prompt=True,
# 启用分词
tokenize=True,
# 返回 PyTorch 张量
return_tensors="pt",
# 返回字典格式
return_dict=True
).to(next(model.parameters()).device) # 将输入移动到模型所在的设备
# 创建一个文本流迭代器,用于生成文本
streamer = TextIteratorStreamer(
tokenizer=tokenizer,
# 设置超时
timeout=60,
# 跳过提示
skip_prompt=True,
# 跳过特殊字符
skip_special_tokens=True
)
# 准备生成的参数
generate_kwargs = {
# 合并模型输入参数
**model_inputs,
# 指定使用流式生成
"streamer": streamer,
# 设置最大生成的 token 数量
"max_new_tokens": max_length,
# 启用采样
"do_sample": True,
# 设置 nucleus 采样的概率阈值
"top_p": top_p,
# 设置生成温度
"temperature": temperature,
# 设置停止条件
"stopping_criteria": StoppingCriteriaList([stop]),
# 设置重复惩罚
"repetition_penalty": 1.2,
# 指定结束 token 的 ID
"eos_token_id": [151329, 151336, 151338],
}
# 创建并启动线程以生成文本
t = Thread(target=model.generate, kwargs=generate_kwargs)
t.start()
# 打印生成模型的提示
print("GLM-4V:", end="", flush=True)
# 从流中读取新生成的 token
for new_token in streamer:
# 如果新 token 存在
if new_token:
# 打印新 token
print(new_token, end="", flush=True)
# 将新 token 添加到历史记录的助手消息中
history[-1][1] += new_token
# 清理助手消息,去掉首尾空白
history[-1][1] = history[-1][1].strip()
.\chatglm4-finetune\basic_demo\trans_stress_test.py
# 导入所需的库
import argparse # 用于解析命令行参数
import time # 用于时间操作
from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer, BitsAndBytesConfig # 导入模型和tokenizer
import torch # 导入PyTorch库
from threading import Thread # 导入线程支持库
MODEL_PATH = 'THUDM/glm-4-9b-chat' # 定义模型路径
def stress_test(token_len, n, num_gpu): # 定义压力测试函数,接收token长度、数量和GPU数量
# 确定使用的设备,优先选择GPU
device = torch.device(f"cuda:{num_gpu - 1}" if torch.cuda.is_available() and num_gpu > 0 else "cpu")
# 加载tokenizer,并设置相关参数
tokenizer = AutoTokenizer.from_pretrained(
MODEL_PATH,
trust_remote_code=True,
padding_side="left"
)
# 加载预训练的因果语言模型,设置为评估模式并转移到指定设备
model = AutoModelForCausalLM.from_pretrained(
MODEL_PATH,
trust_remote_code=True,
torch_dtype=torch.bfloat16
).to(device).eval()
# 使用INT4权重推理的代码块(注释掉的部分)
# model = AutoModelForCausalLM.from_pretrained(
# MODEL_PATH,
# trust_remote_code=True,
# quantization_config=BitsAndBytesConfig(load_in_4bit=True),
# low_cpu_mem_usage=True,
# ).eval()
times = [] # 用于记录每次生成的时间
decode_times = [] # 用于记录解码时间
print("Warming up...") # 输出热身提示
vocab_size = tokenizer.vocab_size # 获取词汇表大小
warmup_token_len = 20 # 设置热身阶段token的长度
# 随机生成token ID,范围在3到vocab_size - 200之间
random_token_ids = torch.randint(3, vocab_size - 200, (warmup_token_len - 5,), dtype=torch.long)
start_tokens = [151331, 151333, 151336, 198] # 定义起始token ID
end_tokens = [151337] # 定义结束token ID
# 创建输入ID张量,并添加起始和结束token
input_ids = torch.tensor(start_tokens + random_token_ids.tolist() + end_tokens, dtype=torch.long).unsqueeze(0).to(
device)
# 创建注意力掩码
attention_mask = torch.ones_like(input_ids, dtype=torch.bfloat16).to(device)
# 创建位置ID张量
position_ids = torch.arange(len(input_ids[0]), dtype=torch.bfloat16).unsqueeze(0).to(device)
# 将输入准备为字典格式
warmup_inputs = {
'input_ids': input_ids,
'attention_mask': attention_mask,
'position_ids': position_ids
}
# 禁用梯度计算以节省内存
with torch.no_grad():
# 生成模型输出,执行热身操作
_ = model.generate(
input_ids=warmup_inputs['input_ids'],
attention_mask=warmup_inputs['attention_mask'],
max_new_tokens=2048, # 设置生成的最大token数
do_sample=False, # 不进行随机采样
repetition_penalty=1.0, # 设置重复惩罚
eos_token_id=[151329, 151336, 151338] # 定义结束token ID
)
print("Warming up complete. Starting stress test...") # 输出热身完成提示
# 循环 n 次以生成多组输入
for i in range(n):
# 生成随机的 token ID,范围在 3 到 vocab_size - 200 之间,长度为 token_len - 5
random_token_ids = torch.randint(3, vocab_size - 200, (token_len - 5,), dtype=torch.long)
# 将开始 token、随机 token 和结束 token 合并为输入 ID,并转换为张量,增加维度并转移到指定设备
input_ids = torch.tensor(start_tokens + random_token_ids.tolist() + end_tokens, dtype=torch.long).unsqueeze(
0).to(device)
# 创建与 input_ids 相同形状的注意力掩码,初始值为 1
attention_mask = torch.ones_like(input_ids, dtype=torch.bfloat16).to(device)
# 生成位置 ID,表示每个 token 的位置,转换为张量并转移到指定设备
position_ids = torch.arange(len(input_ids[0]), dtype=torch.bfloat16).unsqueeze(0).to(device)
# 创建测试输入字典,包含 input_ids、attention_mask 和 position_ids
test_inputs = {
'input_ids': input_ids,
'attention_mask': attention_mask,
'position_ids': position_ids
}
# 初始化文本流迭代器,设置超时和是否跳过提示及特殊标记
streamer = TextIteratorStreamer(
tokenizer=tokenizer,
timeout=36000,
skip_prompt=True,
skip_special_tokens=True
)
# 设置生成文本的参数
generate_kwargs = {
"input_ids": test_inputs['input_ids'],
"attention_mask": test_inputs['attention_mask'],
"max_new_tokens": 512,
"do_sample": False,
"repetition_penalty": 1.0,
"eos_token_id": [151329, 151336, 151338],
"streamer": streamer
}
# 记录开始时间
start_time = time.time()
# 创建并启动线程,调用模型的生成方法
t = Thread(target=model.generate, kwargs=generate_kwargs)
t.start()
# 初始化第一个 token 时间和所有 token 时间的列表
first_token_time = None
all_token_times = []
# 遍历生成的每个 token
for token in streamer:
current_time = time.time() # 获取当前时间
if first_token_time is None: # 如果是第一个 token,记录时间
first_token_time = current_time
times.append(first_token_time - start_time) # 计算并存储填充时间
all_token_times.append(current_time) # 记录所有 token 的时间
t.join() # 等待生成线程结束
end_time = time.time() # 记录结束时间
# 计算每个 token 的平均解码时间
avg_decode_time_per_token = len(all_token_times) / (end_time - first_token_time) if all_token_times else 0
decode_times.append(avg_decode_time_per_token) # 存储平均解码时间
# 打印当前迭代的填充时间和平均解码时间
print(
f"Iteration {i + 1}/{n} - Prefilling Time: {times[-1]:.4f} seconds - Average Decode Time: {avg_decode_time_per_token:.4f} tokens/second")
# 清空 CUDA 缓存
torch.cuda.empty_cache()
# 计算 n 次迭代的平均第一个 token 时间
avg_first_token_time = sum(times) / n
# 计算 n 次迭代的平均解码时间
avg_decode_time = sum(decode_times) / n
# 打印总体平均时间
print(f"\nAverage First Token Time over {n} iterations: {avg_first_token_time:.4f} seconds")
print(f"Average Decode Time per Token over {n} iterations: {avg_decode_time:.4f} tokens/second")
# 返回填充时间、平均第一个 token 时间、解码时间和平均解码时间
return times, avg_first_token_time, decode_times, avg_decode_time
# 主函数,用于执行模型推理的压力测试
def main():
# 创建解析器,提供程序描述
parser = argparse.ArgumentParser(description="Stress test for model inference")
# 添加参数,指定每次测试的标记数量,默认为1000
parser.add_argument('--token_len', type=int, default=1000, help='Number of tokens for each test')
# 添加参数,指定压力测试的迭代次数,默认为3
parser.add_argument('--n', type=int, default=3, help='Number of iterations for the stress test')
# 添加参数,指定用于推理的GPU数量,默认为1
parser.add_argument('--num_gpu', type=int, default=1, help='Number of GPUs to use for inference')
# 解析命令行参数并将其存储在args中
args = parser.parse_args()
# 从args中获取标记长度
token_len = args.token_len
# 从args中获取迭代次数
n = args.n
# 从args中获取GPU数量
num_gpu = args.num_gpu
# 调用压力测试函数,传入标记长度、迭代次数和GPU数量
stress_test(token_len, n, num_gpu)
# 如果当前模块是主模块,则调用main函数
if __name__ == "__main__":
main()