Skip to content

Commit e05abab

Browse files
committed
use recordio in dist train
1 parent ccf61b3 commit e05abab

File tree

6 files changed

+178
-17
lines changed

6 files changed

+178
-17
lines changed

doc/v2/howto/recordio/README.md

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
# How to use RecordIO in Fluid
2+
3+
If you want to use RecordIO as your training data format, you need to convert your training data
4+
to RecordIO files and read them during training. PaddlePaddle Fluid provides some
5+
interfaces to deal with RecordIO files.
6+
7+
## Generate RecordIO File
8+
9+
Before starting training with RecordIO files, you need to convert your training data
10+
to RecordIO format by `fluid.recordio_writer.convert_reader_to_recordio_file`; the sample code is
11+
as follows:
12+
13+
```python
14+
reader = paddle.batch(mnist.train(), batch_size=1)
15+
feeder = fluid.DataFeeder(
16+
feed_list=[ # order is image and label
17+
fluid.layers.data(
18+
name='image', shape=[784]),
19+
fluid.layers.data(
20+
name='label', shape=[1], dtype='int64'),
21+
],
22+
place=fluid.CPUPlace())
23+
fluid.recordio_writer.convert_reader_to_recordio_file('./mnist.recordio', reader, feeder)
24+
```
25+
26+
The above codes would generate a RecordIO `./mnist.recordio` on your host.
27+
28+
## Use the RecordIO file in a Local Training Job
29+
30+
PaddlePaddle Fluid provides an interface `fluid.layers.io.open_recordio_files` to load your RecordIO file
31+
and then use it as a Layer in your network configuration; the sample code is as follows:
32+
33+
```python
34+
data_file = fluid.layers.io.open_recordio_files(
35+
filename="./mnist.recordio",
36+
shapes=[(-1, 784),(-1, 1)],
37+
lod_levels=[0, 0],
38+
    dtypes=["float32", "int64"])
39+
data_file = fluid.layers.io.batch(data_file, batch_size=4)
40+
41+
img, label = fluid.layers.io.read_file(data_file)
42+
hidden = fluid.layers.fc(input=img, size=100, act='tanh')
43+
prediction = fluid.layers.fc(input=hidden, size=10, act='softmax')
44+
loss = fluid.layers.cross_entropy(input=prediction, label=label)
45+
avg_loss = fluid.layers.mean(loss)
46+
47+
fluid.optimizer.Adam(learning_rate=1e-3).minimize(avg_loss)
48+
49+
place = fluid.CPUPlace()
50+
51+
exe = fluid.Executor(place)
52+
exe.run(fluid.default_startup_program())
53+
avg_loss_np = []
54+
55+
# train a pass
56+
batch_id = 0
57+
while True:
58+
tmp, = exe.run(fetch_list=[avg_loss])
59+
60+
avg_loss_np.append(tmp)
61+
print(batch_id)
62+
batch_id += 1
63+
```
64+
65+
## Use the RecordIO files in Distributed Training
66+
67+
1. generate multiple RecordIO files
68+
69+
For a distributed training job, you may have multiple trainer nodes,
70+
and one or more RecordIO files for each trainer node. You can use the interface
71+
`fluid.recordio_writer.convert_reader_to_recordio_files` to convert your training data
72+
into multiple RecordIO files; the sample code is as follows:
73+
74+
```python
75+
reader = paddle.batch(mnist.train(), batch_size=1)
76+
feeder = fluid.DataFeeder(
77+
feed_list=[ # order is image and label
78+
fluid.layers.data(
79+
name='image', shape=[784]),
80+
fluid.layers.data(
81+
name='label', shape=[1], dtype='int64'),
82+
],
83+
place=fluid.CPUPlace())
84+
fluid.recordio_writer.convert_reader_to_recordio_files(
85+
    filename_suffix='./mnist.recordio', batch_per_file=100, reader_creator=reader, feeder=feeder)
86+
```
87+
88+
The above codes would generate multiple RecordIO files on your host like:
89+
90+
```bash
91+
.
92+
|-mnist.recordio-00000
93+
|-mnist.recordio-00001
94+
|-mnist.recordio-00002
95+
|-mnist.recordio-00003
96+
|-mnist.recordio-00004
97+
```
98+
99+
1. read these RecordIO files with `fluid.layers.io.open_recordio_files`
100+
101+
    For a distributed training job, the cluster scheduler will launch trainer processes on multiple nodes,
102+
each trainer process reads a part of the whole training data. We usually take the following approach to make the training
103+
data allocated to each trainer process as uniform as possible:
104+
105+
```python
106+
def gen_train_list(file_pattern, trainers, trainer_id):
107+
file_list = glob.glob(file_pattern)
108+
ret_list = []
109+
for idx, f in enumerate(file_list):
110+
        if idx % trainers == trainer_id:
111+
ret_list.append(f)
112+
return ret_list
113+
114+
trainers = int(os.getenv("TRAINERS"))
115+
trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID"))
116+
data_file = fluid.layers.io.open_recordio_files(
117+
    filenames=gen_train_list("./mnist.recordio*", trainers, trainer_id),
118+
shapes=[(-1, 784),(-1, 1)],
119+
lod_levels=[0, 0],
120+
    dtypes=["float32", "int64"])
121+
data_file = fluid.layers.io.batch(data_file, batch_size=4)
122+
```

