Commit f54efd0

Merge pull request #12660 from typhoonzero/polish_dist_unittests
polish dist unittest
2 parents 51cc80c + 069ff14 commit f54efd0

File tree

7 files changed: +377 -528 lines changed

Lines changed: 103 additions & 0 deletions
@@ -0,0 +1,103 @@
+# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import numpy as np
+import argparse
+import time
+import math
+
+import paddle
+import paddle.fluid as fluid
+import paddle.fluid.profiler as profiler
+from paddle.fluid import core
+import unittest
+from multiprocessing import Process
+import os
+import signal
+from functools import reduce
+from test_dist_base import TestDistRunnerBase, runtime_main
+
+DTYPE = "float32"
+paddle.dataset.mnist.fetch()
+
+# Fix seed for test
+fluid.default_startup_program().random_seed = 1
+fluid.default_main_program().random_seed = 1
+
+
+def cnn_model(data):
+    conv_pool_1 = fluid.nets.simple_img_conv_pool(
+        input=data,
+        filter_size=5,
+        num_filters=20,
+        pool_size=2,
+        pool_stride=2,
+        act="relu",
+        param_attr=fluid.ParamAttr(initializer=fluid.initializer.Constant()))
+    conv_pool_2 = fluid.nets.simple_img_conv_pool(
+        input=conv_pool_1,
+        filter_size=5,
+        num_filters=50,
+        pool_size=2,
+        pool_stride=2,
+        act="relu",
+        param_attr=fluid.ParamAttr(initializer=fluid.initializer.Constant()))
+
+    SIZE = 10
+    input_shape = conv_pool_2.shape
+    param_shape = [reduce(lambda a, b: a * b, input_shape[1:], 1)] + [SIZE]
+    scale = (2.0 / (param_shape[0]**2 * SIZE))**0.5
+
+    predict = fluid.layers.fc(
+        input=conv_pool_2,
+        size=SIZE,
+        act="softmax",
+        param_attr=fluid.param_attr.ParamAttr(
+            initializer=fluid.initializer.NormalInitializer(
+                loc=0.0, scale=scale, seed=1)))
+    return predict
+
+
+class TestDistMnist2x2(TestDistRunnerBase):
+    def get_model(self, batch_size=2):
+        # Input data
+        images = fluid.layers.data(name='pixel', shape=[1, 28, 28], dtype=DTYPE)
+        label = fluid.layers.data(name='label', shape=[1], dtype='int64')
+
+        # Train program
+        predict = cnn_model(images)
+        cost = fluid.layers.cross_entropy(input=predict, label=label)
+        avg_cost = fluid.layers.mean(x=cost)
+
+        # Evaluator
+        batch_size_tensor = fluid.layers.create_tensor(dtype='int64')
+        batch_acc = fluid.layers.accuracy(
+            input=predict, label=label, total=batch_size_tensor)
+
+        inference_program = fluid.default_main_program().clone()
+        # Optimization
+        opt = fluid.optimizer.AdamOptimizer(
+            learning_rate=0.001, beta1=0.9, beta2=0.999)
+
+        # Reader
+        train_reader = paddle.batch(
+            paddle.dataset.mnist.train(), batch_size=batch_size)
+        test_reader = paddle.batch(
+            paddle.dataset.mnist.test(), batch_size=batch_size)
+        opt.minimize(avg_cost)
+        return inference_program, avg_cost, train_reader, test_reader, batch_acc, predict
+
+
+if __name__ == "__main__":
+    runtime_main(TestDistMnist2x2)
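
This new file shows the pattern the PR moves every distributed unit test toward: build the network inside get_model(), return the standard six-element tuple, and hand the class to runtime_main(). As a minimal sketch of how a further test would plug in (the name TestDistSoftmax2x2 and its softmax-regression model are hypothetical, invented here for illustration; the base class, entry point, and return contract are exactly those used above):

# Hypothetical further test following the contract shown above: subclass
# TestDistRunnerBase, build the program in get_model(), return the
# (test_program, avg_cost, train_reader, test_reader, batch_acc, predict)
# tuple, and hand the class to runtime_main().
import paddle
import paddle.fluid as fluid

from test_dist_base import TestDistRunnerBase, runtime_main


class TestDistSoftmax2x2(TestDistRunnerBase):
    def get_model(self, batch_size=2):
        images = fluid.layers.data(
            name='pixel', shape=[1, 28, 28], dtype='float32')
        label = fluid.layers.data(name='label', shape=[1], dtype='int64')

        # Plain softmax regression in place of the CNN above.
        predict = fluid.layers.fc(input=images, size=10, act='softmax')
        cost = fluid.layers.cross_entropy(input=predict, label=label)
        avg_cost = fluid.layers.mean(x=cost)
        batch_acc = fluid.layers.accuracy(input=predict, label=label)

        # Clone for inference before the optimizer adds backward ops.
        inference_program = fluid.default_main_program().clone()
        opt = fluid.optimizer.SGD(learning_rate=0.001)
        opt.minimize(avg_cost)

        train_reader = paddle.batch(
            paddle.dataset.mnist.train(), batch_size=batch_size)
        test_reader = paddle.batch(
            paddle.dataset.mnist.test(), batch_size=batch_size)
        return (inference_program, avg_cost, train_reader, test_reader,
                batch_acc, predict)


if __name__ == "__main__":
    runtime_main(TestDistSoftmax2x2)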

python/paddle/fluid/tests/unittests/dist_se_resnext.py

Lines changed: 39 additions & 147 deletions
@@ -27,6 +27,7 @@
 import os
 import sys
 import signal
+from test_dist_base import TestDistRunnerBase, runtime_main
 
 # Fix seed for test
 fluid.default_startup_program().random_seed = 1
@@ -196,161 +197,52 @@ def squeeze_excitation(self, input, num_channels, reduction_ratio):
         return scale
 
 
-def get_model(batch_size):
-    # Input data
-    image = fluid.layers.data(name="data", shape=[3, 224, 224], dtype='float32')
-    label = fluid.layers.data(name="int64", shape=[1], dtype='int64')
+class DistSeResneXt2x2(TestDistRunnerBase):
+    def get_model(self, batch_size=2):
+        # Input data
+        image = fluid.layers.data(
+            name="data", shape=[3, 224, 224], dtype='float32')
+        label = fluid.layers.data(name="int64", shape=[1], dtype='int64')
 
-    # Train program
-    model = SE_ResNeXt(layers=50)
-    out = model.net(input=image, class_dim=102)
-    cost = fluid.layers.cross_entropy(input=out, label=label)
+        # Train program
+        model = SE_ResNeXt(layers=50)
+        out = model.net(input=image, class_dim=102)
+        cost = fluid.layers.cross_entropy(input=out, label=label)
 
-    avg_cost = fluid.layers.mean(x=cost)
-    acc_top1 = fluid.layers.accuracy(input=out, label=label, k=1)
-    acc_top5 = fluid.layers.accuracy(input=out, label=label, k=5)
+        avg_cost = fluid.layers.mean(x=cost)
+        acc_top1 = fluid.layers.accuracy(input=out, label=label, k=1)
+        acc_top5 = fluid.layers.accuracy(input=out, label=label, k=5)
 
-    # Evaluator
-    test_program = fluid.default_main_program().clone(for_test=True)
+        # Evaluator
+        test_program = fluid.default_main_program().clone(for_test=True)
 
-    # Optimization
-    total_images = 6149  # flowers
-    epochs = [30, 60, 90]
-    step = int(total_images / batch_size + 1)
+        # Optimization
+        total_images = 6149  # flowers
+        epochs = [30, 60, 90]
+        step = int(total_images / batch_size + 1)
 
-    bd = [step * e for e in epochs]
-    base_lr = 0.1
-    lr = []
-    lr = [base_lr * (0.1**i) for i in range(len(bd) + 1)]
+        bd = [step * e for e in epochs]
+        base_lr = 0.1
+        lr = []
+        lr = [base_lr * (0.1**i) for i in range(len(bd) + 1)]
 
-    optimizer = fluid.optimizer.Momentum(
-        # FIXME(typhoonzero): add back LR decay once ParallelExecutor fixed.
-        #learning_rate=fluid.layers.piecewise_decay(
-        #    boundaries=bd, values=lr),
-        learning_rate=base_lr,
-        momentum=0.9,
-        regularization=fluid.regularizer.L2Decay(1e-4))
-    optimizer.minimize(avg_cost)
+        optimizer = fluid.optimizer.Momentum(
+            # FIXME(typhoonzero): add back LR decay once ParallelExecutor fixed.
+            #learning_rate=fluid.layers.piecewise_decay(
+            #    boundaries=bd, values=lr),
+            learning_rate=base_lr,
+            momentum=0.9,
+            regularization=fluid.regularizer.L2Decay(1e-4))
+        optimizer.minimize(avg_cost)
 
-    # Reader
-    train_reader = paddle.batch(
-        paddle.dataset.flowers.train(), batch_size=batch_size)
-    test_reader = paddle.batch(
-        paddle.dataset.flowers.test(use_xmap=False), batch_size=batch_size)
+        # Reader
+        train_reader = paddle.batch(
+            paddle.dataset.flowers.train(), batch_size=batch_size)
+        test_reader = paddle.batch(
+            paddle.dataset.flowers.test(use_xmap=False), batch_size=batch_size)
 
-    return test_program, avg_cost, train_reader, test_reader, acc_top1, out
-
-
-def get_transpiler(trainer_id, main_program, pserver_endpoints, trainers):
-    t = fluid.DistributeTranspiler()
-    t.transpile(
-        trainer_id=trainer_id,
-        program=main_program,
-        pservers=pserver_endpoints,
-        trainers=trainers)
-    return t
-
-
-class DistSeResneXt2x2:
-    def run_pserver(self, pserver_endpoints, trainers, current_endpoint,
-                    trainer_id):
-        get_model(batch_size=2)
-        t = get_transpiler(trainer_id,
-                           fluid.default_main_program(), pserver_endpoints,
-                           trainers)
-        pserver_prog = t.get_pserver_program(current_endpoint)
-        startup_prog = t.get_startup_program(current_endpoint, pserver_prog)
-        place = fluid.CPUPlace()
-        exe = fluid.Executor(place)
-        exe.run(startup_prog)
-        exe.run(pserver_prog)
-
-    def _wait_ps_ready(self, pid):
-        retry_times = 20
-        while True:
-            assert retry_times >= 0, "wait ps ready failed"
-            time.sleep(3)
-            print("waiting ps ready: ", pid)
-            try:
-                # the listen_and_serv_op would touch a file which contains the listen port
-                # on the /tmp directory until it was ready to process all the RPC call.
-                os.stat("/tmp/paddle.%d.port" % pid)
-                return
-            except os.error:
-                retry_times -= 1
-
-    def run_trainer(self, place, endpoints, trainer_id, trainers, is_dist=True):
-        test_program, avg_cost, train_reader, test_reader, batch_acc, predict = get_model(
-            batch_size=2)
-        if is_dist:
-            t = get_transpiler(trainer_id,
-                               fluid.default_main_program(), endpoints,
-                               trainers)
-            trainer_prog = t.get_trainer_program()
-        else:
-            trainer_prog = fluid.default_main_program()
-
-        startup_exe = fluid.Executor(place)
-        startup_exe.run(fluid.default_startup_program())
-
-        strategy = fluid.ExecutionStrategy()
-        strategy.num_threads = 1
-        strategy.allow_op_delay = False
-        exe = fluid.ParallelExecutor(
-            True, loss_name=avg_cost.name, exec_strategy=strategy)
-
-        feed_var_list = [
-            var for var in trainer_prog.global_block().vars.values()
-            if var.is_data
-        ]
-
-        feeder = fluid.DataFeeder(feed_var_list, place)
-        reader_generator = test_reader()
-
-        data = next(reader_generator)
-        first_loss, = exe.run(fetch_list=[avg_cost.name],
-                              feed=feeder.feed(data))
-        print(first_loss)
-
-        for i in six.moves.xrange(5):
-            data = next(reader_generator)
-            loss, = exe.run(fetch_list=[avg_cost.name], feed=feeder.feed(data))
-
-        data = next(reader_generator)
-        last_loss, = exe.run(fetch_list=[avg_cost.name], feed=feeder.feed(data))
-        print(last_loss)
-
-
-def main(role="pserver",
-         endpoints="127.0.0.1:9123",
-         trainer_id=0,
-         current_endpoint="127.0.0.1:9123",
-         trainers=1,
-         is_dist=True):
-    model = DistSeResneXt2x2()
-    if role == "pserver":
-        model.run_pserver(endpoints, trainers, current_endpoint, trainer_id)
-    else:
-        p = fluid.CUDAPlace(0) if core.is_compiled_with_cuda(
-        ) else fluid.CPUPlace()
-        model.run_trainer(p, endpoints, trainer_id, trainers, is_dist)
+        return test_program, avg_cost, train_reader, test_reader, acc_top1, out
 
 
 if __name__ == "__main__":
-    if len(sys.argv) != 7:
-        print(
-            "Usage: python dist_se_resnext.py [pserver/trainer] [endpoints] [trainer_id] [current_endpoint] [trainers] [is_dist]"
-        )
-    role = sys.argv[1]
-    endpoints = sys.argv[2]
-    trainer_id = int(sys.argv[3])
-    current_endpoint = sys.argv[4]
-    trainers = int(sys.argv[5])
-    is_dist = True if sys.argv[6] == "TRUE" else False
-    main(
-        role=role,
-        endpoints=endpoints,
-        trainer_id=trainer_id,
-        current_endpoint=current_endpoint,
-        trainers=trainers,
-        is_dist=is_dist)
+    runtime_main(DistSeResneXt2x2)
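
test_dist_base.py itself is outside this excerpt, but the deleted code above shows exactly what it must absorb: transpiler setup, the pserver loop, and the argv-driven main(). A rough sketch of that consolidation, assuming it simply centralizes the boilerplate removed here (the actual implementation in test_dist_base.py may differ):

# Sketch only: assumes test_dist_base.py centralizes the boilerplate deleted
# above. Every fluid call here appears verbatim in the removed code; the real
# file may organize things differently.
import sys

import paddle.fluid as fluid
from paddle.fluid import core


class TestDistRunnerBase(object):
    def get_model(self, batch_size=2):
        raise NotImplementedError("subclasses build and return the model tuple")

    def get_transpiler(self, trainer_id, main_program, pserver_endpoints,
                       trainers):
        # Identical to the get_transpiler() each test used to carry.
        t = fluid.DistributeTranspiler()
        t.transpile(
            trainer_id=trainer_id,
            program=main_program,
            pservers=pserver_endpoints,
            trainers=trainers)
        return t

    def run_pserver(self, endpoints, trainers, current_endpoint, trainer_id):
        self.get_model(batch_size=2)
        t = self.get_transpiler(trainer_id,
                                fluid.default_main_program(), endpoints,
                                trainers)
        pserver_prog = t.get_pserver_program(current_endpoint)
        startup_prog = t.get_startup_program(current_endpoint, pserver_prog)
        exe = fluid.Executor(fluid.CPUPlace())
        exe.run(startup_prog)
        exe.run(pserver_prog)

    # run_trainer would follow the deleted run_trainer() above: transpile (or
    # not, in the local case), run the startup program, then fetch losses
    # through a ParallelExecutor.


def runtime_main(test_class):
    # Same positional argv protocol as the deleted main() above.
    role = sys.argv[1]
    endpoints = sys.argv[2]
    trainer_id = int(sys.argv[3])
    current_endpoint = sys.argv[4]
    trainers = int(sys.argv[5])
    is_dist = sys.argv[6] == "TRUE"

    model = test_class()
    if role == "pserver":
        model.run_pserver(endpoints, trainers, current_endpoint, trainer_id)
    else:
        place = fluid.CUDAPlace(0) if core.is_compiled_with_cuda(
        ) else fluid.CPUPlace()
        model.run_trainer(place, endpoints, trainer_id, trainers, is_dist)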
