Commit e4cfe47

Merge pull request #9898 from reyoung/feature/mix_cpu_gpu_op
Feature/mix cpu gpu op
2 parents 9bc0c23 + ed2d7d7

5 files changed: +247 −29 lines


paddle/fluid/framework/details/nccl_all_reduce_op_handle.cc

Lines changed: 78 additions & 23 deletions
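In outline, the change below teaches `NCCLAllReduceOpHandle` to aggregate gradients that live on the CPU (e.g. from CPU-only operators such as `linear_chain_crf`): instead of issuing `ncclAllReduce` calls, it sums every device's copy of the gradient into one CPU tensor, then copies the result back to each place. A minimal standalone model of that flow, with hypothetical names and plain `std::vector` in place of `LoDTensor`:

```cpp
// A minimal standalone model (hypothetical names, plain std::vector instead
// of LoDTensor) of the CPU fallback introduced below: sum every device's
// copy of a gradient into one CPU buffer, then broadcast the result back.
#include <cstddef>
#include <iostream>
#include <vector>

using Buffer = std::vector<float>;

// Step 1: reduce all per-device copies into a single CPU-side target.
Buffer ReduceOnCpu(const std::vector<Buffer> &per_device) {
  Buffer trg = per_device[0];
  for (size_t i = 1; i < per_device.size(); ++i)
    for (size_t j = 0; j < trg.size(); ++j) trg[j] += per_device[i][j];
  return trg;
}

int main() {
  // Gradients for one variable as seen by three devices.
  std::vector<Buffer> per_device = {{1, 1}, {2, 2}, {3, 3}};

  Buffer trg = ReduceOnCpu(per_device);

  // Step 2: copy the reduced result back to every device
  // (TensorCopy in the real code, a plain assignment here).
  for (auto &dev : per_device) dev = trg;

  std::cout << per_device[0][0] << "\n";  // 6, on every device
}
```

The real implementation does the same reduction via `ReduceLoDTensor` and the copy-back via `TensorCopy`, recording a per-place event for each copy so later operators can synchronize on it.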
```diff
@@ -14,6 +14,8 @@
 
 #include "paddle/fluid/framework/details/nccl_all_reduce_op_handle.h"
 
+#include <algorithm>
+
 namespace paddle {
 namespace framework {
 namespace details {
@@ -27,6 +29,32 @@ NCCLAllReduceOpHandle::NCCLAllReduceOpHandle(
   }
 }
 
+struct ReduceLoDTensor {
+  const std::vector<LoDTensor> &src_tensors_;
+  LoDTensor &dst_tensor_;
+
+  ReduceLoDTensor(const std::vector<LoDTensor> &src, LoDTensor *dst)
+      : src_tensors_(src), dst_tensor_(*dst) {}
+
+  template <typename T>
+  void operator()() const {
+    PADDLE_ENFORCE(!src_tensors_.empty());
+    auto &t0 = src_tensors_[0];
+    PADDLE_ENFORCE_NE(t0.numel(), 0);
+    dst_tensor_.Resize(t0.dims());
+    T *dst = dst_tensor_.mutable_data<T>(platform::CPUPlace());
+    std::copy(t0.data<T>(), t0.data<T>() + t0.numel(), dst);
+
+    for (size_t i = 1; i < src_tensors_.size(); ++i) {
+      auto &t = src_tensors_[i];
+      PADDLE_ENFORCE_EQ(t.dims(), t0.dims());
+      PADDLE_ENFORCE_EQ(t.type(), t0.type());
+      std::transform(t.data<T>(), t.data<T>() + t.numel(), dst, dst,
+                     [](T a, T b) -> T { return a + b; });
+    }
+  }
+};
+
 void NCCLAllReduceOpHandle::RunImpl() {
   if (inputs_.size() == 1) {
     return;  // No need to all reduce when GPU count = 1;
@@ -41,40 +69,67 @@ void NCCLAllReduceOpHandle::RunImpl() {
     int dtype = -1;
     size_t numel = 0;
 
-    std::vector<std::function<void()>> all_reduce_calls;
+    std::vector<LoDTensor> lod_tensors;
 
     for (size_t i = 0; i < local_scopes_.size(); ++i) {
-      auto &p = places_[i];
       auto *s = local_scopes_[i];
-      int dev_id = boost::get<platform::CUDAPlace>(p).device;
 
       auto &lod_tensor = s->FindVar(var_name)->Get<LoDTensor>();
-      void *buffer = const_cast<void *>(lod_tensor.data<void>());
+      lod_tensors.emplace_back(lod_tensor);
+    }
 
-      if (dtype == -1) {
-        dtype = platform::ToNCCLDataType(lod_tensor.type());
-      }
+    if (platform::is_gpu_place(lod_tensors[0].place())) {
+      std::vector<std::function<void()>> all_reduce_calls;
+      for (size_t i = 0; i < local_scopes_.size(); ++i) {
+        auto &p = places_[i];
+        auto &lod_tensor = lod_tensors[i];
+        void *buffer = const_cast<void *>(lod_tensor.data<void>());
 
-      if (numel == 0) {
-        numel = static_cast<size_t>(lod_tensor.numel());
-      }
+        if (dtype == -1) {
+          dtype = platform::ToNCCLDataType(lod_tensor.type());
+        }
 
-      auto &nccl_ctx = nccl_ctxs_.at(dev_id);
-      auto stream = nccl_ctx.stream();
-      auto comm = nccl_ctx.comm_;
-      all_reduce_calls.emplace_back([=] {
-        PADDLE_ENFORCE(platform::dynload::ncclAllReduce(
-            buffer, buffer, numel, static_cast<ncclDataType_t>(dtype), ncclSum,
-            comm, stream));
+        if (numel == 0) {
+          numel = static_cast<size_t>(lod_tensor.numel());
+        }
+
+        int dev_id = boost::get<platform::CUDAPlace>(p).device;
+        auto &nccl_ctx = nccl_ctxs_.at(dev_id);
+        auto stream = nccl_ctx.stream();
+        auto comm = nccl_ctx.comm_;
+        all_reduce_calls.emplace_back([=] {
+          PADDLE_ENFORCE(platform::dynload::ncclAllReduce(
+              buffer, buffer, numel, static_cast<ncclDataType_t>(dtype),
+              ncclSum, comm, stream));
+        });
+      }
+      this->RunAndRecordEvent([&] {
+        platform::NCCLGroupGuard guard;
+        for (auto &call : all_reduce_calls) {
+          call();
+        }
       });
-    }
+    } else {  // Special handle CPU only Operator's gradient. Like CRF
+      auto &trg =
+          *this->local_scopes_[0]->Var()->GetMutable<framework::LoDTensor>();
+
+      // Reduce All Tensor to trg in CPU
+      ReduceLoDTensor func(lod_tensors, &trg);
+      VisitDataType(ToDataType(lod_tensors[0].type()), func);
 
-    this->RunAndRecordEvent([&] {
-      platform::NCCLGroupGuard guard;
-      for (auto &call : all_reduce_calls) {
-        call();
+      for (size_t i = 0; i < local_scopes_.size(); ++i) {
+        auto &scope = local_scopes_[i];
+        auto &p = places_[i];
+        auto *var = scope->FindVar(var_name);
+        auto *dev_ctx = dev_ctxes_[p];
+
+        RunAndRecordEvent(p, [&trg, var, dev_ctx, p] {
+          auto &tensor_gpu = *var->GetMutable<framework::LoDTensor>();
+          auto &tensor_cpu = trg;
+          TensorCopy(tensor_cpu, p, *dev_ctx, &tensor_gpu);
+        });
       }
-    });
+    }
   }
 }
```

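The CPU fallback relies on a runtime-to-compile-time dispatch idiom: `ReduceLoDTensor` is a functor whose templated `operator()` is instantiated once per element type, and `VisitDataType` invokes the instantiation matching the tensor's runtime dtype. A minimal, self-contained sketch of that idiom (the `DataType` enum, `VisitDataType`, and `SumBuffers` here are simplified stand-ins, not Paddle's actual definitions):

```cpp
#include <cstdint>
#include <cstring>
#include <iostream>
#include <vector>

// Simplified stand-ins for the framework's dtype tag and dispatcher.
enum class DataType { FP32, FP64, INT64 };

template <typename Visitor>
void VisitDataType(DataType type, Visitor visitor) {
  switch (type) {
    case DataType::FP32:
      visitor.template operator()<float>();
      break;
    case DataType::FP64:
      visitor.template operator()<double>();
      break;
    case DataType::INT64:
      visitor.template operator()<int64_t>();
      break;
  }
}

// Like ReduceLoDTensor, but over raw byte buffers: sums all sources into
// the first one, reinterpreting the bytes as the dispatched element type.
struct SumBuffers {
  std::vector<std::vector<unsigned char>> &bufs_;

  template <typename T>
  void operator()() const {
    T *dst = reinterpret_cast<T *>(bufs_[0].data());
    size_t n = bufs_[0].size() / sizeof(T);
    for (size_t i = 1; i < bufs_.size(); ++i) {
      const T *src = reinterpret_cast<const T *>(bufs_[i].data());
      for (size_t j = 0; j < n; ++j) dst[j] += src[j];
    }
  }
};

int main() {
  // Two "tensors" of three floats each, stored type-erased.
  std::vector<std::vector<unsigned char>> bufs(
      2, std::vector<unsigned char>(3 * sizeof(float)));
  float a[3] = {1.f, 2.f, 3.f}, b[3] = {10.f, 20.f, 30.f};
  std::memcpy(bufs[0].data(), a, sizeof(a));
  std::memcpy(bufs[1].data(), b, sizeof(b));

  VisitDataType(DataType::FP32, SumBuffers{bufs});  // dtype known at runtime

  const float *out = reinterpret_cast<const float *>(bufs[0].data());
  std::cout << out[0] << " " << out[1] << " " << out[2] << "\n";  // 11 22 33
}
```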
paddle/fluid/framework/details/op_handle_base.cc

Lines changed: 16 additions & 0 deletions
```diff
@@ -107,6 +107,22 @@ void OpHandleBase::RunAndRecordEvent(const std::function<void()> &callback) {
 #endif
 }
 
+void OpHandleBase::RunAndRecordEvent(platform::Place p,
+                                     const std::function<void()> &callback) {
+#ifdef PADDLE_WITH_CUDA
+  if (platform::is_cpu_place(p) || events_.empty()) {
+    callback();
+  } else {
+    auto *ctx = dev_ctxes_.at(p);
+    auto *cuda_ctx = static_cast<platform::CUDADeviceContext *>(ctx);
+    cuda_ctx->RecordEvent(events_.at(boost::get<platform::CUDAPlace>(p).device),
+                          callback);
+  }
+#else
+  callback();
+#endif
+}
+
 }  // namespace details
 }  // namespace framework
 }  // namespace paddle
```

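This per-place overload exists because CUDA work is enqueued asynchronously: recording an event on the place's stream right after running the callback lets downstream op handles wait on exactly that piece of work rather than synchronizing the whole device, while CPU places (and builds without CUDA) simply run the callback inline. A hedged sketch of the underlying event pattern in the raw CUDA runtime API (the real code goes through `CUDADeviceContext::RecordEvent`; the function and variable names below are illustrative, not Paddle's):

```cpp
#include <cuda_runtime.h>

// Illustrative only: enqueue work on one stream, record an event marking its
// completion, and make a second stream wait on that event instead of
// draining the whole device.
void producer_consumer(cudaStream_t producer, cudaStream_t consumer,
                       cudaEvent_t done, const float *host_src, float *dst,
                       size_t bytes, float *staging) {
  // The "callback" work on the producer stream, e.g. an async copy.
  cudaMemcpyAsync(staging, host_src, bytes, cudaMemcpyHostToDevice, producer);
  cudaEventRecord(done, producer);  // completion marker on this stream

  // The consumer blocks only until `done` fires, not until the producer
  // stream is fully idle.
  cudaStreamWaitEvent(consumer, done, 0);
  cudaMemcpyAsync(dst, staging, bytes, cudaMemcpyDeviceToDevice, consumer);
}
```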
paddle/fluid/framework/details/op_handle_base.h

Lines changed: 3 additions & 0 deletions
```diff
@@ -64,6 +64,9 @@ class OpHandleBase {
  protected:
   void RunAndRecordEvent(const std::function<void()> &callback);
 
+  void RunAndRecordEvent(platform::Place p,
+                         const std::function<void()> &callback);
+
   virtual void RunImpl() = 0;
 };
 
```

python/paddle/fluid/tests/book/test_label_semantic_roles.py

Lines changed: 5 additions & 6 deletions
```diff
@@ -12,17 +12,16 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import contextlib
 import math
-
 import numpy as np
+import os
+import time
+import unittest
+
 import paddle
 import paddle.dataset.conll05 as conll05
 import paddle.fluid as fluid
-from paddle.fluid.initializer import init_on_cpu
-import contextlib
-import time
-import unittest
-import os
 
 word_dict, verb_dict, label_dict = conll05.get_dict()
 word_dict_len = len(word_dict)
```

python/paddle/fluid/tests/unittests/test_parallel_executor.py

Lines changed: 145 additions & 0 deletions
```diff
@@ -505,3 +505,148 @@ def test_parallel_testing(self):
                 train_loss, test_loss, atol=1e-8),
             "Train loss: " + str(train_loss) + "\n Test loss:" +
             str(test_loss))
+
+
+import paddle.dataset.conll05 as conll05
+import paddle.fluid as fluid
+
+word_dict, verb_dict, label_dict = conll05.get_dict()
+word_dict_len = len(word_dict)
+label_dict_len = len(label_dict)
+pred_dict_len = len(verb_dict)
+mark_dict_len = 2
+word_dim = 32
+mark_dim = 5
+hidden_dim = 512
+depth = 8
+mix_hidden_lr = 1e-3
+embedding_name = 'emb'
+
+
+def db_lstm(word, predicate, ctx_n2, ctx_n1, ctx_0, ctx_p1, ctx_p2, mark,
+            **ignored):
+    # 8 features
+    predicate_embedding = fluid.layers.embedding(
+        input=predicate,
+        size=[pred_dict_len, word_dim],
+        dtype='float32',
+        param_attr='vemb')
+
+    mark_embedding = fluid.layers.embedding(
+        input=mark, size=[mark_dict_len, mark_dim], dtype='float32')
+
+    word_input = [word, ctx_n2, ctx_n1, ctx_0, ctx_p1, ctx_p2]
+    emb_layers = [
+        fluid.layers.embedding(
+            size=[word_dict_len, word_dim],
+            input=x,
+            param_attr=fluid.ParamAttr(
+                name=embedding_name, trainable=False)) for x in word_input
+    ]
+    emb_layers.append(predicate_embedding)
+    emb_layers.append(mark_embedding)
+
+    hidden_0_layers = [
+        fluid.layers.fc(input=emb, size=hidden_dim, act='tanh')
+        for emb in emb_layers
+    ]
+
+    hidden_0 = fluid.layers.sums(input=hidden_0_layers)
+
+    lstm_0 = fluid.layers.dynamic_lstm(
+        input=hidden_0,
+        size=hidden_dim,
+        candidate_activation='relu',
+        gate_activation='sigmoid',
+        cell_activation='sigmoid')
+
+    # stack L-LSTM and R-LSTM with direct edges
+    input_tmp = [hidden_0, lstm_0]
+
+    for i in range(1, depth):
+        mix_hidden = fluid.layers.sums(input=[
+            fluid.layers.fc(input=input_tmp[0], size=hidden_dim, act='tanh'),
+            fluid.layers.fc(input=input_tmp[1], size=hidden_dim, act='tanh')
+        ])
+
+        lstm = fluid.layers.dynamic_lstm(
+            input=mix_hidden,
+            size=hidden_dim,
+            candidate_activation='relu',
+            gate_activation='sigmoid',
+            cell_activation='sigmoid',
+            is_reverse=((i % 2) == 1))
+
+        input_tmp = [mix_hidden, lstm]
+
+    feature_out = fluid.layers.sums(input=[
+        fluid.layers.fc(input=input_tmp[0], size=label_dict_len, act='tanh'),
+        fluid.layers.fc(input=input_tmp[1], size=label_dict_len, act='tanh')
+    ])
+
+    return feature_out
+
+
+class TestCRFModel(unittest.TestCase):
+    def test_all(self):
+        main = fluid.Program()
+        startup = fluid.Program()
+        with fluid.program_guard(main, startup):
+            word = fluid.layers.data(
+                name='word_data', shape=[1], dtype='int64', lod_level=1)
+            predicate = fluid.layers.data(
+                name='verb_data', shape=[1], dtype='int64', lod_level=1)
+            ctx_n2 = fluid.layers.data(
+                name='ctx_n2_data', shape=[1], dtype='int64', lod_level=1)
+            ctx_n1 = fluid.layers.data(
+                name='ctx_n1_data', shape=[1], dtype='int64', lod_level=1)
+            ctx_0 = fluid.layers.data(
+                name='ctx_0_data', shape=[1], dtype='int64', lod_level=1)
+            ctx_p1 = fluid.layers.data(
+                name='ctx_p1_data', shape=[1], dtype='int64', lod_level=1)
+            ctx_p2 = fluid.layers.data(
+                name='ctx_p2_data', shape=[1], dtype='int64', lod_level=1)
+            mark = fluid.layers.data(
+                name='mark_data', shape=[1], dtype='int64', lod_level=1)
+            feature_out = db_lstm(**locals())
+            target = fluid.layers.data(
+                name='target', shape=[1], dtype='int64', lod_level=1)
+            crf_cost = fluid.layers.linear_chain_crf(
+                input=feature_out,
+                label=target,
+                param_attr=fluid.ParamAttr(
+                    name='crfw', learning_rate=1e-1))
+            avg_cost = fluid.layers.mean(crf_cost)
+
+            sgd_optimizer = fluid.optimizer.SGD(
+                learning_rate=fluid.layers.exponential_decay(
+                    learning_rate=0.01,
+                    decay_steps=100000,
+                    decay_rate=0.5,
+                    staircase=True))
+            sgd_optimizer.minimize(avg_cost)
+
+            train_data = paddle.batch(
+                paddle.reader.shuffle(
+                    paddle.dataset.conll05.test(), buf_size=8192),
+                batch_size=16)
+
+            place = fluid.CUDAPlace(0)
+            exe = fluid.Executor(place)
+            exe.run(startup)
+
+            pe = fluid.ParallelExecutor(use_cuda=True, loss_name=avg_cost.name)
+
+            feeder = fluid.DataFeeder(
+                feed_list=[
+                    word, ctx_n2, ctx_n1, ctx_0, ctx_p1, ctx_p2, predicate,
+                    mark, target
+                ],
+                place=fluid.CPUPlace())
+
+            data = train_data()
+            for i in xrange(10):
+                cur_batch = next(data)
+                print map(numpy.array,
+                          pe.run(feed_dict=feeder.feed(cur_batch),
+                                 fetch_list=[avg_cost.name]))[0]
```
