Skip to content

Commit dff8024

Browse files
authored
Merge pull request #7279 from DIRACGridBot/cherry-pick-2-a04d867cd-integration
[sweep:integration] Getting more details about failed/aborted pilots from HTCondor
2 parents 38cde53 + 1cffae5 commit dff8024

File tree

4 files changed

+375
-351
lines changed

4 files changed

+375
-351
lines changed

src/DIRAC/Resources/Computing/AREXComputingElement.py

Lines changed: 45 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
Port added to the CE host name to interact with AREX services.
1616
1717
ProxyTimeLeftBeforeRenewal:
18-
Time in seconds before the AREXCE renews proxy of submitted pilots.
18+
Time in seconds before the AREXCE renews proxy of submitted payloads.
1919
2020
RESTVersion:
2121
Version of the REST interface to use.
@@ -97,34 +97,33 @@ def setToken(self, token, valid):
9797
super().setToken(token, valid)
9898
self.headers["Authorization"] = "Bearer " + self.token["access_token"]
9999

100-
def _arcToDiracID(self, arcJobID):
101-
"""Convert an ARC jobID into a DIRAC jobID.
100+
def _arcIDToJobReference(self, arcJobID):
101+
"""Convert an ARC jobID into a job reference.
102102
Example: 1234 becomes https://<ce>:<port>/arex/1234
103103
104104
:param str arcJobID: ARC jobID
105-
:return: DIRAC jobID
105+
:return: job reference, defined as an ARC jobID with additional details
106106
"""
107107
# Add CE and protocol information to arc Job ID
108108
if "://" in arcJobID:
109109
self.log.warn("Identifier already in ARC format", arcJobID)
110110
return arcJobID
111111

112-
diracJobID = "https://" + self.ceHost + ":" + self.port + "/arex/" + arcJobID
113-
return diracJobID
112+
return f"https://{self.ceHost}:{self.port}/arex/{arcJobID}"
114113

115-
def _DiracToArcID(self, diracJobID):
116-
"""Convert a DIRAC jobID into an ARC jobID.
114+
def _jobReferenceToArcID(self, jobReference):
115+
"""Convert a job reference into an ARC jobID.
117116
Example: https://<ce>:<port>/arex/1234 becomes 1234
118117
119-
:param str: DIRAC jobID
118+
:param str: job reference, defined as an ARC jobID with additional details
120119
:return: ARC jobID
121120
"""
122121
# Remove CE and protocol information from arc Job ID
123-
if "://" in diracJobID:
124-
arcJobID = diracJobID.split("arex/")[-1]
122+
if "://" in jobReference:
123+
arcJobID = jobReference.split("arex/")[-1]
125124
return arcJobID
126-
self.log.warn("Identifier already in REST format?", diracJobID)
127-
return diracJobID
125+
self.log.warn("Identifier already in REST format?", jobReference)
126+
return jobReference
128127

129128
#############################################################################
130129

@@ -486,12 +485,12 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1, inputs=None, outputs=
486485
if not result["OK"]:
487486
break
488487

489-
jobID = self._arcToDiracID(arcJobID)
490-
batchIDList.append(jobID)
491-
stampDict[jobID] = diracStamp
488+
jobReference = self._arcIDToJobReference(arcJobID)
489+
batchIDList.append(jobReference)
490+
stampDict[jobReference] = diracStamp
492491
self.log.debug(
493492
"Successfully submitted job",
494-
f"{jobID} to CE {self.ceHost}",
493+
f"{jobReference} to CE {self.ceHost}",
495494
)
496495

497496
if batchIDList:
@@ -506,16 +505,16 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1, inputs=None, outputs=
506505
def killJob(self, jobIDList):
507506
"""Kill the specified jobs
508507
509-
:param list jobIDList: list of DIRAC Job IDs
508+
:param list jobIDList: list of Job references
510509
"""
511510
if not isinstance(jobIDList, list):
512511
jobIDList = [jobIDList]
513512
self.log.debug("Killing jobs", ",".join(jobIDList))
514513

