
Commit dd27552

Author: Helin Wang
Commit message: refine according to comments
Parent: 57a7b7a

14 files changed: +24, -40 lines

doc/design/dist/README.md

Lines changed: 24 additions & 40 deletions
@@ -1,4 +1,4 @@
-# Distributed Training Design Doc
+# Design Doc: Distributed Training
 
 ## Objective
 
@@ -22,43 +22,17 @@ A training job will be created once user asks Paddle cloud to train a model. The
 
 One training job will only have one master process, and typically multiple trainer processes and parameter server processes. Their relation is illustrated in the following graph:
 
-<img src="src/paddle-model-sharding.png" height="400"/>
+<img src="src/paddle-model-sharding.png"/>
 
 ### Master Process
 
 The master process will:
 
-- Do [parameter server selection](#parameter-server-selection).
 - Shard the dataset into [tasks](#task) and dispatch tasks to trainers.
 - Keep track of training progress on the dataset with the [task queue](#task-queue). A training job will iterate on the dataset for a full pass before going into the next pass.
 
 Now we will explain the concepts mentioned above:
 
-#### Selection Request
-
-The selection request is a request that the master sends to a parameter server candidate, making it a parameter server available to the trainers. It contains information such as the parameter server index, the optimization algorithm, the parameter save period, and the path for saving parameters.
-
-#### Parameter Server Selection
-
-The parameter server selection process selects parameter servers from the parameter server candidates. It ensures that the parameter servers the trainers see are in a consistent order, since each trainer decides parameter placement according to that order.
-
-The selection process is as follows:
-
-- The master watches the `/ps_candidate/` prefix in etcd. When a parameter server candidate joins and there are not enough parameter servers, the master will remove the candidate's entry in `/ps_candidate/` and send a [selection request](#selection-request) to the candidate. Upon receiving the request, the candidate will set key `/ps/<index>` in etcd with a lease to make itself available to the trainers. The `<index>` comes from the selection request.
-
-The graph below shows a parameter server candidate coming online and then being selected, becoming available to the trainers:
-
-<img src="src/paddle-ps-0-can.png" width="650"/>
-<img src="src/paddle-ps-0-sel.png" width="650"/>
-
-- The master watches the `/ps/` prefix in etcd. When a selected parameter server goes offline, the master will select a not-yet-selected parameter server candidate by sending it a selection request, filling the missing parameter server spot.
-
-The graph below shows one parameter server going missing: the cluster management system creates a new parameter server, the new parameter server announces itself as a candidate, and the master fills the missing spot with the new candidate.
-
-<img src="src/paddle-ps-new-can.png" width="650"/>
-<img src="src/paddle-ps-new-sel.png" width="650"/>
-
 #### Task
 
 A task is a piece of sharded data to be trained. The total number of tasks will be much bigger than the total number of trainers. The number of data instances inside a task will be much bigger than the mini-batch size.
@@ -67,15 +41,15 @@ A task is a piece of sharded data to be trained. The total number of tasks will
 
 The master process has three task queues to track training progress. As illustrated in the graph below, Job A and Job B both have one master process. Each master process has three task queues.
 
-<img src="src/paddle-task-queues.png" height="400"/>
+<img src="src/paddle-task-queues.png"/>
 
 - The todo queue holds tasks to be dispatched. When a job starts, the master process fills the todo queue with all tasks.
 - The pending queue holds tasks that are currently being trained by trainers.
 - The done queue holds tasks that have already been trained.
 
 The life cycle of a single task is illustrated below:
 
-<img src="src/paddle-task-states.png" height="400"/>
+<img src="src/paddle-task-states.png"/>
 
 1. When a new pass of training starts, all tasks will be placed in the todo queue.
 1. The master process will dispatch a few tasks to each trainer at a time, put them in the pending queue, and wait for completion.
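The task life cycle described in this hunk can be sketched as a small in-memory state machine. This is an illustrative sketch only, not code from the repository; the `TaskQueues` class and its method names are hypothetical:

```python
from collections import deque

class TaskQueues:
    """Hypothetical sketch of the master's three task queues."""

    def __init__(self, tasks):
        self.todo = deque(tasks)   # tasks waiting to be dispatched
        self.pending = {}          # task -> trainer currently training it
        self.done = []             # tasks that finished training

    def dispatch(self, trainer_id):
        """Move one task from todo to pending and hand it to a trainer."""
        if not self.todo:
            return None
        task = self.todo.popleft()
        self.pending[task] = trainer_id
        return task

    def finish(self, task):
        """A trainer reports completion: pending -> done."""
        del self.pending[task]
        self.done.append(task)

    def requeue(self, trainer_id):
        """A trainer died: move its pending tasks back to todo."""
        for task, owner in list(self.pending.items()):
            if owner == trainer_id:
                del self.pending[task]
                self.todo.append(task)
```

When the todo queue drains and all pending tasks complete, the pass is over and the done queue can be recycled into todo for the next pass.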
@@ -117,7 +91,7 @@ The communication pattern between the trainers and the parameter servers depends
 
 ## Fault Tolerance
 
-The training job will pause if the master process is dead, or if any of the parameter server processes is dead. They will be restarted by the cluster management system and recover in a few minutes. Please refer to [fault recovery](#fault-recovery).
+The training job will pause if the master process is dead, or if any of the parameter server processes is dead. They will be restarted by [Kubernetes](https://kubernetes.io/) and recover in a few minutes. Please refer to [fault recovery](#fault-recovery).
 
 The training job will continue to make progress if there is at least one training process running. The strategy depends on the type of optimization algorithm:
 

@@ -133,13 +107,13 @@ The training job will continue to make progress if there is at least one trainin
133107

134108
PaddlePaddle uses [etcd](https://github.com/coreos/etcd) to keep track of the states of processes. Because etcd is a distributed reliable key-value store, the restarted process can recover its states from etcd. The model parameter are periodically saved into distributed file system, so a restarted parameter server can recover its parameters from the saved file.
135109

136-
Now we will introduce how each process recovers from failure:
110+
Now we will introduce how each process recovers from failure, the graph below provides an illustration:
137111

138-
<img src="src/paddle-etcd.png" width="650"/>
112+
<img src="src/paddle-etcd.png"/>
139113

140114
### Master Process
141115

142-
When the master is started by the cluster management system, it executes the following steps at startup:
116+
When the master is started by the Kubernetes, it executes the following steps at startup:
143117

144118
1. Grabs a unique *master* lock in etcd, which prevents concurrent master instantiations.
145119
1. Recovers the task queues from etcd if they already exists, otherwise the master will create them.
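The unique *master* lock in step 1 is essentially a create-if-absent write. A minimal sketch, assuming an in-memory stand-in for etcd (`FakeEtcd` and the `/master_lock` key name are illustrative; real code would use an etcd transaction attached to a lease so the lock disappears when the holder dies):

```python
class FakeEtcd:
    """In-memory stand-in for etcd, just enough to show the lock pattern."""

    def __init__(self):
        self.store = {}

    def put_if_absent(self, key, value):
        """Mimics an etcd transaction: write only if the key does not exist."""
        if key in self.store:
            return False
        self.store[key] = value
        return True

def grab_master_lock(etcd, master_id):
    """Master startup step 1: only one master may hold the lock key."""
    return etcd.put_if_absent("/master_lock", master_id)
```

A second master starting concurrently fails the compare step and should exit instead of serving.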
@@ -148,11 +122,11 @@ When the master is started by the cluster management system, it executes the fol
 
 The master process will kill itself if its etcd lease expires.
 
-When the master process is dead for any reason, the cluster management system will restart it. It will be online again with all states recovered from etcd in a few minutes.
+When the master process is dead for any reason, Kubernetes will restart it. It will be online again with all states recovered from etcd in a few minutes.
 
 ### Trainer Process
 
-When the trainer is started by the cluster management system, it executes the following steps at startup:
+When the trainer is started by Kubernetes, it executes the following steps at startup:
 
 1. Watches the available parameter server prefix keys `/ps/` on etcd and waits until the count of parameter servers reaches the desired count.
 1. Generates a unique ID, and sets key `/trainer/<unique ID>` with its contact address as value. The key will be deleted when the lease expires, so the master will be aware of the trainer being online and offline.
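The trainer's wait in step 1 reduces to counting keys under the `/ps/` prefix. A sketch of just that predicate (the function name is made up; a real trainer would watch the prefix in etcd and re-evaluate it on every change event rather than polling):

```python
def enough_parameter_servers(etcd_keys, ps_desired):
    """Trainer startup step 1: are enough /ps/<index> keys registered?

    etcd_keys: the set of keys currently present (in-memory stand-in
    for an etcd prefix listing).
    """
    count = sum(1 for key in etcd_keys if key.startswith("/ps/"))
    return count >= ps_desired
```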
@@ -162,13 +136,23 @@ If trainer's etcd lease expires, it will try set key `/trainer/<unique ID>` agai
 
 ### Parameter Server Process
 
-When the parameter server is started by the cluster management system, it executes the following steps at startup:
+When the parameter server is started by Kubernetes, it executes the following steps at startup:
+
+1. Read the desired total number of parameter servers from the etcd key `/ps_desired`.
+1. Search through the etcd keys `/ps/<index>` (`/ps/0`, `/ps/1`, ...) to find the first non-existent key whose index is smaller than the total number of parameter servers. Set the key using a transaction to avoid concurrent writes. The parameter server's index is inferred from the key name.
+
+The desired number of parameter servers is 3:
+
+<img src="src/paddle-ps-0.png"/>
+
+The third parameter server joined:
+
+<img src="src/paddle-ps-1.png"/>
 
-1. Generates a unique ID, and sets key `/ps_candidate/<unique ID>` with its contact address as value, then waits for the master's [selection request](#selection-request).
-1. Upon receiving the master's [selection request](#selection-request), the parameter server can load parameters if there are already saved parametersters in the save path from the selection request. Then it creates key `/ps/<index>` with its contact address as value.
+1. The parameter server can load parameters if there are already saved parameters in the save path (inferred from its index).
 1. Now the parameter server is ready for the trainers' requests.
 
-If the parameter server's etcd lease expires, the parameter server will save its parameters to the given save path and kill itself.
+If the parameter server's etcd lease expires, the parameter server will kill itself.
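The slot-claiming logic from the new startup steps (find the first free `/ps/<index>` below the `/ps_desired` count) can be sketched as follows; the function and the in-memory key set are illustrative stand-ins, since the real implementation would claim the slot with an etcd transaction:

```python
def select_ps_index(etcd_keys, ps_desired):
    """Claim the first free /ps/<index> slot, index < ps_desired.

    etcd_keys: set of keys currently present in etcd (in-memory
    stand-in). Returns the claimed index, or None if every slot is
    taken. In real code the add below would be a "put if key absent"
    etcd transaction so two servers cannot claim the same slot.
    """
    for index in range(ps_desired):
        key = "/ps/%d" % index
        if key not in etcd_keys:
            etcd_keys.add(key)  # transactional put in the real system
            return index
    return None
```

Because the index is derived from the key name, a restarted parameter server that claims the same slot can locate its previously saved parameters.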
 
 
 ## Dynamic Scaling

doc/design/dist/src/arch.png and other binary image files changed (binary diffs not shown).
