Commit 866fcb0

Merge pull request #12171 from typhoonzero/fix_pserver_with_condition_block
fix pserver with condition block
2 parents d24fd2c + 32d8190 commit 866fcb0

3 files changed: +56 −9 lines changed


paddle/fluid/operators/listen_and_serv_op.cc

Lines changed: 9 additions & 5 deletions
@@ -61,6 +61,8 @@ static void ParallelExecuteBlocks(
         framework::Async([&executor, &prepared, &program, &scope, idx]() {
           int run_block = idx;  // thread local
           try {
+            VLOG(3) << "running server block: " << run_block
+                    << "pointer: " << prepared[run_block].get();
             executor->RunPreparedContext(prepared[run_block].get(), scope);
           } catch (const std::exception &e) {
             LOG(ERROR) << "run sub program error " << e.what();
@@ -107,12 +109,14 @@ void ListenAndServOp::RunSyncLoop(
   PADDLE_ENFORCE_GE(num_blocks, 2,
                     "server program should have at least 2 blocks");
 
-  std::vector<int> optimize_blocks_idx;
-  for (auto blk : optimize_blocks) {
-    optimize_blocks_idx.push_back(blk->ID());
+  // Prepare all the server block
+  std::vector<int> optimize_blocks_list;
+  for (size_t i = 1; i < program->Size(); ++i) {
+    optimize_blocks_list.push_back(i);
   }
-  auto optimize_prepared = executor->Prepare(*program, optimize_blocks_idx);
-  // Insert placeholder for block0 which holds current op itself.
+  auto optimize_prepared = executor->Prepare(*program, optimize_blocks_list);
+  // Insert placeholder for block0 which holds current op itself,
+  // NOTE the first block in `optimize_prepared` should never be ran.
   optimize_prepared.insert(
       optimize_prepared.begin(),
       std::shared_ptr<framework::ExecutorPrepareContext>(nullptr));
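
Why all blocks are now prepared: with a learning-rate schedule such as piecewise_decay on the pserver, the server program contains conditional_block ops whose sub-blocks are ordinary blocks of that same program, and ParallelExecuteBlocks looks up `prepared[run_block]` directly by block ID, so preparing only the optimize blocks no longer lines up. A minimal sketch of how such a server program can be built and inspected, assuming the fluid DistributeTranspiler API exercised by the test below (the endpoint, trainer count, layer sizes, and schedule values are illustrative):

import paddle.fluid as fluid

# Build a tiny network with a piecewise-decayed Momentum optimizer, mirroring
# the new test case, then transpile it for a single parameter server.
x = fluid.layers.data(name='x', shape=[1000], dtype='float32')
y_predict = fluid.layers.fc(input=x, size=1000, act=None)
y = fluid.layers.data(name='y', shape=[1], dtype='float32')
cost = fluid.layers.square_error_cost(input=y_predict, label=y)
avg_cost = fluid.layers.mean(cost)

optimizer = fluid.optimizer.Momentum(
    learning_rate=fluid.layers.piecewise_decay(
        boundaries=[1, 10, 20, 30], values=[1.0, 0.1, 0.01, 0.001, 0.0001]),
    momentum=0.9,
    regularization=fluid.regularizer.L2Decay(1e-4))
optimizer.minimize(avg_cost)

t = fluid.DistributeTranspiler()
t.transpile(trainer_id=0, pservers="127.0.0.1:6174", trainers=1)
pserver_prog = t.get_pserver_program("127.0.0.1:6174")

# Block 0 holds listen_and_serv itself; the remaining blocks include the
# learning-rate schedule, its conditional_block sub-blocks, and the
# per-parameter optimize blocks.
for i, block in enumerate(pserver_prog.blocks):
    print(i, [op.type for op in block.ops])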

python/paddle/fluid/tests/unittests/test_dist_transpiler.py

Lines changed: 44 additions & 4 deletions
@@ -304,10 +304,50 @@ def test_transpiler(self):
         # TODO(typhoonzero): test clipping and L2Decay ops are removed from trainer
 
 
-# FIXME(typhoonzero): need to add test for async case:
-# see https://github.com/PaddlePaddle/Paddle/issues/11691
-class TestAsyncSGD(TranspilerTest):
-    pass
+class TestL2DecayWithPiecewise(TranspilerTest):
+    def net_conf(self):
+        x = fluid.layers.data(name='x', shape=[1000], dtype='float32')
+        y_predict = fluid.layers.fc(input=x,
+                                    size=1000,
+                                    act=None,
+                                    param_attr=fluid.ParamAttr(name='fc_w'),
+                                    bias_attr=fluid.ParamAttr(name='fc_b'))
+        y = fluid.layers.data(name='y', shape=[1], dtype='float32')
+        cost = fluid.layers.square_error_cost(input=y_predict, label=y)
+        avg_cost = fluid.layers.mean(cost)
+        base_lr = 1.0
+        bd = [1, 10, 20, 30]
+        lr = [base_lr * (0.1**i) for i in range(len(bd) + 1)]
+        sgd_optimizer = fluid.optimizer.Momentum(
+            learning_rate=fluid.layers.piecewise_decay(
+                boundaries=bd, values=lr),
+            momentum=0.9,
+            regularization=fluid.regularizer.L2Decay(1e-4))
+        sgd_optimizer.minimize(avg_cost)
+        return
+
+    def test_transpiler(self):
+        pserver, startup = self.get_pserver(self.pserver1_ep)
+        trainer = self.get_trainer()
+
+        self.assertEqual(len(pserver.blocks), 9)
+        self.assertEqual([op.type for op in pserver.blocks[1].ops], [
+            "increment", "cast", "fill_constant", "fill_constant", "less_than",
+            "logical_not", "conditional_block", "fill_constant",
+            "fill_constant", "less_than", "logical_not", "logical_and",
+            "logical_and", "conditional_block", "fill_constant",
+            "fill_constant", "less_than", "logical_not", "logical_and",
+            "logical_and", "conditional_block", "fill_constant",
+            "fill_constant", "less_than", "logical_not", "logical_and",
+            "logical_and", "conditional_block", "fill_constant",
+            "conditional_block"
+        ])
+        self.assertEqual(
+            [op.type for op in pserver.blocks[7].ops],
+            ["sum", "scale", "scale", "elementwise_add", "momentum"])
+        self.assertEqual(
+            [op.type for op in pserver.blocks[8].ops],
+            ["sum", "scale", "scale", "elementwise_add", "momentum"])
 
 
 if __name__ == "__main__":
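
Reading the new assertions, the transpiled server program appears to hold nine blocks: block 0 for listen_and_serv itself, block 1 with the piecewise_decay counter logic and its five conditional_block ops, blocks 2 through 6 for the corresponding conditional sub-blocks, and blocks 7 and 8 as the per-parameter optimize blocks for fc_w and fc_b. A rough numpy sketch (not fluid code; the 1/num_trainers averaging and the exact momentum update form are assumptions) of what the asserted chain ["sum", "scale", "scale", "elementwise_add", "momentum"] computes for one parameter:

import numpy as np

def pserver_momentum_step(param, velocity, trainer_grads, num_trainers,
                          lr, mu=0.9, l2_coeff=1e-4):
    grad = np.sum(trainer_grads, axis=0)   # "sum": merge grads sent by the trainers
    grad = grad * (1.0 / num_trainers)     # "scale": average the merged grad
    decay = param * l2_coeff               # "scale": L2Decay regularization term
    grad = grad + decay                    # "elementwise_add": decayed gradient
    velocity = mu * velocity + grad        # "momentum": velocity update ...
    param = param - lr * velocity          # ... and parameter update
    return param, velocity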

python/paddle/fluid/transpiler/distribute_transpiler.py

Lines changed: 3 additions & 0 deletions
@@ -461,13 +461,16 @@ def __clone_lr_op_sub_block__(op, program, lr_block):
             per_opt_block = pserver_program.create_block(pre_block_idx)
             optimize_blocks.append(per_opt_block)
             # append grad merging ops before clip and weight decay
+            # cases may like:
+            # L2Decay op -> clip op -> optimize
             for _, op in enumerate(self.optimize_ops):
                 # find the origin @GRAD var before clipping
                 grad_varname_for_block = __op_have_grad_input__(op)
                 if ufind.is_connected(op, opt_op) and grad_varname_for_block:
                     merged_var = self._append_pserver_grad_merge_ops(
                         per_opt_block, grad_varname_for_block, endpoint,
                         grad_to_block_id, self.origin_program)
+                    break  # append optimize op once then append other ops.
             for _, op in enumerate(self.optimize_ops):
                 # optimizer is connected to itself
                 if ufind.is_connected(op, opt_op) and op not in global_ops:
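
The added break guards against duplicated gradient-merge ops: in a chain like L2Decay op -> clip op -> optimize, more than one op connected to the same optimize op can carry a @GRAD input, so without the break the merge ops would presumably be appended once per matching op rather than once per block. A minimal sketch of the pattern, with is_connected, op_grad_input and append_grad_merge standing in for ufind.is_connected, __op_have_grad_input__ and _append_pserver_grad_merge_ops:

# All names below are stand-ins; this only illustrates the control flow.
def append_merge_once(optimize_ops, opt_op, is_connected, op_grad_input,
                      append_grad_merge):
    for op in optimize_ops:
        grad_varname = op_grad_input(op)   # origin @GRAD var before clipping
        if is_connected(op, opt_op) and grad_varname:
            append_grad_merge(grad_varname)
            break  # the merge ops must be added exactly once per optimize block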
