Skip to content

Commit 894236a

Browse files
authored
Merge pull request #6730 from tonyyang-svail/parallel_do
[WIP]: feature/parallel_do
2 parents d06bbb1 + 0156066 commit 894236a

File tree

11 files changed

+568
-23
lines changed

(Summary repeated from above: 11 files changed, +568 additions, −23 deletions.)

paddle/framework/backward.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -427,7 +427,8 @@ std::vector<std::unique_ptr<OpDesc>> MakeBlockBackward(
427427
VLOG(5) << "Making backward " << (*it)->Type() << " op";
428428
std::vector<std::unique_ptr<OpDesc>> op_grads;
429429

430-
if ((*it)->Type() == "recurrent" || (*it)->Type() == "while") {
430+
if ((*it)->Type() == "recurrent" || (*it)->Type() == "while" ||
431+
(*it)->Type() == "parallel_do") {
431432
int step_block_idx = (*it)->GetBlockAttr("sub_block");
432433
BlockDesc* backward_block = CreateStepBlock(program_desc, no_grad_vars,
433434
grad_to_var, step_block_idx);

paddle/framework/lod_tensor.cc

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,22 @@ std::ostream &operator<<(std::ostream &os, const LoD &lod) {
4343
return os;
4444
}
4545

46+
std::ostream &operator<<(std::ostream &os, const LoDTensor &t) {
47+
PADDLE_ENFORCE(platform::is_cpu_place(t.place()));
48+
PADDLE_ENFORCE(t.type().hash_code() == typeid(float).hash_code());
49+
50+
os << "dim: " << t.dims() << "\n";
51+
os << "lod: " << t.lod() << "\n";
52+
53+
// only print first ten elements
54+
int64_t size = t.numel() < 10 ? t.numel() : 10;
55+
for (int64_t i = 0; i < size; ++i) {
56+
os << t.data<float>()[i] << " ";
57+
}
58+
59+
return os;
60+
}
61+
4662
LoD SliceLevels(const LoD &in, size_t level_begin, size_t level_end) {
4763
LoD new_lod;
4864
new_lod.reserve(level_end - level_begin);
@@ -244,5 +260,69 @@ void DeserializeFromStream(std::istream &is, LoDTensor *tensor,
244260
DeserializeFromStream(is, static_cast<Tensor *>(tensor), dev_ctx);
245261
}
246262

263+
// Split this tensor along dimension 0 into places.size() equal slices and
// copy each slice to the corresponding place, returning one LoDTensor per
// place. Preconditions (enforced below): the tensor carries no LoD, and the
// batch size (dims()[0]) is divisible by the number of places.
// NOTE(review): only CPU-to-CPU copies are implemented here; any other
// src/dst place combination throws "Not Implemented".
std::vector<LoDTensor> LoDTensor::SplitLoDTensor(
    const std::vector<platform::Place> places) const {
  check_memory_size();
  // PADDLE_ENFORCE(lod().empty() || (lod().size() == 1 && lod()[0].empty())
  // , "Disable parallel lod for now");
  PADDLE_ENFORCE(lod().empty(), "Disable parallel lod for now");
  PADDLE_ENFORCE(dims()[0] % places.size() == 0,
                 "Batch size should be divided by places size");

  std::vector<LoDTensor> lods;
  for (size_t place_idx = 0; place_idx < places.size(); ++place_idx) {
    // Half-open row range [begin, end) assigned to this place; ranges are
    // contiguous and equal-sized because of the divisibility check above.
    size_t begin = place_idx * dims()[0] / places.size();
    size_t end = (place_idx + 1) * dims()[0] / places.size();
    // Slice() gives a view over this tensor's memory; no copy happens yet.
    auto src = Slice(static_cast<int>(begin), static_cast<int>(end));

    LoDTensor dst;
    dst.Resize(src.dims());
    auto &dst_place = places[place_idx];
    // Allocates destination memory of the slice's type on the target place.
    auto dst_ptr = dst.mutable_data(dst_place, src.type());

    // TODO(tonyyang-svail):
    // change the following to framework::CopyFrom
    auto src_place = src.place();
    auto src_ptr = src.data<void>();
    // Byte count of the slice: element count times per-element size.
    auto size = src.numel() * SizeOfType(src.type());
    if (platform::is_cpu_place(src_place) &&
        platform::is_cpu_place(dst_place)) {
      memory::Copy(boost::get<platform::CPUPlace>(dst_place), dst_ptr,
                   boost::get<platform::CPUPlace>(src_place), src_ptr, size);
    } else {
      // Non-CPU transfer paths are not yet supported on this code path.
      PADDLE_THROW("Not Implemented");
    }

    lods.emplace_back(dst);
  }

  return lods;
}
301+
302+
void LoDTensor::MergeLoDTensor(
303+
const std::vector<const LoDTensor *> &lod_tensors, platform::Place place) {
304+
PADDLE_ENFORCE(platform::is_cpu_place(place));
305+
PADDLE_ENFORCE(!lod_tensors.empty());
306+
307+
framework::DDim new_dim = lod_tensors[0]->dims();
308+
std::type_index new_type = lod_tensors[0]->type();
309+
for (auto *lod : lod_tensors) {
310+
PADDLE_ENFORCE(new_dim == lod->dims());
311+
PADDLE_ENFORCE(new_type == lod->type());
312+
PADDLE_ENFORCE(platform::is_cpu_place(lod->place()));
313+
}
314+
new_dim[0] *= lod_tensors.size();
315+
Resize(new_dim);
316+
317+
auto *dst_ptr = reinterpret_cast<uint8_t *>(mutable_data(place, new_type));
318+
for (auto *src : lod_tensors) {
319+
auto size = src->numel() * SizeOfType(src->type());
320+
memory::Copy(boost::get<platform::CPUPlace>(place), dst_ptr,
321+
boost::get<platform::CPUPlace>(src->place()),
322+
src->data<void>(), size);
323+
dst_ptr += size;
324+
}
325+
}
326+
247327
} // namespace framework
248328
} // namespace paddle

paddle/framework/lod_tensor.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ using Vector = thrust::host_vector<
5858
using LoD = std::vector<Vector<size_t>>;
5959

6060
std::ostream& operator<<(std::ostream& os, const LoD& lod);
61+
std::ostream& operator<<(std::ostream& os, const LoDTensor& t);
6162

6263
/*
6364
* Slice levels from a LoD.
@@ -144,6 +145,12 @@ class LoDTensor : public Tensor {
144145
*/
145146
void ShrinkInLevel(size_t level, size_t elem_begin, size_t elem_end);
146147

148+
std::vector<LoDTensor> SplitLoDTensor(
149+
const std::vector<platform::Place> places) const;
150+
151+
void MergeLoDTensor(const std::vector<const LoDTensor*>& lod_tensors,
152+
platform::Place place);
153+
147154
private:
148155
LoD lod_;
149156
};

paddle/framework/operator.cc

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,8 @@ static const Tensor* GetTensorFromVar(const Variable* var) {
233233
} else if (var->IsType<SelectedRows>()) {
234234
t = &(var->Get<SelectedRows>().value());
235235
} else {
236-
PADDLE_THROW("Variable type must be LoDTensor/SelectedRows.");
236+
PADDLE_THROW("Variable type_id %s, expect LoDTensor/SelectedRows.",
237+
var->Type().name());
237238
}
238239
return t;
239240
}
@@ -245,7 +246,8 @@ static Tensor* GetMutableTensorFromVar(Variable* var) {
245246
} else if (var->IsType<SelectedRows>()) {
246247
t = var->GetMutable<SelectedRows>()->mutable_value();
247248
} else {
248-
PADDLE_THROW("Variable type must be LoDTensor/SelectedRows.");
249+
PADDLE_THROW("Variable type_id %s, expect LoDTensor/SelectedRows.",
250+
var->Type().name());
249251
}
250252
return t;
251253
}
@@ -407,7 +409,8 @@ class RuntimeInferShapeContext : public InferShapeContext {
407409
} else if (var->IsType<SelectedRows>()) {
408410
return var->Get<SelectedRows>().GetCompleteDims();
409411
} else {
410-
PADDLE_THROW("Variable type must be LoDTensor/SelectedRows.");
412+
PADDLE_THROW("Variable %s type_id %s, expect LoDTensor/SelectedRows.",
413+
name, var->Type().name());
411414
}
412415
}
413416

