
Commit fc6f0be

Merge pull request #9942 from reyoung/feature/tuning_pe_trans
Feature/tuning pe trans

2 parents b53f7e2 + 7286954

6 files changed: +139 -39 lines

paddle/fluid/framework/parallel_executor.cc

Lines changed: 22 additions & 12 deletions
@@ -155,13 +155,9 @@ void ParallelExecutor::BCastParamsToGPUs(
 #endif
 }

-void ParallelExecutor::Run(
-    const std::vector<std::string> &fetch_tensors,
-    const std::string &fetched_var_name,
-    const std::unordered_map<std::string, LoDTensor> &feed_tensors) {
+void ParallelExecutor::Run(const std::vector<std::string> &fetch_tensors,
+                           const std::string &fetched_var_name) {
   platform::RecordBlock b(0);
-  SplitTensorToPlaces(feed_tensors);
-
   // Create local scopes.
   for (auto &scope : member_->local_scopes_) {
     Scope &local_scope = scope->NewScope();
@@ -195,14 +191,28 @@ void ParallelExecutor::Run(
     auto &local_scope =
         *scope->Var(details::kLocalExecScopeName)->GetMutable<Scope *>();
     scope->DeleteScope(local_scope);
-    local_scope = nullptr;
   }
 }

-void ParallelExecutor::SplitTensorToPlaces(
-    const std::unordered_map<std::string, LoDTensor> &feed_tensors) {
-  for (auto it : feed_tensors) {
-    auto lod_tensors = it.second.SplitLoDTensor(member_->places_);
+void ParallelExecutor::FeedTensorsIntoLocalScopes(
+    const std::vector<std::unordered_map<std::string, LoDTensor>> &tensors) {
+  PADDLE_ENFORCE_EQ(member_->local_scopes_.size(), tensors.size());
+
+  for (size_t i = 0; i < tensors.size(); ++i) {
+    auto &map = tensors[i];
+    auto *scope = member_->local_scopes_[i];
+    for (auto &pair : map) {
+      auto *trg = scope->Var(pair.first)->GetMutable<LoDTensor>();
+      trg->ShareDataWith(pair.second);
+      trg->set_lod(pair.second.lod());
+    }
+  }
+}
+
+void ParallelExecutor::FeedAndSplitTensorIntoLocalScopes(
+    const std::unordered_map<std::string, LoDTensor> &tensors) {
+  for (auto pair : tensors) {
+    auto lod_tensors = pair.second.SplitLoDTensor(member_->places_);
     PADDLE_ENFORCE_EQ(
         member_->places_.size(), lod_tensors.size(),
         "The number of samples of current batch is less than the count of "
@@ -211,7 +221,7 @@ void ParallelExecutor::SplitTensorToPlaces(
     for (size_t j = 0; j < member_->places_.size(); ++j) {
       // TODO(panxy0718): Do I need to delete this var?
       auto t =
-          member_->local_scopes_[j]->Var(it.first)->GetMutable<LoDTensor>();
+          member_->local_scopes_[j]->Var(pair.first)->GetMutable<LoDTensor>();
       t->ShareDataWith(lod_tensors[j]);
       t->set_lod(lod_tensors[j].lod());
     }
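To make the two new entry points concrete, here is a minimal usage sketch from the Python side. Everything about the network (the 'image'/'label' layers, the fc/cross-entropy loss, SGD, the constructor arguments) is an illustrative assumption; only ParallelExecutor, the feed= keyword, and the dict-splitting behaviour come from this change.

import numpy
import paddle.fluid as fluid

# Illustrative toy program; any real program with named inputs would do.
image = fluid.layers.data(name='image', shape=[1, 28, 28], dtype='float32')
label = fluid.layers.data(name='label', shape=[1], dtype='int64')
prediction = fluid.layers.fc(input=image, size=10, act='softmax')
loss = fluid.layers.mean(fluid.layers.cross_entropy(input=prediction, label=label))
fluid.optimizer.SGD(learning_rate=1e-3).minimize(loss)

fluid.Executor(fluid.CUDAPlace(0)).run(fluid.default_startup_program())
pe = fluid.ParallelExecutor(use_cuda=True, loss_name=loss.name)

# Dict-style feed: the whole batch goes to C++, which splits it across
# member_->places_ via SplitLoDTensor (FeedAndSplitTensorIntoLocalScopes).
batch = {
    'image': numpy.random.random(size=(48, 1, 28, 28)).astype('float32'),
    'label': numpy.random.randint(0, 10, size=(48, 1)).astype('int64'),
}
loss_value, = pe.run(fetch_list=[loss.name], feed=batch)
print numpy.array(loss_value)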

paddle/fluid/framework/parallel_executor.h

Lines changed: 11 additions & 5 deletions
@@ -44,16 +44,22 @@ class ParallelExecutor {
 
   std::vector<Scope*>& GetLocalScopes();
 
+  /**
+   * Feed tensors to local scopes. The size of tensors should be equal to the
+   * size of local scopes.
+   */
+  void FeedTensorsIntoLocalScopes(
+      const std::vector<std::unordered_map<std::string, LoDTensor>>& tensors);
+
+  void FeedAndSplitTensorIntoLocalScopes(
+      const std::unordered_map<std::string, LoDTensor>& tensors);
+
   void Run(const std::vector<std::string>& fetch_tensors,
-           const std::string& fetched_var_name,
-           const std::unordered_map<std::string, LoDTensor>& feed_tensors);
+           const std::string& fetched_var_name);
 
   void BCastParamsToGPUs(const std::unordered_set<std::string>& vars) const;
 
  private:
-  void SplitTensorToPlaces(
-      const std::unordered_map<std::string, LoDTensor>& feed_tensors);
-
   ParallelExecutorPrivate* member_;
 };
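The header comment pins down the contract of the list-style path: one feed dict per local scope. A small sketch under the same assumptions as the previous example (reusing the hypothetical pe, loss, and 'image'/'label' inputs defined there):

import numpy

def random_shard(batch_size):
    # One per-device shard; the names match the illustrative layers above.
    return {
        'image': numpy.random.random(size=(batch_size, 1, 28, 28)).astype('float32'),
        'label': numpy.random.randint(0, 10, size=(batch_size, 1)).astype('int64'),
    }

# len(shards) must equal the number of local scopes: the Python wrapper raises
# ValueError otherwise, and the C++ side checks it again with
# PADDLE_ENFORCE_EQ(member_->local_scopes_.size(), tensors.size()).
shards = [random_shard(24) for _ in range(pe.device_count)]
loss_value, = pe.run(fetch_list=[loss.name], feed=shards)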

paddle/fluid/pybind/pybind.cc

Lines changed: 8 additions & 0 deletions
@@ -505,11 +505,19 @@ All parameter, weight, gradient are variables in Paddle.
                   scope, local_scopes, allow_op_delay);
              })
       .def("bcast_params", &ParallelExecutor::BCastParamsToGPUs)
+      // NOTE: even if we return a vec<Scope*>* to Python with the reference
+      // policy, we still cannot get a local_scope from this vector, since the
+      // elements of vec<Scope*> will be freed by the Python GC. We can only
+      // return Scope* one by one and mark them as references.
       .def("local_scopes",
            [](ParallelExecutor &self) -> std::vector<Scope *> * {
              return &self.GetLocalScopes();
            },
            py::return_value_policy::reference)
+      .def("feed_tensors_into_local_scopes",
+           &ParallelExecutor::FeedTensorsIntoLocalScopes)
+      .def("feed_and_split_tensor_into_local_scopes",
+           &ParallelExecutor::FeedAndSplitTensorIntoLocalScopes)
       .def("run", &ParallelExecutor::Run);
 
   BindRecordIOWriter(&m);
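The Python wrapper below is the intended caller of these bindings; this sketch condenses what it does with them. The tensor conversion mirrors the wrapper code, while the commented-out calls stand in for self.executor, a core.ParallelExecutor whose construction is omitted here.

import numpy
import paddle.fluid.core as core

# Dict path: convert each numpy array to a LoDTensor on the CPU, then hand the
# whole dict to feed_and_split_tensor_into_local_scopes, which splits it in C++.
feed_tensor = core.LoDTensor()
feed_tensor.set(numpy.random.random(size=(48, 1, 28, 28)).astype('float32'),
                core.CPUPlace())
feed_tensor_dict = {'image': feed_tensor}

# executor.feed_and_split_tensor_into_local_scopes(feed_tensor_dict)
# executor.feed_tensors_into_local_scopes(per_scope_dicts)  # list path
# executor.run(fetch_list, '@FETCHED_VAR_NAME@')            # feed argument is gone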

paddle/fluid/pybind/tensor_py.h

Lines changed: 10 additions & 0 deletions
@@ -190,6 +190,11 @@ void PyCUDATensorSetFromArray(
       static_cast<const platform::CUDADeviceContext *>(pool.Get(place));
   paddle::platform::GpuMemcpyAsync(dst, array.data(), sizeof(T) * array.size(),
                                    cudaMemcpyHostToDevice, dev_ctx->stream());
+  // NOTE: For safety, wait here until the copy has completed, because the
+  // CPU buffer behind array.data() could be destroyed after this method
+  // returns. If the method stayed asynchronous, the copy could read from a
+  // memory buffer that has already been freed.
+  dev_ctx->Wait();
 }
 
 template <>
@@ -216,6 +221,11 @@ void PyCUDATensorSetFromArray(
   paddle::platform::GpuMemcpyAsync(dst, array.data(),
                                    sizeof(uint16_t) * array.size(),
                                    cudaMemcpyHostToDevice, dev_ctx->stream());
+  // NOTE: For safety, wait here until the copy has completed, because the
+  // CPU buffer behind array.data() could be destroyed after this method
+  // returns. If the method stayed asynchronous, the copy could read from a
+  // memory buffer that has already been freed.
+  dev_ctx->Wait();
 }
 
 template <typename T>
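A note on why the explicit wait is needed: the typical caller hands set() a temporary numpy array whose buffer can be reclaimed as soon as the call returns, so the host-to-device copy must be finished by then. A minimal sketch of that calling pattern, assuming a CUDA build (the CUDAPlace index is illustrative):

import numpy
import paddle.fluid.core as core

t = core.LoDTensor()
# set() on a CUDA place ends up in PyCUDATensorSetFromArray. The temporary
# array below has no other reference, so Python may free its buffer right
# after set() returns; dev_ctx->Wait() guarantees the async memcpy has read
# it before that can happen.
t.set(numpy.random.random(size=(32, 784)).astype('float32'),
      core.CUDAPlace(0))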

python/paddle/fluid/parallel_executor.py

Lines changed: 82 additions & 16 deletions
@@ -16,6 +16,7 @@
 import multiprocessing
 import framework
 import executor
+import sys
 
 __all__ = ['ParallelExecutor']
 
@@ -123,28 +124,93 @@ def __init__(self,
                                            allow_op_delay)
         self.scope = scope
 
-    def run(self, fetch_list, feed_dict={}):
+    def run(self, fetch_list, feed=None, feed_dict=None):
         """
-        :param fetch_list: A list of variable names that will be fetched.
-        :param feed_dict: A dict mapping for feed variable name to LoDTensor
-          or numpy array.
-        :return: fetched value list.
-        """
-        if not isinstance(feed_dict, dict):
-            raise TypeError("feed_dict should be a dict")
+        Run a parallel executor with fetch_list.
+
+        The feed parameter can be a dict or a list. If feed is a dict, the
+        feed data will be split across the devices. If feed is a list, we
+        assume the data has already been split across the devices, and each
+        element in the list will be copied to its device directly.
+
+        For example, if the feed is a dict:
+        >>> exe = ParallelExecutor()
+        >>> # the image will be split across the devices. If there are two
+        >>> # devices, each one will process an image with shape (24, 1, 28, 28)
+        >>> exe.run(feed={'image': numpy.random.random(size=(48, 1, 28, 28))})
+
+        For example, if the feed is a list:
+        >>> exe = ParallelExecutor()
+        >>> # each device will process one element of the list.
+        >>> # the 1st device will process an image with shape (48, 1, 28, 28)
+        >>> # the 2nd device will process an image with shape (32, 1, 28, 28)
+        >>> #
+        >>> # you can use exe.device_count to get the device number.
+        >>> exe.run(feed=[{"image": numpy.random.random(size=(48, 1, 28, 28))},
+        >>>               {"image": numpy.random.random(size=(32, 1, 28, 28))},
+        >>>              ])
+
+
+        Args:
+            fetch_list(list): The fetched variable names.
+            feed(list|dict|None): The feed variables. If the feed is a dict,
+                tensors in that dict will be split across the devices. If
+                the feed is a list, each element of the list will be copied
+                to its device.
+            feed_dict: Alias for the feed parameter, for backward compatibility.
+                This parameter is deprecated.
 
-        feed_tensor_dict = {}
-        for i, feed_name in enumerate(feed_dict):
-            feed_tensor = feed_dict[feed_name]
-            if not isinstance(feed_tensor, core.LoDTensor):
-                feed_tensor = core.LoDTensor()
-                feed_tensor.set(feed_dict[feed_name], self._act_places[0])
-            feed_tensor_dict[feed_name] = feed_tensor
+        Returns: fetched result list.
+
+        """
+        if feed is None:
+            feed = feed_dict
+            print >> sys.stderr, "`feed_dict` is deprecated. Please use `feed=`"
+
+        if isinstance(feed, dict):
+            feed_tensor_dict = dict()
+            for feed_name in feed:
+                feed_tensor = feed[feed_name]
+                if not isinstance(feed_tensor, core.LoDTensor):
+                    feed_tensor = core.LoDTensor()
+                    # always set on the CPU place, since the tensor needs to be
+                    # split, and splitting is fast on the CPU
+                    feed_tensor.set(feed[feed_name], core.CPUPlace())
+                feed_tensor_dict[feed_name] = feed_tensor
+
+            self.executor.feed_and_split_tensor_into_local_scopes(
+                feed_tensor_dict)
+        elif isinstance(feed, list) or isinstance(feed, tuple):
+            if len(feed) != len(self._act_places):
+                raise ValueError(
+                    "Feed a list of tensor, the list should be the same size as places"
+                )
+
+            res = list()
+
+            for i, each in enumerate(feed):
+                if not isinstance(each, dict):
+                    raise TypeError(
+                        "Each element of feed list should be a dict")
+                res_dict = dict()
+                for feed_name in each:
+                    tensor = each[feed_name]
+                    if not isinstance(tensor, core.LoDTensor):
+                        tmp = core.LoDTensor()
+                        tmp.set(tensor, self._act_places[i])
+                        tensor = tmp
+                    res_dict[feed_name] = tensor
+                res.append(res_dict)
+            self.executor.feed_tensors_into_local_scopes(res)
 
         fetch_var_name = '@FETCHED_VAR_NAME@'
-        self.executor.run(fetch_list, fetch_var_name, feed_tensor_dict)
+        self.executor.run(fetch_list, fetch_var_name)
         arr = self.scope.find_var(fetch_var_name).get_lod_tensor_array()
         return [arr[i] for i in range(len(arr))]
 
     def bcast_params(self):
         self.executor.bcast_params(set(self.persistable_vars))
+
+    @property
+    def device_count(self):
+        return len(self._act_places)
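For callers migrating from the old keyword: feed_dict still works but is routed through feed with a deprecation notice on stderr. A short sketch, again reusing the hypothetical pe, loss, and batch from the first example:

# Preferred form.
pe.run(fetch_list=[loss.name], feed=batch)

# Deprecated form: prints "`feed_dict` is deprecated. Please use `feed=`"
# to stderr and then behaves exactly like feed=batch.
pe.run(fetch_list=[loss.name], feed_dict=batch)

# The new property reports how many per-device shards a list feed must have.
print 'device_count =', pe.device_count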

python/paddle/fluid/tests/unittests/test_parallel_executor.py

Lines changed: 6 additions & 6 deletions
@@ -203,12 +203,12 @@ def check_network_convergence(self,
                                   iter=10,
                                   batch_size=None,
                                   allow_op_delay=False,
-                                  feed_dict={}):
+                                  feed_dict=None):
         main = fluid.Program()
         startup = fluid.Program()
         startup.random_seed = 1  # Fix random seed
         with fluid.program_guard(main, startup):
-            loss = method(use_feed=len(feed_dict) > 0)
+            loss = method(use_feed=feed_dict is not None)
             adam = fluid.optimizer.Adam()
             adam.minimize(loss)
             if memory_opt:
@@ -222,13 +222,13 @@ def check_network_convergence(self,
         if batch_size is not None:
             batch_size *= fluid.core.get_cuda_device_count()
         begin = time.time()
-        first_loss, = exe.run([loss.name], feed_dict=feed_dict)
+        first_loss, = exe.run([loss.name], feed=feed_dict)
         first_loss = numpy.array(first_loss)
 
         for i in xrange(iter):
-            exe.run([], feed_dict=feed_dict)
+            exe.run([], feed=feed_dict)
 
-        last_loss, = exe.run([loss.name], feed_dict=feed_dict)
+        last_loss, = exe.run([loss.name], feed=feed_dict)
         end = time.time()
 
         if batch_size is not None:
@@ -649,5 +649,5 @@ def test_all(self):
         for i in xrange(10):
             cur_batch = next(data)
             print map(numpy.array,
-                      pe.run(feed_dict=feeder.feed(cur_batch),
+                      pe.run(feed=feeder.feed(cur_batch),
                              fetch_list=[avg_cost.name]))[0]
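The last hunk shows the intended end-to-end pattern: a DataFeeder builds the feed dict and run consumes it through feed=. A compressed sketch of that loop, assuming data, feeder, pe, and avg_cost are set up as in the test:

import numpy

for _ in xrange(10):
    cur_batch = next(data)
    avg_loss, = pe.run(feed=feeder.feed(cur_batch),
                       fetch_list=[avg_cost.name])
    print numpy.array(avg_loss)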
