Skip to content

Commit 4d4322a

Browse files
authored
merge fluid dist tests (#8573)
* merge fluid dist tests
* update cmake
1 parent ec33832 commit 4d4322a

20 files changed

+495
-1682
lines changed

python/paddle/fluid/tests/CMakeLists.txt

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -7,5 +7,4 @@ endforeach()
77

88
add_subdirectory(unittests)
99
add_subdirectory(book)
10-
add_subdirectory(book_distribute)
1110
add_subdirectory(book_memory_optimization)

python/paddle/fluid/tests/book/test_fit_a_line.py

Lines changed: 54 additions & 23 deletions
Original file line number | Diff line number | Diff line change
@@ -19,9 +19,10 @@
1919
import unittest
2020
import math
2121
import sys
22+
import os
2223

2324

24-
def train(use_cuda, save_dirname):
25+
def train(use_cuda, save_dirname, is_local):
2526
x = fluid.layers.data(name='x', shape=[13], dtype='float32')
2627

2728
y_predict = fluid.layers.fc(input=x, size=1, act=None)
@@ -32,7 +33,7 @@ def train(use_cuda, save_dirname):
3233
avg_cost = fluid.layers.mean(cost)
3334

3435
sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.001)
35-
sgd_optimizer.minimize(avg_cost)
36+
optimize_ops, params_grads = sgd_optimizer.minimize(avg_cost)
3637

3738
BATCH_SIZE = 20
3839

@@ -42,27 +43,57 @@ def train(use_cuda, save_dirname):
4243
batch_size=BATCH_SIZE)
4344

4445
place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
45-
feeder = fluid.DataFeeder(place=place, feed_list=[x, y])
4646
exe = fluid.Executor(place)
4747

48-
exe.run(fluid.default_startup_program())
49-
50-
PASS_NUM = 100
51-
for pass_id in range(PASS_NUM):
52-
for data in train_reader():
53-
avg_loss_value, = exe.run(fluid.default_main_program(),
54-
feed=feeder.feed(data),
55-
fetch_list=[avg_cost])
56-
print(avg_loss_value)
57-
if avg_loss_value[0] < 10.0:
58-
if save_dirname is not None:
59-
fluid.io.save_inference_model(save_dirname, ['x'],
60-
[y_predict], exe)
61-
return
62-
if math.isnan(float(avg_loss_value)):
63-
sys.exit("got NaN loss, training failed.")
64-
raise AssertionError("Fit a line cost is too large, {0:2.2}".format(
65-
avg_loss_value[0]))
48+
def train_loop(main_program):
49+
feeder = fluid.DataFeeder(place=place, feed_list=[x, y])
50+
exe.run(fluid.default_startup_program())
51+
52+
PASS_NUM = 100
53+
for pass_id in range(PASS_NUM):
54+
for data in train_reader():
55+
avg_loss_value, = exe.run(main_program,
56+
feed=feeder.feed(data),
57+
fetch_list=[avg_cost])
58+
print(avg_loss_value)
59+
if avg_loss_value[0] < 10.0:
60+
if save_dirname is not None:
61+
fluid.io.save_inference_model(save_dirname, ['x'],
62+
[y_predict], exe)
63+
return
64+
if math.isnan(float(avg_loss_value)):
65+
sys.exit("got NaN loss, training failed.")
66+
raise AssertionError("Fit a line cost is too large, {0:2.2}".format(
67+
avg_loss_value[0]))
68+
69+
if is_local:
70+
train_loop(fluid.default_main_program())
71+
else:
72+
port = os.getenv("PADDLE_INIT_PORT", "6174")
73+
pserver_ips = os.getenv("PADDLE_INIT_PSERVERS") # ip,ip...
74+
eplist = []
75+
for ip in pserver_ips.split(","):
76+
eplist.append(':'.join([ip, port]))
77+
pserver_endpoints = ",".join(eplist) # ip:port,ip:port...
78+
trainers = int(os.getenv("TRAINERS"))
79+
current_endpoint = os.getenv("POD_IP") + ":" + port
80+
trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID"))
81+
training_role = os.getenv("TRAINING_ROLE", "TRAINER")
82+
t = fluid.DistributeTranspiler()
83+
t.transpile(
84+
optimize_ops,
85+
params_grads,
86+
trainer_id,
87+
pservers=pserver_endpoints,
88+
trainers=trainers)
89+
if training_role == "PSERVER":
90+
pserver_prog = t.get_pserver_program(current_endpoint)
91+
pserver_startup = t.get_startup_program(current_endpoint,
92+
pserver_prog)
93+
exe.run(pserver_startup)
94+
exe.run(pserver_prog)
95+
elif training_role == "TRAINER":
96+
train_loop(t.get_trainer_program())
6697

