Skip to content

Commit a04d867

Browse files
authored
Merge pull request #7069 from aldbr/rel-v8r0_FIX_HTCondorCEStatus
[8.0] Getting more details about failed/aborted pilots from HTCondor
2 parents d761887 + b5afceb commit a04d867

File tree

4 files changed

+375
-351
lines changed

4 files changed

+375
-351
lines changed

src/DIRAC/Resources/Computing/AREXComputingElement.py

Lines changed: 45 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
Port added to the CE host name to interact with AREX services.
1616
1717
ProxyTimeLeftBeforeRenewal:
18-
Time in seconds before the AREXCE renews proxy of submitted pilots.
18+
Time in seconds before the AREXCE renews proxy of submitted payloads.
1919
2020
RESTVersion:
2121
Version of the REST interface to use.
@@ -105,34 +105,33 @@ def setToken(self, token, valid):
105105
super().setToken(token, valid)
106106
self.headers["Authorization"] = "Bearer " + self.token["access_token"]
107107

108-
def _arcToDiracID(self, arcJobID):
109-
"""Convert an ARC jobID into a DIRAC jobID.
108+
def _arcIDToJobReference(self, arcJobID):
109+
"""Convert an ARC jobID into a job reference.
110110
Example: 1234 becomes https://<ce>:<port>/arex/1234
111111
112112
:param str: ARC jobID
113-
:return: DIRAC jobID
113+
:return: job reference, defined as an ARC jobID with additional details
114114
"""
115115
# Add CE and protocol information to arc Job ID
116116
if "://" in arcJobID:
117117
self.log.warn("Identifier already in ARC format", arcJobID)
118118
return arcJobID
119119

120-
diracJobID = "https://" + self.ceHost + ":" + self.port + "/arex/" + arcJobID
121-
return diracJobID
120+
return f"https://{self.ceHost}:{self.port}/arex/{arcJobID}"
122121

123-
def _DiracToArcID(self, diracJobID):
124-
"""Convert a DIRAC jobID into an ARC jobID.
122+
def _jobReferenceToArcID(self, jobReference):
123+
"""Convert a job reference into an ARC jobID.
125124
Example: https://<ce>:<port>/arex/1234 becomes 1234
126125
127-
:param str: DIRAC jobID
126+
:param str: job reference, defined as an ARC jobID with additional details
128127
:return: ARC jobID
129128
"""
130129
# Remove CE and protocol information from arc Job ID
131-
if "://" in diracJobID:
132-
arcJobID = diracJobID.split("arex/")[-1]
130+
if "://" in jobReference:
131+
arcJobID = jobReference.split("arex/")[-1]
133132
return arcJobID
134-
self.log.warn("Identifier already in REST format?", diracJobID)
135-
return diracJobID
133+
self.log.warn("Identifier already in REST format?", jobReference)
134+
return jobReference
136135

137136
#############################################################################
138137

@@ -494,12 +493,12 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1, inputs=None, outputs=
494493
if not result["OK"]:
495494
break
496495

497-
jobID = self._arcToDiracID(arcJobID)
498-
batchIDList.append(jobID)
499-
stampDict[jobID] = diracStamp
496+
jobReference = self._arcIDToJobReference(arcJobID)
497+
batchIDList.append(jobReference)
498+
stampDict[jobReference] = diracStamp
500499
self.log.debug(
501500
"Successfully submitted job",
502-
f"{jobID} to CE {self.ceHost}",
501+
f"{jobReference} to CE {self.ceHost}",
503502
)
504503

505504
if batchIDList:
@@ -514,16 +513,16 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1, inputs=None, outputs=
514513
def killJob(self, jobIDList):
515514
"""Kill the specified jobs
516515
517-
:param list jobIDList: list of DIRAC Job IDs
516+
:param list jobIDList: list of Job references
518517
"""
519518
if not isinstance(jobIDList, list):
520519
jobIDList = [jobIDList]
521520
self.log.debug("Killing jobs", ",".join(jobIDList))
522521

