Skip to content

Commit 4fb7b96

Browse files
committed
Add basic double buffer reader
1 parent 77200a7 commit 4fb7b96

File tree

4 files changed

+175
-24
lines changed

4 files changed

+175
-24
lines changed

paddle/fluid/framework/reader.h

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,10 @@
1616

1717
#include "paddle/fluid/framework/ddim.h"
1818
#include "paddle/fluid/framework/lod_tensor_array.h"
19-
#include "paddle/fluid/framework/threadpool.h"
2019

2120
namespace paddle {
2221
namespace framework {
2322

24-
static constexpr size_t kDoubleBufferSize = 3;
25-
2623
class ReaderBase {
2724
public:
2825
explicit ReaderBase(const std::vector<DDim>& shapes) : shapes_(shapes) {

paddle/fluid/operators/reader/CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,5 @@ cc_library(reader_op_registry SRCS reader_op_registry.cc DEPS operator op_regist
22
op_library(create_random_data_generator_op SRCS create_random_data_generator_op.cc DEPS reader_op_registry)
33
op_library(create_shuffle_reader_op SRCS create_shuffle_reader_op.cc DEPS reader_op_registry)
44
op_library(create_batch_reader_op SRCS create_batch_reader_op.cc DEPS reader_op_registry)
5-
set(READER_LIBRARY create_random_data_generator_op create_shuffle_reader_op create_batch_reader_op PARENT_SCOPE)
5+
op_library(create_double_buffer_reader_op SRCS create_double_buffer_reader_op.cc DEPS reader_op_registry threadpool)
6+
set(READER_LIBRARY create_random_data_generator_op create_shuffle_reader_op create_batch_reader_op create_double_buffer_reader_op PARENT_SCOPE)
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#include "paddle/fluid/framework/threadpool.h"
16+
#include "paddle/fluid/operators/reader/reader_op_registry.h"
17+
18+
namespace paddle {
19+
namespace operators {
20+
namespace reader {
21+
22+
static constexpr size_t kDoubleBufferSize = 3;
23+
24+
class DoubleBufferReader : public framework::DecoratedReader {
25+
public:
26+
explicit DoubleBufferReader(ReaderBase* reader)
27+
: DecoratedReader(reader),
28+
buffer_(kDoubleBufferSize),
29+
write_pos_(0),
30+
read_pos_(0) {
31+
std::thread prefetch(
32+
std::bind(&DoubleBufferReader::PrefetchThreadFunc, this));
33+
prefetch.detach();
34+
// framework::Async(
35+
// std::bind(&DoubleBufferReader::PrefetchThreadFunc, this));
36+
}
37+
38+
void ReadNext(std::vector<framework::LoDTensor>* out) override;
39+
bool HasNext() const override;
40+
41+
private:
42+
void PrefetchThreadFunc();
43+
44+
std::vector<std::vector<framework::LoDTensor>> buffer_;
45+
size_t write_pos_;
46+
size_t read_pos_;
47+
48+
std::mutex mtx_;
49+
std::condition_variable buffer_not_full_;
50+
std::condition_variable buffer_not_empty_;
51+
};
52+
53+
class CreateDoubleBufferReaderOp : public framework::OperatorBase {
54+
public:
55+
using framework::OperatorBase::OperatorBase;
56+
57+
private:
58+
void RunImpl(const framework::Scope& scope,
59+
const platform::Place& dev_place) const override {
60+
const auto& underlying_reader = scope.FindVar(Input("UnderlyingReader"))
61+
->Get<framework::ReaderHolder>();
62+
auto* out = scope.FindVar(Output("Out"))
63+
->template GetMutable<framework::ReaderHolder>();
64+
out->Reset(new DoubleBufferReader(underlying_reader.Get()));
65+
}
66+
};
67+
68+
class CreateDoubleBufferReaderOpMaker : public DecoratedReaderMakerBase {
69+
public:
70+
CreateDoubleBufferReaderOpMaker(OpProto* op_proto, OpAttrChecker* op_checker)
71+
: DecoratedReaderMakerBase(op_proto, op_checker) {
72+
AddComment(R"DOC(
73+
CreateDoubleBufferReader Operator
74+
75+
A double buffer reader takes another reader as its 'underlying reader'.
76+
It launches another thread to execute the 'underlying reader' asynchronously,
77+
which prevents reading process from blocking subsequent training.
78+
)DOC");
79+
}
80+
};
81+
82+
void DoubleBufferReader::ReadNext(std::vector<framework::LoDTensor>* out) {
83+
std::unique_lock<std::mutex> lck(mtx_);
84+
while (write_pos_ == read_pos_) {
85+
buffer_not_empty_.wait(lck);
86+
}
87+
88+
out->clear();
89+
out->reserve(buffer_[read_pos_].size());
90+
// TODO(fengjiayi): This copy shall be reduced.
91+
for (size_t i = 0; i < buffer_[read_pos_].size(); ++i) {
92+
framework::LoDTensor dst;
93+
TensorCopy(buffer_[read_pos_][i], platform::CPUPlace(), &dst);
94+
dst.set_lod(buffer_[read_pos_][i].lod());
95+
out->push_back(dst);
96+
}
97+
98+
++read_pos_;
99+
if (read_pos_ >= kDoubleBufferSize) {
100+
read_pos_ = 0;
101+
}
102+
buffer_not_full_.notify_all();
103+
}
104+
105+
bool DoubleBufferReader::HasNext() const {
106+
return reader_->HasNext() || !buffer_.empty();
107+
}
108+
109+
void DoubleBufferReader::PrefetchThreadFunc() {
110+
while (reader_->HasNext()) {
111+
std::unique_lock<std::mutex> lck(mtx_);
112+
while (((write_pos_ + 1) % kDoubleBufferSize) == read_pos_) {
113+
buffer_not_full_.wait(lck);
114+
}
115+
reader_->ReadNext(&buffer_[write_pos_]);
116+
++write_pos_;
117+
if (write_pos_ >= kDoubleBufferSize) {
118+
write_pos_ = 0;
119+
}
120+
buffer_not_empty_.notify_all();
121+
}
122+
}
123+
124+
} // namespace reader
125+
} // namespace operators
126+
} // namespace paddle
127+
128+
namespace ops = paddle::operators::reader;
129+
REGISTER_DECORATED_READER_OPERATOR(create_double_buffer_reader,
130+
ops::CreateDoubleBufferReaderOp,
131+
ops::CreateDoubleBufferReaderOpMaker);

python/paddle/fluid/tests/test_cpp_reader.py

Lines changed: 42 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,30 @@
1515
import paddle.v2 as paddle
1616
import paddle.fluid as fluid
1717
import numpy as np
18+
import sys
1819

19-
prog = fluid.framework.Program()
20-
block = prog.current_block()
20+
startup_prog = fluid.framework.Program()
21+
startup_block = startup_prog.current_block()
2122

22-
random_reader = block.create_var(
23+
random_reader = startup_block.create_var(
2324
type=fluid.core.VarDesc.VarType.READER, name="RandomDataGenerator")
2425
random_reader.desc.set_dtypes(
2526
[fluid.core.VarDesc.VarType.FP32, fluid.core.VarDesc.VarType.FP32])
27+
random_reader.persistable = True
28+
shuffle_reader = startup_block.create_var(
29+
type=fluid.core.VarDesc.VarType.READER, name="ShuffleReader")
30+
shuffle_reader.persistable = True
31+
batch_reader = startup_block.create_var(
32+
type=fluid.core.VarDesc.VarType.READER, name="BatchReader")
33+
batch_reader.persistable = True
34+
double_buffer = startup_block.create_var(
35+
type=fluid.core.VarDesc.VarType.READER, name="DoubleBuffer")
36+
double_buffer.persistable = True
37+
38+
main_prog = startup_prog.clone()
39+
main_block = main_prog.current_block()
2640

27-
create_random_data_generator_op = block.append_op(
41+
create_random_data_generator_op = startup_block.append_op(
2842
type="create_random_data_generator",
2943
outputs={"Out": random_reader},
3044
attrs={
@@ -34,37 +48,45 @@
3448
"max": 1.0,
3549
'lod_levels': [0, 0]
3650
})
37-
shuffle_reader = block.create_var(
38-
type=fluid.core.VarDesc.VarType.READER, name="ShuffleReader")
3951

40-
create_shuffle_reader_op = block.append_op(
52+
create_shuffle_reader_op = startup_block.append_op(
4153
type="create_shuffle_reader",
4254
inputs={"UnderlyingReader": random_reader},
4355
outputs={"Out": shuffle_reader},
4456
attrs={"buffer_size": 7})
4557

46-
batch_reader = block.create_var(
47-
type=fluid.core.VarDesc.VarType.READER, name="BatchReader")
48-
49-
create_batch_reader_op = block.append_op(
58+
create_batch_reader_op = startup_block.append_op(
5059
type="create_batch_reader",
5160
inputs={"UnderlyingReader": shuffle_reader},
5261
outputs={"Out": batch_reader},
5362
attrs={"batch_size": 10})
5463

55-
out1 = block.create_var(type=fluid.core.VarDesc.VarType.LOD_TENSOR, name="Out1")
56-
out2 = block.create_var(type=fluid.core.VarDesc.VarType.LOD_TENSOR, name="Out2")
64+
create_double_buffer_reader_op = startup_block.append_op(
65+
type="create_double_buffer_reader",
66+
inputs={"UnderlyingReader": batch_reader},
67+
outputs={"Out": double_buffer})
68+
69+
out1 = main_block.create_var(
70+
type=fluid.core.VarDesc.VarType.LOD_TENSOR, name="Out1")
71+
out2 = main_block.create_var(
72+
type=fluid.core.VarDesc.VarType.LOD_TENSOR, name="Out2")
5773

58-
read_op = block.append_op(
59-
type="read", inputs={"Reader": batch_reader},
74+
main_block.var("DoubleBuffer").desc.set_shapes(double_buffer.desc.shapes())
75+
main_block.var("DoubleBuffer").desc.set_dtypes(double_buffer.desc.dtypes())
76+
main_block.var("DoubleBuffer").desc.set_lod_levels(
77+
double_buffer.desc.lod_levels())
78+
79+
read_op = main_block.append_op(
80+
type="read",
81+
inputs={"Reader": double_buffer},
6082
outputs={"Out": [out1, out2]})
6183

6284
place = fluid.CPUPlace()
6385
exe = fluid.Executor(place)
6486

65-
[res1, res2] = exe.run(prog, fetch_list=[out1, out2])
66-
67-
if not (res1.shape == (10, 2) and res2.shape == (10, 1)):
68-
exit(1)
87+
exe.run(startup_prog)
6988

70-
exit(0)
89+
for i in range(1, 100):
90+
[res1, res2] = exe.run(main_prog, fetch_list=[out1, out2])
91+
if not (res1.shape == (10, 2) and res2.shape == (10, 1)):
92+
exit(1)

0 commit comments

Comments
 (0)