1
+ import time
2
+
1
3
from dpdispatcher .machine import Machine
2
4
from dpdispatcher import dlog
3
5
from dpdispatcher .JobStatus import JobStatus
@@ -62,14 +64,25 @@ def gen_script_header(self, job):
62
64
63
65
return lsf_script_header
64
66
65
- def do_submit (self , job ):
67
+ def do_submit (self , job , retry = 0 , max_retry = 3 ):
66
68
script_file_name = job .script_file_name
67
69
script_str = self .gen_script (job )
68
70
job_id_name = job .job_hash + '_job_id'
69
71
self .context .write_file (fname = script_file_name , write_str = script_str )
70
- stdin , stdout , stderr = self .context .block_checkcall (
71
- 'cd %s && %s %s' % (self .context .remote_root , 'bsub < ' , script_file_name )
72
- )
72
+
73
+ try :
74
+ stdin , stdout , stderr = self .context .block_checkcall (
75
+ 'cd %s && %s %s' % (self .context .remote_root , 'bsub < ' , script_file_name )
76
+ )
77
+ except RuntimeError as err :
78
+ if retry < max_retry :
79
+ dlog .warning (err )
80
+ dlog .warning ("Sleep 60 s and retry submitting..." )
81
+ # wait 60 s before retrying
82
+ time .sleep (60 )
83
+ return self .do_submit (job , retry = retry + 1 , max_retry = max_retry )
84
+ raise
85
+
73
86
subret = (stdout .readlines ())
74
87
job_id = subret [0 ].split ()[1 ][1 :- 1 ]
75
88
self .context .write_file (job_id_name , job_id )
@@ -85,7 +98,7 @@ def sub_script_cmd(self, res):
85
98
def sub_script_head(self, res):
    """Do nothing and implicitly return ``None``.

    NOTE(review): presumably a placeholder for producing a submission-script
    header from ``res`` -- confirm against callers before relying on it.

    Parameters
    ----------
    res
        Not used by this implementation.
    """
    pass
87
100
88
- def check_status (self , job ):
101
+ def check_status (self , job , retry = 0 , max_retry = 3 ):
89
102
try :
90
103
job_id = job .job_id
91
104
except AttributeError :
@@ -101,6 +114,14 @@ def check_status(self, job):
101
114
else :
102
115
return JobStatus .terminated
103
116
elif ret != 0 :
117
+ # Retry whenever an unknown error is raised.
118
+ if retry < max_retry :
119
+ dlog .warning ("Get error code %d in checking status through ssh with job: %s . message: %s" %
120
+ (ret , job .job_hash , err_str ))
121
+ dlog .warning ("Sleep 60 s and retry checking..." )
122
+ # wait 60 s before retrying
123
+ time .sleep (60 )
124
+ return self .check_status (job , retry = retry + 1 , max_retry = max_retry )
104
125
raise RuntimeError ("status command bjobs fails to execute.\n error info: %s \n return code %d\n "
105
126
% (err_str , ret ))
106
127
status_out = stdout .read ().decode ('utf-8' ).split ('\n ' )
0 commit comments