Skip to content

Commit 5b7a9dd

Browse files
authored
Merge pull request #13815 from jacquesqiao/optimize-pyreader
optimize pyreader
2 parents a270fdf + ce99419 commit 5b7a9dd

File tree

3 files changed

+270
-143
lines changed

3 files changed

+270
-143
lines changed

paddle/fluid/API.spec

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,7 @@ paddle.fluid.layers.batch ArgSpec(args=['reader', 'batch_size'], varargs=None, k
189189
paddle.fluid.layers.double_buffer ArgSpec(args=['reader', 'place', 'name'], varargs=None, keywords=None, defaults=(None, None))
190190
paddle.fluid.layers.random_data_generator ArgSpec(args=['low', 'high', 'shapes', 'lod_levels', 'for_parallel'], varargs=None, keywords=None, defaults=(True,))
191191
paddle.fluid.layers.py_reader ArgSpec(args=['capacity', 'shapes', 'dtypes', 'lod_levels', 'name', 'use_double_buffer'], varargs=None, keywords=None, defaults=(None, None, True))
192+
paddle.fluid.layers.create_py_reader_by_data ArgSpec(args=['capacity', 'feed_list', 'name', 'use_double_buffer'], varargs=None, keywords=None, defaults=(None, True))
192193
paddle.fluid.layers.Preprocessor.__init__ ArgSpec(args=['self', 'reader', 'name'], varargs=None, keywords=None, defaults=(None,))
193194
paddle.fluid.layers.Preprocessor.block ArgSpec(args=[], varargs='args', keywords='kwds', defaults=None)
194195
paddle.fluid.layers.Preprocessor.inputs ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None)

python/paddle/fluid/layers/io.py

Lines changed: 212 additions & 114 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@
3030

3131
__all__ = [
3232
'data', 'open_files', 'read_file', 'shuffle', 'batch', 'double_buffer',
33-
'random_data_generator', 'py_reader', 'Preprocessor', 'load'
33+
'random_data_generator', 'py_reader', 'create_py_reader_by_data',
34+
'Preprocessor', 'load'
3435
]
3536

3637

@@ -475,6 +476,159 @@ def random_data_generator(low, high, shapes, lod_levels, for_parallel=True):
475476
return monkey_patch_reader_methods(main_prog_var)
476477

477478

