 import unittest
 import paddle.fluid as fluid
 from paddle.fluid.transpiler.distribute_transpiler import delete_ops
+import traceback
 
-from transpiler_test import TranspilerTest
 
-
-class TestDistTranspiler(TranspilerTest):
+class TranspilerTest(unittest.TestCase):
     def setUp(self):
-        self.current_pserver_ep = "127.0.0.1:6174"
+        self.trainer_id = 0
+        self.trainers = 2
+        self.pservers = 2
+        # NOTE: we do not actually bind this port
+        self.pserver_eps = "127.0.0.1:6174,127.0.0.1:6175"
+        self.pserver1_ep = "127.0.0.1:6174"
+        self.pserver2_ep = "127.0.0.1:6175"
+        self.slice_var_up = True
+        self.sync_mode = True
+        self.transpiler = None
+
+    def net_conf(self):
+        x = fluid.layers.data(name='x', shape=[1000], dtype='float32')
+        y_predict = fluid.layers.fc(input=x,
+                                    size=1000,
+                                    act=None,
+                                    param_attr=fluid.ParamAttr(name='fc_w'),
+                                    bias_attr=fluid.ParamAttr(name='fc_b'))
+        y = fluid.layers.data(name='y', shape=[1], dtype='float32')
+        cost = fluid.layers.square_error_cost(input=y_predict, label=y)
+        avg_cost = fluid.layers.mean(cost)
+        sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.1)
+        sgd_optimizer.minimize(avg_cost)
+        return
+
+    def get_main_program(self):
+        main = fluid.Program()
+        with fluid.program_guard(main):
+            self.net_conf()
+        self.origin_prog = main.clone()
+        return main
+
+    def get_trainer(self):
+        t = self._transpiler_instance()
+        return t.get_trainer_program()
+
+    def get_pserver(self, ep):
+        t = self._transpiler_instance()
+        pserver = t.get_pserver_program(ep)
+        startup = t.get_startup_program(ep, pserver)
+        return pserver, startup
+
+    def _transpiler_instance(self):
+        if not self.transpiler:
+            main = self.get_main_program()
+            self.transpiler = fluid.DistributeTranspiler()
+            self.transpiler.transpile(
+                self.trainer_id,
+                program=main,
+                pservers=self.pserver_eps,
+                trainers=self.trainers,
+                slice_var_up=self.slice_var_up,
+                sync_mode=self.sync_mode)
+        return self.transpiler
 
+
+class TestBasicModel(TranspilerTest):
     def test_transpiler(self):
+        pserver, startup = self.get_pserver(self.pserver1_ep)
+        pserver2, startup2 = self.get_pserver(self.pserver2_ep)
+
         trainer = self.get_trainer()
-        pserver, startup = self.get_pserver(self.current_pserver_ep)
-        self.assertEqual([op.type for op in trainer.global_block().ops],
-                         self.get_expect_trainer_ops())
+
+        self.assertEqual([op.type for op in trainer.global_block().ops], [
+            'mul', 'elementwise_add', 'elementwise_sub', 'square', 'mean',
+            'fill_constant', 'mean_grad', 'square_grad', 'elementwise_sub_grad',
+            'elementwise_add_grad', 'send', 'mul_grad', 'split_byref', 'send',
+            'send_barrier', 'recv', 'recv', 'fetch_barrier', 'concat'
+        ])
 
         self.assertEqual(len(pserver.blocks), 3)
         # block0: listen_and_serv
         self.assertEqual([op.type for op in pserver.blocks[0].ops],
                          ["listen_and_serv"])
-        # block2: optimize pass
+        # block1~2: optimize pass
         self.assertEqual([op.type for op in pserver.blocks[1].ops],
                          ["sum", "scale", "sgd"])
-
         # confirm startup program
-
-        self.assertEqual([op.type for op in startup.global_block().ops], [
-            "fill_constant", "fill_constant", "uniform_random", "uniform_random"
-        ])
-
+        self.assertEqual([op.type for op in startup.global_block().ops],
+                         ["fill_constant", "fill_constant", "uniform_random"])
         # the variable #fc_w will be split into two blocks
         fc_w_var = startup.global_block().var("fc_w.block1")
         self.assertEqual(fc_w_var.shape, (500, 1000))
+        # all parameters should be optimized on pserver
+
+        pserver_params = []
+        for prog in [pserver, pserver2]:
+            for blk in prog.blocks:
+                for op in blk.ops:
+                    if "Param" in op.input_names:
+                        param_name = op.input("Param")[0]
+                        is_block_idx = param_name.find(".block")
+                        if is_block_idx != -1:
+                            origin_param_name = param_name[:is_block_idx]
+                        else:
+                            origin_param_name = param_name
+                        pserver_params.append(origin_param_name)
+        trainer_params = []
+        for op in self.origin_prog.global_block().ops:
+            if "Param" in op.input_names:
+                trainer_params.append(op.input("Param")[0])
+        self.assertEqual(set(pserver_params), set(trainer_params))
+
+
+class TestNoSliceVar(TranspilerTest):
+    def setUp(self):
+        super(TestNoSliceVar, self).setUp()
+        self.slice_var_up = False
+
+    def test_transpiler(self):
+        _, startup = self.get_pserver(self.pserver1_ep)
+        _, startup2 = self.get_pserver(self.pserver2_ep)
+
+        if startup.global_block().vars.has_key("fc_w"):
+            fc_w_var = startup.global_block().vars["fc_w"]
+        elif startup2.global_block().vars.has_key("fc_w"):
+            fc_w_var = startup2.global_block().vars["fc_w"]
+
+        self.assertEqual(fc_w_var.shape, (1000, 1000))
 
