
Commit ec2aed2

Merge pull request #13102 from typhoonzero/merge_dist_deps_fixes
Cherrypick dist fixes
2 parents (2aad01f + 4392695), commit ec2aed2

17 files changed: +2171 −391 lines

paddle/fluid/API.spec

Lines changed: 6 additions & 4 deletions
@@ -55,9 +55,10 @@ paddle.fluid.Inferencer.__init__ ArgSpec(args=['self', 'infer_func', 'param_path
 paddle.fluid.Inferencer.infer ArgSpec(args=['self', 'inputs', 'return_numpy'], varargs=None, keywords=None, defaults=(True,))
 paddle.fluid.DistributeTranspiler.__init__ ArgSpec(args=['self', 'config'], varargs=None, keywords=None, defaults=(None,))
 paddle.fluid.DistributeTranspiler.get_pserver_program ArgSpec(args=['self', 'endpoint'], varargs=None, keywords=None, defaults=None)
-paddle.fluid.DistributeTranspiler.get_startup_program ArgSpec(args=['self', 'endpoint', 'pserver_program', 'startup_program'], varargs=None, keywords=None, defaults=(None,))
+paddle.fluid.DistributeTranspiler.get_pserver_programs ArgSpec(args=['self', 'endpoint'], varargs=None, keywords=None, defaults=None)
+paddle.fluid.DistributeTranspiler.get_startup_program ArgSpec(args=['self', 'endpoint', 'pserver_program', 'startup_program'], varargs=None, keywords=None, defaults=(None, None))
 paddle.fluid.DistributeTranspiler.get_trainer_program ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None)
-paddle.fluid.DistributeTranspiler.transpile ArgSpec(args=['self', 'trainer_id', 'program', 'pservers', 'trainers', 'sync_mode'], varargs=None, keywords=None, defaults=(None, '127.0.0.1:6174', 1, True))
+paddle.fluid.DistributeTranspiler.transpile ArgSpec(args=['self', 'trainer_id', 'program', 'pservers', 'trainers', 'sync_mode', 'startup_program'], varargs=None, keywords=None, defaults=(None, '127.0.0.1:6174', 1, True, None))
 paddle.fluid.InferenceTranspiler.__init__
 paddle.fluid.InferenceTranspiler.transpile ArgSpec(args=['self', 'program', 'place', 'scope'], varargs=None, keywords=None, defaults=(None,))
 paddle.fluid.memory_optimize ArgSpec(args=['input_program', 'skip_opt_set', 'print_log', 'level'], varargs=None, keywords=None, defaults=(None, False, 0))
@@ -329,9 +330,10 @@ paddle.fluid.contrib.BeamSearchDecoder.update_array ArgSpec(args=['self', 'array
 paddle.fluid.contrib.memory_usage ArgSpec(args=['program', 'batch_size'], varargs=None, keywords=None, defaults=None)
 paddle.fluid.transpiler.DistributeTranspiler.__init__ ArgSpec(args=['self', 'config'], varargs=None, keywords=None, defaults=(None,))
 paddle.fluid.transpiler.DistributeTranspiler.get_pserver_program ArgSpec(args=['self', 'endpoint'], varargs=None, keywords=None, defaults=None)
-paddle.fluid.transpiler.DistributeTranspiler.get_startup_program ArgSpec(args=['self', 'endpoint', 'pserver_program', 'startup_program'], varargs=None, keywords=None, defaults=(None,))
+paddle.fluid.transpiler.DistributeTranspiler.get_pserver_programs ArgSpec(args=['self', 'endpoint'], varargs=None, keywords=None, defaults=None)
+paddle.fluid.transpiler.DistributeTranspiler.get_startup_program ArgSpec(args=['self', 'endpoint', 'pserver_program', 'startup_program'], varargs=None, keywords=None, defaults=(None, None))
 paddle.fluid.transpiler.DistributeTranspiler.get_trainer_program ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None)
-paddle.fluid.transpiler.DistributeTranspiler.transpile ArgSpec(args=['self', 'trainer_id', 'program', 'pservers', 'trainers', 'sync_mode'], varargs=None, keywords=None, defaults=(None, '127.0.0.1:6174', 1, True))
+paddle.fluid.transpiler.DistributeTranspiler.transpile ArgSpec(args=['self', 'trainer_id', 'program', 'pservers', 'trainers', 'sync_mode', 'startup_program'], varargs=None, keywords=None, defaults=(None, '127.0.0.1:6174', 1, True, None))
 paddle.fluid.transpiler.InferenceTranspiler.__init__
 paddle.fluid.transpiler.InferenceTranspiler.transpile ArgSpec(args=['self', 'program', 'place', 'scope'], varargs=None, keywords=None, defaults=(None,))
 paddle.fluid.transpiler.memory_optimize ArgSpec(args=['input_program', 'skip_opt_set', 'print_log', 'level'], varargs=None, keywords=None, defaults=(None, False, 0))
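
Note: the API changes above (the extra startup_program argument to transpile and the new get_pserver_programs) can be exercised roughly as in the minimal sketch below. The cluster endpoints, trainer count, and the assumption that get_pserver_programs returns the pserver main and startup programs as a pair are illustrative, not taken from this diff; in real use the training network must already be defined in the current main program.

import paddle.fluid as fluid

# Hypothetical cluster layout, for illustration only.
pserver_endpoints = "127.0.0.1:6174,127.0.0.1:6175"
current_endpoint = "127.0.0.1:6174"

t = fluid.DistributeTranspiler()
# transpile() now also accepts startup_program (new trailing argument, default None).
t.transpile(
    trainer_id=0,
    pservers=pserver_endpoints,
    trainers=2,
    sync_mode=True,
    startup_program=fluid.default_startup_program())

# get_pserver_programs() is assumed to return the pserver main program and its
# startup program for the given endpoint in a single call.
pserver_prog, pserver_startup = t.get_pserver_programs(current_endpoint)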

paddle/fluid/framework/details/multi_devices_graph_pass.cc

Lines changed: 42 additions & 25 deletions
@@ -736,7 +736,7 @@ void MultiDevSSAGraphBuilder::CreateDistTrainOp(ir::Graph *result,
           .emplace(varname, op_dev_id);
     }
   } else {
-    PADDLE_ENFORCE(
+    PADDLE_THROW(
         "the distribute training related op should be in [split_byref, "
         "concat].");
   }
@@ -746,17 +746,26 @@
                  node->Op()->Type());
 
   CreateComputationalOp(result, node, op_dev_id);
-  if (node->Op()->Type() == "concat") {
-    ConnectOp(result, result->Get<GraphOps>(kGraphOps).back().get(),
-              "fetch_barrier");
+}
+
+void SetOpInputsAllPlaces(ir::Graph *result, ir::Node *node, int num_places) {
+  auto *op_handle = result->Get<GraphOps>(kGraphOps).back().get();
+  for (ir::Node *input : node->inputs) {
+    VarHandle *var = nullptr;
+    for (int place_offset = 0; place_offset < num_places; ++place_offset) {
+      auto &var_holders = result->Get<GraphVars>(kGraphVars)[place_offset];
+      auto &var_holder = var_holders[input->Name()];
+      if (!var_holder.empty()) {
+        var = var_holder.rbegin()->get();
+        op_handle->AddInput(var);
+      }
+    }
   }
 }
 
 // Create RPC related op handles that connects its in ops and out ops.
 void MultiDevSSAGraphBuilder::CreateRPCOp(ir::Graph *result,
                                           ir::Node *node) const {
-  // FIXME(typhoonzero): Cleanup this deps for both sync mode and async mode
-  // put them into transpiler.
   int op_dev_id = -1;
   if (node->Op()->Type() == "send") {
     // TODO(paddle-dev): getting the first var is not safe.
@@ -791,8 +800,6 @@ void MultiDevSSAGraphBuilder::CreateRPCOp(ir::Graph *result,
     }
     auto recv_param_grad = boost::get<std::vector<std::string>>(
         node->Op()->GetAttr(OpProtoAndCheckerMaker::OpRoleVarAttrName()));
-    // FIXME(typhoonzero): assume each recv op output one param
-    // Use the same place as send.
     if (recv_param_grad.size() == 2U) {
       op_dev_id = GetVarDeviceID(*result, recv_param_grad[1]);
       VLOG(10) << "recv param " << recv_param_grad[0]
@@ -806,34 +813,44 @@ void MultiDevSSAGraphBuilder::CreateRPCOp(ir::Graph *result,
           .emplace(varname, op_dev_id);
     }
   } else {
-    // send_barrier and fetch_barrier op can be scheduled on device 0
+    // send_barrier, fetch_barrier will run on place 0;
    op_dev_id = 0;
   }
 
   PADDLE_ENFORCE(op_dev_id != -1, "can not find the right place for rpc op: %s",
                  node->Op()->Type());
-
   result->Get<GraphOps>(kGraphOps).emplace_back(new RPCOpHandle(
       result->CreateOpNode(node->Op()), *node->Op(), local_scopes_[op_dev_id],
       node->Op()->Type(), places_[op_dev_id]));
 
-  // TODO(panyx0718): This might not be needed anymore.
-  if (node->Op()->Type() == "send_barrier") {
-    ConnectOp(result, result->Get<GraphOps>(kGraphOps).back().get(), "send");
-  } else if (node->Op()->Type() == "recv") {
-    ConnectOp(result, result->Get<GraphOps>(kGraphOps).back().get(),
-              "send_barrier");
-  } else if (node->Op()->Type() == "fetch_barrier") {
-    ConnectOp(result, result->Get<GraphOps>(kGraphOps).back().get(), "recv");
-  } else if (node->Op()->Type() == "send") {
-    // do nothing
+  if (node->Op()->Type() == "send") {
+    CreateOpHandleIOs(result, node, op_dev_id);
   } else {
-    PADDLE_THROW(
-        "rpc op should be in ["
-        "send, send_barrier. recv, fetch_barrier]");
-  }
+    // send_barrier, recv, fetch_barrier's inputs are deps var, get them from
+    // all places
+    auto p = places_[op_dev_id];
+    auto *op_handle = result->Get<GraphOps>(kGraphOps).back().get();
+    op_handle->SetDeviceContext(p,
+                                platform::DeviceContextPool::Instance().Get(p));
 
-  CreateOpHandleIOs(result, node, op_dev_id);
+    SetOpInputsAllPlaces(result, node, places_.size());
+    for (ir::Node *output : node->outputs) {
+      int outvar_dev_id = op_dev_id;
+      if (node->Op()->Type() == "fetch_barrier") {
+        outvar_dev_id = GetVarDeviceID(*result, output->Name());
+        PADDLE_ENFORCE_NE(outvar_dev_id, -1);
+      }
+      p = places_[outvar_dev_id];
+      ir::Node *new_node = nullptr;
+      if (output->Var()) {
+        new_node = result->CreateVarNode(output->Var());
+      } else {
+        new_node =
+            result->CreateEmptyNode(output->Name(), ir::Node::Type::kVariable);
+      }
+      CreateOpOutput(result, op_handle, new_node, p, outvar_dev_id);
+    }
+  }
 }
 
 bool MultiDevSSAGraphBuilder::IsScaleLossOp(ir::Node *node) const {

paddle/fluid/framework/ir/graph.cc

Lines changed: 0 additions & 57 deletions
@@ -132,63 +132,6 @@ Graph::Graph(const ProgramDesc &program) : program_(program) {
     }
   }
 
-  std::vector<ir::Node *> send_ops;
-  ir::Node *send_bar = nullptr;
-  std::vector<ir::Node *> recv_ops;
-  ir::Node *fetch_bar = nullptr;
-  for (ir::Node *node : Nodes()) {
-    if (node->Name() == "send") {
-      send_ops.push_back(node);
-    } else if (node->Name() == "send_barrier") {
-      PADDLE_ENFORCE(!send_bar, "only has one send barrier");
-      send_bar = node;
-    } else if (node->Name() == "recv") {
-      recv_ops.push_back(node);
-    } else if (node->Name() == "fetch_barrier") {
-      PADDLE_ENFORCE(!fetch_bar, "only has one fetch barrier");
-      fetch_bar = node;
-    }
-  }
-  if (send_bar) {
-    for (ir::Node *send : send_ops) {
-      ir::Node *dep_var = CreateControlDepVar();
-      send->outputs.push_back(dep_var);
-      dep_var->inputs.push_back(send);
-      send_bar->inputs.push_back(dep_var);
-      dep_var->outputs.push_back(send_bar);
-    }
-    for (ir::Node *recv : recv_ops) {
-      ir::Node *dep_var = CreateControlDepVar();
-      recv->inputs.push_back(dep_var);
-      dep_var->outputs.push_back(recv);
-      send_bar->outputs.push_back(dep_var);
-      dep_var->inputs.push_back(send_bar);
-    }
-  }
-  if (fetch_bar) {
-    for (ir::Node *recv : recv_ops) {
-      ir::Node *dep_var = CreateControlDepVar();
-      recv->outputs.push_back(dep_var);
-      dep_var->inputs.push_back(recv);
-      fetch_bar->inputs.push_back(dep_var);
-      dep_var->outputs.push_back(fetch_bar);
-    }
-  }
-
-  std::vector<std::string> send_vars = FindDistTrainSendVars(send_ops);
-  std::vector<std::string> recv_vars = FindDistTrainRecvVars(recv_ops);
-  for (ir::Node *node : Nodes()) {
-    if (IsDistTrainOp(node, send_vars, recv_vars)) {
-      if (fetch_bar && node->Name() == "concat") {
-        ir::Node *dep_var = CreateControlDepVar();
-        fetch_bar->outputs.push_back(dep_var);
-        dep_var->inputs.push_back(fetch_bar);
-        node->inputs.push_back(dep_var);
-        dep_var->outputs.push_back(node);
-      }
-    }
-  }
-
   /**
    * We should handle write after read(WAR) and write after write(WAW) here.
    * Because some of the operators of the program can be executed parallelly.
paddle/fluid/operators/fetch_barrier_op.cc

Lines changed: 2 additions & 0 deletions
@@ -52,6 +52,8 @@ class FetchBarrierOp : public framework::OperatorBase {
 class FetchBarrierOpMaker : public framework::OpProtoAndCheckerMaker {
  public:
   void Make() {
+    AddOutput("Out", "(Any) Dummy outputs, used for control dependency")
+        .AsDuplicable();
     AddComment(R"DOC(
 SendBarrier operator
 
paddle/fluid/operators/send_barrier_op.cc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,10 @@ class SendBarrierOp : public framework::OperatorBase {
5656
class SendBarrierOpMaker : public framework::OpProtoAndCheckerMaker {
5757
public:
5858
void Make() {
59+
AddInput("X", "(Any) Dummy inputs, used for control dependency")
60+
.AsDuplicable();
61+
AddOutput("Out", "(Any) Dummy outputs, used for control dependency")
62+
.AsDuplicable();
5963
AddComment(R"DOC(
6064
SendBarrier operator
6165

python/paddle/fluid/layers/io.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,11 @@ def Send(endpoints, send_vars, dummy_output=None, sync=True):
246246
rpc_op_role_name: core.op_proto_and_checker_maker.OpRole.RPC
247247
})
248248
if sync:
249-
helper.append_op(type="send_barrier", attrs={"endpoints": endpoints})
249+
helper.append_op(
250+
type="send_barrier",
251+
inputs={"X": dummy_output},
252+
outputs={"Out": []},
253+
attrs={"endpoints": endpoints})
250254

251255

252256
def Recv(endpoints, get_vars, dummy_input=None, sync=True):
@@ -282,7 +286,10 @@ def Recv(endpoints, get_vars, dummy_input=None, sync=True):
282286
attrs={"endpoints": endpoints,
283287
"epmap": epmap})
284288
if sync:
285-
helper.append_op(type="fetch_barrier", attrs={"endpoints": endpoints})
289+
helper.append_op(
290+
type="fetch_barrier",
291+
outputs={"Out": get_vars},
292+
attrs={"endpoints": endpoints})
286293
return get_vars
287294

288295

python/paddle/fluid/tests/unittests/dist_se_resnext.py

Lines changed: 19 additions & 8 deletions
@@ -130,7 +130,12 @@ def net(self, input, class_dim=1000):
             input=conv, pool_size=7, pool_type='avg', global_pooling=True)
         drop = fluid.layers.dropout(x=pool, dropout_prob=0.2)
         stdv = 1.0 / math.sqrt(drop.shape[1] * 1.0)
-        out = fluid.layers.fc(input=drop, size=class_dim, act='softmax')
+        out = fluid.layers.fc(
+            input=drop,
+            size=class_dim,
+            act='softmax',
+            param_attr=fluid.ParamAttr(
+                initializer=fluid.initializer.Constant(value=0.05)))
         return out
 
     def shortcut(self, input, ch_out, stride):
@@ -180,21 +185,27 @@ def conv_bn_layer(self,
             act=None,
             # avoid pserver CPU init differs from GPU
             param_attr=fluid.ParamAttr(
-                initializer=fluid.initializer.Constant()),
+                initializer=fluid.initializer.Constant(value=0.05)),
             bias_attr=False)
         return fluid.layers.batch_norm(input=conv, act=act)
 
     def squeeze_excitation(self, input, num_channels, reduction_ratio):
         pool = fluid.layers.pool2d(
             input=input, pool_size=0, pool_type='avg', global_pooling=True)
         stdv = 1.0 / math.sqrt(pool.shape[1] * 1.0)
-        squeeze = fluid.layers.fc(input=pool,
-                                  size=num_channels // reduction_ratio,
-                                  act='relu')
+        squeeze = fluid.layers.fc(
+            input=pool,
+            size=num_channels // reduction_ratio,
+            param_attr=fluid.ParamAttr(
+                initializer=fluid.initializer.Constant(value=0.05)),
+            act='relu')
         stdv = 1.0 / math.sqrt(squeeze.shape[1] * 1.0)
-        excitation = fluid.layers.fc(input=squeeze,
-                                     size=num_channels,
-                                     act='sigmoid')
+        excitation = fluid.layers.fc(
+            input=squeeze,
+            size=num_channels,
+            param_attr=fluid.ParamAttr(
+                initializer=fluid.initializer.Constant(value=0.05)),
+            act='sigmoid')
         scale = fluid.layers.elementwise_mul(x=input, y=excitation, axis=0)
         return scale
