Commit d36e13e

Merge branch 'feature/add_pyreader_demo' into feature/combine_open_files_and_double_buffer
2 parents: 1478a5f + c9cf2bd

5 files changed (+169, -29 lines)

paddle/fluid/framework/details/threaded_ssa_graph_executor.cc

Lines changed: 7 additions & 1 deletion

@@ -168,7 +168,13 @@ void ThreadedSSAGraphExecutor::InsertFetchOps(

   for (size_t i = 0; i < fetch_tensors.size(); ++i) {
     auto &var_name = fetch_tensors[i];
-    auto &vars = fetched_vars.at(var_name);
+
+    auto fetched_var_it = fetched_vars.find(var_name);
+    PADDLE_ENFORCE(fetched_var_it != fetched_vars.end(),
+                   "Cannot find fetched variable.(Perhaps the main_program "
+                   "is not set to ParallelExecutor)");
+
+    auto &vars = fetched_var_it->second;
     auto *op = new FetchOpHandle(fetch_data, i, &local_scopes_);
     fetch_ops->emplace_back(op);
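Note: the new check replaces the opaque std::out_of_range that fetched_vars.at() would throw with a message naming the likely cause, namely fetching a variable from a program that was never handed to the ParallelExecutor. The demo added later in this commit sidesteps that by passing main_program explicitly; a minimal sketch of that usage, with names borrowed from the demo and otherwise illustrative:

    # Build the ParallelExecutor from the same program that defines `loss`,
    # so InsertFetchOps can resolve loss.name among the graph's variables.
    trainer = fluid.ParallelExecutor(
        use_cuda=True, loss_name=loss.name, main_program=train_prog)
    print 'train_loss', numpy.array(trainer.run(fetch_list=[loss.name]))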

paddle/fluid/operators/reader/create_shuffle_reader_op.cc

Lines changed: 1 addition & 1 deletion

@@ -48,9 +48,9 @@ class ShuffleReader : public framework::DecoratedReader {

  private:
   void ShutdownImpl() override {
+    reader_->Shutdown();
     buffer_.clear();
     iteration_pos_ = 0;
-    reader_->Shutdown();
   }

   void StartImpl() override {

paddle/fluid/operators/reader/open_files_op.cc

Lines changed: 4 additions & 12 deletions

@@ -18,7 +18,6 @@
 #include "ThreadPool.h"
 #include "paddle/fluid/framework/blocking_queue.h"
 #include "paddle/fluid/operators/reader/blocking_queue.h"
-#include "paddle/fluid/operators/reader/buffered_reader.h"
 #include "paddle/fluid/operators/reader/reader_op_registry.h"

 namespace paddle {
@@ -233,17 +232,12 @@ class OpenFilesOp : public framework::OperatorBase {
       container.reset(new OrderedReaderContainer());
     } else {
       container.reset(new PreemptiveReaderContainer(
-          static_cast<size_t>(Attr<int>("thread_num"))));
+          std::min(file_names.size(),
+                   static_cast<size_t>(std::thread::hardware_concurrency()))));
     }

-    auto reader =
-        std::make_shared<MultiFileReader>(file_names, std::move(container));
-    auto buffer_size = Attr<int>("buffer_size");
-    if (buffer_size > 1) {
-      reader = framework::MakeDecoratedReader<BufferedReader>(
-          reader, platform::CPUPlace(), buffer_size);
-    }
-    out->Reset(reader);
+    out->Reset(
+        std::make_shared<MultiFileReader>(file_names, std::move(container)));
   }
 };

@@ -259,8 +253,6 @@ class OpenFilesOpMaker : public FileReaderMakerBase {
 An OpenFilesOp creates a MultiFileReader, which is able to
 read data multi-threaded from multiple files.
   )DOC");
-    AddAttr<int>("thread_num", "Number of thread to read files.");
-    AddAttr<int>("buffer_size", "The reading buffer of these files.");
   }
 };

python/paddle/fluid/layers/io.py

Lines changed: 34 additions & 15 deletions

@@ -12,16 +12,16 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import contextlib
+import multiprocessing

-from .. import core
-from ..framework import convert_np_dtype_to_dtype_, default_main_program, default_startup_program, Program
-from ..unique_name import generate as unique_name
 from control_flow import BlockGuard
-from ..layer_helper import LayerHelper
+from layer_function_generator import templatedoc
+from .. import core
 from ..executor import global_scope
-from layer_function_generator import generate_layer_fn, templatedoc
-import sys
-import multiprocessing
+from ..framework import convert_np_dtype_to_dtype_, default_main_program, \
+    default_startup_program
+from ..layer_helper import LayerHelper
+from ..unique_name import generate as unique_name

 __all__ = [
     'data', 'BlockGuardServ', 'ListenAndServ', 'Send', 'Recv',
@@ -448,7 +448,12 @@ def random_data_generator(low, high, shapes, lod_levels, for_parallel=True):
     return monkey_patch_reader_methods(main_prog_var)


-def py_reader(capacity, shapes, dtypes, lod_levels=None):
+def py_reader(capacity,
+              shapes,
+              dtypes,
+              lod_levels=None,
+              name=None,
+              use_double_buffer=True):
     """
     Create a reader and blocking queue for data feeding in Python
@@ -461,10 +466,13 @@ def py_reader(capacity, shapes, dtypes, lod_levels=None):
     using `close()` method when unused.

     Args:
+       use_double_buffer(bool): Whether use double buffer or not.
        capacity(int): The maximum capacity of the BlockingQueue.
-       shapes(list): List of tuples which declaring data shapes.
-       dtypes(list): List of strs which declaring data type.
-       lod_levels(list): List of ints which declaring data lod_level.
+       shapes(list|tuple): List of tuples which declaring data shapes.
+       dtypes(list|tuple): List of strs which declaring data type.
+       lod_levels(list|tuple): List of ints which declaring data lod_level.
+       name(basestring): The prefix Python queue name and Reader name. None will
+            be generated automatically.

     Returns:
        tuple(Variable, BlockingQueue):
@@ -505,15 +513,23 @@ def feed_data(queue, feed_images, feed_labels):
     if lod_levels is None:
         lod_levels = [0] * len(shapes)

-    queue_name = unique_name('lod_tensor_blocking_queue')
+    if name is None:
+        queue_name = unique_name('lod_tensor_blocking_queue')
+        reader_name = unique_name('create_py_reader')
+        double_buffer_name = unique_name('double_buffer')
+    else:
+        queue_name = "_".join([name, "queue"])
+        reader_name = "_".join([name, "reader"])
+        double_buffer_name = "_".join([name, "double_buffer"])
+
     var = global_scope().var(queue_name)
     feed_queue = core.init_lod_tensor_blocking_queue(var, capacity, shapes)

     startup_blk = default_startup_program().current_block()
-    startup_var = startup_blk.create_var(name=unique_name('create_py_reader'))
+    startup_var = startup_blk.create_var(name=reader_name)
     startup_blk.append_op(
         type='create_py_reader',
-        inputs={'blocking_queue': queue_name},
+        inputs={'blocking_queue': [queue_name]},
         outputs={'Out': [startup_var]},
         attrs={
             'shape_concat': shape_concat,
@@ -527,7 +543,10 @@ def feed_data(queue, feed_images, feed_labels):
     main_prog_var = _copy_reader_var_(default_main_program().current_block(),
                                       startup_var)

-    return monkey_patch_reader_methods(main_prog_var), feed_queue
+    reader = monkey_patch_reader_methods(main_prog_var)
+    if use_double_buffer:
+        reader = double_buffer(reader, name=double_buffer_name)
+    return reader, feed_queue


 def open_files(filenames,
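Taken together, the io.py changes let a py_reader be given a deterministic name, so its queue, reader, and double-buffer variables become <name>_queue, <name>_reader, and <name>_double_buffer, and the returned reader is wrapped in double_buffer by default. A minimal sketch of the updated call, with illustrative capacity, shapes, and name (the new demo file below does the same thing end to end):

    import paddle.fluid as fluid

    reader, queue = fluid.layers.py_reader(
        capacity=10,
        shapes=[(-1, 784), (-1, 1)],
        dtypes=['float32', 'int64'],
        name='train_reader',       # yields train_reader_queue / train_reader_reader / ...
        use_double_buffer=True)    # default; pass False to skip the double_buffer wrapper
    img, label = fluid.layers.read_file(reader)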
Lines changed: 123 additions & 0 deletions (new file)

@@ -0,0 +1,123 @@
+# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import paddle.fluid as fluid
+import paddle.dataset.mnist as mnist
+import paddle
+import paddle.v2
+import threading
+import numpy
+
+
+def network(is_train):
+    reader, queue = fluid.layers.py_reader(
+        capacity=10,
+        shapes=((-1, 784), (-1, 1)),
+        dtypes=('float32', 'int64'),
+        name="train_reader" if is_train else "test_reader")
+    img, label = fluid.layers.read_file(reader)
+
+    hidden = img
+
+    for i in xrange(2):
+        hidden = fluid.layers.fc(input=hidden, size=100, act='tanh')
+        hidden = fluid.layers.dropout(
+            hidden, dropout_prob=0.5, is_test=not is_train)
+
+    prediction = fluid.layers.fc(input=hidden, size=10, act='softmax')
+    loss = fluid.layers.cross_entropy(input=prediction, label=label)
+    return fluid.layers.mean(loss), queue, reader
+
+
+def pipe_reader_to_queue(reader_creator, queue):
+    with fluid.program_guard(fluid.Program(), fluid.Program()):
+        feeder = fluid.DataFeeder(
+            feed_list=[
+                fluid.layers.data(
+                    name='img', dtype='float32', shape=[784]),
+                fluid.layers.data(
+                    name='label', dtype='int64', shape=[1])
+            ],
+            place=fluid.CPUPlace())
+
+    def __thread_main__():
+        for data in feeder.decorate_reader(
+                reader_creator, multi_devices=False)():
+            tmp = fluid.core.LoDTensorArray()
+            tmp.append(data['img'])
+            tmp.append(data['label'])
+            queue.push(tmp)
+        queue.close()
+
+    th = threading.Thread(target=__thread_main__)
+    th.start()
+    return th
+
+
+def main():
+    train_prog = fluid.Program()
+    startup_prog = fluid.Program()
+
+    with fluid.program_guard(train_prog, startup_prog):
+        with fluid.unique_name.guard():
+            loss, train_queue, train_reader = network(True)
+            adam = fluid.optimizer.Adam(learning_rate=0.01)
+            adam.minimize(loss)
+
+    test_prog = fluid.Program()
+    test_startup = fluid.Program()
+    with fluid.program_guard(test_prog, test_startup):
+        with fluid.unique_name.guard():
+            test_loss, test_queue, test_reader = network(False)
+
+    fluid.Executor(fluid.CUDAPlace(0)).run(startup_prog)
+    fluid.Executor(fluid.CUDAPlace(0)).run(test_startup)
+
+    trainer = fluid.ParallelExecutor(
+        use_cuda=True, loss_name=loss.name, main_program=train_prog)
+
+    tester = fluid.ParallelExecutor(
+        use_cuda=True, share_vars_from=trainer, main_program=test_prog)
+
+    for epoch_id in xrange(10):
+        train_data_thread = pipe_reader_to_queue(
+            paddle.batch(paddle.v2.reader.firstn(mnist.train(), 32), 64),
+            train_queue)
+        try:
+            while True:
+                print 'train_loss', numpy.array(
+                    trainer.run(fetch_list=[loss.name]))
+        except fluid.core.EOFException:
+            print 'End of epoch', epoch_id
+            train_reader.reset()
+        train_data_thread.join()
+
+        test_data_thread = pipe_reader_to_queue(
+            paddle.batch(mnist.test(), 32), test_queue)
+        try:
+            while True:
+                print 'test loss', numpy.array(
+                    tester.run(fetch_list=[test_loss.name]))
+        except fluid.core.EOFException:
+            print 'End of testing'
+            test_reader.reset()
+
+        test_data_thread.join()
+        break
+    del trainer
+    del tester
+
+
+if __name__ == '__main__':
+    main()
