@@ -166,18 +166,30 @@ class TestDistBase(unittest.TestCase):
     def _setup_config(self):
         raise NotImplementedError("tests should have _setup_config implemented")
 
+    def _after_setup_config(self):
+        if self._enforce_place == "CPU":
+            self.__use_cuda = False
+        elif self._enforce_place == "GPU":
+            self.__use_cuda = True
+        else:
+            if fluid.core.is_compiled_with_cuda():
+                self.__use_cuda = True
+            else:
+                self.__use_cuda = False
+
     def setUp(self):
         self._trainers = 2
         self._pservers = 2
         self._ps_endpoints = "127.0.0.1:%s,127.0.0.1:%s" % (
             self._find_free_port(), self._find_free_port())
         self._python_interp = "python"
         self._sync_mode = True
-        self._use_cuda = True
+        self._enforce_place = None
         self._mem_opt = False
         self._use_reduce = False
         self._use_reader_alloc = True
         self._setup_config()
+        self._after_setup_config()
 
     def _find_free_port(self):
         with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
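The `_find_free_port` body is cut off by this hunk's context. For reference, the usual standard-library idiom behind this signature (a sketch, not necessarily the exact body of the file) binds to port 0 and lets the kernel pick an unused port:

```python
import socket
from contextlib import closing

def find_free_port():
    # Port 0 asks the kernel for any unused ephemeral port;
    # getsockname() reports which one it chose before the socket closes.
    with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
        s.bind(('', 0))
        return s.getsockname()[1]
```

Note also that `__use_cuda` has two leading underscores, so Python name-mangles it to `_TestDistBase__use_cuda`; every access in this diff is inside `TestDistBase`, which keeps the flag private to the base class and out of reach of subclasses' `_setup_config` overrides.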
@@ -201,13 +213,10 @@ def start_pserver(self, model_file, check_error_log, required_envs):
         ps0_cmd += " --mem_opt"
         ps1_cmd += " --mem_opt"
 
-        ps0_pipe = subprocess.PIPE
-        ps1_pipe = subprocess.PIPE
-        if check_error_log:
-            print(ps0_cmd)
-            print(ps1_cmd)
-            ps0_pipe = open("/tmp/ps0_err.log", "wb")
-            ps1_pipe = open("/tmp/ps1_err.log", "wb")
+        print(ps0_cmd)
+        print(ps1_cmd)
+        ps0_pipe = open("/tmp/ps0_err.log", "wb")
+        ps1_pipe = open("/tmp/ps1_err.log", "wb")
 
         ps0_proc = subprocess.Popen(
             ps0_cmd.strip().split(" "),
@@ -220,10 +229,7 @@ def start_pserver(self, model_file, check_error_log, required_envs):
             stderr=ps1_pipe,
             env=required_envs)
 
-        if not check_error_log:
-            return ps0_proc, ps1_proc, None, None
-        else:
-            return ps0_proc, ps1_proc, ps0_pipe, ps1_pipe
+        return ps0_proc, ps1_proc, ps0_pipe, ps1_pipe
 
     def _wait_ps_ready(self, pid):
         retry_times = 50
@@ -244,15 +250,15 @@ def _run_local(self, model, envs, check_error_log):
 
         cmd = "%s %s --role trainer" % (self._python_interp, model)
 
-        if self._use_cuda:
+        if self.__use_cuda:
             cmd += " --use_cuda"
             env_local = {"CUDA_VISIBLE_DEVICES": "0"}
         else:
             env_local = {'CPU_NUM': '1'}
 
         envs.update(env_local)
 
-        if not check_error_log:
+        if check_error_log:
             err_log = open("/tmp/trainer.err.log", "wb")
             local_proc = subprocess.Popen(
                 cmd.split(" "),
@@ -266,7 +272,6 @@ def _run_local(self, model, envs, check_error_log):
             stderr=subprocess.PIPE,
             env=envs)
 
-        local_proc.wait()
         local_out, local_err = local_proc.communicate()
         local_ret = cpt.to_text(local_out)
 
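Removing the `local_proc.wait()` that preceded `communicate()` is more than cleanup. With `stdout`/`stderr` set to `subprocess.PIPE`, `wait()` can deadlock: the child blocks writing once the OS pipe buffer fills, while the parent blocks waiting for it to exit. `communicate()` drains both pipes and then reaps the process, so it is the only call needed. A minimal sketch of the hazard (hypothetical command, large output on purpose):

```python
import subprocess

proc = subprocess.Popen(
    ["python", "-c", "print('x' * 10000000)"],  # enough to overflow a pipe buffer
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE)

# proc.wait() here could hang forever: the child stalls on a full stdout
# pipe and never exits, and nothing is reading the pipe.
out, err = proc.communicate()  # drains stdout/stderr, then reaps the child
```

The same reasoning applies to the `tr0_proc.wait()`/`tr1_proc.wait()` calls deleted in `_run_cluster` below.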
@@ -307,7 +312,7 @@ def _run_cluster(self, model, envs, check_error_log):
         if self._use_reader_alloc:
             tr0_cmd += " --use_reader_alloc"
             tr1_cmd += " --use_reader_alloc"
-        if self._use_cuda:
+        if self.__use_cuda:
             tr0_cmd += " --use_cuda"
             tr1_cmd += " --use_cuda"
             env0 = {"CUDA_VISIBLE_DEVICES": "0"}
@@ -319,15 +324,10 @@ def _run_cluster(self, model, envs, check_error_log):
         env0.update(envs)
         env1.update(envs)
 
-        FNULL = open(os.devnull, 'w')
-
-        tr0_pipe = subprocess.PIPE
-        tr1_pipe = subprocess.PIPE
-        if check_error_log:
-            print("tr0_cmd:{}, env0: {}".format(tr0_cmd, env0))
-            print("tr1_cmd:{}, env1: {}".format(tr1_cmd, env1))
-            tr0_pipe = open("/tmp/tr0_err.log", "wb")
-            tr1_pipe = open("/tmp/tr1_err.log", "wb")
+        print("tr0_cmd:{}, env0: {}".format(tr0_cmd, env0))
+        print("tr1_cmd:{}, env1: {}".format(tr1_cmd, env1))
+        tr0_pipe = open("/tmp/tr0_err.log", "wb")
+        tr1_pipe = open("/tmp/tr1_err.log", "wb")
 
         tr0_proc = subprocess.Popen(
             tr0_cmd.strip().split(" "),
@@ -340,29 +340,22 @@ def _run_cluster(self, model, envs, check_error_log):
             stderr=tr1_pipe,
             env=env1)
 
-        tr0_proc.wait()
-        tr1_proc.wait()
-
         tr0_out, tr0_err = tr0_proc.communicate()
         tr0_loss_text = cpt.to_text(tr0_out)
         tr1_out, tr1_err = tr1_proc.communicate()
         tr1_loss_text = cpt.to_text(tr1_out)
 
         # close trainer file
-        if check_error_log:
-            tr0_pipe.close()
-            tr1_pipe.close()
+        tr0_pipe.close()
+        tr1_pipe.close()
 
-            ps0_pipe.close()
-            ps1_pipe.close()
+        ps0_pipe.close()
+        ps1_pipe.close()
         # FIXME: use terminate() instead of sigkill.
         os.kill(ps0.pid, signal.SIGKILL)
         os.kill(ps1.pid, signal.SIGKILL)
         ps0.terminate()
         ps1.terminate()
-        ps0.wait()
-        ps1.wait()
-        FNULL.close()
 
         # print log
         sys.stderr.write('trainer 0 stdout:\n %s\n' % tr0_loss_text)
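On the FIXME: `os.kill(pid, signal.SIGKILL)` stops the pservers instantly but cannot be caught, so they get no chance to release ports or flush logs, and the `terminate()` calls that follow SIGKILL are effectively no-ops. One graceful-then-forceful pattern the FIXME might be pointing toward (a sketch assuming Python 3's `wait(timeout=...)`):

```python
import subprocess

def stop_process(proc, timeout=5):
    # Ask politely first: terminate() sends SIGTERM on POSIX, which the
    # process can trap to clean up.
    proc.terminate()
    try:
        proc.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        proc.kill()  # SIGKILL: uncatchable, last resort
        proc.wait()  # reap the child so it does not linger as a zombie
```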
@@ -387,6 +380,7 @@ def check_with_place(self,
             "LD_LIBRARY_PATH": os.getenv("LD_LIBRARY_PATH", ""),
             "FLAGS_fraction_of_gpu_memory_to_use": "0.15",
             "FLAGS_cudnn_deterministic": "1",
+            "http_proxy": ""
         }
 
         required_envs.update(need_envs)
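Clearing `http_proxy` here is presumably to keep trainer/pserver RPC traffic to the `127.0.0.1` endpoints from being routed through a proxy inherited from the CI environment. Since these tests pass an explicit `env` dict to `subprocess.Popen`, the child sees only what the dict contains, which can be verified directly (a self-contained check, not part of the test suite):

```python
import os
import subprocess

env = {
    "PATH": os.getenv("PATH", ""),  # needed so the interpreter can be found
    "http_proxy": "",               # explicitly empty: proxy-aware clients skip it
}
out = subprocess.check_output(
    ["python", "-c", "import os; print(repr(os.environ.get('http_proxy')))"],
    env=env)
print(out)  # b"''\n": the child sees an empty proxy setting, nothing inherited
```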