Skip to content

Commit c70fec9

Browse files
committed
optimize pyreader
1 parent 16b1beb commit c70fec9

File tree

4 files changed

+244
-133
lines changed

4 files changed

+244
-133
lines changed

paddle/fluid/API.spec

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,7 @@ paddle.fluid.layers.batch ArgSpec(args=['reader', 'batch_size'], varargs=None, k
178178
paddle.fluid.layers.double_buffer ArgSpec(args=['reader', 'place', 'name'], varargs=None, keywords=None, defaults=(None, None))
179179
paddle.fluid.layers.random_data_generator ArgSpec(args=['low', 'high', 'shapes', 'lod_levels', 'for_parallel'], varargs=None, keywords=None, defaults=(True,))
180180
paddle.fluid.layers.py_reader ArgSpec(args=['capacity', 'shapes', 'dtypes', 'lod_levels', 'name', 'use_double_buffer'], varargs=None, keywords=None, defaults=(None, None, True))
181+
paddle.fluid.layers.create_py_reader_by_data ArgSpec(args=['capacity', 'feed_list', 'name', 'use_double_buffer'], varargs=None, keywords=None, defaults=(None, True))
181182
paddle.fluid.layers.Preprocessor.__init__ ArgSpec(args=['self', 'reader', 'name'], varargs=None, keywords=None, defaults=(None,))
182183
paddle.fluid.layers.Preprocessor.block ArgSpec(args=[], varargs='args', keywords='kwds', defaults=None)
183184
paddle.fluid.layers.Preprocessor.inputs ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None)

paddle/fluid/CMakeLists.txt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,5 @@ endif(NOT WIN32)
1212
if(WITH_INFERENCE)
1313
# NOTE: please add subdirectory inference at last.
1414
add_subdirectory(inference)
15+
add_subdirectory(train)
1516
endif()
16-
17-
add_subdirectory(train)

python/paddle/fluid/layers/io.py

Lines changed: 211 additions & 114 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@
3030

3131
__all__ = [
3232
'data', 'open_files', 'read_file', 'shuffle', 'batch', 'double_buffer',
33-
'random_data_generator', 'py_reader', 'Preprocessor', 'load'
33+
'random_data_generator', 'py_reader', 'create_py_reader_by_data',
34+
'Preprocessor', 'load'
3435
]
3536

3637

@@ -470,6 +471,158 @@ def random_data_generator(low, high, shapes, lod_levels, for_parallel=True):
470471
return monkey_patch_reader_methods(main_prog_var)
471472

472473

