@@ -95,7 +95,7 @@ void AddOp(const std::string &type, const f::VariableNameMap &inputs,
95
95
for (auto kv : outputs) {
96
96
for (auto v : kv.second ) {
97
97
auto var = block->Var (v);
98
- var->SetDataType (f::proto::DataType ::FP32);
98
+ var->SetDataType (f::proto::VarType ::FP32);
99
99
}
100
100
}
101
101
@@ -122,33 +122,37 @@ void StartServerNet(bool is_sparse) {
122
122
123
123
// sub program run in listen_and_serv_op, for simple test we use sum
124
124
f::ProgramDesc program;
125
- f::BlockDesc *block = program.MutableBlock (0 );
125
+ f::BlockDesc *optimize_block = program.MutableBlock (0 );
126
126
// X for server side tensors, RX for received tensers, must be of same shape.
127
- AddOp (" sum" , {{" X" , {" x0" , " x1" }}}, {{" Out" , {" Out" }}}, {}, block );
127
+ AddOp (" sum" , {{" X" , {" x0" , " x1" }}}, {{" Out" , {" Out" }}}, {}, optimize_block );
128
128
129
129
f::AttributeMap attrs;
130
130
attrs.insert ({" endpoint" , std::string (" 127.0.0.1:6174" )});
131
+ attrs.insert ({" Fanin" , 1 });
131
132
attrs.insert ({" ParamList" , std::vector<std::string>({" Out" })});
132
133
attrs.insert ({" GradList" , std::vector<std::string>({" x1" })});
133
- attrs.insert ({" OptimizeBlock" , block });
134
+ attrs.insert ({" OptimizeBlock" , optimize_block });
134
135
listen_and_serv_op =
135
- f::OpRegistry::CreateOp (" listen_and_serv" , {}, {}, attrs);
136
+ f::OpRegistry::CreateOp (" listen_and_serv" , {{ " X " , { " x1 " }} }, {}, attrs);
136
137
listen_and_serv_op->Run (scope, place);
137
138
}
138
139
139
140
TEST (SendRecvOp, CPUDense) {
140
141
std::thread server_thread (StartServerNet, false );
141
- sleep (10 ); // wait server to start
142
+ sleep (5 ); // wait server to start
142
143
// local net
143
144
f::Scope scope;
144
145
p::CPUPlace place;
145
146
InitTensorsInScope (scope, place);
147
+ // create rpc client var
148
+ scope.Var (" RPC_CLIENT_VAR" );
146
149
147
150
f::AttributeMap attrs;
148
151
attrs.insert ({" endpoints" , std::vector<std::string>({" 127.0.0.1:6174" })});
149
152
attrs.insert ({" epmap" , std::vector<std::string>({" 127.0.0.1:6174" })});
150
- auto send_op = f::OpRegistry::CreateOp (" send" , {{" X" , {" x1" }}},
151
- {{" Out" , {" Out" }}}, attrs);
153
+ auto send_op = f::OpRegistry::CreateOp (
154
+ " send" , {{" X" , {" x1" }}},
155
+ {{" Out" , {" Out" }}, {" RPCClient" , {" RPC_CLIENT_VAR" }}}, attrs);
152
156
send_op->Run (scope, place);
153
157
154
158
auto in_var = scope.Var (" x1" );
@@ -175,11 +179,13 @@ TEST(SendRecvOp, CPUSparse) {
175
179
p::CPUPlace place;
176
180
p::CPUDeviceContext ctx (place);
177
181
InitSelectedRowsInScope (scope, place);
182
+ scope.Var (" RPC_CLIENT_VAR" );
178
183
f::AttributeMap attrs;
179
184
attrs.insert ({" endpoints" , std::vector<std::string>({" 127.0.0.1:6174" })});
180
185
attrs.insert ({" epmap" , std::vector<std::string>({" 127.0.0.1:6174" })});
181
- auto send_op = f::OpRegistry::CreateOp (" send" , {{" X" , {" x1" }}},
182
- {{" Out" , {" Out" }}}, attrs);
186
+ auto send_op = f::OpRegistry::CreateOp (
187
+ " send" , {{" X" , {" x1" }}},
188
+ {{" Out" , {" Out" }}, {" RPCClient" , {" RPC_CLIENT_VAR" }}}, attrs);
183
189
send_op->Run (scope, place);
184
190
185
191
auto x0 = scope.Var (" x0" )->GetMutable <f::SelectedRows>();
0 commit comments