Skip to content

Commit 0250e80

Browse files
authored
Merge pull request #8586 from Yancey1989/fix_dist_unittest
Fix send_recv unit test
2 parents 2c89d97 + 7a1d6ae commit 0250e80

File tree

2 files changed

+18
-10
lines changed

2 files changed

+18
-10
lines changed

paddle/fluid/operators/listen_and_serv_op.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,8 @@ class ListenAndServOp : public framework::OperatorBase {
129129
}
130130
if (exit_flag) {
131131
rpc_service_->ShutDown();
132+
rpc_service_->SetCond(1);
133+
break;
132134
}
133135
try {
134136
executor.Run(*program, &recv_scope, block->ID(), /*global_block*/

paddle/fluid/operators/send_recv_op_test.cc

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ void AddOp(const std::string &type, const f::VariableNameMap &inputs,
9595
for (auto kv : outputs) {
9696
for (auto v : kv.second) {
9797
auto var = block->Var(v);
98-
var->SetDataType(f::proto::DataType::FP32);
98+
var->SetDataType(f::proto::VarType::FP32);
9999
}
100100
}
101101

@@ -122,33 +122,37 @@ void StartServerNet(bool is_sparse) {
122122

123123
// sub program run in listen_and_serv_op, for simple test we use sum
124124
f::ProgramDesc program;
125-
f::BlockDesc *block = program.MutableBlock(0);
125+
f::BlockDesc *optimize_block = program.MutableBlock(0);
126126
// X for server side tensors, RX for received tensers, must be of same shape.
127-
AddOp("sum", {{"X", {"x0", "x1"}}}, {{"Out", {"Out"}}}, {}, block);
127+
AddOp("sum", {{"X", {"x0", "x1"}}}, {{"Out", {"Out"}}}, {}, optimize_block);
128128

129129
f::AttributeMap attrs;
130130
attrs.insert({"endpoint", std::string("127.0.0.1:6174")});
131+
attrs.insert({"Fanin", 1});
131132
attrs.insert({"ParamList", std::vector<std::string>({"Out"})});
132133
attrs.insert({"GradList", std::vector<std::string>({"x1"})});
133-
attrs.insert({"OptimizeBlock", block});
134+
attrs.insert({"OptimizeBlock", optimize_block});
134135
listen_and_serv_op =
135-
f::OpRegistry::CreateOp("listen_and_serv", {}, {}, attrs);
136+
f::OpRegistry::CreateOp("listen_and_serv", {{"X", {"x1"}}}, {}, attrs);
136137
listen_and_serv_op->Run(scope, place);
137138
}
138139

139140
TEST(SendRecvOp, CPUDense) {
140141
std::thread server_thread(StartServerNet, false);
141-
sleep(10); // wait server to start
142+
sleep(5); // wait server to start
142143
// local net
143144
f::Scope scope;
144145
p::CPUPlace place;
145146
InitTensorsInScope(scope, place);
147+
// create rpc client var
148+
scope.Var("RPC_CLIENT_VAR");
146149

147150
f::AttributeMap attrs;
148151
attrs.insert({"endpoints", std::vector<std::string>({"127.0.0.1:6174"})});
149152
attrs.insert({"epmap", std::vector<std::string>({"127.0.0.1:6174"})});
150-
auto send_op = f::OpRegistry::CreateOp("send", {{"X", {"x1"}}},
151-
{{"Out", {"Out"}}}, attrs);
153+
auto send_op = f::OpRegistry::CreateOp(
154+
"send", {{"X", {"x1"}}},
155+
{{"Out", {"Out"}}, {"RPCClient", {"RPC_CLIENT_VAR"}}}, attrs);
152156
send_op->Run(scope, place);
153157

154158
auto in_var = scope.Var("x1");
@@ -175,11 +179,13 @@ TEST(SendRecvOp, CPUSparse) {
175179
p::CPUPlace place;
176180
p::CPUDeviceContext ctx(place);
177181
InitSelectedRowsInScope(scope, place);
182+
scope.Var("RPC_CLIENT_VAR");
178183
f::AttributeMap attrs;
179184
attrs.insert({"endpoints", std::vector<std::string>({"127.0.0.1:6174"})});
180185
attrs.insert({"epmap", std::vector<std::string>({"127.0.0.1:6174"})});
181-
auto send_op = f::OpRegistry::CreateOp("send", {{"X", {"x1"}}},
182-
{{"Out", {"Out"}}}, attrs);
186+
auto send_op = f::OpRegistry::CreateOp(
187+
"send", {{"X", {"x1"}}},
188+
{{"Out", {"Out"}}, {"RPCClient", {"RPC_CLIENT_VAR"}}}, attrs);
183189
send_op->Run(scope, place);
184190

185191
auto x0 = scope.Var("x0")->GetMutable<f::SelectedRows>();

0 commit comments

Comments
 (0)