@@ -164,18 +164,30 @@ class TestDistBase(unittest.TestCase):
     def _setup_config(self):
         raise NotImplementedError("tests should have _setup_config implemented")
 
+    def _after_setup_config(self):
+        if self._enforce_place == "CPU":
+            self.__use_cuda = False
+        elif self._enforce_place == "GPU":
+            self.__use_cuda = True
+        else:
+            if fluid.core.is_compiled_with_cuda():
+                self.__use_cuda = True
+            else:
+                self.__use_cuda = False
+
     def setUp(self):
         self._trainers = 2
         self._pservers = 2
         self._ps_endpoints = "127.0.0.1:%s,127.0.0.1:%s" % (
             self._find_free_port(), self._find_free_port())
         self._python_interp = "python"
         self._sync_mode = True
-        self._use_cuda = True
+        self._enforce_place = None
         self._mem_opt = False
         self._use_reduce = False
         self._use_reader_alloc = True
         self._setup_config()
+        self._after_setup_config()
 
     def _find_free_port(self):
         with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
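The new _enforce_place knob replaces the old _use_cuda flag: a subclass pins the device in _setup_config, and _after_setup_config translates that into the private __use_cuda attribute (name-mangled to _TestDistBase__use_cuda, so only the base class reads it). A minimal sketch of how a concrete test would use it; the subclass name, model script, and delta below are hypothetical, not taken from this patch:

    class TestDistMnistCPU(TestDistBase):       # hypothetical subclass
        def _setup_config(self):
            self._sync_mode = True
            self._enforce_place = "CPU"         # forces __use_cuda = False regardless of the build

        def test_dist_train(self):
            # model script name and tolerance are illustrative only
            self.check_with_place("dist_mnist.py", delta=1e-7)
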
@@ -199,13 +211,10 @@ def start_pserver(self, model_file, check_error_log, required_envs):
             ps0_cmd += " --mem_opt"
             ps1_cmd += " --mem_opt"
 
-        ps0_pipe = subprocess.PIPE
-        ps1_pipe = subprocess.PIPE
-        if check_error_log:
-            print(ps0_cmd)
-            print(ps1_cmd)
-            ps0_pipe = open("/tmp/ps0_err.log", "wb")
-            ps1_pipe = open("/tmp/ps1_err.log", "wb")
+        print(ps0_cmd)
+        print(ps1_cmd)
+        ps0_pipe = open("/tmp/ps0_err.log", "wb")
+        ps1_pipe = open("/tmp/ps1_err.log", "wb")
 
         ps0_proc = subprocess.Popen(
             ps0_cmd.strip().split(" "),
@@ -218,10 +227,7 @@ def start_pserver(self, model_file, check_error_log, required_envs):
             stderr=ps1_pipe,
             env=required_envs)
 
-        if not check_error_log:
-            return ps0_proc, ps1_proc, None, None
-        else:
-            return ps0_proc, ps1_proc, ps0_pipe, ps1_pipe
+        return ps0_proc, ps1_proc, ps0_pipe, ps1_pipe
 
     def _wait_ps_ready(self, pid):
         retry_times = 50
@@ -242,15 +248,15 @@ def _run_local(self, model, envs, check_error_log):
 
         cmd = "%s %s --role trainer" % (self._python_interp, model)
 
-        if self._use_cuda:
+        if self.__use_cuda:
             cmd += " --use_cuda"
             env_local = {"CUDA_VISIBLE_DEVICES": "0"}
         else:
             env_local = {'CPU_NUM': '1'}
 
         envs.update(env_local)
 
-        if not check_error_log:
+        if check_error_log:
             err_log = open("/tmp/trainer.err.log", "wb")
             local_proc = subprocess.Popen(
                 cmd.split(" "),
@@ -264,7 +270,6 @@ def _run_local(self, model, envs, check_error_log):
                 stderr=subprocess.PIPE,
                 env=envs)
 
-        local_proc.wait()
         local_out, local_err = local_proc.communicate()
         local_ret = cpt.to_text(local_out)
 
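Dropping local_proc.wait() (and the analogous tr0_proc.wait()/tr1_proc.wait() calls further down) fixes a classic subprocess pitfall rather than a cosmetic issue: with stdout=subprocess.PIPE, wait() can deadlock once the child fills the OS pipe buffer, because the parent is not reading while it waits. communicate() drains the pipes and reaps the process in one call. A standalone sketch, independent of the patch:

    import subprocess

    # Child that writes well past a typical pipe buffer (~64 KiB on Linux).
    proc = subprocess.Popen(
        ["python", "-c", "print('x' * 1000000)"],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE)

    # BAD: proc.wait() here could block forever -- the child stalls on a
    # full pipe, the parent stalls waiting for the child to exit.
    out, err = proc.communicate()   # reads both pipes while waiting, so no deadlock
    print(len(out))
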
@@ -305,7 +310,7 @@ def _run_cluster(self, model, envs, check_error_log):
         if self._use_reader_alloc:
             tr0_cmd += " --use_reader_alloc"
             tr1_cmd += " --use_reader_alloc"
-        if self._use_cuda:
+        if self.__use_cuda:
             tr0_cmd += " --use_cuda"
             tr1_cmd += " --use_cuda"
             env0 = {"CUDA_VISIBLE_DEVICES": "0"}
@@ -317,15 +322,10 @@ def _run_cluster(self, model, envs, check_error_log):
         env0.update(envs)
         env1.update(envs)
 
-        FNULL = open(os.devnull, 'w')
-
-        tr0_pipe = subprocess.PIPE
-        tr1_pipe = subprocess.PIPE
-        if check_error_log:
-            print("tr0_cmd:{}, env0: {}".format(tr0_cmd, env0))
-            print("tr1_cmd:{}, env1: {}".format(tr1_cmd, env1))
-            tr0_pipe = open("/tmp/tr0_err.log", "wb")
-            tr1_pipe = open("/tmp/tr1_err.log", "wb")
+        print("tr0_cmd:{}, env0: {}".format(tr0_cmd, env0))
+        print("tr1_cmd:{}, env1: {}".format(tr1_cmd, env1))
+        tr0_pipe = open("/tmp/tr0_err.log", "wb")
+        tr1_pipe = open("/tmp/tr1_err.log", "wb")
 
         tr0_proc = subprocess.Popen(
             tr0_cmd.strip().split(" "),
@@ -338,29 +338,22 @@ def _run_cluster(self, model, envs, check_error_log):
             stderr=tr1_pipe,
             env=env1)
 
-        tr0_proc.wait()
-        tr1_proc.wait()
-
         tr0_out, tr0_err = tr0_proc.communicate()
         tr0_loss_text = cpt.to_text(tr0_out)
         tr1_out, tr1_err = tr1_proc.communicate()
         tr1_loss_text = cpt.to_text(tr1_out)
 
         # close trainer file
-        if check_error_log:
-            tr0_pipe.close()
-            tr1_pipe.close()
+        tr0_pipe.close()
+        tr1_pipe.close()
 
-            ps0_pipe.close()
-            ps1_pipe.close()
+        ps0_pipe.close()
+        ps1_pipe.close()
 
         # FIXME: use terminate() instead of sigkill.
         os.kill(ps0.pid, signal.SIGKILL)
         os.kill(ps1.pid, signal.SIGKILL)
         ps0.terminate()
         ps1.terminate()
-        ps0.wait()
-        ps1.wait()
-        FNULL.close()
 
         # print log
         sys.stderr.write('trainer 0 stdout:\n%s\n' % tr0_loss_text)
@@ -385,6 +378,7 @@ def check_with_place(self,
             "LD_LIBRARY_PATH": os.getenv("LD_LIBRARY_PATH", ""),
             "FLAGS_fraction_of_gpu_memory_to_use": "0.15",
             "FLAGS_cudnn_deterministic": "1",
+            "http_proxy": ""
         }
 
         required_envs.update(need_envs)
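Two smaller behavioural notes on the hunks above. First, pserver and trainer stderr now always lands in fixed files under /tmp instead of only when check_error_log is set, so a failed CI run can be inspected after the fact. A sketch (only the file names come from the diff; everything else is illustrative):

    # Dump whatever the last run left behind.
    for log in ("/tmp/ps0_err.log", "/tmp/ps1_err.log",
                "/tmp/tr0_err.log", "/tmp/tr1_err.log"):
        try:
            with open(log, "rb") as f:
                print("==== %s ====" % log)
                print(f.read().decode("utf-8", errors="replace"))
        except IOError:     # a log may not exist for a local-only run
            pass

Second, adding "http_proxy": "" to required_envs presumably keeps the loopback connections between the spawned trainers and pservers from being routed through a proxy configured on the CI host.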