Skip to content

Commit b789a3a

Browse files
committed
Change code
1 parent 401e92f commit b789a3a

File tree

4 files changed

+28
-11
lines changed

4 files changed

+28
-11
lines changed

paddle/fluid/operators/reader/buffered_reader.cc

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,15 +28,15 @@ BufferedReader::BufferedReader(
2828
buffer_size_(buffer_size) {
2929
cpu_buffer_.resize(buffer_size);
3030
gpu_buffer_.resize(buffer_size);
31-
AppendFutureToBatchSize();
31+
ReadTillBufferFullAsync();
3232
}
33-
void BufferedReader::AppendFutureToBatchSize() {
33+
void BufferedReader::ReadTillBufferFullAsync() {
3434
PADDLE_ENFORCE_EQ(position_.size(), 0U);
3535
for (size_t i = 0; i < buffer_size_; ++i) {
36-
AppendFuture(i);
36+
ReadAsync(i);
3737
}
3838
}
39-
void BufferedReader::AppendFuture(size_t i) {
39+
void BufferedReader::ReadAsync(size_t i) {
4040
position_.emplace(thread_pool_.enqueue([this, i]() -> size_t {
4141
TensorVec &cpu = cpu_buffer_[i];
4242
reader_->ReadNext(&cpu);
@@ -50,6 +50,7 @@ void BufferedReader::AppendFuture(size_t i) {
5050
gpu.resize(cpu.size());
5151
for (size_t i = 0; i < cpu.size(); ++i) {
5252
framework::TensorCopySync(cpu[i], place_, &gpu[i]);
53+
gpu[i].set_lod(cpu[i].lod());
5354
}
5455
}
5556
return i;
@@ -60,10 +61,11 @@ void BufferedReader::ShutdownImpl() {
6061
while (!position_.empty()) {
6162
position_.pop();
6263
}
64+
prev_pos_ = -1UL;
6365
}
6466
void BufferedReader::StartImpl() {
6567
reader_->Start();
66-
AppendFutureToBatchSize();
68+
ReadTillBufferFullAsync();
6769
}
6870
void BufferedReader::ReadNextImpl(std::vector<framework::LoDTensor> *out) {
6971
if (position_.empty()) {
@@ -79,7 +81,14 @@ void BufferedReader::ReadNextImpl(std::vector<framework::LoDTensor> *out) {
7981
}
8082

8183
*out = platform::is_gpu_place(place_) ? gpu_buffer_[i] : cpu_buffer_[i];
82-
AppendFuture(i);
84+
85+
// Do not push the current position into ReadAsync; push the previous
86+
// position instead. Since all computations in fluid are asynchronous,
87+
// changing the data at the current position may cause data errors.
88+
if (prev_pos_ != -1Ul) {
89+
ReadAsync(prev_pos_);
90+
}
91+
prev_pos_ = i;
8392
}
8493

8594
} // namespace reader

paddle/fluid/operators/reader/buffered_reader.h

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,9 @@ class BufferedReader : public framework::DecoratedReader {
3535
~BufferedReader() override;
3636

3737
private:
38-
void AppendFutureToBatchSize();
38+
void ReadTillBufferFullAsync();
3939

40-
void AppendFuture(size_t i);
40+
void ReadAsync(size_t i);
4141

4242
protected:
4343
void ShutdownImpl() override;
@@ -50,8 +50,15 @@ class BufferedReader : public framework::DecoratedReader {
5050
const size_t buffer_size_;
5151

5252
std::queue<std::future<size_t>> position_;
53+
54+
// The buffers for reading data.
55+
// NOTE: the simplest way to implement a buffered reader is to use no
56+
// buffer at all and just issue async reads, creating as many futures as
57+
// the buffer size. However, allocating a Tensor on every read is extremely
58+
// slow. Here we keep all data in preallocated buffers to avoid allocating
on every read.
5359
std::vector<TensorVec> cpu_buffer_;
5460
std::vector<TensorVec> gpu_buffer_;
61+
size_t prev_pos_{-1UL};
5562
};
5663

5764
} // namespace reader

python/paddle/fluid/tests/unittests/test_py_reader_push_pop.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,12 @@ def main(self, use_thread=False):
4545
) else fluid.CPUPlace()
4646
executor = fluid.Executor(place)
4747

48-
data_file, feed_queue = fluid.layers.py_reader(
48+
data_file = fluid.layers.py_reader(
4949
capacity=self.capacity,
5050
dtypes=self.dtypes,
5151
lod_levels=self.lod_levels,
5252
shapes=self.shapes)
53-
53+
feed_queue = data_file.queue
5454
read_out_data = fluid.layers.read_file(data_file)
5555
self.inputs = []
5656

python/paddle/fluid/tests/unittests/test_py_reader_using_executor.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,11 +52,12 @@ def simple_fc_net(in_size,
5252
batch_size,
5353
queue_capacity,
5454
use_double_buffer=False):
55-
reader, feed_queue = fluid.layers.py_reader(
55+
reader = fluid.layers.py_reader(
5656
capacity=queue_capacity,
5757
shapes=[[-1, in_size], [-1, 1]],
5858
lod_levels=[0, 0],
5959
dtypes=['float32', 'int64'])
60+
feed_queue = reader.queue
6061
reader = fluid.layers.batch(reader, batch_size=batch_size)
6162
if use_double_buffer:
6263
reader = fluid.layers.double_buffer(reader)

0 commit comments

Comments
 (0)