Skip to content

Commit 4663cb2

Browse files
Yancey (Yancey0623)
authored and committed
Merge pull request #11585 from Yancey1989/fix_pserver_sub_blocks
fix pserver sub-blocks
1 parent fac1d47 commit 4663cb2

File tree

11 files changed

+78
-38
lines changed

11 files changed

+78
-38
lines changed

paddle/fluid/framework/framework.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ enum AttrType {
2727
BOOLEANS = 7;
2828
BLOCK = 8;
2929
LONG = 9;
30+
BLOCKS = 10;
3031
}
3132

3233
// OpDesc describes an instance of a C++ framework::OperatorBase
@@ -46,6 +47,7 @@ message OpDesc {
4647
repeated bool bools = 11;
4748
optional int32 block_idx = 12;
4849
optional int64 l = 13;
50+
repeated int32 blocks_idx = 14;
4951
};
5052

5153
message Var {

paddle/fluid/framework/op_desc.cc

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,12 @@ void OpDesc::SetBlockAttr(const std::string &name, BlockDesc *block) {
211211
need_update_ = true;
212212
}
213213

214+
void OpDesc::SetBlocksAttr(const std::string &name,
215+
std::vector<BlockDesc *> blocks) {
216+
this->attrs_[name] = blocks;
217+
need_update_ = true;
218+
}
219+
214220
void OpDesc::SetAttrMap(
215221
const std::unordered_map<std::string, Attribute> &attr_map) {
216222
attrs_ = attr_map;
@@ -305,6 +311,13 @@ struct SetAttrDescVisitor : public boost::static_visitor<void> {
305311
void operator()(const std::vector<bool> &v) const {
306312
VectorToRepeated(v, attr_->mutable_bools());
307313
}
314+
void operator()(const std::vector<BlockDesc *> &v) const {
315+
std::vector<int> blocks_idx;
316+
for (auto blk : v) {
317+
blocks_idx.push_back(blk->ID());
318+
}
319+
VectorToRepeated(blocks_idx, attr_->mutable_blocks_idx());
320+
}
308321
void operator()(BlockDesc *desc) const { attr_->set_block_idx(desc->ID()); }
309322
void operator()(int64_t v) const { attr_->set_l(v); }
310323
void operator()(boost::blank) const { PADDLE_THROW("Unexpected branch"); }

paddle/fluid/framework/op_desc.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,8 @@ class OpDesc {
7777

7878
void SetBlockAttr(const std::string &name, BlockDesc *block);
7979

80+
void SetBlocksAttr(const std::string &name, std::vector<BlockDesc *> blocks);
81+
8082
Attribute GetAttr(const std::string &name) const;
8183

8284
Attribute GetNullableAttr(const std::string &name) const;

paddle/fluid/framework/type_defs.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@ using VariableNameMap = std::map<std::string, std::vector<std::string>>;
3535
using Attribute =
3636
boost::variant<boost::blank, int, float, std::string, std::vector<int>,
3737
std::vector<float>, std::vector<std::string>, bool,
38-
std::vector<bool>, BlockDesc*, int64_t>;
38+
std::vector<bool>, BlockDesc*, int64_t,
39+
std::vector<BlockDesc*>>;
3940

4041
using AttributeMap = std::unordered_map<std::string, Attribute>;
4142

paddle/fluid/operators/listen_and_serv_op.cc

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -101,17 +101,16 @@ void ListenAndServOp::RunSyncLoop(
101101
framework::Scope *recv_scope,
102102
const std::vector<int> &prefetch_block_id_list) const {
103103
size_t num_blocks = program->Size();
104+
auto optimize_blocks =
105+
Attr<std::vector<framework::BlockDesc *>>(kOptimizeBlocks);
104106
PADDLE_ENFORCE_GE(num_blocks, 2,
105107
"server program should have at least 2 blocks");
106108

107-
std::vector<int> optimize_block_id_list;
108-
for (int blkid = 1; blkid < num_blocks; ++blkid) {
109-
if (std::find(prefetch_block_id_list.begin(), prefetch_block_id_list.end(),
110-
blkid) == prefetch_block_id_list.end()) {
111-
optimize_block_id_list.push_back(blkid);
112-
}
109+
std::vector<int> optimize_blocks_idx;
110+
for (auto blk : optimize_blocks) {
111+
optimize_blocks_idx.push_back(blk->ID());
113112
}
114-
auto optimize_prepared = executor->Prepare(*program, optimize_block_id_list);
113+
auto optimize_prepared = executor->Prepare(*program, optimize_blocks_idx);
115114
// Insert placeholder for block0 which holds current op itself.
116115
optimize_prepared.insert(
117116
optimize_prepared.begin(),
@@ -134,14 +133,14 @@ void ListenAndServOp::RunSyncLoop(
134133
// and this will still work.
135134
// The optimize blocks which have the same parent ID would run parallel
136135
// TODO(Yancey1989): need to use ParallelExecutor for future
137-
int32_t last_parent_blkid = program->Block(1).Parent();
136+
int32_t last_parent_blkid = optimize_blocks[0]->Parent();
138137
std::vector<size_t> parallel_blkids;
139-
parallel_blkids.push_back(1);
138+
parallel_blkids.push_back(optimize_blocks[0]->ID());
140139
double ts = GetTimestamp();
141-
for (size_t i = 1; i < optimize_block_id_list.size(); ++i) {
140+
for (size_t i = 1; i < optimize_blocks.size(); ++i) {
142141
// skip the first optimize block because it is already in the
143142
// parallel_blkids.
144-
int blkid = optimize_block_id_list[i];
143+
int blkid = optimize_blocks[i]->ID();
145144
if (program->Block(blkid).Parent() != last_parent_blkid) {
146145
ParallelExecuteBlocks(parallel_blkids, executor, optimize_prepared,
147146
program, recv_scope);
@@ -259,8 +258,11 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope,
259258
rpc_service_->RegisterRPC(distributed::kRequestPrefetch,
260259
request_prefetch_handler_.get());
261260

262-
auto *optimize_block = Attr<framework::BlockDesc *>(kOptimizeBlock);
263-
auto *program = optimize_block->Program();
261+
auto optimize_blocks =
262+
Attr<std::vector<framework::BlockDesc *>>(kOptimizeBlocks);
263+
PADDLE_ENFORCE(optimize_blocks.size() >= 1,
264+
"optimize blocks should be 1 at least on the pserver side.");
265+
auto *program = optimize_blocks[0]->Program();
264266
framework::Executor executor(dev_place);
265267

266268
// prepare for prefetch
@@ -337,8 +339,9 @@ class ListenAndServOpMaker : public framework::OpProtoAndCheckerMaker {
337339
"a map from grad name to it's optimize block id")
338340
.SetDefault({});
339341
AddAttr<bool>("sync_mode", "if works at sync_mode or not").SetDefault(true);
340-
AddAttr<framework::BlockDesc *>(kOptimizeBlock,
341-
"BlockID to run on server side.");
342+
AddAttr<std::vector<framework::BlockDesc *>>(
343+
kOptimizeBlocks, "Optimize blocks to run on server side.")
344+
.SetDefault({});
342345
AddAttr<std::vector<std::string>>(kPrefetchVarNameToBlockId,
343346
"prefetch blocks to run on server side.")
344347
.SetDefault({});

paddle/fluid/operators/listen_and_serv_op.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ limitations under the License. */
3030
namespace paddle {
3131
namespace operators {
3232

33-
constexpr char kOptimizeBlock[] = "OptimizeBlock";
33+
constexpr char kOptimizeBlocks[] = "optimize_blocks";
3434
constexpr char kPrefetchVarNameToBlockId[] = "prefetch_var_name_to_block_id";
3535

3636
void RunServer(std::shared_ptr<distributed::RPCServer> service);

paddle/fluid/operators/send_recv_op_test.cc

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,10 @@ void StartServerNet(bool is_sparse, std::atomic<bool> *initialized) {
129129
// sub program run in listen_and_serv_op, for simple test we use sum
130130
f::ProgramDesc program;
131131
const auto &root_block = program.Block(0);
132+
std::vector<framework::BlockDesc *> optimize_blocks;
132133
auto *optimize_block = program.AppendBlock(root_block);
134+
optimize_blocks.push_back(optimize_block);
135+
133136
auto *prefetch_block = program.AppendBlock(root_block);
134137
// X for server side tensors, RX for received tensors, must be of same shape.
135138
AddOp("sum", {{"X", {"x0", "x1"}}}, {{"Out", {"Out"}}}, {}, optimize_block,
@@ -139,7 +142,7 @@ void StartServerNet(bool is_sparse, std::atomic<bool> *initialized) {
139142
attrs.insert({"Fanin", 1});
140143
attrs.insert({"ParamList", std::vector<std::string>({"Out"})});
141144
attrs.insert({"GradList", std::vector<std::string>({"x1"})});
142-
attrs.insert({"OptimizeBlock", optimize_block});
145+
attrs.insert({"optimize_blocks", optimize_blocks});
143146
attrs.insert({"PrefetchBlock", prefetch_block});
144147
attrs.insert({"grad_to_block_id", std::vector<std::string>({""})});
145148
attrs.insert({"sync_mode", true});

paddle/fluid/pybind/protobuf.cc

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,8 @@ void BindOpDesc(pybind11::module *m) {
268268
.value("STRINGS", pd::proto::AttrType::STRINGS)
269269
.value("BOOL", pd::proto::AttrType::BOOLEAN)
270270
.value("BOOLS", pd::proto::AttrType::BOOLEANS)
271-
.value("BLOCK", pd::proto::AttrType::BLOCK);
271+
.value("BLOCK", pd::proto::AttrType::BLOCK)
272+
.value("BLOCKS", pd::proto::AttrType::BLOCKS);
272273

273274
pybind11::class_<pd::OpDesc> op_desc(*m, "OpDesc", "");
274275
op_desc
@@ -293,6 +294,7 @@ void BindOpDesc(pybind11::module *m) {
293294
.def("set_attr", &pd::OpDesc::SetAttr)
294295
.def("attr", &pd::OpDesc::GetAttr)
295296
.def("set_block_attr", &pd::OpDesc::SetBlockAttr)
297+
.def("set_blocks_attr", &pd::OpDesc::SetBlocksAttr)
296298
.def("set_serialized_attr",
297299
[](pd::OpDesc &self, const std::string &name,
298300
const pybind11::bytes &seriralized) {

python/paddle/fluid/framework.py

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -558,15 +558,20 @@ def find_name(var_list, name):
558558
if (attr_name not in self.attrs) or (
559559
self.attrs[attr_name] is None):
560560
continue
561-
if isinstance(self.attrs[attr_name], Block):
561+
attr_val = self.attrs[attr_name]
562+
if isinstance(attr_val, Block):
562563
self.desc.set_block_attr(attr_name,
563564
self.attrs[attr_name].desc)
564-
elif isinstance(self.attrs[attr_name], core.BlockDesc) or \
565-
isinstance(self.attrs[attr_name], core.ProgramDesc):
565+
elif isinstance(attr_val, list) and attr_val and \
566+
all(isinstance(v, Block) for v in attr_val):
567+
self.desc.set_blocks_attr(attr_name,
568+
[v.desc for v in attr_val])
569+
elif isinstance(attr_val, core.BlockDesc) or \
570+
isinstance(attr_val, core.ProgramDesc):
566571
self.desc.set_serialized_attr(
567-
attr_name, self.attrs[attr_name].serialize_to_string())
572+
attr_name, attr_val.serialize_to_string())
568573
else:
569-
self.desc.set_attr(attr_name, self.attrs[attr_name])
574+
self.desc.set_attr(attr_name, attr_val)
570575
self.desc.check_attrs()
571576
if self.has_kernel(type):
572577
self.desc.infer_var_type(self.block.desc)
@@ -715,6 +720,9 @@ def set_attr(self, name, val):
715720
self.attrs[name] = val
716721
if isinstance(val, Block):
717722
self.desc.set_block_attr(name, val.desc)
723+
elif isinstance(val, list) and val and all(
724+
isinstance(v, Block) for v in val):
725+
self.desc.set_blocks_attr(name, [v.desc for v in val])
718726
elif isinstance(val, core.BlockDesc) or \
719727
isinstance(val, core.ProgramDesc):
720728
self.desc.set_serialized_attr(name, val.serialize_to_string())

python/paddle/fluid/layers/io.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,6 @@ def complete_op(self):
186186
main_program = self.helper.main_program
187187
current_block = main_program.current_block()
188188
parent_block = self.parent_block()
189-
empty_block = Program().global_block()
190189

191190
parent_block.append_op(
192191
type='listen_and_serv',
@@ -195,8 +194,9 @@ def complete_op(self):
195194
attrs={
196195
'endpoint': self.endpoint,
197196
'Fanin': self.fan_in,
198-
'OptimizeBlock': current_block,
199-
'PrefetchBlock': empty_block,
197+
'optimize_blocks': [
198+
current_block
199+
], # did not support multiple optimize blocks in layers
200200
'sync_mode': True, # did not support async now in layers
201201
'grad_to_block_id': [""]
202202
})

0 commit comments

Comments (0)