1
- #########################################################################################
2
- # Condor.py
3
- # 10.11.2014
4
- # Author: A.T.
5
- #########################################################################################
6
-
7
1
""" Condor.py is a DIRAC independent class representing Condor batch system.
8
2
Condor objects are used as backend batch system representation for
9
3
LocalComputingElement and SSHComputingElement classes
19
13
import os
20
14
21
15
16
+ # Cannot use the PilotStatus module here as Condor is meant to be executed on a remote machine
17
+ # DIRAC might not be available
18
+ STATES_MAP = {
19
+ 1 : "Waiting" ,
20
+ 2 : "Running" ,
21
+ 3 : "Aborted" ,
22
+ 4 : "Done" ,
23
+ 5 : "Failed" ,
24
+ }
25
+
26
+ HOLD_REASON_SUBCODE = 55
27
+
28
+ subTemplate = """
29
+ # Environment
30
+ # -----------
31
+ # There exist many universe:
32
+ # https://htcondor.readthedocs.io/en/latest/users-manual/choosing-an-htcondor-universe.html
33
+ universe = %(targetUniverse)s
34
+
35
+ # Inputs/Outputs
36
+ # --------------
37
+ # Inputs: executable to submit
38
+ executable = %(executable)s
39
+
40
+ # Directory that will contain the outputs
41
+ initialdir = %(initialDir)s
42
+
43
+ # Outputs: stdout, stderr, log
44
+ output = $(Cluster).$(Process).out
45
+ error = $(Cluster).$(Process).err
46
+ log = $(Cluster).$(Process).log
47
+
48
+ # Transfer all output files, even if the job is failed
49
+ transfer_output_files = ""
50
+ should_transfer_files = YES
51
+ when_to_transfer_output = ON_EXIT_OR_EVICT
52
+
53
+ # Environment variables to pass to the job
54
+ environment = "DIRAC_PILOT_STAMP=$(stamp) %(environment)s"
55
+
56
+ # Credentials
57
+ # -----------
58
+ %(useCredentials)s
59
+
60
+ # Requirements
61
+ # ------------
62
+ request_cpus = %(processors)s
63
+
64
+ # Exit options
65
+ # ------------
66
+ # Specify the signal sent to the job when HTCondor needs to vacate the worker node
67
+ kill_sig=SIGTERM
68
+ # By default, HTCondor marked jobs as completed regardless of its status
69
+ # This option allows to mark jobs as Held if they don't finish successfully
70
+ on_exit_hold = ExitCode != 0
71
+ # A random subcode to identify who put the job on hold
72
+ on_exit_hold_subcode = %(holdReasonSubcode)s
73
+ # Jobs are then deleted from the system after N days
74
+ period_remove = (time() - EnteredCurrentStatus) > (%(daysToKeepRemoteLogs)s * 24 * 3600)
75
+
76
+ # Specific options
77
+ # ----------------
78
+ # Local vs Remote schedd
79
+ %(scheddOptions)s
80
+ # CE-specific options
81
+ %(extraString)s
82
+
83
+
84
+ Queue stamp in %(pilotStampList)s
85
+ """
86
+
87
+
22
88
def parseCondorStatus (lines , jobID ):
23
89
"""parse the condor_q or condor_history output for the job status
24
90
25
- :param lines: list of lines from the output of the condor commands, each line is a pair of jobID and statusID
91
+ :param lines: list of lines from the output of the condor commands, each line is a pair of jobID, statusID, and holdReasonCode
26
92
:type lines: python:list
27
93
:param str jobID: jobID of condor job, e.g.: 123.53
28
- :returns: Status as known by DIRAC
94
+ :returns: Status as known by DIRAC, and a reason if the job is being held
29
95
"""
30
96
jobID = str (jobID )
31
97
for line in lines :
@@ -34,35 +100,35 @@ def parseCondorStatus(lines, jobID):
34
100
status = int (l [1 ])
35
101
except (ValueError , IndexError ):
36
102
continue
103
+ holdReason = ""
37
104
if l [0 ] == jobID :
38
- return {1 : "Waiting" , 2 : "Running" , 3 : "Aborted" , 4 : "Done" , 5 : "HELD" }.get (status , "Unknown" )
39
- return "Unknown"
40
-
41
-
42
- def treatCondorHistory (condorHistCall , qList ):
43
- """concatenate clusterID and processID to get the same output as condor_q
44
- until we can expect condor version 8.5.3 everywhere
45
-
46
- :param str condorHistCall: condor_history command to run
47
- :param qList: list of jobID and status from condor_q output, will be modified in this function
48
- :type qList: python:list
49
- :returns: None
50
- """
51
- sp = subprocess .Popen (
52
- shlex .split (condorHistCall ),
53
- stdout = subprocess .PIPE ,
54
- stderr = subprocess .PIPE ,
55
- universal_newlines = True ,
56
- )
57
- output , _ = sp .communicate ()
58
- status = sp .returncode
59
-
60
- # Join the ClusterId and the ProcId and add to existing list of statuses
61
- if status == 0 :
62
- for line in output .split ("\n " ):
63
- values = line .strip ().split ()
64
- if len (values ) == 3 :
65
- qList .append ("%s.%s %s" % tuple (values ))
105
+ # A job can be held for many various reasons, we need to further investigate with the holdReasonCode & holdReasonSubCode
106
+ # Details in:
107
+ # https://htcondor.readthedocs.io/en/latest/classad-attributes/job-classad-attributes.html#HoldReasonCode
108
+
109
+ # By default, a held (5) job is defined as Aborted, but there might be some exceptions
110
+ if status == 5 :
111
+ try :
112
+ holdReasonCode = int (l [2 ])
113
+ holdReasonSubcode = int (l [3 ])
114
+ holdReason = l [4 :]
115
+ except (ValueError , IndexError ):
116
+ # This should not happen in theory
117
+ # Just set the status to unknown such as
118
+ status = - 1
119
+ holdReasonCode = - 1
120
+ holdReasonSubcode = - 1
121
+
122
+ # If holdReasonCode is 3 (The PERIODIC_HOLD expression evaluated to True. Or, ON_EXIT_HOLD was true)
123
+ # And subcode is HOLD_REASON_SUBCODE, then it means the job failed by itself, it needs to be marked as Failed
124
+ if holdReasonCode == 3 and holdReasonSubcode == HOLD_REASON_SUBCODE :
125
+ status = 5
126
+ # If holdReasonCode is 16 (Input files are being spooled), the job should be marked as Waiting
127
+ if holdReasonCode == 16 :
128
+ status = 1
129
+
130
+ return (STATES_MAP .get (status , "Unknown" ), holdReason )
131
+ return ("Unknown" , holdReason )
66
132
67
133
68
134
class Condor (object ):
@@ -96,24 +162,23 @@ def submitJob(self, **kwargs):
96
162
return resultDict
97
163
98
164
jdlFile = tempfile .NamedTemporaryFile (dir = outputDir , suffix = ".jdl" )
165
+ scheddOptions = 'requirements = OpSys == "LINUX"\n '
166
+ scheddOptions += "gentenv = False"
99
167
jdlFile .write (
100
- """
101
- Executable = %s
102
- Universe = vanilla
103
- Requirements = OpSys == "LINUX"
104
- Initialdir = %s
105
- Output = $(Cluster).$(Process).out
106
- Error = $(Cluster).$(Process).err
107
- Log = test.log
108
- Environment = "CONDOR_JOBID=$(Cluster).$(Process) DIRAC_PILOT_STAMP=$(stamp)"
109
- Getenv = False
110
-
111
- request_cpus = %s
112
-
113
- Queue stamp in %s
114
-
115
- """
116
- % (executable , outputDir , numberOfProcessors , "," .join (stamps ))
168
+ subTemplate
169
+ % dict (
170
+ targetUniverse = "vanilla" ,
171
+ executable = executable ,
172
+ initialDir = outputDir ,
173
+ environment = "CONDOR_JOBID=$(Cluster).$(Process)" ,
174
+ useCredentials = "" ,
175
+ processors = numberOfProcessors ,
176
+ holdReasonSubcode = HOLD_REASON_SUBCODE ,
177
+ daysToKeepRemoteLogs = 1 ,
178
+ scheddOptions = "" ,
179
+ extraString = "" ,
180
+ pilotStampList = "," .join (stamps ),
181
+ )
117
182
)
118
183
119
184
jdlFile .flush ()
@@ -233,7 +298,7 @@ def getJobStatus(self, **kwargs):
233
298
resultDict ["Message" ] = "No user name"
234
299
return resultDict
235
300
236
- cmd = "condor_q -submitter %s -af:j JobStatus" % user
301
+ cmd = "condor_q -submitter %s -af:j JobStatus HoldReasonCode HoldReasonSubCode HoldReason " % user
237
302
sp = subprocess .Popen (
238
303
shlex .split (cmd ),
239
304
stdout = subprocess .PIPE ,
@@ -250,19 +315,26 @@ def getJobStatus(self, **kwargs):
250
315
251
316
qList = output .strip ().split ("\n " )
252
317
253
- # FIXME: condor_history does only support j for autoformat from 8.5.3,
254
- # format adds whitespace for each field This will return a list of 1245 75 3
255
- # needs to cocatenate the first two with a dot
256
- condorHistCall = "condor_history -af ClusterId ProcId JobStatus -submitter %s" % user
257
- treatCondorHistory (condorHistCall , qList )
318
+ condorHistCall = (
319
+ "condor_history -af:j JobStatus HoldReasonCode HoldReasonSubCode HoldReason -submitter %s" % user
320
+ )
321
+ sp = subprocess .Popen (
322
+ shlex .split (condorHistCall ),
323
+ stdout = subprocess .PIPE ,
324
+ stderr = subprocess .PIPE ,
325
+ universal_newlines = True ,
326
+ )
327
+ output , _ = sp .communicate ()
328
+ status = sp .returncode
329
+ if status == 0 :
330
+ for line in output .split ("\n " ):
331
+ qList .append (line )
258
332
259
333
statusDict = {}
260
334
if len (qList ):
261
335
for job in jobIDList :
262
336
job = str (job )
263
- statusDict [job ] = parseCondorStatus (qList , job )
264
- if statusDict [job ] == "HELD" :
265
- statusDict [job ] = "Unknown"
337
+ statusDict [job ], _ = parseCondorStatus (qList , job )
266
338
267
339
# Final output
268
340
status = 0
0 commit comments