6798

6899
def infer(use_cuda, save_dirname=None):
@@ -94,14 +125,14 @@ def infer(use_cuda, save_dirname=None):
94125
print("infer results: ", results[0])
95126

96127

97-
def main(use_cuda):
128+
def main(use_cuda, is_local=True):
98129
if use_cuda and not fluid.core.is_compiled_with_cuda():
99130
return
100131

101132
# Directory for saving the trained model
102133
save_dirname = "fit_a_line.inference.model"
103134

104-
train(use_cuda, save_dirname)
135+
train(use_cuda, save_dirname, is_local)
105136
infer(use_cuda, save_dirname)
106137

107138

python/paddle/fluid/tests/book/test_image_classification.py

Lines changed: 67 additions & 36 deletions
Original file line number | Diff line number | Diff line change
@@ -21,6 +21,7 @@
2121
import sys
2222
import numpy
2323
import unittest
24+
import os
2425

2526

2627
def resnet_cifar10(input, depth=32):
@@ -92,7 +93,7 @@ def conv_block(input, num_filter, groups, dropouts):
9293
return fc2
9394

9495

95-
def train(net_type, use_cuda, save_dirname):
96+
def train(net_type, use_cuda, save_dirname, is_local):
9697
classdim = 10
9798
data_shape = [3, 32, 32]
9899

@@ -117,7 +118,7 @@ def train(net_type, use_cuda, save_dirname):
117118
test_program = fluid.default_main_program().clone()
118119

119120
optimizer = fluid.optimizer.Adam(learning_rate=0.001)
120-
optimizer.minimize(avg_cost)
121+
optimize_ops, params_grads = optimizer.minimize(avg_cost)
121122

