Skip to content

Commit 142b82f

Browse files
committed
fix: HTCondor tests + a bit of refactoring
1 parent 2627bea commit 142b82f

File tree

3 files changed

+120
-108
lines changed

3 files changed

+120
-108
lines changed

src/DIRAC/Resources/Computing/BatchSystems/Condor.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,8 @@
7070
on_exit_hold = ExitCode != 0
7171
# A random subcode to identify who put the job on hold
7272
on_exit_hold_subcode = %(holdReasonSubcode)s
73-
# Jobs are then deleted from the system after N days
74-
period_remove = (time() - EnteredCurrentStatus) > (%(daysToKeepRemoteLogs)s * 24 * 3600)
73+
# Jobs are then deleted from the system after N days if they are not running
74+
period_remove = (JobStatus != 2) && (time() - EnteredCurrentStatus) > (%(daysToKeepRemoteLogs)s * 24 * 3600)
7575
7676
# Specific options
7777
# ----------------
@@ -105,9 +105,10 @@ def parseCondorStatus(lines, jobID):
105105
# A job can be held for many different reasons; we need to investigate further using the holdReasonCode & holdReasonSubCode
106106
# Details in:
107107
# https://htcondor.readthedocs.io/en/latest/classad-attributes/job-classad-attributes.html#HoldReasonCode
108-
109-
# By default, a held (5) job is defined as Aborted, but there might be some exceptions
110108
if status == 5:
109+
110+
# By default, a held (5) job is defined as Aborted, but there might be some exceptions
111+
status = 3
111112
try:
112113
holdReasonCode = int(l[2])
113114
holdReasonSubcode = int(l[3])
@@ -124,7 +125,7 @@ def parseCondorStatus(lines, jobID):
124125
if holdReasonCode == 3 and holdReasonSubcode == HOLD_REASON_SUBCODE:
125126
status = 5
126127
# If holdReasonCode is 16 (Input files are being spooled), the job should be marked as Waiting
127-
if holdReasonCode == 16:
128+
elif holdReasonCode == 16:
128129
status = 1
129130

130131
return (STATES_MAP.get(status, "Unknown"), holdReason)

src/DIRAC/Resources/Computing/HTCondorCEComputingElement.py

Lines changed: 80 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -71,50 +71,6 @@
7171
DEFAULT_DAYSTOKEEPLOGS = 15
7272

7373