515-
# Convert DIRAC jobs to ARC jobs
516-
# DIRAC Jobs might be stored with a DIRAC stamp (":::XXXXX") that should be removed
517-
jList = [self._DiracToArcID(job.split(":::")[0]) for job in jobIDList]
518-
return self._killJob(jList)
514+
# Convert job references to ARC jobs
515+
# Job references might be stored with a DIRAC stamp (":::XXXXX") that should be removed
516+
arcJobList = [self._jobReferenceToArcID(job.split(":::")[0]) for job in jobIDList]
517+
return self._killJob(arcJobList)
519518

520519
def _killJob(self, arcJobList):
521520
"""Kill the specified jobs
@@ -548,16 +547,16 @@ def _killJob(self, arcJobList):
548547
def cleanJob(self, jobIDList):
549548
"""Clean files related to the specified jobs
550549
551-
:param list jobIDList: list of DIRAC Job IDs
550+
:param list jobIDList: list of job references
552551
"""
553552
if not isinstance(jobIDList, list):
554553
jobIDList = [jobIDList]
555554
self.log.debug("Cleaning jobs", ",".join(jobIDList))
556555

557-
# Convert DIRAC jobs to ARC jobs
558-
# DIRAC Jobs might be stored with a DIRAC stamp (":::XXXXX") that should be removed
559-
jList = [self._DiracToArcID(job.split(":::")[0]) for job in jobIDList]
560-
return self._cleanJob(jList)
556+
# Convert job references to ARC jobs
557+
# Job references might be stored with a DIRAC stamp (":::XXXXX") that should be removed
558+
arcJobList = [self._jobReferenceToArcID(job.split(":::")[0]) for job in jobIDList]
559+
return self._cleanJob(arcJobList)
561560

562561
def _cleanJob(self, arcJobList):
563562
"""Clean files related to the specified jobs
@@ -713,7 +712,7 @@ def _renewDelegation(self, delegationID):
713712
def getJobStatus(self, jobIDList):
714713
"""Get the status information for the given list of jobs.
715714
716-
:param list jobIDList: list of DIRAC Job ID, followed by the DIRAC stamp.
715+
:param list jobIDList: list of job references, followed by the DIRAC stamp.
717716
"""
718717
result = self._checkSession()
719718
if not result["OK"]:
@@ -724,9 +723,9 @@ def getJobStatus(self, jobIDList):
724723
jobIDList = [jobIDList]
725724

726725
self.log.debug("Getting status of jobs:", jobIDList)
727-
# Convert DIRAC jobs to ARC jobs and encapsulate them in a dictionary for the REST query
728-
# DIRAC Jobs might be stored with a DIRAC stamp (":::XXXXX") that should be removed
729-
arcJobsJson = {"job": [{"id": self._DiracToArcID(job.split(":::")[0])} for job in jobIDList]}
726+
# Convert job references to ARC jobs and encapsulate them in a dictionary for the REST query
727+
# Job references might be stored with a DIRAC stamp (":::XXXXX") that should be removed
728+
arcJobsJson = {"job": [{"id": self._jobReferenceToArcID(job.split(":::")[0])} for job in jobIDList]}
730729

731730
# Prepare the command
732731
params = {"action": "status"}
@@ -749,16 +748,16 @@ def getJobStatus(self, jobIDList):
749748
arcJobsInfo = [arcJobsInfo]
750749

751750
for arcJob in arcJobsInfo:
752-
jobID = self._arcToDiracID(arcJob["id"])
751+
jobReference = self._arcIDToJobReference(arcJob["id"])
753752
# ARC REST interface returns hyperbole
754753
arcState = arcJob["state"].capitalize()
755-
self.log.debug("REST ARC status", f"for job {jobID} is {arcState}")
756-
resultDict[jobID] = self.mapStates[arcState]
754+
self.log.debug("REST ARC status", f"for job {jobReference} is {arcState}")
755+
resultDict[jobReference] = self.mapStates[arcState]
757756

758757
# Cancel held jobs so they don't sit in the queue forever
759758
if arcState == "Hold":
760759
jobsToCancel.append(arcJob["id"])
761-
self.log.debug(f"Killing held job {jobID}")
760+
self.log.debug(f"Killing held job {jobReference}")
762761