122123
BATCH_SIZE = 128
123124
PASS_NUM = 1
@@ -133,38 +134,68 @@ def train(net_type, use_cuda, save_dirname):
133134
place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
134135
exe = fluid.Executor(place)
135136
feeder = fluid.DataFeeder(place=place, feed_list=[images, label])
136-
exe.run(fluid.default_startup_program())
137-
138-
loss = 0.0
139-
for pass_id in range(PASS_NUM):
140-
for batch_id, data in enumerate(train_reader()):
141-
exe.run(feed=feeder.feed(data))
142-
143-
if (batch_id % 10) == 0:
144-
acc_list = []
145-
avg_loss_list = []
146-
for tid, test_data in enumerate(test_reader()):
147-
loss_t, acc_t = exe.run(program=test_program,
148-
feed=feeder.feed(test_data),
149-
fetch_list=[avg_cost, acc])
150-
if math.isnan(float(loss_t)):
151-
sys.exit("got NaN loss, training failed.")
152-
acc_list.append(float(acc_t))
153-
avg_loss_list.append(float(loss_t))
154-
break # Use 1 segment for speeding up CI
155-
156-
acc_value = numpy.array(acc_list).mean()
157-
avg_loss_value = numpy.array(avg_loss_list).mean()
158-
159-
print(
160-
'PassID {0:1}, BatchID {1:04}, Test Loss {2:2.2}, Acc {3:2.2}'.
161-
format(pass_id, batch_id + 1,
162-
float(avg_loss_value), float(acc_value)))
163-
164-
if acc_value > 0.01: # Low threshold for speeding up CI
165-
fluid.io.save_inference_model(save_dirname, ["pixel"],
166-
[predict], exe)
167-
return
137+
138+
def train_loop(main_program):
139+
exe.run(fluid.default_startup_program())
140+
loss = 0.0
141+
for pass_id in range(PASS_NUM):
142+
for batch_id, data in enumerate(train_reader()):
143+
exe.run(main_program, feed=feeder.feed(data))
144+
145+
if (batch_id % 10) == 0:
146+
acc_list = []
147+
avg_loss_list = []
148+
for tid, test_data in enumerate(test_reader()):
149+
loss_t, acc_t = exe.run(program=test_program,
150+
feed=feeder.feed(test_data),
151+
fetch_list=[avg_cost, acc])
152+
if math.isnan(float(loss_t)):
153+
sys.exit("got NaN loss, training failed.")
154+
acc_list.append(float(acc_t))
155+
avg_loss_list.append(float(loss_t))
156+
break # Use 1 segment for speeding up CI
157+
158+
acc_value = numpy.array(acc_list).mean()
159+
avg_loss_value = numpy.array(avg_loss_list).mean()
160+
161+
print(
162+
'PassID {0:1}, BatchID {1:04}, Test Loss {2:2.2}, Acc {3:2.2}'.
163+
format(pass_id, batch_id + 1,
164+
float(avg_loss_value), float(acc_value)))
165+
166+
if acc_value > 0.01: # Low threshold for speeding up CI
167+
fluid.io.save_inference_model(save_dirname, ["pixel"],
168+
[predict], exe)
169+
return
170+
171+
if is_local:
172+
train_loop(fluid.default_main_program())
173+
else:
174+
port = os.getenv("PADDLE_INIT_PORT", "6174")
175+
pserver_ips = os.getenv("PADDLE_INIT_PSERVERS") # ip,ip...
176+
eplist = []
177+
for ip in pserver_ips.split(","):
178+
eplist.append(':'.join([ip, port]))
179+
pserver_endpoints = ",".join(eplist) # ip:port,ip:port...
180+
trainers = int(os.getenv("TRAINERS"))
181+
current_endpoint = os.getenv("POD_IP") + ":" + port
182+
trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID"))
183+
training_role = os.getenv("TRAINING_ROLE", "TRAINER")
184+
t = fluid.DistributeTranspiler()
185+
t.transpile(
186+
optimize_ops,
187+
params_grads,
188+
trainer_id,
189+
pservers=pserver_endpoints,
190+
trainers=trainers)
191+
if training_role == "PSERVER":
192+
pserver_prog = t.get_pserver_program(current_endpoint)
193+
pserver_startup = t.get_startup_program(current_endpoint,
194+
pserver_prog)
195+
exe.run(pserver_startup)
196+
exe.run(pserver_prog)
197+
elif training_role == "TRAINER":
198+
train_loop(t.get_trainer_program())
168199

169200

170201
def infer(use_cuda, save_dirname=None):
@@ -196,14 +227,14 @@ def infer(use_cuda, save_dirname=None):
196227
print("infer results: ", results[0])
197228

198229

199-
def main(net_type, use_cuda):
230+
def main(net_type, use_cuda, is_local=True):
200231
if use_cuda and not fluid.core.is_compiled_with_cuda():
201232
return
202233

203234
# Directory for saving the trained model
204235
save_dirname = "image_classification_" + net_type + ".inference.model"
205236

206-
train(net_type, use_cuda, save_dirname)
237+
train(net_type, use_cuda, save_dirname, is_local)
207238
infer(use_cuda, save_dirname)
208239

209240

0 commit comments

Comments
 (0)