# Design Doc: Distributed Training

## Objective

In [these slides](https://www.slideshare.net/cxwangyi/paddlepaddle-a-complete-solution-for-businesses), we explained that we would like PaddlePaddle to run on general-purpose clusters, such as those managed by Kubernetes, so as to address the demand for AI from both Internet and non-Internet industries.

This poses technical challenges to PaddlePaddle:

1. Support fault-recovery.
1. Support both offline and online training.
1. [Serverless computing](https://en.wikipedia.org/wiki/Serverless_computing) of distributed training.


## Training Job

A training job is created once a user asks Paddle cloud to train a model. The training job is made up of different processes that collaboratively consume data and produce a trained model. There are three kinds of processes:

1. the *master process*, which dispatches tasks to
1. one or more *trainer processes*, which run distributed training and synchronize gradients/models via
1. one or more *parameter server processes*, where each holds a shard of the global model.

Their relationship is illustrated in the following graph:

<img src="src/paddle-model-sharding.png"/>

### Master Process

The master process will:

- Partition a dataset into [tasks](#task) and dispatch tasks to trainers.
- Keep track of training progress on the dataset with a [task queue](#task-queue). A training job iterates over the dataset pass by pass: it finishes a full pass before moving on to the next pass.


#### Task

A task is a data shard to be trained. The total number of tasks will be much larger than the total number of trainers. The number of data instances inside a task will be much larger than the mini-batch size.
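
To make the task abstraction concrete, here is a minimal Go sketch of how the master might represent a task. The field names are illustrative assumptions, not the actual PaddlePaddle implementation.

```go
package master

// Task is one shard of the training data plus the bookkeeping the
// master needs to dispatch and track it. Field names are illustrative.
type Task struct {
	ID        int      // unique within the current job
	DataFiles []string // the data shard: file paths or chunk offsets
	Epoch     int      // which pass over the dataset this task belongs to
	Timeouts  int      // how many times this task has timed out so far
}
```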

#### Task Queue

The master process has three task queues to track training progress. As illustrated in the graph below, Job A and Job B both have one master process. Each master process has three task queues.

<img src="src/paddle-task-queues.png"/>

- The todo queue holds tasks to be dispatched. When a job starts, the master process fills the todo queue with all tasks.
- The pending queue holds tasks that are currently being trained by trainers.
- The done queue holds tasks that have already been trained.

The life cycle of a single task is illustrated below:

<img src="src/paddle-task-states.png"/>

1. When a new pass of training starts, all tasks will be placed in the todo queue.
1. The master process dispatches a few tasks to each trainer at a time, puts them in the pending queue, and waits for completion.
1. The trainer works on its tasks and tells the master process once a task is completed. The master process then dispatches a new task to that trainer.
1. If a task times out, the master process moves it back to the todo queue and increases its timeout count by one. If the timeout count exceeds a threshold, the task is likely to cause a trainer to crash, so it will be discarded.
1. The master process moves each completed task to the done queue. When the todo queue is empty, the master process starts a new pass by moving all tasks from the done queue back to the todo queue and resetting the timeout counters of all tasks to zero. A sketch of this queue logic is given after this list.
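
As referenced above, here is a minimal Go sketch of the three queues and the task life cycle, reusing the hypothetical `Task` struct from the earlier sketch. The timeout threshold and the exact data structures are assumptions, not the actual master implementation.

```go
package master

import "time"

const timeoutThreshold = 3 // assumed limit before a task is discarded

// taskEntry tracks one dispatched task and its deadline.
type taskEntry struct {
	Task     Task
	Deadline time.Time
}

// taskQueues mirrors the todo, pending, and done queues described above.
type taskQueues struct {
	Todo    []Task
	Pending map[int]taskEntry // keyed by task ID
	Done    []Task
}

// dispatch pops a task from todo and marks it pending with a deadline.
func (q *taskQueues) dispatch(timeout time.Duration) (Task, bool) {
	if len(q.Todo) == 0 {
		return Task{}, false
	}
	t := q.Todo[0]
	q.Todo = q.Todo[1:]
	q.Pending[t.ID] = taskEntry{Task: t, Deadline: time.Now().Add(timeout)}
	return t, true
}

// markDone moves a finished task from pending to done; when both todo
// and pending are empty, it starts the next pass.
func (q *taskQueues) markDone(id int) {
	e, ok := q.Pending[id]
	if !ok {
		return
	}
	delete(q.Pending, id)
	q.Done = append(q.Done, e.Task)
	if len(q.Todo) == 0 && len(q.Pending) == 0 {
		for i := range q.Done {
			q.Done[i].Timeouts = 0 // reset timeout counters for the new pass
		}
		q.Todo, q.Done = q.Done, nil
	}
}

// requeueTimedOut moves expired tasks back to todo, discarding any task
// that has timed out more than timeoutThreshold times.
func (q *taskQueues) requeueTimedOut(now time.Time) {
	for id, e := range q.Pending {
		if now.Before(e.Deadline) {
			continue
		}
		delete(q.Pending, id)
		e.Task.Timeouts++
		if e.Task.Timeouts <= timeoutThreshold {
			q.Todo = append(q.Todo, e.Task)
		} // otherwise discard: the task likely crashes trainers
	}
}
```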

### Trainer Process

The trainer process will:

- Receive tasks from the master.
- Work on the tasks: calculate and upload gradients to the parameter servers, and update the local model by downloading new parameters from the parameter servers.

### Parameter Server Process

Parameter server processes hold the parameters collaboratively. The parameters are partitioned across the parameter servers.

The parameter server will:

- Receive gradients from the trainers, update its parameters, and give the trainers the latest parameters.
- Periodically save its parameters to a distributed file system, overwriting the previous save (see the sketch after this list).
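
As referenced above, a minimal Go sketch of the periodic save, assuming one checkpoint file per parameter-server index on a mounted distributed file system. Writing to a temporary file and renaming it keeps the previous save intact if the process dies mid-write; the file layout and serialization are assumptions.

```go
package pserver

import (
	"fmt"
	"os"
	"path/filepath"
	"time"
)

// saveLoop periodically writes the serialized parameter shard and
// replaces the previous checkpoint. snapshot is assumed to return the
// shard's parameters in some serialized form.
func saveLoop(dir string, index int, interval time.Duration, snapshot func() []byte) {
	final := filepath.Join(dir, fmt.Sprintf("ps-%d.ckpt", index))
	for range time.Tick(interval) {
		tmp := final + ".tmp"
		if err := os.WriteFile(tmp, snapshot(), 0644); err != nil {
			continue // keep the previous checkpoint; retry on the next tick
		}
		_ = os.Rename(tmp, final) // overwrite the previous save in one step
	}
}
```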

### Optimization Algorithms

The communication pattern between the trainers and the parameter servers depends on the category of optimization algorithm:

- Synchronous Stochastic Gradient Descent (sync-SGD)

  The parameter server waits for all trainers to finish the n-th mini-batch calculation and send their gradients before broadcasting the new parameters to every trainer. Every trainer waits for the new parameters before starting the (n+1)-th mini-batch.

- Asynchronous Stochastic Gradient Descent (async-SGD)

  There is no synchronization between different trainers, and the parameter server updates its parameters as soon as it receives a new gradient:

  - Each trainer uploads its accumulated gradient every n mini-batches.
  - Every m mini-batches, the trainer downloads new parameters from the parameter server.
  - n and m do not have to be equal (see the sketch after this list).
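
As referenced above, a minimal Go sketch of the async-SGD cadence on the trainer side. `psClient`, `SendGrads`, `GetParams`, and `computeGradients` are hypothetical stand-ins for the real RPC and training code, not part of this design.

```go
package trainer

// psClient is a hypothetical client interface to the parameter servers.
type psClient interface {
	SendGrads(grads []float32) error
	GetParams() ([]float32, error)
}

// asyncSGDLoop accumulates gradients locally, uploads them every n
// mini-batches, and downloads fresh parameters every m mini-batches
// (n, m > 0; they need not be equal).
func asyncSGDLoop(ps psClient, params []float32, n, m int, nextBatch func() [][]float32) error {
	accum := make([]float32, len(params))
	for step := 1; ; step++ {
		batch := nextBatch()
		if batch == nil {
			return nil // no more data in the current task
		}
		grads := computeGradients(params, batch)
		for i, g := range grads {
			accum[i] += g
		}
		if step%n == 0 {
			if err := ps.SendGrads(accum); err != nil {
				return err
			}
			accum = make([]float32, len(params))
		}
		if step%m == 0 {
			fresh, err := ps.GetParams()
			if err != nil {
				return err
			}
			copy(params, fresh)
		}
	}
}

// computeGradients stands in for the actual forward/backward pass.
func computeGradients(params []float32, batch [][]float32) []float32 {
	return make([]float32, len(params))
}
```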

## Fault Tolerance

The training job will pause if the master process is dead, or if any of the parameter server processes is dead. They will be restarted by [Kubernetes](https://kubernetes.io/) and recover in a few minutes. Please refer to [fault recovery](#fault-recovery).

The training job will continue to make progress as long as there is at least one trainer process running. The strategy depends on the type of optimization algorithm:

- sync-SGD

  TODO

- async-SGD

  Since async-SGD does not require synchronization between mini-batches, the system will by definition make progress as long as at least one trainer is running.

## Fault Recovery

PaddlePaddle uses [etcd](https://github.com/coreos/etcd) to keep track of the states of processes. Because etcd is a distributed, reliable key-value store, a restarted process can recover its state from etcd. The model parameters are periodically saved into a distributed file system, so a restarted parameter server can recover its parameters from the saved files.

Now we will introduce how each process recovers from a failure. The graph below shows how etcd is used:

<img src="src/paddle-etcd.png"/>

### Master Process

When the master is started by Kubernetes, it executes the following steps at startup:

1. Grabs a unique *master* lock in etcd, which prevents concurrent master instantiations.
1. Recovers the task queues from etcd if they already exist; otherwise, the master creates them.
1. Watches the trainer prefix keys `/trainer/` on etcd to find the live trainers.
1. Starts dispatching the tasks to the trainers, and updates the task queues using an etcd transaction to ensure the lock is held during the update.

The master process will kill itself if its etcd lease expires.

When the master process is dead for any reason, Kubernetes will restart it. It will be online again with all states recovered from etcd in a few minutes. A sketch of the startup and lock handling is given below.
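
The following is a minimal Go sketch of the master's startup against etcd, using the etcd v3 Go client and its `concurrency` helpers, and reusing the `taskQueues` type from the earlier sketch. The key names `/master_lock` and `/master/task_queues`, the JSON encoding, and the overall structure are assumptions for illustration, not the actual implementation.

```go
package master

import (
	"context"
	"encoding/json"
	"errors"
	"time"

	"github.com/coreos/etcd/clientv3"
	"github.com/coreos/etcd/clientv3/concurrency"
)

// startMaster grabs the master lock, recovers the task queues from etcd
// if they exist, and exits when its lease expires.
func startMaster(endpoints []string) error {
	cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints, DialTimeout: 5 * time.Second})
	if err != nil {
		return err
	}
	defer cli.Close()

	// The session's lease keeps the lock alive; if the lease expires,
	// this master must exit so another instance can take over.
	sess, err := concurrency.NewSession(cli)
	if err != nil {
		return err
	}
	mu := concurrency.NewMutex(sess, "/master_lock")
	if err := mu.Lock(context.Background()); err != nil {
		return err
	}

	// Recover the task queues from etcd, or start with fresh ones.
	queues := taskQueues{Pending: map[int]taskEntry{}}
	resp, err := cli.Get(context.Background(), "/master/task_queues")
	if err != nil {
		return err
	}
	if len(resp.Kvs) > 0 {
		if err := json.Unmarshal(resp.Kvs[0].Value, &queues); err != nil {
			return err
		}
	}

	// ... watch /trainer/ for live trainers and dispatch tasks; every
	// queue update goes through an etcd transaction while the lock is held ...

	<-sess.Done() // the lease expired and the lock is lost
	return errors.New("etcd lease expired; master exiting")
}
```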

### Trainer Process

When the trainer is started by Kubernetes, it executes the following steps at startup:

1. Watches the available parameter server prefix keys `/ps/` on etcd and waits until the count of parameter servers reaches the desired count.
1. Generates a unique ID, and sets the key `/trainer/<unique ID>` with its contact address as the value. The key will be deleted when the lease expires, so the master will be aware of the trainer going online and offline.
1. Waits for tasks from the master to start training.

If the trainer's etcd lease expires, it will try to set the key `/trainer/<unique ID>` again so that the master process can rediscover it. A sketch of this registration is given below.
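
A minimal Go sketch of the trainer's registration, using the same etcd v3 Go client. The lease TTL, the polling interval, and the ID generation are illustrative assumptions.

```go
package trainer

import (
	"context"
	"crypto/rand"
	"encoding/hex"
	"time"

	"github.com/coreos/etcd/clientv3"
)

// register waits for enough parameter servers, then publishes
// /trainer/<unique ID> under a lease so the key disappears if the
// trainer dies. It returns the trainer's unique ID.
func register(cli *clientv3.Client, addr string, psDesired int) (string, error) {
	ctx := context.Background()

	// Wait until the desired number of parameter servers is up.
	for {
		resp, err := cli.Get(ctx, "/ps/", clientv3.WithPrefix(), clientv3.WithCountOnly())
		if err != nil {
			return "", err
		}
		if int(resp.Count) >= psDesired {
			break
		}
		time.Sleep(time.Second)
	}

	// Generate a unique trainer ID.
	buf := make([]byte, 8)
	if _, err := rand.Read(buf); err != nil {
		return "", err
	}
	id := hex.EncodeToString(buf)

	// Publish the contact address under a lease and keep it alive.
	// If the lease ever expires, the trainer should re-run this step.
	lease, err := cli.Grant(ctx, 10) // 10-second TTL, an assumed value
	if err != nil {
		return "", err
	}
	if _, err := cli.Put(ctx, "/trainer/"+id, addr, clientv3.WithLease(lease.ID)); err != nil {
		return "", err
	}
	if _, err := cli.KeepAlive(ctx, lease.ID); err != nil {
		return "", err
	}
	return id, nil // now wait for tasks from the master
}
```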

### Parameter Server Process

When the parameter server is started by Kubernetes, it executes the following steps at startup:

1. Read the desired total number of parameter servers from the etcd key `/ps_desired`.
1. Search through the etcd keys `/ps/<index>` (`/ps/0`, `/ps/1`, ...) to find the first non-existent key whose index is smaller than the total number of parameter servers. Set the key using a transaction to avoid concurrent writes. The parameter server's index is inferred from the key name (a sketch of this step is given at the end of this section).

   The desired number of parameter servers is 3:

   <img src="src/paddle-ps-0.png"/>

   The third parameter server joined:

   <img src="src/paddle-ps-1.png"/>

1. The parameter server can load parameters if there are already saved parameters in the save path (inferred from its index).
1. Now the parameter server is ready for the trainers' requests.

If the parameter server's etcd lease expires, the parameter server will kill itself.
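
As referenced above, a minimal Go sketch of reading `/ps_desired` and claiming the first free `/ps/<index>` slot with an etcd transaction, so two servers can never take the same index. The lease TTL and error handling are assumptions.

```go
package pserver

import (
	"context"
	"fmt"
	"strconv"

	"github.com/coreos/etcd/clientv3"
)

// claimIndex returns this parameter server's index after registering
// its address under /ps/<index> with a lease.
func claimIndex(cli *clientv3.Client, addr string) (int, error) {
	ctx := context.Background()

	// Step 1: read the desired number of parameter servers.
	resp, err := cli.Get(ctx, "/ps_desired")
	if err != nil {
		return 0, err
	}
	if len(resp.Kvs) == 0 {
		return 0, fmt.Errorf("/ps_desired not set")
	}
	desired, err := strconv.Atoi(string(resp.Kvs[0].Value))
	if err != nil {
		return 0, err
	}

	// Step 2: claim the first non-existent /ps/<index> transactionally.
	lease, err := cli.Grant(ctx, 10) // assumed 10-second TTL
	if err != nil {
		return 0, err
	}
	for i := 0; i < desired; i++ {
		key := "/ps/" + strconv.Itoa(i)
		txnResp, err := cli.Txn(ctx).
			If(clientv3.Compare(clientv3.CreateRevision(key), "=", 0)). // key does not exist yet
			Then(clientv3.OpPut(key, addr, clientv3.WithLease(lease.ID))).
			Commit()
		if err != nil {
			return 0, err
		}
		if txnResp.Succeeded {
			if _, err := cli.KeepAlive(ctx, lease.ID); err != nil {
				return 0, err
			}
			return i, nil // this server's index; load any saved parameters next
		}
	}
	return 0, fmt.Errorf("all %d parameter server slots are taken", desired)
}
```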


## Dynamic Scaling

### Trainer Scaling

TODO

### Parameter Server Scaling

Not planned for v1.

## Training Dataset Format

TODO

## User Interface

TODO