479+
def _py_reader(capacity,
480+
shapes,
481+
dtypes,
482+
lod_levels=None,
483+
name=None,
484+
use_double_buffer=True,
485+
feed_list=None):
486+
487+
if feed_list is not None:
488+
if not isinstance(feed_list, list):
489+
raise TypeError("feed_list should be a list of Variable"
490+
" instead of " + str(type(feed_list)))
491+
lod_levels = []
492+
dtypes = []
493+
shape_concat = []
494+
ranks = []
495+
shapes = []
496+
497+
for feed_data in feed_list:
498+
dtypes.append(feed_data.dtype)
499+
shape_concat.extend(feed_data.shape)
500+
ranks.append(len(feed_data.shape))
501+
shapes.append(feed_data.shape)
502+
lod_levels.append(feed_data.lod_level)
503+
else:
504+
dtypes = [convert_np_dtype_to_dtype_(dt) for dt in dtypes]
505+
shape_concat = []
506+
ranks = []
507+
508+
for shape in shapes:
509+
shape_concat.extend(shape)
510+
ranks.append(len(shape))
511+
512+
if lod_levels is None:
513+
lod_levels = [0] * len(shapes)
514+
515+
if name is None:
516+
queue_name = unique_name('lod_tensor_blocking_queue')
517+
reader_name = unique_name('create_py_reader')
518+
double_buffer_name = unique_name('double_buffer')
519+
else:
520+
queue_name = "_".join([name, "queue"])
521+
reader_name = "_".join([name, "reader"])
522+
double_buffer_name = "_".join([name, "double_buffer"])
523+
524+
var = global_scope().var(queue_name)
525+
feed_queue = core.init_lod_tensor_blocking_queue(var, capacity, shapes)
526+
527+
startup_blk = default_startup_program().current_block()
528+
startup_var = startup_blk.create_var(name=reader_name)
529+
startup_blk.append_op(
530+
type='create_py_reader',
531+
inputs={'blocking_queue': [queue_name]},
532+
outputs={'Out': [startup_var]},
533+
attrs={
534+
'shape_concat': shape_concat,
535+
'lod_levels': lod_levels,
536+
'ranks': ranks
537+
})
538+
539+
startup_var.desc.set_dtypes(dtypes)
540+
startup_var.persistable = True
541+
542+
main_prog_var = _copy_reader_var_(default_main_program().current_block(),
543+
startup_var)
544+
545+
reader = monkey_patch_reader_methods(main_prog_var)
546+
if use_double_buffer:
547+
double_buffer_reader = double_buffer(reader, name=double_buffer_name)
548+
# we return a double buffer reader. However, the reset method comes from
549+
# py_reader.
550+
double_buffer_reader.reset = reader.reset
551+
reader = double_buffer_reader
552+
553+
# monkey patch py_reader special methods
554+
reader.queue = feed_queue
555+
current_reset_method = reader.reset
556+
reader.thread = None
557+
reader.tensor_provider = None
558+
reader.exited = False
559+
560+
def start_provide_thread(func):
561+
def __provider_thread__():
562+
for tensors in func():
563+
array = core.LoDTensorArray()
564+
for item in tensors:
565+
if not isinstance(item, core.LoDTensor):
566+
tmp = core.LoDTensor()
567+
tmp.set(item, core.CPUPlace())
568+
item = tmp
569+
570+
array.append(item)
571+
572+
if reader.exited:
573+
break
574+
feed_queue.push(array)
575+
if reader.exited:
576+
break
577+
feed_queue.close()
578+
579+
reader.thread = threading.Thread(target=__provider_thread__)
580+
reader.thread.daemon = True
581+
reader.thread.start()
582+
583+
def __set_tensor_provider__(func):
584+
reader.tensor_provider = func
585+
586+
def __set_paddle_reader__(paddle_reader):
587+
with program_guard(Program(), Program()):
588+
actual_feed_list = feed_list
589+
if actual_feed_list is None:
590+
actual_feed_list = []
591+
counter = 0
592+
for dtype, shape, lod_level in zip(dtypes, shapes, lod_levels):
593+
name = str(counter)
594+
actual_feed_list.append(
595+
data(
596+
name=name,
597+
dtype=dtype,
598+
shape=shape,
599+
lod_level=lod_level))
600+
counter += 1
601+
602+
data_names = [feed_data.name for feed_data in actual_feed_list]
603+
feeder = DataFeeder(
604+
feed_list=actual_feed_list, place=core.CPUPlace())
605+
paddle_reader = feeder.decorate_reader(
606+
paddle_reader, multi_devices=False)
607+
608+
def __tensor_provider__():
609+
for slots in paddle_reader():
610+
yield [slots[data_name] for data_name in data_names]
611+
612+
__set_tensor_provider__(__tensor_provider__)
613+
614+
def __reset__():
615+
current_reset_method()
616+
if reader.thread is not None and reader.tensor_provider is not None:
617+
reader.exited = True
618+
reader.thread.join()
619+
reader.exited = False
620+
621+
def __start__():
622+
start_provide_thread(reader.tensor_provider)
623+
624+
reader.reset = __reset__
625+
reader.decorate_tensor_provider = __set_tensor_provider__
626+
reader.decorate_paddle_reader = __set_paddle_reader__
627+
reader.start = __start__
628+
629+
return reader
630+
631+
478632
def py_reader(capacity,
479633
shapes,
480634
dtypes,
@@ -599,128 +753,72 @@ def py_reader(capacity,
599753
>>> except fluid.core.EOFException:
600754
>>> test_reader.reset()
601755
"""
602-
dtypes = [convert_np_dtype_to_dtype_(dt) for dt in dtypes]
603-
shape_concat = []
604-
ranks = []
605-
606-
for shape in shapes:
607-
shape_concat.extend(shape)
608-
ranks.append(len(shape))
609-
610-
if lod_levels is None:
611-
lod_levels = [0] * len(shapes)
612-
613-
if name is None:
614-
queue_name = unique_name('lod_tensor_blocking_queue')
615-
reader_name = unique_name('create_py_reader')
616-
double_buffer_name = unique_name('double_buffer')
617-
else:
618-
queue_name = "_".join([name, "queue"])
619-
reader_name = "_".join([name, "reader"])
620-
double_buffer_name = "_".join([name, "double_buffer"])
621-
622-
var = global_scope().var(queue_name)
623-
feed_queue = core.init_lod_tensor_blocking_queue(var, capacity, shapes)
624-
625-
startup_blk = default_startup_program().current_block()
626-
startup_var = startup_blk.create_var(name=reader_name)
627-
startup_blk.append_op(
628-
type='create_py_reader',
629-
inputs={'blocking_queue': [queue_name]},
630-
outputs={'Out': [startup_var]},
631-
attrs={
632-
'shape_concat': shape_concat,
633-
'lod_levels': lod_levels,
634-
'ranks': ranks
635-
})
636-
637-
startup_var.desc.set_dtypes(dtypes)
638-
startup_var.persistable = True
639-
640-
main_prog_var = _copy_reader_var_(default_main_program().current_block(),
641-
startup_var)
642-
643-
reader = monkey_patch_reader_methods(main_prog_var)
644-
if use_double_buffer:
645-
double_buffer_reader = double_buffer(reader, name=double_buffer_name)
646-
# we return a double buffer reader. However, the reset method comes from
647-
# py_reader.
648-
double_buffer_reader.reset = reader.reset
649-
reader = double_buffer_reader
650-
651-
# monkey patch py_reader special methods
652-
reader.queue = feed_queue
653-
current_reset_method = reader.reset
654-
reader.thread = None
655-
reader.tensor_provider = None
656-
reader.exited = False
657-
658-
def start_provide_thread(func):
659-
def __provider_thread__():
660-
for tensors in func():
661-
array = core.LoDTensorArray()
662-
for item in tensors:
663-
if not isinstance(item, core.LoDTensor):
664-
tmp = core.LoDTensor()
665-
tmp.set(item, core.CPUPlace())
666-
item = tmp
667-
668-
array.append(item)
669-
670-
if reader.exited:
671-
break
672-
feed_queue.push(array)
673-
if reader.exited:
674-
break
675-
feed_queue.close()
756+
return _py_reader(
757+
capacity=capacity,
758+
shapes=shapes,
759+
dtypes=dtypes,
760+
lod_levels=lod_levels,
761+
name=name,
762+
use_double_buffer=use_double_buffer)
676763

677-
reader.thread = threading.Thread(target=__provider_thread__)
678-
reader.thread.daemon = True
679-
reader.thread.start()
680764

681-
def __set_tensor_provider__(func):
682-
reader.tensor_provider = func
765+
def create_py_reader_by_data(capacity,
766+
feed_list,
767+
name=None,
768+
use_double_buffer=True):
769+
"""
770+
Create a Python reader for data feeding in Python
683771
684-
def __set_paddle_reader__(paddle_reader):
685-
with program_guard(Program(), Program()):
686-
feed_list = []
687-
counter = 0
688-
for dtype, shape, lod_level in zip(dtypes, shapes, lod_levels):
689-
name = str(counter)
690-
feed_list.append(
691-
data(
692-
name=name,
693-
dtype=dtype,
694-
shape=shape,
695-
lod_level=lod_level))
696-
counter += 1
697-
698-
feeder = DataFeeder(feed_list=feed_list, place=core.CPUPlace())
699-
paddle_reader = feeder.decorate_reader(
700-
paddle_reader, multi_devices=False)
772+
This layer returns a Reader Variable.
701773
702-
def __tensor_provider__():
703-
for slots in paddle_reader():
704-
yield [slots[str(idx)] for idx in six.moves.xrange(counter)]
774+
Works much like py_reader except that it's input is feed_list
775+
instead of shapes, dtypes and lod_levels
705776
706-
__set_tensor_provider__(__tensor_provider__)
777+
Args:
778+
capacity(int): The buffer capacity maintained by :code:`py_reader`.
779+
feed_list(list(Variable)): The data feed list.
780+
name(basestring): The prefix Python queue name and Reader name. None will
781+
be generated automatically.
782+
use_double_buffer(bool): Whether use double buffer or not.
707783
708-
def __reset__():
709-
current_reset_method()
710-
if reader.thread is not None and reader.tensor_provider is not None:
711-
reader.exited = True
712-
reader.thread.join()
713-
reader.exited = False
784+
Returns:
785+
Variable: A Reader from which we can get feeding data.
714786
715-
def __start__():
716-
start_provide_thread(reader.tensor_provider)
787+
Examples:
717788
718-
reader.reset = __reset__
719-
reader.decorate_tensor_provider = __set_tensor_provider__
720-
reader.decorate_paddle_reader = __set_paddle_reader__
721-
reader.start = __start__
789+
1. The basic usage of :code:`py_reader` is as follows:
722790
723-
return reader
791+
>>> import paddle.fluid as fluid
792+
>>> import paddle.dataset.mnist as mnist
793+
>>>
794+
>>> image = fluid.layers.data(name='image', shape=[3,224,224], dtypes='float32')
795+
>>> label = fluid.layers.data(name='label', shape=[1], dtypes='int64')
796+
>>> reader = fluid.layers.create_py_reader_by_data(capacity=64, feed_list=[image, label])
797+
>>> reader.decorate_paddle_reader(
798+
>>> paddle.reader.shuffle(paddle.batch(mnist.train())
799+
>>>
800+
>>> img, label = fluid.layers.read_file(reader)
801+
>>> loss = network(img, label) # some network definition
802+
>>>
803+
>>> fluid.Executor(fluid.CUDAPlace(0)).run(fluid.default_startup_program())
804+
>>>
805+
>>> exe = fluid.ParallelExecutor(use_cuda=True, loss_name=loss.name)
806+
>>> for epoch_id in range(10):
807+
>>> reader.start()
808+
>>> try:
809+
>>> while True:
810+
>>> exe.run(fetch_list=[loss.name])
811+
>>> except fluid.core.EOFException:
812+
>>> reader.reset()
813+
"""
814+
return _py_reader(
815+
capacity=capacity,
816+
shapes=None,
817+
dtypes=None,
818+
lod_levels=None,
819+
name=name,
820+
use_double_buffer=use_double_buffer,
821+
feed_list=feed_list)
724822

725823

726824
def open_files(filenames,

0 commit comments

Comments
 (0)