# See the License for the specific language governing permissions and
# limitations under the License.

+import contextlib
+
+import core
+import data_feeder
+import executor
+import framework
+
+# The module name `optimizer` collides with the `optimizer` parameter of
+# Trainer.__init__, so import the module under the alias opt_module.
+import optimizer as opt_module
__all__ = [
-    'Event',
    'Trainer',
+    'BeginEpochEvent',
+    'EndEpochEvent',
+    'BeginStepEvent',
+    'EndStepEvent',
]


-class Event(object):
-    BEGIN_EPOCH = 0
-    END_EPOCH = 1
-    BEGIN_STEP = 2
-    END_STEP = 3
+class BeginEpochEvent(object):
+    def __init__(self, epoch_id):
+        self.epoch = epoch_id
+
+
+class EndEpochEvent(object):
+    def __init__(self, epoch_id):
+        self.epoch = epoch_id

-    def __init__(self):
-        self.step = 0
-        self.epoch = 0
-        self.type = Event.BEGIN_EPOCH
+
+class BeginStepEvent(object):
+    def __init__(self, epoch_id, step_id):
+        self.epoch = epoch_id
+        self.step = step_id
+
+
+class EndStepEvent(object):
+    def __init__(self, epoch_id, step_id):
+        self.epoch = epoch_id
+        self.step = step_id
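+
+
+# A typical event_handler dispatches on the concrete event type. A minimal
+# sketch (the handler below is illustrative, not part of this module):
+#
+#   def event_handler(event):
+#       if isinstance(event, EndStepEvent):
+#           print("epoch %d, step %d finished" % (event.epoch, event.step))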


class Trainer(object):
+    """
+    A trainer builds the train and startup programs from network_func and
+    minimizes the returned loss with the given optimizer.
+
+    Args:
+        network_func(callable): A function that builds the network and
+            returns its loss. The loss must be a scalar.
+        optimizer(optimizer.Optimizer): The optimizer must be an instance
+            of Optimizer.
+        params(Parameter|None): The parameter object instance; may be None.
+        place(None|core.CUDAPlace|core.CPUPlace): The device place of this
+            trainer. If None, a default place is chosen by
+            _check_and_get_place.
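+
+    Examples:
+        A minimal usage sketch; ``my_network``, ``my_reader``, and the feed
+        names are placeholders for user-defined pieces, and the import path
+        assumes this module is exposed as ``paddle.fluid``:
+
+        .. code-block:: python
+
+            import paddle.fluid as fluid
+
+            def event_handler(event):
+                pass  # dispatch on the event classes defined above
+
+            trainer = fluid.Trainer(
+                network_func=my_network,
+                optimizer=fluid.optimizer.SGD(learning_rate=0.01))
+            trainer.train(
+                num_epochs=2,
+                event_handler=event_handler,
+                reader=my_reader,
+                feed_order=['image', 'label'])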
+    """
+
    def __init__(self, network_func, optimizer, params=None, place=None):
        # 1. we need to generate a framework.Program by calling
        # network_func. Reference: fluid.program_guard in
        # test_word2vec.py
+        self.scope = self._get_scope_from_params(params)
+
+        self.startup_program = framework.Program()
+        self.train_program = framework.Program()
+
+        with framework.program_guard(self.train_program, self.startup_program):
+            loss = network_func()
+            if not isinstance(optimizer, opt_module.Optimizer):
+                raise TypeError(
+                    "The optimizer should be an instance of Optimizer")
+
+            optimizer.minimize(loss)
+
+        self.place = Trainer._check_and_get_place(place)

        # 2. move the default_main_program to self.program and run the
        # default_startup program on an empty core.Scope()
+        # Run the startup program to initialize parameters. Use self.place,
+        # since the raw `place` argument may be None.
+        if params is None:
+            exe = executor.Executor(self.place)
+            exe.run(self.startup_program, scope=self.scope)

        # 3. call self.params.add_vars with the initialized scope, it
        # will add the new vars of the initialized scope into
        # self.params.
-        self.network_func = network_func
-        self.optimizer = optimizer
-        self.params = params
-        self.place = place
+        # TODO(yuyang): This depends on parameters implementation.
+
        # TODO(helin): support distributed training

-    def train(self, reader, num_epochs, event_handler):
-        pass
+    def train(self,
+              num_epochs,
+              event_handler,
+              reader=None,
+              parallel=False,
+              feed_order=None):
+        """
+        Train the model.
+
+        Args:
+            num_epochs: The number of epochs. One epoch processes all the
+                data yielded by reader.
+            event_handler: A callable invoked with one of the event objects
+                defined above (BeginEpochEvent, EndEpochEvent,
+                BeginStepEvent, EndStepEvent).
+            reader: A callable that returns an iterable of training data.
+            parallel: True to train on multiple CPUs or GPUs.
+            feed_order: The feeding order of the reader's columns. If None,
+                the data variables are fed in the order they are defined in
+                the program.
+
+        Returns:
+            None
+        """
+        if parallel:
+            raise NotImplementedError(
+                "Parallel Executor version of trainer is not implemented")
+
+        self._train_by_executor(num_epochs, event_handler, reader, feed_order)

    def test(self, reader):
        pass
+
+    def _get_scope_from_params(self, params):
+        """
+        Get a Scope from the parameter object.
+
+        Args:
+            params(Parameter|None): The parameter object instance. Could be
+                None.
+
+        Returns:
+            A new scope if params is None; otherwise params.scope().
+
+        NOTE: This method is WIP and not fully implemented.
+        """
+        if params is None:
+            return core.Scope()  # new scope when params is None
+        else:
+            raise NotImplementedError("Not implemented right now.")
+
+    @staticmethod
+    def _check_and_get_place(place):
+        """
+        Check the type of place, or pick a default place.
+
+        Args:
+            place(None|core.CUDAPlace|core.CPUPlace): the place the trainer
+                will be executed on.
+
+        Raises:
+            TypeError: if place is neither CUDAPlace nor CPUPlace.
+
+        Returns:
+            The original place if it is not None. If place is None, returns
+            CUDAPlace(0) when fluid is compiled with CUDA, otherwise
+            CPUPlace.
+        """
+        if place is None:
+            if core.is_compiled_with_cuda():
+                return core.CUDAPlace(0)
+            else:
+                return core.CPUPlace()
+        else:
+            if not isinstance(place, core.CUDAPlace) and not isinstance(
+                    place, core.CPUPlace):
+                raise TypeError("Place should be either CUDAPlace or CPUPlace")
+            return place
+
+    @contextlib.contextmanager
+    def _prog_and_scope_guard(self):
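+        # Activate this trainer's train/startup programs and its scope for
+        # the enclosed block, restoring the previous ones on exit.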
+        with framework.program_guard(
+                main_program=self.train_program,
+                startup_program=self.startup_program):
+            with executor.scope_guard(self.scope):
+                yield
+
+    def _train_by_executor(self, num_epochs, event_handler, reader, feed_order):
+        """
+        Train with an Executor on a single device.
+
+        Args:
+            num_epochs: The number of epochs; see train().
+            event_handler: The event handler callable; see train().
+            reader: A callable that returns an iterable of training data.
+            feed_order: The feeding order of the reader's columns; see
+                train().
+
+        Returns:
+            None
+        """
+        with self._prog_and_scope_guard():
+            exe = executor.Executor(self.place)
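+            # Build the feed list: with no feed_order, feed every data
+            # variable in the program; otherwise feed exactly the named
+            # variables in the given order.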
+            if feed_order is None:
+                global_block = self.train_program.global_block()
+                feed_var_list = [
+                    var for var in global_block.vars.itervalues()
+                    if hasattr(var, 'is_data') and var.is_data
+                ]
+            else:
+                feed_var_list = [
+                    self.train_program.global_block().var(var_name)
+                    for var_name in feed_order
+                ]
+
+            feeder = data_feeder.DataFeeder(
+                feed_list=feed_var_list, place=self.place)
+            for epoch_id in range(num_epochs):
+                event_handler(BeginEpochEvent(epoch_id))
+                for step_id, data in enumerate(reader()):
+                    event_handler(BeginStepEvent(epoch_id, step_id))
+                    exe.run(feed=feeder.feed(data), fetch_list=[])
+                    event_handler(EndStepEvent(epoch_id, step_id))
+                event_handler(EndEpochEvent(epoch_id))