@@ -30,83 +30,83 @@ class QJobInfo:
30
30
:author Hans J. Johnson
31
31
"""
32
32
33
- def __init__ (self , jobNum , jobQueueState , jobTime , jobQueueName , jobSlots , qsub_command_line ):
33
+ def __init__ (self , job_num , job_queue_state , job_time , job_queue_name , job_slots , qsub_command_line ):
34
34
# self._jobName = None # Ascii text name of job not unique
35
- self ._jobNum = int (
36
- jobNum ) # The primary unique identifier for this job, must be an integer!
35
+ self ._job_num = int (
36
+ job_num ) # The primary unique identifier for this job, must be an integer!
37
37
# self._jobOwn = None # Who owns this job
38
- self ._jobQueueState = str (
39
- jobQueueState ) # ["running","zombie",...??]
38
+ self ._job_queue_state = str (
39
+ job_queue_state ) # ["running","zombie",...??]
40
40
# self._jobActionState = str(jobActionState) # ['r','qw','S',...??]
41
- self ._jobTime = jobTime # The job start time
42
- self ._jobInfoCreationTime = time .time (
41
+ self ._job_time = job_time # The job start time
42
+ self ._job_info_creation_time = time .time (
43
43
) # When this job was created (for comparing against initalization)
44
- self ._jobQueueName = jobQueueName # Where the job is running
45
- self ._jobSlots = jobSlots # How many slots are being used
44
+ self ._job_queue_name = job_queue_name # Where the job is running
45
+ self ._job_slots = job_slots # How many slots are being used
46
46
self ._qsub_command_line = qsub_command_line
47
47
48
48
def __repr__ (self ):
49
- return str (self ._jobNum ).ljust (8 ) \
50
- + str (self ._jobQueueState ).ljust (12 ) \
51
- + str (self ._jobSlots ).ljust (3 ) \
52
- + time .strftime ("%Y-%m-%dT%H:%M:%S" , time .gmtime (self ._jobTime )).ljust (20 ) \
53
- + str (self ._jobQueueName ).ljust (8 ) \
49
+ return str (self ._job_num ).ljust (8 ) \
50
+ + str (self ._job_queue_state ).ljust (12 ) \
51
+ + str (self ._job_slots ).ljust (3 ) \
52
+ + time .strftime ("%Y-%m-%dT%H:%M:%S" , time .gmtime (self ._job_time )).ljust (20 ) \
53
+ + str (self ._job_queue_name ).ljust (8 ) \
54
54
+ str (self ._qsub_command_line )
55
55
56
56
def is_initializing (self ):
57
- return self ._jobQueueState == "initializing"
57
+ return self ._job_queue_state == "initializing"
58
58
59
59
def is_zombie (self ):
60
- return self ._jobQueueState == "zombie"
60
+ return self ._job_queue_state == "zombie"
61
61
62
62
def is_running (self ):
63
- return self ._jobQueueState == "running"
63
+ return self ._job_queue_state == "running"
64
64
65
65
def is_pending (self ):
66
- return self ._jobQueueState == "pending"
66
+ return self ._job_queue_state == "pending"
67
67
68
68
def is_job_state_pending (self ):
69
69
""" Return True, unless job is in the "zombie" status
70
70
"""
71
- time_diff = (time .time () - self ._jobInfoCreationTime )
71
+ time_diff = (time .time () - self ._job_info_creation_time )
72
72
if self .is_zombie ():
73
73
sge_debug_print (
74
74
"DONE! QJobInfo.IsPending found in 'zombie' list, returning False so claiming done!\n {0}" .format (self ))
75
- isPendingStatus = False # Job explicitly found as being completed!
75
+ is_pending_status = False # Job explicitly found as being completed!
76
76
elif self .is_initializing () and (time_diff > 600 ):
77
77
# if initializing for more than 5 minute, failure due to
78
78
# initialization and completion before registration
79
79
sge_debug_print (
80
80
"FAILURE! QJobInfo.IsPending found long running at {1} seconds"
81
81
"'initializing' returning False for to break loop!\n {0}" .format (self , time_diff ))
82
- isPendingStatus = True # Job initialization took too long, so report!
82
+ is_pending_status = True # Job initialization took too long, so report!
83
83
else : # self.is_running() || self.is_pending():
84
- isPendingStatus = True # Job cache last listed as running
85
- return isPendingStatus # The job is in one of the hold states
84
+ is_pending_status = True # Job cache last listed as running
85
+ return is_pending_status # The job is in one of the hold states
86
86
87
- def update_info (self , jobQueueState , jobTime , jobQueueName , jobSlots ):
88
- self ._jobQueueState = jobQueueState
89
- self ._jobTime = jobTime
90
- self ._jobQueueName = jobQueueName
91
- self ._jobSlots = jobSlots
87
+ def update_info (self , job_queue_state , job_time , job_queue_name , job_slots ):
88
+ self ._job_queue_state = job_queue_state
89
+ self ._job_time = job_time
90
+ self ._job_queue_name = job_queue_name
91
+ self ._job_slots = job_slots
92
92
93
93
def set_state (self , new_state ):
94
- self ._jobQueueState = new_state
94
+ self ._job_queue_state = new_state
95
95
96
96
97
97
class QstatSubstitute :
98
98
99
99
"""A wrapper for Qstat to avoid overloading the
100
100
SGE/OGS server with rapid continuous qstat requests"""
101
101
102
- def __init__ (self , qstatInstantExecutable = 'qstat' , qstatCachedExecutable = 'qstat' ):
102
+ def __init__ (self , qstat_instant_executable = 'qstat' , qstat_cached_executable = 'qstat' ):
103
103
"""
104
- :param qstatInstantExecutable :
105
- :param qstatCachedExecutable :
104
+ :param qstat_instant_executable :
105
+ :param qstat_cached_executable :
106
106
"""
107
- self ._qstatInstantExecutable = qstatInstantExecutable
108
- self ._qstatCachedExecutable = qstatCachedExecutable
109
- self ._OutOfScopeJobs = list () # Initialize first
107
+ self ._qstat_instant_executable = qstat_instant_executable
108
+ self ._qstat_cached_executable = qstat_cached_executable
109
+ self ._out_of_scope_jobs = list () # Initialize first
110
110
self ._task_dictionary = dict (
111
111
) # {'taskid': QJobInfo(), .... } The dictionaryObject
112
112
self ._remove_old_jobs ()
@@ -117,47 +117,49 @@ def _remove_old_jobs(self):
117
117
are jobs that existed prior to starting a new jobs, so they are irrelevant.
118
118
"""
119
119
self ._run_qstat ("QstatInitialization" , True )
120
- # If qstat does not exist on this system, then quietly
121
- # fail during init
120
+ # If qstat does not exist on this system, then quietly
121
+ # fail during init
122
122
123
123
def add_startup_job (self , taskid , qsub_command_line ):
124
124
"""
125
125
:param taskid: The job id
126
- :param scriptFile : When initializing, re-use the jobQueue name
126
+ :param qsub_command_line : When initializing, re-use the job_queue_name
127
127
:return: NONE
128
128
"""
129
129
taskid = int (taskid ) # Ensure that it is an integer
130
- self ._task_dictionary [taskid ] = QJobInfo (
131
- taskid , "initializing" , time . time (), "noQueue" , 1 , qsub_command_line )
130
+ self ._task_dictionary [taskid ] = QJobInfo (taskid , "initializing" , time . time (),
131
+ "noQueue" , 1 , qsub_command_line )
132
132
133
- def _qacct_verified_complete (self , taskid ):
133
+ @staticmethod
134
+ def _qacct_verified_complete (taskid ):
134
135
""" request definitive job completion information for the current job
135
136
from the qacct report
136
137
"""
137
- sge_debug_print (
138
- "WARNING: CONTACTING qacct for finished jobs, {0}: {1}" .format (time .time (), "Verifying Completion" ))
138
+ sge_debug_print ("WARNING: "
139
+ "CONTACTING qacct for finished jobs, "
140
+ "{0}: {1}" .format (time .time (), "Verifying Completion" ))
139
141
140
- thisCommand = 'qacct'
142
+ this_command = 'qacct'
141
143
qacct_retries = 10
142
- isComplete = False
144
+ is_complete = False
143
145
while qacct_retries > 0 :
144
146
qacct_retries -= 1
145
147
try :
146
148
proc = subprocess .Popen (
147
- [thisCommand , '-o' , os .getlogin (), '-j' , str (taskid )],
149
+ [this_command , '-o' , os .getlogin (), '-j' , str (taskid )],
148
150
stdout = subprocess .PIPE ,
149
151
stderr = subprocess .PIPE )
150
152
qacct_result , _ = proc .communicate ()
151
153
if qacct_result .find (str (taskid )):
152
- isComplete = True
154
+ is_complete = True
153
155
sge_debug_print (
154
156
"NOTE: qacct for jobs\n {0}" .format (qacct_result ))
155
157
break
156
158
except :
157
159
sge_debug_print ("NOTE: qacct call failed" )
158
160
time .sleep (5 )
159
161
pass
160
- return isComplete
162
+ return is_complete
161
163
162
164
def _parse_qstat_job_list (self , xml_job_list ):
163
165
current_jobs_parsed = list ()
@@ -166,38 +168,39 @@ def _parse_qstat_job_list(self, xml_job_list):
166
168
# jobown =
167
169
# current_job_element.getElementsByTagName('JB_owner')[0].childNodes[0].data
168
170
try :
169
- jobQueueName = current_job_element .getElementsByTagName (
171
+ job_queue_name = current_job_element .getElementsByTagName (
170
172
'queue_name' )[0 ].childNodes [0 ].data
171
173
except :
172
- jobQueueName = "unknown"
174
+ job_queue_name = "unknown"
173
175
try :
174
- jobSlots = current_job_element .getElementsByTagName (
176
+ job_slots = current_job_element .getElementsByTagName (
175
177
'slots' )[0 ].childNodes [0 ].data
176
178
except :
177
- jobSlots = "uknown"
178
- jobQueueState = current_job_element .getAttribute ('state' )
179
- jobNum = int (current_job_element .getElementsByTagName (
179
+ job_slots = "uknown"
180
+ job_queue_state = current_job_element .getAttribute ('state' )
181
+ job_num = int (current_job_element .getElementsByTagName (
180
182
'JB_job_number' )[0 ].childNodes [0 ].data )
181
183
try :
182
- jobtimeText = current_job_element .getElementsByTagName (
184
+ job_time_text = current_job_element .getElementsByTagName (
183
185
'JAT_start_time' )[0 ].childNodes [0 ].data
184
- jobTime = float (time .mktime (time .strptime (
185
- jobtimeText , "%Y-%m-%dT%H:%M:%S" )))
186
+ job_time = float (time .mktime (time .strptime (
187
+ job_time_text , "%Y-%m-%dT%H:%M:%S" )))
186
188
except :
187
- jobTime = float (0.0 )
189
+ job_time = float (0.0 )
188
190
# Make job entry
189
191
190
- taskId = int (jobNum )
191
- if taskId in self ._task_dictionary :
192
- self ._task_dictionary [taskId ].update_info (
193
- jobQueueState , jobTime , jobQueueName , jobSlots )
192
+ task_id = int (job_num )
193
+ if task_id in self ._task_dictionary :
194
+ self ._task_dictionary [task_id ].update_info (
195
+ job_queue_state , job_time , job_queue_name , job_slots )
194
196
sge_debug_print ("Updating job: {0}" .format (
195
- self ._task_dictionary [taskId ]))
196
- current_jobs_parsed .append (jobNum )
197
+ self ._task_dictionary [task_id ]))
198
+ current_jobs_parsed .append (job_num )
199
+ # Changed from job_num as "in" is used to check which does not cast
197
200
else :
198
201
# Any Job that was not explicitly added with qsub command is
199
202
# out of scope
200
- self ._OutOfScopeJobs .append (int (taskId ))
203
+ self ._out_of_scope_jobs .append (int (task_id ))
201
204
202
205
# To ensure that every job is in the dictionary has a state reported
203
206
# by the SGE environment, it is necessary to explicitly check jobs
@@ -211,41 +214,41 @@ def _parse_qstat_job_list(self, xml_job_list):
211
214
if is_completed :
212
215
self ._task_dictionary [dictionary_job ].set_state ("zombie" )
213
216
else :
214
- sge_debug_print (
215
- "ERROR: Job not in current parselist, and not in done list {0}: {1}" .format (
216
- dictionary_job , self ._task_dictionary [dictionary_job ]))
217
+ sge_debug_print ("ERROR: Job not in current parselist, "
218
+ " and not in done list {0}: {1}" .format (dictionary_job ,
219
+ self ._task_dictionary [dictionary_job ]))
217
220
pass
218
221
if self ._task_dictionary [dictionary_job ].is_initializing ():
219
222
is_completed = self ._qacct_verified_complete (dictionary_job )
220
223
if is_completed :
221
224
self ._task_dictionary [dictionary_job ].set_state ("zombie" )
222
225
else :
223
- sge_debug_print (
224
- "ERROR: Job not in still in intializing mode, and not in done list {0}: {1}" .format (
225
- dictionary_job , self ._task_dictionary [dictionary_job ]))
226
+ sge_debug_print ("ERROR: Job not in still in intializing mode, "
227
+ " and not in done list {0}: {1}" .format (dictionary_job ,
228
+ self ._task_dictionary [dictionary_job ]))
226
229
pass
227
230
228
- def _run_qstat (self , reasonForQstat , forceInstant = True ):
231
+ def _run_qstat (self , reason_for_qstat , force_instant = True ):
229
232
""" request all job information for the current user in xmlformat.
230
233
See documentation from java documentation:
231
234
http://arc.liv.ac.uk/SGE/javadocs/jgdi/com/sun/grid/jgdi/monitoring/filter/JobStateFilter.html
232
235
-s r gives running jobs
233
236
-s z gives recently completed jobs (**recently** is very ambiguous)
234
237
-s s suspended jobs
235
238
"""
236
- sge_debug_print (
237
- "WARNING: CONTACTING qmaster for jobs, {0}: {1}" .format (time .time (), reasonForQstat ))
238
- if forceInstant :
239
- thisCommand = self ._qstatInstantExecutable
239
+ sge_debug_print ("WARNING: CONTACTING qmaster for jobs, "
240
+ " {0}: {1}" .format (time .time (), reason_for_qstat ))
241
+ if force_instant :
242
+ this_command = self ._qstat_instant_executable
240
243
else :
241
- thisCommand = self ._qstatCachedExecutable
244
+ this_command = self ._qstat_cached_executable
242
245
243
246
qstat_retries = 10
244
247
while qstat_retries > 0 :
245
248
qstat_retries -= 1
246
249
try :
247
250
proc = subprocess .Popen (
248
- [thisCommand , '-u' , os .getlogin (), '-xml' , '-s' , 'psrz' ],
251
+ [this_command , '-u' , os .getlogin (), '-xml' , '-s' , 'psrz' ],
249
252
stdout = subprocess .PIPE ,
250
253
stderr = subprocess .PIPE )
251
254
qstat_xml_result , _ = proc .communicate ()
@@ -256,12 +259,12 @@ def _run_qstat(self, reasonForQstat, forceInstant=True):
256
259
self ._parse_qstat_job_list (runjobs )
257
260
break
258
261
except Exception as inst :
259
- exceptionMessage = "QstatParsingError:\n \t {0}\n \t {1}\n " .format (
262
+ exception_message = "QstatParsingError:\n \t {0}\n \t {1}\n " .format (
260
263
type (
261
264
inst ), # the exception instance
262
265
inst # __str__ allows args to printed directly
263
266
)
264
- sge_debug_print (exceptionMessage )
267
+ sge_debug_print (exception_message )
265
268
time .sleep (5 )
266
269
pass
267
270
@@ -270,36 +273,32 @@ def print_dictionary(self):
270
273
for vv in self ._task_dictionary .values ():
271
274
sge_debug_print (str (vv ))
272
275
273
- def is_job_pending (self , taskId , recursionNumber = 12 ):
274
- taskId = int (taskId ) # Ensure that it is an integer
275
- self ._run_qstat (
276
- "checking job pending status {0}" . format ( taskId ), False )
277
- if taskId in self ._task_dictionary :
276
+ def is_job_pending (self , task_id , recursion_number = 12 ):
277
+ task_id = int (task_id ) # Ensure that it is an integer
278
+ self ._run_qstat ("checking job pending status {0}" . format ( task_id ), False )
279
+ # Check if the task is in the dictionary first (before running qstat )
280
+ if task_id in self ._task_dictionary :
278
281
# Trust the cache, only False if state='zombie'
279
- jobIsPending = self ._task_dictionary [taskId ].is_job_state_pending ()
282
+ job_is_pending = self ._task_dictionary [task_id ].is_job_state_pending ()
280
283
else :
281
- self ._run_qstat (
282
- "checking job pending status {0}" .format (taskId ), True )
283
- if taskId in self ._task_dictionary :
284
+ self ._run_qstat ("checking job pending status {0}" .format (task_id ), True )
285
+ if task_id in self ._task_dictionary :
284
286
# Trust the cache, only False if state='zombie'
285
- jobIsPending = self ._task_dictionary [
286
- taskId ].is_job_state_pending ()
287
+ job_is_pending = self ._task_dictionary [task_id ].is_job_state_pending ()
287
288
else :
288
- sge_debug_print (
289
- "ERROR: Job {0} not in task list, even after forced qstat!" .format (taskId ))
290
- jobIsPending = False
291
- if not jobIsPending :
292
- sge_debug_print (
293
- "DONE! Returning for {0} claiming done!" .format (taskId ))
294
- if taskId in self ._task_dictionary :
295
- sge_debug_print (
296
- "NOTE: Adding {0} to OutOfScopeJobs list!" .format (taskId ))
297
- self ._OutOfScopeJobs .append (int (taskId ))
298
- self ._task_dictionary .pop (taskId )
289
+ sge_debug_print ("ERROR: Job {0} not in task list, "
290
+ "even after forced qstat!" .format (task_id ))
291
+ job_is_pending = False
292
+ if not job_is_pending :
293
+ sge_debug_print ("DONE! Returning for {0} claiming done!" .format (task_id ))
294
+ if task_id in self ._task_dictionary :
295
+ sge_debug_print ("NOTE: Adding {0} to OutOfScopeJobs list!" .format (task_id ))
296
+ self ._out_of_scope_jobs .append (int (task_id ))
297
+ self ._task_dictionary .pop (task_id )
299
298
else :
300
- sge_debug_print (
301
- "ERROR: Job {0} not in task list, but attempted to be removed!" .format (taskId ))
302
- return jobIsPending
299
+ sge_debug_print ("ERROR: Job {0} not in task list, "
300
+ " but attempted to be removed!" .format (task_id ))
301
+ return job_is_pending
303
302
304
303
305
304
def qsub_sanitize_job_name (testjobname ):
@@ -338,19 +337,19 @@ def __init__(self, **kwargs):
338
337
"""
339
338
self ._retry_timeout = 2
340
339
self ._max_tries = 2
341
- instantQstat = 'qstat'
342
- cachedQstat = 'qstat'
340
+ instant_qstat = 'qstat'
341
+ cached_qstat = 'qstat'
343
342
344
343
if 'plugin_args' in kwargs and kwargs ['plugin_args' ]:
345
344
if 'retry_timeout' in kwargs ['plugin_args' ]:
346
345
self ._retry_timeout = kwargs ['plugin_args' ]['retry_timeout' ]
347
346
if 'max_tries' in kwargs ['plugin_args' ]:
348
347
self ._max_tries = kwargs ['plugin_args' ]['max_tries' ]
349
348
if 'qstatProgramPath' in kwargs ['plugin_args' ]:
350
- instantQstat = kwargs ['plugin_args' ]['qstatProgramPath' ]
349
+ instant_qstat = kwargs ['plugin_args' ]['qstatProgramPath' ]
351
350
if 'qstatCachedProgramPath' in kwargs ['plugin_args' ]:
352
- cachedQstat = kwargs ['plugin_args' ]['qstatCachedProgramPath' ]
353
- self ._refQstatSubstitute = QstatSubstitute (instantQstat , cachedQstat )
351
+ cached_qstat = kwargs ['plugin_args' ]['qstatCachedProgramPath' ]
352
+ self ._refQstatSubstitute = QstatSubstitute (instant_qstat , cached_qstat )
354
353
355
354
super (SGEPlugin , self ).__init__ (template , ** kwargs )
356
355
0 commit comments