
Commit b063093

Merge pull request #12149 from reyoung/feature/combine_open_files_and_double_buffer
Change and polish readers
2 parents a3ac54b + 8c3cd42 · commit b063093

26 files changed (+668 / -329 lines)

paddle/fluid/API.spec

Lines changed: 2 additions & 2 deletions
@@ -180,13 +180,13 @@ paddle.fluid.layers.log ArgSpec(args=['x'], varargs=None, keywords=None, default
 paddle.fluid.layers.crop ArgSpec(args=['x', 'shape', 'offsets', 'name'], varargs=None, keywords=None, defaults=(None, None, None))
 paddle.fluid.layers.data ArgSpec(args=['name', 'shape', 'append_batch_size', 'dtype', 'lod_level', 'type', 'stop_gradient'], varargs=None, keywords=None, defaults=(True, 'float32', 0, VarType.LOD_TENSOR, True))
 paddle.fluid.layers.open_recordio_file ArgSpec(args=['filename', 'shapes', 'lod_levels', 'dtypes', 'pass_num', 'for_parallel'], varargs=None, keywords=None, defaults=(1, True))
-paddle.fluid.layers.open_files ArgSpec(args=['filenames', 'shapes', 'lod_levels', 'dtypes', 'thread_num', 'buffer_size', 'pass_num', 'for_parallel'], varargs=None, keywords=None, defaults=(1, None, 1, True))
+paddle.fluid.layers.open_files ArgSpec(args=['filenames', 'shapes', 'lod_levels', 'dtypes', 'thread_num', 'buffer_size', 'pass_num', 'is_test'], varargs=None, keywords=None, defaults=(None, None, 1, None))
 paddle.fluid.layers.read_file ArgSpec(args=['reader'], varargs=None, keywords=None, defaults=None)
 paddle.fluid.layers.shuffle ArgSpec(args=['reader', 'buffer_size'], varargs=None, keywords=None, defaults=None)
 paddle.fluid.layers.batch ArgSpec(args=['reader', 'batch_size'], varargs=None, keywords=None, defaults=None)
 paddle.fluid.layers.double_buffer ArgSpec(args=['reader', 'place', 'name'], varargs=None, keywords=None, defaults=(None, None))
 paddle.fluid.layers.random_data_generator ArgSpec(args=['low', 'high', 'shapes', 'lod_levels', 'for_parallel'], varargs=None, keywords=None, defaults=(True,))
-paddle.fluid.layers.py_reader ArgSpec(args=['capacity', 'shapes', 'dtypes', 'lod_levels'], varargs=None, keywords=None, defaults=(None,))
+paddle.fluid.layers.py_reader ArgSpec(args=['capacity', 'shapes', 'dtypes', 'lod_levels', 'name', 'use_double_buffer'], varargs=None, keywords=None, defaults=(None, None, True))
 paddle.fluid.layers.Preprocessor.__init__ ArgSpec(args=['self', 'reader', 'name'], varargs=None, keywords=None, defaults=(None,))
 paddle.fluid.layers.Preprocessor.block ArgSpec(args=[], varargs='args', keywords='kwds', defaults=None)
 paddle.fluid.layers.Preprocessor.inputs ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None)

paddle/fluid/framework/details/threaded_ssa_graph_executor.cc

Lines changed: 6 additions & 1 deletion
@@ -171,7 +171,12 @@ void ThreadedSSAGraphExecutor::InsertFetchOps(
 
   for (size_t i = 0; i < fetch_tensors.size(); ++i) {
     auto &var_name = fetch_tensors[i];
-    auto &vars = fetched_vars.at(var_name);
+    auto fetched_var_it = fetched_vars.find(var_name);
+    PADDLE_ENFORCE(fetched_var_it != fetched_vars.end(),
+                   "Cannot find fetched variable.(Perhaps the main_program "
+                   "is not set to ParallelExecutor)");
+
+    auto &vars = fetched_var_it->second;
 
     temp_nodes->emplace_back(new ir::Node("fetch", ir::Node::Type::kOperation));
     auto *op = new FetchOpHandle(temp_nodes->back().get(), fetch_data, i,

paddle/fluid/framework/lod_tensor.cc

Lines changed: 15 additions & 12 deletions
@@ -312,19 +312,22 @@ void WriteToRecordIO(recordio::Writer *writer,
   writer->Write(buffer.str());
 }
 
-std::vector<LoDTensor> ReadFromRecordIO(
-    recordio::Scanner *scanner, const platform::DeviceContext &dev_ctx) {
-  std::vector<LoDTensor> result;
-  if (scanner->HasNext()) {
-    std::istringstream sin(scanner->Next());
-    uint32_t sz;
-    sin.read(reinterpret_cast<char *>(&sz), sizeof(uint32_t));
-    result.resize(sz);
-    for (uint32_t i = 0; i < sz; ++i) {
-      DeserializeFromStream(sin, &result[i], dev_ctx);
-    }
+bool ReadFromRecordIO(recordio::Scanner *scanner,
+                      const platform::DeviceContext &dev_ctx,
+                      std::vector<LoDTensor> *result_ptr) {
+  if (!scanner->HasNext()) {
+    return false;
   }
-  return result;
+  std::istringstream sin(scanner->Next());
+  uint32_t sz;
+  sin.read(reinterpret_cast<char *>(&sz), sizeof(uint32_t));
+  auto &result = *result_ptr;
+  result.resize(sz);
+  for (uint32_t i = 0; i < sz; ++i) {
+    DeserializeFromStream(sin, &result[i], dev_ctx);
+  }
+
+  return true;
 }
 
 std::vector<LoDTensor> LoDTensor::SplitLoDTensor(

paddle/fluid/framework/lod_tensor.h

Lines changed: 3 additions & 2 deletions
@@ -223,8 +223,9 @@ extern void WriteToRecordIO(recordio::Writer* writer,
                             const std::vector<LoDTensor>& tensor,
                             const platform::DeviceContext& dev_ctx);
 
-extern std::vector<LoDTensor> ReadFromRecordIO(
-    recordio::Scanner* scanner, const platform::DeviceContext& dev_ctx);
+extern bool ReadFromRecordIO(recordio::Scanner* scanner,
+                             const platform::DeviceContext& dev_ctx,
+                             std::vector<LoDTensor>* result_ptr);
 
 /*
  * Convert between length-based LoD and offset-based LoD.
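
Editor's note (sketch, not part of the commit): with the new declaration above, ReadFromRecordIO reports end-of-data through its bool return value rather than an empty result vector, so a caller typically drains a scanner with a simple loop. A minimal, hypothetical consumer follows; the function name ConsumeAllRecords and the surrounding scanner/device-context setup are assumptions:

#include <vector>

#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/platform/device_context.h"
#include "paddle/fluid/recordio/scanner.h"

void ConsumeAllRecords(paddle::recordio::Scanner* scanner,
                       const paddle::platform::DeviceContext& dev_ctx) {
  std::vector<paddle::framework::LoDTensor> batch;
  // The bool return distinguishes "no more records" from a successful read,
  // so the loop terminates cleanly at end of file.
  while (paddle::framework::ReadFromRecordIO(scanner, dev_ctx, &batch)) {
    // Use `batch` here; it holds the LoDTensors of one record.
  }
}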

paddle/fluid/framework/lod_tensor_test.cc

Lines changed: 3 additions & 2 deletions
@@ -301,11 +301,12 @@ static void TestRecordIO() {
   {
     std::unique_ptr<std::istream> stream_ptr(stream);
     recordio::Scanner scanner(std::move(stream_ptr));
-    auto tensors = ReadFromRecordIO(&scanner, ctx);
+    std::vector<framework::LoDTensor> tensors;
+    ASSERT_TRUE(ReadFromRecordIO(&scanner, ctx, &tensors));
     ASSERT_EQ(tensors.size(), static_cast<size_t>(2));
     assert_tensor_ok(tensors[0]);
     assert_tensor_ok(tensors[1]);
-    tensors = ReadFromRecordIO(&scanner, ctx);
+    ASSERT_TRUE(ReadFromRecordIO(&scanner, ctx, &tensors));
     ASSERT_EQ(tensors.size(), static_cast<size_t>(2));
     assert_tensor_ok(tensors[0]);
     assert_tensor_ok(tensors[1]);

paddle/fluid/framework/reader.cc

Lines changed: 2 additions & 1 deletion
@@ -67,7 +67,8 @@ void ReaderBase::Start() {
   }
 }
 
-ReaderBase::~ReaderBase() { Shutdown(); }
+ReaderBase::~ReaderBase() {}
 
+DecoratedReader::~DecoratedReader() { reader_->Shutdown(); }
 }  // namespace framework
 }  // namespace paddle
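
Editor's note: ReaderBase::~ReaderBase() no longer calls Shutdown(); the new DecoratedReader destructor shuts down the wrapped reader instead. Whether or not it was the sole motivation for this change, the old pattern runs into a general C++ pitfall: a virtual call made while the base-class destructor is running dispatches to the base implementation, because the derived part has already been destroyed. A standalone illustration (not PaddlePaddle code):

#include <iostream>

struct Base {
  virtual ~Base() { Shutdown(); }  // dynamic type is already Base here
  void Shutdown() { ShutdownImpl(); }
  virtual void ShutdownImpl() { std::cout << "Base::ShutdownImpl\n"; }
};

struct Derived : Base {
  void ShutdownImpl() override { std::cout << "Derived::ShutdownImpl\n"; }
};

int main() {
  { Derived d; }  // prints "Base::ShutdownImpl", not the derived override
  return 0;
}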

paddle/fluid/framework/reader.h

Lines changed: 4 additions & 2 deletions
@@ -25,8 +25,6 @@
 namespace paddle {
 namespace framework {
 
-enum ReaderStatus { kRunning, kStopped };
-
 class ReaderBase {
  public:
   virtual void ReadNext(std::vector<LoDTensor>* out);
@@ -48,6 +46,8 @@ class ReaderBase {
 
   virtual void StartImpl() {}
 
+  enum ReaderStatus { kRunning, kStopped };
+
   ReaderStatus status_{kRunning};
 
   mutable std::mutex mu_;
@@ -74,6 +74,8 @@ class DecoratedReader : public ReaderBase,
     reader_->InsertDecoratedReader(shared_from_this());
   }
 
+  ~DecoratedReader();
+
  protected:
   void ShutdownImpl() override { reader_->Shutdown(); }
 

paddle/fluid/operators/reader/CMakeLists.txt

Lines changed: 3 additions & 2 deletions
@@ -15,12 +15,13 @@ function(reader_library TARGET_NAME)
       PARENT_SCOPE)
 endfunction()
 
-reader_library(open_files_op SRCS open_files_op.cc)
+cc_library(buffered_reader SRCS buffered_reader.cc DEPS reader simple_threadpool)
+reader_library(open_files_op SRCS open_files_op.cc DEPS buffered_reader)
 reader_library(create_random_data_generator_op SRCS create_random_data_generator_op.cc)
 reader_library(create_shuffle_reader_op SRCS create_shuffle_reader_op.cc)
 reader_library(create_batch_reader_op SRCS create_batch_reader_op.cc)
 reader_library(create_recordio_file_reader_op SRCS create_recordio_file_reader_op.cc)
-reader_library(create_double_buffer_reader_op SRCS create_double_buffer_reader_op.cc)
+reader_library(create_double_buffer_reader_op SRCS create_double_buffer_reader_op.cc DEPS buffered_reader)
 reader_library(create_multi_pass_reader_op SRCS create_multi_pass_reader_op.cc)
 reader_library(create_custom_reader_op SRCS create_custom_reader_op.cc)
 reader_library(create_py_reader_op SRCS create_py_reader_op.cc)

paddle/fluid/operators/reader/buffered_reader.cc (new file)

Lines changed: 96 additions & 0 deletions
@@ -0,0 +1,96 @@
+// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "paddle/fluid/operators/reader/buffered_reader.h"
+#include <vector>
+
+namespace paddle {
+namespace operators {
+namespace reader {
+BufferedReader::~BufferedReader() { reader_->Shutdown(); }
+BufferedReader::BufferedReader(
+    const std::shared_ptr<framework::ReaderBase> &reader,
+    const platform::Place &place, size_t buffer_size)
+    : framework::DecoratedReader(reader),
+      thread_pool_(1),
+      place_(place),
+      buffer_size_(buffer_size) {
+  cpu_buffer_.resize(buffer_size);
+  gpu_buffer_.resize(buffer_size);
+  ReadTillBufferFullAsync();
+}
+void BufferedReader::ReadTillBufferFullAsync() {
+  PADDLE_ENFORCE_EQ(position_.size(), 0U);
+  for (size_t i = 0; i < buffer_size_; ++i) {
+    ReadAsync(i);
+  }
+}
+void BufferedReader::ReadAsync(size_t i) {
+  position_.emplace(thread_pool_.enqueue([this, i]() -> size_t {
+    TensorVec &cpu = cpu_buffer_[i];
+    reader_->ReadNext(&cpu);
+
+    if (cpu.empty()) {
+      return -1UL;
+    }
+
+    if (platform::is_gpu_place(place_)) {
+      TensorVec &gpu = gpu_buffer_[i];
+      gpu.resize(cpu.size());
+      for (size_t i = 0; i < cpu.size(); ++i) {
+        framework::TensorCopySync(cpu[i], place_, &gpu[i]);
+        gpu[i].set_lod(cpu[i].lod());
+      }
+    }
+    return i;
+  }));
+}
+void BufferedReader::ShutdownImpl() {
+  reader_->Shutdown();
+  while (!position_.empty()) {
+    position_.pop();
+  }
+  prev_pos_ = -1UL;
+}
+void BufferedReader::StartImpl() {
+  reader_->Start();
+  ReadTillBufferFullAsync();
+}
+void BufferedReader::ReadNextImpl(std::vector<framework::LoDTensor> *out) {
+  if (position_.empty()) {
+    out->clear();
+    return;
+  }
+  size_t i = position_.front().get();
+  position_.pop();
+
+  if (i == -1UL) {
+    ReadNextImpl(out);
+    return;
+  }
+
+  *out = platform::is_gpu_place(place_) ? gpu_buffer_[i] : cpu_buffer_[i];
+
+  // Do not push current position into ReadAsync. Push the previous position
+  // Since all computation in fluid are async, change the data of
+  // current position may cause data error.
+  if (prev_pos_ != -1UL) {
+    ReadAsync(prev_pos_);
+  }
+  prev_pos_ = i;
+}
+
+}  // namespace reader
+}  // namespace operators
+}  // namespace paddle

paddle/fluid/operators/reader/buffered_reader.h (new file)

Lines changed: 66 additions & 0 deletions
@@ -0,0 +1,66 @@
+// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include <list>
+#include <queue>
+#include <vector>
+#include "ThreadPool.h"
+#include "paddle/fluid/framework/reader.h"
+
+namespace paddle {
+namespace operators {
+namespace reader {
+
+class BufferedReader : public framework::DecoratedReader {
+  using TensorVec = std::vector<framework::LoDTensor>;
+  using VecFuture = std::future<TensorVec>;
+
+ public:
+  BufferedReader(const std::shared_ptr<framework::ReaderBase>& reader,
+                 const platform::Place& place, size_t buffer_size);
+
+  ~BufferedReader() override;
+
+ private:
+  void ReadTillBufferFullAsync();
+
+  void ReadAsync(size_t i);
+
+ protected:
+  void ShutdownImpl() override;
+  void StartImpl() override;
+  void ReadNextImpl(std::vector<framework::LoDTensor>* out) override;
+
+ private:
+  ThreadPool thread_pool_;
+  platform::Place place_;
+  const size_t buffer_size_;
+
+  std::queue<std::future<size_t>> position_;
+
+  // The buffer for reading data.
+  // NOTE: the simplest way to implement buffered reader is do not use any
+  // buffer, just read async and create futures as buffer size. However, to
+  // malloc tensors every time is extremely slow. Here we store all data in
+  // buffers and prevent alloc every time.
+  std::vector<TensorVec> cpu_buffer_;
+  std::vector<TensorVec> gpu_buffer_;
+  size_t prev_pos_{-1UL};
+};
+
+}  // namespace reader
+}  // namespace operators
+}  // namespace paddle
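
Editor's sketch (not part of the commit): BufferedReader decorates an existing reader, prefetches up to buffer_size batches on a one-thread pool, and, for GPU places, copies each batch to the target place ahead of time; per the CMake change above, open_files_op and create_double_buffer_reader_op now build on it. Below is a minimal, hypothetical usage that constructs the reader directly; the operators wire this up internally and may use their own registration helpers, and DrainWithPrefetch as well as the already-built underlying reader are assumptions:

#include <memory>
#include <vector>

#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/operators/reader/buffered_reader.h"
#include "paddle/fluid/platform/place.h"

void DrainWithPrefetch(
    const std::shared_ptr<paddle::framework::ReaderBase>& underlying,
    const paddle::platform::Place& place) {
  // buffer_size = 2 corresponds to plain double buffering; larger values
  // simply deepen the prefetch queue.
  auto buffered = std::make_shared<paddle::operators::reader::BufferedReader>(
      underlying, place, /*buffer_size=*/2);

  std::vector<paddle::framework::LoDTensor> batch;
  buffered->ReadNext(&batch);
  while (!batch.empty()) {  // an empty batch signals the source is exhausted
    // Consume `batch` here.
    buffered->ReadNext(&batch);
  }
}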
