
Commit b6dc3a5

Add DataBalanceOpHandle to MultiDeviceSSAGraph
1 parent a1f1a5e commit b6dc3a5

7 files changed: 229 additions, 5 deletions

paddle/fluid/framework/details/CMakeLists.txt

Lines changed: 2 additions & 1 deletion
@@ -25,11 +25,12 @@ else()
   cc_library(broadcast_op_handle SRCS broadcast_op_handle.cc DEPS op_handle_base scope ddim memory variable_visitor)
 endif()

+cc_library(data_balance_op_handle SRCS data_balance_op_handle.cc DEPS op_handle_base scope lod_tensor)
 cc_library(gather_op_handle SRCS gather_op_handle.cc DEPS op_handle_base scope ddim memory variable_visitor)
 cc_library(fuse_vars_op_handle SRCS fuse_vars_op_handle.cc DEPS op_handle_base scope)

 cc_library(multi_devices_graph_builder SRCS multi_devices_graph_builder.cc DEPS ssa_graph_builder computation_op_handle
-        scale_loss_grad_op_handle rpc_op_handle all_reduce_op_handle reduce_op_handle broadcast_op_handle)
+        scale_loss_grad_op_handle rpc_op_handle all_reduce_op_handle reduce_op_handle broadcast_op_handle data_balance_op_handle)


 cc_library(ssa_graph_builder_factory SRCS ssa_graph_builder_factory.cc DEPS multi_devices_graph_builder ssa_graph_printer ssa_graph_checker)
paddle/fluid/framework/details/data_balance_op_handle.cc

Lines changed: 138 additions & 0 deletions
@@ -0,0 +1,138 @@
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "paddle/fluid/framework/details/data_balance_op_handle.h"
#include <algorithm>
#include "paddle/fluid/framework/details/container_cast.h"

namespace paddle {
namespace framework {
namespace details {

DataBalanceOpHandle::DataBalanceOpHandle(
    const std::vector<Scope *> &local_scopes,
    const std::vector<platform::Place> &places)
    : local_scopes_(local_scopes), places_(places) {}

std::string DataBalanceOpHandle::Name() const { return "data balance"; }

std::vector<std::array<int, 3>> DataBalanceOpHandle::GetBalancePlan(
    const std::vector<int> &device_sizes) {
  int device_num = device_sizes.size();
  int total_size = 0;
  int empty_num = 0;
  std::vector<std::array<int, 2>> size_device_vec;
  size_device_vec.reserve(device_num);
  for (int i = 0; i < device_num; ++i) {
    if (device_sizes[i] == 0) {
      ++empty_num;
    }
    total_size += device_sizes[i];
    size_device_vec.push_back({{device_sizes[i], i}});
  }
  std::vector<std::array<int, 3>> res;
  if (empty_num == 0) {
    // No need to do data balance.
    return res;
  }
  if (total_size < device_num) {
    // Not enough data.
    PADDLE_THROW("There is no next data.");
  }
  std::sort(size_device_vec.begin(), size_device_vec.end(),
            [](const std::array<int, 2> &a, const std::array<int, 2> &b) {
              return a[0] > b[0];
            });
  int expected_device_size = total_size / device_num;
  int src_idx = 0;
  for (int dst_idx = device_num - empty_num; dst_idx < device_num; ++dst_idx) {
    if (size_device_vec[src_idx][0] <= expected_device_size) {
      ++src_idx;
      PADDLE_ENFORCE_LT(src_idx, device_num - empty_num);
    }
    size_device_vec[src_idx][0] -= expected_device_size;
    size_device_vec[dst_idx][0] += expected_device_size;
    res.push_back({{size_device_vec[src_idx][1], size_device_vec[dst_idx][1],
                    expected_device_size}});
  }
  return res;
}

void DataBalanceOpHandle::RunImpl() {
  if (places_.size() == 1) {
    return;
  }
  auto in_var_handles = DynamicCast<VarHandle>(inputs_);
  auto out_var_handles = DynamicCast<VarHandle>(outputs_);
  PADDLE_ENFORCE(in_var_handles.size() % places_.size() == 0);
  PADDLE_ENFORCE_EQ(
      in_var_handles.size(), out_var_handles.size(),
      "The NoDummyInputSize and NoDummyOutputSize should be equal.");
  int data_num = in_var_handles.size() / places_.size();
  WaitInputVarGenerated();

  std::vector<std::vector<LoDTensor *>> lod_tensors(data_num);
  std::vector<int> device_sizes;
  for (int i = 0; i < static_cast<int>(in_var_handles.size()); ++i) {
    PADDLE_ENFORCE_EQ(in_var_handles[i]->name_, out_var_handles[i]->name_,
                      "The name of input and output should be equal.");
    int place_idx = i / data_num;
    int data_idx = i % data_num;
    auto *local_scope =
        local_scopes_[place_idx]->FindVar(kLocalExecScopeName)->Get<Scope *>();
    auto *tensor_var = local_scope->FindVar(in_var_handles[i]->name_);
    PADDLE_ENFORCE(tensor_var->IsType<LoDTensor>());
    auto *tensor = tensor_var->GetMutable<LoDTensor>();
    PADDLE_ENFORCE(places_[place_idx] == tensor->place());
    lod_tensors[data_idx].push_back(tensor);
    int ins_size =
        tensor->lod().empty() ? tensor->dims()[0] : tensor->NumElements();
    if (data_idx == 0) {
      device_sizes.emplace_back(ins_size);
    } else {
      PADDLE_ENFORCE_EQ(ins_size, device_sizes.at(place_idx));
    }
  }
  const auto &balance_plan = GetBalancePlan(device_sizes);

  for (const auto &trans : balance_plan) {
    for (int data_idx = 0; data_idx < data_num; ++data_idx) {
      LoDTensor *src_tensor = lod_tensors[data_idx][trans[0]];
      LoDTensor *dst_tensor = lod_tensors[data_idx][trans[1]];
      int trans_ins_size = trans[2];
      LoD src_lod = src_tensor->lod();
      int src_ins_size =
          src_lod.empty() ? src_tensor->dims()[0] : src_tensor->NumElements();
      int cut_point = src_ins_size - trans_ins_size;
      if (!src_lod.empty()) {
        for (auto &level : src_lod) {
          cut_point = level[cut_point];
        }
      }
      TensorCopySync(src_tensor->Slice(cut_point, src_tensor->dims()[0]),
                     dst_tensor->place(), dst_tensor);
      src_tensor->ShareDataWith(src_tensor->Slice(0, cut_point));
      if (!src_lod.empty()) {
        dst_tensor->set_lod(SliceInLevel(
            src_lod, 0, src_ins_size - trans_ins_size, src_ins_size));
        src_tensor->set_lod(
            SliceInLevel(src_lod, 0, 0, src_ins_size - trans_ins_size));
      }
    }
  }
}

}  // namespace details
}  // namespace framework
}  // namespace paddle
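
For illustration, here is a minimal standalone sketch (plain C++, not part of this commit) that mirrors the GetBalancePlan logic above and runs it on an assumed input of three devices holding {4, 2, 0} instances. The expected per-device size is 6 / 3 = 2, so the plan moves 2 instances from device 0 to the empty device 2. All names in the sketch are illustrative, and unlike the real op it returns an empty plan instead of throwing when there is too little data.

// Standalone sketch of the balance-plan computation (illustrative only).
#include <algorithm>
#include <array>
#include <iostream>
#include <vector>

std::vector<std::array<int, 3>> BalancePlan(const std::vector<int> &sizes) {
  const int device_num = static_cast<int>(sizes.size());
  int total_size = 0, empty_num = 0;
  std::vector<std::array<int, 2>> size_device;  // {size, device_id}
  for (int i = 0; i < device_num; ++i) {
    if (sizes[i] == 0) ++empty_num;
    total_size += sizes[i];
    size_device.push_back({{sizes[i], i}});
  }
  std::vector<std::array<int, 3>> plan;  // {src_dev, dst_dev, trans_size}
  // The real op throws "There is no next data." when total_size < device_num.
  if (empty_num == 0 || total_size < device_num) return plan;
  std::sort(size_device.begin(), size_device.end(),
            [](const std::array<int, 2> &a, const std::array<int, 2> &b) {
              return a[0] > b[0];
            });
  const int expected = total_size / device_num;
  int src = 0;
  for (int dst = device_num - empty_num; dst < device_num; ++dst) {
    if (size_device[src][0] <= expected) ++src;  // move on to the next donor
    size_device[src][0] -= expected;
    size_device[dst][0] += expected;
    plan.push_back({{size_device[src][1], size_device[dst][1], expected}});
  }
  return plan;
}

int main() {
  for (const auto &t : BalancePlan({4, 2, 0})) {
    std::cout << "move " << t[2] << " instances from device " << t[0]
              << " to device " << t[1] << "\n";
    // Prints: move 2 instances from device 0 to device 2
  }
}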
paddle/fluid/framework/details/data_balance_op_handle.h

Lines changed: 50 additions & 0 deletions
@@ -0,0 +1,50 @@
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <string>
#include <vector>
#include "paddle/fluid/framework/details/op_handle_base.h"
#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/scope.h"

namespace paddle {
namespace framework {
namespace details {

struct DataBalanceOpHandle : public OpHandleBase {
 public:
  DataBalanceOpHandle(const std::vector<Scope *> &local_scopes,
                      const std::vector<platform::Place> &places);

  std::string Name() const override;

  bool IsMultiDeviceTransfer() override { return false; }

 protected:
  void RunImpl() override;

 private:
  // std::vector<(src_dev_id, dst_dev_id, trans_size)>
  std::vector<std::array<int, 3>> GetBalancePlan(
      const std::vector<int> &batch_size_per_device);

  const std::vector<Scope *> &local_scopes_;
  const std::vector<platform::Place> &places_;
};

}  // namespace details
}  // namespace framework
}  // namespace paddle

paddle/fluid/framework/details/multi_devices_graph_builder.cc

Lines changed: 26 additions & 1 deletion
@@ -20,6 +20,7 @@
 #include "paddle/fluid/framework/details/all_reduce_op_handle.h"
 #include "paddle/fluid/framework/details/broadcast_op_handle.h"
 #include "paddle/fluid/framework/details/computation_op_handle.h"
+#include "paddle/fluid/framework/details/data_balance_op_handle.h"
 #include "paddle/fluid/framework/details/multi_devices_graph_builder.h"
 #include "paddle/fluid/framework/details/reduce_op_handle.h"
 #include "paddle/fluid/framework/details/rpc_op_handle.h"

@@ -217,6 +218,11 @@ std::unique_ptr<SSAGraph> MultiDevSSAGraphBuilder::Build(
       // gradients.
       CreateComputationalOps(&result, *op, places_.size());

+      if (op->Type() == "read") {
+        const auto &data_var_names = op->Output("Out");
+        InsertDataBalanceOp(&result, data_var_names);
+      }
+
       if (!is_forwarding && places_.size() > 1) {
         // Currently, we assume that once gradient is generated, it can be
         // broadcast, and each gradient is only broadcast once.

@@ -360,6 +366,24 @@ void MultiDevSSAGraphBuilder::InsertAllReduceOp(SSAGraph *result,
   }
 }

+void MultiDevSSAGraphBuilder::InsertDataBalanceOp(
+    SSAGraph *result, const std::vector<std::string> &datas) const {
+  result->ops_.emplace_back(new DataBalanceOpHandle(local_scopes_, places_));
+  auto *op_handle = result->ops_.back().get();
+  for (size_t i = 0; i < places_.size(); ++i) {
+    auto &p = places_[i];
+    SetCommunicationContext(op_handle, p);
+    for (const std::string &d_name : datas) {
+      auto &vars = result->vars_[i][d_name];
+      PADDLE_ENFORCE(!vars.empty());
+      op_handle->AddInput(vars.back().get());
+      auto var = new VarHandle(vars.size(), i, d_name, p);
+      vars.emplace_back(var);
+      op_handle->AddOutput(var);
+    }
+  }
+}
+
 bool MultiDevSSAGraphBuilder::IsParameterGradientOnce(
     const std::string &og,
     std::unordered_set<std::string> *og_has_been_broadcast) const {

@@ -509,7 +533,8 @@ void MultiDevSSAGraphBuilder::CreateRPCOp(SSAGraph *result,
     op_dev_id = GetVarDeviceID(op.InputArgumentNames()[0]);
     // the variable name which contains .block means it was splited by
     // split_byref op
-    // so that we can balance the variable blocks to all the pserver instances.
+    // so that we can balance the variable blocks to all the pserver
+    // instances.
     if (strategy_.reduce_ == BuildStrategy::ReduceStrategy::kAllReduce &&
         op.InputArgumentNames()[0].find(".block") == std::string::npos) {
       op_dev_id = GetAppropriateDeviceID(op.InputArgumentNames());
paddle/fluid/framework/details/multi_devices_graph_builder.h

Lines changed: 3 additions & 0 deletions
@@ -101,6 +101,9 @@ class MultiDevSSAGraphBuilder : public SSAGraphBuilder {

   void InsertAllReduceOp(SSAGraph *result, const std::string &og) const;

+  void InsertDataBalanceOp(SSAGraph *result,
+                           const std::vector<std::string> &datas) const;
+
   void CreateBroadcastOp(SSAGraph *result, const std::string &p_name,
                          size_t src_dev_id) const;


paddle/fluid/framework/lod_tensor.cc

Lines changed: 2 additions & 1 deletion
@@ -68,7 +68,7 @@ std::ostream &operator<<(std::ostream &os, const LoDTensor &t) {
   // only print first ten elements
   int64_t size = t.numel() < 10 ? t.numel() : 10;
   for (int64_t i = 0; i < size; ++i) {
-    if (t.type().hash_code() == typeid(float).hash_code()) {
+    if (t.type().hash_code() == typeid(float).hash_code()) {  // NOLINT
       os << t.data<float>()[i] << " ";
     } else if (t.type().hash_code() == typeid(int64_t).hash_code()) {
       os << t.data<int64_t>()[i] << " ";

@@ -89,6 +89,7 @@ std::string LoDToString(const LoD &lod) {
 LoD SliceInLevel(const LoD &in, size_t level, size_t elem_begin,
                  size_t elem_end) {
   PADDLE_ENFORCE_LT(level, in.size());
+  PADDLE_ENFORCE_LT(elem_begin, elem_end);
   PADDLE_ENFORCE_LT(elem_end, in[level].size());

   LoD res;
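
The added PADDLE_ENFORCE_LT(elem_begin, elem_end) guards the SliceInLevel calls made by DataBalanceOpHandle::RunImpl when it splits a LoD tensor. Below is a minimal standalone sketch (plain C++, not the Paddle API; the LoD values are made up) of how RunImpl maps an instance-level cut point down through nested LoD levels to a row offset before slicing:

// Standalone sketch of the cut-point mapping used in RunImpl (illustrative).
#include <cstddef>
#include <iostream>
#include <vector>

using LoD = std::vector<std::vector<std::size_t>>;

int main() {
  // Two-level LoD: 3 top-level instances over 5 sub-sequences over 9 rows.
  LoD lod = {{0, 2, 3, 5}, {0, 2, 4, 5, 7, 9}};
  int src_ins_size = static_cast<int>(lod[0].size()) - 1;  // 3 instances
  int trans_ins_size = 1;            // hand one instance to another device
  std::size_t cut_point = src_ins_size - trans_ins_size;   // 2
  for (const auto &level : lod) {    // same loop as in RunImpl
    cut_point = level[cut_point];    // level 0: 2 -> 3, level 1: 3 -> 7
  }
  std::cout << "rows [0, " << cut_point << ") stay, rows [" << cut_point
            << ", 9) move to the destination device\n";  // cut_point == 7
}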

paddle/fluid/operators/read_op.cc

Lines changed: 8 additions & 2 deletions
@@ -66,9 +66,15 @@ class ReadOp : public framework::OperatorBase {
     std::vector<std::string> out_arg_names = Outputs("Out");
     std::vector<framework::LoDTensor> ins;
     reader->ReadNext(&ins);
-    PADDLE_ENFORCE(!ins.empty(), "There is no next data.");
+    if (ins.empty()) {
+      ins.resize(out_arg_names.size());
+      for (auto& tensor : ins) {
+        // data type is not important for subsequent DataBalanceOpHandle
+        tensor.mutable_data<float>(framework::make_ddim({0}), dev_place);
+      }
+    }
     PADDLE_ENFORCE_EQ(ins.size(), out_arg_names.size());
-    for (size_t i = 0; i < ins.size(); ++i) {
+    for (size_t i = 0; i < out_arg_names.size(); ++i) {
       auto* out =
           scope.FindVar(out_arg_names[i])->GetMutable<framework::LoDTensor>();
       out->ShareDataWith(ins[i]);
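
A small standalone sketch (not the Paddle API; all names are illustrative) of the new fallback path: when one device's reader is exhausted, ReadOp now emits one zero-row placeholder per output instead of throwing, so that device reports an instance count of 0 and DataBalanceOpHandle can refill it from the devices that still hold data.

// Standalone sketch of the exhausted-reader fallback (illustrative only).
#include <cstddef>
#include <iostream>
#include <vector>

struct Batch {           // stand-in for a LoDTensor
  std::size_t rows = 0;  // plays the role of dims()[0]
};

// Mirrors the new ReadOp branch: ins.resize(out_arg_names.size()) plus
// mutable_data({0}, place) yields one empty placeholder per output.
std::vector<Batch> ReadNextOrEmpty(std::vector<Batch> next,
                                   std::size_t num_outputs) {
  if (next.empty()) {
    next.resize(num_outputs);  // zero-row placeholder per output
  }
  return next;
}

int main() {
  auto outs = ReadNextOrEmpty({}, 2);  // reader exhausted on this device
  std::cout << outs.size() << " outputs, " << outs[0].rows
            << " rows each\n";         // prints: 2 outputs, 0 rows each
}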
