File tree Expand file tree Collapse file tree 3 files changed +10
-12
lines changed Expand file tree Collapse file tree 3 files changed +10
-12
lines changed Original file line number Diff line number Diff line change @@ -47,19 +47,15 @@ Executor::Executor(const platform::Place& place) : place_(place) {}
47
47
48
48
#ifdef PADDLE_WITH_DISTRIBUTE
49
49
void Executor::BeginPass () {
50
- auto client = ::paddle::operators::distributed::RPCClient::GetInstance<
51
- ::paddle::operators::distributed::GRPCClient>();
52
-
53
- client->SendBeginPass ();
54
- client->Wait ();
50
+ ::paddle::operators::distributed::RPCClient::GetInstance<
51
+ ::paddle::operators::distributed::GRPCClient>()
52
+ ->SendBeginPass ();
55
53
}
56
54
57
55
void Executor::EndPass () {
58
- auto client = ::paddle::operators::distributed::RPCClient::GetInstance<
59
- ::paddle::operators::distributed::GRPCClient>();
60
-
61
- client->SendEndPass ();
62
- client->Wait ();
56
+ ::paddle::operators::distributed::RPCClient::GetInstance<
57
+ ::paddle::operators::distributed::GRPCClient>()
58
+ ->SendEndPass ();
63
59
}
64
60
#endif
65
61
Original file line number Diff line number Diff line change @@ -40,13 +40,15 @@ void GRPCClient::SendBeginPass() {
40
40
VLOG (3 ) << " send begin pass to: " << it.first ;
41
41
this ->AsyncSendBeginPass (it.first );
42
42
}
43
+ this ->Wait ();
43
44
}
44
45
45
46
void GRPCClient::SendEndPass () {
46
47
for (auto & it : channels_) {
47
48
VLOG (3 ) << " send end pass to " << it.first ;
48
49
this ->AsyncSendEndPass (it.first );
49
50
}
51
+ this ->Wait ();
50
52
}
51
53
52
54
GRPCClient::~GRPCClient () {
Original file line number Diff line number Diff line change @@ -67,7 +67,7 @@ void RPCServer::IncreaseBatchBarrier(const std::string rpc_name) {
67
67
void RPCServer::BeginPass () {
68
68
VLOG (4 ) << " RPCServer begin increase pass barrier" ;
69
69
{
70
- std::unique_lock<std::mutex> locl (mutex_);
70
+ std::unique_lock<std::mutex> lock (mutex_);
71
71
client_num_++;
72
72
VLOG (4 ) << " increase client_num to: " << client_num_;
73
73
}
@@ -77,7 +77,7 @@ void RPCServer::BeginPass() {
77
77
void RPCServer::EndPass () {
78
78
VLOG (4 ) << " RPCServer begin increase pass barrier" ;
79
79
{
80
- std::unique_lock<std::mutex> locl (mutex_);
80
+ std::unique_lock<std::mutex> lock (mutex_);
81
81
client_num_--;
82
82
VLOG (4 ) << " decrease client_num to: " << client_num_;
83
83
if (cur_cond_.load () == rpc_cond_map_[kRequestGet ]) {
You can’t perform that action at this time.
0 commit comments