474+
def _py_reader(capacity,
               shapes,
               dtypes,
               lod_levels=None,
               name=None,
               use_double_buffer=True,
               feed_list=None):
    """
    Shared implementation behind ``py_reader`` and ``create_py_reader_by_data``.

    Builds a reader variable fed from Python through a LoDTensor blocking
    queue. The reader metadata (shapes, dtypes, lod levels) comes either
    from ``feed_list`` (a list of data Variables) or from the explicit
    ``shapes``/``dtypes``/``lod_levels`` arguments.

    Args:
        capacity(int): capacity of the blocking queue buffering fed data.
        shapes(list(tuple)): shapes of each slot; ignored when ``feed_list``
            is given.
        dtypes(list): data type of each slot; ignored when ``feed_list``
            is given.
        lod_levels(list(int)|None): LoD level of each slot; defaults to all
            zeros when None.
        name(basestring|None): prefix for the queue/reader/double-buffer
            variable names; unique names are generated when None.
        use_double_buffer(bool): whether to wrap the reader in a
            double-buffer reader.
        feed_list(list(Variable)|None): data Variables describing the slots;
            overrides ``shapes``/``dtypes``/``lod_levels`` when given.

    Returns:
        Variable: a reader Variable monkey-patched with ``start``,
        ``reset``, ``decorate_tensor_provider`` and
        ``decorate_paddle_reader`` methods.

    Raises:
        TypeError: if ``feed_list`` is given but is not a list.
    """
    if feed_list is not None:
        if not isinstance(feed_list, list):
            raise TypeError("feed_list should be a list of Variable"
                            " instead of " + str(type(feed_list)))
        # Derive all reader metadata from the feed variables.
        lod_levels = []
        dtypes = []
        shape_concat = []
        ranks = []
        shapes = []

        for feed_data in feed_list:
            dtypes.append(feed_data.dtype)
            shape_concat.extend(feed_data.shape)
            ranks.append(len(feed_data.shape))
            shapes.append(feed_data.shape)
            lod_levels.append(feed_data.lod_level)
    else:
        dtypes = [convert_np_dtype_to_dtype_(dt) for dt in dtypes]
        shape_concat = []
        ranks = []

        for shape in shapes:
            shape_concat.extend(shape)
            ranks.append(len(shape))

        if lod_levels is None:
            lod_levels = [0] * len(shapes)

    if name is None:
        queue_name = unique_name('lod_tensor_blocking_queue')
        reader_name = unique_name('create_py_reader')
        double_buffer_name = unique_name('double_buffer')
    else:
        queue_name = "_".join([name, "queue"])
        reader_name = "_".join([name, "reader"])
        double_buffer_name = "_".join([name, "double_buffer"])

    var = global_scope().var(queue_name)
    feed_queue = core.init_lod_tensor_blocking_queue(var, capacity, shapes)

    startup_blk = default_startup_program().current_block()
    startup_var = startup_blk.create_var(name=reader_name)
    startup_blk.append_op(
        type='create_py_reader',
        inputs={'blocking_queue': [queue_name]},
        outputs={'Out': [startup_var]},
        attrs={
            'shape_concat': shape_concat,
            'lod_levels': lod_levels,
            'ranks': ranks
        })

    startup_var.desc.set_dtypes(dtypes)
    startup_var.persistable = True

    main_prog_var = _copy_reader_var_(default_main_program().current_block(),
                                      startup_var)

    reader = monkey_patch_reader_methods(main_prog_var)
    if use_double_buffer:
        double_buffer_reader = double_buffer(reader, name=double_buffer_name)
        # we return a double buffer reader. However, the reset method comes
        # from py_reader.
        double_buffer_reader.reset = reader.reset
        reader = double_buffer_reader

    # monkey patch py_reader special methods
    reader.queue = feed_queue
    current_reset_method = reader.reset
    reader.thread = None
    reader.tensor_provider = None
    reader.exited = False

    def start_provide_thread(func):
        # Launch a daemon thread that pulls batches from `func` and pushes
        # them into the blocking queue until exhausted or the reader exits.
        def __provider_thread__():
            for tensors in func():
                array = core.LoDTensorArray()
                for item in tensors:
                    if not isinstance(item, core.LoDTensor):
                        tmp = core.LoDTensor()
                        tmp.set(item, core.CPUPlace())
                        item = tmp

                    array.append(item)

                # Checked both before and after the (possibly blocking) push
                # so reset() can unblock the thread promptly.
                if reader.exited:
                    break
                feed_queue.push(array)
                if reader.exited:
                    break
            feed_queue.close()

        reader.thread = threading.Thread(target=__provider_thread__)
        reader.thread.daemon = True
        reader.thread.start()

    def __set_tensor_provider__(func):
        reader.tensor_provider = func

    def __set_paddle_reader__(paddle_reader):
        with program_guard(Program(), Program()):
            actual_feed_list = feed_list
            if actual_feed_list is None:
                # No feed variables supplied: synthesize them from the
                # explicit shapes/dtypes/lod_levels, named '0', '1', ...
                actual_feed_list = []
                counter = 0
                for dtype, shape, lod_level in zip(dtypes, shapes,
                                                   lod_levels):
                    name = str(counter)
                    actual_feed_list.append(
                        data(
                            name=name,
                            dtype=dtype,
                            shape=shape,
                            lod_level=lod_level))
                    counter += 1

            # BUG FIX: the original yielded slots[str(idx)] over a `counter`
            # that is never defined when `feed_list` is given (NameError),
            # and the feeder keys slots by variable *name* anyway. Look the
            # slots up by the actual variable names, which is correct for
            # both code paths.
            data_names = [feed_data.name for feed_data in actual_feed_list]

            feeder = DataFeeder(
                feed_list=actual_feed_list, place=core.CPUPlace())
            paddle_reader = feeder.decorate_reader(
                paddle_reader, multi_devices=False)

            def __tensor_provider__():
                for slots in paddle_reader():
                    yield [slots[data_name] for data_name in data_names]

            __set_tensor_provider__(__tensor_provider__)

    def __reset__():
        current_reset_method()
        if reader.thread is not None and reader.tensor_provider is not None:
            reader.exited = True
            reader.thread.join()
            reader.exited = False

    def __start__():
        start_provide_thread(reader.tensor_provider)

    reader.reset = __reset__
    reader.decorate_tensor_provider = __set_tensor_provider__
    reader.decorate_paddle_reader = __set_paddle_reader__
    reader.start = __start__

    return reader
