Skip to content

Commit 5fc3052

Browse files
authored
Merge pull request #13787 from PaddlePaddle/revert-13637-optimize-opyreader
Revert "optimize pyreader"
2 parents e1904ac + 9d087d5 commit 5fc3052

File tree

4 files changed

+133
-244
lines changed

4 files changed

+133
-244
lines changed

paddle/fluid/API.spec

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,6 @@ paddle.fluid.layers.batch ArgSpec(args=['reader', 'batch_size'], varargs=None, k
178178
paddle.fluid.layers.double_buffer ArgSpec(args=['reader', 'place', 'name'], varargs=None, keywords=None, defaults=(None, None))
179179
paddle.fluid.layers.random_data_generator ArgSpec(args=['low', 'high', 'shapes', 'lod_levels', 'for_parallel'], varargs=None, keywords=None, defaults=(True,))
180180
paddle.fluid.layers.py_reader ArgSpec(args=['capacity', 'shapes', 'dtypes', 'lod_levels', 'name', 'use_double_buffer'], varargs=None, keywords=None, defaults=(None, None, True))
181-
paddle.fluid.layers.create_py_reader_by_data ArgSpec(args=['capacity', 'feed_list', 'name', 'use_double_buffer'], varargs=None, keywords=None, defaults=(None, True))
182181
paddle.fluid.layers.Preprocessor.__init__ ArgSpec(args=['self', 'reader', 'name'], varargs=None, keywords=None, defaults=(None,))
183182
paddle.fluid.layers.Preprocessor.block ArgSpec(args=[], varargs='args', keywords='kwds', defaults=None)
184183
paddle.fluid.layers.Preprocessor.inputs ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None)

paddle/fluid/CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,5 +12,6 @@ endif(NOT WIN32)
1212
if(WITH_INFERENCE)
1313
# NOTE: please add subdirectory inference at last.
1414
add_subdirectory(inference)
15-
add_subdirectory(train)
1615
endif()
16+
17+
add_subdirectory(train)

python/paddle/fluid/layers/io.py

Lines changed: 114 additions & 211 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,7 @@
3030

3131
__all__ = [
3232
'data', 'open_files', 'read_file', 'shuffle', 'batch', 'double_buffer',
33-
'random_data_generator', 'py_reader', 'create_py_reader_by_data',
34-
'Preprocessor', 'load'
33+
'random_data_generator', 'py_reader', 'Preprocessor', 'load'
3534
]
3635

3736

@@ -471,158 +470,6 @@ def random_data_generator(low, high, shapes, lod_levels, for_parallel=True):
471470
return monkey_patch_reader_methods(main_prog_var)
472471

473472

