18
18
#include " ThreadPool.h"
19
19
#include " paddle/fluid/framework/blocking_queue.h"
20
20
#include " paddle/fluid/operators/reader/blocking_queue.h"
21
+ #include " paddle/fluid/operators/reader/buffered_reader.h"
21
22
#include " paddle/fluid/operators/reader/reader_op_registry.h"
22
23
23
24
namespace paddle {
@@ -232,12 +233,17 @@ class OpenFilesOp : public framework::OperatorBase {
232
233
container.reset (new OrderedReaderContainer ());
233
234
} else {
234
235
container.reset (new PreemptiveReaderContainer (
235
- std::min (file_names.size (),
236
- static_cast <size_t >(std::thread::hardware_concurrency ()))));
236
+ static_cast <size_t >(Attr<int >(" thread_num" ))));
237
237
}
238
238
239
- out->Reset (
240
- std::make_shared<MultiFileReader>(file_names, std::move (container)));
239
+ std::shared_ptr<framework::ReaderBase> reader (
240
+ new MultiFileReader (file_names, std::move (container)));
241
+ auto buffer_size = Attr<int >(" buffer_size" );
242
+ if (buffer_size > 1 ) {
243
+ reader = framework::MakeDecoratedReader<BufferedReader>(
244
+ reader, platform::CPUPlace (), buffer_size);
245
+ }
246
+ out->Reset (reader);
241
247
}
242
248
};
243
249
@@ -253,6 +259,8 @@ class OpenFilesOpMaker : public FileReaderMakerBase {
253
259
An OpenFilesOp creates a MultiFileReader, which is able to
254
260
read data multi-threaded from multiple files.
255
261
)DOC" );
262
+ AddAttr<int >(" thread_num" , " Number of thread to read files." );
263
+ AddAttr<int >(" buffer_size" , " The reading buffer of these files." );
256
264
}
257
265
};
258
266
0 commit comments