Skip to content

Commit f84af2f

Browse files
Update cluster_quick_start.rst (PaddlePaddle#1060)
* Update cluster_quick_start.rst
1 parent 9f18368 commit f84af2f

File tree

1 file changed

+132
-192
lines changed

1 file changed

+132
-192
lines changed

doc/fluid/user_guides/howto/training/cluster_quick_start.rst

Lines changed: 132 additions & 192 deletions
Original file line numberDiff line numberDiff line change
@@ -3,195 +3,135 @@
33
分布式训练快速开始
44
==================
55

6-
准备工作
7-
--------
8-
9-
在本篇文章中,我们将会在介绍如何快速在一个集群中启动一个 PaddlePaddle
10-
的分布式训练任务,在开始之前,请按如下步骤做些准备工作:
11-
12-
1. 准备一个网络连通的训练集群,在本文中我们使用4个训练节点使用 ``*.paddlepaddle.com``
13-
来表示节点的主机名称,您可以根据实际情况修改它。
14-
15-
2. 在开始之前确保已经阅读过 :ref:`install_steps`
16-
并且可以在集群的所有节点上可以正常运行 PaddlePaddle。
17-
18-
样例代码
19-
-------
20-
21-
下面使用一个非常简单的线性回归模型作为样例来解释如何启动一个包含2个 ``PSERVER`` 节点以及
22-
2个 ``TRAINER`` 节点的分布式训练任务,您可以将本段代码保存为 ``dist_train.py`` 运行。
23-
24-
.. code:: python
25-
26-
import os
27-
import paddle
28-
import paddle.fluid as fluid
29-
30-
# train reader
31-
BATCH_SIZE = 20
32-
EPOCH_NUM = 30
33-
BATCH_SIZE = 8
34-
35-
train_reader = paddle.batch(
36-
paddle.reader.shuffle(
37-
paddle.dataset.uci_housing.train(), buf_size=500),
38-
batch_size=BATCH_SIZE)
39-
40-
def train():
41-
y = fluid.layers.data(name='y', shape=[1], dtype='float32')
42-
x = fluid.layers.data(name='x', shape=[13], dtype='float32')
43-
y_predict = fluid.layers.fc(input=x, size=1, act=None)
44-
45-
loss = fluid.layers.square_error_cost(input=y_predict, label=y)
46-
avg_loss = fluid.layers.mean(loss)
47-
opt = fluid.optimizer.SGD(learning_rate=0.001)
48-
opt.minimize(avg_loss)
49-
50-
place = fluid.CPUPlace()
51-
feeder = fluid.DataFeeder(place=place, feed_list=[x, y])
52-
exe = fluid.Executor(place)
53-
54-
# fetch distributed training environment setting
55-
training_role = os.getenv("PADDLE_TRAINING_ROLE", None)
56-
port = os.getenv("PADDLE_PSERVER_PORT", "6174")
57-
pserver_ips = os.getenv("PADDLE_PSERVER_IPS", "")
58-
trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))
59-
eplist = []
60-
for ip in pserver_ips.split(","):
61-
eplist.append(':'.join([ip, port]))
62-
pserver_endpoints = ",".join(eplist)
63-
trainers = int(os.getenv("PADDLE_TRAINERS"))
64-
current_endpoint = os.getenv("PADDLE_CURRENT_IP", "") + ":" + port
65-
66-
t = fluid.DistributeTranspiler()
67-
t.transpile(
68-
trainer_id = trainer_id,
69-
pservers = pserver_endpoints,
70-
trainers = trainers)
71-
72-
if training_role == "PSERVER":
73-
pserver_prog = t.get_pserver_program(current_endpoint)
74-
startup_prog = t.get_startup_program(current_endpoint, pserver_prog)
75-
exe.run(startup_prog)
76-
exe.run(pserver_prog)
77-
elif training_role == "TRAINER":
78-
trainer_prog = t.get_trainer_program()
79-
exe.run(fluid.default_startup_program())
80-
81-
for epoch in range(EPOCH_NUM):
82-
for batch_id, batch_data in enumerate(train_reader()):
83-
avg_loss_value, = exe.run(trainer_prog,
84-
feed=feeder.feed(batch_data),
85-
fetch_list=[avg_loss])
86-
if (batch_id + 1) % 10 == 0:
87-
print("Epoch: {0}, Batch: {1}, loss: {2}".format(
88-
epoch, batch_id, avg_loss_value[0]))
89-
# destory the resource of current trainer node in pserver server node
90-
exe.close()
91-
else:
92-
raise AssertionError("PADDLE_TRAINING_ROLE should be one of [TRAINER, PSERVER]")
93-
94-
train()
95-
96-
环境变量说明
97-
-----------
98-
99-
在启动分布式训练任务时,使用不同的环境变量来表示不同的节点角色,具体如下:
100-
101-
.. list-table::
102-
:header-rows: 1
103-
104-
* - 环境变量
105-
- 数据类型
106-
- 样例
107-
- 描述
108-
* - :code:`PADDLE_TRAINING_ROLE`
109-
- str
110-
- :code:`PSERVER,TRAINER`
111-
- 当前训练节点角色
112-
* - :code:`PADDLE_PSERVER_IPS`
113-
- str
114-
- :code:`ps0.paddlepaddle.com,ps1.paddlepaddle.com`
115-
- 分布式训练任务中所有 PSERVER 节点的 IP 地址或 hostname, 使用","分隔
116-
* - :code:`PADDLE_PSERVER_PORT`
117-
- int
118-
- 6174
119-
- PSERVER 进程监听的端口
120-
* - :code:`PADDLE_TRAINERS`
121-
- int
122-
- 2
123-
- 分布式训练任务中 trainer 节点的数量
124-
* - :code:`PADDLE_CURRENT_IP`
125-
- str
126-
- :code:`ps0.paddlepaddle.com`
127-
- 当前 PSERVER 节点的 IP 地址或 hostname
128-
* - :code:`PADDLE_TRAINER_ID`
129-
- str
130-
- 0
131-
- 当前 TRAINER 节点的 ID (唯一), 取值范围为 [0, PADDLE_TRAINERS)
132-
133-
注: 环境变量只是获取运行时信息的一种方式,实际任务中可以采用命令行参数等方式获取运行时信息。
134-
135-
分布式训练相关 API
136-
------------------
137-
138-
DistributeTranspiler
139-
~~~~~~~~~~~~~~~~~~~~~~
140-
141-
基于 pserver-trainer 架构的的分布式训练任务分为两种角色: Parameter Server(PSERVER) 以及 TRAINER,
142-
在 Fluid 中,用户只需配置单机训练所需要的网络配置, ``DistributeTranspiler`` 模块会自动地根据
143-
当前训练节点的角色将用户配置的单机网路配置改写成 PSERVER 和 TRAINER 需要运行的网络配置:
144-
145-
.. code:: python
146-
147-
t = fluid.DistributeTranspiler()
148-
t.transpile(
149-
trainer_id = trainer_id,
150-
pservers = pserver_endpoints,
151-
trainers = trainers)
152-
if PADDLE_TRAINING_ROLE == "TRAINER":
153-
# fetch the trainer program and execute it
154-
trainer_prog = t.get_trainer_program()
155-
...
156-
157-
elif PADDLE_TRAINER_ROLE == "PSERVER":
158-
# fetch the pserver program and execute it
159-
pserver_prog = t.get_pserver_program(current_endpoint)
160-
...
161-
162-
exe.close()
163-
~~~~~~~~~~~~~~
164-
165-
PSERVER 节点中会保存所有 TRAINER 节点的状态信息,在 TRAINER 结束训练时需要调用 ``exe.close()``
166-
通知所有 PSERVER 节点释放当前 TRAINER 节点的资源:
167-
168-
.. code:: python
169-
170-
exe = fluid.Executor(fluid.CPUPlace())
171-
# training process ...
172-
exe.close() # notify PServer to destory the resource
173-
174-
注意:所有的trainer在退出时都需要调用exe.close()。
175-
176-
177-
启动分布式训练任务
178-
--------------------
179-
180-
.. list-table::
181-
:header-rows: 1
182-
183-
* - 启动节点
184-
- 启动命令
185-
- 说明
186-
* - ps0.paddlepaddle.com
187-
- :code:`PADDLE_TRAINING_ROLE=PSERVER PADDLE_CURRENT_IP=ps0.paddlepaddle.com PADDLE_PSERVER_IPS=ps0.paddlepaddle.com,ps1.paddlepaddle.com PADDLE_TRAINERS=2 PADDLE_PSERVER_PORT=6174 python fluid_dist.py`
188-
- 启动 PSERVER 节点
189-
* - ps1.paddlepaddle.com
190-
- :code:`PADDLE_TRAINING_ROLE=PSERVER PADDLE_CURRENT_IP=ps1.paddlepaddle.com PADDLE_PSERVER_IPS=ps0.paddlepaddle.com,ps1.paddlepaddle.com PADDLE_TRAINERS=2 PADDLE_PSERVER_PORT=6174 python fluid_dist.py`
191-
- 启动 PSERVER 节点
192-
* - trainer0.paddlepaddle.com
193-
- :code:`PADDLE_TRAINING_ROLE=TRAINER PADDLE_PSERVER_IPS=ps0.paddlepaddle.com,ps1.paddlepaddle.com PADDLE_TRAINERS=2 PADDLE_TRAINER_ID=0 PADDLE_PSERVER_PORT=6174 python fluid_dist.py`
194-
- 启动第0号 TRAINER 节点
195-
* - trainer1.paddlepaddle.com
196-
- :code:`PADDLE_TRAINING_ROLE=TRAINER PADDLE_PSERVER_IPS=ps0.paddlepaddle.com,ps1.paddlepaddle.com PADDLE_TRAINERS=2 PADDLE_TRAINER_ID=1 PADDLE_PSERVER_PORT=6174 python fluid_dist.py`
197-
- 启动第1号 TRAINER 节点
6+
使用Fleet API进行分布式训练
7+
---------------------------
8+
9+
从Paddle Fluid `Release 1.5.1 <https://github.com/PaddlePaddle/Paddle/releases/tag/v1.5.1>`_ 开始,官方推荐使用Fleet API进行分布式训练,关于Fleet API的介绍可以参考 `Fleet Design Doc <https://github.com/PaddlePaddle/Fleet>`_
10+
11+
12+
准备条件
13+
^^^^^^^^
14+
15+
16+
*
17+
[x] 成功安装Paddle Fluid,如果尚未安装,请参考 `快速开始 <https://www.paddlepaddle.org.cn/documentation/docs/zh/1.5/beginners_guide/quick_start_cn.html>`_
18+
19+
*
20+
[x] 学会最基本的单机训练方法,请参考 `单机训练 <https://www.paddlepaddle.org.cn/documentation/docs/zh/1.5/user_guides/howto/training/single_node.html>`_ 中描述的单卡训练,进行学习
21+
22+
点击率预估任务
23+
^^^^^^^^^^^^^^
24+
25+
本文使用一个简单的示例,点击率预估任务,来说明如何使用Fleet API进行分布式训练的配置方法,并利用单机环境模拟分布式环境给出运行示例。示例的源码来自 `CTR with Fleet <https://github.com/PaddlePaddle/Fleet/tree/develop/examples/ctr>`_
26+
27+
28+
为了方便学习,这里给出的示例是单机与多机混合的代码,用户可以通过不同的启动命令进行单机或多机任务的启动。获取数据的部分,以及对数据预处理的逻辑可以参考 `CTR with Fleet <https://github.com/PaddlePaddle/Fleet/tree/develop/examples/ctr>`_ 的源码和说明,这里不做过多描述。
29+
30+
.. code-block:: python
31+
32+
from __future__ import print_function
33+
from args import parse_args
34+
import os
35+
import paddle.fluid as fluid
36+
import sys
37+
from network_conf import ctr_dnn_model_dataset
38+
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
39+
40+
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
41+
from paddle.fluid.transpiler.distribute_transpiler import DistributeTranspilerConfig
42+
43+
dense_feature_dim = 13
44+
sparse_feature_dim = 10000001
45+
batch_size = 100
46+
thread_num = 10
47+
embedding_size = 10
48+
args = parse_args()
49+
50+
def main_function(is_local):
51+
# common code for local training and distributed training
52+
dense_input = fluid.layers.data(
53+
name="dense_input", shape=[dense_feature_dim], dtype='float32')
54+
55+
sparse_input_ids = [
56+
fluid.layers.data(name="C" + str(i), shape=[1], lod_level=1,
57+
dtype="int64") for i in range(1, 27)]
58+
59+
label = fluid.layers.data(name="label", shape=[1], dtype="int64")
60+
dataset = fluid.DatasetFactory().create_dataset()
61+
dataset.set_use_var([dense_input] + sparse_input_ids + [label])
62+
pipe_command = "python criteo_reader.py %d" % sparse_feature_dim
63+
dataset.set_pipe_command(pipe_command)
64+
dataset.set_batch_size(batch_size)
65+
dataset.set_thread(thread_num)
66+
67+
whole_filelist = ["raw_data/part-%d" % x
68+
for x in range(len(os.listdir("raw_data")))]
69+
70+
dataset.set_filelist(whole_filelist)
71+
loss, auc_var, batch_auc_var = ctr_dnn_model_dataset(
72+
dense_input, sparse_input_ids, label, embedding_size,
73+
sparse_feature_dim)
74+
75+
exe = fluid.Executor(fluid.CPUPlace())
76+
def train_loop(epoch=20):
77+
for i in range(epoch):
78+
exe.train_from_dataset(program=fluid.default_main_program(),
79+
dataset=dataset,
80+
fetch_list=[auc_var],
81+
fetch_info=["auc"],
82+
debug=False)
83+
# local training
84+
def local_train():
85+
optimizer = fluid.optimizer.SGD(learning_rate=1e-4)
86+
optimizer.minimize(loss)
87+
exe.run(fluid.default_startup_program())
88+
train_loop()
89+
90+
# distributed training
91+
def dist_train():
92+
role = role_maker.PaddleCloudRoleMaker()
93+
fleet.init(role)
94+
strategy = DistributeTranspilerConfig()
95+
strategy.sync_mode = False
96+
optimizer = fluid.optimizer.SGD(learning_rate=1e-4)
97+
optimizer = fleet.distributed_optimizer(optimizer, strategy)
98+
optimizer.minimize(loss)
99+
100+
if fleet.is_server():
101+
fleet.init_server()
102+
fleet.run_server()
103+
elif fleet.is_worker():
104+
fleet.init_worker()
105+
exe.run(fluid.default_startup_program())
106+
train_loop()
107+
if is_local:
108+
local_train()
109+
else:
110+
dist_train()
111+
112+
if __name__ == '__main__':
113+
main_function(args.is_local)
114+
115+
116+
* 说明:示例中使用的IO方法是dataset,想了解具体的文档和用法请参考 `Dataset API <hhttps://www.paddlepaddle.org.cn/documentation/docs/zh/1.5/api_cn/dataset_cn.html>`_ 。示例中使用的 ``train_from_dataset`` 接口,想了解具体的文档和使用方法请参考 `Executor API <https://www.paddlepaddle.org.cn/documentation/docs/zh/1.5/api_cn/executor_cn.html>`_ 。示例中的 ``from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet`` 表示引入参数服务器架构进行分布式训练,如果想更进一步了解Fleet API的更多选项和示例,请参考 `Fleet API <https://www.paddlepaddle.org.cn/documentation/docs/zh/1.5/user_guides/howto/training/fleet_api_howto_cn.html>`_
117+
118+
119+
单机训练启动命令
120+
~~~~~~~~~~~~~~~~
121+
122+
.. code-block:: bash
123+
124+
python train.py --is_local 1
125+
126+
单机模拟分布式训练的启动命令
127+
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
128+
129+
在单机模拟多机训练的启动命令,这里我们用到了paddle内置的一个启动器launch_ps,用户可以指定worker和server的数量进行参数服务器任务的启动
130+
131+
.. code-block:: bash
132+
133+
python -m paddle.distributed.launch_ps --worker_num 2 --server_num 2 train.py
134+
135+
任务运行的日志在工作目录的logs目录下可以查看,当您能够使用单机模拟分布式训练,可以进行真正的多机分布式训练。我们建议用户直接参考 `百度云运行分布式任务的示例 <https://www.paddlepaddle.org.cn/documentation/docs/zh/1.5/user_guides/howto/training/deploy_ctr_on_baidu_cloud_cn.html>`_
136+
137+

0 commit comments

Comments
 (0)