-    def get_expect_trainer_ops(self):
-        trainer = fluid.Program()
 
-        with fluid.program_guard(trainer):
-            optimize_ops, params_grads = self.net_conf()
+class TestLRDecay(TranspilerTest):
+    def net_conf(self):
+        x = fluid.layers.data(name='x', shape=[1000], dtype='float32')
+        y_predict = fluid.layers.fc(input=x,
+                                    size=1000,
+                                    act=None,
+                                    param_attr=fluid.ParamAttr(name='fc_w'),
+                                    bias_attr=fluid.ParamAttr(name='fc_b'))
+        y = fluid.layers.data(name='y', shape=[1], dtype='float32')
+        cost = fluid.layers.square_error_cost(input=y_predict, label=y)
+        avg_cost = fluid.layers.mean(cost)
+        sgd_optimizer = fluid.optimizer.SGD(
+            learning_rate=fluid.layers.exponential_decay(
+                learning_rate=1.0,
+                decay_steps=2100,
+                decay_rate=0.1,
+                staircase=True))
+        sgd_optimizer.minimize(avg_cost)
+        return
+
+    def test_transpiler(self):
+        pserver, startup = self.get_pserver(self.pserver1_ep)
+        trainer = self.get_trainer()
+
+        self.assertEqual(len(pserver.blocks), 4)
+        lr_decay_ops = [op.type for op in pserver.blocks[1].ops]
+        self.assertEqual(lr_decay_ops, [
+            "increment", "cast", "fill_constant", "elementwise_div", "floor",
+            "fill_constant", "elementwise_pow", "fill_constant",
+            "elementwise_mul"
+        ])
+
+
+class TestLRDecayConditional(TranspilerTest):
+    def net_conf(self):
+        x = fluid.layers.data(name='x', shape=[1000], dtype='float32')
+        y_predict = fluid.layers.fc(input=x,
+                                    size=1000,
+                                    act=None,
+                                    param_attr=fluid.ParamAttr(name='fc_w'),
+                                    bias_attr=fluid.ParamAttr(name='fc_b'))
+        y = fluid.layers.data(name='y', shape=[1], dtype='float32')
+        cost = fluid.layers.square_error_cost(input=y_predict, label=y)
+        avg_cost = fluid.layers.mean(cost)
+        sgd_optimizer = fluid.optimizer.SGD(
+            learning_rate=fluid.layers.piecewise_decay([10000, 20000],
+                                                       [1.0, 0.5, 1.0]))
+        sgd_optimizer.minimize(avg_cost)
+        return
+
+    def test_transpiler(self):
+        pserver, startup = self.get_pserver(self.pserver1_ep)
+        trainer = self.get_trainer()
+
+        serv_op = pserver.blocks[0].ops[0]
+        sub_blocks = []
+        optimize_blocks = []
+        for b in serv_op.attrs["optimize_blocks"]:
+            optimize_blocks.append(b.idx)
+        for b in pserver.blocks:
+            if b.idx not in optimize_blocks:
+                sub_blocks.append(b.idx)
+
+        self.assertEqual(len(pserver.blocks), 7)
+        lr_decay_ops = [op.type for op in pserver.blocks[1].ops]
+        self.assertEqual(lr_decay_ops, [
+            "increment", "cast", "fill_constant", "fill_constant", "less_than",
+            "logical_not", "conditional_block", "fill_constant",
+            "fill_constant", "less_than", "logical_not", "logical_and",
+            "logical_and", "conditional_block", "fill_constant",
+            "conditional_block"
+        ])
+        # test the condition blocks
+        for b in sub_blocks:
+            if b == 0:
+                continue
+            block = pserver.blocks[b]
+            self.assertEqual([op.type for op in block.ops], ["assign"])
+
+
+class TestL2Decay(TranspilerTest):
+    def net_conf(self):
+        x = fluid.layers.data(name='x', shape=[1000], dtype='float32')
+        y_predict = fluid.layers.fc(
+            input=x,
+            size=1000,
+            act=None,
+            param_attr=fluid.ParamAttr(
+                name='fc_w',
+                regularizer=fluid.regularizer.L2Decay(),
+                gradient_clip=fluid.clip.GradientClipByValue(0.1)),
+            bias_attr=fluid.ParamAttr(name='fc_b'))
+        y = fluid.layers.data(name='y', shape=[1], dtype='float32')
+        cost = fluid.layers.square_error_cost(input=y_predict, label=y)
+        avg_cost = fluid.layers.mean(cost)
+        sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.1)
+        sgd_optimizer.minimize(avg_cost)
+        return
+
+    def test_transpiler(self):
+        pserver, startup = self.get_pserver(self.pserver1_ep)
+        trainer = self.get_trainer()
+
+        self.assertEqual(len(pserver.blocks), 3)
+        self.assertEqual([op.type for op in pserver.blocks[1].ops],
+                         ["sum", "scale", "clip", "sgd"])
+        self.assertEqual(
+            [op.type for op in pserver.blocks[2].ops],
+            ["sum", "scale", "clip", "scale", "elementwise_add", "sgd"])
+        # TODO(typhoonzero): test clipping and L2Decay ops are removed from trainer
+
 
-        delete_ops(trainer.global_block(), optimize_ops)
-        ops = [op.type for op in trainer.global_block().ops] + [
-            "split_byref", "send", "send_barrier", "recv", "recv",
-            "fetch_barrier", "concat"
-        ]
-        ops.insert(ops.index("elementwise_add_grad") + 1, "send")
-        return ops
+# FIXME(typhoonzero): need to add test for async case:
+# see https://github.com/PaddlePaddle/Paddle/issues/11691
+class TestAsyncSGD(TranspilerTest):
+    pass
 
 
 if __name__ == "__main__":