@@ -200,14 +200,29 @@ class TestParallelExecutorBase(unittest.TestCase):
200
200
def check_network_convergence (self ,
201
201
method ,
202
202
memory_opt = True ,
203
- iter = 10 ,
203
+ iter = 50 ,
204
204
batch_size = None ,
205
205
allow_op_delay = False ,
206
- feed_dict = None ):
206
+ feed_dict = None ,
207
+ seed = None ,
208
+ use_parallel_executor = True ):
209
def run_executor(exe, feed, fetch_list, program=None):
    """Run one step on ``exe`` and return whatever ``exe.run`` returns.

    Dispatches on the executor type so the caller can drive either kind
    of executor through a single call site.

    Args:
        exe: a ``fluid.ParallelExecutor`` or a ``fluid.Executor``.
        feed: feed argument forwarded unchanged to ``exe.run``.
        fetch_list: fetch list forwarded unchanged to ``exe.run``.
        program: program to run when ``exe`` is a ``fluid.Executor``;
            when None, ``fluid.default_main_program()`` is used. It is
            not consulted for a ``fluid.ParallelExecutor``.

    Returns:
        The result of ``exe.run``.

    Raises:
        ValueError: if ``exe`` is neither of the two executor types.
    """
    # ParallelExecutor is checked first, matching the original dispatch order.
    if isinstance(exe, fluid.ParallelExecutor):
        return exe.run(fetch_list=fetch_list, feed=feed)

    if isinstance(exe, fluid.Executor):
        if program is None:
            program = fluid.default_main_program()
        return exe.run(program=program, feed=feed, fetch_list=fetch_list)

    raise ValueError('Unknown type exe')
219
+
207
220
main = fluid .Program ()
208
221
startup = fluid .Program ()
209
222
startup .random_seed = 1 # Fix random seed
210
223
with fluid .program_guard (main , startup ):
224
+ if seed is not None :
225
+ startup .random_seed = seed
211
226
loss = method (use_feed = feed_dict is not None )
212
227
adam = fluid .optimizer .Adam ()
213
228
adam .minimize (loss )
@@ -217,18 +232,24 @@ def check_network_convergence(self,
217
232
startup_exe = fluid .Executor (place )
218
233
startup_exe .run (startup )
219
234
220
- exe = fluid .ParallelExecutor (
221
- True , loss_name = loss .name , allow_op_delay = allow_op_delay )
235
+ if use_parallel_executor :
236
+ exe = fluid .ParallelExecutor (
237
+ True , loss_name = loss .name , allow_op_delay = allow_op_delay )
238
+ else :
239
+ exe = fluid .Executor (place = place )
240
+
222
241
if batch_size is not None :
223
242
batch_size *= fluid .core .get_cuda_device_count ()
224
243
begin = time .time ()
225
- first_loss , = exe .run ([loss .name ], feed = feed_dict )
244
+ first_loss , = run_executor (
245
+ exe = exe , feed = feed_dict , fetch_list = [loss .name ])
226
246
first_loss = numpy .array (first_loss )
227
247
228
248
for i in xrange (iter ):
229
- exe . run ([] , feed = feed_dict )
249
+ run_executor ( exe = exe , feed = feed_dict , fetch_list = [] )
230
250
231
- last_loss , = exe .run ([loss .name ], feed = feed_dict )
251
+ last_loss , = run_executor (
252
+ exe = exe , feed = feed_dict , fetch_list = [loss .name ])
232
253
end = time .time ()
233
254
234
255
if batch_size is not None :
@@ -239,6 +260,7 @@ def check_network_convergence(self,
239
260
240
261
print first_loss , last_loss
241
262
# self.assertGreater(first_loss[0], last_loss[0])
263
+ return first_loss , last_loss
242
264
243
265
244
266
class TestMNIST (TestParallelExecutorBase ):
@@ -268,6 +290,27 @@ def test_simple_fc(self):
268
290
simple_fc_net , feed_dict = {"image" : img ,
269
291
"label" : label })
270
292
293
def test_simple_fc_parallel_accuracy(self):
    """Compare losses from a plain Executor run and a ParallelExecutor run.

    ``simple_fc_net`` is run twice through ``check_network_convergence``
    with the same seed (1000) and the same constant input: once with
    ``use_parallel_executor=False`` and once with it set to True. Every
    element of the parallel first/last loss must be within 1e-6 of the
    first element of the corresponding single-executor loss.
    """
    img = numpy.zeros(shape=[32, 784], dtype='float32')
    label = numpy.ones(shape=[32, 1], dtype='int64')

    def run(use_parallel_executor):
        # A fresh feed dict is built for every run, as the original did.
        return self.check_network_convergence(
            method=simple_fc_net,
            seed=1000,
            feed_dict={"image": img,
                       "label": label},
            use_parallel_executor=use_parallel_executor)

    # Keep the original order: single-executor run first, then parallel.
    single_first_loss, single_last_loss = run(False)
    parallel_first_loss, parallel_last_loss = run(True)

    # assertAlmostEqual replaces the deprecated assertAlmostEquals alias.
    for p_f in parallel_first_loss:
        self.assertAlmostEqual(p_f, single_first_loss[0], delta=1e-6)
    for p_l in parallel_last_loss:
        self.assertAlmostEqual(p_l, single_last_loss[0], delta=1e-6)
313
+
271
314
def test_batchnorm_fc (self ):
272
315
self .check_network_convergence (fc_with_batchnorm )
273
316
img = numpy .zeros (shape = [32 , 784 ], dtype = 'float32' )
@@ -496,10 +539,10 @@ def test_parallel_testing(self):
496
539
share_vars_from = train_exe )
497
540
498
541
for i in xrange (5 ):
499
- test_loss , = test_exe .run ([loss .name ], feed_dict = feed_dict )
542
+ test_loss , = test_exe .run ([loss .name ], feed = feed_dict )
500
543
test_loss = numpy .array (test_loss )
501
544
502
- train_loss , = train_exe .run ([loss .name ], feed_dict = feed_dict )
545
+ train_loss , = train_exe .run ([loss .name ], feed = feed_dict )
503
546
train_loss = numpy .array (train_loss )
504
547
self .assertTrue (
505
548
numpy .allclose (
0 commit comments