Skip to content

Commit eec412b

Browse files
authored
Merge pull request #12273 from JiayiFeng/update_py_reader
Some enhancement on readers
2 parents 9e0a94f + ea8a375 commit eec412b

File tree

3 files changed

+26
-10
lines changed

3 files changed

+26
-10
lines changed

paddle/fluid/operators/reader/buffered_reader.cc

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,14 @@
1818
namespace paddle {
1919
namespace operators {
2020
namespace reader {
21-
BufferedReader::~BufferedReader() { reader_->Shutdown(); }
21+
// Tears down the buffered reader.
//
// The underlying reader is shut down first; afterwards every future still
// held in position_ is waited on and then removed, so the queue is empty when
// the destructor returns.
// NOTE(review): the tasks enqueued by ReadAsync capture `this`, so waiting
// here presumably keeps them from running against a destroyed object.
BufferedReader::~BufferedReader() {
  reader_->Shutdown();
  for (; !position_.empty(); position_.pop()) {
    // Block until the oldest outstanding read has finished.
    position_.front().wait();
  }
}
28+
2229
BufferedReader::BufferedReader(
2330
const std::shared_ptr<framework::ReaderBase> &reader,
2431
const platform::Place &place, size_t buffer_size)
@@ -30,12 +37,14 @@ BufferedReader::BufferedReader(
3037
gpu_buffer_.resize(buffer_size);
3138
ReadTillBufferFullAsync();
3239
}
40+
3341
void BufferedReader::ReadTillBufferFullAsync() {
3442
PADDLE_ENFORCE_EQ(position_.size(), 0U);
3543
for (size_t i = 0; i < buffer_size_; ++i) {
3644
ReadAsync(i);
3745
}
3846
}
47+
3948
void BufferedReader::ReadAsync(size_t i) {
4049
position_.emplace(thread_pool_.enqueue([this, i]() -> size_t {
4150
TensorVec &cpu = cpu_buffer_[i];
@@ -56,17 +65,20 @@ void BufferedReader::ReadAsync(size_t i) {
5665
return i;
5766
}));
5867
}
68+
5969
void BufferedReader::ShutdownImpl() {
6070
reader_->Shutdown();
6171
while (!position_.empty()) {
6272
position_.pop();
6373
}
6474
prev_pos_ = -1UL;
6575
}
76+
6677
void BufferedReader::StartImpl() {
6778
reader_->Start();
6879
ReadTillBufferFullAsync();
6980
}
81+
7082
void BufferedReader::ReadNextImpl(std::vector<framework::LoDTensor> *out) {
7183
if (position_.empty()) {
7284
out->clear();

python/paddle/fluid/layers/io.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -457,7 +457,7 @@ def py_reader(capacity,
457457
use_double_buffer=True):
458458
"""
459459
Create a reader and blocking queue for data feeding in Python
460-
460+
461461
This layer returns a Reader Variable and a BlockingQueue.
462462
The BlockingQueue provides `push()` method to push a `LoDTensorArray`
463463
object into the queue in Python side. In C++ side, the Reader
@@ -478,7 +478,7 @@ def py_reader(capacity,
478478
Returns:
479479
tuple(Variable, BlockingQueue):
480480
A Reader Variable from which we can get feeding data.
481-
481+
482482
A BlockingQueue object for data feeding.
483483
484484
Examples:
@@ -491,15 +491,15 @@ def py_reader(capacity,
491491
dtypes=['float32', 'int64'])
492492
# Via the reader, we can use 'read_file' layer to get data:
493493
image, label = fluid.layers.read_file(reader)
494-
494+
495495
# Via the blocking queue, we can feed data using threads
496496
def feed_data(queue, feed_images, feed_labels):
497497
for feed_image, feed_label in zip(feed_images, feed_labels):
498498
data = core.LoDTensorArray()
499499
data.append(feed_image)
500500
data.append(feed_label)
501501
queue.push(data)
502-
502+
503503
thread = threading.Thread(target=feed_data, args=(queue, feed_images, feed_labels))
504504
thread.start()
505505
"""
@@ -579,6 +579,7 @@ def __provider_thread__():
579579
feed_queue.close()
580580

581581
reader.thread = threading.Thread(target=__provider_thread__)
582+
reader.thread.daemon = True
582583
reader.thread.start()
583584

584585
def __set_tensor_provider__(func):

python/paddle/fluid/tests/demo/pyreader.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ def network(is_train):
2525
capacity=10,
2626
shapes=((-1, 784), (-1, 1)),
2727
dtypes=('float32', 'int64'),
28-
name="train_reader" if is_train else "test_reader")
28+
name="train_reader" if is_train else "test_reader",
29+
use_double_buffer=True)
2930
img, label = fluid.layers.read_file(reader)
3031

3132
hidden = img
@@ -56,14 +57,16 @@ def main():
5657
with fluid.unique_name.guard():
5758
test_loss, test_reader = network(False)
5859

59-
fluid.Executor(fluid.CUDAPlace(0)).run(startup_prog)
60-
fluid.Executor(fluid.CUDAPlace(0)).run(test_startup)
60+
use_cuda = fluid.core.is_compiled_with_cuda()
61+
place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
62+
fluid.Executor(place).run(startup_prog)
63+
fluid.Executor(place).run(test_startup)
6164

6265
trainer = fluid.ParallelExecutor(
63-
use_cuda=True, loss_name=loss.name, main_program=train_prog)
66+
use_cuda=use_cuda, loss_name=loss.name, main_program=train_prog)
6467

6568
tester = fluid.ParallelExecutor(
66-
use_cuda=True, share_vars_from=trainer, main_program=test_prog)
69+
use_cuda=use_cuda, share_vars_from=trainer, main_program=test_prog)
6770

6871
train_reader.decorate_paddle_reader(
6972
paddle.v2.reader.shuffle(

0 commit comments

Comments
 (0)