Skip to content

Commit 3206b17

Browse files
authored
Merge pull request #13224 from jacquesqiao/cherry-pick-async-handle-complete-message
fix async mode handle COMPLETE_MESSAGE (#13212)
2 parents 196c9f9 + 78d98c4 commit 3206b17

File tree

1 file changed

+25
-24
lines changed

1 file changed

+25
-24
lines changed

paddle/fluid/operators/distributed/request_handler_impl.cc

Lines changed: 25 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -39,19 +39,6 @@ bool RequestSendHandler::Handle(const std::string& varname,
3939
const std::string& out_var_name) {
4040
VLOG(4) << "RequestSendHandler:" << varname;
4141

42-
// Async
43-
if (!sync_mode_) {
44-
rpc_server_->Profiler().OneStep();
45-
try {
46-
executor_->RunPreparedContext((*grad_to_prepared_ctx_)[varname].get(),
47-
scope);
48-
} catch (std::exception& e) {
49-
LOG(ERROR) << "async: run sub program error " << e.what();
50-
return false;
51-
}
52-
return true;
53-
}
54-
5542
// Sync
5643
if (varname == BATCH_BARRIER_MESSAGE) {
5744
VLOG(3) << "sync: recv BATCH_BARRIER_MESSAGE";
@@ -60,17 +47,31 @@ bool RequestSendHandler::Handle(const std::string& varname,
6047
VLOG(3) << "sync: recv complete message";
6148
rpc_server_->Complete();
6249
} else {
63-
VLOG(3) << "sync: received var_name: " << varname;
64-
rpc_server_->WaitCond(kRequestSend);
65-
VLOG(3) << "sync: processing received var: " << varname;
66-
67-
if (invar == nullptr) {
68-
LOG(FATAL) << "sync: Can not find server side var: " << varname;
69-
return false;
70-
}
71-
if (invar->IsType<framework::SelectedRows>()) {
72-
std::unique_lock<std::mutex> lock(mutex_sparse_vars_);
73-
sparse_vars_.push_back(invar);
50+
// Async
51+
if (!sync_mode_) {
52+
VLOG(3) << "async process var: " << varname;
53+
rpc_server_->Profiler().OneStep();
54+
try {
55+
executor_->RunPreparedContext((*grad_to_prepared_ctx_)[varname].get(),
56+
scope);
57+
} catch (std::exception& e) {
58+
LOG(ERROR) << "async: run sub program error " << e.what();
59+
return false;
60+
}
61+
return true;
62+
} else { // sync
63+
rpc_server_->WaitCond(kRequestSend);
64+
VLOG(3) << "sync: processing received var: " << varname;
65+
66+
if (invar == nullptr) {
67+
LOG(FATAL) << "sync: Can not find server side var: " << varname;
68+
return false;
69+
}
70+
71+
if (invar->IsType<framework::SelectedRows>()) {
72+
std::unique_lock<std::mutex> lock(mutex_sparse_vars_);
73+
sparse_vars_.push_back(invar);
74+
}
7475
}
7576
}
7677
return true;

0 commit comments

Comments
 (0)