
Commit 4bbfa9e

Add feed to ParallelExecutor
1 parent a98a3fd

File tree: 6 files changed, +99 / -39 lines

doc/fluid/api/gen_doc.sh

File mode changed (100755 → 100644).

paddle/fluid/framework/lod_tensor.h

Lines changed: 1 addition & 0 deletions
@@ -142,6 +142,7 @@ class LoDTensor : public Tensor {
     return (lod_)[level].size() - 1;
   }

+  // Split LoDTensor and copy to each place specified in places.
   std::vector<LoDTensor> SplitLoDTensor(
       const std::vector<platform::Place> places) const;
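
For intuition: SplitLoDTensor is what the new feed path below uses to shard a feed tensor across the execution places; for a tensor without LoD this is roughly an even split along the batch dimension. A minimal numpy sketch of that assumption (split_batch is an illustrative helper, not part of the Paddle API):

import numpy as np

def split_batch(batch, num_places):
    # Conceptual stand-in for LoDTensor::SplitLoDTensor on a tensor
    # without LoD: divide the batch dimension evenly across the places.
    return np.array_split(batch, num_places, axis=0)

feed = np.zeros([32, 784], dtype='float32')  # one mini-batch for all devices
slices = split_batch(feed, 4)                # e.g. 4 GPUs
assert [s.shape[0] for s in slices] == [8, 8, 8, 8]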

paddle/fluid/framework/parallel_executor.cc

Lines changed: 19 additions & 2 deletions
@@ -150,13 +150,30 @@ void ParallelExecutor::BCastParamsToGPUs(
 #endif
 }

-void ParallelExecutor::Run(const std::vector<std::string> &fetch_tensors,
-                           const std::string &fetched_var_name) {
+void ParallelExecutor::Run(
+    const std::vector<std::string> &fetch_tensors,
+    const std::string &fetched_var_name,
+    const std::unordered_map<std::string, LoDTensor> &feed_tensors) {
   platform::RecordBlock b(0);
+  SplitTensorToPlaces(feed_tensors);
   auto fetch_data = member_->executor_->Run(fetch_tensors);
   *member_->global_scope_->Var(fetched_var_name)->GetMutable<FeedFetchList>() =
       fetch_data;
 }

+void ParallelExecutor::SplitTensorToPlaces(
+    const std::unordered_map<std::string, LoDTensor> &feed_tensors) {
+  for (auto it : feed_tensors) {
+    auto lod_tensors = it.second.SplitLoDTensor(member_->places_);
+    for (size_t j = 0; j < member_->places_.size(); ++j) {
+      // TODO(panxy0718): Do I need to delete this var?
+      member_->local_scopes_[j]
+          ->Var(it.first)
+          ->GetMutable<LoDTensor>()
+          ->ShareDataWith(lod_tensors[j]);
+    }
+  }
+}
+
 }  // namespace framework
 }  // namespace paddle

paddle/fluid/framework/parallel_executor.h

Lines changed: 5 additions & 1 deletion
@@ -42,9 +42,13 @@ class ParallelExecutor {
                             bool allow_op_delay);

   void Run(const std::vector<std::string>& fetch_tensors,
-           const std::string& fetched_var_name = "fetched_var");
+           const std::string& fetched_var_name,
+           const std::unordered_map<std::string, LoDTensor>& feed_tensors);

  private:
+  void SplitTensorToPlaces(
+      const std::unordered_map<std::string, LoDTensor>& feed_tensors);
+
   ParallelExecutorPrivate* member_;

   void BCastParamsToGPUs(const ProgramDesc& startup_program) const;

python/paddle/fluid/parallel_executor.py

Lines changed: 28 additions & 10 deletions
@@ -26,25 +26,29 @@ def __init__(self,
                  use_cuda,
                  num_threads=None,
                  allow_op_delay=False):
-        places = []
+        self._places = []
+        self._act_places = []
         if use_cuda:
             for i in xrange(core.get_cuda_device_count()):
                 p = core.Place()
-                p.set_place(core.CUDAPlace(i))
-                places.append(p)
+                self._act_places.append(core.CUDAPlace(i))
+                p.set_place(self._act_places[-1])
+                self._places.append(p)
         else:
             for i in xrange(multiprocessing.cpu_count()):
                 p = core.Place()
-                p.set_place(core.CPUPlace())
-                places.append(p)
+                self._act_places.append(core.CPUPlace(i))
+                p.set_place(self._act_places[-1])
+                self._places.append(p)
+        assert self._places, "no place for execution"

         if num_threads is None:
             if use_cuda:
                 # Experiments on se-resnext shows that too many threads hurt
                 # performance. Worth tunning for other models in the future.
-                num_threads = len(places)
+                num_threads = len(self._places)
             else:
-                min(len(places) * 2, multiprocessing.cpu_count())
+                min(len(self._places) * 2, multiprocessing.cpu_count())

         startup = framework.default_startup_program()
         main = framework.default_main_program()
@@ -53,7 +57,7 @@ def __init__(self,
         self.executor = core.ParallelExecutor(
             num_threads,
             True if use_cuda else False,  # use_event
-            places,
+            self._places,
             set([
                 p.name for p in main.global_block().iter_parameters()
                 if not p.stop_gradient
@@ -65,8 +69,22 @@ def __init__(self,
             allow_op_delay)
         self.scope = scope

-    def run(self, fetch_list):
+    def run(self, fetch_list, feed_dict={}):
+        """
+        :param fetch_list: A list of variable names that will be fetched.
+        :param feed_dict: A dict mapping for feed variable name to LoDTensor
+            or numpy array.
+        :return: fetched value list.
+        """
+        feed_tensor_dict = {}
+        for i, feed_name in enumerate(feed_dict):
+            feed_tensor = feed_dict[feed_name]
+            if not isinstance(feed_tensor, core.LoDTensor):
+                feed_tensor = core.LoDTensor()
+                feed_tensor.set(feed_dict[feed_name], self._act_places[0])
+            feed_tensor_dict[feed_name] = feed_tensor
+
         fetch_var_name = '@FETCHED_VAR_NAME@'
-        self.executor.run(fetch_list, fetch_var_name)
+        self.executor.run(fetch_list, fetch_var_name, feed_tensor_dict)
         arr = self.scope.find_var(fetch_var_name).get_lod_tensor_array()
         return [arr[i] for i in range(len(arr))]
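
With this frontend change, feed_dict values may be numpy arrays or core.LoDTensor objects; numpy arrays are wrapped into a LoDTensor on the first place, and the C++ side then splits the batch across all devices. A minimal usage sketch under those assumptions (the small network is illustrative; only the data-layer names 'image'/'label' and the run(fetch_list, feed_dict=...) call mirror this commit, and CUDA devices are assumed to be available as in the tests):

import numpy
import paddle.fluid as fluid

# Build a small network on the default programs; the ParallelExecutor
# wrapper reads fluid.default_main_program()/default_startup_program().
img = fluid.layers.data(name='image', shape=[784], dtype='float32')
label = fluid.layers.data(name='label', shape=[1], dtype='int64')
prediction = fluid.layers.fc(input=img, size=10, act='softmax')
loss = fluid.layers.mean(
    fluid.layers.cross_entropy(input=prediction, label=label))
fluid.optimizer.Adam().minimize(loss)

exe = fluid.ParallelExecutor(loss_name=loss.name, use_cuda=True)

# One mini-batch for all devices; it is split across places before running.
img_data = numpy.zeros(shape=[32, 784], dtype='float32')
label_data = numpy.ones(shape=[32, 1], dtype='int64')

loss_value, = exe.run([loss.name],
                      feed_dict={"image": img_data,
                                 "label": label_data})
print(numpy.array(loss_value))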

python/paddle/fluid/tests/unittests/test_parallel_executor.py

Lines changed: 46 additions & 26 deletions
@@ -21,13 +21,17 @@
 import paddle.dataset.wmt16 as wmt16


-def simple_fc_net():
-    reader = fluid.layers.open_recordio_file(
-        filename='./mnist.recordio',
-        shapes=[[-1, 784], [-1, 1]],
-        lod_levels=[0, 0],
-        dtypes=['float32', 'int64'])
-    img, label = fluid.layers.read_file(reader)
+def simple_fc_net(use_feed):
+    if use_feed:
+        img = fluid.layers.data(name='image', shape=[784], dtype='float32')
+        label = fluid.layers.data(name='label', shape=[1], dtype='int64')
+    else:
+        reader = fluid.layers.open_recordio_file(
+            filename='./mnist.recordio',
+            shapes=[[-1, 784], [-1, 1]],
+            lod_levels=[0, 0],
+            dtypes=['float32', 'int64'])
+        img, label = fluid.layers.read_file(reader)
     hidden = img
     for _ in xrange(4):
         hidden = fluid.layers.fc(
@@ -42,13 +46,18 @@ def simple_fc_net():
     return loss


-def fc_with_batchnorm():
-    reader = fluid.layers.open_recordio_file(
-        filename='./mnist.recordio',
-        shapes=[[-1, 784], [-1, 1]],
-        lod_levels=[0, 0],
-        dtypes=['float32', 'int64'])
-    img, label = fluid.layers.read_file(reader)
+def fc_with_batchnorm(use_feed):
+    if use_feed:
+        img = fluid.layers.data(name='image', shape=[784], dtype='float32')
+        label = fluid.layers.data(name='label', shape=[1], dtype='int64')
+    else:
+        reader = fluid.layers.open_recordio_file(
+            filename='./mnist.recordio',
+            shapes=[[-1, 784], [-1, 1]],
+            lod_levels=[0, 0],
+            dtypes=['float32', 'int64'])
+        img, label = fluid.layers.read_file(reader)
+
     hidden = img
     for _ in xrange(1):
         hidden = fluid.layers.fc(
@@ -135,7 +144,9 @@ def bottleneck_block(input, num_filters, stride, cardinality, reduction_ratio):
     return fluid.layers.elementwise_add(x=short, y=scale, act='relu')


-def SE_ResNeXt152Small(batch_size=2):
+def SE_ResNeXt152Small(batch_size=2, use_feed=False):
+    assert not use_feed, "SE_ResNeXt doesn't support feed yet"
+
     img = fluid.layers.fill_constant(
         shape=[batch_size, 3, 224, 224], dtype='float32', value=0.0)
     label = fluid.layers.fill_constant(
@@ -185,30 +196,28 @@ def check_network_convergence(self,
                                   memory_opt=True,
                                   iter=10,
                                   batch_size=None,
-                                  allow_op_delay=False):
+                                  allow_op_delay=False,
+                                  feed_dict={}):
         main = fluid.Program()
         startup = fluid.Program()
         with fluid.program_guard(main, startup):
-            loss = method()
+            loss = method(use_feed=len(feed_dict) > 0)
             adam = fluid.optimizer.Adam()
             adam.minimize(loss)
             if memory_opt:
                 fluid.memory_optimize(main)

-            exe = fluid.ParallelExecutor(
-                loss_name=loss.name,
-                use_cuda=True,
-                allow_op_delay=allow_op_delay)
+            exe = fluid.ParallelExecutor(loss_name=loss.name, use_cuda=True)
             if batch_size is not None:
                 batch_size *= fluid.core.get_cuda_device_count()
             begin = time.time()
-            first_loss, = exe.run([loss.name])
+            first_loss, = exe.run([loss.name], feed_dict=feed_dict)
             first_loss = numpy.array(first_loss)

             for i in xrange(iter):
-                exe.run([])
+                exe.run([], feed_dict=feed_dict)

-            last_loss, = exe.run([loss.name])
+            last_loss, = exe.run([loss.name], feed_dict=feed_dict)
             end = time.time()

             if batch_size is not None:
@@ -242,9 +251,19 @@ def test_simple_fc(self):
         self.check_network_convergence(simple_fc_net)
         self.check_network_convergence(simple_fc_net, allow_op_delay=True)

+        img = numpy.zeros(shape=[32, 784], dtype='float32')
+        label = numpy.ones(shape=[32, 1], dtype='int64')
+        self.check_network_convergence(
+            simple_fc_net, feed_dict={"image": img,
+                                      "label": label})
+
     def test_batchnorm_fc(self):
         self.check_network_convergence(fc_with_batchnorm)
-        self.check_network_convergence(fc_with_batchnorm, allow_op_delay=True)
+        img = numpy.zeros(shape=[32, 784], dtype='float32')
+        label = numpy.ones(shape=[32, 1], dtype='int64')
+        self.check_network_convergence(
+            fc_with_batchnorm, feed_dict={"image": img,
+                                          "label": label})


 class TestResnet(TestParallelExecutorBase):
@@ -400,7 +419,8 @@ def data_to_tensor(data_list, name_list, input_dict, place):
 import transformer_model


-def transformer():
+def transformer(use_feed):
+    assert not use_feed, "transfomer doesn't support feed yet"
     return transformer_model.transformer(
         ModelHyperParams.src_vocab_size + 1,
         ModelHyperParams.trg_vocab_size + 1, ModelHyperParams.max_length + 1,
