@@ -113,23 +113,21 @@ void AddOp(const std::string &type, const f::VariableNameMap &inputs,
113
113
op->SetAttrMap (attrs);
114
114
}
115
115
116
- void StartServerNet (bool is_sparse) {
116
+ void StartServerNet (bool is_sparse, std::atomic< bool > *initialized ) {
117
117
f::Scope scope;
118
118
p::CPUPlace place;
119
119
if (is_sparse) {
120
120
InitSelectedRowsInScope (place, &scope);
121
121
} else {
122
122
InitTensorsInScope (place, &scope);
123
123
}
124
-
125
124
// sub program run in listen_and_serv_op, for simple test we use sum
126
125
f::ProgramDesc program;
127
126
const auto &root_block = program.Block (0 );
128
127
auto *optimize_block = program.AppendBlock (root_block);
129
128
auto *prefetch_block = program.AppendBlock (root_block);
130
129
// X for server side tensors, RX for received tensors, must be of same shape.
131
130
AddOp (" sum" , {{" X" , {" x0" , " x1" }}}, {{" Out" , {" Out" }}}, {}, optimize_block);
132
-
133
131
f::AttributeMap attrs;
134
132
attrs.insert ({" endpoint" , std::string (" 127.0.0.1:0" )});
135
133
attrs.insert ({" Fanin" , 1 });
@@ -141,12 +139,16 @@ void StartServerNet(bool is_sparse) {
141
139
attrs.insert ({" sync_mode" , true });
142
140
listen_and_serv_op =
143
141
f::OpRegistry::CreateOp (" listen_and_serv" , {{" X" , {" x1" }}}, {}, attrs);
142
+ *initialized = true ;
144
143
listen_and_serv_op->Run (scope, place);
145
144
LOG (INFO) << " server exit" ;
146
145
}
147
146
148
147
TEST (SendRecvOp, CPUDense) {
149
- std::thread server_thread (StartServerNet, false );
148
+ std::atomic<bool > initialized{false };
149
+ std::thread server_thread (StartServerNet, false , &initialized);
150
+ while (!initialized) {
151
+ }
150
152
sleep (5 ); // wait server to start
151
153
// local net
152
154
f::Scope scope;
@@ -156,9 +158,11 @@ TEST(SendRecvOp, CPUDense) {
156
158
scope.Var (" RPC_CLIENT_VAR" );
157
159
158
160
f::AttributeMap attrs;
159
- selected_port = static_cast <paddle::operators::ListenAndServOp *>(
160
- listen_and_serv_op.get ())
161
- ->GetSelectedPort ();
161
+ auto *listen_and_serv_op_ptr =
162
+ static_cast <paddle::operators::ListenAndServOp *>(
163
+ listen_and_serv_op.get ());
164
+ ASSERT_TRUE (listen_and_serv_op_ptr != nullptr );
165
+ selected_port = listen_and_serv_op_ptr->GetSelectedPort ();
162
166
std::string endpoint = paddle::string::Sprintf (" 127.0.0.1:%d" , selected_port);
163
167
attrs.insert ({" endpoints" , std::vector<std::string>({endpoint})});
164
168
attrs.insert ({" epmap" , std::vector<std::string>({endpoint})});
@@ -184,18 +188,24 @@ TEST(SendRecvOp, CPUDense) {
184
188
}
185
189
186
190
TEST (SendRecvOp, CPUSparse) {
187
- std::thread server_thread (StartServerNet, true );
188
- sleep (3 ); // wait server to start
191
+ std::atomic<bool > initialized;
192
+ initialized = false ;
193
+ std::thread server_thread (StartServerNet, true , &initialized);
194
+ while (!initialized) {
195
+ }
196
+ sleep (5 ); // wait server to start
189
197
// local net
190
198
f::Scope scope;
191
199
p::CPUPlace place;
192
200
p::CPUDeviceContext ctx (place);
193
201
InitSelectedRowsInScope (place, &scope);
194
202
scope.Var (" RPC_CLIENT_VAR" );
195
203
f::AttributeMap attrs;
196
- selected_port = static_cast <paddle::operators::ListenAndServOp *>(
197
- listen_and_serv_op.get ())
198
- ->GetSelectedPort ();
204
+ auto *listen_and_serv_op_ptr =
205
+ static_cast <paddle::operators::ListenAndServOp *>(
206
+ listen_and_serv_op.get ());
207
+ ASSERT_TRUE (listen_and_serv_op_ptr != nullptr );
208
+ selected_port = listen_and_serv_op_ptr->GetSelectedPort ();
199
209
std::string endpoint = paddle::string::Sprintf (" 127.0.0.1:%d" , selected_port);
200
210
attrs.insert ({" endpoints" , std::vector<std::string>({endpoint})});
201
211
attrs.insert ({" epmap" , std::vector<std::string>({endpoint})});
0 commit comments