Skip to content

Commit 1861562

Browse files
authored
Merge pull request #7715 from Yancey1989/split_selected_rows_to_multi_pserver
[WIP] Split SelectedRows to multiple pservers
2 parents 85671b8 + d0a9393 commit 1861562

File tree

5 files changed

+101
-63
lines changed

5 files changed

+101
-63
lines changed

paddle/operators/recv_op.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ class RecvOp : public framework::OperatorBase {
103103

104104
// TODO(typhoonzero): change this to a while_op for every cluster-batch.
105105
bool exit_flag = false;
106-
int64_t barrier_size = param_count * fan_in;
106+
size_t barrier_size = param_count * fan_in;
107107
while (!exit_flag) {
108108
// Get from multiple trainers, we don't care about the order in which
109109
// the gradients arrives, just add suffix 0~n and merge the gradient.

paddle/operators/split_selected_rows_op.cc

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,6 @@ class SplitSelectedRowsOpMaker : public framework::OpProtoAndCheckerMaker {
2323
: OpProtoAndCheckerMaker(proto, op_checker) {
2424
AddInput("X", "The input SelectedRows.");
2525
AddOutput("Out", "The outputs of input SelectedRows.").AsDuplicable();
26-
AddAttr<std::vector<int>>("rows_sections", "Rows section for output.")
27-
.SetDefault(std::vector<int>({}));
2826
AddAttr<std::vector<int>>("height_sections",
2927
"Height for each output SelectedRows.")
3028
.SetDefault(std::vector<int>({}));
@@ -35,16 +33,16 @@ height_sections is only needed when need to split the dims of the original tenso
3533
3634
Example:
3735
Input:
38-
X.rows = {0, 7, 5}
36+
X.rows = {7, 5}
3937
X.height = 12
4038
Attr:
41-
rows_sections = {1, 2}
42-
height_sections = {}
39+
height_sections = {4, 8}
4340
Out:
44-
out0.rows = {0}
45-
out0.height = 12
46-
out1.rows = {7, 5}
47-
out2.height = 12
41+
out0.rows = {}
42+
out0.height = 4
43+
44+
out1.rows = {7, 5}
45+
out1.height = 8
4846
4947
)DOC");
5048
}
@@ -61,11 +59,6 @@ class SplitSelectedRowsOp : public framework::OperatorWithKernel {
6159

6260
std::vector<int> height_sections =
6361
ctx->Attrs().Get<std::vector<int>>("height_sections");
64-
std::vector<int> rows_sections =
65-
ctx->Attrs().Get<std::vector<int>>("rows_sections");
66-
PADDLE_ENFORCE_EQ(
67-
rows_sections.size(), ctx->Outputs("Out").size(),
68-
"The size of rows section should be the same with Outputs size.");
6962
int64_t n = ctx->Outputs("Out").size();
7063

7164
std::vector<framework::DDim> outs_dims;

paddle/operators/split_selected_rows_op.h

Lines changed: 49 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -16,40 +16,70 @@ limitations under the License. */
1616

1717
#include <vector>
1818
#include "paddle/framework/op_registry.h"
19+
#include "paddle/operators/math/selected_rows_functor.h"
1920

2021
namespace paddle {
2122
namespace operators {
2223

24+
static int FindOutIdx(int row, const std::vector<int>& height_sections) {
25+
int offset = 0;
26+
for (size_t i = 0; i < height_sections.size(); ++i) {
27+
if (row >= offset && row < (offset + height_sections[i])) {
28+
return i;
29+
}
30+
offset += height_sections[i];
31+
}
32+
return -1;
33+
}
34+
2335
template <typename DeviceContext, typename T>
2436
class SplitSelectedRowsOpKernel : public framework::OpKernel<T> {
2537
public:
2638
void Compute(const framework::ExecutionContext& ctx) const override {
2739
auto* x = ctx.Input<framework::SelectedRows>("X");
2840
auto outs = ctx.MultiOutput<framework::SelectedRows>("Out");
29-
30-
auto rows_sections = ctx.Attr<std::vector<int>>("rows_sections");
3141
auto height_sections = ctx.Attr<std::vector<int>>("height_sections");
3242

33-
int64_t n = outs.size();
34-
int offset = 0;
43+
auto x_rows = x->rows();
44+
std::vector<std::vector<int>> outs_rows_idx;
45+
outs_rows_idx.resize(outs.size());
3546

36-
for (int64_t i = 0; i < n; ++i) {
37-
framework::Vector<int64_t> out_rows;
38-
for (int64_t j = 0; j < rows_sections[i]; ++j) {
39-
out_rows.push_back(x->rows()[offset + j]);
40-
}
47+
auto row_numel = x->value().numel() / x->value().dims()[0];
48+
auto src = x->value().data<T>();
49+
50+
for (size_t i = 0; i < x_rows.size(); ++i) {
51+
int out_idx = FindOutIdx(x_rows[i], height_sections);
52+
outs_rows_idx[out_idx].push_back(i);
53+
}
54+
auto place = ctx.GetPlace();
4155

42-
auto& out = outs[i];
43-
auto x_dims = x->GetCompleteDims();
44-
x_dims[0] = rows_sections[i];
45-
out->mutable_value()->mutable_data<T>(x_dims, ctx.GetPlace());
46-
framework::Copy(x->value().Slice(offset, rows_sections[i] + offset),
47-
x->place(), ctx.device_context(), out->mutable_value());
48-
outs[i]->set_rows(out_rows);
49-
if (height_sections.size()) {
50-
outs[i]->set_height(height_sections[i]);
56+
for (size_t i = 0; i < outs_rows_idx.size(); ++i) {
57+
auto rows_idx = outs_rows_idx[i];
58+
if (rows_idx.size() > 0) {
59+
auto dims = x->GetCompleteDims();
60+
dims[0] = rows_idx.size();
61+
outs[i]->mutable_value()->mutable_data<T>(dims, x->place());
62+
for (auto idx : rows_idx) {
63+
outs[i]->mutable_rows()->push_back(x_rows[idx]);
64+
}
65+
auto dst = outs[i]->mutable_value()->mutable_data<T>(ctx.GetPlace());
66+
for (size_t j = 0; j < rows_idx.size(); j++) {
67+
if (platform::is_cpu_place(place)) {
68+
memory::Copy(platform::CPUPlace(), dst + j * row_numel,
69+
platform::CPUPlace(), src + rows_idx[j] * row_numel,
70+
sizeof(T) * row_numel);
71+
} else {
72+
#ifdef PADDLE_WITH_CUDA
73+
auto stream = ctx.cuda_device_context().stream();
74+
memory::Copy(platform::CUDAPlace(), dst + j * row_numel,
75+
platform::CUDAPlace(), src + rows_idx[j] * row_numel,
76+
sizeof(T) * row_numel, stream);
77+
#else
78+
PADDLE_THROW("Paddle is not compiled with GPU");
79+
#endif
80+
}
81+
}
5182
}
52-
offset += rows_sections[i];
5383
}
5484
}
5585
};

python/paddle/v2/fluid/distribute_transpiler.py

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from layer_helper import LayerHelper
2020
from distributed_spliter import *
2121
import math
22+
from . import core
2223

2324

2425
class VarBlock:
@@ -217,15 +218,28 @@ def _append_split_op(self, program, gradblocks):
217218
if len(splited_vars) <= 1:
218219
continue
219220
orig_var = program.global_block().vars[varname]
220-
sections = []
221-
for v in splited_vars:
222-
sections.append(v.shape[0])
223-
program.global_block().append_op(
224-
type="split",
225-
inputs={"X": orig_var},
226-
outputs={"Out": splited_vars},
227-
attrs={"sections": sections} # assume split evenly
228-
)
221+
if orig_var == core.VarDesc.VarType.SELECTED_ROWS:
222+
height_sections = []
223+
for v in splited_vars:
224+
height_sections.append(v.shape[0])
225+
program.global_block().append_op(
226+
type="split_selected_rows",
227+
inputs={"X": orig_var},
228+
outputs={"Out": splited_vars},
229+
attrs={"height_sections": height_sections})
230+
elif orig_var == core.VarDesc.VarType.LOD_TENSOR:
231+
sections = []
232+
for v in splited_vars:
233+
sections.append(v.shape[0])
234+
program.global_block().append_op(
235+
type="split",
236+
inputs={"X": orig_var},
237+
outputs={"Out": splited_vars},
238+
attrs={"sections": sections} # assume split evenly
239+
)
240+
else:
241+
AssertionError("Variable type should be in set "
242+
"[LOD_TENSOR, SELECTED_ROWS]")
229243
return var_mapping
230244

231245
def get_trainer_program(self):

python/paddle/v2/fluid/tests/test_split_selected_rows_op.py

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@ def test_check_grad(self):
3535

3636
def check_with_place(self, place):
3737
scope = core.Scope()
38-
rows = [0, 5, 7, 4]
39-
height = 10
38+
rows = [0, 5, 7, 4, 20]
39+
height = 20
4040
row_numel = 2
4141

4242
# initialize input variable X
@@ -46,47 +46,49 @@ def check_with_place(self, place):
4646
np_array = np.ones((len(rows), row_numel)).astype("float32")
4747
np_array[0, 0] = 2.0
4848
np_array[2, 1] = 4.0
49+
np_array[4, 1] = 8.0
4950
x_tensor = x.get_tensor()
5051
x_tensor.set(np_array, place)
5152

52-
rows_sections = [2, 2]
53-
height_sections = []
53+
height_sections = [5, 5, 5, 5, 3]
5454

5555
# initialize output variables [out0, out1]
56-
out0 = scope.var('out0').get_selected_rows()
57-
out1 = scope.var('out1').get_selected_rows()
56+
outs_name = ["out%d" % i for i in xrange(len(height_sections))]
57+
outs = [
58+
scope.var(var_name).get_selected_rows() for var_name in outs_name
59+
]
5860

5961
# expected output selected rows
60-
expected_out0_rows = [0, 5]
61-
expected_out1_rows = [7, 4]
62-
expected_height = height
62+
expected_out0_rows = [0, 4]
63+
expected_out1_rows = [5, 7]
64+
expected_out4_rows = [20]
6365

6466
op = Operator(
6567
"split_selected_rows",
6668
X="X",
67-
Out=["out0", "out1"],
68-
rows_sections=rows_sections,
69+
Out=outs_name,
6970
height_sections=height_sections)
7071

7172
op.run(scope, place)
7273

73-
self.assertEqual(out0.rows(), expected_out0_rows)
74-
self.assertEqual(out1.rows(), expected_out1_rows)
74+
self.assertEqual(outs[0].rows(), expected_out0_rows)
75+
self.assertEqual(outs[1].rows(), expected_out1_rows)
76+
self.assertEqual(outs[4].rows(), expected_out4_rows)
7577

76-
self.assertEqual(out0.height(), expected_height)
77-
self.assertEqual(out1.height(), expected_height)
78+
self.assertEqual(outs[0].height(), height_sections[0])
79+
self.assertEqual(outs[4].height(), height_sections[4])
7880

79-
self.assertAlmostEqual(2.0, np.array(out0.get_tensor())[0, 0])
80-
self.assertAlmostEqual(4.0, np.array(out1.get_tensor())[0, 1])
81+
self.assertAlmostEqual(2.0, np.array(outs[0].get_tensor())[0, 0])
82+
self.assertAlmostEqual(4.0, np.array(outs[1].get_tensor())[1, 1])
83+
self.assertAlmostEqual(8.0, np.array(outs[4].get_tensor())[0, 1])
8184

8285
def check_grad_with_place(self, place):
8386
scope = core.Scope()
8487
height = 10
8588
row_numel = 2
8689

8790
# attr
88-
rows_sections = [2, 2]
89-
height_sections = []
91+
height_sections = [5, 5]
9092

9193
# initialize input variable X
9294
out0_grad = scope.var("out0@GRAD").get_selected_rows()
@@ -113,7 +115,6 @@ def check_grad_with_place(self, place):
113115
"sum",
114116
X=["out0@GRAD", "out1@GRAD"],
115117
Out="X@GRAD",
116-
rows_sections=rows_sections,
117118
height_sections=height_sections)
118119

119120
grad_op.run(scope, place)

0 commit comments

Comments
 (0)