logger.setLevel(logging.INFO)
log_handler = logging.StreamHandler()
log_format = logging.Formatter(
-    '%(asctime)s - %(filename)s:%(lineno)d - %(levelname)s: %(message)s')
+    '%(levelname)s %(asctime)s %(filename)s:%(lineno)d] %(message)s')
log_handler.setFormatter(log_format)
logger.addHandler(log_handler)
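For reference, a standalone sketch (not part of the patch) showing roughly what the new formatter emits; the logger name and message below are made up:

import logging

demo_logger = logging.getLogger("launch_format_demo")
demo_logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
handler.setFormatter(
    logging.Formatter(
        '%(levelname)s %(asctime)s %(filename)s:%(lineno)d] %(message)s'))
demo_logger.addHandler(handler)
# Prints a line like: INFO 2019-11-01 12:00:00,000 demo.py:11] launching trainers
demo_logger.info("launching trainers")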
@@ -71,7 +71,7 @@ def _parse_args():
    parser = ArgumentParser(
        description='''start paddle training using multi-process mode.
NOTE: your train program ***must*** run as distributed nccl2 mode,
-see: http://www.paddlepaddle.org/documentation/docs/zh/1.2/user_guides/howto/training/cluster_howto.html#permalink-8--nccl2-
+see: http://www.paddlepaddle.org/documentation/docs/zh/1.6/user_guides/howto/training/cluster_howto.html#permalink-8--nccl2-
And your train program must read environment variables below in order to let different
process init properly:
FLAGS_selected_gpus
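As a hedged illustration of the docstring above (not code from the patch), a training script might read the launcher-provided environment variables roughly like this; the helper name is hypothetical and the comma-separated endpoint format is an assumption:

import os

def _read_launcher_env():
    # Each value is set per worker by the launcher before Popen.
    selected_gpus = os.environ["FLAGS_selected_gpus"]
    trainer_id = int(os.environ["PADDLE_TRAINER_ID"])
    trainers_num = int(os.environ["PADDLE_TRAINERS_NUM"])
    current_endpoint = os.environ["PADDLE_CURRENT_ENDPOINT"]
    # Assumed format: "ip1:port1,ip2:port2,..."
    trainer_endpoints = os.environ["PADDLE_TRAINER_ENDPOINTS"].split(",")
    return (selected_gpus, trainer_id, trainers_num, current_endpoint,
            trainer_endpoints)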
@@ -147,9 +147,6 @@ def terminate_procs(procs):
def start_procs(args):
    """
    """
-    procs = []
-    log_fns = []
-
    default_env = os.environ.copy()

    current_node_ip = args.node_ip
@@ -213,48 +210,49 @@ def start_procs(args):
    current_env.pop("https_proxy", None)

    procs = []
+    log_fns = []
    cmds = []
+    ranks = []
    for i in range(0, selected_gpus_num):
+        rank = (node_id * selected_gpus_num + i)
        current_env.update({
            "FLAGS_selected_gpus": "%s" % selected_gpus[i],
-            "PADDLE_TRAINER_ID": "%d" % (node_id * selected_gpus_num + i),
+            "PADDLE_TRAINER_ID": "%d" % rank,
            "PADDLE_CURRENT_ENDPOINT":
            "%s:%d" % (current_node_ip, args.started_port + i),
            "PADDLE_TRAINERS_NUM": "%d" % nranks,
            "PADDLE_TRAINER_ENDPOINTS": trainers_endpoints
        })

-        if num_nodes > 1:
-            current_env.update({"FLAGS_sync_nccl_allreduce": "0"})
-
        cmd = [sys.executable, "-u", args.training_script
               ] + args.training_script_args
-
        cmds.append(cmd)

        if args.log_dir is not None:
            os.system("mkdir -p {}".format(args.log_dir))
            fn = open("%s/workerlog.%d" % (args.log_dir, i), "w")
            log_fns.append(fn)
-
            proc = subprocess.Popen(cmd, env=current_env, stdout=fn, stderr=fn)
        else:
            proc = subprocess.Popen(cmd, env=current_env)

        procs.append(proc)
+        ranks.append(rank)

    try:
        alive = True
        error = False
+        error_rank = []
        # wait all process finish or one error
        while alive and not error:
            alive = False
-            for p in procs:
+            for rank, p in zip(ranks, procs):
                ret = p.poll()
                if ret is None:
                    alive = True
                elif ret != 0:
                    error = True
+                    error_rank.append(rank)
            time.sleep(1)

        if error:
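The rank bookkeeping and zip-based poll loop added above can be exercised on their own; this is a minimal sketch with made-up child commands, not the launcher itself:

import subprocess
import sys
import time

node_id, gpus_per_node = 0, 2
cmds = [[sys.executable, "-c", "import sys; sys.exit(0)"],
        [sys.executable, "-c", "import sys; sys.exit(3)"]]

procs, ranks = [], []
for i, cmd in enumerate(cmds):
    rank = node_id * gpus_per_node + i   # same formula as PADDLE_TRAINER_ID
    procs.append(subprocess.Popen(cmd))
    ranks.append(rank)

alive, error, error_rank = True, False, []
while alive and not error:
    alive = False
    for rank, p in zip(ranks, procs):
        ret = p.poll()
        if ret is None:
            alive = True
        elif ret != 0:
            error = True
            error_rank.append(rank)
    time.sleep(1)

print("failed ranks:", error_rank)   # expected: [1]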
@@ -266,11 +264,15 @@ def start_procs(args):
        terminate_procs(procs)
        raise
    except SystemExit:
-        logger.error("One trainer process abort, exit")
+        logger.error(
+            "ABORT!!! Out of all {} trainers, the trainer process with rank={} was aborted. Please check its log.".
+            format(nranks, error_rank))
        terminate_procs(procs)
        raise
    except:
-        logger.error("Trainer process abort, exit")
+        logger.error(
+            "ABORT!!! Out of all {} trainers, the trainer process with rank={} was aborted. Please check its log.".
+            format(nranks, error_rank))
        terminate_procs(procs)
        raise
    finally:
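A follow-up usage note, assuming the args.log_dir branch above was taken: the reported rank maps back to the local worker index that names the workerlog file; a minimal sketch with hypothetical values:

import os

log_dir, node_id, selected_gpus_num = "mylog", 1, 4
failed_rank = 6
local_index = failed_rank - node_id * selected_gpus_num   # inverse of the rank formula
log_path = os.path.join(log_dir, "workerlog.%d" % local_index)
if os.path.exists(log_path):
    with open(log_path) as f:
        print(f.read())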