
Commit c8029c6

Switch to torchrun for distributed launches
- Replace deprecated launch utility with torchrun (see PyTorch docs: https://pytorch.org/docs/stable/distributed.html#launch-utility)
- Update README to reflect torchrun usage
- Remove main.py (no longer referenced in documentation)
- Update CI to test example.py script instead

Signed-off-by: jafraustro <[email protected]>
1 parent d47f0f3 commit c8029c6
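
For orientation, the launch-command change this commit makes (both lines are taken verbatim from the README diff below):

```sh
# Before (deprecated launcher):
python /path/to/launch.py --nnode=1 --node_rank=0 --nproc_per_node=8 example.py --local_world_size=8

# After (this commit):
torchrun --nnodes=1 --nproc_per_node=8 example.py
```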

File tree: 5 files changed (+98, -298 lines)


distributed/ddp/README.md

Lines changed: 72 additions & 123 deletions
@@ -1,167 +1,130 @@
-# Launching and configuring distributed data parallel applications

-In this tutorial we will demonstrate how to structure a distributed
-model training application so it can be launched conveniently on
-multiple nodes, each with multiple GPUs using PyTorch's distributed
-[launcher script](https://github.com/pytorch/pytorch/blob/master/torch/distributed/launch.py).
+# Distributed Data Parallel (DDP) Applications with PyTorch

-# Prerequisites
+This guide demonstrates how to structure a distributed model training application for convenient multi-node launches using `torchrun`.

-We assume you are familiar with [PyTorch](https://pytorch.org/tutorials/beginner/deep_learning_60min_blitz.html), the primitives it provides for [writing distributed applications](https://pytorch.org/tutorials/intermediate/dist_tuto.html) as well as training [distributed models](https://pytorch.org/tutorials/intermediate/ddp_tutorial.html).
+---

-The example program in this tutorial uses the
-[`torch.nn.parallel.DistributedDataParallel`](https://pytorch.org/docs/stable/nn.html#distributeddataparallel) class for training models
-in a _data parallel_ fashion: multiple workers train the same global
-model by processing different portions of a large dataset, computing
-local gradients (aka _sub_-gradients) independently and then
-collectively synchronizing gradients using the AllReduce primitive. In
-HPC terminology, this model of execution is called _Single Program
-Multiple Data_ or SPMD since the same application runs on all
-application but each one operates on different portions of the
-training dataset.
+## Prerequisites

-# Application process topologies
+You should be familiar with:
+
+- [PyTorch basics](https://pytorch.org/tutorials/beginner/deep_learning_60min_blitz.html)
+- [Writing distributed applications](https://pytorch.org/tutorials/intermediate/dist_tuto.html)
+- [Distributed model training](https://pytorch.org/tutorials/intermediate/ddp_tutorial.html)
+
+This tutorial uses the [`torch.nn.parallel.DistributedDataParallel`](https://docs.pytorch.org/docs/stable/generated/torch.nn.parallel.DistributedDataParallel.html#torch.nn.parallel.DistributedDataParallel) (DDP) class for data parallel training: multiple workers train the same global model on different data shards, compute local gradients, and synchronize them using AllReduce. In High-Performance Computing (HCP), this is called _Single Program Multiple Data_ (SPMD).
+
+---
+
+## Application Process Topologies

 A Distributed Data Parallel (DDP) application can be executed on
 multiple nodes where each node can consist of multiple GPU
 devices. Each node in turn can run multiple copies of the DDP
 application, each of which processes its models on multiple GPUs.

-Let _N_ be the number of nodes on which the application is running and
-_G_ be the number of GPUs per node. The total number of application
-processes running across all the nodes at one time is called the
-**World Size**, _W_ and the number of processes running on each node
-is referred to as the **Local World Size**, _L_.
+Let:
+- _N_ = number of nodes
+- _G_ = number of GPUs per node
+- _W_ = **World Size** = total number of processes
+- _L_ = **Local World Size** = processes per node

-Each application process is assigned two IDs: a _local_ rank in \[0,
-_L_-1\] and a _global_ rank in \[0, _W_-1\].
+Each process has:
+- **Local rank**: in `[0, L-1]`
+- **Global rank**: in `[0, W-1]`

-To illustrate the terminology defined above, consider the case where a
-DDP application is launched on two nodes, each of which has four
-GPUs. We would then like each process to span two GPUs each. The
-mapping of processes to nodes is shown in the figure below:
+**Example:**
+If you launch a DDP app on 2 nodes, each with 4 GPUs, and want each process to span 2 GPUs, the mapping is as follows:

 ![ProcessMapping](https://user-images.githubusercontent.com/875518/77676984-4c81e400-6f4c-11ea-87d8-f2ff505a99da.png)

-While there are quite a few ways to map processes to nodes, a good
-rule of thumb is to have one process span a single GPU. This enables
-the DDP application to have as many parallel reader streams as there
-are GPUs and in practice provides a good balance between I/O and
-computational costs. In the rest of this tutorial, we assume that the
-application follows this heuristic.
+While there are quite a few ways to map processes to nodes, a good rule of thumb is to have one process span a single GPU. This enables the DDP application to have as many parallel reader streams as there are GPUs and in practice provides a good balance between I/O and computational costs. In the rest of this tutorial, we assume that the application follows this heuristic.

 # Preparing and launching a DDP application

-Independent of how a DDP application is launched, each process needs a
-mechanism to know its global and local ranks. Once this is known, all
-processes create a `ProcessGroup` that enables them to participate in
-collective communication operations such as AllReduce.
-
-A convenient way to start multiple DDP processes and initialize all
-values needed to create a `ProcessGroup` is to use the distributed
-`launch.py` script provided with PyTorch. The launcher can be found
-under the `distributed` subdirectory under the local `torch`
-installation directory. Here is a quick way to get the path of
-`launch.py` on any operating system:
-
-```sh
-python -c "from os import path; import torch; print(path.join(path.dirname(torch.__file__), 'distributed', 'launch.py'))"
-```
-
-This will print something like this:
-
-```sh
-/home/username/miniconda3/envs/pytorch/lib/python3.8/site-packages/torch/distributed/launch.py
-```
-
-When the DDP application is started via `launch.py`, it passes the world size, global rank, master address and master port via environment variables and the local rank as a command-line parameter to each instance.
-To use the launcher, an application needs to adhere to the following convention:
-
-1. It must provide an entry-point function for a _single worker_. For example, it should not launch subprocesses using `torch.multiprocessing.spawn`
-2. It must use environment variables for initializing the process group.
-
-For simplicity, the application can assume each process maps to a single GPU but in the next section we also show how a more general process-to-GPU mapping can be performed.
+Independent of how a DDP application is launched, each process needs a mechanism to know its global and local ranks. Once this is known, all processes create a `ProcessGroup` that enables them to participate in collective communication operations such as AllReduce.

-# Sample application
+A convenient way to start multiple DDP processes and initialize all values needed to create a `ProcessGroup` is to use the [`torchrun`](https://docs.pytorch.org/docs/stable/elastic/run.html) script provided with PyTorch.

-The sample DDP application in this repo is based on the "Hello, World" [DDP tutorial](https://pytorch.org/tutorials/intermediate/ddp_tutorial.html).
+---

-## Argument passing convention
+## Sample Application

-The DDP application takes two command-line arguments:
+This example is based on the ["Hello, World" DDP tutorial](https://pytorch.org/tutorials/intermediate/ddp_tutorial.html).

-1. `--local_rank`: This is passed in via `launch.py`
-2. `--local_world_size`: This is passed in explicitly and is typically either $1$ or the number of GPUs per node.
+The application calls the `spmd_main` entrypoint:

-The application parses these and calls the `spmd_main` entrypoint:
-
-```py
+```python
 if __name__ == "__main__":
-    parser = argparse.ArgumentParser()
-    parser.add_argument("--local_rank", type=int, default=0)
-    parser.add_argument("--local_world_size", type=int, default=1)
-    args = parser.parse_args()
-    spmd_main(args.local_world_size, args.local_rank)
+    spmd_main()
 ```

-In `spmd_main`, the process group is initialized with just the backend (NCCL or Gloo). The rest of the information needed for rendezvous comes from environment variables set by `launch.py`:
+In `spmd_main`, the process group is initialized using the Accelerator API. The rest of the rendezvous information comes from environment variables set by `torchrun`:

-```py
-def spmd_main(local_world_size, local_rank):
+```python
+def spmd_main():
     # These are the parameters used to initialize the process group
     env_dict = {
         key: os.environ[key]
         for key in ("MASTER_ADDR", "MASTER_PORT", "RANK", "WORLD_SIZE")
     }
+    rank = int(env_dict['RANK'])
+    local_rank = int(env_dict['LOCAL_RANK'])
+    local_world_size = int(env_dict['LOCAL_WORLD_SIZE'])
+
     print(f"[{os.getpid()}] Initializing process group with: {env_dict}")
     dist.init_process_group(backend="nccl")
     print(
         f"[{os.getpid()}] world_size = {dist.get_world_size()}, "
         + f"rank = {dist.get_rank()}, backend={dist.get_backend()}"
     )

-    demo_basic(local_world_size, local_rank)
+    demo_basic(rank)

     # Tear down the process group
-    dist.destroy_process_group()
+    torch.distributed.destroy_process_group()
 ```

-Given the local rank and world size, the training function, `demo_basic` initializes the `DistributedDataParallel` model across a set of GPUs local to the node via `device_ids`:
+**Key points:**
+- Each process reads its rank and world size from environment variables.
+- The process group is initialized for distributed communication.

-```py
-def demo_basic(local_world_size, local_rank):
-
-    # setup devices for this process. For local_world_size = 2, num_gpus = 8,
-    # rank 0 uses GPUs [0, 1, 2, 3] and
-    # rank 1 uses GPUs [4, 5, 6, 7].
-    n = torch.cuda.device_count() // local_world_size
-    device_ids = list(range(local_rank * n, (local_rank + 1) * n))
+The training function, `demo_basic`, initializes the DDP model on the appropriate GPU:

+```python
+def demo_basic(rank):
     print(
-        f"[{os.getpid()}] rank = {dist.get_rank()}, "
-        + f"world_size = {dist.get_world_size()}, n = {n}, device_ids = {device_ids}"
+        f"[{os.getpid()}] rank = {torch.distributed.get_rank()}, "
+        + f"world_size = {torch.distributed.get_world_size()}"
     )

-    model = ToyModel().cuda(device_ids[0])
-    ddp_model = DDP(model, device_ids)
+    model = ToyModel().to(rank)
+    ddp_model = DDP(model, device_ids=[rank])

     loss_fn = nn.MSELoss()
     optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

     optimizer.zero_grad()
     outputs = ddp_model(torch.randn(20, 10))
-    labels = torch.randn(20, 5).to(device_ids[0])
+    labels = torch.randn(20, 5).to(rank)
     loss_fn(outputs, labels).backward()
     optimizer.step()
 ```

-The application can be launched via `launch.py` as follows on a 8 GPU node with one process per GPU:
+---
+
+## Launching the Application

 ```sh
-python /path/to/launch.py --nnode=1 --node_rank=0 --nproc_per_node=8 example.py --local_world_size=8
+torchrun --nnodes=1 --nproc_per_node=8 example.py
 ```

-and produces an output similar to the one shown below:
+---
+
+## Example Output
+
+Expected output:

 ```sh
 *****************************************
@@ -183,30 +146,16 @@ Setting OMP_NUM_THREADS environment variable for each process to be 1 in default
 [238632] world_size = 8, rank = 5, backend=nccl
 [238634] world_size = 8, rank = 7, backend=nccl
 [238627] world_size = 8, rank = 0, backend=nccl
-[238633] rank = 6, world_size = 8, n = 1, device_ids = [6]
-[238628] rank = 1, world_size = 8, n = 1, device_ids = [1]
-[238632] rank = 5, world_size = 8, n = 1, device_ids = [5]
-[238634] rank = 7, world_size = 8, n = 1, device_ids = [7]
-[238629] rank = 2, world_size = 8, n = 1, device_ids = [2]
-[238630] rank = 3, world_size = 8, n = 1, device_ids = [3]
-[238631] rank = 4, world_size = 8, n = 1, device_ids = [4]
-[238627] rank = 0, world_size = 8, n = 1, device_ids = [0]
-```
-
-Similarly, it can be launched with a single process that spans all 8 GPUs using:
-
-```sh
-python /path/to/launch.py --nnode=1 --node_rank=0 --nproc_per_node=1 example.py --local_world_size=1
-```
-
-that in turn produces the following output
-
-```sh
-[262816] Initializing process group with: {'MASTER_ADDR': '127.0.0.1', 'MASTER_PORT': '29500', 'RANK': '0', 'WORLD_SIZE': '1'}
-[262816]: world_size = 1, rank = 0, backend=nccl
-[262816] rank = 0, world_size = 1, n = 8, device_ids = [0, 1, 2, 3, 4, 5, 6, 7]
+[238633] rank = 6, world_size = 8
+[238628] rank = 1, world_size = 8
+[238632] rank = 5, world_size = 8
+[238634] rank = 7, world_size = 8
+[238629] rank = 2, world_size = 8
+[238630] rank = 3, world_size = 8
+[238631] rank = 4, world_size = 8
+[238627] rank = 0, world_size = 8
 ```

 # Conclusions

-As the author of a distributed data parallel application, your code needs to be aware of two types of resources: compute nodes and the GPUs within each node. The process of setting up bookkeeping to track how the set of GPUs is mapped to the processes of your application can be tedious and error-prone. We hope that by structuring your application as shown in this example and using the launcher, the mechanics of setting up distributed training can be significantly simplified.
+As the author of a distributed data parallel application, your code needs to be aware of two types of resources: compute nodes and the GPUs within each node. The process of setting up bookkeeping to track how the set of GPUs is mapped to the processes of your application can be tedious and error-prone. We hope that by structuring your application as shown in this example and using `torchrun`, the mechanics of setting up distributed training can be significantly simplified.
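
Note: the updated README documents only the single-node launch above. A multi-node launch with `torchrun` would typically add its rendezvous flags; the following is a hedged sketch, not part of this commit, and `node0.example.com:29500` is a placeholder endpoint:

```sh
# Hypothetical two-node launch: run the same command on every node,
# pointing --rdzv_endpoint at one host that all nodes can reach.
torchrun \
  --nnodes=2 \
  --nproc_per_node=8 \
  --rdzv_backend=c10d \
  --rdzv_endpoint=node0.example.com:29500 \
  example.py
```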

distributed/ddp/example.py

Lines changed: 15 additions & 24 deletions
@@ -22,38 +22,37 @@ def forward(self, x):
         return self.net2(self.relu(self.net1(x)))


-def demo_basic(local_world_size, local_rank):
-
-    # setup devices for this process. For local_world_size = 2, num_gpus = 8,
-    # rank 0 uses GPUs [0, 1, 2, 3] and
-    # rank 1 uses GPUs [4, 5, 6, 7].
-    n = torch.cuda.device_count() // local_world_size
-    device_ids = list(range(local_rank * n, (local_rank + 1) * n))
+def demo_basic(rank):

     print(
         f"[{os.getpid()}] rank = {dist.get_rank()}, "
-        + f"world_size = {dist.get_world_size()}, n = {n}, device_ids = {device_ids} \n", end=''
-    )
+        + f"world_size = {dist.get_world_size()}"
+    )

-    model = ToyModel().cuda(device_ids[0])
-    ddp_model = DDP(model, device_ids)
+    model = ToyModel().to(rank)
+    ddp_model = DDP(model, device_ids=[rank])

     loss_fn = nn.MSELoss()
     optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

     optimizer.zero_grad()
     outputs = ddp_model(torch.randn(20, 10))
-    labels = torch.randn(20, 5).to(device_ids[0])
+    labels = torch.randn(20, 5).to(rank)
     loss_fn(outputs, labels).backward()
     optimizer.step()

+    print(f"training completed in rank {rank}!")

-def spmd_main(local_world_size, local_rank):
+
+def main():
     # These are the parameters used to initialize the process group
     env_dict = {
         key: os.environ[key]
-        for key in ("MASTER_ADDR", "MASTER_PORT", "RANK", "WORLD_SIZE")
+        for key in ("MASTER_ADDR", "MASTER_PORT", "RANK", "LOCAL_RANK", "WORLD_SIZE", "LOCAL_WORLD_SIZE")
     }
+    rank = int(env_dict['RANK'])
+    local_rank = int(env_dict['LOCAL_RANK'])
+    local_world_size = int(env_dict['LOCAL_WORLD_SIZE'])

     if sys.platform == "win32":
         # Distributed package only covers collective communications with Gloo
@@ -80,18 +79,10 @@ def spmd_main(local_world_size, local_rank):
         + f"rank = {dist.get_rank()}, backend={dist.get_backend()} \n", end=''
     )

-    demo_basic(local_world_size, local_rank)
+    demo_basic(rank)

     # Tear down the process group
     dist.destroy_process_group()

-
 if __name__ == "__main__":
-    parser = argparse.ArgumentParser()
-    # This is passed in via launch.py
-    parser.add_argument("--local_rank", type=int, default=0)
-    # This needs to be explicitly passed in
-    parser.add_argument("--local_world_size", type=int, default=1)
-    args = parser.parse_args()
-    # The main entry point is called directly without using subprocess
-    spmd_main(args.local_world_size, args.local_rank)
+    main()
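
The new `example.py` keeps the README's single-node assumption and uses the global `RANK` as the GPU index. On multi-node runs, a worker would normally select its GPU from `LOCAL_RANK` instead; a minimal sketch, not part of this commit (the `setup_device` helper is hypothetical):

```python
import os

import torch
import torch.distributed as dist


def setup_device():
    # torchrun sets LOCAL_RANK per worker; it stays within the node's GPU count.
    local_rank = int(os.environ["LOCAL_RANK"])
    torch.cuda.set_device(local_rank)          # pin this process to one local GPU
    dist.init_process_group(backend="nccl")    # rendezvous info comes from the env
    return torch.device("cuda", local_rank)
```

A model would then be wrapped as `DDP(model.to(device), device_ids=[local_rank])`.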
