# Fluid Distributed Training

## Introduction

In this article, we'll explain how to configure and run distributed training jobs with PaddlePaddle Fluid on a bare-metal cluster.

## Preparations

### Get your cluster ready

Prepare the compute nodes in your cluster. The nodes can be of any specification that runs PaddlePaddle, each with a unique IP address assigned to it. Make sure the nodes can communicate with each other.

### Have PaddlePaddle installed

PaddlePaddle must be installed on all nodes. If your nodes have GPU cards, be sure to properly install the drivers and CUDA libraries.

The PaddlePaddle build and installation guide can be found [here](http://www.paddlepaddle.org/docs/develop/documentation/en/getstarted/build_and_install/index_en.html).
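
If you train on GPU nodes, the device place handed to the executor in your training script changes accordingly. A minimal sketch, assuming `fluid.CUDAPlace` is available in the Fluid version you installed (the `USE_GPU` flag is illustrative, not part of the demo script):

``` python
import paddle.v2.fluid as fluid

USE_GPU = True  # illustrative toggle: set per node depending on available hardware

# pick the device the executor will run on: GPU 0 if requested, otherwise CPU
place = fluid.CUDAPlace(0) if USE_GPU else fluid.CPUPlace()
exe = fluid.Executor(place)
```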

### Update training script

#### Non-cluster training script

Let's take the first chapter of [Deep Learning 101](http://www.paddlepaddle.org/docs/develop/book/01.fit_a_line/index.html), "fit a line", as an example.

The non-cluster version of this demo, written with the Fluid API, is as follows:

``` python
import paddle.v2 as paddle
import paddle.v2.fluid as fluid

# define the network: a single fully connected layer doing linear regression
x = fluid.layers.data(name='x', shape=[13], dtype='float32')
y_predict = fluid.layers.fc(input=x, size=1, act=None)
y = fluid.layers.data(name='y', shape=[1], dtype='float32')

cost = fluid.layers.square_error_cost(input=y_predict, label=y)
avg_cost = fluid.layers.mean(x=cost)

sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.001)
sgd_optimizer.minimize(avg_cost)

BATCH_SIZE = 20

# read the UCI housing dataset in shuffled mini-batches
train_reader = paddle.batch(
    paddle.reader.shuffle(
        paddle.dataset.uci_housing.train(), buf_size=500),
    batch_size=BATCH_SIZE)

place = fluid.CPUPlace()
feeder = fluid.DataFeeder(place=place, feed_list=[x, y])
exe = fluid.Executor(place)

exe.run(fluid.default_startup_program())

PASS_NUM = 100
for pass_id in range(PASS_NUM):
    # checkpoint the model parameters at the start of each pass
    fluid.io.save_persistables(exe, "./fit_a_line.model/")
    fluid.io.load_persistables(exe, "./fit_a_line.model/")
    for data in train_reader():
        avg_loss_value, = exe.run(fluid.default_main_program(),
                                  feed=feeder.feed(data),
                                  fetch_list=[avg_cost])

        if avg_loss_value[0] < 10.0:
            exit(0)  # if avg cost is less than 10.0, we consider the model good enough
exit(1)
```

We created a simple fully connected neural network training program and handed it to the Fluid executor to run for 100 passes.

Now let's try to convert it into a distributed version that runs in a cluster.

#### Introducing parameter server

As you can see from the non-cluster version of the training script, there is only one role in it: the trainer, which does the computing as well as holding the parameters. In cluster training, since multiple trainers are working on the same task, they need one centralized place to hold and distribute parameters. This centralized place is called the Parameter Server in PaddlePaddle.

The parameter server in Fluid not only holds parameters but is also assigned part of the program. Trainers communicate with parameter servers via send/receive OPs. For more technical detail, please refer to this [document](https://github.com/PaddlePaddle/Paddle/blob/develop/doc/design/dist_refactor/distributed_architecture.md).

Now we need to create programs for both the trainers and the parameter servers. The question is: how?

#### Slice the program

Fluid provides a tool called the "Distribute Transpiler" that automatically converts a non-cluster program into a cluster program.

The idea behind this tool is to find the optimize OPs and gradient parameters, slice the program into two pieces, and connect them with send/receive OPs.

The optimize OPs and gradient parameters can be found in the return values of the optimizer's minimize function.
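
A small sketch of inspecting those return values; we assume `params_grads` is a list of (parameter, gradient) variable pairs, which is the form the transpiler consumes, and the printed names are illustrative:

``` python
optimize_ops, params_grads = sgd_optimizer.minimize(avg_cost)

# optimize_ops are the update OPs (SGD here) the optimizer appended to the program;
# params_grads pairs each trainable parameter with its gradient variable
for param, grad in params_grads:
    print(param.name, grad.name)
```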

To put them together:

``` python
... # define the program, cost, and create the SGD optimizer

# get the optimize OPs and the (parameter, gradient) pairs
optimize_ops, params_grads = sgd_optimizer.minimize(avg_cost)

t = fluid.DistributeTranspiler()  # create transpiler instance
# slice the program into 2 pieces using optimize_ops and the gradient parameter list;
# pservers is a comma-separated list of [IP:PORT], and trainers is the trainer count
t.transpile(optimize_ops, params_grads, pservers=pserver_endpoints, trainers=2)

... # create executor

# on the pserver, run this
exe.run(fluid.default_startup_program())
# current_endpoint is the IP:PORT the current pserver runs on
exe.run(t.get_pserver_program(current_endpoint, optimize_ops))

# on the trainer, run this
... # define data reader
exe.run(fluid.default_startup_program())
for pass_id in range(100):
    for data in train_reader():
        exe.run(t.get_trainer_program())
```
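
In practice, every node runs the same script, so the script needs a way to decide which branch to take at runtime. A minimal sketch, assuming the role and endpoints arrive through the same environment variables used by the E2E demo below (`PSERVERS`, `SERVER_ENDPOINT`, `TRAINING_ROLE`); this is illustrative glue, not the demo script's exact code, and `sgd_optimizer`, `avg_cost`, and `train_reader` are defined as in the earlier listing:

``` python
import os

import paddle.v2.fluid as fluid

# assumption: endpoints and role come from environment variables,
# matching the E2E demo invocation shown below
pserver_endpoints = os.getenv("PSERVERS")        # e.g. "192.168.1.2:6174"
current_endpoint = os.getenv("SERVER_ENDPOINT")  # this pserver's own IP:PORT
training_role = os.getenv("TRAINING_ROLE", "TRAINER")

optimize_ops, params_grads = sgd_optimizer.minimize(avg_cost)

t = fluid.DistributeTranspiler()
t.transpile(optimize_ops, params_grads, pservers=pserver_endpoints, trainers=2)

exe = fluid.Executor(fluid.CPUPlace())
exe.run(fluid.default_startup_program())

if training_role == "PSERVER":
    # parameter server: serve its slice of the program until killed
    exe.run(t.get_pserver_program(current_endpoint, optimize_ops))
else:
    # trainer: run the sliced trainer program over the training data
    for pass_id in range(100):
        for data in train_reader():
            exe.run(t.get_trainer_program())
```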

### E2E demo

The complete demo can be found [here](https://github.com/PaddlePaddle/Paddle/blob/develop/python/paddle/v2/fluid/tests/book_distribute/notest_dist_fit_a_line.py). On the parameter server node, run this in the command line:

``` bash
PSERVERS=192.168.1.2:6174 SERVER_ENDPOINT=192.168.1.2:6174 TRAINING_ROLE=PSERVER python notest_dist_fit_a_line.py
```

*Please note that we assume your parameter server runs at 192.168.1.2:6174.*

Wait until you see the prompt `Server listening on 192.168.1.2:6174`.

Then on each of your 2 trainer nodes, run this:

``` bash
PSERVERS=192.168.1.2:6174 SERVER_ENDPOINT=192.168.1.2:6174 TRAINING_ROLE=TRAINER python notest_dist_fit_a_line.py
```

*You need to run this command on 2 nodes because the script sets the trainer count to 2. You can change this setting on line 50.*

Now you have 2 trainers and 1 parameter server up and running.