Skip to content

Commit 7530e17

Browse files
authored
Merge pull request #7048 from aldbr/rel-v7r3_FIX_AREXRenewDelegation
[8.0] AREX fixes: proxy renewal logic + submission with tokens + correctly report aborted pilots
2 parents a51ff78 + bb5842d commit 7530e17

File tree

2 files changed

+124
-106
lines changed

2 files changed

+124
-106
lines changed

src/DIRAC/Resources/Computing/ARCComputingElement.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
MANDATORY_PARAMETERS = ["Queue"] # Mandatory for ARC CEs in GLUE2?
6464
# See https://www.nordugrid.org/arc/arc6/tech/rest/rest.html#rest-interface-job-states
6565
# We let "Deleted, Hold, Undefined" for the moment as we are not sure whether they are still used
66+
# "None" is a special case: it is returned when the job ID is not found in the system
6667
STATES_MAP = {
6768
"Accepting": PilotStatus.WAITING,
6869
"Accepted": PilotStatus.WAITING,
@@ -83,6 +84,7 @@
8384
"Wiped": PilotStatus.ABORTED,
8485
"Deleted": PilotStatus.ABORTED,
8586
"Hold": PilotStatus.FAILED,
87+
"None": PilotStatus.ABORTED,
8688
"Undefined": PilotStatus.UNKNOWN,
8789
}
8890

src/DIRAC/Resources/Computing/AREXComputingElement.py

Lines changed: 122 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,16 @@ def __init__(self, ceUniqueID):
4646
self.restVersion = "1.0"
4747
# Time left before proxy renewal: 3 hours is a good default
4848
self.proxyTimeLeftBeforeRenewal = 10800
49+
# Current delegation ID, generated/fetched in submitJob(), renewed in getJobStatus()
50+
self._delegationID = None
4951
# Timeout
5052
self.timeout = 5.0
5153
# Request session
5254
self.session = None
53-
self.headers = {}
55+
self.headers = {
56+
"Accept": "application/json",
57+
"Content-Type": "application/json",
58+
}
5459
# URL used to communicate with the REST interface
5560
self.base_url = ""
5661

@@ -88,13 +93,6 @@ def _reset(self):
8893
# Set up the request framework
8994
self.session = requests.Session()
9095
self.session.verify = Locations.getCAsLocation()
91-
self.headers = {
92-
"Accept": "application/json",
93-
"Content-Type": "application/json",
94-
}
95-
# Attach the token to the headers if present
96-
if os.environ.get("BEARER_TOKEN"):
97-
self.headers["Authorization"] = "Bearer " + os.environ["BEARER_TOKEN"]
9896

9997
return S_OK()
10098

@@ -184,11 +182,22 @@ def _checkSession(self):
184182
if not self.session:
185183
return S_ERROR("REST interface not initialised.")
186184

187-
# Get a proxy
185+
# Reinitialize the authentication parameters
186+
self.session.cert = None
187+
self.headers.pop("Authorization", None)
188+
189+
# Get a proxy: still mandatory, even if tokens are used to authenticate
188190
result = self._prepareProxy()
189191
if not result["OK"]:
190192
self.log.error("Failed to set up proxy", result["Message"])
191193
return result
194+
195+
if self.token:
196+
# Attach the token to the headers if present
197+
self.headers["Authorization"] = "Bearer " + self.token["access_token"]
198+
return S_OK()
199+
200+
# Attach the proxy to the session, only if the token is unavailable
192201
self.session.cert = Locations.getProxyLocation()
193202
return S_OK()
194203

@@ -234,11 +243,15 @@ def __uploadCertificate(self, delegationID, csrContent):
234243

