
Commit 99d75ce

Redirect dist_pipeline and pipelining tutorials
1 parent a551cdf commit 99d75ce

2 files changed (+14, -229 lines)
Lines changed: 10 additions & 0 deletions
@@ -0,0 +1,10 @@
+Distributed Pipeline Parallelism Using RPC
+==========================================
+
+This tutorial has been deprecated.
+
+Redirecting in 3 seconds...
+
+.. raw:: html
+
+   <meta http-equiv="Refresh" content="3; url='https://pytorch.org/tutorials/index.html'" />
Lines changed: 4 additions & 229 deletions
@@ -1,236 +1,11 @@
 Introduction to Distributed Pipeline Parallelism
 ================================================
-**Authors**: `Howard Huang <https://github.com/H-Huang>`_
 
-.. note::
-   |edit| View and edit this tutorial in `github <https://github.com/pytorch/tutorials/blob/main/intermediate_source/pipelining_tutorial.rst>`__.
+This tutorial has been deprecated.
 
-This tutorial uses a gpt-style transformer model to demonstrate implementing distributed
-pipeline parallelism with `torch.distributed.pipelining <https://pytorch.org/docs/main/distributed.pipelining.html>`__
-APIs.
+Redirecting in 3 seconds...
 
-.. grid:: 2
+.. raw:: html
 
-   .. grid-item-card:: :octicon:`mortar-board;1em;` What you will learn
-      :class-card: card-prerequisites
+   <meta http-equiv="Refresh" content="3; url='https://pytorch.org/tutorials/index.html'" />
 
-      * How to use ``torch.distributed.pipelining`` APIs
-      * How to apply pipeline parallelism to a transformer model
-      * How to utilize different schedules on a set of microbatches
-
-
-   .. grid-item-card:: :octicon:`list-unordered;1em;` Prerequisites
-      :class-card: card-prerequisites
-
-      * Familiarity with `basic distributed training <https://pytorch.org/tutorials/beginner/dist_overview.html>`__ in PyTorch
-
Setup
-----

With ``torch.distributed.pipelining`` we will be partitioning the execution of a model and scheduling computation on micro-batches. We will be using a simplified version
of a transformer decoder model. The model architecture is for educational purposes only; it has multiple transformer decoder layers because we want to demonstrate how to split the model into different
chunks. First, let us define the model:
34-
.. code:: python
35-
36-
import torch
37-
import torch.nn as nn
38-
from dataclasses import dataclass
39-
40-
@dataclass
41-
class ModelArgs:
42-
dim: int = 512
43-
n_layers: int = 8
44-
n_heads: int = 8
45-
vocab_size: int = 10000
46-
47-
class Transformer(nn.Module):
48-
def __init__(self, model_args: ModelArgs):
49-
super().__init__()
50-
51-
self.tok_embeddings = nn.Embedding(model_args.vocab_size, model_args.dim)
52-
53-
# Using a ModuleDict lets us delete layers witout affecting names,
54-
# ensuring checkpoints will correctly save and load.
55-
self.layers = torch.nn.ModuleDict()
56-
for layer_id in range(model_args.n_layers):
57-
self.layers[str(layer_id)] = nn.TransformerDecoderLayer(model_args.dim, model_args.n_heads)
58-
59-
self.norm = nn.LayerNorm(model_args.dim)
60-
self.output = nn.Linear(model_args.dim, model_args.vocab_size)
61-
62-
def forward(self, tokens: torch.Tensor):
63-
# Handling layers being 'None' at runtime enables easy pipeline splitting
64-
h = self.tok_embeddings(tokens) if self.tok_embeddings else tokens
65-
66-
for layer in self.layers.values():
67-
h = layer(h, h)
68-
69-
h = self.norm(h) if self.norm else h
70-
output = self.output(h).float() if self.output else h
71-
return output
72-
73-
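Before any pipelining is involved, a quick sanity check is to run the model on random token ids and confirm the output shape. The snippet below is a small sketch added for illustration and is not part of the original tutorial script:

.. code:: python

    # Sketch only: the model should map integer token ids to per-token logits
    # over the vocabulary, i.e. (batch, seq) -> (batch, seq, vocab_size).
    args = ModelArgs()
    toy_model = Transformer(args)
    tokens = torch.randint(0, args.vocab_size, (2, 16), dtype=torch.long)
    print(toy_model(tokens).shape)  # expected: torch.Size([2, 16, 10000])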
Then, we need to import the necessary libraries in our script and initialize the distributed training process. In this case, we are defining some global variables to use
later in the script:
.. code:: python

    import os
    import torch.distributed as dist
    from torch.distributed.pipelining import pipeline, SplitPoint, PipelineStage, ScheduleGPipe

    global rank, device, pp_group, stage_index, num_stages
    def init_distributed():
        global rank, device, pp_group, stage_index, num_stages
        rank = int(os.environ["LOCAL_RANK"])
        world_size = int(os.environ["WORLD_SIZE"])
        device = torch.device(f"cuda:{rank}") if torch.cuda.is_available() else torch.device("cpu")
        dist.init_process_group()

        # This group can be a sub-group in the N-D parallel case
        pp_group = dist.new_group()
        stage_index = rank
        num_stages = world_size
The ``rank``, ``world_size``, and ``init_process_group()`` code should seem familiar to you, as they are commonly used in
all distributed programs. The globals specific to pipeline parallelism include ``pp_group``, which is the process
group that will be used for send/recv communications; ``stage_index``, which is equal to the rank here because this example uses a single rank
per stage; and ``num_stages``, which is equal to ``world_size``.

``num_stages`` sets the number of stages used in the pipeline parallelism schedule. For example,
for ``num_stages=4``, a microbatch will need to go through 4 forwards and 4 backwards before it is completed. ``stage_index``
is necessary for the framework to know how to communicate between stages. For example, the first stage (``stage_index=0``)
uses data from the dataloader and does not need to receive data from any previous peer to perform its computation.
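In this tutorial each rank owns exactly one stage, so the mapping between ranks and stages is trivial. Purely as an illustration (the tutorial script does not do this), if each rank hosted more than one stage, as the multiple-stage-per-rank schedules mentioned later allow, the indices could be derived along the following lines, where ``stages_per_rank`` is a hypothetical value chosen by the user:

.. code:: python

    # Illustrative sketch only: a round-robin stage-to-rank assignment for a
    # hypothetical setup with more than one stage per rank.
    stages_per_rank = 2  # assumed for illustration, not used in this tutorial
    num_stages = world_size * stages_per_rank
    stage_indices = [rank + i * world_size for i in range(stages_per_rank)]
    # e.g. with world_size=2: rank 0 -> stages [0, 2], rank 1 -> stages [1, 3]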
Step 1: Partition the Transformer Model
---------------------------------------

There are two different ways of partitioning the model.

The first is the manual mode, in which we manually create two instances of the model by deleting portions of
the model's attributes. In this example, with 2 stages (2 ranks), the model is cut in half.
.. code:: python

    def manual_model_split(model, example_input_microbatch, model_args) -> PipelineStage:
        if stage_index == 0:
            # prepare the first stage model
            for i in range(4, 8):
                del model.layers[str(i)]
            model.norm = None
            model.output = None
            stage_input_microbatch = example_input_microbatch

        elif stage_index == 1:
            # prepare the second stage model
            for i in range(4):
                del model.layers[str(i)]
            model.tok_embeddings = None
            stage_input_microbatch = torch.randn(example_input_microbatch.shape[0], example_input_microbatch.shape[1], model_args.dim)

        stage = PipelineStage(
            model,
            stage_index,
            num_stages,
            device,
            input_args=stage_input_microbatch,
        )
        return stage
As we can see, the first stage does not have the layer norm or the output layer, and it only includes the first four transformer blocks.
The second stage does not have the input embedding layer, but includes the output layer and the final four transformer blocks. The function
then returns the ``PipelineStage`` for the current rank.
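A simple way to confirm that the split behaves as expected is to have each rank report how many parameters its stage retained. This check is a sketch added here for illustration and is not part of the original script:

.. code:: python

    # Sketch only: report the per-stage parameter count after the manual split.
    n_params = sum(p.numel() for p in model.parameters())
    print(f"rank {rank} / stage {stage_index}: {n_params} parameters")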
The second method is the tracer-based mode, which automatically splits the model based on a ``split_spec`` argument. Using the pipeline specification, we can instruct
``torch.distributed.pipelining`` where to split the model. In the following code block,
we are splitting before the 4th transformer decoder layer, mirroring the manual split described above. Similarly,
we can retrieve a ``PipelineStage`` by calling ``build_stage`` after this splitting is done.
.. code:: python

    def tracer_model_split(model, example_input_microbatch) -> PipelineStage:
        pipe = pipeline(
            module=model,
            mb_args=(example_input_microbatch,),
            split_spec={
                "layers.4": SplitPoint.BEGINNING,
            }
        )
        stage = pipe.build_stage(stage_index, device, pp_group)
        return stage
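The ``split_spec`` dictionary is not limited to a single entry. As a hypothetical variant that goes beyond this two-rank tutorial, a four-stage job could carve the eight decoder layers into four stages of two layers each by marking three split points:

.. code:: python

    # Hypothetical four-stage split_spec; assumes the job runs with num_stages == 4.
    split_spec = {
        "layers.2": SplitPoint.BEGINNING,
        "layers.4": SplitPoint.BEGINNING,
        "layers.6": SplitPoint.BEGINNING,
    }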
Step 2: Define The Main Execution
---------------------------------

In the main function, we will create a particular pipeline schedule that the stages should follow. ``torch.distributed.pipelining``
supports multiple schedules, including the single-stage-per-rank schedules ``GPipe`` and ``1F1B``,
as well as multiple-stage-per-rank schedules such as ``Interleaved1F1B`` and ``LoopedBFS``.
.. code:: python

    if __name__ == "__main__":
        init_distributed()
        num_microbatches = 4
        model_args = ModelArgs()
        model = Transformer(model_args)

        # Dummy data
        x = torch.ones(32, 500, dtype=torch.long)
        y = torch.randint(0, model_args.vocab_size, (32, 500), dtype=torch.long)
        example_input_microbatch = x.chunk(num_microbatches)[0]

        # Option 1: Manual model splitting
        stage = manual_model_split(model, example_input_microbatch, model_args)

        # Option 2: Tracer model splitting
        # stage = tracer_model_split(model, example_input_microbatch)

        x = x.to(device)
        y = y.to(device)

        def tokenwise_loss_fn(outputs, targets):
            loss_fn = nn.CrossEntropyLoss()
            outputs = outputs.view(-1, model_args.vocab_size)
            targets = targets.view(-1)
            return loss_fn(outputs, targets)

        schedule = ScheduleGPipe(stage, n_microbatches=num_microbatches, loss_fn=tokenwise_loss_fn)

        if rank == 0:
            schedule.step(x)
        elif rank == 1:
            losses = []
            output = schedule.step(target=y, losses=losses)

        dist.destroy_process_group()
In the example above, we are using the manual method to split the model, but the code can be uncommented to also try the
tracer-based model splitting function. In our schedule, we need to pass in the number of microbatches and
the loss function used to evaluate the targets.
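The schedule constructor takes the same arguments for the other single-stage-per-rank schedule mentioned earlier, so GPipe can be swapped for 1F1B with a one-line change. The snippet below is a sketch for illustration rather than part of the tutorial script:

.. code:: python

    # Sketch only: use the 1F1B schedule with the same stage, microbatch count, and loss function.
    from torch.distributed.pipelining import Schedule1F1B

    schedule = Schedule1F1B(stage, n_microbatches=num_microbatches, loss_fn=tokenwise_loss_fn)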
The ``.step()`` function processes the entire minibatch and automatically splits it into microbatches based
on the ``n_microbatches`` value passed previously. The microbatches are then operated on according to the schedule class.
In the example above, we are using GPipe, which follows a simple all-forwards and then all-backwards schedule. The output
returned from rank 1 will be the same as if the model were on a single GPU and run with the entire batch. Similarly,
we can pass in a ``losses`` container to store the corresponding losses for each microbatch.
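To turn this into a full training step, the per-microbatch losses can be reduced on the last stage for logging, and each rank can apply an optimizer to its own stage parameters, since the backward passes run by ``schedule.step`` have already accumulated the gradients. The optimizer below is an assumption added for illustration and does not appear in the original script:

.. code:: python

    # Sketch only: log the mean microbatch loss on the last stage and apply an
    # optimizer step per rank (the optimizer itself is assumed, not from the tutorial).
    optimizer = torch.optim.AdamW(model.parameters(), lr=1e-3)

    if rank == 1:
        print(f"mean microbatch loss: {torch.stack(losses).mean().item():.4f}")

    optimizer.step()
    optimizer.zero_grad()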
Step 3: Launch the Distributed Processes
----------------------------------------

Finally, we are ready to run the script. We will use ``torchrun`` to create a single-host, 2-process job.
Our script is already written in such a way that rank 0 performs the required logic for pipeline stage 0, and rank 1
performs the logic for pipeline stage 1.

``torchrun --nnodes 1 --nproc_per_node 2 pipelining_tutorial.py``
Conclusion
----------

In this tutorial, we have learned how to implement distributed pipeline parallelism using PyTorch's ``torch.distributed.pipelining`` APIs.
We explored setting up the environment, defining a transformer model, and partitioning it for distributed training.
We discussed two methods of model partitioning, manual and tracer-based, and demonstrated how to schedule computations on
micro-batches across different stages. Finally, we covered the execution of the pipeline schedule and the launch of distributed
processes using ``torchrun``.

For production-ready usage of pipeline parallelism, as well as composition with other distributed techniques, see also the
`TorchTitan end-to-end example of 3D parallelism <https://github.com/pytorch/torchtitan>`__.