624+
625+
473626
def py_reader(capacity,
474627
shapes,
475628
dtypes,
@@ -594,128 +747,72 @@ def py_reader(capacity,
594747
>>> except fluid.core.EOFException:
595748
>>> test_reader.reset()
596749
"""
597-
dtypes = [convert_np_dtype_to_dtype_(dt) for dt in dtypes]
598-
shape_concat = []
599-
ranks = []
600-
601-
for shape in shapes:
602-
shape_concat.extend(shape)
603-
ranks.append(len(shape))
604-
605-
if lod_levels is None:
606-
lod_levels = [0] * len(shapes)
607-
608-
if name is None:
609-
queue_name = unique_name('lod_tensor_blocking_queue')
610-
reader_name = unique_name('create_py_reader')
611-
double_buffer_name = unique_name('double_buffer')
612-
else:
613-
queue_name = "_".join([name, "queue"])
614-
reader_name = "_".join([name, "reader"])
615-
double_buffer_name = "_".join([name, "double_buffer"])
616-
617-
var = global_scope().var(queue_name)
618-
feed_queue = core.init_lod_tensor_blocking_queue(var, capacity, shapes)
619-
620-
startup_blk = default_startup_program().current_block()
621-
startup_var = startup_blk.create_var(name=reader_name)
622-
startup_blk.append_op(
623-
type='create_py_reader',
624-
inputs={'blocking_queue': [queue_name]},
625-
outputs={'Out': [startup_var]},
626-
attrs={
627-
'shape_concat': shape_concat,
628-
'lod_levels': lod_levels,
629-
'ranks': ranks
630-
})
631-
632-
startup_var.desc.set_dtypes(dtypes)
633-
startup_var.persistable = True
634-
635-
main_prog_var = _copy_reader_var_(default_main_program().current_block(),
636-
startup_var)
637-
638-
reader = monkey_patch_reader_methods(main_prog_var)
639-
if use_double_buffer:
640-
double_buffer_reader = double_buffer(reader, name=double_buffer_name)
641-
# we return a double buffer reader. However, the reset method comes from
642-
# py_reader.
643-
double_buffer_reader.reset = reader.reset
644-
reader = double_buffer_reader
645-
646-
# monkey patch py_reader special methods
647-
reader.queue = feed_queue
648-
current_reset_method = reader.reset
649-
reader.thread = None
650-
reader.tensor_provider = None
651-
reader.exited = False
652-
653-
def start_provide_thread(func):
654-
def __provider_thread__():
655-
for tensors in func():
656-
array = core.LoDTensorArray()
657-
for item in tensors:
658-
if not isinstance(item, core.LoDTensor):
659-
tmp = core.LoDTensor()
660-
tmp.set(item, core.CPUPlace())
661-
item = tmp
750+
return _py_reader(
751+
capacity=capacity,
752+
shapes=shapes,
753+
dtypes=dtypes,
754+
lod_levels=lod_levels,
755+
name=name,
756+
use_double_buffer=use_double_buffer)
662757

663-
array.append(item)
664758

665-
if reader.exited:
666-
break
667-
feed_queue.push(array)
668-
if reader.exited:
669-
break
670-
feed_queue.close()
671-
672-
reader.thread = threading.Thread(target=__provider_thread__)
673-
reader.thread.daemon = True
674-
reader.thread.start()
675-
676-
def __set_tensor_provider__(func):
677-
reader.tensor_provider = func
759+
def create_py_reader_by_data(capacity,
                             feed_list,
                             name=None,
                             use_double_buffer=True):
    """
    Create a Python reader for data feeding in Python

    This layer returns a Reader Variable.

    Works much like py_reader except that its input is a feed_list
    instead of shapes, dtypes and lod_levels

    Args:
        capacity(int): The buffer capacity maintained by :code:`py_reader`.
        feed_list(list(Variable)): The data feed list.
        name(basestring): The prefix Python queue name and Reader name. None will
            be generated automatically.
        use_double_buffer(bool): Whether use double buffer or not.

    Returns:
        Variable: A Reader from which we can get feeding data.

    Examples:

        1. The basic usage of :code:`create_py_reader_by_data` is as follows:

        >>> import paddle.fluid as fluid
        >>> import paddle.dataset.mnist as mnist
        >>>
        >>> image = fluid.layers.data(name='image', shape=[3,224,224], dtype='float32')
        >>> label = fluid.layers.data(name='label', shape=[1], dtype='int64')
        >>> reader = fluid.layers.create_py_reader_by_data(capacity=64, feed_list=[image, label])
        >>> reader.decorate_paddle_reader(
        >>>     paddle.reader.shuffle(paddle.batch(mnist.train(), batch_size=5), buf_size=500))
        >>>
        >>> img, label = fluid.layers.read_file(reader)
        >>> loss = network(img, label) # some network definition
        >>>
        >>> fluid.Executor(fluid.CUDAPlace(0)).run(fluid.default_startup_program())
        >>>
        >>> exe = fluid.ParallelExecutor(use_cuda=True, loss_name=loss.name)
        >>> for epoch_id in range(10):
        >>>     reader.start()
        >>>     try:
        >>>         while True:
        >>>             exe.run(fetch_list=[loss.name])
        >>>     except fluid.core.EOFException:
        >>>         reader.reset()
    """
    # Delegate to the shared implementation; metadata (shapes/dtypes/
    # lod_levels) is derived from feed_list inside _py_reader.
    return _py_reader(
        capacity=capacity,
        shapes=None,
        dtypes=None,
        lod_levels=None,
        name=name,
        use_double_buffer=use_double_buffer,
        feed_list=feed_list)
719816

720817

721818
def open_files(filenames,

0 commit comments

Comments
 (0)