PiPPy: PyTorch的流水线并行
[!注意] PiPPy已经被迁移到PyTorch中作为一个子包:
torch.distributed.pipelining
。你可以在这里找到详细文档。当前仓库主要作为示例的存放处。PiPPy库代码将被移除。请使用torch.distributed.pipelining
中的API。谢谢!
为什么选择PiPPy? | 安装指南 | 示例 | PiPPy详解
为什么选择PiPPy?
推动深度学习技术进步的最重要技术之一是扩展。扩展神经网络的常用技术包括_数据并行_、张量/操作并行_和_流水线并行。在许多情况下,流水线并行尤其可以成为一种有效的扩展技术,但它通常难以实现,需要对模型代码进行侵入式修改,并且需要实现复杂的运行时编排代码。PiPPy旨在提供一个工具包,自动完成这些工作,从而实现模型的高效扩展。
PiPPy是什么?
PiPPy项目由一个编译器和运行时栈组成,用于PyTorch模型的自动并行化和扩展。目前,PiPPy专注于_流水线并行_,这是一种将模型代码分区并让多个_微批次_同时执行模型不同部分的技术。要了解更多关于流水线并行的信息,请参阅这篇文章。
图:流水线并行。"F"、"B"和"U"分别表示前向传播、反向传播和权重更新。不同颜色代表不同的微批次。
PiPPy提供以下特性,使流水线并行变得更加简单:
- 通过追踪模型自动拆分模型代码。目标是让用户直接提供原始模型代码进行并行化,无需进行大规模修改。
- 与上一点相关,PiPPy支持非平凡拓扑结构,包括跳跃连接和共享权重/层。PiPPy为共享权重提供可配置行为,允许在流水线阶段之间传输或复制并同步梯度。
- 对跨主机流水线并行的一流支持,这通常是在较慢的互连上使用PP的场景。这是目前基于torchgpipe的
torch.distributed.pipeline.sync.Pipe
所缺少的。 - 与其他并行方案(如数据并行或张量分割模型并行)的可组合性(总体称为"3D并行")。目前,可以组合流水线并行和数据并行。未来将支持其他组合。
- 支持流水线调度范式,包括填充-排空(GPipe)、1F1B和交错1F1B等调度。未来将添加更多调度方式。
有关深入的技术架构,请参阅ARCHITECTURE.md。
安装
PiPPy需要2.2.0.dev以上版本的PyTorch才能工作。要快速安装,例如PyTorch每日构建版,请在与本README相同的目录下运行以下命令:
pip install -r requirements.txt --find-links https://download.pytorch.org/whl/nightly/cpu/torch_nightly.html
如果您的系统有NVIDIA GPU,也可以选择CUDA版本的PyTorch,例如:
pip install -r requirements.txt --find-links https://download.pytorch.org/whl/nightly/cu118/torch_nightly.html
要从源代码安装PiPPy,请在与本README相同的目录下运行以下命令:
python setup.py install
要暴露PiPPy以进行开发,使得对此仓库的更改能反映在导入的包中,请运行:
python setup.py develop
示例
在这个仓库中,我们提供了基于真实模型的丰富示例。特别是,我们展示了如何在不对模型进行任何代码更改的情况下应用PiPPy。请参阅HuggingFace示例目录。示例包括:BERT、GPT2、T5、LLaMA等。
PiPPy详解
PiPPy由两部分组成:编译器_和_运行时。编译器接收您的模型代码,将其拆分,并转换为Pipe
,这是一个描述每个流水线阶段的模型及其数据流关系的包装器。运行时并行执行PipelineStage
,处理微批次拆分、调度、通信和梯度传播等事务。我们将在本节中介绍这些概念的API。
使用Pipe拆分模型
为了了解如何将模型拆分成流水线,让我们首先看一个简单的神经网络示例:
import torch
class MyNetworkBlock(torch.nn.Module):
def __init__(self, in_dim, out_dim):
super().__init__()
self.lin = torch.nn.Linear(in_dim, out_dim)
def forward(self, x):
x = self.lin(x)
x = torch.relu(x)
return x
class MyNetwork(torch.nn.Module):
def __init__(self, in_dim, layer_dims):
super().__init__()
prev_dim = in_dim
for i, dim in enumerate(layer_dims):
setattr(self, f'layer{i}', MyNetworkBlock(prev_dim, dim))
prev_dim = dim
self.num_layers = len(layer_dims)
# 10个输出类别
self.output_proj = torch.nn.Linear(layer_dims[-1], 10)
def forward(self, x):
for i in range(self.num_layers):
x = getattr(self, f'layer{i}')(x)
return self.output_proj(x)
in_dim = 512
layer_dims = [512, 1024, 256]
mn = MyNetwork(in_dim, layer_dims).to(device)
这个网络是以自由形式的Python代码编写的;它没有针对任何特定的并行技术进行修改。
让我们看看pippy.Pipe
接口的第一个用法:
from pippy import pipeline, annotate_split_points, Pipe, SplitPoint
annotate_split_points(mn, {'layer0': SplitPoint.END,
'layer1': SplitPoint.END})
batch_size = 32
example_input = torch.randn(batch_size, in_dim, device=device)
chunks = 4
pipe = pipeline(mn, chunks, example_args=(example_input,))
print(pipe)
"""
************************************* pipe *************************************
GraphModule(
(submod_0): PipeStageModule(
(L__self___layer0_mod_lin): Linear(in_features=512, out_features=512, bias=True)
)
(submod_1): PipelineStageModule(
(L__self___layer1_mod_lin): Linear(in_features=512, out_features=1024, bias=True)
)
(submod_2): PipelineStageModule(
(L__self___layer2_lin): Linear(in_features=1024, out_features=256, bias=True)
(L__self___output_proj): Linear(in_features=256, out_features=10, bias=True)
)
)
def forward(self, arg0):
submod_0 = self.submod_0(arg0); arg0 = None
submod_1 = self.submod_1(submod_0); submod_0 = None
submod_2 = self.submod_2(submod_1); submod_1 = None
return [submod_2]
"""
这里发生了什么?首先,`pipeline`通过追踪模型将其转换为有向无环图(DAG)。然后,它将操作和参数分组到流水线阶段中。阶段表示为`submod_N`子模块,其中`N`是一个自然数。
我们使用`annotate_split_points`指定代码应在`layer0`和`layer1`的末尾进行拆分。因此,我们的代码被拆分为三个流水线阶段。PiPPy还提供了`SplitPoint.BEGINNING`,如果用户想在某个注释点之前进行拆分。
虽然`annotate_split_points` API为用户提供了一种无需修改模型即可指定拆分点的方法,但PiPPy还提供了一个用于模型内注释的API:`pipe_split()`。有关详细信息,您可以阅读[此示例](https://github.com/pytorch/PiPPy/blob/main/test/test_pipe.py)。
这涵盖了`Pipe` API的基本用法。有关更多信息,请参阅文档。
## 使用PipelineStage进行流水线执行
给定上述`Pipe`对象,我们可以使用`PipelineStage`类之一以流水线方式执行我们的模型。首先,让我们实例化一个`PipelineStage`实例:
```python
# 我们使用`torchrun`来运行这个示例,使用多个进程。
# `torchrun`定义了两个环境变量:`RANK`和`WORLD_SIZE`。
rank = int(os.environ["RANK"])
world_size = int(os.environ["WORLD_SIZE"])
# 初始化分布式环境
import torch.distributed as dist
dist.init_process_group(rank=rank, world_size=world_size)
# Pipeline stage是我们的主要流水线运行时。它接收pipe对象、
# 这个进程的rank和设备。
from pippy.PipelineStage import PipelineStage
stage = PipelineStage(pipe, rank, device)
现在我们可以通过向第一个PipelineStage
传递输入来运行流水线:
# 输入数据
x = torch.randn(batch_size, in_dim, device=device)
# 使用输入`x`运行流水线。将批次分成4个微批次
# 并在流水线上并行运行它们
if rank == 0:
stage(x)
elif rank == world_size - 1:
output = stage()
else:
stage()
请注意,由于我们将模型拆分为三个阶段,因此必须使用三个工作进程运行此脚本。对于这个示例,我们将使用torchrun
在单台机器内运行多个进程进行演示。我们可以将上面所有的代码块收集到一个名为example.py的文件中,然后使用torchrun
运行它,如下所示:
torchrun --nproc_per_node=3 example.py
许可证
PiPPy采用3条款BSD许可证,详见LICENSE文件。
引用PiPPy
如果您在出版物中使用PiPPy,请使用以下BibTeX条目进行引用。
@Misc{pippy2022,
author = {James Reed, Pavel Belevich, Ke Wen, Howard Huang, Will Constable},
title = {PiPPy: Pipeline Parallelism for PyTorch},
howpublished = {\url{https://github.com/pytorch/PiPPy}},
year = {2022}
}