
Commit b8a1798

reyoung authored, Yang Yang (Tony) committed

Feature/parallel for bug fix (#7474)

* Fix ParallelDo not supporting an empty input gradient
* Polish ParallelDo and fix several bugs
* Fix CI
* Fix CI

1 parent c5067a6 · commit b8a1798

File tree

4 files changed: +128 −92 lines

paddle/framework/lod_tensor.cc

Lines changed: 30 additions & 16 deletions
@@ -291,23 +291,32 @@ std::vector<LoDTensor> LoDTensor::SplitLoDTensor(
     const std::vector<platform::Place> places) const {
   check_memory_size();
   PADDLE_ENFORCE(lod().empty(), "Disable parallel lod for now");
-  PADDLE_ENFORCE(dims()[0] % places.size() == 0,
-                 "Batch size should be divided by places size");
-
-  std::vector<LoDTensor> lods;
-  for (size_t place_idx = 0; place_idx < places.size(); ++place_idx) {
-    int begin = place_idx * dims()[0] / places.size();
-    int end = (place_idx + 1) * dims()[0] / places.size();
+  size_t result_size = std::min(static_cast<size_t>(dims()[0]), places.size());
+  size_t remainder = dims()[0] % places.size();
+
+  std::vector<LoDTensor> results;
+  results.reserve(result_size);
+
+  int step_width = static_cast<int>(dims()[0] / result_size);
+  for (size_t i = 0; i < result_size; ++i) {
+    int begin = static_cast<int>(i * step_width);
+    int end = static_cast<int>((i + 1) * step_width);
+    if (i + 1 == places.size()) {  // last
+      end += remainder;
+    }
 
     auto src = Slice(begin, end);
-    auto &dst_place = places[place_idx];
+    auto &dst_place = places[i];
     LoDTensor dst;
-    framework::Copy(src, dst_place, &dst);
-
-    lods.emplace_back(dst);
+    if (!(dst_place == place())) {
+      framework::Copy(src, dst_place, &dst);
+    } else {  // It is no need to copy if src_place and dst_place are same.
+      dst.ShareDataWith(src);
+    }
+    results.emplace_back(dst);
   }
 
-  return lods;
+  return results;
 }
 
 // TODO(tonyyang-svail): make this function support LoD
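Note on the new split logic: SplitLoDTensor no longer requires the batch size to be divisible by the number of places; the last piece absorbs the remainder, and when the batch is smaller than the place list it simply produces fewer pieces. A minimal standalone sketch of just the index arithmetic (plain C++, illustrative only; variable names mirror the diff but this is not the Paddle API):

    // Standalone sketch of the split arithmetic: `height` rows spread over
    // `num_places` devices, with the last piece taking the remainder.
    #include <algorithm>
    #include <cstddef>
    #include <cstdio>

    int main() {
      const std::size_t height = 5, num_places = 2;                  // e.g. 5 rows, 2 GPUs
      const std::size_t result_size = std::min(height, num_places);  // never more pieces than rows
      const std::size_t remainder = height % num_places;
      const std::size_t step_width = height / result_size;

      for (std::size_t i = 0; i < result_size; ++i) {
        std::size_t begin = i * step_width;
        std::size_t end = (i + 1) * step_width;
        if (i + 1 == num_places) end += remainder;  // last place absorbs the leftover rows
        std::printf("piece %zu: rows [%zu, %zu)\n", i, begin, end);
      }
      return 0;
    }

With height = 5 and num_places = 2 this yields pieces [0, 2) and [2, 5); with height = 1 it yields a single piece [0, 1), which is what lets the new test_fc_with_tiny_data case in test_parallel_op.py feed a batch of one.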
@@ -318,12 +327,17 @@ void LoDTensor::MergeLoDTensor(
   framework::DDim new_dim = lod_tensors[0]->dims();
   std::type_index new_type = lod_tensors[0]->type();
   auto new_layout = lod_tensors[0]->layout();
+  int64_t new_height = 0;
   for (auto *lod : lod_tensors) {
-    PADDLE_ENFORCE(new_dim == lod->dims());
-    PADDLE_ENFORCE(new_type == lod->type());
-    PADDLE_ENFORCE(new_layout == lod->layout());
+    new_height += lod->dims()[0];
+    for (int i = 1; i < new_dim.size(); ++i) {
+      PADDLE_ENFORCE_EQ(new_dim[i], lod->dims()[i]);
+    }
+
+    PADDLE_ENFORCE_EQ(new_type, lod->type());
+    PADDLE_ENFORCE_EQ(new_layout, lod->layout());
   }
-  new_dim[0] *= lod_tensors.size();
+  new_dim[0] = new_height;
   Resize(new_dim);
   set_layout(new_layout);
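The merge side is adjusted to match: MergeLoDTensor now sums the heights of the incoming pieces instead of assuming they are all equal, and only the trailing dimensions still have to agree. A toy sketch of the new height bookkeeping (plain vectors standing in for framework::DDim; illustrative only):

    #include <cassert>
    #include <cstddef>
    #include <cstdint>
    #include <vector>

    int main() {
      // Dims of the pieces coming back from the devices, e.g. {2, 784} and {3, 784}.
      std::vector<std::vector<int64_t>> piece_dims = {{2, 784}, {3, 784}};
      std::vector<int64_t> new_dim = piece_dims[0];

      int64_t new_height = 0;
      for (const auto &d : piece_dims) {
        new_height += d[0];                          // heights may differ per piece
        for (std::size_t i = 1; i < new_dim.size(); ++i) {
          assert(new_dim[i] == d[i]);                // trailing dims must still match
        }
      }
      new_dim[0] = new_height;                       // merged dims: {5, 784}
      assert(new_dim[0] == 5);
      return 0;
    }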

paddle/operators/parallel_do_op.cc

Lines changed: 71 additions & 64 deletions
@@ -30,16 +30,13 @@ static constexpr char kParallelScopes[] = "parallel_scopes";
 
 static constexpr char kParallelBlock[] = "sub_block";
 
-// using ParallelScopeVar = std::vector<framework::Scope *>;
 using LoDTensor = framework::LoDTensor;
-using OperatorBase = framework::OperatorBase;
 
-void SplitTensorAndMoveTensorToScopes(
-    const framework::Scope &scope,
-    const std::vector<framework::Scope *> &sub_scopes,
+static void SplitTensorAndMoveTensorToScopes(
+    const framework::Scope &scope, std::vector<framework::Scope *> *sub_scopes,
     const std::vector<platform::Place> &places,
     const std::vector<std::string> &names) {
-  PADDLE_ENFORCE_EQ(sub_scopes.size(), places.size());
+  size_t num_sub_scopes = 0;
  for (auto &argu : names) {
    auto *var = scope.FindVar(argu);
    const auto &tensor = var->Get<LoDTensor>();
@@ -48,9 +45,21 @@ void SplitTensorAndMoveTensorToScopes(
     for (auto &lod : lod_tensors) {
       VLOG(3) << lod.dims();
     }
+    if (num_sub_scopes == 0) {
+      num_sub_scopes = lod_tensors.size();
+    } else {
+      PADDLE_ENFORCE_EQ(num_sub_scopes, lod_tensors.size());
+    }
+    PADDLE_ENFORCE_NE(num_sub_scopes, 0);
+    if (sub_scopes->size() == 0) {
+      sub_scopes->reserve(num_sub_scopes);
+      for (size_t i = 0; i < num_sub_scopes; ++i) {
+        sub_scopes->emplace_back(&scope.NewScope());
+      }
+    }
 
-    for (size_t i = 0; i < sub_scopes.size(); ++i) {
-      *sub_scopes[i]->Var(argu)->GetMutable<LoDTensor>() = lod_tensors[i];
+    for (size_t i = 0; i < lod_tensors.size(); ++i) {
+      *(*sub_scopes)[i]->Var(argu)->GetMutable<LoDTensor>() = lod_tensors[i];
     }
   }
 }
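The helper now derives the number of sub-scopes from how many pieces the split actually produced and creates them lazily, instead of requiring one pre-built scope per place. A schematic of that create-on-first-use pattern with stand-in types (Scope and the counts here are placeholders, not the framework classes):

    #include <cassert>
    #include <cstddef>
    #include <memory>
    #include <vector>

    struct Scope {};  // stand-in for framework::Scope

    int main() {
      const std::size_t num_pieces = 3;  // e.g. the split produced 3 pieces on 4 places
      std::vector<std::unique_ptr<Scope>> sub_scopes;

      if (sub_scopes.empty()) {            // create scopes only on first use,
        sub_scopes.reserve(num_pieces);    // and only as many as there are pieces
        for (std::size_t i = 0; i < num_pieces; ++i) {
          sub_scopes.emplace_back(std::make_unique<Scope>());
        }
      }
      assert(sub_scopes.size() == num_pieces);
      return 0;
    }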
@@ -70,7 +79,7 @@ class ParallelDoOp : public framework::OperatorBase {
                const framework::VariableNameMap &inputs,
                const framework::VariableNameMap &outputs,
                const framework::AttributeMap &attrs)
-      : OperatorBase(type, inputs, outputs, attrs) {}
+      : framework::OperatorBase(type, inputs, outputs, attrs) {}
 
   void Run(const framework::Scope &scope,
            const platform::Place &place) const override {
@@ -85,19 +94,17 @@ class ParallelDoOp : public framework::OperatorBase {
 
     auto &sub_scopes = *scope.FindVar(Output(kParallelScopes))
                             ->GetMutable<std::vector<framework::Scope *>>();
-    for (size_t place_idx = 0; place_idx < places.size(); ++place_idx) {
-      sub_scopes.push_back(&scope.NewScope());
-    }
 
     // split input
-    SplitTensorAndMoveTensorToScopes(scope, sub_scopes, places,
+    SplitTensorAndMoveTensorToScopes(scope, &sub_scopes, places,
                                      Inputs(kInputs));
+
     // copy parameter
     for (auto &param : Inputs(kParameters)) {
       PADDLE_ENFORCE(scope.FindVar(param)->IsType<LoDTensor>(),
                      "Only support parameter type as LoDTensor");
       auto &src = scope.FindVar(param)->Get<LoDTensor>();
-      for (size_t i = 0; i < places.size(); ++i) {
+      for (size_t i = 0; i < sub_scopes.size(); ++i) {
         auto &place = places[i];
         auto *sub_scope = sub_scopes[i];
         auto *dst = sub_scope->Var(param)->GetMutable<LoDTensor>();
@@ -108,9 +115,7 @@ class ParallelDoOp : public framework::OperatorBase {
 
     std::vector<std::future<void>> workers;
     workers.reserve(places.size());
-    for (size_t place_idx = 0; place_idx < places.size(); ++place_idx) {
-      VLOG(3) << "Run " << place_idx;
-
+    for (size_t place_idx = 0; place_idx < sub_scopes.size(); ++place_idx) {
       auto &place = places[place_idx];
       auto *cur_scope = sub_scopes[place_idx];
 
@@ -157,21 +162,16 @@ ParallelDo Operator.
   }
 };
 
-class ParallelDoGradOp : public OperatorBase {
+class ParallelDoGradOp : public framework::OperatorBase {
  public:
   ParallelDoGradOp(const std::string &type,
                    const framework::VariableNameMap &inputs,
                    const framework::VariableNameMap &outputs,
                    const framework::AttributeMap &attrs)
-      : OperatorBase(type, inputs, outputs, attrs) {}
+      : framework::OperatorBase(type, inputs, outputs, attrs) {}
 
   void Run(const framework::Scope &scope,
            const platform::Place &place) const override {
-    // // get device context from pool
-    // platform::DeviceContextPool &pool =
-    // platform::DeviceContextPool::Instance();
-    // auto &dev_ctx = *pool.Get(place);
-
     auto *block = Attr<framework::BlockDesc *>(kParallelBlock);
     auto *program = block->Program();
 
@@ -181,26 +181,16 @@ class ParallelDoGradOp : public OperatorBase {
     auto &places = scope.FindVar(Input(kPlaces))->Get<platform::PlaceList>();
 
     // feed output@grad
-    SplitTensorAndMoveTensorToScopes(scope, sub_scopes, places,
-                                     Inputs(framework::GradVarName(kOutputs)));
+    SplitTensorAndMoveTensorToScopes(
+        scope, const_cast<std::vector<framework::Scope *> *>(&sub_scopes),
+        places, Inputs(framework::GradVarName(kOutputs)));
     WaitOnPlaces(places);
 
-    // for debugging
-    for (auto &s : Inputs(framework::GradVarName(kOutputs))) {
-      VLOG(3) << s;
-      VLOG(3) << scope.FindVar(s)->Get<LoDTensor>();
-      for (auto *sub_scope : sub_scopes) {
-        VLOG(3) << sub_scope->FindVar(s)->Get<LoDTensor>();
-      }
-    }
-
     // exe run
     std::vector<std::future<void>> workers;
-    for (size_t place_idx = 0; place_idx < places.size(); ++place_idx) {
-      VLOG(3) << "Run " << place_idx;
-
-      auto &place = places[place_idx];
-      auto *cur_scope = sub_scopes[place_idx];
+    for (size_t i = 0; i < sub_scopes.size(); ++i) {
+      auto &place = places[i];
+      auto *cur_scope = sub_scopes[i];
 
       // execute
       workers.emplace_back(framework::Async([program, cur_scope, place, block] {
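Both the forward and the backward op now launch one asynchronous worker per sub-scope (rather than per place) and join them all before touching the results. A generic sketch of that fan-out/join pattern, with std::async standing in for framework::Async (the real workers run an Executor over the sub-block):

    #include <cstddef>
    #include <cstdio>
    #include <future>
    #include <vector>

    int main() {
      const std::size_t num_sub_scopes = 3;  // one worker per sub-scope
      std::vector<std::future<void>> workers;
      workers.reserve(num_sub_scopes);

      for (std::size_t i = 0; i < num_sub_scopes; ++i) {
        workers.emplace_back(std::async(std::launch::async, [i] {
          std::printf("worker %zu running its sub-block\n", i);
        }));
      }
      for (auto &w : workers) {
        w.wait();  // join all devices before merging results
      }
      return 0;
    }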
@@ -216,33 +206,38 @@ class ParallelDoGradOp : public OperatorBase {
 
     // merge grad
     for (auto &s : Outputs(framework::GradVarName(kParameters))) {
-      VLOG(3) << "merge grad " << s;
-
-      auto &t = sub_scopes[0]->FindVar(s)->Get<LoDTensor>();
-      VLOG(3) << t;
-
-      std::string s_buf = s + "@BUF";
-      auto *t_buf = sub_scopes[0]->Var(s_buf)->GetMutable<LoDTensor>();
-
-      for (size_t place_idx = 1; place_idx < places.size(); ++place_idx) {
-        auto &tt = sub_scopes[place_idx]->FindVar(s)->Get<LoDTensor>();
-        VLOG(3) << place_idx;
-        VLOG(3) << tt;
-        framework::Copy(tt, places[0], t_buf);
+      auto &result = sub_scopes[0]->FindVar(s)->Get<LoDTensor>();
+      std::string tmp_name;
+      auto *tmp = sub_scopes[0]->Var(&tmp_name)->GetMutable<LoDTensor>();
+
+      for (size_t i = 1; i < sub_scopes.size(); ++i) {
+        auto &tensor_to_merge = sub_scopes[i]->FindVar(s)->Get<LoDTensor>();
+        if (!(places[i] == places[0])) {
+          framework::Copy(tensor_to_merge, places[0], tmp);
+        } else {
+          tmp->ShareDataWith(tensor_to_merge);
+        }
 
         auto sum_op = framework::OpRegistry::CreateOp(
-            "sum", {{"X", {s, s_buf}}}, {{"Out", {s}}},
+            "sum", {{"X", {s, tmp_name}}}, {{"Out", {s}}},
             framework::AttributeMap{});
         sum_op->Run(*sub_scopes[0], places[0]);
         WaitOnPlaces(places);
       }
 
-      VLOG(3) << t;
-      framework::Copy(t, place, scope.FindVar(s)->GetMutable<LoDTensor>());
+      VLOG(3) << result;
+      framework::Copy(result, place, scope.FindVar(s)->GetMutable<LoDTensor>());
     }
   }
 };
 
+std::ostream &operator<<(std::ostream &sout,
+                         const std::vector<std::string> &strs) {
+  std::copy(strs.begin(), strs.end(),
+            std::ostream_iterator<std::string>(sout, ","));
+  return sout;
+}
+
 class ParallelDoGradOpDescMaker : public framework::SingleGradOpDescMaker {
  public:
   using framework::SingleGradOpDescMaker::SingleGradOpDescMaker;
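The gradient merge now walks the sub-scopes, copies each device's parameter gradient to places[0] only when the source place actually differs (sharing the buffer otherwise), and accumulates with the sum op. A plain-C++ sketch of that accumulation, with vectors of doubles standing in for LoDTensors and the copy/share distinction elided:

    #include <cstddef>
    #include <cstdio>
    #include <vector>

    int main() {
      // Per-device copies of one parameter gradient (stand-ins for LoDTensors).
      std::vector<std::vector<double>> grads = {{1.0, 2.0}, {0.5, 0.5}, {0.25, 1.0}};
      std::vector<double> result = grads[0];  // lives on places[0]

      for (std::size_t i = 1; i < grads.size(); ++i) {
        // In the op, grads[i] is first brought to places[0] (copied, or shared when
        // the places match) and then summed into the result with the sum op.
        for (std::size_t j = 0; j < result.size(); ++j) {
          result[j] += grads[i][j];
        }
      }
      std::printf("merged grad: %.2f %.2f\n", result[0], result[1]);  // 1.75 3.50
      return 0;
    }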
@@ -283,18 +278,30 @@ class ParallelDoGradOpShapeInference : public framework::InferShapeBase {
   void operator()(framework::InferShapeContext *ctx) const override {
     std::vector<std::string> input{kParameters, kInputs};
     std::vector<std::string> output{kOutputs};
-    for (auto &s : input) {
-      PADDLE_ENFORCE(ctx->HasInputs(s));
-      PADDLE_ENFORCE(ctx->HasOutputs(framework::GradVarName(s)),
-                     "Cannot find the gradient variable %s",
-                     framework::GradVarName(s));
-    }
+
+    PADDLE_ENFORCE(ctx->HasInputs(kParameters));
+    PADDLE_ENFORCE(ctx->HasOutputs(framework::GradVarName(kParameters)));
+    PADDLE_ENFORCE(ctx->HasInput(kInputs));
+
     for (auto &s : output) {
       PADDLE_ENFORCE(ctx->HasInputs(s));
     }
-    for (auto &s : input) {
-      ctx->SetOutputsDim(framework::GradVarName(s), ctx->GetInputsDim(s));
+
+    ctx->SetOutputsDim(framework::GradVarName(kParameters),
+                       ctx->GetInputsDim(kParameters));
+
+    auto i_dims = ctx->GetInputsDim(kInputs);
+    auto ig_names = ctx->Outputs(framework::GradVarName(kInputs));
+
+    for (size_t i = 0; i < ig_names.size(); ++i) {
+      auto &ig_name = ig_names[i];
+      if (ig_name == framework::kEmptyVarName) {
+        continue;
+      }
+
+      ctx->SetDims({ig_name}, {i_dims[i]});
     }
+
     if (ctx->HasInputs(kParameters)) {
       PADDLE_ENFORCE(ctx->HasOutputs(framework::GradVarName(kParameters)));
       ctx->SetOutputsDim(framework::GradVarName(kParameters),
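This shape-inference change is the part that makes empty input gradients legal: input-gradient outputs whose name is framework::kEmptyVarName are skipped instead of being assigned a shape. A small sketch of that filtering, with string vectors standing in for the InferShape context (the marker value shown is illustrative, not quoted from the framework):

    #include <cstddef>
    #include <cstdint>
    #include <cstdio>
    #include <string>
    #include <vector>

    int main() {
      const std::string kEmptyVarName = "@EMPTY@";  // stand-in for framework::kEmptyVarName
      std::vector<std::string> ig_names = {"x@GRAD", kEmptyVarName, "y@GRAD"};
      std::vector<std::vector<int64_t>> i_dims = {{8, 784}, {8, 10}, {8, 1}};

      for (std::size_t i = 0; i < ig_names.size(); ++i) {
        if (ig_names[i] == kEmptyVarName) {
          continue;  // no gradient requested for this input: leave it untouched
        }
        // The real op calls ctx->SetDims({ig_name}, {i_dims[i]}) here.
        std::printf("%s -> [%lld, %lld]\n", ig_names[i].c_str(),
                    static_cast<long long>(i_dims[i][0]),
                    static_cast<long long>(i_dims[i][1]));
      }
      return 0;
    }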

paddle/string/to_string.h

Lines changed: 11 additions & 0 deletions
@@ -15,16 +15,27 @@ limitations under the License. */
 #pragma once
 #include <sstream>
 #include <string>
+#include <typeindex>
 
 namespace paddle {
 namespace string {
+inline std::ostream& operator<<(std::ostream& s, const std::type_index& t) {
+  s << t.name();
+  return s;
+}
+
 template <typename T>
 inline std::string to_string(T v) {
   std::ostringstream sout;
   sout << v;
   return sout.str();
 }
 
+template <>
+inline std::string to_string(std::type_index t) {
+  return t.name();
+}
+
 // Faster std::string/const char* type
 template <>
 inline std::string to_string(std::string v) {
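The new operator<< overload and to_string specialization make std::type_index printable, presumably so the PADDLE_ENFORCE_EQ(new_type, lod->type()) check added above can produce a readable message. A hedged usage sketch with local stand-ins for the paddle::string helpers (standard library only):

    #include <iostream>
    #include <sstream>
    #include <string>
    #include <typeindex>
    #include <typeinfo>

    // Local stand-ins mirroring the new helpers in paddle/string/to_string.h.
    inline std::ostream &operator<<(std::ostream &s, const std::type_index &t) {
      s << t.name();
      return s;
    }

    inline std::string to_string(std::type_index t) { return t.name(); }

    int main() {
      std::type_index f = typeid(float), d = typeid(double);
      std::ostringstream msg;
      msg << "type mismatch: " << f << " vs " << d;  // both now stream cleanly
      std::cout << msg.str() << "\n" << to_string(f) << "\n";
      return 0;
    }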

python/paddle/v2/fluid/tests/test_parallel_op.py

Lines changed: 16 additions & 12 deletions
@@ -151,24 +151,28 @@ def _impl_(a, b, fetch_id, item_id):
 
 
 class ParallelOpTest(BaseParallelForTest):
-    def test_simple_fc(self):
-        def __network__():
-            x = fluid.layers.data(shape=[784], dtype='float32', name='img')
-            # FIXME: This is a bug of parallel.do
-            x.stop_gradient = False
-            x = yield x
-            hidden = fluid.layers.fc(input=x, size=200, param_attr='fc1.w')
-            loss = fluid.layers.mean(x=hidden)
-            yield loss
+    @staticmethod
+    def __network__():
+        x = fluid.layers.data(shape=[784], dtype='float32', name='img')
+        x = yield x
+        hidden = fluid.layers.fc(input=x, size=200, param_attr='fc1.w')
+        loss = fluid.layers.mean(x=hidden)
+        yield loss
 
+    def test_simple_fc(self):
         self.run_test(
-            callback=__network__,
+            callback=ParallelOpTest.__network__,
             feed={
-                'img':
-                numpy.random.random(size=(128 * 3, 784)).astype('float32')
+                'img': numpy.random.random(size=(51, 784)).astype('float32')
             },
             fetch='fc1.w@GRAD')
 
+    def test_fc_with_tiny_data(self):
+        self.run_test(
+            callback=ParallelOpTest.__network__,
+            feed={'img': numpy.random.random(size=(1, 784)).astype('float32')},
+            fetch='fc1.w@GRAD')
+
 
 if __name__ == '__main__':
     unittest.main()