235244
# Get a proxy and sign the CSR
236245
proxy = X509Chain()
237-
result = proxy.loadProxyFromFile(self.session.cert)
246+
proxyFile = Locations.getProxyLocation()
247+
if not proxyFile:
248+
return S_ERROR(f"No proxy available")
249+
result = proxy.loadProxyFromFile(proxyFile)
238250
if not result["OK"]:
239-
return S_ERROR(f"Can't load {self.session.cert}: {result['Message']}")
251+
return S_ERROR(f"Can't load {proxyFile}: {result['Message']}")
240252
result = proxy.generateChainFromRequestString(csrContent)
241253
if not result["OK"]:
254+
self.log.error("Problem with the Certificate Signing Request:", result["Message"])
242255
return S_ERROR("Problem with the Certificate Signing Request")
243256

244257
# Submit the certificate
@@ -270,38 +283,44 @@ def _prepareDelegation(self):
270283
return result
271284
return S_OK(delegationID)
272285

273-
def _getDelegationID(self, arcJobID):
274-
"""Query and return the delegation ID of the given job.
286+
def _getDelegationIDs(self):
287+
"""Query and return the delegation IDs.
275288
276-
This happens when the call is from self.renewJobs. This function needs to know the
277-
delegation associated to the job
289+
This happens when the call is from self.renewDelegations.
278290
More info at
279291
https://www.nordugrid.org/arc/arc6/tech/rest/rest.html#jobs-management
280292
https://www.nordugrid.org/arc/arc6/tech/rest/rest.html#delegations-management
281293
282-
:param str jobID: ARC job ID
283-
:return: delegation ID
294+
:return: list of delegation IDs
284295
"""
285-
params = {"action": "delegations"}
286-
query = self._urlJoin("jobs")
296+
query = self._urlJoin("delegations")
287297

288298
# Submit the POST request to get the delegation
289-
jobsJson = {"job": [{"id": arcJobID}]}
290-
result = self._request("post", query, params=params, data=json.dumps(jobsJson))
299+
result = self._request("get", query)
291300
if not result["OK"]:
292-
self.log.error("Issue while interacting with the delegation.", result["Message"])
293-
return S_ERROR("Issue while interacting with the delegation")
301+
self.log.error("Issue while interacting with the delegations.", result["Message"])
302+
return S_ERROR("Issue while interacting with the delegations")
294303
response = result["Value"]
295304

296-
responseDelegation = response.json()
297-
if "delegation_id" not in responseDelegation["job"]:
298-
return S_ERROR(f"Cannot find the Delegation ID for Job {arcJobID}")
305+
# If there is no delegation, response.json is expected to return an exception
306+
try:
307+
responseDelegation = response.json()
308+
except requests.JSONDecodeError:
309+
return S_OK([])
310+
311+
# This is not expected
312+
if "delegation" not in responseDelegation:
313+
return S_OK([])
314+
315+
# If there is a single delegationID, then we get an str instead of a list
316+
# Not specified in the documentation
317+
delegations = responseDelegation["delegation"]
318+
if isinstance(delegations, dict):
319+
delegations = [delegations]
299320

300-
delegationIDs = responseDelegation["job"]["delegation_id"]
301-
# Documentation says "Array", but a single string is returned if there is only one
302-
if not isinstance(delegationIDs, list):
303-
delegationIDs = [delegationIDs]
304-
return S_OK(delegationIDs[0])
321+
# responseDelegation should be {'delegation': [{'id': <delegationID>}, ...]}
322+
delegationIDs = [delegationContent["id"] for delegationContent in delegations]
323+
return S_OK(delegationIDs)
305324

306325
#############################################################################
307326

@@ -382,14 +401,23 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1, inputs=None, outputs=
382401

383402
self.log.verbose(f"Executable file path: {executableFile}")
384403

385-
# Get a "delegation" and use the same delegation for all the jobs
386-
delegation = ""
387-
result = self._prepareDelegation()
404+
# Get a delegation and use the same delegation for all the jobs
405+
result = self._getDelegationIDs()
388406
if not result["OK"]:
389-
self.log.warn("Could not get a delegation", f"For CE {self.ceHost}")
390-
self.log.warn("Continue without a delegation")
407+
self.log.error("Could not get delegation IDs.", result["Message"])
408+
return S_ERROR("Could not get delegation IDs")
409+
410+
delegationIDs = result["Value"]
411+
if not delegationIDs:
412+
# No existing delegation, we need to prepare one
413+
result = self._prepareDelegation()
414+
if not result["OK"]:
415+
self.log.warn("Could not get a new delegation", f"for CE {self.ceHost}")
416+
return S_ERROR("Could not get a new delegation")
417+
self._delegationID = result["Value"]
391418
else:
392-
delegation = f"\n(delegationid={result['Value']})"
419+
self._delegationID = delegationIDs[0]
420+
delegation = f"\n(delegationid={self._delegationID})"
393421

394422
if not inputs:
395423
inputs = []
@@ -571,73 +599,66 @@ def getCEStatus(self):
571599

572600
#############################################################################
573601

574-
def _renewJobs(self, arcJobList):
575-
"""Written for the REST interface - jobList is already in the ARC format
576-
577-
:param list arcJobList: list of ARC Job ID
578-
"""
579-
# Renew the jobs
580-
for arcJob in arcJobList:
581-
# First get the delegation (proxy)
582-
result = self._getDelegationID(arcJob)
583-
if not result["OK"]:
584-
self.log.warn("Could not get a delegation from", f"Job {arcJob}")
585-
continue
586-
delegationID = result["Value"]
587-
588-
# Prepare the command
589-
params = {"action": "get"}
590-
query = self._urlJoin(os.path.join("delegations", delegationID))
602+
def _renewDelegation(self):
603+
"""Renew the delegations"""
604+
# Prepare the command
605+
params = {"action": "get"}
606+
query = self._urlJoin(os.path.join("delegations", self._delegationID))
591607

592-
# Submit the POST request to get the proxy
593-
result = self._request("post", query, params=params)
594-
if not result["OK"]:
595-
self.log.debug("Could not get a proxy for", f"job {arcJob}: {result['Message']}")
596-
continue
597-
response = result["Value"]
608+
# Submit the POST request to get the proxy
609+
result = self._request("post", query, params=params)
610+
if not result["OK"]:
611+
self.log.error("Could not get a proxy for", f"delegation {self._delegationID}: {result['Message']}")
612+
return S_ERROR(f"Could not get a proxy for delegation {self._delegationID}")
613+
response = result["Value"]
598614

599-
proxy = X509Chain()
600-
result = proxy.loadChainFromString(response.text)
601-
if not result["OK"]:
602-
continue
615+
proxy = X509Chain()
616+
result = proxy.loadChainFromString(response.text)
617+
if not result["OK"]:
618+
self.log.error("Could not load proxy for", f"delegation {self._delegationID}: {result['Message']}")
619+
return S_ERROR(f"Could not load proxy for delegation {self._delegationID}")
603620

604-
# Now test and renew the proxy
605-
result = proxy.getRemainingSecs()
606-
if not result["OK"]:
607-
continue
608-
timeLeft = result["Value"]
621+
# Now test and renew the proxy
622+
result = proxy.getRemainingSecs()
623+
if not result["OK"]:
624+
self.log.error(
625+
"Could not get remaining time from the proxy for",
626+
f"delegation {self._delegationID}: {result['Message']}",
627+
)
628+
return S_ERROR(f"Could not get remaining time from the proxy for delegation {self._delegationID}")
629+
timeLeft = result["Value"]
609630

610-
if timeLeft >= self.proxyTimeLeftBeforeRenewal:
611-
# No need to renew. Proxy is long enough
612-
continue
631+
if timeLeft >= self.proxyTimeLeftBeforeRenewal:
632+
# No need to renew. Proxy is long enough
633+
return S_OK()
613634

614-
self.log.debug(
615-
"Renewing proxy for job",
616-
f"{arcJob} whose proxy expires at {timeLeft}",
635+
self.log.verbose(
636+
"Renewing delegation",
637+
f"{self._delegationID} whose proxy expires at {timeLeft}",
638+
)
639+
# Proxy needs to be renewed - try to renew it
640+
# First, get a new CSR from the delegation
641+
params = {"action": "renew"}
642+
query = self._urlJoin(os.path.join("delegations", self._delegationID))
643+
result = self._request("post", query, params=params)
644+
if not result["OK"]:
645+
self.log.error(
646+
"Proxy not renewed, failed to get CSR",
647+
f"for delegation {self._delegationID}",
617648
)
618-
# Proxy needs to be renewed - try to renew it
619-
# First, get a new CSR from the delegation
620-
params = {"action": "renew"}
621-
query = self._urlJoin(os.path.join("delegations", delegationID))
622-
result = self._request("post", query, params=params)
649+
return S_ERROR(f"Proxy not renewed, failed to get CSR for delegation {self._delegationID}")
650+
response = result["Value"]
623651

624-
if not response.ok:
625-
self.log.debug(
626-
"Proxy not renewed, failed to get CSR",
627-
f"for job {arcJob} with delegation {delegationID}",
628-
)
629-
continue
630-
631-
# Then, sign and upload the certificate
632-
result = self.__uploadCertificate(delegationID, response.text)
633-
if not result["OK"]:
634-
self.log.debug(
635-
"Proxy not renewed, failed to send renewed proxy",
636-
f"for job {arcJob} with delegation {delegationID}: {result['Message']}",
637-
)
638-
continue
652+
# Then, sign and upload the certificate
653+
result = self.__uploadCertificate(self._delegationID, response.text)
654+
if not result["OK"]:
655+
self.log.error(
656+
"Proxy not renewed, failed to send renewed proxy",
657+
f"delegation {self._delegationID}: {result['Message']}",
658+
)
659+
return S_ERROR(f"Proxy not renewed, failed to send renewed proxy for delegation {self._delegationID}")
639660

640-
self.log.debug("Proxy successfully renewed", f"for job {arcJob}")
661+
self.log.verbose("Proxy successfully renewed", f"for delegation {self._delegationID}")
641662

642663
return S_OK()
643664

@@ -673,7 +694,6 @@ def getJobStatus(self, jobIDList):
673694
response = result["Value"]
674695

675696
resultDict = {}
676-
jobsToRenew = []
677697
jobsToCancel = []
678698

679699
# A single job is returned in a dict, while multiple jobs are returned in a list
@@ -689,23 +709,19 @@ def getJobStatus(self, jobIDList):
689709
self.log.debug("REST ARC status", f"for job {jobID} is {arcState}")
690710
resultDict[jobID] = self.mapStates[arcState]
691711

692-
# Renew proxy only of jobs which are running or queuing
693-
if arcState in ("Running", "Queuing"):
694-
jobsToRenew.append(arcJob["id"])
695712
# Cancel held jobs so they don't sit in the queue forever
696713
if arcState == "Hold":
697714
jobsToCancel.append(arcJob["id"])
698715
self.log.debug(f"Killing held job {jobID}")
699716

700-
# Renew jobs to be renewed
701-
# Does not work at present - wait for a new release of ARC CEs for this.
702-
if jobsToRenew:
703-
result = self._renewJobs(jobsToRenew)
717+
# Renew delegation to renew the proxies of the jobs
718+
if self._delegationID:
719+
result = self._renewDelegation()
704720
if not result["OK"]:
705721
# Only log here as we still want to return statuses
706-
self.log.warn("Failed to renew job proxies:", result["Message"])
722+
self.log.warn("Failed to renew delegation", f"{self._delegationID}: {result['Message']}")
707723

708-
# Kill jobs to be killed
724+
# Kill held jobs
709725
if jobsToCancel:
710726
result = self._killJob(jobsToCancel)
711727
if not result["OK"]:

0 commit comments

Comments
 (0)