@@ -156,6 +156,7 @@ TEST(SendRecvOp, CPUDense) {
156
156
std::thread server_thread (StartServerNet, false , &initialized);
157
157
while (!initialized) {
158
158
}
159
+
159
160
static_cast <paddle::operators::ListenAndServOp *>(listen_and_serv_op.get ())
160
161
->WaitServerReady ();
161
162
@@ -175,77 +176,77 @@ TEST(SendRecvOp, CPUDense) {
175
176
std::string endpoint = paddle::string::Sprintf (" 127.0.0.1:%d" , selected_port);
176
177
attrs.insert ({" endpoints" , std::vector<std::string>({endpoint})});
177
178
attrs.insert ({" epmap" , std::vector<std::string>({endpoint})});
178
- auto send_op = f::OpRegistry::CreateOp (
179
- " send" , {{" X" , {" x1" }}},
180
- {{" Out" , {" Out" }}, attrs);
181
- send_op->Run (scope, place);
182
-
183
- auto in_var = scope.Var (" x1" );
184
- auto tensor = in_var->GetMutable <f::LoDTensor>();
185
- float *expected = tensor->data <float >();
186
- auto out_var = scope.Var (" Out" );
187
- auto target = out_var->GetMutable <f::LoDTensor>();
188
- // x1 * 2 == x0
189
- EXPECT_NE (target->memory_size (), size_t (0 ));
190
- float *actual = target->data <float >();
191
- for (int64_t i = 0 ; i < target->numel (); ++i) {
192
- EXPECT_EQ (expected[i] * 2 , actual[i]);
193
- }
194
- listen_and_serv_op->Stop ();
195
- server_thread.join ();
196
- listen_and_serv_op.reset (nullptr );
197
- paddle::operators::ListenAndServOp::ResetPort ();
179
+ const f::VariableNameMap &inputs = {{" X" , {" x1" }}};
180
+ const f::VariableNameMap &outputs = {{" Out" , {" Out" }}};
181
+
182
+ auto send_op = f::OpRegistry::CreateOp (" send" , inputs, outputs, attrs);
183
+ send_op->Run (scope, place);
184
+
185
+ auto in_var = scope.Var (" x1" );
186
+ auto tensor = in_var->GetMutable <f::LoDTensor>();
187
+ float *expected = tensor->data <float >();
188
+ auto out_var = scope.Var (" Out" );
189
+ auto target = out_var->GetMutable <f::LoDTensor>();
190
+ // x1 * 2 == x0
191
+ EXPECT_NE (target->memory_size (), size_t (0 ));
192
+ float *actual = target->data <float >();
193
+ for (int64_t i = 0 ; i < target->numel (); ++i) {
194
+ EXPECT_EQ (expected[i] * 2 , actual[i]);
195
+ }
196
+ listen_and_serv_op->Stop ();
197
+ server_thread.join ();
198
+ listen_and_serv_op.reset (nullptr );
199
+ paddle::operators::ListenAndServOp::ResetPort ();
198
200
}
199
201
200
202
TEST (SendRecvOp, CPUSparse) {
201
- std::atomic<bool > initialized;
202
- initialized = false ;
203
- std::thread server_thread (StartServerNet, true , &initialized);
204
- while (!initialized) {
205
- }
206
- auto *listen_and_serv_op_ptr =
207
- static_cast <paddle::operators::ListenAndServOp *>(
208
- listen_and_serv_op.get ());
209
- ASSERT_TRUE (listen_and_serv_op_ptr != nullptr );
210
- listen_and_serv_op_ptr->WaitServerReady ();
211
-
212
- // local net
213
- f::Scope scope;
214
- p::CPUPlace place;
215
- p::CPUDeviceContext ctx (place);
216
- InitSelectedRowsInScope (place, &scope);
217
- scope.Var (" RPC_CLIENT_VAR" );
218
- f::AttributeMap attrs;
219
- selected_port = listen_and_serv_op_ptr->GetSelectedPort ();
220
- std::string endpoint =
221
- paddle::string::Sprintf (" 127.0.0.1:%d" , selected_port);
222
- attrs.insert ({" endpoints" , std::vector<std::string>({endpoint})});
223
- attrs.insert ({" epmap" , std::vector<std::string>({endpoint})});
224
- auto send_op = f::OpRegistry::CreateOp (" send" , {{" X" , {" x1" }}},
225
- {{" Out" , {" Out" }}}, attrs);
226
- send_op->Run (scope, place);
227
-
228
- auto x0 = scope.Var (" x0" )->GetMutable <f::SelectedRows>();
229
- auto x1 = scope.Var (" x1" )->GetMutable <f::SelectedRows>();
230
- auto out = scope.Var (" Out" )->GetMutable <f::SelectedRows>();
231
- auto actual = out->mutable_value ();
232
-
233
- std::unique_ptr<f::SelectedRows> expect{new f::SelectedRows ()};
234
- auto expect_value = expect->mutable_value ();
235
- expect_value->mutable_data <float >(f::make_ddim ({5 , 10 }), place);
236
-
237
- m::SelectedRowsAdd<p::CPUDeviceContext, float > add_functor;
238
- add_functor (ctx, *x0, *x1, expect.get ());
239
-
240
- EXPECT_EQ (actual->numel (), expect_value->numel ());
241
- EXPECT_EQ (out->rows ().size (), x0->rows ().size () + x1->rows ().size ());
242
-
243
- for (int64_t i = 0 ; i < expect_value->numel (); ++i) {
244
- EXPECT_EQ (expect_value->mutable_data <float >(place)[i],
245
- actual->mutable_data <float >(place)[i]);
246
- }
247
- listen_and_serv_op->Stop ();
248
- server_thread.join ();
249
- listen_and_serv_op.reset ();
250
- paddle::operators::ListenAndServOp::ResetPort ();
203
+ std::atomic<bool > initialized;
204
+ initialized = false ;
205
+ std::thread server_thread (StartServerNet, true , &initialized);
206
+ while (!initialized) {
207
+ }
208
+ auto *listen_and_serv_op_ptr =
209
+ static_cast <paddle::operators::ListenAndServOp *>(
210
+ listen_and_serv_op.get ());
211
+ ASSERT_TRUE (listen_and_serv_op_ptr != nullptr );
212
+ listen_and_serv_op_ptr->WaitServerReady ();
213
+
214
+ // local net
215
+ f::Scope scope;
216
+ p::CPUPlace place;
217
+ p::CPUDeviceContext ctx (place);
218
+ InitSelectedRowsInScope (place, &scope);
219
+ scope.Var (" RPC_CLIENT_VAR" );
220
+ f::AttributeMap attrs;
221
+ selected_port = listen_and_serv_op_ptr->GetSelectedPort ();
222
+ std::string endpoint = paddle::string::Sprintf (" 127.0.0.1:%d" , selected_port);
223
+ attrs.insert ({" endpoints" , std::vector<std::string>({endpoint})});
224
+ attrs.insert ({" epmap" , std::vector<std::string>({endpoint})});
225
+ auto send_op = f::OpRegistry::CreateOp (" send" , {{" X" , {" x1" }}},
226
+ {{" Out" , {" Out" }}}, attrs);
227
+ send_op->Run (scope, place);
228
+
229
+ auto x0 = scope.Var (" x0" )->GetMutable <f::SelectedRows>();
230
+ auto x1 = scope.Var (" x1" )->GetMutable <f::SelectedRows>();
231
+ auto out = scope.Var (" Out" )->GetMutable <f::SelectedRows>();
232
+ auto actual = out->mutable_value ();
233
+
234
+ std::unique_ptr<f::SelectedRows> expect{new f::SelectedRows ()};
235
+ auto expect_value = expect->mutable_value ();
236
+ expect_value->mutable_data <float >(f::make_ddim ({5 , 10 }), place);
237
+
238
+ m::SelectedRowsAdd<p::CPUDeviceContext, float > add_functor;
239
+ add_functor (ctx, *x0, *x1, expect.get ());
240
+
241
+ EXPECT_EQ (actual->numel (), expect_value->numel ());
242
+ EXPECT_EQ (out->rows ().size (), x0->rows ().size () + x1->rows ().size ());
243
+
244
+ for (int64_t i = 0 ; i < expect_value->numel (); ++i) {
245
+ EXPECT_EQ (expect_value->mutable_data <float >(place)[i],
246
+ actual->mutable_data <float >(place)[i]);
247
+ }
248
+ listen_and_serv_op->Stop ();
249
+ server_thread.join ();
250
+ listen_and_serv_op.reset ();
251
+ paddle::operators::ListenAndServOp::ResetPort ();
251
252
}
0 commit comments