Skip to content

Commit 2a5cb2e

Browse files
author
Yancey
authored
Merge pull request #11066 from Yancey1989/dist_recordio
support recordio in dist train
2 parents a210300 + 97b2f6f commit 2a5cb2e

File tree

3 files changed

+165
-2
lines changed

3 files changed

+165
-2
lines changed
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
# How to use RecordIO in Fluid
2+
3+
If you want to use RecordIO as your training data format, you need to convert your training data
4+
to RecordIO files and read them during training. PaddlePaddle Fluid provides some
5+
interfaces to deal with RecordIO files.
6+
7+
## Generate RecordIO File
8+
9+
Before starting training with RecordIO files, you need to convert your training data
10+
to RecordIO format with `fluid.recordio_writer.convert_reader_to_recordio_file`; the sample code is
11+
as follows:
12+
13+
```python
14+
reader = paddle.batch(mnist.train(), batch_size=1)
15+
feeder = fluid.DataFeeder(
16+
feed_list=[ # order is image and label
17+
fluid.layers.data(
18+
name='image', shape=[784]),
19+
fluid.layers.data(
20+
name='label', shape=[1], dtype='int64'),
21+
],
22+
place=fluid.CPUPlace())
23+
fluid.recordio_writer.convert_reader_to_recordio_file('./mnist.recordio', reader, feeder)
24+
```
25+
26+
The above code snippet would generate a RecordIO `./mnist.recordio` on your host.
27+
28+
**NOTE**: we recommend setting `batch_size=1` when generating the RecordIO files so that users can
29+
adjust it flexibly while reading it.
30+
31+
## Use the RecordIO file in a Local Training Job
32+
33+
PaddlePaddle Fluid provides an interface `fluid.layers.io.open_recordio_file` to load your RecordIO file
34+
and then you can use it as a layer in your network configuration; the sample code is as follows:
35+
36+
```python
37+
data_file = fluid.layers.io.open_recordio_file(
38+
filename="./mnist.recordio",
39+
shapes=[(-1, 784),(-1, 1)],
40+
lod_levels=[0, 0],
41+
dtypes=["float32", "int64"])
42+
data_file = fluid.layers.io.batch(data_file, batch_size=4)
43+
44+
img, label = fluid.layers.io.read_file(data_file)
45+
hidden = fluid.layers.fc(input=img, size=100, act='tanh')
46+
prediction = fluid.layers.fc(input=hidden, size=10, act='softmax')
47+
loss = fluid.layers.cross_entropy(input=prediction, label=label)
48+
avg_loss = fluid.layers.mean(loss)
49+
50+
fluid.optimizer.Adam(learning_rate=1e-3).minimize(avg_loss)
51+
52+
place = fluid.CPUPlace()
53+
54+
exe = fluid.Executor(place)
55+
exe.run(fluid.default_startup_program())
56+
avg_loss_np = []
57+
58+
# train a pass
59+
batch_id = 0
60+
while True:
61+
tmp, = exe.run(fetch_list=[avg_loss])
62+
63+
avg_loss_np.append(tmp)
64+
print(batch_id)
65+
batch_id += 1
66+
```
67+
68+
## Use the RecordIO files in Distributed Training
69+
70+
1. generate multiple RecordIO files
71+
72+
For a distributed training job, you may have multiple trainer nodes,
73+
and one or more RecordIO files for one trainer node, you can use the interface
74+
`fluid.recordio_writer.convert_reader_to_recordio_files` to convert your training data
75+
into multiple RecordIO files, the sample codes as follows:
76+
77+
```python
78+
reader = paddle.batch(mnist.train(), batch_size=1)
79+
feeder = fluid.DataFeeder(
80+
feed_list=[ # order is image and label
81+
fluid.layers.data(
82+
name='image', shape=[784]),
83+
fluid.layers.data(
84+
name='label', shape=[1], dtype='int64'),
85+
],
86+
place=fluid.CPUPlace())
87+
fluid.recordio_writer.convert_reader_to_recordio_files(
88+
filename='./mnist.recordio', batch_per_file=100, reader_creator=reader, feeder=feeder)
89+
```
90+
91+
The above codes would generate multiple RecordIO files on your host like:
92+
93+
```bash
94+
.
95+
|-mnist-00000.recordio
96+
|-mnist-00001.recordio
97+
|-mnist-00002.recordio
98+
|-mnist-00003.recordio
99+
|-mnist-00004.recordio
100+
```
101+
102+
2. open multiple RecordIO files by `fluid.layers.io.open_files`
103+
104+
For a distributed training job, the job scheduler will launch trainer processes on multiple nodes,
105+
and each trainer process reads a part of the whole training data. We usually take the following approach to make the training
106+
data allocated to each trainer process as uniform as possible:
107+
108+
```python
109+
def gen_train_list(file_pattern, trainers, trainer_id):
110+
file_list = glob.glob(file_pattern)
111+
ret_list = []
112+
for idx, f in enumerate(file_list):
113+
if idx % trainers == trainer_id:
114+
ret_list.append(f)
115+
return ret_list
116+
117+
trainers = int(os.getenv("TRAINERS"))
118+
trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID"))
119+
data_file = fluid.layers.io.open_files(
120+
filenames=gen_train_list("./mnist-[0-9]*.recordio", trainers, trainer_id),
121+
thread_num=1,
122+
shapes=[(-1, 784),(-1, 1)],
123+
lod_levels=[0, 0],
124+
dtypes=["float32", "int64"])
125+
img, label = fluid.layers.io.read_file(data_file)
126+
...
127+
```