763762
# Renew delegations to renew the proxies of the jobs
764763
result = self._getDelegationIDs()
@@ -785,7 +784,7 @@ def getJobStatus(self, jobIDList):
785784
def getJobLog(self, jobID):
786785
"""Get job logging info
787786
788-
:param str jobID: DIRAC JobID followed by the DIRAC stamp.
787+
:param str jobID: Job reference followed by the DIRAC stamp.
789788
:return: string representing the logging info of a given jobID
790789
"""
791790
result = self._checkSession()
@@ -794,7 +793,7 @@ def getJobLog(self, jobID):
794793
return result
795794

796795
# Prepare the command: Get output files
797-
arcJob = self._DiracToArcID(jobID.split(":::")[0])
796+
arcJob = self._jobReferenceToArcID(jobID.split(":::")[0])
798797
query = self._urlJoin(os.path.join("jobs", arcJob, "diagnose", "errors"))
799798

800799
# Submit the GET request to retrieve outputs
@@ -813,7 +812,7 @@ def getJobLog(self, jobID):
813812
def _getListOfAvailableOutputs(self, jobID, arcJobID):
814813
"""Request a list of outputs available for a given jobID.
815814
816-
:param str jobID: DIRAC job ID without the DIRAC stamp
815+
:param str jobID: job reference without the DIRAC stamp
817816
:param str arcJobID: ARC job ID
818817
:return list: names of the available outputs
819818
"""
@@ -833,11 +832,11 @@ def _getListOfAvailableOutputs(self, jobID, arcJobID):
833832
return S_OK(response.json()["file"])
834833

835834
def getJobOutput(self, jobID, workingDirectory=None):
836-
"""Get the outputs of the given DIRAC job ID.
835+
"""Get the outputs of the given job reference.
837836
838837
Outputs are stored in workingDirectory if present, else in a new directory named <ARC JobID>.
839838
840-
:param str jobID: DIRAC JobID followed by the DIRAC stamp.
839+
:param str jobID: job reference followed by the DIRAC stamp.
841840
:param str workingDirectory: name of the directory containing the retrieved outputs.
842841
:return: content of stdout and stderr
843842
"""
@@ -851,10 +850,10 @@ def getJobOutput(self, jobID, workingDirectory=None):
851850
jobRef, stamp = jobID.split(":::")
852851
else:
853852
return S_ERROR(f"DIRAC stamp not defined for {jobID}")
854-
job = self._DiracToArcID(jobRef)
853+
arcJob = self._jobReferenceToArcID(jobRef)
855854

856855
# Get the list of available outputs
857-
result = self._getListOfAvailableOutputs(jobRef, job)
856+
result = self._getListOfAvailableOutputs(jobRef, arcJob)
858857
if not result["OK"]:
859858
return result
860859
remoteOutputs = result["Value"]
@@ -863,21 +862,21 @@ def getJobOutput(self, jobID, workingDirectory=None):
863862
if not workingDirectory:
864863
if "WorkingDirectory" in self.ceParameters:
865864
# We assume that workingDirectory exists
866-
workingDirectory = os.path.join(self.ceParameters["WorkingDirectory"], job)
865+
workingDirectory = os.path.join(self.ceParameters["WorkingDirectory"], arcJob)
867866
else:
868-
workingDirectory = job
867+
workingDirectory = arcJob
869868
os.mkdir(workingDirectory)
870869

871870
stdout = None
872871
stderr = None
873872
for remoteOutput in remoteOutputs:
874873
# Prepare the command
875-
query = self._urlJoin(os.path.join("jobs", job, "session", remoteOutput))
874+
query = self._urlJoin(os.path.join("jobs", arcJob, "session", remoteOutput))
876875

877876
# Submit the GET request to retrieve outputs
878877
result = self._request("get", query, stream=True)
879878
if not result["OK"]:
880-
self.log.error("Error downloading", f"{remoteOutput} for {job}: {result['Message']}")
879+
self.log.error("Error downloading", f"{remoteOutput} for {arcJob}: {result['Message']}")
881880
return S_ERROR(f"Error downloading {remoteOutput} for {jobID}")
882881
response = result["Value"]
883882

0 commit comments

Comments
 (0)