523-
# Convert DIRAC jobs to ARC jobs
524-
# DIRAC Jobs might be stored with a DIRAC stamp (":::XXXXX") that should be removed
525-
jList = [self._DiracToArcID(job.split(":::")[0]) for job in jobIDList]
526-
return self._killJob(jList)
522+
# Convert job references to ARC jobs
523+
# Job references might be stored with a DIRAC stamp (":::XXXXX") that should be removed
524+
arcJobList = [self._jobReferenceToArcID(job.split(":::")[0]) for job in jobIDList]
525+
return self._killJob(arcJobList)
527526

528527
def _killJob(self, arcJobList):
529528
"""Kill the specified jobs
@@ -556,16 +555,16 @@ def _killJob(self, arcJobList):
556555
def cleanJob(self, jobIDList):
557556
"""Clean files related to the specified jobs
558557
559-
:param list jobIDList: list of DIRAC Job IDs
558+
:param list jobIDList: list of job references
560559
"""
561560
if not isinstance(jobIDList, list):
562561
jobIDList = [jobIDList]
563562
self.log.debug("Cleaning jobs", ",".join(jobIDList))
564563

565-
# Convert DIRAC jobs to ARC jobs
566-
# DIRAC Jobs might be stored with a DIRAC stamp (":::XXXXX") that should be removed
567-
jList = [self._DiracToArcID(job.split(":::")[0]) for job in jobIDList]
568-
return self._cleanJob(jList)
564+
# Convert job references to ARC jobs
565+
# Job references might be stored with a DIRAC stamp (":::XXXXX") that should be removed
566+
arcJobList = [self._jobReferenceToArcID(job.split(":::")[0]) for job in jobIDList]
567+
return self._cleanJob(arcJobList)
569568

570569
def _cleanJob(self, arcJobList):
571570
"""Clean files related to the specified jobs
@@ -721,7 +720,7 @@ def _renewDelegation(self, delegationID):
721720
def getJobStatus(self, jobIDList):
722721
"""Get the status information for the given list of jobs.
723722
724-
:param list jobIDList: list of DIRAC Job ID, followed by the DIRAC stamp.
723+
:param list jobIDList: list of job references, followed by the DIRAC stamp.
725724
"""
726725
result = self._checkSession()
727726
if not result["OK"]:
@@ -732,9 +731,9 @@ def getJobStatus(self, jobIDList):
732731
jobIDList = [jobIDList]
733732

734733
self.log.debug("Getting status of jobs:", jobIDList)
735-
# Convert DIRAC jobs to ARC jobs and encapsulate them in a dictionary for the REST query
736-
# DIRAC Jobs might be stored with a DIRAC stamp (":::XXXXX") that should be removed
737-
arcJobsJson = {"job": [{"id": self._DiracToArcID(job.split(":::")[0])} for job in jobIDList]}
734+
# Convert job references to ARC jobs and encapsulate them in a dictionary for the REST query
735+
# Job references might be stored with a DIRAC stamp (":::XXXXX") that should be removed
736+
arcJobsJson = {"job": [{"id": self._jobReferenceToArcID(job.split(":::")[0])} for job in jobIDList]}
738737

739738
# Prepare the command
740739
params = {"action": "status"}
@@ -757,16 +756,16 @@ def getJobStatus(self, jobIDList):
757756
arcJobsInfo = [arcJobsInfo]
758757

759758
for arcJob in arcJobsInfo:
760-
jobID = self._arcToDiracID(arcJob["id"])
759+
jobReference = self._arcIDToJobReference(arcJob["id"])
761760
# ARC REST interface returns hyperbole
762761
arcState = arcJob["state"].capitalize()
763-
self.log.debug("REST ARC status", f"for job {jobID} is {arcState}")
764-
resultDict[jobID] = self.mapStates[arcState]
762+
self.log.debug("REST ARC status", f"for job {jobReference} is {arcState}")
763+
resultDict[jobReference] = self.mapStates[arcState]
765764

766765
# Cancel held jobs so they don't sit in the queue forever
767766
if arcState == "Hold":
768767
jobsToCancel.append(arcJob["id"])
769-
self.log.debug(f"Killing held job {jobID}")
768+
self.log.debug(f"Killing held job {jobReference}")
770769

