让模型服务在云中更高效。
介绍
Mosec 是一个高性能和灵活的模型服务框架,用于构建支持机器学习模型的后端和微服务。它弥合了你刚刚训练的任何机器学习模型与高效在线服务 API 之间的差距。
- 高性能:使用 Rust 🦀 构建的网络层和任务协调,不仅速度飞快而且利用高效的异步 I/O 实现 CPU 利用率的优化
- 易于使用:纯 Python 🐍 用户界面,用户可以使用相同的离线测试代码以与 ML 框架无关的方式来服务他们的模型
- 动态批处理:从不同用户聚合请求进行批处理推理并返回结果
- 流水线阶段:为流水线阶段生成多个进程,以处理 CPU/GPU/IO 混合工作负载
- 云友好:设计为在云中运行,具有模型热启动、优雅关闭和 Prometheus 监控指标,可通过 Kubernetes 或任何容器编排系统轻松管理
- 专注于一件事:专注于在线服务部分,用户可以关注模型优化和业务逻辑
安装
Mosec 需要 Python 3.7 及以上版本。安装最新的 PyPI 包 适用于 Linux x86_64 或 macOS x86_64/ARM64:
pip install -U mosec
# 或使用 conda 安装
conda install conda-forge::mosec
如果要从源码构建,安装 Rust 并运行以下命令:
make package
你将在 dist
文件夹中得到一个 mosec 的 wheel 文件。
用法
我们演示了 Mosec 如何帮助你轻松将预训练的 Stable Diffusion 模型托管为服务。你需要安装 diffusers 和 transformers 作为前提条件:
pip install --upgrade diffusers[torch] transformers
编写服务器
点击查看带有解释的服务器代码。
首先,我们导入库并设置一个基本的日志记录器,以更好地观察所发生的情况。
from io import BytesIO
from typing import List
import torch # type: ignore
from diffusers import StableDiffusionPipeline # type: ignore
from mosec import Server, Worker, get_logger
from mosec.mixin import MsgpackMixin
logger = get_logger()
然后,我们仅通过三步为客户端构建一个 API,供其查询文本提示并基于 stable-diffusion-v1-5 模型 获取图像。
-
定义你的服务为继承
mosec.Worker
的类。这里我们还继承MsgpackMixin
以采用 msgpack 序列化格式。 -
在
__init__
方法中初始化你的模型,并将其放到相应的设备上。你可以选择性地将self.example
分配一些数据来预热模型。注意,数据应与你接下来描述的 handler 的输入格式兼容。 -
重写
forward
方法来编写你的服务 handler,其拼写为forward(self, data: Any | List[Any]) -> Any | List[Any]
。接收/返回单一项或元组取决于是否配置了动态批处理。
class StableDiffusion(MsgpackMixin, Worker):
def __init__(self):
self.pipe = StableDiffusionPipeline.from_pretrained(
"runwayml/stable-diffusion-v1-5", torch_dtype=torch.float16
)
device = "cuda" if torch.cuda.is_available() else "cpu"
self.pipe = self.pipe.to(device)
self.example = ["useless example prompt"] * 4 # 预热 (batch_size=4)
def forward(self, data: List[str]) -> List[memoryview]:
logger.debug("生成图像 %s", data)
res = self.pipe(data)
logger.debug("NSFW: %s", res[1])
images = []
for img in res[0]:
dummy_file = BytesIO()
img.save(dummy_file, format="JPEG")
images.append(dummy_file.getbuffer())
return images
[!提示]
(a) 在这个示例中我们返回图像的二进制格式,JSON 不支持这个格式(除非用 base64 编码,这会使得有效负载更大)。因此,msgpack 更符合我们的需求。如果我们不继承
MsgpackMixin
,默认情况下将使用 JSON。换句话说,服务请求/响应协议可以是 msgpack、JSON 或任何其他格式(查看我们的mixins)。(b) 预热通常有助于提前分配 GPU 内存。如果指定了预热示例,服务将在示例通过 handler 后才会准备就绪。然而,如果未给定示例,首次请求的延迟预计会更长。
example
应设置为单一项或元组,具体取决于forward
期望收到的内容。此外,如果你希望使用多个不同的示例进行预热,可以设置multi_examples
(示例在这里)。(c) 此示例展示了单阶段服务,其中
StableDiffusion
工作者直接接收客户端提示请求并返回图像。因此,forward
可视为完整的服务 handler。然而,我们也可以设计多阶段服务,工人执行不同的任务(例如:下载图像、模型推理、后处理)在流水线中。在这种情况下,整个流水线视为服务 handler,第一位工人接收请求,最后一位工人发送响应。工人之间的数据流通过进程间通信完成。(d) 由于在这个例子中启用了动态批处理,
forward
方法将希望接收到一个 字符串列表,例如:['一只可爱的小猫玩着一个红球','一个坐在电脑前的男人',...]
,从不同客户端聚合来用于 批处理推理,提高系统吞吐量。
最后,我们将工人添加到服务器中,构建一个单阶段工作流程(多个阶段可以流水线化以进一步提高吞吐量,参见这个示例),并指定我们希望它并行运行的进程数(num=1
)及最大批处理大小(max_batch_size=4
,动态批处理在超时之前将累积的最大请求数量;超时由 max_wait_time=10
以毫秒为单位定义,即 Mosec 等待批处理发送到 Worker 的最长时间)。
if __name__ == "__main__":
server = Server()
# 1) `num` 指定了将生成的并行运行的进程数量。
# 2) 通过将 `max_batch_size` 配置为值 > 1, 你在 `forward` 函数中的输入数据将是一个列表(批处理);否则,它是一个单一项。
server.append_worker(StableDiffusion, num=1, max_batch_size=4, max_wait_time=10)
server.run()
运行服务器
点击查看如何运行和查询服务器。
以上代码段在我们的示例文件中已合并。你可以直接在项目根目录下运行。我们首先来看一下命令行参数(解释在这里):
python examples/stable_diffusion/server.py --help
然后让我们以调试日志启动服务器:
python examples/stable_diffusion/server.py --log-level debug --timeout 30000
在浏览器中打开 http://127.0.0.1:8000/openapi/swagger/
以获取 OpenAPI 文档。
并在另一个终端中测试:
python examples/stable_diffusion/client.py --prompt "一只可爱的小猫玩着一个红球" --output cat.jpg --port 8000
你将在当前目录中得到一个名为 "cat.jpg" 的图像。
你可以查看指标:
curl http://127.0.0.1:8000/metrics
就是这样!你已经将你的 稳定扩散模型 托管为服务!😉
示例
更多现成可用的示例可以在 示例 部分找到。包括:
- 管道: 一个简单的回声演示,甚至没有任何机器学习模型。
- 请求验证: 使用类型注解验证请求。
- 多路由: 在一个服务中提供多个模型。
- 嵌入服务: 兼容OpenAI的嵌入服务。
- 重新排序服务: 根据查询对段落列表进行重新排序。
- 共享内存IPC: 使用共享内存的进程间通信。
- 定制的GPU分配: 部署多个副本,每个使用不同的GPU。
- 定制化指标: 记录您自己的监控指标。
- Jax即时推理: 即时编译加速推理。
- PyTorch深度学习模型:
配置
- 动态批处理
max_batch_size
和max_wait_time (毫秒)
在调用append_worker
时配置。- 确保使用
max_batch_size
值进行推理不会导致GPU内存不足。 - 通常,
max_wait_time
应小于批处理推理时间。 - 如果启用,它将在累积请求数达到
max_batch_size
或max_wait_time
过去时收集一批请求。当流量高时,服务将从此功能中受益。
- 检查参数文档以获取其他配置。
部署
- 如果您正在寻找一个安装了
mosec
的GPU基础镜像,您可以查看官方镜像mosecorg/mosec
。对于复杂的用例,请参考envd。 - 这个服务不需要Gunicorn或NGINX,但在必要时您当然可以使用入口控制器。
- 该服务应该是容器中的PID 1进程,因为它控制多个进程。如果需要在一个容器中运行多个进程,您将需要一个监督进程。您可以选择Supervisor或Horust。
- 记得收集指标。
mosec_service_batch_size_bucket
显示批处理大小分布。mosec_service_batch_duration_second_bucket
显示每个阶段每个连接的动态批处理持续时间(从接收第一个任务开始)。mosec_service_process_duration_second_bucket
显示每个阶段每个连接的处理持续时间(包括IPC时间但不包括mosec_service_batch_duration_second_bucket
)。mosec_service_remaining_task
显示当前处理任务的数量。mosec_service_throughput
显示服务吞吐量。
- 使用
SIGINT
(CTRL+C
) 或SIGTERM
(kill {PID}
) 停止服务,因为它具有优雅的关闭逻辑。
性能调优
- 找出适合您的推理服务的最佳
max_batch_size
和max_wait_time
。指标将显示实际批量大小和批处理持续时间的直方图。这些是调整这两个参数的关键信息。 - 尝试将整个推理过程拆分为独立的CPU和GPU阶段(参考DistilBERT)。不同阶段将在一个数据管道中运行,这将保持GPU的繁忙状态。
- 您还可以调整每个阶段的工作者数量。例如,如果您的管道包括一个用于预处理的CPU阶段和一个用于模型推理的GPU阶段,增加CPU阶段的工作者数量可以帮助生成更多数据以便在GPU阶段进行批处理推理;增加GPU阶段的工作者数量可以充分利用GPU内存和计算能力。这两种方式可能有助于更高的GPU利用率,从而导致更高的服务吞吐量。
- 对于多阶段服务,注意通过不同阶段传递的数据将由
serialize_ipc/deserialize_ipc
方法序列化/反序列化,因此极大的数据可能会使整个管道变慢。序列化的数据默认通过rust传递到下一个阶段,您可以启用共享内存以潜在地减少延迟(参考RedisShmIPCMixin)。 - 您应该选择合适的
serialize/deserialize
方法来解码用户请求和编码响应。默认情况下,两者都使用JSON。然而,图像和嵌入在JSON中的支持不佳。您可以选择msgpack,它更快且兼容二进制(参考Stable Diffusion)。 - 配置OpenBLAS或MKL的线程。它可能无法选择当前Python进程使用的最合适的CPU。您可以使用环境为每个工作者配置它(参考定制的GPU分配)。
采用者
以下是一些使用Mosec的公司和个人用户:
- Modelz: 机器学习推理的无服务器平台。
- MOSS: 一个类似ChatGPT的开源对话语言模型。
- 腾讯云: 腾讯云机器学习平台,使用Mosec作为核心推理服务器框架。
- TensorChord: 云原生AI基础设施公司。
引用
如果您发现该软件对您的研究有用,请考虑引用
@software{yang2021mosec,
title = {{MOSEC: Model Serving made Efficient in the Cloud}},
author = {Yang, Keming and Liu, Zichen and Cheng, Philip},
url = {https://github.com/mosecorg/mosec},
year = {2021}
}
贡献
我们欢迎任何形式的贡献。请通过提出问题或在Discord上讨论给我们反馈!您也可以直接贡献您的代码和拉取请求!
要开始开发,您可以使用envd创建一个隔离和干净的Python和Rust环境。查看envd-docs或build.envd获取更多信息。