Skip to content

Commit 47630a4

Browse files
author
yi.wu
committed
fluid benchmark support recordio reader
1 parent 86d8659 commit 47630a4

File tree

10 files changed

+241
-27
lines changed

10 files changed

+241
-27
lines changed

benchmark/fluid/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,4 @@ ADD *.whl /
1919
RUN pip install /*.whl && rm -f /*.whl && chmod +x /usr/bin/paddle_k8s
2020

2121
ENV LD_LIBRARY_PATH=/usr/local/lib
22-
ADD fluid_benchmark.py dataset.py models/ /workspace/
22+
ADD fluid_benchmark.py recordio_converter.py models/ /workspace/

benchmark/fluid/README.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,15 @@ Currently supported `--model` argument include:
4242
PADDLE_PSERVER_PORT=7164 PADDLE_TRAINER_IPS=192.168.0.2,192.168.0.3 PADDLE_CURRENT_IP=127.0.0.1 PADDLE_TRAINER_ID=0 python fluid_benchmark.py --model mnist --device GPU --update_method nccl2
4343
```
4444

45+
## Prepare the RecordIO file to Achieve Better Performance
46+
47+
Running the following command will generate RecordIO files like "mnist.recordio" under the path
48+
and batch_size you choose:
49+
50+
```bash
51+
python -c 'from recordio_converter import *; prepare_mnist("data", 32)'
52+
```
53+
4554
## Run Distributed Benchmark on Kubernetes Cluster
4655

4756
You may need to build a Docker image before submitting a cluster job onto Kubernetes, or you will

benchmark/fluid/fluid_benchmark.py

Lines changed: 38 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@ def parse_args():
4444
type=float,
4545
default=0.001,
4646
help='The minibatch size.')
47-
# TODO(wuyi): add "--use_fake_data" option back.
4847
parser.add_argument(
4948
'--skip_batch_num',
5049
type=int,
@@ -106,6 +105,16 @@ def parse_args():
106105
default='local',
107106
choices=['local', 'pserver', 'nccl2'],
108107
help='Choose parameter update method, can be local, pserver, nccl2.')
108+
parser.add_argument(
109+
'--use_reader_op',
110+
action='store_true',
111+
help='Whether to use reader op, and must specify the data path if set this to true.'
112+
)
113+
parser.add_argument(
114+
'--data_path',
115+
type=str,
116+
default="",
117+
help='Directory that contains all the training recordio files.')
109118
args = parser.parse_args()
110119
return args
111120

@@ -208,11 +217,13 @@ def train(avg_loss, infer_prog, optimizer, train_reader, test_reader, batch_acc,
208217
place = core.CPUPlace() if args.device == 'CPU' else core.CUDAPlace(0)
209218
exe = fluid.Executor(place)
210219
exe.run(startup_prog)
211-
feed_var_list = [
212-
var for var in train_prog.global_block().vars.itervalues()
213-
if var.is_data
214-
]
215-
feeder = fluid.DataFeeder(feed_var_list, place)
220+
221+
if not args.use_reader_op:
222+
feed_var_list = [
223+
var for var in train_prog.global_block().vars.itervalues()
224+
if var.is_data
225+
]
226+
feeder = fluid.DataFeeder(feed_var_list, place)
216227

217228
iters, num_samples, start_time = 0, 0, time.time()
218229
for pass_id in range(args.pass_num):
@@ -223,9 +234,12 @@ def train(avg_loss, infer_prog, optimizer, train_reader, test_reader, batch_acc,
223234
num_samples = 0
224235
if iters == args.iterations:
225236
break
226-
loss = exe.run(train_prog,
227-
feed=feeder.feed(data),
228-
fetch_list=[avg_loss])
237+
if args.use_reader_op:
238+
loss = exe.run(train_prog, fetch_list=[avg_loss])
239+
else:
240+
loss = exe.run(train_prog,
241+
feed=feeder.feed(data),
242+
fetch_list=[avg_loss])
229243
iters += 1
230244
num_samples += len(data)
231245
train_losses.append(loss)
@@ -251,10 +265,14 @@ def train(avg_loss, infer_prog, optimizer, train_reader, test_reader, batch_acc,
251265
def train_parallel(avg_loss, infer_prog, optimizer, train_reader, test_reader,
252266
batch_acc, args, train_prog, startup_prog, nccl_id_var,
253267
num_trainers, trainer_id):
254-
feed_var_list = [
255-
var for var in train_prog.global_block().vars.itervalues()
256-
if var.is_data
257-
]
268+
place = core.CPUPlace() if args.device == 'CPU' else core.CUDAPlace(0)
269+
if not args.use_reader_op:
270+
feed_var_list = [
271+
var for var in train_prog.global_block().vars.itervalues()
272+
if var.is_data
273+
]
274+
feeder = fluid.DataFeeder(feed_var_list, place)
275+
258276
# generate fake:
259277
if args.use_fake_data:
260278
for var in feed_var_list:
@@ -271,7 +289,6 @@ def train_parallel(avg_loss, infer_prog, optimizer, train_reader, test_reader,
271289
"value": 1.0,
272290
"dtype": var.dtype})
273291

274-
place = core.CPUPlace() if args.device == 'CPU' else core.CUDAPlace(0)
275292
if nccl_id_var and trainer_id == 0:
276293
#FIXME(wuyi): wait other trainer to start listening
277294
time.sleep(30)
@@ -288,7 +305,6 @@ def train_parallel(avg_loss, infer_prog, optimizer, train_reader, test_reader,
288305
num_trainers=num_trainers,
289306
trainer_id=trainer_id)
290307

291-
feeder = fluid.DataFeeder(feed_var_list, place)
292308
for pass_id in range(args.pass_num):
293309
num_samples = 0
294310
iters = 0
@@ -304,7 +320,10 @@ def train_parallel(avg_loss, infer_prog, optimizer, train_reader, test_reader,
304320
num_samples = 0
305321
if iters == args.iterations:
306322
break
307-
if args.use_fake_data:
323+
# NOTE: when using reader ops, the input data is not split across multiple cards
324+
if args.use_reader_op and iters >= args.iterations / args.gpus:
325+
break
326+
if args.use_fake_data or args.use_reader_op:
308327
loss, = exe.run([avg_loss.name])
309328
else:
310329
loss, = exe.run([avg_loss.name], feed=feeder.feed(data))
@@ -316,6 +335,8 @@ def train_parallel(avg_loss, infer_prog, optimizer, train_reader, test_reader,
316335
print("Pass %d, batch %d, loss %s" %
317336
(pass_id, batch_id, np.array(loss)))
318337
train_elapsed = time.time() - start_time
338+
if args.use_reader_op:
339+
num_samples = num_samples * args.gpus
319340
examples_per_sec = num_samples / train_elapsed
320341
print('\nTotal examples: %d, total time: %.5f, %.5f examples/sed\n' %
321342
(num_samples, train_elapsed, examples_per_sec))
@@ -342,7 +363,7 @@ def main():
342363
# the unique trainer id, starting from 0, needed by trainer
343364
# only
344365
nccl_id_var, num_trainers, trainer_id = (
345-
None, 1, int(os.getenv("PADDLE_TRAINER_ID", "-1")))
366+
None, 1, int(os.getenv("PADDLE_TRAINER_ID", "0")))
346367

347368
if args.use_cprof:
348369
pr = cProfile.Profile()

benchmark/fluid/models/machine_translation.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,8 @@ def lodtensor_to_ndarray(lod_tensor):
197197

198198

199199
def get_model(args):
200+
if args.use_reader_op:
201+
raise Exception("machine_translation do not support reader op for now.")
200202
embedding_dim = 512
201203
encoder_size = 512
202204
decoder_size = 512

benchmark/fluid/models/mnist.py

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import argparse
2121
import time
2222
import cProfile
23+
import os
2324

2425
import paddle
2526
import paddle.fluid as fluid
@@ -65,9 +66,23 @@ def cnn_model(data):
6566

6667

6768
def get_model(args):
68-
# Input data
69-
images = fluid.layers.data(name='pixel', shape=[1, 28, 28], dtype=DTYPE)
70-
label = fluid.layers.data(name='label', shape=[1], dtype='int64')
69+
if args.use_reader_op:
70+
filelist = [
71+
os.path.join(args.data_path, f) for f in os.listdir(args.data_path)
72+
]
73+
data_file = fluid.layers.open_files(
74+
filenames=filelist,
75+
shapes=[[-1, 1, 28, 28], (-1, 1)],
76+
lod_levels=[0, 0],
77+
dtypes=["float32", "int64"],
78+
thread_num=args.gpus)
79+
data_file = fluid.layers.double_buffer(
80+
fluid.layers.batch(
81+
data_file, batch_size=args.batch_size))
82+
images, label = fluid.layers.read_file(data_file)
83+
else:
84+
images = fluid.layers.data(name='pixel', shape=[1, 28, 28], dtype=DTYPE)
85+
label = fluid.layers.data(name='label', shape=[1], dtype='int64')
7186

7287
# Train program
7388
predict = cnn_model(images)

benchmark/fluid/models/resnet.py

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import functools
2020
import numpy as np
2121
import time
22+
import os
2223

2324
import cProfile, pstats, StringIO
2425

@@ -129,9 +130,24 @@ def get_model(args):
129130
else:
130131
dshape = [224, 224, 3]
131132
model = resnet_imagenet
133+
if args.use_reader_op:
134+
filelist = [
135+
os.path.join(args.data_path, f) for f in os.listdir(args.data_path)
136+
]
137+
data_file = fluid.layers.open_files(
138+
filenames=filelist,
139+
shapes=[[-1] + dshape, (-1, 1)],
140+
lod_levels=[0, 0],
141+
dtypes=["float32", "int64"],
142+
thread_num=args.gpus)
143+
data_file = fluid.layers.double_buffer(
144+
fluid.layers.batch(
145+
data_file, batch_size=args.batch_size))
146+
input, label = fluid.layers.read_file(data_file)
147+
else:
148+
input = fluid.layers.data(name='data', shape=dshape, dtype='float32')
149+
label = fluid.layers.data(name='label', shape=[1], dtype='int64')
132150

133-
input = fluid.layers.data(name='data', shape=dshape, dtype='float32')
134-
label = fluid.layers.data(name='label', shape=[1], dtype='int64')
135151
predict = model(input, class_dim)
136152
cost = fluid.layers.cross_entropy(input=predict, label=label)
137153
avg_cost = fluid.layers.mean(x=cost)

benchmark/fluid/models/stacked_dynamic_lstm.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@ def __impl__():
4444

4545

4646
def get_model(args):
47+
if args.use_reader_op:
48+
raise Exception(
49+
"stacked_dynamic_lstm do not support reader op for now.")
4750
lstm_size = 512
4851
emb_dim = 512
4952
crop_size = 1500

benchmark/fluid/models/vgg.py

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import paddle.fluid.core as core
2323
import argparse
2424
import functools
25+
import os
2526

2627

2728
def vgg16_bn_drop(input):
@@ -65,9 +66,23 @@ def get_model(args):
6566
else:
6667
data_shape = [224, 224, 3]
6768

68-
# Input data
69-
images = fluid.layers.data(name='pixel', shape=data_shape, dtype='float32')
70-
label = fluid.layers.data(name='label', shape=[1], dtype='int64')
69+
if args.use_reader_op:
70+
filelist = [
71+
os.path.join(args.data_path, f) for f in os.listdir(args.data_path)
72+
]
73+
data_file = fluid.layers.open_files(
74+
filenames=filelist,
75+
shapes=[[-1] + data_shape, (-1, 1)],
76+
lod_levels=[0, 0],
77+
dtypes=["float32", "int64"],
78+
thread_num=args.gpus)
79+
data_file = fluid.layers.double_buffer(
80+
fluid.layers.batch(
81+
data_file, batch_size=args.batch_size))
82+
images, label = fluid.layers.read_file(data_file)
83+
else:
84+
images = fluid.layers.data(name='data', shape=dshape, dtype='float32')
85+
label = fluid.layers.data(name='label', shape=[1], dtype='int64')
7186

7287
# Train program
7388
net = vgg16_bn_drop(images)

0 commit comments

Comments
 (0)