Skip to content

Commit e7ac709

Browse files
committed
done
1 parent a131c73 commit e7ac709

File tree

5 files changed

+44
-40
lines changed

5 files changed

+44
-40
lines changed

paddle/fluid/operators/detail/grpc_server.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -208,9 +208,9 @@ void AsyncGRPCServer::WaitClientGet(int count) {
208208
}
209209
}
210210

211-
bool AsyncGRPCServer::WaitServerReady() {
211+
// Blocks the calling thread until ready_ equals 1.
// The wait happens on condition_ready_ while holding mutex_ready_; the
// predicate overload of wait() re-checks ready_ after every wakeup, so a
// spurious wakeup does not release the caller early.
// NOTE(review): the code that sets ready_ and notifies condition_ready_ is not
// visible in this diff -- confirm it updates ready_ under mutex_ready_.
void AsyncGRPCServer::WaitServerReady() {
212212
std::unique_lock<std::mutex> lock(this->mutex_ready_);
213-
condition_ready_.wait(lock, [&] { return this->ready_ == 1; });
213+
condition_ready_.wait(lock, [=] { return this->ready_ == 1; });
214214
}
215215

216216
void AsyncGRPCServer::RunSyncUpdate() {

paddle/fluid/operators/detail/grpc_server.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ class AsyncGRPCServer final {
4747
explicit AsyncGRPCServer(const std::string &address, bool sync_mode)
4848
: address_(address), sync_mode_(sync_mode), ready_(0) {}
4949

50-
bool WaitServerReady();
50+
void WaitServerReady();
5151
void RunSyncUpdate();
5252

5353
// functions to sync server barrier status.
@@ -120,7 +120,7 @@ class AsyncGRPCServer final {
120120
framework::Executor *executor_;
121121
int selected_port_;
122122

123-
std::mutext mutex_ready_;
123+
std::mutex mutex_ready_;
124124
std::condition_variable condition_ready_;
125125
int ready_;
126126
};

paddle/fluid/operators/listen_and_serv_op.cc

Lines changed: 19 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -80,28 +80,35 @@ static void ParallelExecuteBlocks(
8080
for (size_t i = 0; i < fs.size(); ++i) fs[i].wait();
8181
}
8282

83-
static void SavePort(std::shared_ptr<detail::AsyncGRPCServer> rpc_service) {
84-
std::ofstream port_file;
85-
port_file.open("/tmp/paddle.selected_port");
86-
port_file << rpc_service->GetSelectedPort();
87-
port_file.close();
88-
}
83+
std::atomic_int ListenAndServOp::selected_port_{0};
8984

9085
// Constructs the operator by forwarding type, inputs, outputs and attrs
// unchanged to the OperatorBase constructor; the body is empty, so no other
// member is set up here.
ListenAndServOp::ListenAndServOp(const std::string &type,
9086
const framework::VariableNameMap &inputs,
9187
const framework::VariableNameMap &outputs,
9288
const framework::AttributeMap &attrs)
9389
: OperatorBase(type, inputs, outputs, attrs) {}
9590

96-
int ListenAndServOp::GetSelectedPort() const {
97-
return rpc_service_->GetSelectedPort();
98-
}
99-
10091
// Shuts the operator down: pushes LISTEN_TERMINATE_MESSAGE to rpc_service_ and
// then blocks until server_thread_ has been joined.
// NOTE(review): both pointers are dereferenced without a null check, so this
// presumably must only be called after RunImpl() has started the server
// thread -- confirm against callers.
void ListenAndServOp::Stop() {
10192
rpc_service_->Push(LISTEN_TERMINATE_MESSAGE);
10293
server_thread_->join();
10394
}
10495

96+
// Publishes the port chosen by rpc_service_ in two places: the static atomic
// selected_port_ and a text file at file_path.
//
// file_path: destination file; it is opened with a default-mode std::ofstream
//            and receives the port number as text.
//
// NOTE(review): selected_port_ is stored *before* the file is written, so a
// thread spinning in WaitServerReady() can resume before the file exists.
// NOTE(review): the stream state is never checked; if open() fails the VLOG
// below still reports the port as written.
void ListenAndServOp::SavePort(const std::string &file_path) const {
97+
// NOTE: the header declares /tmp/paddle.selected_port as the default
// file_path.
98+
selected_port_ = rpc_service_->GetSelectedPort();
99+
100+
std::ofstream port_file;
101+
port_file.open(file_path);
102+
port_file << selected_port_.load();
103+
port_file.close();
104+
VLOG(4) << "selected port written to " << file_path;
105+
}
106+
107+
void ListenAndServOp::WaitServerReady() {
108+
while (selected_port_.load() == 0) {
109+
}
110+
}
111+
105112
void ListenAndServOp::RunSyncLoop(framework::Executor *executor,
106113
framework::ProgramDesc *program,
107114
framework::Scope *recv_scope,
@@ -265,23 +272,6 @@ void ListenAndServOp::RunAsyncLoop(framework::Executor *executor,
265272
} // while(true)
266273
}
267274

268-
void ListenAndServOp::StartServerThread() {
269-
server_thread_.reset(new std::thread(
270-
std::bind(&ListenAndServOp::ServerThreadEntry, this, rpc_service_)));
271-
}
272-
273-
void ListenAndServOp::ServerThreadEntry(
274-
std::shared_ptr<detail::AsyncGRPCServer> service) {
275-
service->RunSyncUpdate();
276-
VLOG(4) << "RunServer thread end";
277-
278-
{
279-
std::lock_guard<std::mutex> lock(this->barrier_mutex_);
280-
barrier_cond_step_ = cond;
281-
}
282-
barrier_condition_.notify_all();
283-
}
284-
285275
void ListenAndServOp::RunImpl(const framework::Scope &scope,
286276
const platform::Place &dev_place) const {
287277
platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance();
@@ -315,9 +305,10 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope,
315305
// start the server listening after all member initialized.
316306
server_thread_.reset(new std::thread(RunServer, rpc_service_));
317307
VLOG(3) << "wait server thread to become ready...";
308+
rpc_service_->WaitServerReady();
318309

319310
// Write to a file of server selected port for python use.
320-
SavePort(rpc_service_);
311+
SavePort();
321312
if (sync_mode) {
322313
RunSyncLoop(&executor, program, &recv_scope, prefetch_block);
323314
} else {

paddle/fluid/operators/listen_and_serv_op.h

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ limitations under the License. */
1515
#pragma once
1616

1717
#include <stdint.h>
18+
#include <atomic>
1819
#include <ostream>
1920
#include <string>
2021

@@ -39,8 +40,6 @@ class ListenAndServOp : public framework::OperatorBase {
3940
const framework::VariableNameMap& outputs,
4041
const framework::AttributeMap& attrs);
4142

42-
int GetSelectedPort() const;
43-
4443
void RunSyncLoop(framework::Executor* executor,
4544
framework::ProgramDesc* program,
4645
framework::Scope* recv_scope,
@@ -51,20 +50,25 @@ class ListenAndServOp : public framework::OperatorBase {
5150
framework::Scope* recv_scope,
5251
framework::BlockDesc* prefetch_block) const;
5352

54-
void StartServerThread();
53+
void SavePort(
54+
const std::string& file_path = "/tmp/paddle.selected_port") const;
55+
56+
void WaitServerReady();
5557

56-
void ServerThreadEntry();
58+
// Returns the current value of the static atomic selected_port_ (implicit
// atomic load). 0 is the value ResetPort() stores and the value
// WaitServerReady() treats as "not published yet".
int GetSelectedPort() { return selected_port_; }
5759

5860
void Stop() override;
5961

6062
void RunImpl(const framework::Scope& scope,
6163
const platform::Place& dev_place) const override;
6264

65+
// Stores 0 into the static selected_port_. Because the port is shared by every
// ListenAndServOp instance, the tests in send_recv_op_test.cc call this after
// each server shutdown so a later WaitServerReady() does not return
// immediately on the previous server's port.
static void ResetPort() { selected_port_ = 0; }
66+
6367
protected:
6468
mutable std::shared_ptr<detail::AsyncGRPCServer> rpc_service_;
6569
mutable std::shared_ptr<std::thread> server_thread_;
66-
std::mutext server_ready_mutex_;
67-
std::condition_variable server_ready_;
70+
// FIXME(wuyi): it's static so that the operator can be cloned.
71+
static std::atomic_int selected_port_;
6872
};
6973

7074
} // namespace operators

paddle/fluid/operators/send_recv_op_test.cc

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ void AddOp(const std::string &type, const f::VariableNameMap &inputs,
116116
void StartServerNet(bool is_sparse) {
117117
f::Scope scope;
118118
p::CPUPlace place;
119+
VLOG(4) << "before init tensor";
119120
if (is_sparse) {
120121
InitSelectedRowsInScope(place, &scope);
121122
} else {
@@ -129,6 +130,7 @@ void StartServerNet(bool is_sparse) {
129130
auto *prefetch_block = program.AppendBlock(root_block);
130131
// X for server side tensors, RX for received tensors, must be of same shape.
131132
AddOp("sum", {{"X", {"x0", "x1"}}}, {{"Out", {"Out"}}}, {}, optimize_block);
133+
VLOG(4) << "before attr";
132134

133135
f::AttributeMap attrs;
134136
attrs.insert({"endpoint", std::string("127.0.0.1:0")});
@@ -139,15 +141,19 @@ void StartServerNet(bool is_sparse) {
139141
attrs.insert({"PrefetchBlock", prefetch_block});
140142
attrs.insert({"grad_to_block_id", std::vector<std::string>({""})});
141143
attrs.insert({"sync_mode", true});
144+
VLOG(4) << "before init op";
142145
listen_and_serv_op =
143146
f::OpRegistry::CreateOp("listen_and_serv", {{"X", {"x1"}}}, {}, attrs);
147+
VLOG(4) << "before run op";
144148
listen_and_serv_op->Run(scope, place);
145149
LOG(INFO) << "server exit";
146150
}
147151

148152
TEST(SendRecvOp, CPUDense) {
149153
std::thread server_thread(StartServerNet, false);
150-
sleep(5); // wait server to start
154+
// wait server to start
155+
static_cast<paddle::operators::ListenAndServOp *>(listen_and_serv_op.get())
156+
->WaitServerReady();
151157
// local net
152158
f::Scope scope;
153159
p::CPUPlace place;
@@ -181,11 +187,13 @@ TEST(SendRecvOp, CPUDense) {
181187
listen_and_serv_op->Stop();
182188
server_thread.join();
183189
listen_and_serv_op.reset(nullptr);
190+
paddle::operators::ListenAndServOp::ResetPort();
184191
}
185192

186193
TEST(SendRecvOp, CPUSparse) {
187194
std::thread server_thread(StartServerNet, true);
188-
sleep(3); // wait server to start
195+
static_cast<paddle::operators::ListenAndServOp *>(listen_and_serv_op.get())
196+
->WaitServerReady();
189197
// local net
190198
f::Scope scope;
191199
p::CPUPlace place;
@@ -226,4 +234,5 @@ TEST(SendRecvOp, CPUSparse) {
226234
listen_and_serv_op->Stop();
227235
server_thread.join();
228236
listen_and_serv_op.reset();
237+
paddle::operators::ListenAndServOp::ResetPort();
229238
}

0 commit comments

Comments
 (0)