Skip to content

Commit 7b5fedf

Browse files
authored
Merge pull request #7102 from aldbr/cherry-pick-2-7530e17f8-integration
[sweep:integration] AREX fixes: proxy renewal logic + submission with tokens + correctly report aborted pilots
2 parents d0b65b8 + 42b31e1 commit 7b5fedf

File tree

2 files changed

+124
-106
lines changed

2 files changed

+124
-106
lines changed

src/DIRAC/Resources/Computing/ARCComputingElement.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
MANDATORY_PARAMETERS = ["Queue"] # Mandatory for ARC CEs in GLUE2?
6565
# See https://www.nordugrid.org/arc/arc6/tech/rest/rest.html#rest-interface-job-states
6666
# We keep "Deleted, Hold, Undefined" for the moment as we are not sure whether they are still used
67+
# "None" is a special case: it is returned when the job ID is not found in the system
6768
STATES_MAP = {
6869
"Accepting": PilotStatus.WAITING,
6970
"Accepted": PilotStatus.WAITING,
@@ -84,6 +85,7 @@
8485
"Wiped": PilotStatus.ABORTED,
8586
"Deleted": PilotStatus.ABORTED,
8687
"Hold": PilotStatus.FAILED,
88+
"None": PilotStatus.ABORTED,
8789
"Undefined": PilotStatus.UNKNOWN,
8890
}
8991

src/DIRAC/Resources/Computing/AREXComputingElement.py

Lines changed: 122 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,16 @@ def __init__(self, ceUniqueID):
4646
self.restVersion = "1.0"
4747
# Time left before proxy renewal: 3 hours is a good default
4848
self.proxyTimeLeftBeforeRenewal = 10800
49+
# Current delegation ID, generated/fetched in submitJob(), renewed in getJobStatus()
50+
self._delegationID = None
4951
# Timeout
5052
self.timeout = 5.0
5153
# Request session
5254
self.session = None
53-
self.headers = {}
55+
self.headers = {
56+
"Accept": "application/json",
57+
"Content-Type": "application/json",
58+
}
5459
# URL used to communicate with the REST interface
5560
self.base_url = ""
5661

@@ -80,13 +85,6 @@ def _reset(self):
8085
# Set up the request framework
8186
self.session = requests.Session()
8287
self.session.verify = Locations.getCAsLocation()
83-
self.headers = {
84-
"Accept": "application/json",
85-
"Content-Type": "application/json",
86-
}
87-
# Attach the token to the headers if present
88-
if os.environ.get("BEARER_TOKEN"):
89-
self.headers["Authorization"] = "Bearer " + os.environ["BEARER_TOKEN"]
9088

9189
return S_OK()
9290

@@ -176,11 +174,22 @@ def _checkSession(self):
176174
if not self.session:
177175
return S_ERROR("REST interface not initialised.")
178176

179-
# Get a proxy
177+
# Reinitialize the authentication parameters
178+
self.session.cert = None
179+
self.headers.pop("Authorization", None)
180+
181+
# Get a proxy: still mandatory, even if tokens are used to authenticate
180182
result = self._prepareProxy()
181183
if not result["OK"]:
182184
self.log.error("Failed to set up proxy", result["Message"])
183185
return result
186+
187+
if self.token:
188+
# Attach the token to the headers if present
189+
self.headers["Authorization"] = "Bearer " + self.token["access_token"]
190+
return S_OK()
191+
192+
# Attach the proxy to the session, only if the token is unavailable
184193
self.session.cert = Locations.getProxyLocation()
185194
return S_OK()
186195

@@ -226,11 +235,15 @@ def __uploadCertificate(self, delegationID, csrContent):
226235