python/paddle/fluid/recordio_writer.py

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,12 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
import os
1516
import core
1617
import contextlib
17-
18-
__all__ = ['convert_reader_to_recordio_file']
18+
__all__ = [
19+
'convert_reader_to_recordio_file', 'convert_reader_to_recordio_files'
20+
]
1921

2022

2123
@contextlib.contextmanager
@@ -46,3 +48,36 @@ def convert_reader_to_recordio_file(
4648
writer.complete_append_tensor()
4749
counter += 1
4850
return counter
51+
52+
53+
def _write_batches_to_file(out_filename, batches, feeder, feed_order,
                           compressor, max_num_records):
    # Serialize `batches` into one RecordIO file; returns the number written.
    written = 0
    with create_recordio_writer(out_filename, compressor,
                                max_num_records) as writer:
        for batch in batches:
            res = feeder.feed(batch)
            for each in feed_order:
                writer.append_tensor(res[each])
            writer.complete_append_tensor()
            written += 1
    return written


def convert_reader_to_recordio_files(
        filename,
        batch_per_file,
        reader_creator,
        feeder,
        compressor=core.RecordIOWriter.Compressor.Snappy,
        max_num_records=1000,
        feed_order=None):
    """Convert the batches yielded by `reader_creator` into multiple
    RecordIO files, `batch_per_file` batches per file.

    Shards are named "<stem>-<index:05d>.recordio", where <stem> is
    `filename` without its ".recordio" extension.

    Args:
        filename: output path; must end with ".recordio".
        batch_per_file: maximum number of batches stored in each shard.
        reader_creator: callable that returns an iterable of batches.
        feeder: DataFeeder used to convert each batch into tensors.
        compressor: RecordIO compression algorithm (default Snappy).
        max_num_records: maximum records per RecordIO chunk.
        feed_order: names of the variables to write for each batch;
            defaults to `feeder.feed_names`.

    Returns:
        Total number of batches written across all shards.
    """
    if feed_order is None:
        feed_order = feeder.feed_names
    f_name, f_ext = os.path.splitext(filename)
    assert (f_ext == ".recordio")

    lines = []
    f_idx = 0
    counter = 0
    for batch in reader_creator():
        lines.append(batch)
        # Flush a full shard. (The previous `idx % batch_per_file` test made
        # the first shard one batch larger than all the others.)
        if len(lines) == batch_per_file:
            counter += _write_batches_to_file(
                "%s-%05d%s" % (f_name, f_idx, f_ext), lines, feeder,
                feed_order, compressor, max_num_records)
            lines = []
            f_idx += 1
    # Flush the trailing partial shard; previously these batches were
    # silently dropped (and nothing at all was written when the reader
    # yielded fewer than `batch_per_file` batches).
    if lines:
        counter += _write_batches_to_file(
            "%s-%05d%s" % (f_name, f_idx, f_ext), lines, feeder, feed_order,
            compressor, max_num_records)
    return counter

tools/codestyle/.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
*.pyc

0 commit comments

Comments
 (0)