Skip to content

Commit ebf0027

Browse files
committed
use IOThreadPool to dispatch async update task
1 parent ea372b3 commit ebf0027

File tree

1 file changed

+11
-9
lines changed

1 file changed

+11
-9
lines changed

paddle/fluid/operators/listen_and_serv_op.cc

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -201,14 +201,16 @@ static void AsyncUpdateThread(
201201
LOG(ERROR) << "Can not find server side var: " << recv_var_name;
202202
PADDLE_THROW("Can not find server side var");
203203
}
204-
try {
205-
executor->RunPreparedContext(prepared, v.second->GetMutableLocalScope(),
206-
false, false);
207-
} catch (std::exception &e) {
208-
LOG(ERROR) << "run sub program error " << e.what();
209-
}
204+
auto fs = framework::Async([var_name, &executor, &v, prepared] {
205+
try {
206+
executor->RunPreparedContext(prepared, v.second->GetMutableLocalScope(),
207+
false, false);
208+
} catch (std::exception &e) {
209+
LOG(ERROR) << "run sub program error " << e.what();
210+
}
211+
});
212+
fs.wait();
210213
}
211-
VLOG(3) << "update thread for " << var_name << " ended";
212214
}
213215

214216
void ListenAndServOp::RunAsyncLoop(framework::Executor *executor,
@@ -256,8 +258,8 @@ void ListenAndServOp::RunAsyncLoop(framework::Executor *executor,
256258
for (auto iter = grad_to_queue.begin(); iter != grad_to_queue.end(); iter++) {
257259
std::string grad_name = iter->first;
258260
VLOG(3) << "create async update thread for " << grad_name;
259-
fs.push_back(framework::Async([grad_name, &exit_flag, &executor,
260-
&grad_to_queue, &grad_to_prepared_ctx]() {
261+
fs.push_back(framework::AsyncIO([grad_name, &exit_flag, &executor,
262+
&grad_to_queue, &grad_to_prepared_ctx]() {
261263
AsyncUpdateThread(grad_name, exit_flag, grad_to_queue[grad_name],
262264
executor, grad_to_prepared_ctx[grad_name].get());
263265
}));

0 commit comments

Comments
 (0)