Skip to content

Commit 73650a8

Browse files
author
Yancey
authored
Merge pull request #10342 from Yancey1989/refine_distribute_transpiler_api
Refine distribute transpiler api
2 parents f63ff90 + e9737d6 commit 73650a8

10 files changed

+40
-63
lines changed

benchmark/cluster/vgg16/vgg16_fluid.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -240,8 +240,6 @@ def train_loop(exe, trainer_prog):
240240

241241
t = fluid.DistributeTranspiler()
242242
t.transpile(
243-
optimize_ops,
244-
params_grads,
245243
trainer_id=args.task_index,
246244
pservers=args.ps_hosts,
247245
trainers=trainers)

python/paddle/fluid/distribute_transpiler.py

Lines changed: 32 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -137,8 +137,6 @@ def split_dense_variable(var_list,
137137

138138
class DistributeTranspiler:
139139
def transpile(self,
140-
optimize_ops,
141-
params_grads,
142140
trainer_id,
143141
program=None,
144142
pservers="127.0.0.1:6174",
@@ -169,11 +167,6 @@ def transpile(self,
169167
4. append ops that should run on current server instance.
170168
5. add listen_and_serv op
171169
172-
:param optimize_ops: op list of optimization, should be the
173-
return value of Optimizer.minimize
174-
:type optimize_ops: list
175-
:param params_grads: list of tuple(weight, gradient)
176-
:type params_grads: list
177170
:param trainer_id: one unique id for each trainer in a job.
178171
:type trainer_id: int
179172
:param program: program to transpile, default is default_main_program
@@ -194,14 +187,14 @@ def transpile(self,
194187
program = default_main_program()
195188
self.origin_program = program
196189
self.trainer_num = trainers
197-
self.optimize_ops = optimize_ops
198190
self.sync_mode = sync_mode
199191
# TODO(typhoonzero): currently trainer_id is fetched from cluster system
200192
# like Kubernetes, we should port this to use etcd later when developing
201193
# fluid distributed training with fault-tolerance.
202194
self.trainer_id = trainer_id
203195
pserver_endpoints = pservers.split(",")
204196
self.pserver_endpoints = pserver_endpoints
197+
self.optimize_ops, params_grads = self._get_optimize_pass()
205198

206199
# process lookup_table_op
207200
# 1. check all lookup_table_op is distributed
@@ -408,11 +401,8 @@ def get_pserver_program(self, endpoint):
408401
# HACK: optimization global ops only used to scale beta1 and beta2
409402
# replace it with dependency engine.
410403
for op in self.optimize_ops:
411-
if op.type == "scale":
412-
for in_name in op.input_arg_names:
413-
if in_name.startswith("beta1_pow_acc") or \
414-
in_name.startswith("beta2_pow_acc"):
415-
global_ops.append(op)
404+
if self._is_adam_connected_op(op):
405+
global_ops.append(op)
416406

417407
def __append_optimize_op__(op, block, grad_to_block_id):
418408
if self._is_opt_op(op):
@@ -1147,3 +1137,32 @@ def _get_lr_ops(self):
11471137
# we only need to append op for once
11481138
break
11491139
return lr_ops
1140+
1141+
def _get_optimize_pass(self):
1142+
block = self.origin_program.global_block()
1143+
opt_ops = []
1144+
params_grads = []
1145+
for op in block.ops:
1146+
if self._is_opt_op(op):
1147+
opt_ops.append(op)
1148+
params_grads.append((self.origin_program.global_block().var(
1149+
op.input("Param")[0]),
1150+
self.origin_program.global_block().var(
1151+
op.input("Grad")[0])))
1152+
elif self._is_adam_connected_op(op):
1153+
opt_ops.append(op)
1154+
else:
1155+
pass
1156+
return opt_ops, params_grads
1157+
1158+
def _is_adam_connected_op(self, op):
1159+
"""
1160+
A hack function to determine whether the input operator
1161+
is connected to an optimize operator.
1162+
"""
1163+
if op.type == "scale":
1164+
for in_name in op.input_arg_names:
1165+
if in_name.startswith("beta1_pow_acc") or \
1166+
in_name.startswith("beta2_pow_acc"):
1167+
return True
1168+
return False

python/paddle/fluid/tests/book/test_fit_a_line.py

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -80,12 +80,7 @@ def train_loop(main_program):
8080
trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID"))
8181
training_role = os.getenv("TRAINING_ROLE", "TRAINER")
8282
t = fluid.DistributeTranspiler()
83-
t.transpile(
84-
optimize_ops,
85-
params_grads,
86-
trainer_id,
87-
pservers=pserver_endpoints,
88-
trainers=trainers)
83+
t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers)
8984
if training_role == "PSERVER":
9085
pserver_prog = t.get_pserver_program(current_endpoint)
9186
pserver_startup = t.get_startup_program(current_endpoint,

python/paddle/fluid/tests/book/test_image_classification.py

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -189,12 +189,7 @@ def train_loop(main_program):
189189
trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID"))
190190
training_role = os.getenv("TRAINING_ROLE", "TRAINER")
191191
t = fluid.DistributeTranspiler()
192-
t.transpile(
193-
optimize_ops,
194-
params_grads,
195-
trainer_id,
196-
pservers=pserver_endpoints,
197-
trainers=trainers)
192+
t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers)
198193
if training_role == "PSERVER":
199194
pserver_prog = t.get_pserver_program(current_endpoint)
200195
pserver_startup = t.get_startup_program(current_endpoint,

python/paddle/fluid/tests/book/test_label_semantic_roles.py

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -259,12 +259,7 @@ def train_loop(main_program):
259259
trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID"))
260260
training_role = os.getenv("TRAINING_ROLE", "TRAINER")
261261
t = fluid.DistributeTranspiler()
262-
t.transpile(
263-
optimize_ops,
264-
params_grads,
265-
trainer_id,
266-
pservers=pserver_endpoints,
267-
trainers=trainers)
262+
t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers)
268263
if training_role == "PSERVER":
269264
pserver_prog = t.get_pserver_program(current_endpoint)
270265
pserver_startup = t.get_startup_program(current_endpoint,

python/paddle/fluid/tests/book/test_machine_translation.py

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -231,12 +231,7 @@ def train_loop(main_program):
231231
trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID"))
232232
training_role = os.getenv("TRAINING_ROLE", "TRAINER")
233233
t = fluid.DistributeTranspiler()
234-
t.transpile(
235-
optimize_ops,
236-
params_grads,
237-
trainer_id,
238-
pservers=pserver_endpoints,
239-
trainers=trainers)
234+
t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers)
240235
if training_role == "PSERVER":
241236
pserver_prog = t.get_pserver_program(current_endpoint)
242237
pserver_startup = t.get_startup_program(current_endpoint,

python/paddle/fluid/tests/book/test_recognize_digits.py

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -162,12 +162,7 @@ def train_loop(main_program):
162162
trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID"))
163163
training_role = os.getenv("TRAINING_ROLE", "TRAINER")
164164
t = fluid.DistributeTranspiler()
165-
t.transpile(
166-
optimize_ops,
167-
params_grads,
168-
trainer_id,
169-
pservers=pserver_endpoints,
170-
trainers=trainers)
165+
t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers)
171166
if training_role == "PSERVER":
172167
pserver_prog = t.get_pserver_program(current_endpoint)
173168
pserver_startup = t.get_startup_program(current_endpoint,

python/paddle/fluid/tests/book/test_recommender_system.py

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -261,12 +261,7 @@ def train_loop(main_program):
261261
trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID"))
262262
training_role = os.getenv("TRAINING_ROLE", "TRAINER")
263263
t = fluid.DistributeTranspiler()
264-
t.transpile(
265-
optimize_ops,
266-
params_grads,
267-
trainer_id,
268-
pservers=pserver_endpoints,
269-
trainers=trainers)
264+
t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers)
270265
if training_role == "PSERVER":
271266
pserver_prog = t.get_pserver_program(current_endpoint)
272267
pserver_startup = t.get_startup_program(current_endpoint,

python/paddle/fluid/tests/book/test_understand_sentiment.py

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -213,12 +213,7 @@ def train_loop(main_program):
213213
trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID"))
214214
training_role = os.getenv("TRAINING_ROLE", "TRAINER")
215215
t = fluid.DistributeTranspiler()
216-
t.transpile(
217-
optimize_ops,
218-
params_grads,
219-
trainer_id,
220-
pservers=pserver_endpoints,
221-
trainers=trainers)
216+
t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers)
222217
if training_role == "PSERVER":
223218
pserver_prog = t.get_pserver_program(current_endpoint)
224219
pserver_startup = t.get_startup_program(current_endpoint,

python/paddle/fluid/tests/book/test_word2vec.py

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -145,12 +145,7 @@ def train_loop(main_program):
145145
trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID"))
146146
training_role = os.getenv("TRAINING_ROLE", "TRAINER")
147147
t = fluid.DistributeTranspiler()
148-
t.transpile(
149-
optimize_ops,
150-
params_grads,
151-
trainer_id,
152-
pservers=pserver_endpoints,
153-
trainers=trainers)
148+
t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers)
154149
if training_role == "PSERVER":
155150
pserver_prog = t.get_pserver_program(current_endpoint)
156151
pserver_startup = t.get_startup_program(current_endpoint,

0 commit comments

Comments
 (0)