Commit f63368d

Add async dist tests (#12798)

* add async dist tests
* update delta
* fix transformer test
* refine rmsprop transpile
* update
* fix dist seresnet

1 parent 9f33222 commit f63368d

File tree: 9 files changed, 100 additions, 55 deletions

python/paddle/fluid/tests/unittests/dist_mnist.py

Lines changed: 5 additions & 4 deletions

@@ -46,15 +46,17 @@ def cnn_model(data):
         pool_size=2,
         pool_stride=2,
         act="relu",
-        param_attr=fluid.ParamAttr(initializer=fluid.initializer.Constant()))
+        param_attr=fluid.ParamAttr(initializer=fluid.initializer.Constant(
+            value=0.3)))
     conv_pool_2 = fluid.nets.simple_img_conv_pool(
         input=conv_pool_1,
         filter_size=5,
         num_filters=50,
         pool_size=2,
         pool_stride=2,
         act="relu",
-        param_attr=fluid.ParamAttr(initializer=fluid.initializer.Constant()))
+        param_attr=fluid.ParamAttr(initializer=fluid.initializer.Constant(
+            value=0.2)))

     SIZE = 10
     input_shape = conv_pool_2.shape
@@ -66,8 +68,7 @@ def cnn_model(data):
         size=SIZE,
         act="softmax",
         param_attr=fluid.param_attr.ParamAttr(
-            initializer=fluid.initializer.NormalInitializer(
-                loc=0.0, scale=scale, seed=1)))
+            initializer=fluid.initializer.Constant(value=0.1)))
     return predict
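The point of this change: NormalInitializer made each run start from random weights, so a distributed run could not be compared against a local baseline with a tight tolerance. Constant initializers make every process start from identical weights. A minimal sketch of the pattern, assuming the fluid API of this release (layer sizes are illustrative):

import paddle.fluid as fluid

# With Constant initializers every run starts from identical weights,
# so local and distributed losses can be compared with a small delta.
data = fluid.layers.data(name='img', shape=[1, 28, 28], dtype='float32')
conv_pool = fluid.nets.simple_img_conv_pool(
    input=data,
    filter_size=5,
    num_filters=20,
    pool_size=2,
    pool_stride=2,
    act="relu",
    param_attr=fluid.ParamAttr(
        initializer=fluid.initializer.Constant(value=0.3)))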

python/paddle/fluid/tests/unittests/dist_se_resnext.py

Lines changed: 9 additions & 6 deletions

@@ -129,7 +129,12 @@ def net(self, input, class_dim=1000):
             input=conv, pool_size=7, pool_type='avg', global_pooling=True)
         drop = fluid.layers.dropout(x=pool, dropout_prob=0.2)
         stdv = 1.0 / math.sqrt(drop.shape[1] * 1.0)
-        out = fluid.layers.fc(input=drop, size=class_dim, act='softmax')
+        out = fluid.layers.fc(
+            input=drop,
+            size=class_dim,
+            act='softmax',
+            param_attr=fluid.ParamAttr(
+                initializer=fluid.initializer.Constant(value=0.2)))
         return out

     def shortcut(self, input, ch_out, stride):
@@ -179,7 +184,7 @@ def conv_bn_layer(self,
             act=None,
             # avoid pserver CPU init differs from GPU
             param_attr=fluid.ParamAttr(
-                initializer=fluid.initializer.Constant()),
+                initializer=fluid.initializer.Constant(value=0.2)),
             bias_attr=False)
         return fluid.layers.batch_norm(input=conv, act=act)
@@ -228,10 +233,8 @@ def get_model(self, batch_size=2):
         lr = [base_lr * (0.1**i) for i in range(len(bd) + 1)]

         optimizer = fluid.optimizer.Momentum(
-            # FIXME(typhoonzero): add back LR decay once ParallelExecutor fixed.
-            #learning_rate=fluid.layers.piecewise_decay(
-            #    boundaries=bd, values=lr),
-            learning_rate=base_lr,
+            learning_rate=fluid.layers.piecewise_decay(
+                boundaries=bd, values=lr),
             momentum=0.9,
             regularization=fluid.regularizer.L2Decay(1e-4))
         optimizer.minimize(avg_cost)
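This restores the step-wise learning-rate decay that was commented out while ParallelExecutor was broken. A small sketch of how piecewise_decay maps boundaries to values, with illustrative numbers:

import paddle.fluid as fluid

base_lr = 0.1
bd = [1000, 2000, 3000]  # illustrative step boundaries
lr = [base_lr * (0.1**i) for i in range(len(bd) + 1)]  # [0.1, 0.01, 0.001, 0.0001]

# LR is 0.1 for steps < 1000, 0.01 until 2000, 0.001 until 3000, then 0.0001.
optimizer = fluid.optimizer.Momentum(
    learning_rate=fluid.layers.piecewise_decay(boundaries=bd, values=lr),
    momentum=0.9,
    regularization=fluid.regularizer.L2Decay(1e-4))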

python/paddle/fluid/tests/unittests/dist_transformer.py

Lines changed: 4 additions & 2 deletions

@@ -265,16 +265,18 @@ def main(role="pserver",


 if __name__ == "__main__":
-    if len(sys.argv) != 7:
+    if len(sys.argv) != 8:
         print(
-            "Usage: python dist_transformer.py [pserver/trainer] [endpoints] [trainer_id] [current_endpoint] [trainers] [is_dist]"
+            "Usage: python dist_transformer.py [pserver/trainer] [endpoints] [trainer_id] [current_endpoint] [trainers] [is_dist] [sync_mode]"
         )
     role = sys.argv[1]
     endpoints = sys.argv[2]
     trainer_id = int(sys.argv[3])
     current_endpoint = sys.argv[4]
     trainers = int(sys.argv[5])
     is_dist = True if sys.argv[6] == "TRUE" else False
+    # FIXME(typhoonzero): refine this test.
+    is_async = True if sys.argv[7] == "TRUE" else False
     main(
         role=role,
         endpoints=endpoints,
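The script now expects an eighth sync_mode argument. A sketch of how the harness-style command string is assembled, with illustrative endpoints and counts:

# Assembling the trainer command line the test harness passes to Popen:
python_interp = "python"
model_file = "dist_transformer.py"
endpoints = "127.0.0.1:9123,127.0.0.1:9124"
cmd = "%s %s trainer %s 0 %s %d TRUE %s" % (
    python_interp, model_file, endpoints, "127.0.0.1:9123", 2, "FALSE")
# -> "python dist_transformer.py trainer 127.0.0.1:9123,127.0.0.1:9124 0
#     127.0.0.1:9123 2 TRUE FALSE"  (is_dist=TRUE, sync_mode=FALSE -> async)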

python/paddle/fluid/tests/unittests/test_dist_base.py

Lines changed: 42 additions & 21 deletions

@@ -30,7 +30,7 @@ def get_model(self, batch_size=2):
             "get_model should be implemented by child classes.")

     def get_transpiler(self, trainer_id, main_program, pserver_endpoints,
-                       trainers):
+                       trainers, sync_mode):
         # NOTE: import fluid until runtime, or else forking processes will cause error.
         import paddle
         import paddle.fluid as fluid
@@ -39,33 +39,44 @@ def get_transpiler(self, trainer_id, main_program, pserver_endpoints,
             trainer_id=trainer_id,
             program=main_program,
             pservers=pserver_endpoints,
-            trainers=trainers)
+            trainers=trainers,
+            sync_mode=sync_mode)
         return t

-    def run_pserver(self, pserver_endpoints, trainers, current_endpoint,
-                    trainer_id):
+    def run_pserver(self,
+                    pserver_endpoints,
+                    trainers,
+                    current_endpoint,
+                    trainer_id,
+                    sync_mode=True):
         import paddle
         import paddle.fluid as fluid
         self.get_model(batch_size=2)
         t = self.get_transpiler(trainer_id,
                                 fluid.default_main_program(), pserver_endpoints,
-                                trainers)
+                                trainers, sync_mode)
         pserver_prog = t.get_pserver_program(current_endpoint)
         startup_prog = t.get_startup_program(current_endpoint, pserver_prog)
         place = fluid.CPUPlace()
         exe = fluid.Executor(place)
         exe.run(startup_prog)
         exe.run(pserver_prog)

-    def run_trainer(self, place, endpoints, trainer_id, trainers, is_dist=True):
+    def run_trainer(self,
+                    place,
+                    endpoints,
+                    trainer_id,
+                    trainers,
+                    is_dist=True,
+                    sync_mode=True):
         import paddle
         import paddle.fluid as fluid
         test_program, avg_cost, train_reader, test_reader, batch_acc, predict = \
             self.get_model(batch_size=2)
         if is_dist:
             t = self.get_transpiler(trainer_id,
                                     fluid.default_main_program(), endpoints,
-                                    trainers)
+                                    trainers, sync_mode)
             trainer_prog = t.get_trainer_program()
         else:
             trainer_prog = fluid.default_main_program()
@@ -106,44 +117,53 @@ def runtime_main(test_class):
    import paddle.fluid as fluid
    import paddle.fluid.core as core

-    if len(sys.argv) != 7:
+    if len(sys.argv) != 8:
        print(
-            "Usage: python dist_se_resnext.py [pserver/trainer] [endpoints] [trainer_id] [current_endpoint] [trainers] [is_dist]"
+            "Usage: python dist_se_resnext.py [pserver/trainer] [endpoints] [trainer_id] [current_endpoint] [trainers] [is_dist] [sync_mode]"
        )
    role = sys.argv[1]
    endpoints = sys.argv[2]
    trainer_id = int(sys.argv[3])
    current_endpoint = sys.argv[4]
    trainers = int(sys.argv[5])
    is_dist = True if sys.argv[6] == "TRUE" else False
+    sync_mode = True if sys.argv[7] == "TRUE" else False

    model = test_class()
    if role == "pserver":
-        model.run_pserver(endpoints, trainers, current_endpoint, trainer_id)
+        model.run_pserver(endpoints, trainers, current_endpoint, trainer_id,
+                          sync_mode)
    else:
        p = fluid.CUDAPlace(0) if core.is_compiled_with_cuda(
        ) else fluid.CPUPlace()
-        model.run_trainer(p, endpoints, trainer_id, trainers, is_dist)
+        model.run_trainer(p, endpoints, trainer_id, trainers, is_dist,
+                          sync_mode)


 import paddle.compat as cpt


 class TestDistBase(unittest.TestCase):
+    def _setup_config(self):
+        raise NotImplementedError("tests should have _setup_config implemented")
+
     def setUp(self):
         self._trainers = 2
         self._pservers = 2
         self._ps_endpoints = "127.0.0.1:9123,127.0.0.1:9124"
         self._python_interp = "python"
+        self._sync_mode = True
+        self._setup_config()

     def start_pserver(self, model_file, check_error_log):
+        sync_mode_str = "TRUE" if self._sync_mode else "FALSE"
         ps0_ep, ps1_ep = self._ps_endpoints.split(",")
-        ps0_cmd = "%s %s pserver %s 0 %s %d TRUE" % \
+        ps0_cmd = "%s %s pserver %s 0 %s %d TRUE %s" % \
             (self._python_interp, model_file, self._ps_endpoints, ps0_ep,
-             self._trainers)
-        ps1_cmd = "%s %s pserver %s 0 %s %d TRUE" % \
+             self._trainers, sync_mode_str)
+        ps1_cmd = "%s %s pserver %s 0 %s %d TRUE %s" % \
             (self._python_interp, model_file, self._ps_endpoints, ps1_ep,
-             self._trainers)
+             self._trainers, sync_mode_str)

         ps0_pipe = subprocess.PIPE
         ps1_pipe = subprocess.PIPE
@@ -195,9 +215,10 @@ def check_with_place(self, model_file, delta=1e-3, check_error_log=False):
         # Run local to get a base line
         env_local = {"CUDA_VISIBLE_DEVICES": "0"}
         env_local.update(required_envs)
-        local_cmd = "%s %s trainer %s 0 %s %d FLASE" % \
+        sync_mode_str = "TRUE" if self._sync_mode else "FALSE"
+        local_cmd = "%s %s trainer %s 0 %s %d FLASE %s" % \
             (self._python_interp, model_file,
-             "127.0.0.1:1234", "127.0.0.1:1234", 1)
+             "127.0.0.1:1234", "127.0.0.1:1234", 1, sync_mode_str)
         if not check_error_log:
             local_proc = subprocess.Popen(
                 local_cmd.split(" "),
@@ -226,12 +247,12 @@ def check_with_place(self, model_file, delta=1e-3, check_error_log=False):
         self._wait_ps_ready(ps1.pid)

         ps0_ep, ps1_ep = self._ps_endpoints.split(",")
-        tr0_cmd = "%s %s trainer %s 0 %s %d TRUE" % \
+        tr0_cmd = "%s %s trainer %s 0 %s %d TRUE %s" % \
             (self._python_interp, model_file, self._ps_endpoints, ps0_ep,
-             self._trainers)
-        tr1_cmd = "%s %s trainer %s 1 %s %d TRUE" % \
+             self._trainers, sync_mode_str)
+        tr1_cmd = "%s %s trainer %s 1 %s %d TRUE %s" % \
             (self._python_interp, model_file, self._ps_endpoints, ps1_ep,
-             self._trainers)
+             self._trainers, sync_mode_str)

         env0 = {"CUDA_VISIBLE_DEVICES": "0"}
         env1 = {"CUDA_VISIBLE_DEVICES": "1"}
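All of the plumbing above funnels one flag into DistributeTranspiler.transpile: sync_mode=True produces the barrier-per-step parameter-server program, sync_mode=False the asynchronous one in which each trainer pushes gradients independently. A minimal sketch of the call sequence, using the same API the test exercises (endpoints illustrative):

import paddle.fluid as fluid

t = fluid.DistributeTranspiler()
t.transpile(
    trainer_id=0,
    program=fluid.default_main_program(),
    pservers="127.0.0.1:9123,127.0.0.1:9124",
    trainers=2,
    sync_mode=False)  # False: async updates, no per-step barrier

# On a pserver process:
pserver_prog = t.get_pserver_program("127.0.0.1:9123")
startup_prog = t.get_startup_program("127.0.0.1:9123", pserver_prog)

# On a trainer process:
trainer_prog = t.get_trainer_program()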

python/paddle/fluid/tests/unittests/test_dist_mnist.py

Lines changed: 12 additions & 1 deletion

@@ -17,10 +17,21 @@
 from test_dist_base import TestDistBase


-class TestDistSeResneXt2x2(TestDistBase):
+class TestDistMnist2x2(TestDistBase):
+    def _setup_config(self):
+        self._sync_mode = True
+
     def test_se_resnext(self):
         self.check_with_place("dist_mnist.py", delta=1e-7)


+class TestDistMnistAsync(TestDistBase):
+    def _setup_config(self):
+        self._sync_mode = False
+
+    def test_se_resnext(self):
+        self.check_with_place("dist_mnist.py", delta=200)
+
+
 if __name__ == "__main__":
     unittest.main()
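The subclass pattern makes new cases cheap: _setup_config chooses sync or async, and delta says how closely the distributed loss must track the local baseline. Sync runs start from constant weights and are deterministic (delta=1e-7); async updates race, so only a loose bound (delta=200 here) is asserted. A hypothetical additional case, with dist_my_model.py as a stand-in script:

import unittest
from test_dist_base import TestDistBase


class TestDistMyModelAsync(TestDistBase):
    def _setup_config(self):
        self._sync_mode = False  # transpile the async pserver program

    def test_my_model(self):
        # Async losses drift, so assert only loose agreement with local.
        self.check_with_place("dist_my_model.py", delta=10)


if __name__ == "__main__":
    unittest.main()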

python/paddle/fluid/tests/unittests/test_dist_se_resnext.py

Lines changed: 11 additions & 0 deletions

@@ -18,9 +18,20 @@


 class TestDistSeResneXt2x2(TestDistBase):
+    def _setup_config(self):
+        self._sync_mode = True
+
     def test_se_resnext(self):
         self.check_with_place("dist_se_resnext.py", delta=1e-7)


+class TestDistSeResneXt2x2Async(TestDistBase):
+    def _setup_config(self):
+        self._sync_mode = False
+
+    def test_se_resnext(self):
+        self.check_with_place("dist_se_resnext.py", delta=100)
+
+
 if __name__ == "__main__":
     unittest.main()

python/paddle/fluid/tests/unittests/test_dist_transformer.py

Lines changed: 3 additions & 0 deletions

@@ -19,6 +19,9 @@


 class TestDistTransformer2x2(TestDistBase):
+    def _setup_config(self):
+        self._sync_mode = True
+
     def test_transformer(self):
         # TODO(paddle-dev): check if the delta is OK.
         # Usually start around ~8000 and converge to ~5000

python/paddle/fluid/tests/unittests/test_dist_word2vec.py

Lines changed: 11 additions & 0 deletions

@@ -18,9 +18,20 @@


 class TestDistSeResneXt2x2(TestDistBase):
+    def _setup_config(self):
+        self._sync_mode = True
+
     def test_se_resnext(self):
         self.check_with_place("dist_word2vec.py", delta=1e-7)


+class TestDistSeResneXt2x2Async(TestDistBase):
+    def _setup_config(self):
+        self._sync_mode = False
+
+    def test_se_resnext(self):
+        self.check_with_place("dist_word2vec.py", delta=1)
+
+
 if __name__ == "__main__":
     unittest.main()

python/paddle/fluid/transpiler/distribute_transpiler.py

Lines changed: 3 additions & 21 deletions

@@ -1209,8 +1209,8 @@ def _get_optimizer_input_shape(self, op_type, varkey, orig_shape,
         elif op_type == "momentum":
             if varkey == "Velocity":
                 return param_shape
-        elif op_type == "":
-            if varkey == "Moment":
+        elif op_type == "rmsprop":
+            if varkey in ["Moment", "MeanSquare"]:
                 return param_shape
         elif op_type == "sgd":
             pass
@@ -1289,8 +1289,6 @@ def _append_pserver_ops(self, optimize_block, opt_op, endpoint,
         pserver_block = program.global_block()
         new_inputs = collections.OrderedDict()

-        # update param/grad shape first, then other inputs like
-        # moment can use the updated shape
         def _get_param_block(opt_op):
             # param is already created on global program
             param_block = None
@@ -1303,22 +1301,6 @@ def _get_param_block(opt_op):
         for key in opt_op.input_names:
             if key == "Grad":
                 new_inputs[key] = merged_var
-            # For RMSProp optimizer
-            elif key == "Moment" or key == "MeanSquare":
-                param_block = _get_param_block(opt_op)
-                if not param_block:
-                    return
-                moment_var = origin_program.global_block().vars[opt_op.input(
-                    key)[0]]
-                tmpvar = pserver_block.create_var(
-                    name=moment_var.name,
-                    persistable=moment_var.persistable,
-                    dtype=moment_var.dtype,
-                    # change to use same shape as param
-                    # TODO(typhoonzero): didn't append .block in the var name,
-                    # may affect checkpoint saving? Need to verify.
-                    shape=param_block.shape)
-                new_inputs[key] = tmpvar
             elif key == "Param":
                 param_block = _get_param_block(opt_op)
                 if not param_block:
@@ -1346,7 +1328,7 @@ def _get_param_block(opt_op):
         for key in opt_op.input_names:
             new_shape = None
-            if key in ["Param", "Grad", "LearningRate", "Moment", "MeanSquare"]:
+            if key in ["Param", "Grad", "LearningRate"]:
                 continue
             var = self.origin_program.global_block().vars[opt_op.input(key)[0]]
             # update accumulator variable shape
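This transpiler change replaces the RMSProp special case in _append_pserver_ops with the generic accumulator path: _get_optimizer_input_shape now reports that Moment and MeanSquare follow the (possibly sliced) parameter shape, which also fixes the dead `op_type == ""` branch. A simplified sketch of the dispatch, not the transpiler's exact code:

def _get_optimizer_input_shape(op_type, varkey, orig_shape, param_shape):
    # Accumulators that are elementwise with the parameter must follow the
    # (possibly sliced) parameter shape on the pserver; others keep theirs.
    if op_type == "momentum":
        if varkey == "Velocity":
            return param_shape
    elif op_type == "rmsprop":
        # RMSProp keeps a moment buffer and a running mean of squared
        # gradients, both the same shape as the parameter.
        if varkey in ["Moment", "MeanSquare"]:
            return param_shape
    elif op_type == "sgd":
        pass  # plain SGD has no accumulators
    return orig_shape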
