Skip to content

Commit e576345

Browse files
committed
Try to speed up buffered reader
1 parent 61b3a59 commit e576345

File tree

2 files changed

+43
-25
lines changed

2 files changed

+43
-25
lines changed

paddle/fluid/operators/reader/buffered_reader.cc

Lines changed: 37 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -18,54 +18,68 @@
1818
namespace paddle {
1919
namespace operators {
2020
namespace reader {
21-
BufferedReader::~BufferedReader() {
22-
reader_->Shutdown();
23-
buffer_.clear();
24-
}
21+
// Destructor: shuts down the underlying reader, then waits for every
// still-pending prefetch task before any member is destroyed.
//
// Why the wait is needed: the tasks enqueued by AppendFuture() capture `this`
// and write into cpu_buffer_ / gpu_buffer_. Those buffers are declared after
// thread_pool_, so they are destroyed *before* the pool. Without waiting here,
// a task that is still queued or running could touch an already-destroyed
// buffer.
//
// NOTE(review): this assumes reader_->ReadNext() returns promptly once
// reader_->Shutdown() has been called, so the wait cannot block indefinitely
// -- confirm against the ReaderBase contract.
BufferedReader::~BufferedReader() {
  reader_->Shutdown();
  while (!position_.empty()) {
    std::future<size_t> &pending = position_.front();
    // wait() rather than get(): a destructor must not rethrow an exception
    // that was stored by a failed task.
    if (pending.valid()) {
      pending.wait();
    }
    position_.pop();
  }
}
2522
BufferedReader::BufferedReader(
2623
const std::shared_ptr<framework::ReaderBase> &reader,
2724
const platform::Place &place, size_t buffer_size)
2825
: framework::DecoratedReader(reader),
2926
thread_pool_(1),
3027
place_(place),
3128
buffer_size_(buffer_size) {
29+
cpu_buffer_.resize(buffer_size);
30+
gpu_buffer_.resize(buffer_size);
3231
AppendFutureToBatchSize();
3332
}
3433
void BufferedReader::AppendFutureToBatchSize() {
35-
while (buffer_.size() < buffer_size_) {
36-
AppendFuture();
34+
PADDLE_ENFORCE_EQ(position_.size(), 0U);
35+
for (size_t i = 0; i < buffer_size_; ++i) {
36+
AppendFuture(i);
3737
}
3838
}
39-
void BufferedReader::AppendFuture() {
40-
buffer_.emplace_back(thread_pool_.enqueue([this] {
41-
TensorVec cpu_buffer;
42-
reader_->ReadNext(&cpu_buffer);
43-
if (platform::is_gpu_place(place_)) {
44-
TensorVec gpu_buffer;
39+
void BufferedReader::AppendFuture(size_t i) {
40+
position_.emplace(thread_pool_.enqueue([this, i]() -> size_t {
41+
TensorVec &cpu = cpu_buffer_[i];
42+
reader_->ReadNext(&cpu);
4543

46-
for (size_t i = 0; i < cpu_buffer.size(); ++i) {
47-
gpu_buffer.emplace_back();
48-
framework::TensorCopySync(cpu_buffer[i], place_, &gpu_buffer.back());
49-
}
44+
if (cpu.empty()) {
45+
return -1UL;
46+
}
5047

51-
cpu_buffer = gpu_buffer;
48+
if (platform::is_gpu_place(place_)) {
49+
TensorVec &gpu = gpu_buffer_[i];
50+
gpu.resize(cpu.size());
51+
for (size_t i = 0; i < cpu.size(); ++i) {
52+
framework::TensorCopySync(cpu[i], place_, &gpu[i]);
53+
}
5254
}
53-
return cpu_buffer;
55+
return i;
5456
}));
5557
}
5658
void BufferedReader::ShutdownImpl() {
5759
reader_->Shutdown();
58-
buffer_.clear();
60+
while (!position_.empty()) {
61+
position_.pop();
62+
}
5963
}
6064
void BufferedReader::StartImpl() {
6165
reader_->Start();
6266
AppendFutureToBatchSize();
6367
}
6468
void BufferedReader::ReadNextImpl(std::vector<framework::LoDTensor> *out) {
65-
PADDLE_ENFORCE_EQ(buffer_.size(), buffer_size_);
66-
*out = buffer_.front().get();
67-
buffer_.pop_front();
68-
AppendFuture();
69+
if (position_.empty()) {
70+
out->clear();
71+
return;
72+
}
73+
size_t i = position_.front().get();
74+
position_.pop();
75+
76+
if (i == -1UL) {
77+
ReadNextImpl(out);
78+
return;
79+
}
80+
81+
*out = platform::is_gpu_place(place_) ? gpu_buffer_[i] : cpu_buffer_[i];
82+
AppendFuture(i);
6983
}
7084

7185
} // namespace reader

paddle/fluid/operators/reader/buffered_reader.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#pragma once
1616

1717
#include <list>
18+
#include <queue>
1819
#include <vector>
1920
#include "ThreadPool.h"
2021
#include "paddle/fluid/framework/reader.h"
@@ -36,7 +37,7 @@ class BufferedReader : public framework::DecoratedReader {
3637
private:
3738
void AppendFutureToBatchSize();
3839

39-
void AppendFuture();
40+
void AppendFuture(size_t i);
4041

4142
protected:
4243
void ShutdownImpl() override;
@@ -47,7 +48,10 @@ class BufferedReader : public framework::DecoratedReader {
4748
ThreadPool thread_pool_;
4849
platform::Place place_;
4950
const size_t buffer_size_;
50-
std::list<VecFuture> buffer_;
51+
52+
std::queue<std::future<size_t>> position_;
53+
std::vector<TensorVec> cpu_buffer_;
54+
std::vector<TensorVec> gpu_buffer_;
5155
};
5256

5357
} // namespace reader

0 commit comments

Comments
 (0)