Commit a7a4b72

[Cherry pick] Support feed single persistable variable to PE (#19435)
* update executor feed
1 parent 5860cc4 commit a7a4b72
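
In user terms, this change lets a feed supply a single copy of a persistable variable (for example, a learning rate stored as a global variable) and have ParallelExecutor broadcast it to every device, instead of requiring one sample per device. A condensed sketch of the enabled usage, modelled on the new unit test below (`init_data` and `simple_fc_net` are the test-suite helpers it imports):

    import os
    import numpy
    import paddle.fluid as fluid
    from simple_nets import init_data, simple_fc_net  # test-suite helpers

    os.environ['CPU_NUM'] = '4'  # four CPU places, as in the test
    img, label = init_data(batch_size=4, img_shape=[784], label_range=9)

    loss = simple_fc_net()
    lr = fluid.layers.create_global_var(
        name="learning_rate", shape=[1], value=1.0,
        dtype='float32', persistable=True)
    fluid.optimizer.SGD(learning_rate=lr).minimize(loss)

    exe = fluid.Executor(fluid.CPUPlace())
    exe.run(fluid.default_startup_program())
    compiled = fluid.compiler.CompiledProgram(
        fluid.default_main_program()).with_data_parallel(loss_name=loss.name)

    # 'learning_rate' is persistable, so a single sample suffices: the
    # executor now copies it to every place instead of splitting it.
    exe.run(compiled, feed={'image': img,
                            'label': label,
                            'learning_rate': numpy.array([1.0], dtype='float32')})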

4 files changed: +135 −8 lines

paddle/fluid/framework/parallel_executor.cc

Lines changed: 49 additions & 7 deletions

@@ -200,12 +200,17 @@ class ParallelExecutorPrivate {
     InitNCCLCtxs(scope, bst);
   }
 #endif
+  inline bool IsPersistable(const std::string &name) const {
+    auto iter = is_persistable_.find(name);
+    return iter != is_persistable_.end() && iter->second;
+  }
 
   BuildStrategy build_strategy_;
   std::vector<platform::Place> places_;
   std::vector<Scope *> local_scopes_;
   Scope *global_scope_;  // not owned
   std::unique_ptr<details::SSAGraphExecutor> executor_;
+  std::unordered_map<std::string, bool> is_persistable_;
 
 #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
   platform::NCCLCommunicator *nccl_ctxs_{nullptr};
@@ -473,6 +478,8 @@ ParallelExecutor::ParallelExecutor(const std::vector<platform::Place> &places,
       var_infos.back().name_ = node->Var()->Name();
       var_infos.back().type_ = node->Var()->GetType();
       var_infos.back().persistable_ = node->Var()->Persistable();
+      member_->is_persistable_.emplace(node->Var()->Name(),
+                                       node->Var()->Persistable());
     }
   }
 
@@ -642,23 +649,58 @@ void ParallelExecutor::FeedTensorsIntoLocalScopes(
 
 void ParallelExecutor::FeedAndSplitTensorIntoLocalScopes(
     const std::unordered_map<std::string, LoDTensor> &tensors) {
-  for (auto pair : tensors) {
+  size_t num_places = member_->places_.size();
+  for (auto &pair : tensors) {
+    bool is_persistable = member_->IsPersistable(pair.first);
+    VLOG(3) << "Split " << (is_persistable ? "persistable" : "no persistable")
+            << " data (" << pair.first << "), dim:" << pair.second.dims()
+            << ", place: " << pair.second.place();
     auto lod_tensors = pair.second.SplitLoDTensor(member_->places_);
-    if (member_->places_.size() != lod_tensors.size()) {
-      bool is_cpu_place = platform::is_cpu_place(member_->places_.front());
+    bool is_cpu_place = platform::is_cpu_place(member_->places_.front());
+    if (!is_persistable && num_places != lod_tensors.size()) {
       auto error_info = string::Sprintf(
-          "The number(%d) of samples of "
-          "current batch is less than the count(%d) of "
-          "devices(%s), currently, it is not allowed. ",
-          lod_tensors.size(), member_->places_.size(),
+          "The number(%d) of samples[%s] of current batch is less than the "
+          "count(%d) of devices(%s), currently, it is not allowed. ",
+          lod_tensors.size(), pair.first, num_places,
           (is_cpu_place ? "CPU" : "GPU"));
       if (is_cpu_place) {
         error_info +=
             "You should set the environment variable CPU_NUM in the system "
             "to determine the number of devices you need.";
       }
       PADDLE_THROW(error_info);
+    } else if (is_persistable) {
+      if (lod_tensors.size() == 1) {
+        lod_tensors.reserve(num_places);
+        auto &tensor = lod_tensors.front();
+        PADDLE_ENFORCE_EQ(tensor.dims(), pair.second.dims(),
+                          "The dim doesn't match.");
+        PADDLE_ENFORCE_EQ(tensor.place(), member_->places_.at(0),
+                          "The place doesn't match.");
+        for (size_t i = 1; i < num_places; ++i) {
+          lod_tensors.emplace_back();
+          auto &tmp = lod_tensors.back();
+          framework::TensorCopy(pair.second, member_->places_.at(i), &tmp);
+        }
+      }
+      if (lod_tensors.size() != num_places) {
+        auto error_info = string::Sprintf(
+            "The number(%d) of samples[%s] of the current batch does not match "
+            "the count(%d) of devices(%s). Because that %s is a persistable "
+            "variable, you can feed just one sample, in that case, the input "
+            "sample will be copied in %d copies and be sent to different "
+            "places separately. If you need that different place has different "
+            "value, you should feed %d samples.",
+            lod_tensors.size(), pair.first, num_places,
+            (is_cpu_place ? "CPU" : "GPU"), pair.first, num_places, num_places);
+        PADDLE_THROW(error_info);
+      }
     }
+    PADDLE_ENFORCE_EQ(
+        lod_tensors.size(), num_places,
+        "The number(%d) of samples of the current batch does not match the "
+        "count(%d) of devices.",
+        lod_tensors.size(), num_places);
     for (size_t j = 0; j < member_->places_.size(); ++j) {
       // TODO(panxy0718): Do I need to delete this var?
       auto t =
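
The branch above is the heart of the change: a non-persistable feed must already split into exactly one piece per place, while a persistable feed given as a single tensor is checked against the expected dims and place and then copied to each remaining place. A rough Python rendering of that dispatch (plain numpy, with a hypothetical helper name; not the actual C++ API):

    import numpy

    def feed_and_split(name, value, places, is_persistable):
        # Stand-in for LoDTensor::SplitLoDTensor: split the batch along dim 0
        # into at most one non-empty piece per place.
        pieces = [p for p in numpy.array_split(value, len(places)) if len(p)]
        if is_persistable and len(pieces) == 1:
            # New behaviour: replicate the single persistable sample so that
            # every place receives its own copy.
            pieces = [value.copy() for _ in places]
        if len(pieces) != len(places):
            raise ValueError(
                "The number({}) of samples[{}] of the current batch does not "
                "match the count({}) of devices.".format(
                    len(pieces), name, len(places)))
        return list(zip(places, pieces))

    # One learning-rate sample fans out to all four places:
    feed_and_split('learning_rate', numpy.array([1.0]), range(4), True)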

python/paddle/fluid/executor.py

Lines changed: 7 additions & 1 deletion

@@ -496,8 +496,11 @@ def _run_parallel(self, program, scope, feed, fetch_list, fetch_var_name,
                 feed_tensor = feed[feed_name]
                 if not isinstance(feed_tensor, core.LoDTensor):
                     feed_tensor = core.LoDTensor()
-                    # always set to CPU place, since the tensor need to be splitted
+                    # always set to CPU place, since the tensor need to be split
                     # it is fast in CPU
+                    assert isinstance(feed[feed_name], np.ndarray), \
+                        "The input({}) should be numpy.array, but not {}.".format(
+                            feed_name, type(feed[feed_name]))
                     feed_tensor.set(feed[feed_name], core.CPUPlace())
                 feed_tensor_dict[feed_name] = feed_tensor
 
@@ -518,6 +521,9 @@ def _run_parallel(self, program, scope, feed, fetch_list, fetch_var_name,
                     tensor = each[feed_name]
                     if not isinstance(tensor, core.LoDTensor):
                         tmp = core.LoDTensor()
+                        assert isinstance(each[feed_name], np.ndarray), \
+                            "The input({}) should be numpy.array, but not {}.".format(
+                                feed_name, type(each[feed_name]))
                         tmp.set(tensor, program._places[i])
                         tensor = tmp
                     res_dict[feed_name] = tensor
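
The two new asserts turn a confusing failure deep inside LoDTensor.set into a readable message when a feed value is not a numpy array. The check in isolation (illustrative only):

    import numpy as np

    def check_feed(feed_name, value):
        # Same shape as the assert added to _run_parallel.
        assert isinstance(value, np.ndarray), \
            "The input({}) should be numpy.array, but not {}.".format(
                feed_name, type(value))

    check_feed('image', np.ones((4, 784), dtype='float32'))  # passes
    # check_feed('image', [[1.0] * 784])  # raises a clear AssertionError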

python/paddle/fluid/tests/unittests/CMakeLists.txt

Lines changed: 1 addition & 0 deletions

@@ -230,5 +230,6 @@ if(WITH_DISTRIBUTE)
 endif()
 
 set_tests_properties(test_recordio_reader test_parallel_executor_test_while_train test_parallel_executor_mnist
+                     test_parallel_executor_feed_persistable_var
                      test_parallel_executor_seresnext test_parallel_executor_crf test_sync_batch_norm_op
                      PROPERTIES LABELS "RUN_TYPE=DIST")
python/paddle/fluid/tests/unittests/test_parallel_executor_feed_persistable_var.py

Lines changed: 78 additions & 0 deletions

@@ -0,0 +1,78 @@
+# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import print_function
+from functools import partial
+import numpy
+import unittest
+import paddle.fluid.core as core
+import paddle.fluid as fluid
+from simple_nets import init_data, simple_fc_net
+import os
+
+
+class TestFeedPersistableVar(unittest.TestCase):
+    @classmethod
+    def setUpClass(cls):
+        os.environ['CPU_NUM'] = str(4)
+        batch_size = 4
+        cls.img, cls.label = init_data(
+            batch_size, img_shape=[784], label_range=9)
+        cls.feed_dict = {
+            'image': cls.img,
+            'label': cls.label,
+            'learning_rate': numpy.array([1.0]).astype("float32")
+        }
+
+    def optimizer(self):
+        learning_rate = fluid.layers.create_global_var(
+            name="learning_rate",
+            shape=[1],
+            value=1.0,
+            dtype='float32',
+            persistable=True)
+        optimizer = fluid.optimizer.SGD(learning_rate=learning_rate)
+        return optimizer
+
+    def check_feed_persistable_var(self, feed_dict, use_cuda=False):
+        if use_cuda and not core.is_compiled_with_cuda():
+            return
+        place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
+        exe = fluid.Executor(place)
+
+        main = fluid.Program()
+        startup = fluid.Program()
+        with fluid.program_guard(main, startup):
+            loss = simple_fc_net()
+
+            optimizer = self.optimizer()
+            optimizer.minimize(loss)
+
+            exe.run(program=startup)
+            compiled_prog = fluid.compiler.CompiledProgram(
+                main).with_data_parallel(loss_name=loss.name)
+
+            exe.run(program=compiled_prog, feed=feed_dict)
+
+    def test_feed_persistable_var(self):
+        self.check_feed_persistable_var(self.feed_dict)
+        self.check_feed_persistable_var(self.feed_dict, use_cuda=True)
+
+        self.feed_dict['learning_rate'] = numpy.array(
+            [1.0, 1.0]).astype("float32")
+        self.check_feed_persistable_var(self.feed_dict, use_cuda=True)
+
+
+if __name__ == '__main__':
+    unittest.main()
