
Commit a2c017d

1. merge simple_dist_transpiler into distribute_transpiler
2. add align_var_to_block argument to func transpile
3. remove concat and split if align_var_to_block is False
4. add unittests for simple_dist_transpiler
1 parent 580340e commit a2c017d
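The new align_var_to_block argument defaults to True, so existing callers keep the block-splitting behavior; passing False keeps each parameter and gradient as a single block and skips the concat/split ops. A minimal sketch of the opt-out call, assembled from the unit test added in this commit (network, endpoints, and trainer count copied from its setUp and net_conf):

import paddle.fluid as fluid

main = fluid.Program()
with fluid.program_guard(main):
    x = fluid.layers.data(name='x', shape=[1000], dtype='float32')
    y = fluid.layers.data(name='y', shape=[1], dtype='float32')
    y_predict = fluid.layers.fc(input=x, size=1000, act=None,
                                param_attr=fluid.ParamAttr(name='fc_w'))
    cost = fluid.layers.square_error_cost(input=y_predict, label=y)
    avg_cost = fluid.layers.mean(cost)
    fluid.optimizer.SGD(learning_rate=0.1).minimize(avg_cost)

t = fluid.DistributeTranspiler()
t.transpile(
    0,                                         # trainer_id
    program=main,
    pservers="127.0.0.1:6174,127.0.0.1:6175",  # two pserver endpoints
    trainers=2,
    align_var_to_block=False)                  # keep each param/grad whole
trainer_prog = t.get_trainer_program()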


3 files changed: +156 -263 lines changed

Lines changed: 120 additions & 0 deletions
@@ -0,0 +1,120 @@
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import unittest

import paddle.fluid as fluid
import paddle.fluid.core as core
import paddle.fluid.layers as layers
from paddle.fluid.transpiler.distribute_transpiler import delete_ops
import numpy as np


class TestSimpleDistTranspiler(unittest.TestCase):
    def setUp(self):
        self.trainer_id = 0
        self.trainers = 2
        self.pservers = 2
        self.pserver_eps = "127.0.0.1:6174,127.0.0.1:6175"
        self.current_pserver_ep = "127.0.0.1:6175"

    def net_conf(self):
        x = fluid.layers.data(name='x', shape=[1000], dtype='float32')

        y_predict = fluid.layers.fc(input=x,
                                    size=1000,
                                    act=None,
                                    param_attr=fluid.ParamAttr(name='fc_w'))

        y = fluid.layers.data(name='y', shape=[1], dtype='float32')

        cost = fluid.layers.square_error_cost(input=y_predict, label=y)
        avg_cost = fluid.layers.mean(cost)
        sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.1)

        optimize_ops, params_grads = sgd_optimizer.minimize(avg_cost)
        return optimize_ops, params_grads

    def test_simple_transpiler(self):
        np.random.seed(1)

        trainer = self.get_trainer()
        pserver, startup = self.get_pserver(self.current_pserver_ep)
        self.assertEqual([op.type for op in trainer.global_block().ops],
                         self.get_expect_trainer_ops())

        self.assertEqual(len(pserver.blocks), 2)
        # block0: listen_and_serv
        self.assertEqual([op.type for op in pserver.blocks[0].ops],
                         ["listen_and_serv"])
        # block1: optimize pass
        self.assertEqual([op.type for op in pserver.blocks[1].ops],
                         ["sum", "scale", "sgd"])

        print("xxx", [op.output_arg_names for op in startup.global_block().ops])
        # confirm startup program
        self.assertEqual([op.type for op in startup.global_block().ops],
                         ["fill_constant", "uniform_random", "uniform_random"])

        # the variable #fc_w will NOT be splited
        fc_w_var = startup.global_block().var("fc_w@GRAD")
        self.assertEqual(fc_w_var.shape, (1000, 1000))

        fc_w_var = startup.global_block().var("fc_w@GRAD.trainer_0")
        self.assertEqual(fc_w_var.shape, (1000, 1000))

    def get_main_program(self):
        main = fluid.Program()

        with fluid.program_guard(main):
            self.net_conf()

        return main

    def get_expect_trainer_ops(self):
        trainer = fluid.Program()

        with fluid.program_guard(trainer):
            optimize_ops, params_grads = self.net_conf()

        delete_ops(trainer.global_block(), optimize_ops)
        ops = [op.type for op in trainer.global_block().ops] + [
            "send_vars", "send_barrier", "recv", "recv", "fetch_barrier"
        ]
        ops.insert(ops.index("elementwise_add_grad") + 1, "send_vars")
        return ops

    def get_trainer(self):
        return self._transpiler_instance().get_trainer_program()

    def get_pserver(self, ep):
        t = self._transpiler_instance()
        pserver = t.get_pserver_program(ep)
        startup = t.get_startup_program(ep, pserver)
        return pserver, startup

    def _transpiler_instance(self):
        main = self.get_main_program()
        t = fluid.DistributeTranspiler()
        t.transpile(
            self.trainer_id,
            program=main,
            pservers=self.pserver_eps,
            trainers=self.trainers,
            align_var_to_block=False)
        return t


if __name__ == "__main__":
    unittest.main()
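On the parameter-server side the same transpiler instance produces a per-endpoint program plus its startup program, exactly as the test's get_pserver helper does above. A short sketch continuing from the trainer-side example near the top (endpoint value taken from the test's setUp):

# `t` is the transpiled DistributeTranspiler instance from the earlier sketch
current_ep = "127.0.0.1:6175"
pserver_prog = t.get_pserver_program(current_ep)
startup_prog = t.get_startup_program(current_ep, pserver_prog)
# per the assertions above: block 0 runs listen_and_serv,
# block 1 holds the optimize pass (sum, scale, sgd)
print([op.type for op in pserver_prog.blocks[1].ops])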

python/paddle/fluid/transpiler/distribute_transpiler.py

Lines changed: 36 additions & 9 deletions
@@ -15,6 +15,7 @@
 from __future__ import print_function
 
 import math
+import numpy as np
 
 from ps_dispatcher import RoundRobin, HashName, PSDispatcher
 from .. import core, framework
@@ -103,15 +104,15 @@ def split_dense_variable(var_list, service_count, min_block_size=8192):
 
         We need to have a minimal block size so that the calculations in
         the parameter server side can gain better performance. By default
-        minimum block size 8K elements (maybe 16bit or 32bit or 64bit). 
+        minimum block size 8K elements (maybe 16bit or 32bit or 64bit).
 
         Args:
             var_list (list): List of variables.
             service_count (int): Numel of pserver services. A pserver may have two
                 or more listening ports.
             min_block_size (int): Minimum splitted block size.
         Returns:
-            blocks (list[(varname, block_id, current_block_size)]): A list 
+            blocks (list[(varname, block_id, current_block_size)]): A list
                 of VarBlocks. Each VarBlock specifies a shard of the var.
     """
     blocks = []
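To illustrate the splitting rule the split_dense_variable docstring above describes, here is a rough, hedged sketch of the arithmetic, not the library's actual implementation: a 1000x1000 float parameter has 1,000,000 elements, well above the 8192-element minimum, so it can be sharded across pservers, and each shard is rounded up to a multiple of product(dim[1:]) so blocks stay aligned by dim[0]. With service_count=1, as in the new align_var_to_block=False path below, the whole variable stays in a single block.

import math

def illustrate_split(shape, service_count, min_block_size=8192):
    # total number of elements in the variable
    numel = 1
    for d in shape:
        numel *= d
    # never create more shards than min_block_size-sized pieces allow
    split_count = min(service_count, max(1, numel // min_block_size))
    block_size = int(math.ceil(numel / float(split_count)))
    # align each block to whole rows, i.e. a multiple of product(dim[1:])
    width = 1
    for d in shape[1:]:
        width *= d
    if block_size % width != 0:
        block_size += width - (block_size % width)
    return split_count, block_size

print(illustrate_split([1000, 1000], service_count=2))  # (2, 500000)
print(illustrate_split([1000, 1000], service_count=1))  # (1, 1000000)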
@@ -171,6 +172,7 @@ def transpile(self,
                   program=None,
                   pservers="127.0.0.1:6174",
                   trainers=1,
+                  align_var_to_block=True,
                   split_method=RoundRobin,
                   sync_mode=True):
         """
@@ -183,7 +185,8 @@ def transpile(self,
         parameter servers.
 
         Steps to transpile trainer:
-        1. split variable to multiple blocks, aligned by product(dim[1:]) (width).
+        1. split variable to multiple blocks, aligned by product(dim[1:]) (width)
+           if align_var_to_block is True
         2. rename splited grad variables to add trainer_id suffix ".trainer_%d".
         3. modify trainer program add split_op to each grad variable.
         4. append send_op to send splited variables to server and fetch
@@ -293,9 +296,18 @@ def transpile(self,
             for index in range(len(self.pserver_endpoints))
         ]
 
-        grad_blocks = split_dense_variable(grad_list, len(pserver_endpoints))
-        param_blocks = split_dense_variable(param_list, len(pserver_endpoints))
+        if align_var_to_block:
+            grad_blocks = split_dense_variable(grad_list,
+                                               len(pserver_endpoints))
+            param_blocks = split_dense_variable(param_list,
+                                                len(pserver_endpoints))
+        else:
+            # when we do NOT align var to block, we will always split params
+            # grads into one block.
+            grad_blocks = split_dense_variable(grad_list, 1)
+            param_blocks = split_dense_variable(param_list, 1)
         assert (len(grad_blocks) == len(param_blocks))
+
         # step2: Create new vars for the parameters and gradients blocks and
         # add ops to do the split.
         param_var_mapping = self._create_vars_from_blocklist(program,
@@ -325,8 +337,22 @@ def transpile(self,
         # step 3.1: insert send op to send gradient vars to parameter servers
         ps_dispatcher.reset()
         send_vars = []
-        for orig_varname, splited_vars in grad_var_mapping.items():
+
+        # in general cases, the number of pservers is times of 2, and this
+        # will lead to uneven distribution among weights and bias:
+        #   fc_w@GRAD_trainer_0, fc_w@GRAD_trainer_1 --> pserver1
+        #   fc_b@GRAD_trainer_0, fc_b@GRAD_trainer_1 --> pserver2
+        # shuffle the map will avoid the uneven distribution above
+        grad_var_mapping_items = grad_var_mapping.items()
+        if not align_var_to_block:
+            np.random.shuffle(grad_var_mapping_items)
+
+        for orig_varname, splited_vars in grad_var_mapping_items:
             eplist = ps_dispatcher.dispatch(splited_vars)
+
+            if not align_var_to_block:
+                assert (len(splited_vars) == 1)
+
             if len(splited_vars) == 1:
                 orig_varname = splited_vars[0].name
                 index = find_op_by_output_arg(program.global_block(),
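The comment block above gives the rationale for the shuffle: pserver counts are usually even, so iterating the gradient map in a fixed order with a round-robin dispatcher always sends the large fc_w gradients to one pserver and the small fc_b gradients to the other. A small standalone sketch of that effect (plain Python, not the transpiler's own dispatcher classes):

import numpy as np

pservers = ["127.0.0.1:6174", "127.0.0.1:6175"]
grad_vars = ["fc_w@GRAD", "fc_b@GRAD"]

def round_robin(names):
    # assign names to pservers in fixed rotation, like the RoundRobin dispatcher
    return {name: pservers[i % len(pservers)] for i, name in enumerate(names)}

print(round_robin(grad_vars))       # fixed order: fc_w always lands on 6174

np.random.seed(1)
shuffled = list(grad_vars)
np.random.shuffle(shuffled)         # shuffling the iteration order varies
print(round_robin(shuffled))        # which pserver receives the heavy fc_w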
@@ -374,7 +400,7 @@ def transpile(self,
         for i, ep in enumerate(eplist):
             self.param_grad_ep_mapping[ep]["params"].append(recv_vars[i])
             self.param_grad_ep_mapping[ep]["grads"].append(send_vars[i])
-        # step4: Concat the parameters splits together after recv.
+
         for varname, splited_var in param_var_mapping.iteritems():
             eps = []
             for var in splited_var:
@@ -399,6 +425,7 @@ def transpile(self,
                 RPC_OP_ROLE_ATTR_NAME: RPC_OP_ROLE_ATTR_VALUE
             })
 
+        # step4: Concat the parameters splits together after recv.
         for varname, splited_var in param_var_mapping.iteritems():
             if len(splited_var) <= 1:
                 continue
@@ -849,8 +876,8 @@ def _create_vars_from_blocklist(self,
             program (ProgramDesc): ProgramDesc which gradients blong.
             block_list (list[(varname, block_id, block_size)]): List of gradient blocks.
             add_trainer_suffix (Bool): Add trainer suffix to new variable's name if set True.
-        Returns:
-            var_mapping (dict(varname->[new_varname_variable])):A dict mapping 
+        Returns:
+            var_mapping (dict(varname->[new_varname_variable])):A dict mapping
                 from original var name to each var split.
         """
 