15
15
Port added to the CE host name to interact with AREX services.
16
16
17
17
ProxyTimeLeftBeforeRenewal:
18
- Time in seconds before the AREXCE renews proxy of submitted pilots .
18
+ Time in seconds before the AREXCE renews proxy of submitted payloads .
19
19
20
20
RESTVersion:
21
21
Version of the REST interface to use.
@@ -97,34 +97,33 @@ def setToken(self, token, valid):
97
97
super ().setToken (token , valid )
98
98
self .headers ["Authorization" ] = "Bearer " + self .token ["access_token" ]
99
99
100
- def _arcToDiracID (self , arcJobID ):
101
- """Convert an ARC jobID into a DIRAC jobID .
100
+ def _arcIDToJobReference (self , arcJobID ):
101
+ """Convert an ARC jobID into a job reference .
102
102
Example: 1234 becomes https://<ce>:<port>/arex/1234
103
103
104
104
:param str: ARC jobID
105
- :return: DIRAC jobID
105
+ :return: job reference, defined as an ARC jobID with additional details
106
106
"""
107
107
# Add CE and protocol information to arc Job ID
108
108
if "://" in arcJobID :
109
109
self .log .warn ("Identifier already in ARC format" , arcJobID )
110
110
return arcJobID
111
111
112
- diracJobID = "https://" + self .ceHost + ":" + self .port + "/arex/" + arcJobID
113
- return diracJobID
112
+ return f"https://{ self .ceHost } :{ self .port } /arex/{ arcJobID } "
114
113
115
- def _DiracToArcID (self , diracJobID ):
116
- """Convert a DIRAC jobID into an ARC jobID.
114
+ def _jobReferenceToArcID (self , jobReference ):
115
+ """Convert a job reference into an ARC jobID.
117
116
Example: https://<ce>:<port>/arex/1234 becomes 1234
118
117
119
- :param str: DIRAC jobID
118
+ :param str: job reference, defined as an ARC jobID with additional details
120
119
:return: ARC jobID
121
120
"""
122
121
# Remove CE and protocol information from arc Job ID
123
- if "://" in diracJobID :
124
- arcJobID = diracJobID .split ("arex/" )[- 1 ]
122
+ if "://" in jobReference :
123
+ arcJobID = jobReference .split ("arex/" )[- 1 ]
125
124
return arcJobID
126
- self .log .warn ("Identifier already in REST format?" , diracJobID )
127
- return diracJobID
125
+ self .log .warn ("Identifier already in REST format?" , jobReference )
126
+ return jobReference
128
127
129
128
#############################################################################
130
129
@@ -486,12 +485,12 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1, inputs=None, outputs=
486
485
if not result ["OK" ]:
487
486
break
488
487
489
- jobID = self ._arcToDiracID (arcJobID )
490
- batchIDList .append (jobID )
491
- stampDict [jobID ] = diracStamp
488
+ jobReference = self ._arcIDToJobReference (arcJobID )
489
+ batchIDList .append (jobReference )
490
+ stampDict [jobReference ] = diracStamp
492
491
self .log .debug (
493
492
"Successfully submitted job" ,
494
- f"{ jobID } to CE { self .ceHost } " ,
493
+ f"{ jobReference } to CE { self .ceHost } " ,
495
494
)
496
495
497
496
if batchIDList :
@@ -506,16 +505,16 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1, inputs=None, outputs=
506
505
def killJob (self , jobIDList ):
507
506
"""Kill the specified jobs
508
507
509
- :param list jobIDList: list of DIRAC Job IDs
508
+ :param list jobIDList: list of Job references
510
509
"""
511
510
if not isinstance (jobIDList , list ):
512
511
jobIDList = [jobIDList ]
513
512
self .log .debug ("Killing jobs" , "," .join (jobIDList ))
514
513
515
- # Convert DIRAC jobs to ARC jobs
516
- # DIRAC Jobs might be stored with a DIRAC stamp (":::XXXXX") that should be removed
517
- jList = [self ._DiracToArcID (job .split (":::" )[0 ]) for job in jobIDList ]
518
- return self ._killJob (jList )
514
+ # Convert job references to ARC jobs
515
+ # Job references might be stored with a DIRAC stamp (":::XXXXX") that should be removed
516
+ arcJobList = [self ._jobReferenceToArcID (job .split (":::" )[0 ]) for job in jobIDList ]
517
+ return self ._killJob (arcJobList )
519
518
520
519
def _killJob (self , arcJobList ):
521
520
"""Kill the specified jobs
@@ -548,16 +547,16 @@ def _killJob(self, arcJobList):
548
547
def cleanJob (self , jobIDList ):
549
548
"""Clean files related to the specified jobs
550
549
551
- :param list jobIDList: list of DIRAC Job IDs
550
+ :param list jobIDList: list of job references
552
551
"""
553
552
if not isinstance (jobIDList , list ):
554
553
jobIDList = [jobIDList ]
555
554
self .log .debug ("Cleaning jobs" , "," .join (jobIDList ))
556
555
557
- # Convert DIRAC jobs to ARC jobs
558
- # DIRAC Jobs might be stored with a DIRAC stamp (":::XXXXX") that should be removed
559
- jList = [self ._DiracToArcID (job .split (":::" )[0 ]) for job in jobIDList ]
560
- return self ._cleanJob (jList )
556
+ # Convert job references to ARC jobs
557
+ # Job references might be stored with a DIRAC stamp (":::XXXXX") that should be removed
558
+ arcJobList = [self ._jobReferenceToArcID (job .split (":::" )[0 ]) for job in jobIDList ]
559
+ return self ._cleanJob (arcJobList )
561
560
562
561
def _cleanJob (self , arcJobList ):
563
562
"""Clean files related to the specified jobs
@@ -713,7 +712,7 @@ def _renewDelegation(self, delegationID):
713
712
def getJobStatus (self , jobIDList ):
714
713
"""Get the status information for the given list of jobs.
715
714
716
- :param list jobIDList: list of DIRAC Job ID , followed by the DIRAC stamp.
715
+ :param list jobIDList: list of job references , followed by the DIRAC stamp.
717
716
"""
718
717
result = self ._checkSession ()
719
718
if not result ["OK" ]:
@@ -724,9 +723,9 @@ def getJobStatus(self, jobIDList):
724
723
jobIDList = [jobIDList ]
725
724
726
725
self .log .debug ("Getting status of jobs:" , jobIDList )
727
- # Convert DIRAC jobs to ARC jobs and encapsulate them in a dictionary for the REST query
728
- # DIRAC Jobs might be stored with a DIRAC stamp (":::XXXXX") that should be removed
729
- arcJobsJson = {"job" : [{"id" : self ._DiracToArcID (job .split (":::" )[0 ])} for job in jobIDList ]}
726
+ # Convert job references to ARC jobs and encapsulate them in a dictionary for the REST query
727
+ # Job references might be stored with a DIRAC stamp (":::XXXXX") that should be removed
728
+ arcJobsJson = {"job" : [{"id" : self ._jobReferenceToArcID (job .split (":::" )[0 ])} for job in jobIDList ]}
730
729
731
730
# Prepare the command
732
731
params = {"action" : "status" }
@@ -749,16 +748,16 @@ def getJobStatus(self, jobIDList):
749
748
arcJobsInfo = [arcJobsInfo ]
750
749
751
750
for arcJob in arcJobsInfo :
752
- jobID = self ._arcToDiracID (arcJob ["id" ])
751
+ jobReference = self ._arcIDToJobReference (arcJob ["id" ])
753
752
# ARC REST interface returns hyperbole
754
753
arcState = arcJob ["state" ].capitalize ()
755
- self .log .debug ("REST ARC status" , f"for job { jobID } is { arcState } " )
756
- resultDict [jobID ] = self .mapStates [arcState ]
754
+ self .log .debug ("REST ARC status" , f"for job { jobReference } is { arcState } " )
755
+ resultDict [jobReference ] = self .mapStates [arcState ]
757
756
758
757
# Cancel held jobs so they don't sit in the queue forever
759
758
if arcState == "Hold" :
760
759
jobsToCancel .append (arcJob ["id" ])
761
- self .log .debug (f"Killing held job { jobID } " )
760
+ self .log .debug (f"Killing held job { jobReference } " )
762
761
763
762
# Renew delegations to renew the proxies of the jobs
764
763
result = self ._getDelegationIDs ()
@@ -785,7 +784,7 @@ def getJobStatus(self, jobIDList):
785
784
def getJobLog (self , jobID ):
786
785
"""Get job logging info
787
786
788
- :param str jobID: DIRAC JobID followed by the DIRAC stamp.
787
+ :param str jobID: Job reference followed by the DIRAC stamp.
789
788
:return: string representing the logging info of a given jobID
790
789
"""
791
790
result = self ._checkSession ()
@@ -794,7 +793,7 @@ def getJobLog(self, jobID):
794
793
return result
795
794
796
795
# Prepare the command: Get output files
797
- arcJob = self ._DiracToArcID (jobID .split (":::" )[0 ])
796
+ arcJob = self ._jobReferenceToArcID (jobID .split (":::" )[0 ])
798
797
query = self ._urlJoin (os .path .join ("jobs" , arcJob , "diagnose" , "errors" ))
799
798
800
799
# Submit the GET request to retrieve outputs
@@ -813,7 +812,7 @@ def getJobLog(self, jobID):
813
812
def _getListOfAvailableOutputs (self , jobID , arcJobID ):
814
813
"""Request a list of outputs available for a given jobID.
815
814
816
- :param str jobID: DIRAC job ID without the DIRAC stamp
815
+ :param str jobID: job reference without the DIRAC stamp
817
816
:param str arcJobID: ARC job ID
818
817
:return list: names of the available outputs
819
818
"""
@@ -833,11 +832,11 @@ def _getListOfAvailableOutputs(self, jobID, arcJobID):
833
832
return S_OK (response .json ()["file" ])
834
833
835
834
def getJobOutput (self , jobID , workingDirectory = None ):
836
- """Get the outputs of the given DIRAC job ID .
835
+ """Get the outputs of the given job reference .
837
836
838
837
Outputs and stored in workingDirectory if present, else in a new directory named <ARC JobID>.
839
838
840
- :param str jobID: DIRAC JobID followed by the DIRAC stamp.
839
+ :param str jobID: job reference followed by the DIRAC stamp.
841
840
:param str workingDirectory: name of the directory containing the retrieved outputs.
842
841
:return: content of stdout and stderr
843
842
"""
@@ -851,10 +850,10 @@ def getJobOutput(self, jobID, workingDirectory=None):
851
850
jobRef , stamp = jobID .split (":::" )
852
851
else :
853
852
return S_ERROR (f"DIRAC stamp not defined for { jobID } " )
854
- job = self ._DiracToArcID (jobRef )
853
+ arcJob = self ._jobReferenceToArcID (jobRef )
855
854
856
855
# Get the list of available outputs
857
- result = self ._getListOfAvailableOutputs (jobRef , job )
856
+ result = self ._getListOfAvailableOutputs (jobRef , arcJob )
858
857
if not result ["OK" ]:
859
858
return result
860
859
remoteOutputs = result ["Value" ]
@@ -863,21 +862,21 @@ def getJobOutput(self, jobID, workingDirectory=None):
863
862
if not workingDirectory :
864
863
if "WorkingDirectory" in self .ceParameters :
865
864
# We assume that workingDirectory exists
866
- workingDirectory = os .path .join (self .ceParameters ["WorkingDirectory" ], job )
865
+ workingDirectory = os .path .join (self .ceParameters ["WorkingDirectory" ], arcJob )
867
866
else :
868
- workingDirectory = job
867
+ workingDirectory = arcJob
869
868
os .mkdir (workingDirectory )
870
869
871
870
stdout = None
872
871
stderr = None
873
872
for remoteOutput in remoteOutputs :
874
873
# Prepare the command
875
- query = self ._urlJoin (os .path .join ("jobs" , job , "session" , remoteOutput ))
874
+ query = self ._urlJoin (os .path .join ("jobs" , arcJob , "session" , remoteOutput ))
876
875
877
876
# Submit the GET request to retrieve outputs
878
877
result = self ._request ("get" , query , stream = True )
879
878
if not result ["OK" ]:
880
- self .log .error ("Error downloading" , f"{ remoteOutput } for { job } : { result ['Message' ]} " )
879
+ self .log .error ("Error downloading" , f"{ remoteOutput } for { arcJob } : { result ['Message' ]} " )
881
880
return S_ERROR (f"Error downloading { remoteOutput } for { jobID } " )
882
881
response = result ["Value" ]
883
882
0 commit comments