74-
def logDir(ceName, stamp):
75-
"""Return path to log and output files for pilot.
76-
77-
:param str ceName: Name of the CE
78-
:param str stamp: pilot stamp from/for jobRef
79-
"""
80-
return os.path.join(ceName, stamp[0], stamp[1:3])
81-
82-
83-
def condorIDAndPathToResultFromJobRef(jobRef):
84-
"""Extract tuple of jobURL and jobID from the jobRef string.
85-
The condorID as well as the path leading to the job results are also extracted from the jobID.
86-
87-
:param str jobRef: PilotJobReference of the following form: ``htcondorce://<ceName>/<condorID>:::<pilotStamp>``
88-
89-
:return: tuple composed of the jobURL, the path to the job results and the condorID of the given jobRef
90-
"""
91-
splits = jobRef.split(":::")
92-
jobURL = splits[0]
93-
stamp = splits[1] if len(splits) > 1 else ""
94-
_, _, ceName, condorID = jobURL.split("/")
95-
96-
# Reconstruct the path leading to the result (log, output)
97-
# Construction of the path can be found in submitJob()
98-
pathToResult = logDir(ceName, stamp) if len(stamp) >= 3 else ""
99-
100-
return jobURL, pathToResult, condorID
101-
102-
103-
def findFile(workingDir, fileName, pathToResult):
104-
"""Find a file in a file system.
105-
106-
:param str workingDir: the name of the directory containing the given file to search for
107-
:param str fileName: the name of the file to find
108-
:param str pathToResult: the path to follow from workingDir to find the file
109-
110-
:return: path leading to the file
111-
"""
112-
path = os.path.join(workingDir, pathToResult, fileName)
113-
if os.path.exists(path):
114-
return S_OK(path)
115-
return S_ERROR(errno.ENOENT, f"Could not find {path}")
116-
117-
11874
class HTCondorCEComputingElement(ComputingElement):
11975
"""HTCondorCE computing element class
12076
implementing the functions jobSubmit, getJobOutput
@@ -152,6 +108,45 @@ def __init__(self, ceUniqueID):
152108
self.tokenFile = None
153109

154110
#############################################################################
111+
112+
def _DiracToCondorID(self, diracJobID):
113+
"""Convert a DIRAC jobID into a Condor jobID.
114+
Example: htcondorce://<ce>/1234.0 becomes 1234.0
115+
116+
:param str diracJobID: DIRAC jobID
117+
:return: Condor jobID
118+
"""
119+
# Remove CE and protocol information from the DIRAC job ID
120+
if "://" in diracJobID:
121+
condorJobID = diracJobID.split("/")[-1]
122+
return condorJobID
123+
return diracJobID
124+
125+
def _condorToDiracID(self, condorJobIDs):
126+
"""Get the references from the condor_submit output.
127+
Cluster ids look like " 107.0 - 107.0 " or " 107.0 - 107.4 "
128+
129+
:param str condorJobIDs: the output of condor_submit
130+
131+
:return: job references such as htcondorce://<CE name>/<clusterID>.<i>
132+
"""
133+
clusterIDs = condorJobIDs.split("-")
134+
if len(clusterIDs) != 2:
135+
return S_ERROR(f"Something wrong with the condor_submit output: {condorJobIDs}")
136+
clusterIDs = [clu.strip() for clu in clusterIDs]
137+
self.log.verbose("Cluster IDs parsed:", clusterIDs)
138+
try:
139+
clusterID = clusterIDs[0].split(".")[0]
140+
numJobs = clusterIDs[1].split(".")[1]
141+
except IndexError:
142+
return S_ERROR(f"Something wrong with the condor_submit output: {condorJobIDs}")
143+
144+
cePrefix = f"htcondorce://{self.ceName}/"
145+
jobReferences = [f"{cePrefix}{clusterID}.{i}" for i in range(int(numJobs) + 1)]
146+
return S_OK(jobReferences)
147+
148+
#############################################################################
149+
155150
def __writeSub(self, executable, location, processors, pilotStamps, tokenFile=None):
156151
"""Create the Sub File for submission.
157152
@@ -188,7 +183,7 @@ def __writeSub(self, executable, location, processors, pilotStamps, tokenFile=No
188183
scheddOptions = ""
189184
if self.useLocalSchedd:
190185
targetUniverse = "grid"
191-
scheddOptions = f"grid_resource = condor {self.ceName} {self.ceName}:9619"
186+
scheddOptions = f"grid_resource = condor {self.ceName} {self.ceName}:{self.port}"
192187

193188
sub = subTemplate % dict(
194189
targetUniverse=targetUniverse,
@@ -301,7 +296,7 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1):
301296
jobStamps.append(jobStamp)
302297

303298
# We randomize the location of the pilot output and log, because there are just too many of them
304-
location = logDir(self.ceName, commonJobStampPart)
299+
location = os.path.join(self.ceName, commonJobStampPart[0], commonJobStampPart[1:3])
305300
nProcessors = self.ceParameters.get("NumberOfProcessors", 1)
306301
if self.token:
307302
self.tokenFile = tempfile.NamedTemporaryFile(
@@ -324,7 +319,7 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1):
324319
return result
325320

326321
stdout = result["Value"]
327-
pilotJobReferences = self.__getPilotReferences(stdout)
322+
pilotJobReferences = self._condorToDiracID(stdout)
328323
if not pilotJobReferences["OK"]:
329324
return pilotJobReferences
330325
pilotJobReferences = pilotJobReferences["Value"]
@@ -352,15 +347,15 @@ def killJob(self, jobIDList):
352347
self.log.verbose("KillJob jobIDList", jobIDList)
353348
self.tokenFile = None
354349

355-
for jobRef in jobIDList:
356-
job, _, jobID = condorIDAndPathToResultFromJobRef(jobRef)
357-
self.log.verbose("Killing pilot", job)
350+
for diracJobID in jobIDList:
351+
condorJobID = self._DiracToCondorID(diracJobID.split(":::")[0])
352+
self.log.verbose("Killing pilot", diracJobID)
358353
cmd = ["condor_rm"]
359354
cmd.extend(self.remoteScheddOptions.strip().split(" "))
360-
cmd.append(jobID)
355+
cmd.append(condorJobID)
361356
result = self._executeCondorCommand(cmd, keepTokenFile=True)
362357
if not result["OK"]:
363-
self.log.error("Failed to kill pilot", f"{job}: {result['Message']}")
358+
self.log.error("Failed to kill pilot", f"{diracJobID}: {result['Message']}")
364359
return result
365360

366361
self.tokenFile = None
@@ -409,9 +404,10 @@ def getJobStatus(self, jobIDList):
409404
resultDict = {}
410405
condorIDs = {}
411406
# Get all condorIDs so we can just call condor_q and condor_history once
412-
for jobRef in jobIDList:
413-
job, _, jobID = condorIDAndPathToResultFromJobRef(jobRef)
414-
condorIDs[job] = jobID
407+
for diracJobID in jobIDList:
408+
diracJobID = diracJobID.split(":::")[0]
409+
condorJobID = self._DiracToCondorID(diracJobID)
410+
condorIDs[diracJobID] = condorJobID
415411

416412
self.tokenFile = None
417413

@@ -484,13 +480,39 @@ def getJobOutput(self, jobID):
484480

485481
return S_OK((result["Value"]["output"], result["Value"]["error"]))
486482

483+
def _findFile(self, workingDir, fileName, pathToResult):
484+
"""Find a file in a file system.
485+
486+
:param str workingDir: the name of the directory containing the given file to search for
487+
:param str fileName: the name of the file to find
488+
:param str pathToResult: the path to follow from workingDir to find the file
489+
490+
:return: path leading to the file
491+
"""
492+
path = os.path.join(workingDir, pathToResult, fileName)
493+
if os.path.exists(path):
494+
return S_OK(path)
495+
return S_ERROR(errno.ENOENT, f"Could not find {path}")
496+
487497
def __getJobOutput(self, jobID, outTypes):
488498
"""Get job outputs: output, error and logging files from HTCondor
489499
490500
:param str jobID: job identifier
491501
:param list outTypes: output types targeted (output, error and/or logging)
492502
"""
493-
_job, pathToResult, condorID = condorIDAndPathToResultFromJobRef(jobID)
503+
# Extract stamp from the Job ID
504+
if ":::" in jobID:
505+
diracJobID, stamp = jobID.split(":::")
506+
else:
507+
return S_ERROR(f"DIRAC stamp not defined for {jobID}")
508+
509+
# Reconstruct the path leading to the result (log, output)
510+
# Construction of the path can be found in submitJob()
511+
if len(stamp) < 3:
512+
return S_ERROR(f"Stamp is not long enough: {stamp}")
513+
pathToResult = os.path.join(self.ceName, stamp[0], stamp[1:3])
514+
515+
condorJobID = self._DiracToCondorID(diracJobID)
494516
iwd = os.path.join(self.workingDirectory, pathToResult)
495517

496518
try:
@@ -501,7 +523,7 @@ def __getJobOutput(self, jobID, outTypes):
501523
return S_ERROR(e.errno, f"{errorMessage} ({iwd})")
502524

503525
if not self.useLocalSchedd:
504-
cmd = ["condor_transfer_data", "-pool", f"{self.ceName}:{self.port}", "-name", self.ceName, condorID]
526+
cmd = ["condor_transfer_data", "-pool", f"{self.ceName}:{self.port}", "-name", self.ceName, condorJobID]
505527
result = self._executeCondorCommand(cmd)
506528

507529
# Getting 'logging' without 'error' and 'output' is possible but will generate command errors
@@ -513,7 +535,7 @@ def __getJobOutput(self, jobID, outTypes):
513535
outputsSuffix = {"output": "out", "error": "err", "logging": "log"}
514536
outputs = {}
515537
for output, suffix in outputsSuffix.items():
516-
resOut = findFile(self.workingDirectory, f"{condorID}.{suffix}", pathToResult)
538+
resOut = self._findFile(self.workingDirectory, f"{condorJobID}.{suffix}", pathToResult)
517539
if not resOut["OK"]:
518540
# Return an error if the output type was targeted, else we continue
519541
if output in outTypes:
@@ -536,29 +558,6 @@ def __getJobOutput(self, jobID, outTypes):
536558

537559
return S_OK(outputs)
538560

539-
def __getPilotReferences(self, jobString):
540-
"""Get the references from the condor_submit output.
541-
Cluster ids look like " 107.0 - 107.0 " or " 107.0 - 107.4 "
542-
543-
:param str jobString: the output of condor_submit
544-
545-
:return: job references such as htcondorce://<CE name>/<path to result>-<clusterID>.<i>
546-
"""
547-
self.log.verbose("getPilotReferences:", jobString)
548-
clusterIDs = jobString.split("-")
549-
if len(clusterIDs) != 2:
550-
return S_ERROR(f"Something wrong with the condor_submit output: {jobString}")
551-
clusterIDs = [clu.strip() for clu in clusterIDs]
552-
self.log.verbose("Cluster IDs parsed:", clusterIDs)
553-
try:
554-
clusterID = clusterIDs[0].split(".")[0]
555-
numJobs = clusterIDs[1].split(".")[1]
556-
except IndexError:
557-
return S_ERROR(f"Something wrong with the condor_submit output: {jobString}")
558-
cePrefix = f"htcondorce://{self.ceName}/"
559-
jobReferences = [f"{cePrefix}{clusterID}.{i}" for i in range(int(numJobs) + 1)]
560-
return S_OK(jobReferences)
561-
562561
def __cleanup(self):
563562
"""Clean the working directory of old jobs"""
564563
if not HTCondorCEComputingElement._cleanupLock.acquire(False):

0 commit comments

Comments
 (0)