6
6
from __future__ import print_function
7
7
from __future__ import absolute_import
8
8
from __future__ import division
9
+ import json
9
10
import re
10
11
import tempfile
11
12
import subprocess
87
88
"""
88
89
89
90
90
def getCondorStatus(jobMetadata):
    """Translate the ClassAd attributes of one HTCondor job into a DIRAC status.

    :param jobMetadata: dict with job metadata, i.e. one element of the decoded ``-json``
        output of condor_q or condor_history. ``JobStatus`` is always read;
        ``HoldReasonCode``, ``HoldReasonSubCode`` and ``HoldReason`` are read for held jobs only.
    :type jobMetadata: dict[str, str | int]

    :returns: Status as known by DIRAC, and a reason if the job is being held
        (an empty string otherwise)
    :rtype: tuple(str, str)
    """
    # Attributes are read with .get(): a missing attribute must not raise KeyError,
    # it simply falls through to the "Unknown" / default-held handling below.
    jobStatus = jobMetadata.get("JobStatus")
    if jobStatus != 5:
        # If the job is not held, we can return the status directly
        return (STATES_MAP.get(jobStatus, "Unknown"), "")

    # A job can be held for various reasons,
    # we need to further investigate with the holdReasonCode & holdReasonSubCode
    # Details in:
    # https://htcondor.readthedocs.io/en/latest/classad-attributes/job-classad-attributes.html#HoldReasonCode

    # Codes are compared as strings so that the result does not depend on whether the
    # JSON decoder produced ints or strings, nor on the type of HOLD_REASON_SUBCODE
    # NOTE(review): HOLD_REASON_SUBCODE is defined elsewhere in this module - confirm its type
    holdReasonCode = str(jobMetadata.get("HoldReasonCode"))
    holdReasonSubCode = str(jobMetadata.get("HoldReasonSubCode"))

    # By default, a held (5) job is defined as Aborted in STATES_MAP, but there might be some exceptions
    status = 3

    # If holdReasonCode is 3 (The PERIODIC_HOLD expression evaluated to True. Or, ON_EXIT_HOLD was true)
    # And subcode is HOLD_REASON_SUBCODE, then it means the job failed by itself, it needs to be marked as Failed
    if holdReasonCode == "3" and holdReasonSubCode == str(HOLD_REASON_SUBCODE):
        status = 5
    # If holdReasonCode is 16 (Input files are being spooled), the job should be marked as Waiting
    elif holdReasonCode == "16":
        status = 1

    return (STATES_MAP.get(status, "Unknown"), jobMetadata.get("HoldReason", ""))
147
119
148
120
149
121
class Condor (object ):
@@ -283,7 +255,6 @@ def killJob(self, **kwargs):
283
255
284
256
def getJobStatus (self , ** kwargs ):
285
257
"""Get status of the jobs in the given list"""
286
-
287
258
resultDict = {}
288
259
289
260
MANDATORY_PARAMETERS = ["JobIDList" ]
@@ -299,15 +270,12 @@ def getJobStatus(self, **kwargs):
299
270
resultDict ["Message" ] = "Empty job list"
300
271
return resultDict
301
272
302
- user = kwargs .get ("User" )
303
- if not user :
304
- user = os .environ .get ("USER" )
305
- if not user :
306
- resultDict ["Status" ] = - 1
307
- resultDict ["Message" ] = "No user name"
308
- return resultDict
273
+ # Prepare the command to get the status of the jobs
274
+ cmdJobs = " " .join (str (jobID ) for jobID in jobIDList )
275
+ attributes = "ClusterId,ProcId,JobStatus,HoldReasonCode,HoldReasonSubCode,HoldReason"
309
276
310
- cmd = "condor_q -submitter %s -af:j JobStatus HoldReasonCode HoldReasonSubCode HoldReason" % user
277
+ # Get the status of the jobs currently active
278
+ cmd = "condor_q %s -attributes %s -json" % (cmdJobs , attributes )
311
279
sp = subprocess .Popen (
312
280
shlex .split (cmd ),
313
281
stdout = subprocess .PIPE ,
@@ -317,16 +285,15 @@ def getJobStatus(self, **kwargs):
317
285
output , error = sp .communicate ()
318
286
status = sp .returncode
319
287
320
- if status != 0 :
288
+ if status != 0 or not output :
321
289
resultDict ["Status" ] = status
322
290
resultDict ["Message" ] = error
323
291
return resultDict
324
292
325
- qList = output . strip (). split ( " \n " )
293
+ jobMetadata = json . loads ( output )
326
294
327
- condorHistCall = (
328
- "condor_history -af:j JobStatus HoldReasonCode HoldReasonSubCode HoldReason -submitter %s" % user
329
- )
295
+ # Get the status of the jobs in the history
296
+ condorHistCall = "condor_history %s -attributes %s -json" % (cmdJobs , attributes )
330
297
sp = subprocess .Popen (
331
298
shlex .split (condorHistCall ),
332
299
stdout = subprocess .PIPE ,
@@ -335,15 +302,26 @@ def getJobStatus(self, **kwargs):
335
302
)
336
303
output , _ = sp .communicate ()
337
304
status = sp .returncode
338
- if status == 0 :
339
- for line in output .split ("\n " ):
340
- qList .append (line )
305
+
306
+ if status != 0 or not output :
307
+ resultDict ["Status" ] = status
308
+ resultDict ["Message" ] = error
309
+ return resultDict
310
+
311
+ jobMetadata += json .loads (output )
341
312
342
313
statusDict = {}
343
- if len (qList ):
344
- for job in jobIDList :
345
- job = str (job )
346
- statusDict [job ], _ = parseCondorStatus (qList , job )
314
+ # Build a set of job IDs found in jobMetadata
315
+ foundJobIDs = set ()
316
+ for jobDict in jobMetadata :
317
+ jobID = "%s.%s" % (jobDict ["ClusterId" ], jobDict ["ProcId" ])
318
+ statusDict [jobID ], _ = getCondorStatus (jobDict )
319
+ foundJobIDs .add (jobID )
320
+
321
+ # For job IDs not found, set status to "Unknown"
322
+ for jobID in jobIDList :
323
+ if str (jobID ) not in foundJobIDs :
324
+ statusDict [str (jobID )] = "Unknown"
347
325
348
326
# Final output
349
327
status = 0
@@ -355,19 +333,30 @@ def getCEStatus(self, **kwargs):
355
333
"""Get the overall status of the CE"""
356
334
resultDict = {}
357
335
358
- user = kwargs .get ("User" )
359
- if not user :
360
- user = os .environ .get ("USER" )
361
- if not user :
336
+ cmd = "condor_q -totals -json"
337
+ sp = subprocess .Popen (
338
+ shlex .split (cmd ),
339
+ stdout = subprocess .PIPE ,
340
+ stderr = subprocess .PIPE ,
341
+ universal_newlines = True ,
342
+ )
343
+ output , error = sp .communicate ()
344
+ status = sp .returncode
345
+
346
+ if status != 0 or not output :
362
347
resultDict ["Status" ] = - 1
363
- resultDict ["Message" ] = "No user name"
348
+ resultDict ["Message" ] = error
364
349
return resultDict
365
350
366
- waitingJobs = 0
367
- runningJobs = 0
351
+ jresult = json .loads (output )
352
+ resultDict ["Status" ] = 0
353
+ resultDict ["Waiting" ] = jresult [0 ]["Idle" ]
354
+ resultDict ["Running" ] = jresult [0 ]["Running" ]
368
355
356
+ # We also need to check the held jobs, some of them are actually waiting (e.g. for input files)
357
+ cmd = 'condor_q -json -constraint "JobStatus == 5" -attributes HoldReasonCode'
369
358
sp = subprocess .Popen (
370
- shlex .split ("condor_q -submitter %s" % user ),
359
+ shlex .split (cmd ),
371
360
stdout = subprocess .PIPE ,
372
361
stderr = subprocess .PIPE ,
373
362
universal_newlines = True ,
@@ -376,33 +365,17 @@ def getCEStatus(self, **kwargs):
376
365
status = sp .returncode
377
366
378
367
if status != 0 :
379
- if "no record" in output :
380
- resultDict ["Status" ] = 0
381
- resultDict ["Waiting" ] = waitingJobs
382
- resultDict ["Running" ] = runningJobs
383
- return resultDict
384
- resultDict ["Status" ] = status
368
+ resultDict ["Status" ] = - 1
385
369
resultDict ["Message" ] = error
386
370
return resultDict
387
371
388
- if "no record" in output :
389
- resultDict ["Status" ] = 0
390
- resultDict ["Waiting" ] = waitingJobs
391
- resultDict ["Running" ] = runningJobs
372
+ # If there are no held jobs, we can return the result
373
+ if not output :
392
374
return resultDict
393
375
394
- if output :
395
- lines = output .split ("\n " )
396
- for line in lines :
397
- if not line .strip ():
398
- continue
399
- if " I " in line :
400
- waitingJobs += 1
401
- elif " R " in line :
402
- runningJobs += 1
376
+ jresult = json .loads (output )
377
+ for job_metadata in jresult :
378
+ if job_metadata ["HoldReasonCode" ] == 16 :
379
+ resultDict ["Waiting" ] += 1
403
380
404
- # Final output
405
- resultDict ["Status" ] = 0
406
- resultDict ["Waiting" ] = waitingJobs
407
- resultDict ["Running" ] = runningJobs
408
381
return resultDict
0 commit comments