Commit 7722baa

follow comments and clean code
1 parent c891189 commit 7722baa

8 files changed: +160 -107 lines changed


paddle/fluid/framework/details/broadcast_op_handle.cc

Lines changed: 46 additions & 41 deletions
@@ -22,9 +22,9 @@ namespace details {
 
 void BroadcastOpHandle::RunImpl() {
   if (places_.size() == 1) return;
-  // the input and output may have dummy var.
-  VarHandle *in_var_handle;
 
+  // The input and output may have dummy vars.
+  VarHandle *in_var_handle;
   {
     auto in_var_handles = DynamicCast<VarHandle>(inputs_);
     PADDLE_ENFORCE_EQ(in_var_handles.size(), 1,
@@ -53,23 +53,39 @@ void BroadcastOpHandle::RunImpl() {
 
   Tensor &in_tensor = VariableVisitor::GetMutableTensor(in_var);
 
+  // NOTE(zcd): the Place of the input can be got from in_tensor or from
+  // in_var_handle, and the two may differ: the Place from in_tensor is
+  // determined at runtime, while the other is determined when building the
+  // SSA graph. If they are different, DataTransform should be applied.
+  // Currently, it has not been done yet.
+  for (auto *out_var_handle : out_var_handles) {
+    if (*out_var_handle == *in_var_handle) {
+      continue;
+    }
+    auto &out_p = out_var_handle->place_;
+    auto *out_var = var_scopes.at(out_var_handle->scope_idx_)
+                        ->FindVar(out_var_handle->name_);
+    PADDLE_ENFORCE_NOT_NULL(out_var);
+    PADDLE_ENFORCE_EQ(
+        out_p.which(), in_tensor.place().which(),
+        "Currently, Places of input and output must be all on CPU "
+        "or all on GPU.");
+    VariableVisitor::ShareDimsAndLoD(*in_var, out_var);
+    VariableVisitor::GetMutableTensor(out_var).mutable_data(out_p,
+                                                            in_tensor.type());
+  }
+
   if (platform::is_cpu_place(in_tensor.place())) {
-    for (auto *out : out_var_handles) {
-      if (*out == *in_var_handle) {
+    for (auto *out_var_handle : out_var_handles) {
+      if (*out_var_handle == *in_var_handle) {
         continue;
       }
 
-      auto &out_p = out->place_;
-      auto *out_var = var_scopes.at(out->scope_idx_)->FindVar(out->name_);
-      PADDLE_ENFORCE_NOT_NULL(out_var);
-      PADDLE_ENFORCE_EQ(out_p.which(), in_tensor.place().which(),
-                        "Places must be all on CPU or all on CUDA.");
-
-      VariableVisitor::ShareDimsAndLoD(*in_var, out_var);
-      VariableVisitor::GetMutableTensor(out_var).mutable_data(out_p,
-                                                              in_tensor.type());
-
+      auto &out_p = out_var_handle->place_;
       auto dev_ctx = dev_ctxes_.at(out_p);
+      auto *out_var = var_scopes.at(out_var_handle->scope_idx_)
+                          ->FindVar(out_var_handle->name_);
+
       RunAndRecordEvent(out_p, [in_tensor, out_var, dev_ctx, out_p] {
         paddle::framework::TensorCopy(
             in_tensor, out_p, *dev_ctx,
@@ -78,35 +94,21 @@ void BroadcastOpHandle::RunImpl() {
     }
   } else {
 #ifdef PADDLE_WITH_CUDA
-    PADDLE_ENFORCE(platform::is_gpu_place(in_tensor.place()));
-    VarHandle *out_handle;
-    int root = boost::get<platform::CUDAPlace>(in_tensor.place()).device;
+    VarHandle *out_handle = nullptr;
+    int root_id = boost::get<platform::CUDAPlace>(in_tensor.place()).device;
     std::vector<std::function<void()>> broadcast_calls;
 
-    for (size_t j = 0; j < out_var_handles.size(); ++j) {
-      VarHandle *out_var_handle = out_var_handles[j];
+    for (auto out_var_handle : out_var_handles) {
       Variable *out_var = var_scopes.at(out_var_handle->scope_idx_)
                               ->FindVar(out_var_handle->name_);
 
-      if (*out_var_handle != *in_var_handle) {
-        PADDLE_ENFORCE_NOT_NULL(out_var);
-        PADDLE_ENFORCE_EQ(out_var_handle->place_.which(),
-                          in_tensor.place().which(),
-                          "Places must be all on CPU or all on CUDA.");
-        VariableVisitor::ShareDimsAndLoD(*in_var, out_var);
-        VariableVisitor::GetMutableTensor(out_var).mutable_data(
-            out_var_handle->place_, in_tensor.type());
-      }
+      int dst_id =
+          boost::get<platform::CUDAPlace>(out_var_handle->place_).device;
 
-      auto out_p = out_var_handle->place_;
-      int dev_id = boost::get<platform::CUDAPlace>(out_p).device;
-
-      auto &nccl_ctx = nccl_ctxs_->at(dev_id);
-      auto stream = nccl_ctx.stream();
-      auto comm = nccl_ctx.comm_;
+      auto &nccl_ctx = nccl_ctxs_->at(dst_id);
 
       void *send_recv_buffer = nullptr;
-      if (root == dev_id) {
+      if (root_id == dst_id) {
         send_recv_buffer = const_cast<void *>(in_tensor.data<void>());
         out_handle = out_var_handle;
       } else {
@@ -116,11 +118,13 @@ void BroadcastOpHandle::RunImpl() {
       }
 
       int type = platform::ToNCCLDataType(in_tensor.type());
-      broadcast_calls.emplace_back([=] {
-        PADDLE_ENFORCE(platform::dynload::ncclBcast(
-            send_recv_buffer, in_tensor.numel(),
-            static_cast<ncclDataType_t>(type), root, comm, stream));
-      });
+      size_t numel = static_cast<size_t>(in_tensor.numel());
+      broadcast_calls.emplace_back(
+          [send_recv_buffer, numel, type, root_id, &nccl_ctx] {
+            PADDLE_ENFORCE(platform::dynload::ncclBcast(
+                send_recv_buffer, numel, static_cast<ncclDataType_t>(type),
+                root_id, nccl_ctx.comm_, nccl_ctx.stream()));
+          });
     }
 
     this->RunAndRecordEvent([&] {
@@ -130,6 +134,7 @@ void BroadcastOpHandle::RunImpl() {
           call();
         }
       }
+      // TODO(zcd): Maybe the unequal operator is not appropriate here.
       if (*out_handle != *in_var_handle) {
         auto out_var = var_scopes.at(in_var_handle->scope_idx_)
                            ->FindVar(out_var_handles[0]->name_);
@@ -140,7 +145,7 @@ void BroadcastOpHandle::RunImpl() {
       }
     });
 #else
-    PADDLE_THROW("CUDA is not support.");
+    PADDLE_THROW("CUDA is not enabled.");
 #endif
   }
 }
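
A minimal standalone sketch (not Paddle code) of the deferred-call pattern used in the GPU branch above: each pending ncclBcast is wrapped in a std::function with an explicit capture list, and the per-device context is captured by reference so its communicator and stream are read only when the call actually runs. FakeNCCLContext is a hypothetical stand-in for Paddle's NCCL context.

#include <functional>
#include <iostream>
#include <vector>

// Hypothetical stand-in for the per-device NCCL context; not a Paddle type.
struct FakeNCCLContext {
  int device_id;
  int stream() const { return device_id * 10; }  // placeholder for a stream handle
};

int main() {
  std::vector<FakeNCCLContext> contexts = {{0}, {1}, {2}};
  std::vector<std::function<void()>> broadcast_calls;

  const int root_id = 0;      // device that owns the source tensor
  const size_t numel = 1024;  // number of elements to broadcast

  for (auto &ctx : contexts) {
    // Explicit capture list, mirroring
    // [send_recv_buffer, numel, type, root_id, &nccl_ctx] above: capture only
    // what the deferred call needs, and the context by reference.
    broadcast_calls.emplace_back([numel, root_id, &ctx] {
      std::cout << "bcast " << numel << " elements, root " << root_id
                << ", device " << ctx.device_id << ", stream " << ctx.stream()
                << "\n";
    });
  }

  // Later, all calls are issued together (in the real op, inside an NCCL
  // group and RunAndRecordEvent).
  for (auto &call : broadcast_calls) call();
  return 0;
}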

paddle/fluid/framework/details/gather_op_handle.cc

Lines changed: 14 additions & 20 deletions
@@ -36,7 +36,6 @@ void GatherOpHandle::RunImpl() {
   VarHandle *out_var_handle;
   {
     auto out_var_handles = DynamicCast<VarHandle>(outputs_);
-
     PADDLE_ENFORCE_EQ(out_var_handles.size(), 1,
                       "The number of output should be one.");
     out_var_handle = out_var_handles.front();
@@ -51,43 +50,39 @@ void GatherOpHandle::RunImpl() {
   auto pre_in_var =
       var_scopes.at(in_0_handle->scope_idx_)->FindVar(in_0_handle->name_);
   PADDLE_ENFORCE_NOT_NULL(pre_in_var);
+
   PADDLE_ENFORCE(pre_in_var->IsType<framework::SelectedRows>(),
                  "Currently, gather_op only can gather SelectedRows.");
 
   // Wait input done, this Wait is asynchronous operation
   WaitInputVarGenerated(in_var_handles);
 
+  auto &pre_in_value = pre_in_var->Get<framework::SelectedRows>();
   std::vector<int64_t> out_rows;
   std::vector<Tensor> in_tensors;
 
-  auto &pre_in_value = pre_in_var->Get<framework::SelectedRows>();
-  // gather the inputs
+  // Gather the inputs
   for (auto *in_handle : in_var_handles) {
     auto *in_var =
         var_scopes.at(in_handle->scope_idx_)->FindVar(in_handle->name_);
     PADDLE_ENFORCE_NOT_NULL(in_var);
+    VariableVisitor::EnforceShapeAndDTypeEQ(*in_var, *pre_in_var);
 
     auto &in_sr_value = in_var->Get<framework::SelectedRows>();
 
-    PADDLE_ENFORCE_EQ(in_sr_value.place().which(), pre_in_value.place().which(),
-                      "Places must be all on CPU or all on GPU.");
-    PADDLE_ENFORCE_EQ(in_sr_value.value().type(), pre_in_value.value().type(),
-                      "The type of input is not consistent.");
-    PADDLE_ENFORCE_EQ(in_sr_value.height(), pre_in_value.height(),
-                      "The height of inputs is not consistent.");
-    PADDLE_ENFORCE_EQ(in_sr_value.GetCompleteDims(),
-                      pre_in_value.GetCompleteDims(),
-                      "The dims of inputs is not consistent.");
-
     auto &in_sr_rows = in_sr_value.rows();
     out_rows.insert(out_rows.end(), in_sr_rows.begin(), in_sr_rows.end());
     in_tensors.emplace_back(in_sr_value.value());
   }
 
-  // write the output
+  // TODO(zcd): The Place of var_handle is determined at building SSA graph
+  // stage, while the Place of var is determined at runtime. If they are
+  // different, DataTransform should be applied. Currently, it has not been
+  // done yet.
   auto &out_place = out_var_handle->place_;
   PADDLE_ENFORCE_EQ(out_place.which(), pre_in_value.place().which(),
-                    "Places must be all on CPU or all on GPU.");
+                    "Currently, Places of input and output must be all on CPU "
+                    "or all on GPU.");
   auto out_var =
       var_scopes.at(out_var_handle->scope_idx_)->FindVar(out_var_handle->name_);
   PADDLE_ENFORCE_NOT_NULL(out_var);
@@ -97,19 +92,18 @@ void GatherOpHandle::RunImpl() {
   size_t rows = out_rows.size();
   DDim out_dim = pre_in_value.GetCompleteDims();
   out_dim[0] = static_cast<int64_t>(rows);
-  out_value->mutable_value()->Resize(out_dim);
-  out_value->mutable_value()->mutable_data(out_place,
-                                           pre_in_value.value().type());
+  out_value->mutable_value()->Resize(out_dim).mutable_data(
+      out_place, pre_in_value.value().type());
   Tensor *out_tensor = out_value->mutable_value();
 
   // copy
   auto dev_ctx = dev_ctxes_[out_place];
-  RunAndRecordEvent(out_place, [in_tensors, out_tensor, dev_ctx, out_place] {
+  RunAndRecordEvent(out_place, [in_tensors, out_tensor, &dev_ctx, out_place] {
     int s = 0, e = 0;
     for (size_t j = 0; j < in_tensors.size(); ++j) {
      e += in_tensors[j].dims()[0];
      auto sub_out = out_tensor->Slice(s, e);
-      paddle::framework::TensorCopy(in_tensors[j], out_place, *(dev_ctx),
+      paddle::framework::TensorCopy(in_tensors[j], out_place, *dev_ctx,
                                     &sub_out);
      s = e;
    }
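
The copy loop above places input j into rows [s, e) of the gathered output tensor, advancing e by that input's row count. A minimal standalone sketch (not Paddle code) of that slice bookkeeping, with made-up row counts:

#include <cstdio>
#include <vector>

int main() {
  // dims()[0] (row count) of each gathered input, as with in_tensors above.
  std::vector<int> in_rows = {3, 5, 2};

  int s = 0, e = 0;
  for (size_t j = 0; j < in_rows.size(); ++j) {
    e += in_rows[j];
    // In the real op, output rows [s, e) are the target of TensorCopy for
    // input j (out_tensor->Slice(s, e)).
    std::printf("input %zu -> output rows [%d, %d)\n", j, s, e);
    s = e;
  }
  return 0;  // with {3, 5, 2} the gathered output holds 10 rows in total
}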

paddle/fluid/framework/details/multi_devices_graph_builder.cc

Lines changed: 19 additions & 16 deletions
@@ -116,13 +116,12 @@ std::unique_ptr<SSAGraph> MultiDevSSAGraphBuilder::Build(
       std::unordered_map<std::string, std::vector<std::unique_ptr<VarHandle>>>>(
           places_.size());
 
-  // size_t cur_device_id = 0;
-  size_t update_sparse_gp_device_id = 0;
-  std::vector<std::unordered_set<std::string>> var_name_on_devices;
-  std::vector<std::unordered_set<std::string>> bcast_var_name_set;
+  size_t cur_update_sparse_gp_dev_id = 0;
+  std::vector<std::unordered_set<std::string>> sparse_var_name_on_devices;
+  std::vector<std::unordered_set<std::string>> bcast_sparse_var_name_set;
 
-  var_name_on_devices.resize(places_.size());
-  bcast_var_name_set.resize(places_.size());
+  sparse_var_name_on_devices.resize(places_.size());
+  bcast_sparse_var_name_set.resize(places_.size());
 
   // Find "send" op first for split is in front of send.
   OpDesc *send_op = GetSendOpDesc(program);
@@ -142,13 +141,13 @@ std::unique_ptr<SSAGraph> MultiDevSSAGraphBuilder::Build(
       }
       is_forwarding = false;
     } else {
-      int op_dev_id = GetOpDeviceID(var_name_on_devices, *op);
+      int op_dev_id = GetOpDeviceID(sparse_var_name_on_devices, *op);
       if (op_dev_id == -1) {  // var on all device
         CreateComputationalOps(&result, *op, places_.size());
       } else {
         CreateComputationalOp(&result, *op, op_dev_id);
         for (auto &var_name : op->OutputArgumentNames()) {
-          var_name_on_devices[op_dev_id].emplace(var_name);
+          sparse_var_name_on_devices[op_dev_id].emplace(var_name);
         }
       }
 
@@ -158,10 +157,13 @@ std::unique_ptr<SSAGraph> MultiDevSSAGraphBuilder::Build(
       for (auto &og : op->OutputArgumentNames()) {
         if (IsParameterGradientOnce(og, &og_has_been_broadcast)) {
           if (IsSparseGradient(og)) {
-            CreateReduceOp(&result, update_sparse_gp_device_id, og);
-            var_name_on_devices[update_sparse_gp_device_id].emplace(og);
-            bcast_var_name_set[update_sparse_gp_device_id].emplace(
+            CreateReduceOp(&result, cur_update_sparse_gp_dev_id, og);
+            sparse_var_name_on_devices[cur_update_sparse_gp_dev_id].emplace(
+                og);
+            bcast_sparse_var_name_set[cur_update_sparse_gp_dev_id].emplace(
                 og.substr(0, og.size() - strlen(kGradVarSuffix)));
+            cur_update_sparse_gp_dev_id =
+                (cur_update_sparse_gp_dev_id + 1) % places_.size();
           } else {
             InsertNCCLAllReduceOp(&result, og);
           }
@@ -172,8 +174,8 @@ std::unique_ptr<SSAGraph> MultiDevSSAGraphBuilder::Build(
   }
 
   // Insert BCast Ops
-  for (size_t dev_id = 0; dev_id < bcast_var_name_set.size(); ++dev_id) {
-    auto &to_bcast_set = bcast_var_name_set[dev_id];
+  for (size_t dev_id = 0; dev_id < bcast_sparse_var_name_set.size(); ++dev_id) {
+    auto &to_bcast_set = bcast_sparse_var_name_set[dev_id];
     for (auto &bcast_name : to_bcast_set) {
       CreateBroadcastOp(&result, bcast_name, dev_id);
     }
@@ -206,13 +208,14 @@ bool MultiDevSSAGraphBuilder::IsSparseGradient(const std::string &og) const {
 }
 
 int MultiDevSSAGraphBuilder::GetOpDeviceID(
-    const std::vector<std::unordered_set<std::string>> &var_name_on_devices,
+    const std::vector<std::unordered_set<std::string>>
+        &sparse_var_name_on_devices,
     const OpDesc &op) const {
   int var_dev_id = -1;
   for (auto &var_name : op.InputArgumentNames()) {
     if (var_dev_id != -1) break;
-    for (size_t i = 0; i < var_name_on_devices.size(); ++i) {
-      if (var_name_on_devices[i].count(var_name)) {
+    for (size_t i = 0; i < sparse_var_name_on_devices.size(); ++i) {
+      if (sparse_var_name_on_devices[i].count(var_name)) {
        var_dev_id = static_cast<int>(i);
        break;
      }
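
The new cur_update_sparse_gp_dev_id counter assigns each sparse-gradient reduce op to a device in round-robin order, advancing modulo places_.size() after every assignment. A minimal standalone sketch (not Paddle code) of that assignment scheme, with hypothetical gradient names:

#include <iostream>
#include <string>
#include <vector>

int main() {
  const size_t num_places = 4;  // number of devices (places_.size())
  // Hypothetical sparse parameter gradients; the names are made up.
  std::vector<std::string> sparse_grads = {"emb@GRAD", "w1@GRAD", "w2@GRAD",
                                           "w3@GRAD", "w4@GRAD"};

  size_t cur_dev_id = 0;  // plays the role of cur_update_sparse_gp_dev_id
  for (const auto &og : sparse_grads) {
    std::cout << og << " -> reduce on device " << cur_dev_id << "\n";
    cur_dev_id = (cur_dev_id + 1) % num_places;  // round-robin across devices
  }
  return 0;
}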

paddle/fluid/framework/details/reduce_op_handle.cc

Lines changed: 22 additions & 19 deletions
@@ -52,27 +52,30 @@ void ReduceOpHandle::RunImpl() {
 
   // Wait input done, this Wait is asynchronous operation
   WaitInputVarGenerated(in_var_handles);
-  auto pre_place = in_0_handle->place_;
+
   std::vector<platform::Place> in_places;  // used to get dev_ctx
-  auto pre_in_tensor = VariableVisitor::GetMutableTensor(pre_in_var);
   for (auto *in_handle : in_var_handles) {
     in_places.emplace_back(in_handle->place_);
-
     auto in_var =
         var_scopes.at(in_handle->scope_idx_)->FindVar(in_handle->name_);
     PADDLE_ENFORCE_NOT_NULL(in_var);
-
-    auto in_tensor = VariableVisitor::GetMutableTensor(in_var);
-    PADDLE_ENFORCE_EQ(pre_in_tensor.place().which(), in_tensor.place().which(),
-                      "Places must be all on CPU or all on GPU.");
-    PADDLE_ENFORCE_EQ(in_tensor.type(), pre_in_tensor.type(),
-                      "The type of input is not consistent.");
+    VariableVisitor::EnforceShapeAndDTypeEQ(*pre_in_var, *in_var);
   }
 
   auto out_var =
       var_scopes.at(out_var_handle->scope_idx_)->FindVar(out_var_handle->name_);
   PADDLE_ENFORCE_NOT_NULL(out_var);
 
+  // TODO(zcd): The Place of var_handle is determined at building SSA graph
+  // stage, while the Place of var is determined at runtime. If they are
+  // different, DataTransform should be applied. Currently, it has not been
+  // done yet.
+  PADDLE_ENFORCE_EQ(
+      VariableVisitor::GetMutableTensor(pre_in_var).place().which(),
+      out_var_handle->place_.which(),
+      "Currently, Places of input and output must be all on CPU or all on "
+      "GPU.");
+
   if (pre_in_var->IsType<framework::SelectedRows>()) {
     std::vector<const SelectedRows *> in_selected_rows =
         GetInputValues<SelectedRows>(in_var_handles, var_scopes);
@@ -96,31 +99,31 @@ void ReduceOpHandle::RunImpl() {
           out_var_handle->place_, pre_in.type());
 
       auto out_p = out_var_handle->place_;
-      int root = boost::get<platform::CUDAPlace>(out_p).device;
+      int root_id = boost::get<platform::CUDAPlace>(out_p).device;
       std::vector<std::function<void()>> all_reduce_calls;
       for (size_t i = 0; i < var_scopes.size(); ++i) {
         auto &p = in_places[i];
         auto &lod_tensor = *lod_tensors[i];
 
         int dev_id = boost::get<platform::CUDAPlace>(p).device;
         auto &nccl_ctx = nccl_ctxs_->at(dev_id);
-        auto stream = nccl_ctx.stream();
-        auto comm = nccl_ctx.comm_;
 
         void *buffer = const_cast<void *>(lod_tensor.data<void>());
         void *recvbuffer = nullptr;
-        if (root == dev_id) {
+        if (root_id == dev_id) {
           recvbuffer =
               out_var->GetMutable<framework::LoDTensor>()->mutable_data(
                   out_var_handle->place_);
         }
 
         int type = platform::ToNCCLDataType(lod_tensor.type());
-        all_reduce_calls.emplace_back([=] {
-          PADDLE_ENFORCE(platform::dynload::ncclReduce(
-              buffer, recvbuffer, static_cast<size_t>(lod_tensor.numel()),
-              static_cast<ncclDataType_t>(type), ncclSum, root, comm, stream));
-        });
+        size_t numel = static_cast<size_t>(lod_tensor.numel());
+        all_reduce_calls.emplace_back(
+            [buffer, recvbuffer, type, numel, root_id, &nccl_ctx] {
+              PADDLE_ENFORCE(platform::dynload::ncclReduce(
+                  buffer, recvbuffer, numel, static_cast<ncclDataType_t>(type),
+                  ncclSum, root_id, nccl_ctx.comm_, nccl_ctx.stream()));
+            });
       }
 
       this->RunAndRecordEvent([&] {
@@ -130,7 +133,7 @@ void ReduceOpHandle::RunImpl() {
        }
      });
 #else
-    PADDLE_THROW("CUDA is not support.");
+    PADDLE_THROW("CUDA is not enabled.");
 #endif
   } else {
     PADDLE_THROW("Place should be CPUPlace or CUDAPlace.");
