Commit 635099c

Merge pull request #11121 from typhoonzero/fluid_benchmark_support_recordioreader
Fluid benchmark support recordio reader
2 parents: f7c96f0 + cd33057

File tree: 10 files changed, +354 −55 lines

benchmark/fluid/Dockerfile

Lines changed: 1 addition & 1 deletion
```diff
@@ -19,4 +19,4 @@ ADD *.whl /
 RUN pip install /*.whl && rm -f /*.whl && chmod +x /usr/bin/paddle_k8s
 
 ENV LD_LIBRARY_PATH=/usr/local/lib
-ADD fluid_benchmark.py dataset.py models/ /workspace/
+ADD fluid_benchmark.py recordio_converter.py models/ /workspace/
```

benchmark/fluid/README.md

Lines changed: 10 additions & 0 deletions
````diff
@@ -44,6 +44,16 @@ Currently supported `--model` argument include:
 PADDLE_PSERVER_PORT=7164 PADDLE_TRAINER_IPS=192.168.0.2,192.168.0.3 PADDLE_CURRENT_IP=127.0.0.1 PADDLE_TRAINER_ID=0 python fluid_benchmark.py --model mnist --device GPU --update_method nccl2
 ```
 
+## Prepare the RecordIO file to Achieve Better Performance
+
+Running the following command will generate RecordIO files like "mnist.recordio" under the
+path and with the batch_size you choose. You can use batch_size=1 so that a later reader
+can change the batch size at any time using `fluid.batch`.
+
+```bash
+python -c 'from recordio_converter import *; prepare_mnist("data", 1)'
+```
+
 ## Run Distributed Benchmark on Kubernetes Cluster
 
 You may need to build a Docker image before submitting a cluster job onto Kubernetes, or you will
````
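The `recordio_converter.py` module referenced above is added by this PR but its diff is not shown in this excerpt. As a rough, hypothetical sketch of what a `prepare_mnist("data", 1)`-style converter could look like with the Fluid API of this era, assuming the `fluid.recordio_writer.convert_reader_to_recordio_file` helper (everything below is illustrative, not the PR's actual converter):

```python
# Hypothetical converter sketch -- NOT the recordio_converter.py from this
# PR (that file's contents are not shown in this excerpt).
import os

import paddle
import paddle.fluid as fluid


def prepare_mnist(outpath, batch_size):
    # Describe the two serialized slots: a 1x28x28 image and an int64 label.
    place = fluid.CPUPlace()
    images = fluid.layers.data(name='pixel', shape=[1, 28, 28], dtype='float32')
    label = fluid.layers.data(name='label', shape=[1], dtype='int64')
    feeder = fluid.DataFeeder(feed_list=[images, label], place=place)

    # batch_size=1 keeps records at single-sample granularity, so a reader
    # op can re-batch them later (see fluid.layers.batch in the model diffs).
    reader = paddle.batch(paddle.dataset.mnist.train(), batch_size=batch_size)
    fluid.recordio_writer.convert_reader_to_recordio_file(
        os.path.join(outpath, "mnist.recordio"), reader, feeder)
```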

benchmark/fluid/fluid_benchmark.py

Lines changed: 87 additions & 28 deletions
```diff
@@ -38,18 +38,23 @@ def parse_args():
         default='resnet',
         help='The model to run benchmark with.')
     parser.add_argument(
-        '--batch_size', type=int, default=32, help='The minibatch size.')
+        '--batch_size',
+        type=int,
+        default=32,
+        help='The batch size on each gpu.')
     parser.add_argument(
         '--learning_rate', type=float, default=0.001, help='The learning rate.')
-    # TODO(wuyi): add "--use_fake_data" option back.
     parser.add_argument(
         '--skip_batch_num',
         type=int,
         default=5,
         help='The first num of minibatch num to skip, for better performance test'
     )
     parser.add_argument(
-        '--iterations', type=int, default=80, help='The number of minibatches.')
+        '--iterations',
+        type=int,
+        default=80,
+        help='The number of minibatches, set to -1 to run all batches.')
     parser.add_argument(
         '--pass_num', type=int, default=100, help='The number of passes.')
     parser.add_argument(
@@ -69,6 +74,7 @@ def parse_args():
         type=int,
         default=1,
         help='If gpus > 1, will use ParallelExecutor to run, else use Executor.')
+    # this option is available only for vgg and resnet.
     parser.add_argument(
         '--cpus',
         type=int,
@@ -78,7 +84,7 @@ def parse_args():
         '--data_set',
         type=str,
         default='flowers',
-        choices=['cifar10', 'flowers'],
+        choices=['cifar10', 'flowers', 'imagenet'],
         help='Optional dataset for benchmark.')
     parser.add_argument(
         '--infer_only', action='store_true', help='If set, run forward only.')
@@ -108,6 +114,16 @@ def parse_args():
         default='local',
         choices=['local', 'pserver', 'nccl2'],
         help='Choose parameter update method, can be local, pserver, nccl2.')
+    parser.add_argument(
+        '--use_reader_op',
+        action='store_true',
+        help='Whether to use reader op, and must specify the data path if set this to true.'
+    )
+    parser.add_argument(
+        '--data_path',
+        type=str,
+        default="",
+        help='Directory that contains all the training recordio files.')
     args = parser.parse_args()
     return args
 
@@ -210,26 +226,50 @@ def train(avg_loss, infer_prog, optimizer, train_reader, test_reader, batch_acc,
     place = core.CPUPlace() if args.device == 'CPU' else core.CUDAPlace(0)
     exe = fluid.Executor(place)
     exe.run(startup_prog)
-    feed_var_list = [
-        var for var in train_prog.global_block().vars.itervalues()
-        if var.is_data
-    ]
-    feeder = fluid.DataFeeder(feed_var_list, place)
+
+    if not args.use_reader_op:
+        feed_var_list = [
+            var for var in train_prog.global_block().vars.itervalues()
+            if var.is_data
+        ]
+        feeder = fluid.DataFeeder(feed_var_list, place)
 
     iters, num_samples, start_time = 0, 0, time.time()
     for pass_id in range(args.pass_num):
         train_losses = []
-        for batch_id, data in enumerate(train_reader()):
+        if not args.use_reader_op:
+            reader_generator = train_reader()
+        batch_id = 0
+        data = None
+        while True:
+            if not args.use_reader_op:
+                data = next(reader_generator, None)
+                if data == None:
+                    break
+            if iters == args.iterations:
+                break
             if iters == args.skip_batch_num:
                 start_time = time.time()
                 num_samples = 0
-            if iters == args.iterations:
-                break
-            loss = exe.run(train_prog,
-                           feed=feeder.feed(data),
-                           fetch_list=[avg_loss])
+
+            if args.use_reader_op:
+                try:
+                    loss = exe.run(train_prog, fetch_list=[avg_loss])
+                except fluid.core.EnforceNotMet as ex:
+                    break
+            else:
+                loss = exe.run(train_prog,
+                               feed=feeder.feed(data),
+                               fetch_list=[avg_loss])
             iters += 1
-            num_samples += len(data)
+            batch_id += 1
+            # FIXME(wuyi): For use_reader_op, if the current
+            # pass is not the last, the last batch of this pass
+            # is also equal to args.batch_size.
+            if args.use_reader_op:
+                num_samples += args.batch_size * args.gpus
+            else:
+                num_samples += len(data)
             train_losses.append(loss)
             print("Pass: %d, Iter: %d, Loss: %f\n" %
                   (pass_id, iters, np.mean(train_losses)))
@@ -250,10 +290,14 @@ def train(avg_loss, infer_prog, optimizer, train_reader, test_reader, batch_acc,
 def train_parallel(avg_loss, infer_prog, optimizer, train_reader, test_reader,
                    batch_acc, args, train_prog, startup_prog, nccl_id_var,
                    num_trainers, trainer_id):
-    feed_var_list = [
-        var for var in train_prog.global_block().vars.itervalues()
-        if var.is_data
-    ]
+    place = core.CPUPlace() if args.device == 'CPU' else core.CUDAPlace(0)
+    if not args.use_reader_op:
+        feed_var_list = [
+            var for var in train_prog.global_block().vars.itervalues()
+            if var.is_data
+        ]
+        feeder = fluid.DataFeeder(feed_var_list, place)
+
     # generate fake:
     if args.use_fake_data:
         for var in feed_var_list:
@@ -270,7 +314,6 @@ def train_parallel(avg_loss, infer_prog, optimizer, train_reader, test_reader,
                          "value": 1.0,
                          "dtype": var.dtype})
 
-    place = core.CPUPlace() if args.device == 'CPU' else core.CUDAPlace(0)
     if nccl_id_var and trainer_id == 0:
         #FIXME(wuyi): wait other trainer to start listening
         time.sleep(30)
@@ -287,12 +330,21 @@ def train_parallel(avg_loss, infer_prog, optimizer, train_reader, test_reader,
                            num_trainers=num_trainers,
                            trainer_id=trainer_id)
 
-    feeder = fluid.DataFeeder(feed_var_list, place)
     for pass_id in range(args.pass_num):
         num_samples = 0
         iters = 0
         start_time = time.time()
-        for batch_id, data in enumerate(train_reader()):
+        if not args.use_reader_op:
+            reader_generator = train_reader()
+        batch_id = 0
+        data = None
+        while True:
+            if not args.use_reader_op:
+                data = next(reader_generator, None)
+                if data == None:
+                    break
+            if iters == args.iterations:
+                break
             if args.profile and pass_id == 0 and batch_id == 5:
                 profiler.start_profiler("All")
             elif args.profile and pass_id == 0 and batch_id == 10:
@@ -301,19 +353,26 @@ def train_parallel(avg_loss, infer_prog, optimizer, train_reader, test_reader,
             if iters == args.skip_batch_num:
                 start_time = time.time()
                 num_samples = 0
-            if iters == args.iterations:
-                break
-            if args.use_fake_data:
-                loss, = exe.run([avg_loss.name])
+            if args.use_fake_data or args.use_reader_op:
+                try:
+                    loss, = exe.run([avg_loss.name])
+                except fluid.core.EnforceNotMet as ex:
+                    break
             else:
                 loss, = exe.run([avg_loss.name], feed=feeder.feed(data))
             if args.update_method == "pserver":
                 exe.bcast_params()
-            num_samples += len(data)
+            if args.use_reader_op:
+                num_samples += args.batch_size * args.gpus
+            else:
+                num_samples += len(data)
             iters += 1
             if batch_id % 1 == 0:
                 print("Pass %d, batch %d, loss %s" %
                       (pass_id, batch_id, np.array(loss)))
+            batch_id += 1
+        if args.use_reader_op:
+            num_samples = num_samples * args.gpus
         print_train_time(start_time, time.time(), num_samples)
         if not args.no_test and batch_acc:
             test_acc = test(startup_exe, infer_prog, test_reader, feeder,
```
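Two things are worth noting in the new loops: the `for batch_id, data in enumerate(train_reader())` pattern becomes a `while True` so that the reader-op path, which has no Python-side iterator, can share the loop body; and end-of-data for a reader op surfaces as `fluid.core.EnforceNotMet` raised from `Executor.run`. Condensed into a minimal sketch (names such as `train_prog` and `avg_loss` follow the diff above; this is an illustration, not a drop-in function from the PR):

```python
# Sketch of the drain-until-exhausted pattern used above. With
# --use_reader_op the graph pulls its own data, so there is no feed dict
# and end-of-data surfaces as an exception from Executor.run.
import paddle.fluid as fluid


def run_one_pass(exe, train_prog, avg_loss, args, feeder=None, reader=None):
    # `reader` is an already-started generator (i.e. train_reader()) on the
    # feeder path; it is unused when args.use_reader_op is set.
    iters = 0
    while True:
        if args.use_reader_op:
            try:
                # No feed: the read_file op inside train_prog supplies batches.
                loss, = exe.run(train_prog, fetch_list=[avg_loss])
            except fluid.core.EnforceNotMet:
                break  # reader exhausted after pass_num passes over the files
        else:
            data = next(reader, None)
            if data is None:
                break  # Python-side reader exhausted
            loss, = exe.run(train_prog, feed=feeder.feed(data),
                            fetch_list=[avg_loss])
        iters += 1
        if iters == args.iterations:
            break
    return iters
```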

benchmark/fluid/models/machine_translation.py

Lines changed: 3 additions & 1 deletion
```diff
@@ -197,6 +197,8 @@ def lodtensor_to_ndarray(lod_tensor):
 
 
 def get_model(args):
+    if args.use_reader_op:
+        raise Exception("machine_translation do not support reader op for now.")
     embedding_dim = 512
     encoder_size = 512
     decoder_size = 512
@@ -221,7 +223,7 @@ def get_model(args):
     train_batch_generator = paddle.batch(
         paddle.reader.shuffle(
             paddle.dataset.wmt14.train(dict_size), buf_size=1000),
-        batch_size=args.batch_size)
+        batch_size=args.batch_size * args.gpus)
 
     test_batch_generator = paddle.batch(
         paddle.reader.shuffle(
```

benchmark/fluid/models/mnist.py

Lines changed: 20 additions & 4 deletions
```diff
@@ -20,6 +20,7 @@
 import argparse
 import time
 import cProfile
+import os
 
 import paddle
 import paddle.fluid as fluid
@@ -65,9 +66,24 @@ def cnn_model(data):
 
 
 def get_model(args):
-    # Input data
-    images = fluid.layers.data(name='pixel', shape=[1, 28, 28], dtype=DTYPE)
-    label = fluid.layers.data(name='label', shape=[1], dtype='int64')
+    if args.use_reader_op:
+        filelist = [
+            os.path.join(args.data_path, f) for f in os.listdir(args.data_path)
+        ]
+        data_file = fluid.layers.open_files(
+            filenames=filelist,
+            shapes=[[-1, 1, 28, 28], (-1, 1)],
+            lod_levels=[0, 0],
+            dtypes=["float32", "int64"],
+            thread_num=args.gpus,
+            pass_num=args.pass_num)
+        data_file = fluid.layers.double_buffer(
+            fluid.layers.batch(
+                data_file, batch_size=args.batch_size))
+        images, label = fluid.layers.read_file(data_file)
+    else:
+        images = fluid.layers.data(name='pixel', shape=[1, 28, 28], dtype=DTYPE)
+        label = fluid.layers.data(name='label', shape=[1], dtype='int64')
 
     if args.device == 'CPU' and args.cpus > 1:
         places = fluid.layers.get_places(args.cpus)
@@ -103,7 +119,7 @@ def get_model(args):
 
     # Reader
     train_reader = paddle.batch(
-        paddle.dataset.mnist.train(), batch_size=args.batch_size)
+        paddle.dataset.mnist.train(), batch_size=args.batch_size * args.gpus)
     test_reader = paddle.batch(
         paddle.dataset.mnist.test(), batch_size=args.batch_size)
     return avg_cost, inference_program, opt, train_reader, test_reader, batch_acc
```
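The reader-op input pipeline added here chains three layers: `open_files` yields raw RecordIO samples from a file list, `fluid.layers.batch` groups them into minibatches, and `double_buffer` prefetches the next batch while the current one is being consumed. Isolated from the model code, the same pipeline reads as below (a sketch reusing exactly the layer calls from the diff; `data_path` is assumed to hold RecordIO files produced by the converter):

```python
# Standalone sketch of the reader-op input pipeline from the diff above.
import os

import paddle.fluid as fluid


def build_mnist_pipeline(data_path, batch_size, gpus, pass_num):
    filelist = [os.path.join(data_path, f) for f in os.listdir(data_path)]
    data_file = fluid.layers.open_files(
        filenames=filelist,
        shapes=[[-1, 1, 28, 28], [-1, 1]],  # per-variable shapes, -1 = batch dim
        lod_levels=[0, 0],                  # dense tensors, no LoD
        dtypes=["float32", "int64"],
        thread_num=gpus,                    # one reading thread per GPU
        pass_num=pass_num)                  # reader raises once passes are done
    # Group single-sample records into minibatches, then prefetch the next
    # batch on a second buffer while the current batch is training.
    data_file = fluid.layers.batch(data_file, batch_size=batch_size)
    data_file = fluid.layers.double_buffer(data_file)
    images, label = fluid.layers.read_file(data_file)
    return images, label
```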

benchmark/fluid/models/resnet.py

Lines changed: 44 additions & 15 deletions
```diff
@@ -19,13 +19,15 @@
 import functools
 import numpy as np
 import time
+import os
 
 import cProfile, pstats, StringIO
 
 import paddle
 import paddle.fluid as fluid
 import paddle.fluid.core as core
 import paddle.fluid.profiler as profiler
+from recordio_converter import imagenet_train, imagenet_test
 
 
 def conv_bn_layer(input, ch_out, filter_size, stride, padding, act='relu'):
@@ -122,16 +124,48 @@ def get_model(args):
         else:
             dshape = [32, 32, 3]
         model = resnet_cifar10
-    else:
+        train_reader = paddle.dataset.cifar.train10()
+        test_reader = paddle.dataset.cifar.test10()
+    elif args.data_set == "flowers":
         class_dim = 102
         if args.data_format == 'NCHW':
             dshape = [3, 224, 224]
         else:
             dshape = [224, 224, 3]
         model = resnet_imagenet
-
-    input = fluid.layers.data(name='data', shape=dshape, dtype='float32')
-    label = fluid.layers.data(name='label', shape=[1], dtype='int64')
+        train_reader = paddle.dataset.flowers.train()
+        test_reader = paddle.dataset.flowers.test()
+    elif args.data_set == "imagenet":
+        class_dim = 1000
+        if args.data_format == 'NCHW':
+            dshape = [3, 224, 224]
+        else:
+            dshape = [224, 224, 3]
+        model = resnet_imagenet
+        if not args.data_path:
+            raise Exception(
+                "Must specify --data_path when training with imagenet")
+        train_reader = imagenet_train(args.data_path)
+        test_reader = imagenet_test(args.data_path)
+
+    if args.use_reader_op:
+        filelist = [
+            os.path.join(args.data_path, f) for f in os.listdir(args.data_path)
+        ]
+        data_file = fluid.layers.open_files(
+            filenames=filelist,
+            shapes=[[-1] + dshape, (-1, 1)],
+            lod_levels=[0, 0],
+            dtypes=["float32", "int64"],
+            thread_num=args.gpus,
+            pass_num=args.pass_num)
+        data_file = fluid.layers.double_buffer(
+            fluid.layers.batch(
+                data_file, batch_size=args.batch_size))
+        input, label = fluid.layers.read_file(data_file)
+    else:
+        input = fluid.layers.data(name='data', shape=dshape, dtype='float32')
+        label = fluid.layers.data(name='label', shape=[1], dtype='int64')
 
     if args.device == 'CPU' and args.cpus > 1:
         places = fluid.layers.get_places(args.cpus)
@@ -162,15 +196,10 @@ def get_model(args):
 
     optimizer = fluid.optimizer.Momentum(learning_rate=0.01, momentum=0.9)
 
-    train_reader = paddle.batch(
+    batched_train_reader = paddle.batch(
         paddle.reader.shuffle(
-            paddle.dataset.cifar.train10()
-            if args.data_set == 'cifar10' else paddle.dataset.flowers.train(),
-            buf_size=5120),
-        batch_size=args.batch_size)
-    test_reader = paddle.batch(
-        paddle.dataset.cifar.test10()
-        if args.data_set == 'cifar10' else paddle.dataset.flowers.test(),
-        batch_size=args.batch_size)
-
-    return avg_cost, inference_program, optimizer, train_reader, test_reader, batch_acc
+            train_reader, buf_size=5120),
+        batch_size=args.batch_size * args.gpus)
+    batched_test_reader = paddle.batch(train_reader, batch_size=args.batch_size)
+
+    return avg_cost, inference_program, optimizer, batched_train_reader, batched_test_reader, batch_acc
```
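Throughout the PR, feeder-path readers switch from `batch_size=args.batch_size` to `batch_size=args.batch_size * args.gpus`, matching the new `--batch_size` help text ("the batch size on each gpu"): ParallelExecutor splits each fed batch across devices, so the global batch must be scaled up to keep the per-GPU batch constant. The arithmetic, as a trivial illustration (the values below are made up for the example):

```python
# Per-GPU batch sizing used throughout this PR: the reader produces a
# global batch, which ParallelExecutor splits evenly across devices.
batch_size = 32                       # --batch_size: batch size on each GPU
gpus = 4                              # --gpus

global_batch = batch_size * gpus      # what paddle.batch() now yields per step
per_gpu_batch = global_batch // gpus  # what each device actually sees
assert per_gpu_batch == batch_size
```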