474-
def _py_reader(capacity,
475-
shapes,
476-
dtypes,
477-
lod_levels=None,
478-
name=None,
479-
use_double_buffer=True,
480-
feed_list=None):
481-
482-
if feed_list is not None:
483-
if not isinstance(feed_list, list):
484-
raise TypeError("feed_list should be a list of Variable"
485-
" instead of " + str(type(feed_list)))
486-
lod_levels = []
487-
dtypes = []
488-
shape_concat = []
489-
ranks = []
490-
shapes = []
491-
492-
for data in feed_list:
493-
dtypes.append(data.dtype)
494-
shape_concat.extend(data.shape)
495-
ranks.append(len(data.shape))
496-
shapes.append(data.shape)
497-
lod_levels.append(data.lod_level)
498-
else:
499-
dtypes = [convert_np_dtype_to_dtype_(dt) for dt in dtypes]
500-
shape_concat = []
501-
ranks = []
502-
503-
for shape in shapes:
504-
shape_concat.extend(shape)
505-
ranks.append(len(shape))
506-
507-
if lod_levels is None:
508-
lod_levels = [0] * len(shapes)
509-
510-
if name is None:
511-
queue_name = unique_name('lod_tensor_blocking_queue')
512-
reader_name = unique_name('create_py_reader')
513-
double_buffer_name = unique_name('double_buffer')
514-
else:
515-
queue_name = "_".join([name, "queue"])
516-
reader_name = "_".join([name, "reader"])
517-
double_buffer_name = "_".join([name, "double_buffer"])
518-
519-
var = global_scope().var(queue_name)
520-
feed_queue = core.init_lod_tensor_blocking_queue(var, capacity, shapes)
521-
522-
startup_blk = default_startup_program().current_block()
523-
startup_var = startup_blk.create_var(name=reader_name)
524-
startup_blk.append_op(
525-
type='create_py_reader',
526-
inputs={'blocking_queue': [queue_name]},
527-
outputs={'Out': [startup_var]},
528-
attrs={
529-
'shape_concat': shape_concat,
530-
'lod_levels': lod_levels,
531-
'ranks': ranks
532-
})
533-
534-
startup_var.desc.set_dtypes(dtypes)
535-
startup_var.persistable = True
536-
537-
main_prog_var = _copy_reader_var_(default_main_program().current_block(),
538-
startup_var)
539-
540-
reader = monkey_patch_reader_methods(main_prog_var)
541-
if use_double_buffer:
542-
double_buffer_reader = double_buffer(reader, name=double_buffer_name)
543-
# we return a double buffer reader. However, the reset method comes from
544-
# py_reader.
545-
double_buffer_reader.reset = reader.reset
546-
reader = double_buffer_reader
547-
548-
# monkey patch py_reader special methods
549-
reader.queue = feed_queue
550-
current_reset_method = reader.reset
551-
reader.thread = None
552-
reader.tensor_provider = None
553-
reader.exited = False
554-
555-
def start_provide_thread(func):
556-
def __provider_thread__():
557-
for tensors in func():
558-
array = core.LoDTensorArray()
559-
for item in tensors:
560-
if not isinstance(item, core.LoDTensor):
561-
tmp = core.LoDTensor()
562-
tmp.set(item, core.CPUPlace())
563-
item = tmp
564-
565-
array.append(item)
566-
567-
if reader.exited:
568-
break
569-
feed_queue.push(array)
570-
if reader.exited:
571-
break
572-
feed_queue.close()
573-
574-
reader.thread = threading.Thread(target=__provider_thread__)
575-
reader.thread.daemon = True
576-
reader.thread.start()
577-
578-
def __set_tensor_provider__(func):
579-
reader.tensor_provider = func
580-
581-
def __set_paddle_reader__(paddle_reader):
582-
with program_guard(Program(), Program()):
583-
actual_feed_list = feed_list
584-
if actual_feed_list is None:
585-
actual_feed_list = []
586-
counter = 0
587-
for dtype, shape, lod_level in zip(dtypes, shapes, lod_levels):
588-
name = str(counter)
589-
actual_feed_list.append(
590-
data(
591-
name=name,
592-
dtype=dtype,
593-
shape=shape,
594-
lod_level=lod_level))
595-
counter += 1
596-
597-
feeder = DataFeeder(
598-
feed_list=actual_feed_list, place=core.CPUPlace())
599-
paddle_reader = feeder.decorate_reader(
600-
paddle_reader, multi_devices=False)
601-
602-
def __tensor_provider__():
603-
for slots in paddle_reader():
604-
yield [slots[str(idx)] for idx in six.moves.xrange(counter)]
605-
606-
__set_tensor_provider__(__tensor_provider__)
607-
608-
def __reset__():
609-
current_reset_method()
610-
if reader.thread is not None and reader.tensor_provider is not None:
611-
reader.exited = True
612-
reader.thread.join()
613-
reader.exited = False
614-
615-
def __start__():
616-
start_provide_thread(reader.tensor_provider)
617-
618-
reader.reset = __reset__
619-
reader.decorate_tensor_provider = __set_tensor_provider__
620-
reader.decorate_paddle_reader = __set_paddle_reader__
621-
reader.start = __start__
622-
623-
return reader
624-
625-
626473
def py_reader(capacity,
627474
shapes,
628475
dtypes,
@@ -747,72 +594,128 @@ def py_reader(capacity,
747594
>>> except fluid.core.EOFException:
748595
>>> test_reader.reset()
749596
"""
750-
return _py_reader(
751-
capacity=capacity,
752-
shapes=shapes,
753-
dtypes=dtypes,
754-
lod_levels=lod_levels,
755-
name=name,
756-
use_double_buffer=use_double_buffer)
597+
dtypes = [convert_np_dtype_to_dtype_(dt) for dt in dtypes]
598+
shape_concat = []
599+
ranks = []
757600

601+
for shape in shapes:
602+
shape_concat.extend(shape)
603+
ranks.append(len(shape))
758604

759-
def create_py_reader_by_data(capacity,
760-
feed_list,
761-
name=None,
762-
use_double_buffer=True):
763-
"""
764-
Create a Python reader for data feeding in Python
605+
if lod_levels is None:
606+
lod_levels = [0] * len(shapes)
765607

766-
This layer returns a Reader Variable.
608+
if name is None:
609+
queue_name = unique_name('lod_tensor_blocking_queue')
610+
reader_name = unique_name('create_py_reader')
611+
double_buffer_name = unique_name('double_buffer')
612+
else:
613+
queue_name = "_".join([name, "queue"])
614+
reader_name = "_".join([name, "reader"])
615+
double_buffer_name = "_".join([name, "double_buffer"])
767616

768-
Works much like py_reader except that it's input is feed_list
769-
instead of shapes, dtypes and lod_levels
617+
var = global_scope().var(queue_name)
618+
feed_queue = core.init_lod_tensor_blocking_queue(var, capacity, shapes)
770619

771-
Args:
772-
capacity(int): The buffer capacity maintained by :code:`py_reader`.
773-
feed_list(list(Variable)): The data feed list.
774-
name(basestring): The prefix Python queue name and Reader name. None will
775-
be generated automatically.
776-
use_double_buffer(bool): Whether use double buffer or not.
620+
startup_blk = default_startup_program().current_block()
621+
startup_var = startup_blk.create_var(name=reader_name)
622+
startup_blk.append_op(
623+
type='create_py_reader',
624+
inputs={'blocking_queue': [queue_name]},
625+
outputs={'Out': [startup_var]},
626+
attrs={
627+
'shape_concat': shape_concat,
628+
'lod_levels': lod_levels,
629+
'ranks': ranks
630+
})
777631

778-
Returns:
779-
Variable: A Reader from which we can get feeding data.
632+
startup_var.desc.set_dtypes(dtypes)
633+
startup_var.persistable = True
780634

781-
Examples:
635+
main_prog_var = _copy_reader_var_(default_main_program().current_block(),
636+
startup_var)
782637

783-
1. The basic usage of :code:`py_reader` is as follows:
638+
reader = monkey_patch_reader_methods(main_prog_var)
639+
if use_double_buffer:
640+
double_buffer_reader = double_buffer(reader, name=double_buffer_name)
641+
# we return a double buffer reader. However, the reset method comes from
642+
# py_reader.
643+
double_buffer_reader.reset = reader.reset
644+
reader = double_buffer_reader
784645

785-
>>> import paddle.fluid as fluid
786-
>>> import paddle.dataset.mnist as mnist
787-
>>>
788-
>>> image = fluid.layers.data(name='image', shape=[3,224,224], dtypes='float32')
789-
>>> label = fluid.layers.data(name='label', shape=[1], dtypes='int64')
790-
>>> reader = fluid.layers.create_py_reader_by_data(capacity=64, feed_list=[image, label])
791-
>>> reader.decorate_paddle_reader(
792-
>>> paddle.reader.shuffle(paddle.batch(mnist.train())
793-
>>>
794-
>>> img, label = fluid.layers.read_file(reader)
795-
>>> loss = network(img, label) # some network definition
796-
>>>
797-
>>> fluid.Executor(fluid.CUDAPlace(0)).run(fluid.default_startup_program())
798-
>>>
799-
>>> exe = fluid.ParallelExecutor(use_cuda=True, loss_name=loss.name)
800-
>>> for epoch_id in range(10):
801-
>>> reader.start()
802-
>>> try:
803-
>>> while True:
804-
>>> exe.run(fetch_list=[loss.name])
805-
>>> except fluid.core.EOFException:
806-
>>> reader.reset()
807-
"""
808-
return _py_reader(
809-
capacity=capacity,
810-
shapes=None,
811-
dtypes=None,
812-
lod_levels=None,
813-
name=name,
814-
use_double_buffer=use_double_buffer,
815-
feed_list=feed_list)
646+
# monkey patch py_reader special methods
647+
reader.queue = feed_queue
648+
current_reset_method = reader.reset
649+
reader.thread = None
650+
reader.tensor_provider = None
651+
reader.exited = False
652+
653+
def start_provide_thread(func):
654+
def __provider_thread__():
655+
for tensors in func():
656+
array = core.LoDTensorArray()
657+
for item in tensors:
658+
if not isinstance(item, core.LoDTensor):
659+
tmp = core.LoDTensor()
660+
tmp.set(item, core.CPUPlace())
661+
item = tmp
662+
663+
array.append(item)
664+
665+
if reader.exited:
666+
break
667+
feed_queue.push(array)
668+
if reader.exited:
669+
break
670+
feed_queue.close()
671+
672+
reader.thread = threading.Thread(target=__provider_thread__)
673+
reader.thread.daemon = True
674+
reader.thread.start()
675+
676+
def __set_tensor_provider__(func):
677+
reader.tensor_provider = func
678+
679+
def __set_paddle_reader__(paddle_reader):
680+
with program_guard(Program(), Program()):
681+
feed_list = []
682+
counter = 0
683+
for dtype, shape, lod_level in zip(dtypes, shapes, lod_levels):
684+
name = str(counter)
685+
feed_list.append(
686+
data(
687+
name=name,
688+
dtype=dtype,
689+
shape=shape,
690+
lod_level=lod_level))
691+
counter += 1
692+
693+
feeder = DataFeeder(feed_list=feed_list, place=core.CPUPlace())
694+
paddle_reader = feeder.decorate_reader(
695+
paddle_reader, multi_devices=False)
696+
697+
def __tensor_provider__():
698+
for slots in paddle_reader():
699+
yield [slots[str(idx)] for idx in six.moves.xrange(counter)]
700+
701+
__set_tensor_provider__(__tensor_provider__)
702+
703+
def __reset__():
704+
current_reset_method()
705+
if reader.thread is not None and reader.tensor_provider is not None:
706+
reader.exited = True
707+
reader.thread.join()
708+
reader.exited = False
709+
710+
def __start__():
711+
start_provide_thread(reader.tensor_provider)
712+
713+
reader.reset = __reset__
714+
reader.decorate_tensor_provider = __set_tensor_provider__
715+
reader.decorate_paddle_reader = __set_paddle_reader__
716+
reader.start = __start__
717+
718+
return reader
816719

817720

818721
def open_files(filenames,

0 commit comments

Comments
 (0)