# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function

import unittest
import paddle
import paddle.fluid as fluid
import paddle.fluid.core as core
import numpy as np
from threading import Thread

| 24 | + |
def user_reader(inputs):
    """Create a data reader over a fixed, in-memory sample collection.

    Args:
        inputs: iterable of samples to replay.

    Returns:
        A zero-argument generator function that yields each element of
        ``inputs`` in order every time it is called.
    """

    def _reader():
        for sample in inputs:
            yield sample

    return _reader
| 31 | + |
| 32 | + |
def batch_feeder(batch_reader, pin_memory=False, img_dtype="float32"):
    """Wrap *batch_reader* so each batch is emitted as a pair of LoDTensors.

    Args:
        batch_reader: callable returning an iterable of batches, where each
            batch is a sequence of ``(sample, label)`` pairs.
        pin_memory: if True, tensors are placed in CUDA pinned memory
            (``CUDAPinnedPlace``); otherwise in plain CPU memory.
        img_dtype: numpy dtype name used for the sample tensor.

    Returns:
        A zero-argument generator function yielding ``[sample_tensor,
        label_tensor]`` per batch, where the label tensor has shape
        ``(batch_size, 1)`` and dtype int64.
    """

    def _feeder():
        for batch in batch_reader():
            samples = [sample for sample, _ in batch]
            # Each label is wrapped in a list so the resulting tensor
            # has an explicit trailing dimension of 1.
            labels = [[label] for _, label in batch]
            place = core.CUDAPinnedPlace() if pin_memory else core.CPUPlace()
            sample_tensor = core.LoDTensor()
            sample_tensor.set(np.array(samples, dtype=img_dtype), place)
            label_tensor = core.LoDTensor()
            label_tensor.set(np.array(labels, dtype="int64"), place)
            yield [sample_tensor, label_tensor]

    return _feeder
| 49 | + |
| 50 | + |
class TestPyReader(unittest.TestCase):
    """Feed pre-built LoDTensors (CUDA-pinned when compiled with CUDA) through
    a ``py_reader`` and check the fetched outputs equal the original batches."""

    def setUp(self):
        # py_reader slot configuration: a sample slot of shape (N, 3, 2, 1)
        # and a label slot of shape (N, 1); -1 marks the variable batch dim.
        self.capacity = 10
        self.shapes = [(-1, 3, 2, 1), (-1, 1)]
        self.lod_levels = [0, 0]
        self.dtypes = ['float32', 'int64']

    def test_pin_memory_pyreader(self):
        with fluid.program_guard(fluid.Program(), fluid.Program()):
            place = fluid.CUDAPlace(0) if fluid.core.is_compiled_with_cuda(
            ) else fluid.CPUPlace()
            executor = fluid.Executor(place)

            data_file = fluid.layers.py_reader(
                capacity=self.capacity,
                dtypes=self.dtypes,
                lod_levels=self.lod_levels,
                shapes=self.shapes)
            # feed_queue = data_file.queue
            read_out_data = fluid.layers.read_file(data_file)

            # Build 10 random (sample, label) pairs to replay through the
            # reader; labels are random int64 values in [0, 10).
            self.inputs = []
            for _ in range(10):
                sample = np.random.uniform(
                    low=0, high=1, size=[3, 2, 1]).astype("float32")
                label = np.random.uniform(
                    low=0, high=10, size=[1]).astype("int64")
                self.inputs.append((sample, label))

            # Materialize the batched tensors once; below, this list is only
            # used to count how many executor.run() calls to make.
            self.input_tensors = []
            for d, l in batch_feeder(
                    paddle.batch(
                        user_reader(self.inputs), batch_size=2),
                    pin_memory=True
                    if fluid.core.is_compiled_with_cuda() else False)():
                ta = fluid.LoDTensorArray()
                ta.append(d)
                ta.append(l)
                self.input_tensors.append(ta)

            # Reference data: the same batches as plain Python lists, compared
            # against the fetched outputs in validate().
            self.batched_inputs = []
            for batch in paddle.batch(user_reader(self.inputs), batch_size=2)():
                feed_d = []
                feed_l = []
                for d, l in batch:
                    feed_d.append(d)
                    feed_l.append([l])
                self.batched_inputs.append([feed_d, feed_l])

            # The reader pulls batches from this provider; pinned memory is
            # requested only when CUDA support is compiled in.
            data_file.decorate_tensor_provider(
                batch_feeder(
                    paddle.batch(
                        user_reader(self.inputs), batch_size=2),
                    pin_memory=True
                    if fluid.core.is_compiled_with_cuda() else False))

            executor.run(fluid.default_startup_program())
            self.outputs = []

            # One run() per batch; each fetch returns both reader slots.
            data_file.start()
            for _ in self.input_tensors:
                self.outputs.append(
                    executor.run(fetch_list=list(read_out_data)))
            data_file.reset()
            self.validate()

    def validate(self):
        """Assert that every fetched batch matches the reference batch
        element-wise (both the sample slot and the label slot)."""
        self.assertEqual(len(self.batched_inputs), len(self.outputs))
        for in_data_list, out_data_list in zip(self.batched_inputs,
                                               self.outputs):
            self.assertEqual(len(in_data_list), len(out_data_list))
            in_data_list_np = [
                np.array(in_lod_tensor) for in_lod_tensor in in_data_list
            ]
            for in_data, out_data in zip(in_data_list_np, out_data_list):
                self.assertTrue((in_data == out_data).all())
| 127 | + |
| 128 | + |
# Standard unittest entry point when the file is run as a script.
if __name__ == '__main__':
    unittest.main()