Skip to content

Commit d0a9393

Browse files
committed
Split SelectedRows to multiple pservers
1 parent 9fea1d4 commit d0a9393

File tree

5 files changed

+101
-63
lines changed

5 files changed

+101
-63
lines changed

paddle/operators/recv_op.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ class RecvOp : public framework::OperatorBase {
107107

108108
// TODO(typhoonzero): change this to a while_op for every cluster-batch.
109109
bool exit_flag = false;
110-
int64_t barrier_size = param_count * fan_in;
110+
size_t barrier_size = param_count * fan_in;
111111
while (!exit_flag) {
112112
// Get from multiple trainers, we don't care about the order in which
113113
// the gradients arrives, just add suffix 0~n and merge the gradient.

paddle/operators/split_selected_rows_op.cc

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,6 @@ class SplitSelectedRowsOpMaker : public framework::OpProtoAndCheckerMaker {
2323
: OpProtoAndCheckerMaker(proto, op_checker) {
2424
AddInput("X", "The input SelectedRows.");
2525
AddOutput("Out", "The outputs of input SelectedRows.").AsDuplicable();
26-
AddAttr<std::vector<int>>("rows_sections", "Rows section for output.")
27-
.SetDefault(std::vector<int>({}));
2826
AddAttr<std::vector<int>>("height_sections",
2927
"Height for each output SelectedRows.")
3028
.SetDefault(std::vector<int>({}));
@@ -35,16 +33,16 @@ height_sections is only needed when need to split the dims of the original tenso
3533
3634
Example:
3735
Input:
38-
X.rows = {0, 7, 5}
36+
X.rows = {7, 5}
3937
X.height = 12
4038
Attr:
41-
rows_sections = {1, 2}
42-
height_sections = {}
39+
height_sections = {4, 8}
4340
Out:
44-
out0.rows = {0}
45-
out0.height = 12
46-
out1.rows = {7, 5}
47-
out2.height = 12
41+
out0.rows = {}
42+
out0.height = 4
43+
44+
out1.rows = {5, 7}
45+
out2.height = 8
4846
4947
)DOC");
5048
}
@@ -61,11 +59,6 @@ class SplitSelectedRowsOp : public framework::OperatorWithKernel {
6159

6260
std::vector<int> height_sections =
6361
ctx->Attrs().Get<std::vector<int>>("height_sections");
64-
std::vector<int> rows_sections =
65-
ctx->Attrs().Get<std::vector<int>>("rows_sections");
66-
PADDLE_ENFORCE_EQ(
67-
rows_sections.size(), ctx->Outputs("Out").size(),
68-
"The size of rows section should be the same with Outputs size.");
6962
int64_t n = ctx->Outputs("Out").size();
7063

7164
std::vector<framework::DDim> outs_dims;

paddle/operators/split_selected_rows_op.h

Lines changed: 49 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -16,40 +16,70 @@ limitations under the License. */
1616

1717
#include <vector>
1818
#include "paddle/framework/op_registry.h"
19+
#include "paddle/operators/math/selected_rows_functor.h"
1920

2021
namespace paddle {
2122
namespace operators {
2223

24+
static int FindOutIdx(int row, const std::vector<int>& height_sections) {
25+
int offset = 0;
26+
for (size_t i = 0; i < height_sections.size(); ++i) {
27+
if (row >= offset && row < (offset + height_sections[i])) {
28+
return i;
29+
}
30+
offset += height_sections[i];
31+
}
32+
return -1;
33+
}
34+
2335
template <typename DeviceContext, typename T>
2436
class SplitSelectedRowsOpKernel : public framework::OpKernel<T> {
2537
public:
2638
void Compute(const framework::ExecutionContext& ctx) const override {
2739
auto* x = ctx.Input<framework::SelectedRows>("X");
2840
auto outs = ctx.MultiOutput<framework::SelectedRows>("Out");
29-
30-
auto rows_sections = ctx.Attr<std::vector<int>>("rows_sections");
3141
auto height_sections = ctx.Attr<std::vector<int>>("height_sections");
3242

33-
int64_t n = outs.size();
34-
int offset = 0;
43+
auto x_rows = x->rows();
44+
std::vector<std::vector<int>> outs_rows_idx;
45+
outs_rows_idx.resize(outs.size());
3546

36-
for (int64_t i = 0; i < n; ++i) {
37-
framework::Vector<int64_t> out_rows;
38-
for (int64_t j = 0; j < rows_sections[i]; ++j) {
39-
out_rows.push_back(x->rows()[offset + j]);
40-
}
47+
auto row_numel = x->value().numel() / x->value().dims()[0];
48+
auto src = x->value().data<T>();
49+
50+
for (size_t i = 0; i < x_rows.size(); ++i) {
51+
int out_idx = FindOutIdx(x_rows[i], height_sections);
52+
outs_rows_idx[out_idx].push_back(i);
53+
}
54+
auto place = ctx.GetPlace();
4155

42-
auto& out = outs[i];
43-
auto x_dims = x->GetCompleteDims();
44-
x_dims[0] = rows_sections[i];
45-
out->mutable_value()->mutable_data<T>(x_dims, ctx.GetPlace());
46-
framework::Copy(x->value().Slice(offset, rows_sections[i] + offset),
47-
x->place(), ctx.device_context(), out->mutable_value());
48-
outs[i]->set_rows(out_rows);
49-
if (height_sections.size()) {
50-
outs[i]->set_height(height_sections[i]);
56+
for (size_t i = 0; i < outs_rows_idx.size(); ++i) {
57+
auto rows_idx = outs_rows_idx[i];
58+
if (rows_idx.size() > 0) {
59+
auto dims = x->GetCompleteDims();
60+
dims[0] = rows_idx.size();
61+
outs[i]->mutable_value()->mutable_data<T>(dims, x->place());
62+
for (auto idx : rows_idx) {
63+
outs[i]->mutable_rows()->push_back(x_rows[idx]);
64+
}
65+
auto dst = outs[i]->mutable_value()->mutable_data<T>(ctx.GetPlace());
66+
for (size_t j = 0; j < rows_idx.size(); j++) {
67+
if (platform::is_cpu_place(place)) {
68+
memory::Copy(platform::CPUPlace(), dst + j * row_numel,
69+
platform::CPUPlace(), src + rows_idx[j] * row_numel,
70+
sizeof(T) * row_numel);
71+
} else {
72+
#ifdef PADDLE_WITH_CUDA
73+
auto stream = ctx.cuda_device_context().stream();
74+
memory::Copy(platform::CUDAPlace(), dst + j * row_numel,
75+
platform::CUDAPlace(), src + rows_idx[j] * row_numel,
76+
sizeof(T) * row_numel, stream);
77+
#else
78+
PADDLE_THROW("Paddle is not compiled with GPU");
79+
#endif
80+
}
81+
}
5182
}
52-
offset += rows_sections[i];
5383
}
5484
}
5585
};

python/paddle/v2/fluid/distribute_transpiler.py

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from layer_helper import LayerHelper
1919
from distributed_spliter import *
2020
import math
21+
from . import core
2122

2223

2324
class VarBlock:
@@ -216,15 +217,28 @@ def _append_split_op(self, program, gradblocks):
216217
if len(splited_vars) <= 1:
217218
continue
218219
orig_var = program.global_block().vars[varname]
219-
sections = []
220-
for v in splited_vars:
221-
sections.append(v.shape[0])
222-
program.global_block().append_op(
223-
type="split",
224-
inputs={"X": orig_var},
225-
outputs={"Out": splited_vars},
226-
attrs={"sections": sections} # assume split evenly
227-
)
220+
if orig_var == core.VarDesc.VarType.SELECTED_ROWS:
221+
height_sections = []
222+
for v in splited_vars:
223+
height_sections.append(v.shape[0])
224+
program.global_block().append_op(
225+
type="split_selected_rows",
226+
inputs={"X": orig_var},
227+
outputs={"Out": splited_vars},
228+
attrs={"height_sections": height_sections})
229+
elif orig_var == core.VarDesc.VarType.LOD_TENSOR:
230+
sections = []
231+
for v in splited_vars:
232+
sections.append(v.shape[0])
233+
program.global_block().append_op(
234+
type="split",
235+
inputs={"X": orig_var},
236+
outputs={"Out": splited_vars},
237+
attrs={"sections": sections} # assume split evenly
238+
)
239+
else:
240+
AssertionError("Variable type should be in set "
241+
"[LOD_TENSOR, SELECTED_ROWS]")
228242
return var_mapping
229243

230244
def get_trainer_program(self):

python/paddle/v2/fluid/tests/test_split_selected_rows_op.py

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ def test_check_grad(self):
3434

3535
def check_with_place(self, place):
3636
scope = core.Scope()
37-
rows = [0, 5, 7, 4]
38-
height = 10
37+
rows = [0, 5, 7, 4, 20]
38+
height = 20
3939
row_numel = 2
4040

4141
# initialize input variable X
@@ -45,47 +45,49 @@ def check_with_place(self, place):
4545
np_array = np.ones((len(rows), row_numel)).astype("float32")
4646
np_array[0, 0] = 2.0
4747
np_array[2, 1] = 4.0
48+
np_array[4, 1] = 8.0
4849
x_tensor = x.get_tensor()
4950
x_tensor.set(np_array, place)
5051

51-
rows_sections = [2, 2]
52-
height_sections = []
52+
height_sections = [5, 5, 5, 5, 3]
5353

5454
# initialize output variables [out0, out1]
55-
out0 = scope.var('out0').get_selected_rows()
56-
out1 = scope.var('out1').get_selected_rows()
55+
outs_name = ["out%d" % i for i in xrange(len(height_sections))]
56+
outs = [
57+
scope.var(var_name).get_selected_rows() for var_name in outs_name
58+
]
5759

5860
# expected output selected rows
59-
expected_out0_rows = [0, 5]
60-
expected_out1_rows = [7, 4]
61-
expected_height = height
61+
expected_out0_rows = [0, 4]
62+
expected_out1_rows = [5, 7]
63+
expected_out4_rows = [20]
6264

6365
op = Operator(
6466
"split_selected_rows",
6567
X="X",
66-
Out=["out0", "out1"],
67-
rows_sections=rows_sections,
68+
Out=outs_name,
6869
height_sections=height_sections)
6970

7071
op.run(scope, place)
7172

72-
self.assertEqual(out0.rows(), expected_out0_rows)
73-
self.assertEqual(out1.rows(), expected_out1_rows)
73+
self.assertEqual(outs[0].rows(), expected_out0_rows)
74+
self.assertEqual(outs[1].rows(), expected_out1_rows)
75+
self.assertEqual(outs[4].rows(), expected_out4_rows)
7476

75-
self.assertEqual(out0.height(), expected_height)
76-
self.assertEqual(out1.height(), expected_height)
77+
self.assertEqual(outs[0].height(), height_sections[0])
78+
self.assertEqual(outs[4].height(), height_sections[4])
7779

78-
self.assertAlmostEqual(2.0, np.array(out0.get_tensor())[0, 0])
79-
self.assertAlmostEqual(4.0, np.array(out1.get_tensor())[0, 1])
80+
self.assertAlmostEqual(2.0, np.array(outs[0].get_tensor())[0, 0])
81+
self.assertAlmostEqual(4.0, np.array(outs[1].get_tensor())[1, 1])
82+
self.assertAlmostEqual(8.0, np.array(outs[4].get_tensor())[0, 1])
8083

8184
def check_grad_with_place(self, place):
8285
scope = core.Scope()
8386
height = 10
8487
row_numel = 2
8588

8689
# attr
87-
rows_sections = [2, 2]
88-
height_sections = []
90+
height_sections = [5, 5]
8991

9092
# initialize input variable X
9193
out0_grad = scope.var("out0@GRAD").get_selected_rows()
@@ -112,7 +114,6 @@ def check_grad_with_place(self, place):
112114
"sum",
113115
X=["out0@GRAD", "out1@GRAD"],
114116
Out="X@GRAD",
115-
rows_sections=rows_sections,
116117
height_sections=height_sections)
117118

118119
grad_op.run(scope, place)

0 commit comments

Comments
 (0)