Skip to content

Commit f9974a4

Browse files
committed
Make double_buffer reader async
1 parent commit: a8c076e · this commit: f9974a4

File tree

1 file changed: +42 additions, −17 deletions

paddle/fluid/operators/reader/create_double_buffer_reader_op.cc

Lines changed: 42 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -24,15 +24,31 @@ static constexpr size_t kDoubleBufferSize = 2;
2424

2525
class DoubleBufferReader : public framework::DecoratedReader {
2626
public:
27+
struct Item {
28+
Item() : ctx_(nullptr) {}
29+
30+
std::vector<framework::LoDTensor> payloads_;
31+
platform::DeviceContext* ctx_;
32+
};
33+
2734
explicit DoubleBufferReader(
2835
ReaderBase* reader, platform::Place target_place = platform::CPUPlace())
2936
: DecoratedReader(reader), place_(target_place) {
37+
for (size_t i = 0; i < kDoubleBufferSize; ++i) {
38+
if (platform::is_gpu_place(place_)) {
39+
#ifdef PADDLE_WITH_CUDA
40+
ctxs_.emplace_back(new platform::CUDADeviceContext(
41+
boost::get<platform::CUDAPlace>(place_)));
42+
#else
43+
#endif
44+
}
45+
}
46+
3047
start_thread();
3148
}
3249

3350
void start_thread() {
34-
buffer_ = framework::MakeChannel<std::vector<framework::LoDTensor>>(
35-
kDoubleBufferSize);
51+
buffer_ = framework::MakeChannel<Item>(kDoubleBufferSize);
3652
std::thread prefetch([this] { PrefetchThreadFunc(); });
3753
prefetch.detach();
3854
}
@@ -47,9 +63,10 @@ class DoubleBufferReader : public framework::DecoratedReader {
4763
private:
4864
void PrefetchThreadFunc();
4965

50-
framework::Channel<std::vector<framework::LoDTensor>>* buffer_;
66+
framework::Channel<Item>* buffer_;
5167
platform::Place place_;
52-
mutable std::vector<framework::LoDTensor> local_buffer_;
68+
std::vector<std::unique_ptr<platform::DeviceContext>> ctxs_;
69+
mutable Item local_buffer_;
5370
};
5471

5572
class CreateDoubleBufferReaderOp : public framework::OperatorBase {
@@ -104,12 +121,14 @@ class CreateDoubleBufferReaderOpMaker : public DecoratedReaderMakerBase {
104121
};
105122

106123
// Hands the consumer the next prefetched batch.
//
// @param out  Receives the batch's tensors; any previous contents are
//             overwritten by assignment.
//
// The prefetch thread fills `buffer_` (a bounded channel of Item) in the
// background. An Item may already be cached in `local_buffer_` if a prior
// HasNext() call had to Receive() one to answer; only pull from the channel
// when that cache is empty.
void DoubleBufferReader::ReadNext(std::vector<framework::LoDTensor>* out) {
  if (local_buffer_.payloads_.empty()) {
    buffer_->Receive(&local_buffer_);
  }

  *out = local_buffer_.payloads_;
  local_buffer_.payloads_.clear();
  // For GPU batches the prefetch thread issued the host->device copies
  // asynchronously on the Item's device context; Wait() ensures those
  // copies have finished before the caller touches the tensors. ctx_ is
  // nullptr for CPU batches, which need no synchronization.
  if (local_buffer_.ctx_) {
    local_buffer_.ctx_->Wait();
  }
}
115134

@@ -121,16 +140,22 @@ void DoubleBufferReader::ReInit() {
121140

122141
void DoubleBufferReader::PrefetchThreadFunc() {
123142
VLOG(5) << "A new prefetch thread starts.";
143+
size_t gpu_ctx_offset = 0;
124144
while (reader_->HasNext()) {
125-
std::vector<framework::LoDTensor> batch;
126-
reader_->ReadNext(&batch);
145+
Item batch;
146+
reader_->ReadNext(&batch.payloads_);
127147
if (platform::is_gpu_place(place_)) {
128148
std::vector<framework::LoDTensor> gpu_batch;
129-
gpu_batch.resize(batch.size());
130-
for (size_t i = 0; i < batch.size(); ++i) {
131-
framework::TensorCopy(batch[i], place_, &gpu_batch[i]);
132-
gpu_batch[i].set_lod(batch[i].lod());
149+
auto& gpu_ctx = this->ctxs_[gpu_ctx_offset++];
150+
gpu_ctx_offset %= this->ctxs_.size();
151+
gpu_batch.resize(batch.payloads_.size());
152+
for (size_t i = 0; i < batch.payloads_.size(); ++i) {
153+
framework::TensorCopy(batch.payloads_[i], place_, *gpu_ctx,
154+
&gpu_batch[i]);
155+
gpu_batch[i].set_lod(batch.payloads_[i].lod());
133156
}
157+
batch.ctx_ = gpu_ctx.get();
158+
std::swap(gpu_batch, batch.payloads_);
134159
}
135160

136161
if (!buffer_->Send(&batch)) {
@@ -143,7 +168,7 @@ void DoubleBufferReader::PrefetchThreadFunc() {
143168
}
144169

145170
bool DoubleBufferReader::HasNext() const {
146-
if (local_buffer_.empty()) {
171+
if (local_buffer_.payloads_.empty()) {
147172
bool ok = buffer_->Receive(&local_buffer_);
148173
return ok;
149174
} else {

0 commit comments

Comments
 (0)