@@ -418,7 +421,8 @@ class RuntimeInferShapeContext : public InferShapeContext {
418421
} else if (var->IsType<SelectedRows>()) {
419422
var->GetMutable<SelectedRows>()->set_height(dim[0]);
420423
} else {
421-
PADDLE_THROW("Variable type must be LoDTensor/SelectedRows.");
424+
PADDLE_THROW("Variable %s type_id %s, expect LoDTensor/SelectedRows.",
425+
name, var->Type().name());
422426
}
423427
}
424428

paddle/framework/tensor.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ class Tensor {
5555
template <typename T>
5656
inline const T* data() const;
5757

58+
inline void switch_place(platform::Place new_place);
59+
5860
/**
5961
* @brief Return a pointer to mutable memory block.
6062
* @note If not exist, then allocation.
@@ -200,6 +202,15 @@ class Tensor {
200202
size_t offset_;
201203
};
202204

205+
// Move the tensor's backing storage to new_place. A no-op when the tensor
// already lives there; actual cross-place migration is not yet implemented
// and currently throws.
inline void Tensor::switch_place(platform::Place new_place) {
  const bool already_there = holder_->place() == new_place;
  if (already_there) {
    return;
  }

  // TODO(tonyyang-svail): do memcpy here.
  PADDLE_THROW("Not Implemented");
}
213+
203214
} // namespace framework
204215
} // namespace paddle
205216

paddle/operators/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,7 @@ op_library(conv_transpose_op DEPS vol2col)
152152
op_library(gru_op DEPS sequence2batch gru_compute)
153153
op_library(recurrent_op DEPS executor)
154154
op_library(cos_sim_op DEPS cos_sim_functor)
155+
op_library(parallel_do_op DEPS executor)
155156
# FIXME(typhoonzero): save/load depends lodtensor serialization functions
156157
op_library(save_op DEPS lod_tensor)
157158
op_library(load_op DEPS lod_tensor)

0 commit comments

Comments
 (0)