paddle/fluid/framework/details/threaded_ssa_graph_executor.cc

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -189,9 +189,11 @@ void ThreadedSSAGraphExecutor::RunOp(
189189
BlockingQueue<VarHandleBase *> *ready_var_q, details::OpHandleBase *op) {
190190
auto op_run = [ready_var_q, op, this] {
191191
try {
192-
VLOG(10) << op << " " << op->Name() << " : " << op->DebugString();
192+
VLOG(10) << "PE start "
193+
<< " " << op->Name() << " : " << op->DebugString();
193194
op->Run(strategy_.use_event_);
194-
VLOG(10) << op << " " << op->Name() << " Done ";
195+
VLOG(10) << "PE end "
196+
<< " " << op->Name() << " Done ";
195197
running_ops_--;
196198
ready_var_q->Extend(op->Outputs());
197199
VLOG(10) << op << " " << op->Name() << "Signal posted";

paddle/fluid/operators/reader/create_recordio_file_reader_op.cc

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -65,20 +65,22 @@ class CreateRecordIOReaderOp : public framework::OperatorBase {
6565
static_cast<int>(shape_concat.size()),
6666
"The accumulate of all ranks should be equal to the "
6767
"shape concat's length.");
68-
std::string filename = Attr<std::string>("filename");
68+
auto filenames = Attr<std::vector<std::string>>("filenames");
6969

7070
auto* out = scope.FindVar(Output("Out"))
7171
->template GetMutable<framework::ReaderHolder>();
72-
73-
out->Reset(new RecordIOFileReader<true>(
74-
filename, RestoreShapes(shape_concat, ranks)));
72+
for (auto& fn : filenames) {
73+
out->Reset(
74+
new RecordIOFileReader<true>(fn, RestoreShapes(shape_concat, ranks)));
75+
}
7576
}
7677
};
7778

7879
class CreateRecordIOReaderOpMaker : public FileReaderMakerBase {
7980
protected:
8081
void Apply() override {
81-
AddAttr<std::string>("filename", "The filename of record io reader");
82+
AddAttr<std::vector<std::string>>("filenames",
83+
"The filenames of record io reader");
8284
AddComment(R"DOC(
8385
CreateRecordIOReader Operator
8486

python/paddle/fluid/layers/io.py

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
from ..executor import global_scope
2222

2323
__all__ = [
24-
'data', 'BlockGuardServ', 'ListenAndServ', 'Send', 'open_recordio_file',
24+
'data', 'BlockGuardServ', 'ListenAndServ', 'Send', 'open_recordio_files',
2525
'open_files', 'read_file', 'shuffle', 'batch', 'double_buffer',
2626
'random_data_generator', 'Preprocessor'
2727
]
@@ -291,20 +291,20 @@ def _copy_reader_create_op_(block, op):
291291
return new_op
292292

293293

294-
def open_recordio_file(filename,
295-
shapes,
296-
lod_levels,
297-
dtypes,
298-
pass_num=1,
299-
for_parallel=True):
294+
def open_recordio_files(filenames,
295+
shapes,
296+
lod_levels,
297+
dtypes,
298+
pass_num=1,
299+
for_parallel=True):
300300
"""
301301
Open a RecordIO file
302302
303303
This layer takes a RecordIO file to read from and returns a Reader Variable.
304304
Via the Reader Variable, we can get data from the given RecordIO file.
305305
306306
Args:
307-
filename(str): The RecordIO file's name.
307+
filename(str) or list(str): The RecordIO file's name.
308308
shapes(list): List of tuples which declaring data shapes.
309309
lod_levels(list): List of ints which declaring data lod_level.
310310
dtypes(list): List of strs which declaring data type.
@@ -336,6 +336,8 @@ def open_recordio_file(filename,
336336
ranks.append(len(shape))
337337

338338
var_name = unique_name('open_recordio_file')
339+
if isinstance(filenames, str):
340+
filenames = [filenames]
339341

340342
startup_blk = default_startup_program().current_block()
341343
startup_var = startup_blk.create_var(name=var_name)
@@ -345,7 +347,7 @@ def open_recordio_file(filename,
345347
attrs={
346348
'shape_concat': shape_concat,
347349
'lod_levels': lod_levels,
348-
'filename': filename,
350+
'filenames': filenames,
349351
'ranks': ranks
350352
})
351353

python/paddle/fluid/recordio_writer.py

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
import core
1616
import contextlib
17-
17+
from ..batch import batch
1818
__all__ = ['convert_reader_to_recordio_file']
1919

2020

@@ -46,3 +46,36 @@ def convert_reader_to_recordio_file(
4646
writer.complete_append_tensor()
4747
counter += 1
4848
return counter
49+
50+
51+
import paddle
52+
53+
54+
def convert_reader_to_recordio_files(
55+
filename_suffix,
56+
batch_per_file,
57+
reader_creator,
58+
feeder,
59+
compressor=core.RecordIOWriter.Compressor.Snappy,
60+
max_num_records=1000,
61+
feed_order=None):
62+
if feed_order is None:
63+
feed_order = feeder.feed_names
64+
lines = []
65+
f_idx = 0
66+
counter = 0
67+
for idx, batch in enumerate(reader_creator()):
68+
lines.append(batch)
69+
if idx >= batch_per_file and idx % batch_per_file == 0:
70+
filename = "%s-%05d" % (filename_suffix, f_idx)
71+
with create_recordio_writer(filename, compressor,
72+
max_num_records) as writer:
73+
for l in lines:
74+
res = feeder.feed(l)
75+
for each in feed_order:
76+
writer.append_tensor(res[each])
77+
writer.complete_append_tensor()
78+
counter += 1
79+
lines = []
80+
f_idx += 1
81+
return counter

tools/codestyle/docstring_checker.pyc

0 Bytes
Binary file not shown.

0 commit comments

Comments
 (0)