Skip to content

Commit 8b1be49

Browse files
committed
fix: AREX proxy renewal logic
1 parent 4454fcd commit 8b1be49

File tree

1 file changed

+101
-95
lines changed

1 file changed

+101
-95
lines changed

src/DIRAC/Resources/Computing/AREXComputingElement.py

Lines changed: 101 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ def __init__(self, ceUniqueID):
4646
self.restVersion = "1.0"
4747
# Time left before proxy renewal: 3 hours is a good default
4848
self.proxyTimeLeftBeforeRenewal = 10800
49+
# Current delegation ID, generated/fetched in submitJob(), renewed in getJobStatus()
50+
self._delegationID = None
4951
# Timeout
5052
self.timeout = 5.0
5153
# Request session
@@ -239,6 +241,7 @@ def __uploadCertificate(self, delegationID, csrContent):
239241
return S_ERROR(f"Can't load {self.session.cert}: {result['Message']}")
240242
result = proxy.generateChainFromRequestString(csrContent)
241243
if not result["OK"]:
244+
self.log.error("Problem with the Certificate Signing Request:", result["Message"])
242245
return S_ERROR("Problem with the Certificate Signing Request")
243246

244247
# Submit the certificate
@@ -270,38 +273,44 @@ def _prepareDelegation(self):
270273
return result
271274
return S_OK(delegationID)
272275

273-
def _getDelegationID(self, arcJobID):
274-
"""Query and return the delegation ID of the given job.
276+
def _getDelegationIDs(self):
277+
"""Query and return the delegation IDs.
275278
276-
This happens when the call is from self.renewJobs. This function needs to know the
277-
delegation associated to the job
279+
This is called from self.submitJob, to reuse an existing delegation when there is one.
278280
More info at
279281
https://www.nordugrid.org/arc/arc6/tech/rest/rest.html#jobs-management
280282
https://www.nordugrid.org/arc/arc6/tech/rest/rest.html#delegations-management
281283
282-
:param str jobID: ARC job ID
283-
:return: delegation ID
284+
:return: list of delegation IDs
284285
"""
285-
params = {"action": "delegations"}
286-
query = self._urlJoin("jobs")
286+
query = self._urlJoin("delegations")
287287

288288
# Submit the GET request to list the delegations
289-
jobsJson = {"job": [{"id": arcJobID}]}
290-
result = self._request("post", query, params=params, data=json.dumps(jobsJson))
289+
result = self._request("get", query)
291290
if not result["OK"]:
292-
self.log.error("Issue while interacting with the delegation.", result["Message"])
293-
return S_ERROR("Issue while interacting with the delegation")
291+
self.log.error("Issue while interacting with the delegations.", result["Message"])
292+
return S_ERROR("Issue while interacting with the delegations")
294293
response = result["Value"]
295294

296-
responseDelegation = response.json()
297-
if "delegation_id" not in responseDelegation["job"]:
298-
return S_ERROR(f"Cannot find the Delegation ID for Job {arcJobID}")
295+
# If there is no delegation, response.json() is expected to raise an exception
296+
try:
297+
responseDelegation = response.json()
298+
except requests.JSONDecodeError:
299+
return S_OK([])
300+
301+
# This is not expected
302+
if "delegation" not in responseDelegation:
303+
return S_OK([])
299304

300-
delegationIDs = responseDelegation["job"]["delegation_id"]
301-
# Documentation says "Array", but a single string is returned if there is only one
302-
if not isinstance(delegationIDs, list):
303-
delegationIDs = [delegationIDs]
304-
return S_OK(delegationIDs[0])
305+
# If there is a single delegation, then we get a dict instead of a list of dicts
306+
# Not specified in the documentation
307+
delegations = responseDelegation["delegation"]
308+
if isinstance(delegations, dict):
309+
delegations = [delegations]
310+
311+
# responseDelegation should be {'delegation': [{'id': <delegationID>}, ...]}
312+
delegationIDs = [delegationContent["id"] for delegationContent in delegations]
313+
return S_OK(delegationIDs)
305314

306315
#############################################################################
307316

@@ -382,14 +391,23 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1, inputs=None, outputs=
382391

383392
self.log.verbose(f"Executable file path: {executableFile}")
384393

385-
# Get a "delegation" and use the same delegation for all the jobs
386-
delegation = ""
387-
result = self._prepareDelegation()
394+
# Get a delegation and use the same delegation for all the jobs
395+
result = self._getDelegationIDs()
388396
if not result["OK"]:
389-
self.log.warn("Could not get a delegation", f"For CE {self.ceHost}")
390-
self.log.warn("Continue without a delegation")
397+
self.log.error("Could not get delegation IDs.", result["Message"])
398+
return S_ERROR("Could not get delegation IDs")
399+
400+
delegationIDs = result["Value"]
401+
if not delegationIDs:
402+
# No existing delegation, we need to prepare one
403+
result = self._prepareDelegation()
404+
if not result["OK"]:
405+
self.log.warn("Could not get a new delegation", f"for CE {self.ceHost}")
406+
return S_ERROR("Could not get a new delegation")
407+
self._delegationID = result["Value"]
391408
else:
392-
delegation = f"\n(delegationid={result['Value']})"
409+
self._delegationID = delegationIDs[0]
410+
delegation = f"\n(delegationid={self._delegationID})"
393411

394412
if not inputs:
395413
inputs = []
@@ -571,73 +589,66 @@ def getCEStatus(self):
571589

572590
#############################################################################
573591

574-
def _renewJobs(self, arcJobList):
575-
"""Written for the REST interface - jobList is already in the ARC format
576-
577-
:param list arcJobList: list of ARC Job ID
578-
"""
579-
# Renew the jobs
580-
for arcJob in arcJobList:
581-
# First get the delegation (proxy)
582-
result = self._getDelegationID(arcJob)
583-
if not result["OK"]:
584-
self.log.warn("Could not get a delegation from", f"Job {arcJob}")
585-
continue
586-
delegationID = result["Value"]
587-
588-
# Prepare the command
589-
params = {"action": "get"}
590-
query = self._urlJoin(os.path.join("delegations", delegationID))
592+
def _renewDelegation(self):
593+
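"""Renew the current delegation (self._delegationID) if its proxy has less than self.proxyTimeLeftBeforeRenewal seconds left"""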
594+
# Prepare the command
595+
params = {"action": "get"}
596+
query = self._urlJoin(os.path.join("delegations", self._delegationID))
591597

592-
# Submit the POST request to get the proxy
593-
result = self._request("post", query, params=params)
594-
if not result["OK"]:
595-
self.log.debug("Could not get a proxy for", f"job {arcJob}: {result['Message']}")
596-
continue
597-
response = result["Value"]
598+
# Submit the POST request to get the proxy
599+
result = self._request("post", query, params=params)
600+
if not result["OK"]:
601+
self.log.error("Could not get a proxy for", f"delegation {self._delegationID}: {result['Message']}")
602+
return S_ERROR(f"Could not get a proxy for delegation {self._delegationID}")
603+
response = result["Value"]
598604

599-
proxy = X509Chain()
600-
result = proxy.loadChainFromString(response.text)
601-
if not result["OK"]:
602-
continue
605+
proxy = X509Chain()
606+
result = proxy.loadChainFromString(response.text)
607+
if not result["OK"]:
608+
self.log.error("Could not load proxy for", f"delegation {self._delegationID}: {result['Message']}")
609+
return S_ERROR(f"Could not load proxy for delegation {self._delegationID}")
603610

604-
# Now test and renew the proxy
605-
result = proxy.getRemainingSecs()
606-
if not result["OK"]:
607-
continue
608-
timeLeft = result["Value"]
611+
# Now test and renew the proxy
612+
result = proxy.getRemainingSecs()
613+
if not result["OK"]:
614+
self.log.error(
615+
"Could not get remaining time from the proxy for",
616+
f"delegation {self._delegationID}: {result['Message']}",
617+
)
618+
return S_ERROR(f"Could not get remaining time from the proxy for delegation {self._delegationID}")
619+
timeLeft = result["Value"]
609620

610-
if timeLeft >= self.proxyTimeLeftBeforeRenewal:
611-
# No need to renew. Proxy is long enough
612-
continue
621+
if timeLeft >= self.proxyTimeLeftBeforeRenewal:
622+
# No need to renew. Proxy is long enough
623+
return S_OK()
613624

614-
self.log.debug(
615-
"Renewing proxy for job",
616-
f"{arcJob} whose proxy expires at {timeLeft}",
625+
self.log.verbose(
626+
"Renewing delegation",
627+
f"{self._delegationID} whose proxy expires at {timeLeft}",
628+
)
629+
# Proxy needs to be renewed - try to renew it
630+
# First, get a new CSR from the delegation
631+
params = {"action": "renew"}
632+
query = self._urlJoin(os.path.join("delegations", self._delegationID))
633+
result = self._request("post", query, params=params)
634+
if not result["OK"]:
635+
self.log.error(
636+
"Proxy not renewed, failed to get CSR",
637+
f"for delegation {self._delegationID}",
617638
)
618-
# Proxy needs to be renewed - try to renew it
619-
# First, get a new CSR from the delegation
620-
params = {"action": "renew"}
621-
query = self._urlJoin(os.path.join("delegations", delegationID))
622-
result = self._request("post", query, params=params)
639+
return S_ERROR(f"Proxy not renewed, failed to get CSR for delegation {self._delegationID}")
640+
response = result["Value"]
623641

624-
if not response.ok:
625-
self.log.debug(
626-
"Proxy not renewed, failed to get CSR",
627-
f"for job {arcJob} with delegation {delegationID}",
628-
)
629-
continue
630-
631-
# Then, sign and upload the certificate
632-
result = self.__uploadCertificate(delegationID, response.text)
633-
if not result["OK"]:
634-
self.log.debug(
635-
"Proxy not renewed, failed to send renewed proxy",
636-
f"for job {arcJob} with delegation {delegationID}: {result['Message']}",
637-
)
638-
continue
642+
# Then, sign and upload the certificate
643+
result = self.__uploadCertificate(self._delegationID, response.text)
644+
if not result["OK"]:
645+
self.log.error(
646+
"Proxy not renewed, failed to send renewed proxy",
647+
f"delegation {self._delegationID}: {result['Message']}",
648+
)
649+
return S_ERROR(f"Proxy not renewed, failed to send renewed proxy for delegation {self._delegationID}")
639650

640-
self.log.debug("Proxy successfully renewed", f"for job {arcJob}")
651+
self.log.verbose("Proxy successfully renewed", f"for delegation {self._delegationID}")
641652

642653
return S_OK()
643654

@@ -673,7 +684,6 @@ def getJobStatus(self, jobIDList):
673684
response = result["Value"]
674685

675686
resultDict = {}
676-
jobsToRenew = []
677687
jobsToCancel = []
678688

679689
# A single job is returned in a dict, while multiple jobs are returned in a list
@@ -689,23 +699,19 @@ def getJobStatus(self, jobIDList):
689699
self.log.debug("REST ARC status", f"for job {jobID} is {arcState}")
690700
resultDict[jobID] = self.mapStates[arcState]
691701

692-
# Renew proxy only of jobs which are running or queuing
693-
if arcState in ("Running", "Queuing"):
694-
jobsToRenew.append(arcJob["id"])
695702
# Cancel held jobs so they don't sit in the queue forever
696703
if arcState == "Hold":
697704
jobsToCancel.append(arcJob["id"])
698705
self.log.debug(f"Killing held job {jobID}")
699706

700-
# Renew jobs to be renewed
701-
# Does not work at present - wait for a new release of ARC CEs for this.
702-
if jobsToRenew:
703-
result = self._renewJobs(jobsToRenew)
707+
# Renew delegation to renew the proxies of the jobs
708+
if self._delegationID:
709+
result = self._renewDelegation()
704710
if not result["OK"]:
705711
# Only log here as we still want to return statuses
706-
self.log.warn("Failed to renew job proxies:", result["Message"])
712+
self.log.warn("Failed to renew delegation", f"{self._delegationID}: {result['Message']}")
707713

708-
# Kill jobs to be killed
714+
# Kill held jobs
709715
if jobsToCancel:
710716
result = self._killJob(jobsToCancel)
711717
if not result["OK"]:

0 commit comments

Comments
 (0)