Commit a615ad4

Add test for dist and memopt (#13049)
* add test for dist and memopt
* update transformer too
1 parent 515a756 commit a615ad4

3 files changed: +91, -113 lines
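
At its core, the commit adds a --mem_opt switch to the distributed unit tests: when it is set, the test harness applies fluid.memory_optimize to the default main program before the distribute transpiler runs. A minimal sketch of that gate (the helper name here is illustrative; only the memory_optimize call itself appears in the diffs below):

import paddle.fluid as fluid

def maybe_memory_optimize(mem_opt):
    # Mirrors the gate added to run_pserver/run_trainer in test_dist_base.py:
    # optionally rewrite the default main program in place, so intermediate
    # variables can reuse memory where their lifetimes allow, before transpiling.
    if mem_opt:
        fluid.memory_optimize(fluid.default_main_program())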

python/paddle/fluid/tests/unittests/dist_transformer.py

Lines changed: 17 additions & 50 deletions
@@ -1667,16 +1667,6 @@ def get_model(is_dist, is_async):
     return sum_cost, avg_cost, predict, token_num, local_lr_scheduler
 
 
-def get_transpiler(trainer_id, main_program, pserver_endpoints, trainers):
-    t = fluid.DistributeTranspiler()
-    t.transpile(
-        trainer_id=trainer_id,
-        program=main_program,
-        pservers=pserver_endpoints,
-        trainers=trainers)
-    return t
-
-
 def update_args():
     src_dict = DataReader.load_dict(TrainTaskConfig.src_vocab_fpath)
     trg_dict = DataReader.load_dict(TrainTaskConfig.trg_vocab_fpath)
@@ -1691,69 +1681,46 @@ def update_args():
 
 
 class DistTransformer2x2(TestDistRunnerBase):
-    def run_pserver(self, pserver_endpoints, trainers, current_endpoint,
-                    trainer_id, sync_mode):
-        get_model(True, not sync_mode)
-        t = get_transpiler(trainer_id,
-                           fluid.default_main_program(), pserver_endpoints,
-                           trainers)
-        pserver_prog = t.get_pserver_program(current_endpoint)
-        startup_prog = t.get_startup_program(current_endpoint, pserver_prog)
+    def run_pserver(self, args):
+        get_model(True, not args.sync_mode)
+        t = self.get_transpiler(args.trainer_id,
+                                fluid.default_main_program(), args.endpoints,
+                                args.trainers, args.sync_mode)
+        pserver_prog = t.get_pserver_program(args.current_endpoint)
+        startup_prog = t.get_startup_program(args.current_endpoint,
+                                             pserver_prog)
 
         place = fluid.CPUPlace()
         exe = fluid.Executor(place)
         exe.run(startup_prog)
         exe.run(pserver_prog)
 
-    def _wait_ps_ready(self, pid):
-        retry_times = 20
-        while True:
-            assert retry_times >= 0, "wait ps ready failed"
-            time.sleep(3)
-            try:
-                # the listen_and_serv_op would touch a file which contains the listen port
-                # on the /tmp directory until it was ready to process all the RPC call.
-                os.stat("/tmp/paddle.%d.port" % pid)
-                return
-            except os.error:
-                retry_times -= 1
-
-    def run_trainer(self,
-                    place,
-                    endpoints,
-                    trainer_id,
-                    trainers,
-                    is_dist=True,
-                    sync_mode=True):
+    def run_trainer(self, place, args):
 
         sum_cost, avg_cost, predict, token_num, local_lr_scheduler = get_model(
-            is_dist, not sync_mode)
+            args.is_dist, not args.sync_mode)
 
-        if is_dist:
-            t = get_transpiler(trainer_id,
-                               fluid.default_main_program(), endpoints,
-                               trainers)
+        if args.is_dist:
+            t = self.get_transpiler(args.trainer_id,
+                                    fluid.default_main_program(),
+                                    args.endpoints, args.trainers,
+                                    args.sync_mode)
             trainer_prog = t.get_trainer_program()
             TrainTaskConfig.batch_size = 10
             TrainTaskConfig.train_file_pattern = TrainTaskConfig.data_path + "train.tok.clean.bpe.32000.en-de.train_{}".format(
-                trainer_id)
+                args.trainer_id)
         else:
             TrainTaskConfig.batch_size = 20
             trainer_prog = fluid.default_main_program()
 
         startup_exe = fluid.Executor(place)
 
-        TrainTaskConfig.local = not is_dist
+        TrainTaskConfig.local = not args.is_dist
 
         train_loop(startup_exe, trainer_prog, 1, sum_cost, avg_cost,
                    local_lr_scheduler, token_num, predict)
 
 
 if __name__ == "__main__":
-    if len(sys.argv) != 8:
-        print(
-            "Usage: python dist_transformer.py [pserver/trainer] [endpoints] [trainer_id] [current_endpoint] [trainers] [is_dist] [sync_mode]"
-        )
-
     update_args()
     runtime_main(DistTransformer2x2)
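
For context, the refactored run_pserver/run_trainer above now take a single parsed-arguments object instead of a long positional signature. A rough sketch of the namespace they expect, built by hand here purely for illustration (the field names match the argparse flags added in test_dist_base.py below; the endpoint values are placeholders):

import argparse

args = argparse.Namespace(
    role="trainer",
    endpoints="127.0.0.1:9123,127.0.0.1:9124",
    trainer_id=0,
    trainers=2,
    current_endpoint="127.0.0.1:9123",
    is_dist=True,
    sync_mode=True,
    mem_opt=False)

# In the test harness this namespace comes from parse_args(), and the runner
# then calls something like DistTransformer2x2().run_trainer(place, args).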

python/paddle/fluid/tests/unittests/test_dist_base.py

Lines changed: 65 additions & 63 deletions
@@ -21,7 +21,7 @@
 import six
 import signal
 import subprocess
-import six
+import argparse
 
 
 class TestDistRunnerBase(object):
@@ -43,40 +43,35 @@ def get_transpiler(self, trainer_id, main_program, pserver_endpoints,
             sync_mode=sync_mode)
         return t
 
-    def run_pserver(self,
-                    pserver_endpoints,
-                    trainers,
-                    current_endpoint,
-                    trainer_id,
-                    sync_mode=True):
+    def run_pserver(self, args):
         import paddle
         import paddle.fluid as fluid
         self.get_model(batch_size=2)
-        t = self.get_transpiler(trainer_id,
-                                fluid.default_main_program(), pserver_endpoints,
-                                trainers, sync_mode)
-        pserver_prog = t.get_pserver_program(current_endpoint)
-        startup_prog = t.get_startup_program(current_endpoint, pserver_prog)
+        if args.mem_opt:
+            fluid.memory_optimize(fluid.default_main_program())
+        t = self.get_transpiler(args.trainer_id,
+                                fluid.default_main_program(), args.endpoints,
+                                args.trainers, args.sync_mode)
+        pserver_prog = t.get_pserver_program(args.current_endpoint)
+        startup_prog = t.get_startup_program(args.current_endpoint,
+                                             pserver_prog)
         place = fluid.CPUPlace()
         exe = fluid.Executor(place)
         exe.run(startup_prog)
         exe.run(pserver_prog)
 
-    def run_trainer(self,
-                    place,
-                    endpoints,
-                    trainer_id,
-                    trainers,
-                    is_dist=True,
-                    sync_mode=True):
+    def run_trainer(self, place, args):
         import paddle
         import paddle.fluid as fluid
         test_program, avg_cost, train_reader, test_reader, batch_acc, predict = \
-            self.get_model(batch_size=2)
-        if is_dist:
-            t = self.get_transpiler(trainer_id,
-                                    fluid.default_main_program(), endpoints,
-                                    trainers, sync_mode)
+            self.get_model(batch_size=2)
+        if args.mem_opt:
+            fluid.memory_optimize(fluid.default_main_program())
+        if args.is_dist:
+            t = self.get_transpiler(args.trainer_id,
+                                    fluid.default_main_program(),
+                                    args.endpoints, args.trainers,
+                                    args.sync_mode)
             trainer_prog = t.get_trainer_program()
         else:
             trainer_prog = fluid.default_main_program()
@@ -117,27 +112,27 @@ def runtime_main(test_class):
     import paddle.fluid as fluid
     import paddle.fluid.core as core
 
-    if len(sys.argv) != 8:
-        print(
-            "Usage: python dist_se_resnext.py [pserver/trainer] [endpoints] [trainer_id] [current_endpoint] [trainers] [is_dist] [sync_mode]"
-        )
-    role = sys.argv[1]
-    endpoints = sys.argv[2]
-    trainer_id = int(sys.argv[3])
-    current_endpoint = sys.argv[4]
-    trainers = int(sys.argv[5])
-    is_dist = True if sys.argv[6] == "TRUE" else False
-    sync_mode = True if sys.argv[7] == "TRUE" else False
+    parser = argparse.ArgumentParser(description='Run dist test.')
+    parser.add_argument(
+        '--role', type=str, required=True, choices=['pserver', 'trainer'])
+    parser.add_argument('--endpoints', type=str, required=False, default="")
+    parser.add_argument('--is_dist', action='store_true')
+    parser.add_argument('--trainer_id', type=int, required=False, default=0)
+    parser.add_argument('--trainers', type=int, required=False, default=1)
+    parser.add_argument(
+        '--current_endpoint', type=str, required=False, default="")
+    parser.add_argument('--sync_mode', action='store_true')
+    parser.add_argument('--mem_opt', action='store_true')
+
+    args = parser.parse_args()
 
     model = test_class()
-    if role == "pserver":
-        model.run_pserver(endpoints, trainers, current_endpoint, trainer_id,
-                          sync_mode)
+    if args.role == "pserver" and args.is_dist:
+        model.run_pserver(args)
     else:
         p = fluid.CUDAPlace(0) if core.is_compiled_with_cuda(
         ) else fluid.CPUPlace()
-        model.run_trainer(p, endpoints, trainer_id, trainers, is_dist,
-                          sync_mode)
+        model.run_trainer(p, args)
 
 
 import paddle.compat as cpt
@@ -153,30 +148,34 @@ def setUp(self):
         self._ps_endpoints = "127.0.0.1:9123,127.0.0.1:9124"
         self._python_interp = "python"
         self._sync_mode = True
+        self._mem_opt = False
         self._setup_config()
 
     def start_pserver(self, model_file, check_error_log):
-        sync_mode_str = "TRUE" if self._sync_mode else "FALSE"
+
         ps0_ep, ps1_ep = self._ps_endpoints.split(",")
-        ps0_cmd = "%s %s pserver %s 0 %s %d TRUE %s" % \
+        ps_cmd = "%s %s --role pserver --endpoints %s --trainer_id 0 --current_endpoint %s --trainers %d --is_dist %s %s"
+        sync_mode_str = "--sync_mode" if self._sync_mode else ""
+        mem_opt_str = "--mem_opt" if self._mem_opt else ""
+        ps0_cmd = ps_cmd % \
             (self._python_interp, model_file, self._ps_endpoints, ps0_ep,
-             self._trainers, sync_mode_str)
-        ps1_cmd = "%s %s pserver %s 0 %s %d TRUE %s" % \
+             self._trainers, sync_mode_str, mem_opt_str)
+        ps1_cmd = ps_cmd % \
            (self._python_interp, model_file, self._ps_endpoints, ps1_ep,
-            self._trainers, sync_mode_str)
+            self._trainers, sync_mode_str, mem_opt_str)
 
         ps0_pipe = subprocess.PIPE
         ps1_pipe = subprocess.PIPE
         if check_error_log:
-            print("ps0_cmd:", ps0_cmd)
-            print("ps1_cmd:", ps1_cmd)
+            print(ps0_cmd)
+            print(ps1_cmd)
             ps0_pipe = open("/tmp/ps0_err.log", "wb")
             ps1_pipe = open("/tmp/ps1_err.log", "wb")
 
         ps0_proc = subprocess.Popen(
-            ps0_cmd.split(" "), stdout=subprocess.PIPE, stderr=ps0_pipe)
+            ps0_cmd.strip().split(" "), stdout=subprocess.PIPE, stderr=ps0_pipe)
         ps1_proc = subprocess.Popen(
-            ps1_cmd.split(" "), stdout=subprocess.PIPE, stderr=ps1_pipe)
+            ps1_cmd.strip().split(" "), stdout=subprocess.PIPE, stderr=ps1_pipe)
 
         if not check_error_log:
             return ps0_proc, ps1_proc, None, None
@@ -199,7 +198,7 @@ def _wait_ps_ready(self, pid):
             retry_times -= 1
 
     def check_with_place(self, model_file, delta=1e-3, check_error_log=False):
-        # *ATTENTION* THIS TEST NEEDS AT LEAST 2GPUS TO RUN
+        # TODO(typhoonzero): should auto adapt GPU count on the machine.
         required_envs = {
             "PATH": os.getenv("PATH"),
             "PYTHONPATH": os.getenv("PYTHONPATH"),
@@ -215,18 +214,14 @@ def check_with_place(self, model_file, delta=1e-3, check_error_log=False):
         # Run local to get a base line
         env_local = {"CUDA_VISIBLE_DEVICES": "0"}
         env_local.update(required_envs)
-        sync_mode_str = "TRUE" if self._sync_mode else "FALSE"
-        local_cmd = "%s %s trainer %s 0 %s %d FLASE %s" % \
-                    (self._python_interp, model_file,
-                     "127.0.0.1:1234", "127.0.0.1:1234", 1, sync_mode_str)
+        local_cmd = "%s %s --role trainer" % (self._python_interp, model_file)
         if not check_error_log:
             local_proc = subprocess.Popen(
                 local_cmd.split(" "),
                 stdout=subprocess.PIPE,
                 stderr=subprocess.PIPE,
                 env=env_local)
         else:
-            print("trainer cmd:", local_cmd)
             err_log = open("/tmp/trainer.err.log", "wb")
             local_proc = subprocess.Popen(
                 local_cmd.split(" "),
@@ -247,12 +242,17 @@ def check_with_place(self, model_file, delta=1e-3, check_error_log=False):
         self._wait_ps_ready(ps1.pid)
 
         ps0_ep, ps1_ep = self._ps_endpoints.split(",")
-        tr0_cmd = "%s %s trainer %s 0 %s %d TRUE %s" % \
-                  (self._python_interp, model_file, self._ps_endpoints, ps0_ep,
-                   self._trainers, sync_mode_str)
-        tr1_cmd = "%s %s trainer %s 1 %s %d TRUE %s" % \
-                  (self._python_interp, model_file, self._ps_endpoints, ps1_ep,
-                   self._trainers, sync_mode_str)
+        tr_cmd = "%s %s --role trainer --endpoints %s --trainer_id %d --current_endpoint %s --trainers %d --is_dist %s %s"
+        sync_mode_str = "--sync_mode" if self._sync_mode else ""
+        mem_opt_str = "--mem_opt" if self._mem_opt else ""
+        tr0_cmd = tr_cmd % \
+                  (self._python_interp, model_file, self._ps_endpoints,
+                   0, ps0_ep,
+                   self._trainers, sync_mode_str, mem_opt_str)
+        tr1_cmd = tr_cmd % \
+                  (self._python_interp, model_file, self._ps_endpoints,
+                   1, ps1_ep,
+                   self._trainers, sync_mode_str, mem_opt_str)
 
         env0 = {"CUDA_VISIBLE_DEVICES": "0"}
         env1 = {"CUDA_VISIBLE_DEVICES": "1"}
@@ -269,12 +269,12 @@ def check_with_place(self, model_file, delta=1e-3, check_error_log=False):
             tr1_pipe = open("/tmp/tr1_err.log", "wb")
 
         tr0_proc = subprocess.Popen(
-            tr0_cmd.split(" "),
+            tr0_cmd.strip().split(" "),
             stdout=subprocess.PIPE,
             stderr=tr0_pipe,
            env=env0)
         tr1_proc = subprocess.Popen(
-            tr1_cmd.split(" "),
+            tr1_cmd.strip().split(" "),
            stdout=subprocess.PIPE,
            stderr=tr1_pipe,
            env=env1)
@@ -303,6 +303,8 @@ def check_with_place(self, model_file, delta=1e-3, check_error_log=False):
         # FIXME: use terminate() instead of sigkill.
         os.kill(ps0.pid, signal.SIGKILL)
         os.kill(ps1.pid, signal.SIGKILL)
+        ps0.wait()
+        ps1.wait()
         FNULL.close()
 
         self.assertAlmostEqual(local_first_loss, dist_first_loss, delta=delta)
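
To see how the new command templates expand, here is a small illustration (the model file and endpoints are placeholders; only the template string and the flag spellings come from the diff above):

# Assemble a trainer command the same way check_with_place now does.
tr_cmd = ("%s %s --role trainer --endpoints %s --trainer_id %d "
          "--current_endpoint %s --trainers %d --is_dist %s %s")
sync_mode_str = "--sync_mode"  # self._sync_mode is True in the sync tests
mem_opt_str = "--mem_opt"      # self._mem_opt is True in the new memopt test
cmd = tr_cmd % ("python", "dist_mnist.py", "127.0.0.1:9123,127.0.0.1:9124",
                0, "127.0.0.1:9123", 2, sync_mode_str, mem_opt_str)
# cmd (one line, wrapped here):
#   python dist_mnist.py --role trainer
#     --endpoints 127.0.0.1:9123,127.0.0.1:9124 --trainer_id 0
#     --current_endpoint 127.0.0.1:9123 --trainers 2 --is_dist
#     --sync_mode --mem_opt
# The .strip() added before .split(" ") guards against the trailing space
# left behind when sync_mode_str or mem_opt_str is empty.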

python/paddle/fluid/tests/unittests/test_dist_mnist.py

Lines changed: 9 additions & 0 deletions
@@ -25,6 +25,15 @@ def test_se_resnext(self):
         self.check_with_place("dist_mnist.py", delta=1e-7)
 
 
+class TestDistMnist2x2WithMemopt(TestDistBase):
+    def _setup_config(self):
+        self._sync_mode = True
+        self._mem_opt = True
+
+    def test_se_resnext(self):
+        self.check_with_place("dist_mnist.py", delta=1e-7)
+
+
 class TestDistMnistAsync(TestDistBase):
     def _setup_config(self):
         self._sync_mode = False
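
The new TestDistMnist2x2WithMemopt case exercises synchronous training with memory optimization enabled. As a purely hypothetical extension (not part of this commit), the same _setup_config hook could combine the async mode from TestDistMnistAsync above with memory optimization; the class name, test name, and delta below are illustrative placeholders:

from test_dist_base import TestDistBase  # as in test_dist_mnist.py

class TestDistMnistAsyncWithMemopt(TestDistBase):
    def _setup_config(self):
        self._sync_mode = False
        self._mem_opt = True

    def test_dist_train(self):
        # Async training tracks the local baseline less tightly, so a loose
        # tolerance would be needed here.
        self.check_with_place("dist_mnist.py", delta=200)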
