 from __future__ import print_function
 from __future__ import absolute_import
 from __future__ import division
+import json
 import re
 import tempfile
 import subprocess

 HOLD_REASON_SUBCODE = "55"

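+# Job ClassAd attributes requested from condor_q / condor_history when polling job status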
+STATE_ATTRIBUTES = "ClusterId,ProcId,JobStatus,HoldReasonCode,HoldReasonSubCode,HoldReason"
+
 subTemplate = """
 # Environment
 # -----------
 # Requirements
 # ------------
 request_cpus = %(processors)s
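+# Only match jobs that have never started before: a job is not rescheduled after a failed start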
+requirements = NumJobStarts == 0

 # Exit options
 # ------------
 # A subcode of our choice to identify who put the job on hold
 on_exit_hold_subcode = %(holdReasonSubcode)s
 # Jobs are then deleted from the system after N days if they are not idle or running
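+# Idle jobs that have already started once are also removed, since the requirements above prevent them from matching again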
-periodic_remove = (JobStatus != 1) && (JobStatus != 2) && ((time() - EnteredCurrentStatus) > (%(daysToKeepRemoteLogs)s * 24 * 3600))
+periodic_remove = ((JobStatus == 1) && (NumJobStarts > 0)) || \
+                  ((JobStatus != 1) && (JobStatus != 2) && ((time() - EnteredCurrentStatus) > (%(daysToKeepRemoteLogs)s * 24 * 3600)))

 # Specific options
 # ----------------
 """


-def parseCondorStatus(lines, jobID):
+def getCondorStatus(jobMetadata):
     """parse the condor_q or condor_history output for the job status

-    :param lines: list of lines from the output of the condor commands, each line is a tuple of jobID, statusID, and holdReasonCode
-    :type lines: python:list
-    :param str jobID: jobID of condor job, e.g.: 123.53
+    :param jobMetadata: dict with job metadata
+    :type jobMetadata: dict[str, str | int]

     :returns: Status as known by DIRAC, and a reason if the job is being held
     """
-    jobID = str(jobID)
-
-    holdReason = ""
-    status = None
-    for line in lines:
-        l = line.strip().split()
-
-        # Make sure the job ID exists
-        if len(l) < 1 or l[0] != jobID:
-            continue
-
-        # Make sure the status is present and is an integer
-        try:
-            status = int(l[1])
-        except (ValueError, IndexError):
-            break
-
-        # Stop here if the status is not held (5): result should be found in STATES_MAP
-        if status != 5:
-            break
-
-        # A job can be held for various reasons,
-        # we need to further investigate with the holdReasonCode & holdReasonSubCode
-        # Details in:
-        # https://htcondor.readthedocs.io/en/latest/classad-attributes/job-classad-attributes.html#HoldReasonCode
-
-        # By default, a held (5) job is defined as Aborted in STATES_MAP, but there might be some exceptions
-        status = 3
-        try:
-            holdReasonCode = l[2]
-            holdReasonSubcode = l[3]
-            holdReason = " ".join(l[4:])
-        except IndexError:
-            # This should not happen in theory
-            # Just set the status to unknown such as
-            status = None
-            holdReasonCode = "undefined"
-            holdReasonSubcode = "undefined"
-            break
-
-        # If holdReasonCode is 3 (The PERIODIC_HOLD expression evaluated to True. Or, ON_EXIT_HOLD was true)
-        # And subcode is HOLD_REASON_SUBCODE, then it means the job failed by itself, it needs to be marked as Failed
-        if holdReasonCode == "3" and holdReasonSubcode == HOLD_REASON_SUBCODE:
-            status = 5
-        # If holdReasonCode is 16 (Input files are being spooled), the job should be marked as Waiting
-        elif holdReasonCode == "16":
-            status = 1
-
-    return (STATES_MAP.get(status, "Unknown"), holdReason)
+    if jobMetadata["JobStatus"] != 5:
+        # If the job is not held, we can return the status directly
+        return (STATES_MAP.get(jobMetadata["JobStatus"], "Unknown"), "")
+
+    # A job can be held for various reasons,
+    # we need to further investigate with the holdReasonCode & holdReasonSubCode
+    # Details in:
+    # https://htcondor.readthedocs.io/en/latest/classad-attributes/job-classad-attributes.html#HoldReasonCode
+
+    # By default, a held (5) job is defined as Aborted in STATES_MAP, but there might be some exceptions
+    status = 3
+
+    # If holdReasonCode is 3 (The PERIODIC_HOLD expression evaluated to True. Or, ON_EXIT_HOLD was true)
+    # And subcode is HOLD_REASON_SUBCODE, then it means the job failed by itself, it needs to be marked as Failed
+    if jobMetadata["HoldReasonCode"] == 3 and jobMetadata["HoldReasonSubCode"] == int(HOLD_REASON_SUBCODE):
+        status = 5
+    # If holdReasonCode is 16 (Input files are being spooled), the job should be marked as Waiting
+    elif jobMetadata["HoldReasonCode"] == 16:
+        status = 1
+
+    return (STATES_MAP.get(status, "Unknown"), jobMetadata["HoldReason"])


 class Condor(object):
@@ -171,8 +147,6 @@ def submitJob(self, **kwargs):
         preamble = kwargs.get("Preamble")

         jdlFile = tempfile.NamedTemporaryFile(dir=outputDir, suffix=".jdl", mode="wt")
-        scheddOptions = 'requirements = OpSys == "LINUX"\n'
-        scheddOptions += "gentenv = False"
         jdlFile.write(
             subTemplate
             % dict(
@@ -185,15 +159,15 @@ def submitJob(self, **kwargs):
                 holdReasonSubcode=HOLD_REASON_SUBCODE,
                 daysToKeepRemoteLogs=1,
                 scheddOptions="",
-                extraString="",
+                extraString=submitOptions,
                 pilotStampList=",".join(stamps),
             )
         )

         jdlFile.flush()

         cmd = "%s; " % preamble if preamble else ""
-        cmd += "condor_submit %s %s " % (submitOptions, jdlFile.name)
+        cmd += "condor_submit %s" % jdlFile.name
         sp = subprocess.Popen(
             cmd,
             shell=True,
@@ -283,7 +257,6 @@ def killJob(self, **kwargs):

     def getJobStatus(self, **kwargs):
         """Get status of the jobs in the given list"""
-
         resultDict = {}

         MANDATORY_PARAMETERS = ["JobIDList"]
@@ -299,15 +272,11 @@ def getJobStatus(self, **kwargs):
             resultDict["Message"] = "Empty job list"
             return resultDict

-        user = kwargs.get("User")
-        if not user:
-            user = os.environ.get("USER")
-        if not user:
-            resultDict["Status"] = -1
-            resultDict["Message"] = "No user name"
-            return resultDict
+        # Prepare the command to get the status of the jobs
+        cmdJobs = " ".join(str(jobID) for jobID in jobIDList)

-        cmd = "condor_q -submitter %s -af:j JobStatus HoldReasonCode HoldReasonSubCode HoldReason" % user
+        # Get the status of the jobs currently active
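+        # condor_q -json prints the selected job ClassAds as a JSON list, restricted here to STATE_ATTRIBUTES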
+        cmd = "condor_q %s -attributes %s -json" % (cmdJobs, STATE_ATTRIBUTES)
         sp = subprocess.Popen(
             shlex.split(cmd),
             stdout=subprocess.PIPE,
@@ -317,16 +286,15 @@ def getJobStatus(self, **kwargs):
         output, error = sp.communicate()
         status = sp.returncode

-        if status != 0:
+        if status != 0 or not output:
             resultDict["Status"] = status
             resultDict["Message"] = error
             return resultDict

-        qList = output.strip().split("\n")
+        jobsMetadata = json.loads(output)

-        condorHistCall = (
-            "condor_history -af:j JobStatus HoldReasonCode HoldReasonSubCode HoldReason -submitter %s" % user
-        )
+        # Get the status of the jobs in the history
+        condorHistCall = "condor_history %s -attributes %s -json" % (cmdJobs, STATE_ATTRIBUTES)
         sp = subprocess.Popen(
             shlex.split(condorHistCall),
             stdout=subprocess.PIPE,
@@ -335,15 +303,26 @@ def getJobStatus(self, **kwargs):
         )
-        output, _ = sp.communicate()
+        output, error = sp.communicate()
         status = sp.returncode
-        if status == 0:
-            for line in output.split("\n"):
-                qList.append(line)
+
+        if status != 0 or not output:
+            resultDict["Status"] = status
+            resultDict["Message"] = error
+            return resultDict
+
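+        # Merge the condor_history results with the condor_q ones so that jobs that already left the queue are covered too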
+        jobsMetadata += json.loads(output)

         statusDict = {}
-        if len(qList):
-            for job in jobIDList:
-                job = str(job)
-                statusDict[job], _ = parseCondorStatus(qList, job)
+        # Build a set of job IDs found in jobsMetadata
+        foundJobIDs = set()
+        for jobDict in jobsMetadata:
+            jobID = "%s.%s" % (jobDict["ClusterId"], jobDict["ProcId"])
+            statusDict[jobID], _ = getCondorStatus(jobDict)
+            foundJobIDs.add(jobID)
+
+        # For job IDs not found, set status to "Unknown"
+        for jobID in jobIDList:
+            if str(jobID) not in foundJobIDs:
+                statusDict[str(jobID)] = "Unknown"

         # Final output
         status = 0
@@ -355,19 +334,30 @@ def getCEStatus(self, **kwargs):
         """Get the overall status of the CE"""
         resultDict = {}

-        user = kwargs.get("User")
-        if not user:
-            user = os.environ.get("USER")
-        if not user:
+        cmd = "condor_q -totals -json"
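+        # -totals prints a single summary record with per-state job counters instead of one record per job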
+        sp = subprocess.Popen(
+            shlex.split(cmd),
+            stdout=subprocess.PIPE,
+            stderr=subprocess.PIPE,
+            universal_newlines=True,
+        )
+        output, error = sp.communicate()
+        status = sp.returncode
+
+        if status != 0 or not output:
             resultDict["Status"] = -1
-            resultDict["Message"] = "No user name"
+            resultDict["Message"] = error
             return resultDict

-        waitingJobs = 0
-        runningJobs = 0
+        jresult = json.loads(output)
+        resultDict["Status"] = 0
+        resultDict["Waiting"] = jresult[0]["Idle"]
+        resultDict["Running"] = jresult[0]["Running"]

+        # We also need to check the held jobs, as some of them are actually waiting (e.g. for input files)
+        cmd = 'condor_q -json -constraint "JobStatus == 5" -attributes HoldReasonCode'
         sp = subprocess.Popen(
-            shlex.split("condor_q -submitter %s" % user),
+            shlex.split(cmd),
             stdout=subprocess.PIPE,
             stderr=subprocess.PIPE,
             universal_newlines=True,
@@ -376,33 +366,42 @@ def getCEStatus(self, **kwargs):
         status = sp.returncode

         if status != 0:
-            if "no record" in output:
-                resultDict["Status"] = 0
-                resultDict["Waiting"] = waitingJobs
-                resultDict["Running"] = runningJobs
-                return resultDict
-            resultDict["Status"] = status
+            resultDict["Status"] = -1
             resultDict["Message"] = error
             return resultDict

-        if "no record" in output:
-            resultDict["Status"] = 0
-            resultDict["Waiting"] = waitingJobs
-            resultDict["Running"] = runningJobs
+        # If there are no held jobs, we can return the result
+        if not output:
             return resultDict

-        if output:
-            lines = output.split("\n")
-            for line in lines:
-                if not line.strip():
-                    continue
-                if " I " in line:
-                    waitingJobs += 1
-                elif " R " in line:
-                    runningJobs += 1
+        jresult = json.loads(output)
+        for job_metadata in jresult:
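+            # HoldReasonCode 16 means the input files are still being spooled, so the job is effectively still waiting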
+            if job_metadata["HoldReasonCode"] == 16:
+                resultDict["Waiting"] += 1
+
+        return resultDict
+
+    def getJobOutputFiles(self, **kwargs):
+        """Get output file names and templates for the specific CE"""
+        resultDict = {}
+
+        MANDATORY_PARAMETERS = ["JobIDList", "OutputDir", "ErrorDir"]
+        for argument in MANDATORY_PARAMETERS:
+            if argument not in kwargs:
+                resultDict["Status"] = -1
+                resultDict["Message"] = "No %s" % argument
+                return resultDict
+
+        outputDir = kwargs["OutputDir"]
+        errorDir = kwargs["ErrorDir"]
+        jobIDList = kwargs["JobIDList"]
+
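+        # Map each job ID to the files where its standard output and error are expected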
+        jobDict = {}
+        for jobID in jobIDList:
+            jobDict[jobID] = {}
+            jobDict[jobID]["Output"] = "%s/%s.out" % (outputDir, jobID)
+            jobDict[jobID]["Error"] = "%s/%s.err" % (errorDir, jobID)

-        # Final output
         resultDict["Status"] = 0
-        resultDict["Waiting"] = waitingJobs
-        resultDict["Running"] = runningJobs
+        resultDict["Jobs"] = jobDict
         return resultDict