771770
# Renew delegations to renew the proxies of the jobs
772771
result = self._getDelegationIDs()
@@ -793,7 +792,7 @@ def getJobStatus(self, jobIDList):
793792
def getJobLog(self, jobID):
794793
"""Get job logging info
795794
796-
:param str jobID: DIRAC JobID followed by the DIRAC stamp.
795+
:param str jobID: Job reference followed by the DIRAC stamp.
797796
:return: string representing the logging info of a given jobID
798797
"""
799798
result = self._checkSession()
@@ -802,7 +801,7 @@ def getJobLog(self, jobID):
802801
return result
803802

804803
# Prepare the command: Get output files
805-
arcJob = self._DiracToArcID(jobID.split(":::")[0])
804+
arcJob = self._jobReferenceToArcID(jobID.split(":::")[0])
806805
query = self._urlJoin(os.path.join("jobs", arcJob, "diagnose", "errors"))
807806

808807
# Submit the GET request to retrieve outputs
@@ -821,7 +820,7 @@ def getJobLog(self, jobID):
821820
def _getListOfAvailableOutputs(self, jobID, arcJobID):
822821
"""Request a list of outputs available for a given jobID.
823822
824-
:param str jobID: DIRAC job ID without the DIRAC stamp
823+
:param str jobID: job reference without the DIRAC stamp
825824
:param str arcJobID: ARC job ID
826825
:return list: names of the available outputs
827826
"""
@@ -841,11 +840,11 @@ def _getListOfAvailableOutputs(self, jobID, arcJobID):
841840
return S_OK(response.json()["file"])
842841

843842
def getJobOutput(self, jobID, workingDirectory=None):
844-
"""Get the outputs of the given DIRAC job ID.
843+
"""Get the outputs of the given job reference.
845844
846845
Outputs and stored in workingDirectory if present, else in a new directory named <ARC JobID>.
847846
848-
:param str jobID: DIRAC JobID followed by the DIRAC stamp.
847+
:param str jobID: job reference followed by the DIRAC stamp.
849848
:param str workingDirectory: name of the directory containing the retrieved outputs.
850849
:return: content of stdout and stderr
851850
"""
@@ -859,10 +858,10 @@ def getJobOutput(self, jobID, workingDirectory=None):
859858
jobRef, stamp = jobID.split(":::")
860859
else:
861860
return S_ERROR(f"DIRAC stamp not defined for {jobID}")
862-
job = self._DiracToArcID(jobRef)
861+
arcJob = self._jobReferenceToArcID(jobRef)
863862

864863
# Get the list of available outputs
865-
result = self._getListOfAvailableOutputs(jobRef, job)
864+
result = self._getListOfAvailableOutputs(jobRef, arcJob)
866865
if not result["OK"]:
867866
return result
868867
remoteOutputs = result["Value"]
@@ -871,21 +870,21 @@ def getJobOutput(self, jobID, workingDirectory=None):
871870
if not workingDirectory:
872871
if "WorkingDirectory" in self.ceParameters:
873872
# We assume that workingDirectory exists
874-
workingDirectory = os.path.join(self.ceParameters["WorkingDirectory"], job)
873+
workingDirectory = os.path.join(self.ceParameters["WorkingDirectory"], arcJob)
875874
else:
876-
workingDirectory = job
875+
workingDirectory = arcJob
877876
os.mkdir(workingDirectory)
878877

879878
stdout = None
880879
stderr = None
881880
for remoteOutput in remoteOutputs:
882881
# Prepare the command
883-
query = self._urlJoin(os.path.join("jobs", job, "session", remoteOutput))
882+
query = self._urlJoin(os.path.join("jobs", arcJob, "session", remoteOutput))
884883

885884
# Submit the GET request to retrieve outputs
886885
result = self._request("get", query, stream=True)
887886
if not result["OK"]:
888-
self.log.error("Error downloading", f"{remoteOutput} for {job}: {result['Message']}")
887+
self.log.error("Error downloading", f"{remoteOutput} for {arcJob}: {result['Message']}")
889888
return S_ERROR(f"Error downloading {remoteOutput} for {jobID}")
890889
response = result["Value"]
891890

0 commit comments

Comments
 (0)