227236
# Get a proxy and sign the CSR
228237
proxy = X509Chain()
229-
result = proxy.loadProxyFromFile(self.session.cert)
238+
proxyFile = Locations.getProxyLocation()
239+
if not proxyFile:
240+
return S_ERROR(f"No proxy available")
241+
result = proxy.loadProxyFromFile(proxyFile)
230242
if not result["OK"]:
231-
return S_ERROR(f"Can't load {self.session.cert}: {result['Message']}")
243+
return S_ERROR(f"Can't load {proxyFile}: {result['Message']}")
232244
result = proxy.generateChainFromRequestString(csrContent)
233245
if not result["OK"]:
246+
self.log.error("Problem with the Certificate Signing Request:", result["Message"])
234247
return S_ERROR("Problem with the Certificate Signing Request")
235248

236249
# Submit the certificate
@@ -262,38 +275,44 @@ def _prepareDelegation(self):
262275
return result
263276
return S_OK(delegationID)
264277

265-
def _getDelegationID(self, arcJobID):
266-
"""Query and return the delegation ID of the given job.
278+
def _getDelegationIDs(self):
279+
"""Query and return the delegation IDs.
267280
268-
This happens when the call is from self.renewJobs. This function needs to know the
269-
delegation associated to the job
281+
This happens when the call is from self.renewDelegations.
270282
More info at
271283
https://www.nordugrid.org/arc/arc6/tech/rest/rest.html#jobs-management
272284
https://www.nordugrid.org/arc/arc6/tech/rest/rest.html#delegations-management
273285
274-
:param str jobID: ARC job ID
275-
:return: delegation ID
286+
:return: list of delegation IDs
276287
"""
277-
params = {"action": "delegations"}
278-
query = self._urlJoin("jobs")
288+
query = self._urlJoin("delegations")
279289

280290
# Submit the GET request to list the delegations
281-
jobsJson = {"job": [{"id": arcJobID}]}
282-
result = self._request("post", query, params=params, data=json.dumps(jobsJson))
291+
result = self._request("get", query)
283292
if not result["OK"]:
284-
self.log.error("Issue while interacting with the delegation.", result["Message"])
285-
return S_ERROR("Issue while interacting with the delegation")
293+
self.log.error("Issue while interacting with the delegations.", result["Message"])
294+
return S_ERROR("Issue while interacting with the delegations")
286295
response = result["Value"]
287296

288-
responseDelegation = response.json()
289-
if "delegation_id" not in responseDelegation["job"]:
290-
return S_ERROR(f"Cannot find the Delegation ID for Job {arcJobID}")
297+
# If there is no delegation, response.json() is expected to raise an exception
298+
try:
299+
responseDelegation = response.json()
300+
except requests.JSONDecodeError:
301+
return S_OK([])
302+
303+
# This is not expected
304+
if "delegation" not in responseDelegation:
305+
return S_OK([])
306+
307+
# If there is a single delegation, then we get a dict instead of a list
308+
# Not specified in the documentation
309+
delegations = responseDelegation["delegation"]
310+
if isinstance(delegations, dict):
311+
delegations = [delegations]
291312

292-
delegationIDs = responseDelegation["job"]["delegation_id"]
293-
# Documentation says "Array", but a single string is returned if there is only one
294-
if not isinstance(delegationIDs, list):
295-
delegationIDs = [delegationIDs]
296-
return S_OK(delegationIDs[0])
313+
# responseDelegation should be {'delegation': [{'id': <delegationID>}, ...]}
314+
delegationIDs = [delegationContent["id"] for delegationContent in delegations]
315+
return S_OK(delegationIDs)
297316

298317
#############################################################################
299318

@@ -374,14 +393,23 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1, inputs=None, outputs=
374393

375394
self.log.verbose(f"Executable file path: {executableFile}")
376395

377-
# Get a "delegation" and use the same delegation for all the jobs
378-
delegation = ""
379-
result = self._prepareDelegation()
396+
# Get a delegation and use the same delegation for all the jobs
397+
result = self._getDelegationIDs()
380398
if not result["OK"]:
381-
self.log.warn("Could not get a delegation", f"For CE {self.ceHost}")
382-
self.log.warn("Continue without a delegation")
399+
self.log.error("Could not get delegation IDs.", result["Message"])
400+
return S_ERROR("Could not get delegation IDs")
401+
402+
delegationIDs = result["Value"]
403+
if not delegationIDs:
404+
# No existing delegation, we need to prepare one
405+
result = self._prepareDelegation()
406+
if not result["OK"]:
407+
self.log.warn("Could not get a new delegation", f"for CE {self.ceHost}")
408+
return S_ERROR("Could not get a new delegation")
409+
self._delegationID = result["Value"]
383410
else:
384-
delegation = f"\n(delegationid={result['Value']})"
411+
self._delegationID = delegationIDs[0]
412+
delegation = f"\n(delegationid={self._delegationID})"
385413

386414
if not inputs:
387415
inputs = []
@@ -563,73 +591,66 @@ def getCEStatus(self):
563591

564592
#############################################################################
565593

566-
def _renewJobs(self, arcJobList):
567-
"""Written for the REST interface - jobList is already in the ARC format
568-
569-
:param list arcJobList: list of ARC Job ID
570-
"""
571-
# Renew the jobs
572-
for arcJob in arcJobList:
573-
# First get the delegation (proxy)
574-
result = self._getDelegationID(arcJob)
575-
if not result["OK"]:
576-
self.log.warn("Could not get a delegation from", f"Job {arcJob}")
577-
continue
578-
delegationID = result["Value"]
579-
580-
# Prepare the command
581-
params = {"action": "get"}
582-
query = self._urlJoin(os.path.join("delegations", delegationID))
594+
def _renewDelegation(self):
595+
"""Renew the current delegation (self._delegationID) if its proxy lifetime is below the renewal threshold"""
596+
# Prepare the command
597+
params = {"action": "get"}
598+
query = self._urlJoin(os.path.join("delegations", self._delegationID))
583599

584-
# Submit the POST request to get the proxy
585-
result = self._request("post", query, params=params)
586-
if not result["OK"]:
587-
self.log.debug("Could not get a proxy for", f"job {arcJob}: {result['Message']}")
588-
continue
589-
response = result["Value"]
600+
# Submit the POST request to get the proxy
601+
result = self._request("post", query, params=params)
602+
if not result["OK"]:
603+
self.log.error("Could not get a proxy for", f"delegation {self._delegationID}: {result['Message']}")
604+
return S_ERROR(f"Could not get a proxy for delegation {self._delegationID}")
605+
response = result["Value"]
590606

591-
proxy = X509Chain()
592-
result = proxy.loadChainFromString(response.text)
593-
if not result["OK"]:
594-
continue
607+
proxy = X509Chain()
608+
result = proxy.loadChainFromString(response.text)
609+
if not result["OK"]:
610+
self.log.error("Could not load proxy for", f"delegation {self._delegationID}: {result['Message']}")
611+
return S_ERROR(f"Could not load proxy for delegation {self._delegationID}")
595612

596-
# Now test and renew the proxy
597-
result = proxy.getRemainingSecs()
598-
if not result["OK"]:
599-
continue
600-
timeLeft = result["Value"]
613+
# Now test and renew the proxy
614+
result = proxy.getRemainingSecs()
615+
if not result["OK"]:
616+
self.log.error(
617+
"Could not get remaining time from the proxy for",
618+
f"delegation {self._delegationID}: {result['Message']}",
619+
)
620+
return S_ERROR(f"Could not get remaining time from the proxy for delegation {self._delegationID}")
621+
timeLeft = result["Value"]
601622

602-
if timeLeft >= self.proxyTimeLeftBeforeRenewal:
603-
# No need to renew. Proxy is long enough
604-
continue
623+
if timeLeft >= self.proxyTimeLeftBeforeRenewal:
624+
# No need to renew. Proxy is long enough
625+
return S_OK()
605626

606-
self.log.debug(
607-
"Renewing proxy for job",
608-
f"{arcJob} whose proxy expires at {timeLeft}",
627+
self.log.verbose(
628+
"Renewing delegation",
629+
f"{self._delegationID} whose proxy expires at {timeLeft}",
630+
)
631+
# Proxy needs to be renewed - try to renew it
632+
# First, get a new CSR from the delegation
633+
params = {"action": "renew"}
634+
query = self._urlJoin(os.path.join("delegations", self._delegationID))
635+
result = self._request("post", query, params=params)
636+
if not result["OK"]:
637+
self.log.error(
638+
"Proxy not renewed, failed to get CSR",
639+
f"for delegation {self._delegationID}",
609640
)
610-
# Proxy needs to be renewed - try to renew it
611-
# First, get a new CSR from the delegation
612-
params = {"action": "renew"}
613-
query = self._urlJoin(os.path.join("delegations", delegationID))
614-
result = self._request("post", query, params=params)
641+
return S_ERROR(f"Proxy not renewed, failed to get CSR for delegation {self._delegationID}")
642+
response = result["Value"]
615643

616-
if not response.ok:
617-
self.log.debug(
618-
"Proxy not renewed, failed to get CSR",
619-
f"for job {arcJob} with delegation {delegationID}",
620-
)
621-
continue
622-
623-
# Then, sign and upload the certificate
624-
result = self.__uploadCertificate(delegationID, response.text)
625-
if not result["OK"]:
626-
self.log.debug(
627-
"Proxy not renewed, failed to send renewed proxy",
628-
f"for job {arcJob} with delegation {delegationID}: {result['Message']}",
629-
)
630-
continue
644+
# Then, sign and upload the certificate
645+
result = self.__uploadCertificate(self._delegationID, response.text)
646+
if not result["OK"]:
647+
self.log.error(
648+
"Proxy not renewed, failed to send renewed proxy",
649+
f"delegation {self._delegationID}: {result['Message']}",
650+
)
651+
return S_ERROR(f"Proxy not renewed, failed to send renewed proxy for delegation {self._delegationID}")
631652

632-
self.log.debug("Proxy successfully renewed", f"for job {arcJob}")
653+
self.log.verbose("Proxy successfully renewed", f"for delegation {self._delegationID}")
633654

634655
return S_OK()
635656

@@ -665,7 +686,6 @@ def getJobStatus(self, jobIDList):
665686
response = result["Value"]
666687

667688
resultDict = {}
668-
jobsToRenew = []
669689
jobsToCancel = []
670690

671691
# A single job is returned in a dict, while multiple jobs are returned in a list
@@ -681,23 +701,19 @@ def getJobStatus(self, jobIDList):
681701
self.log.debug("REST ARC status", f"for job {jobID} is {arcState}")
682702
resultDict[jobID] = self.mapStates[arcState]
683703

684-
# Renew proxy only of jobs which are running or queuing
685-
if arcState in ("Running", "Queuing"):
686-
jobsToRenew.append(arcJob["id"])
687704
# Cancel held jobs so they don't sit in the queue forever
688705
if arcState == "Hold":
689706
jobsToCancel.append(arcJob["id"])
690707
self.log.debug(f"Killing held job {jobID}")
691708

692-
# Renew jobs to be renewed
693-
# Does not work at present - wait for a new release of ARC CEs for this.
694-
if jobsToRenew:
695-
result = self._renewJobs(jobsToRenew)
709+
# Renew delegation to renew the proxies of the jobs
710+
if self._delegationID:
711+
result = self._renewDelegation()
696712
if not result["OK"]:
697713
# Only log here as we still want to return statuses
698-
self.log.warn("Failed to renew job proxies:", result["Message"])
714+
self.log.warn("Failed to renew delegation", f"{self._delegationID}: {result['Message']}")
699715

700-
# Kill jobs to be killed
716+
# Kill held jobs
701717
if jobsToCancel:
702718
result = self._killJob(jobsToCancel)
703719
if not result["OK"]:

0 commit comments

Comments
 (0)