Skip to content

Commit 1aae0b2

Browse files
authored
Merge pull request #6890 from atsareg/dev-ce-token
[8.0] Perform pilot operations with tokens
2 parents 7f8b09c + 475ad73 commit 1aae0b2

File tree

4 files changed

+90
-50
lines changed

4 files changed

+90
-50
lines changed

src/DIRAC/Resources/Computing/AREXComputingElement.py

Lines changed: 36 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -356,11 +356,12 @@ def _killJob(self, arcJobList):
356356
return S_ERROR("REST interface not initialised. Cannot kill jobs.")
357357

358358
# Get a proxy
359-
result = self._prepareProxy()
360-
if not result["OK"]:
361-
self.log.error("Failed to set up proxy", result["Message"])
362-
return result
363-
self.session.cert = Locations.getProxyLocation()
359+
if self.proxy:
360+
result = self._prepareProxy()
361+
if not result["OK"]:
362+
self.log.error("Failed to set up proxy", result["Message"])
363+
return result
364+
self.session.cert = Locations.getProxyLocation()
364365

365366
# List of jobs in json format for the REST query
366367
jobsJson = {"job": [{"id": job} for job in arcJobList]}
@@ -393,11 +394,12 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1, inputs=None, outputs=
393394
self.log.verbose("Executable file path:", executableFile)
394395

395396
# Get a proxy
396-
result = self._prepareProxy()
397-
if not result["OK"]:
398-
self.log.error("Failed to set up proxy", result["Message"])
399-
return result
400-
self.session.cert = Locations.getProxyLocation()
397+
if self.proxy:
398+
result = self._prepareProxy()
399+
if not result["OK"]:
400+
self.log.error("Failed to set up proxy", result["Message"])
401+
return result
402+
self.session.cert = Locations.getProxyLocation()
401403

402404
# Get a "delegation" and use the same delegation for all the jobs
403405
delegation = ""
@@ -476,11 +478,12 @@ def getCEStatus(self):
476478
return S_ERROR("REST interface not initialised. Cannot get CE status.")
477479

478480
# Get a proxy
479-
result = self._prepareProxy()
480-
if not result["OK"]:
481-
self.log.error("Failed to set up proxy", result["Message"])
482-
return result
483-
self.session.cert = Locations.getProxyLocation()
481+
if self.proxy:
482+
result = self._prepareProxy()
483+
if not result["OK"]:
484+
self.log.error("Failed to set up proxy", result["Message"])
485+
return result
486+
self.session.cert = Locations.getProxyLocation()
484487

485488
# Try to find out which VO we are running for.
486489
# Essential now for REST interface.
@@ -597,11 +600,12 @@ def getJobStatus(self, jobIDList):
597600
return S_ERROR("REST interface not initialised. Cannot get job status.")
598601

599602
# Get a proxy
600-
result = self._prepareProxy()
601-
if not result["OK"]:
602-
self.log.error("AREXComputingElement: failed to set up proxy", result["Message"])
603-
return result
604-
self.session.cert = Locations.getProxyLocation()
603+
if self.proxy:
604+
result = self._prepareProxy()
605+
if not result["OK"]:
606+
self.log.error("AREXComputingElement: failed to set up proxy", result["Message"])
607+
return result
608+
self.session.cert = Locations.getProxyLocation()
605609

606610
if not isinstance(jobIDList, list):
607611
jobIDList = [jobIDList]
@@ -688,11 +692,12 @@ def getJobLog(self, jobID):
688692
return S_ERROR("REST interface not initialised. Cannot get job output.")
689693

690694
# Get a proxy
691-
result = self._prepareProxy()
692-
if not result["OK"]:
693-
self.log.error("AREXComputingElement: failed to set up proxy", result["Message"])
694-
return result
695-
self.session.cert = Locations.getProxyLocation()
695+
if self.proxy:
696+
result = self._prepareProxy()
697+
if not result["OK"]:
698+
self.log.error("AREXComputingElement: failed to set up proxy", result["Message"])
699+
return result
700+
self.session.cert = Locations.getProxyLocation()
696701

697702
# Extract stamp from the Job ID
698703
if ":::" in jobID:
@@ -749,11 +754,12 @@ def getJobOutput(self, jobID, workingDirectory=None):
749754
return S_ERROR("REST interface not initialised. Cannot get job output.")
750755

751756
# Get a proxy
752-
result = self._prepareProxy()
753-
if not result["OK"]:
754-
self.log.error("AREXComputingElement: failed to set up proxy", result["Message"])
755-
return result
756-
self.session.cert = Locations.getProxyLocation()
757+
if self.proxy:
758+
result = self._prepareProxy()
759+
if not result["OK"]:
760+
self.log.error("AREXComputingElement: failed to set up proxy", result["Message"])
761+
return result
762+
self.session.cert = Locations.getProxyLocation()
757763

758764
# Extract stamp from the Job ID
759765
if ":::" in jobID:

src/DIRAC/Resources/Computing/ComputingElement.py

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -91,20 +91,13 @@ def setToken(self, token, valid=0):
9191

9292
def _prepareProxy(self):
9393
"""Set the environment variable X509_USER_PROXY"""
94-
if not self.proxy:
95-
result = getProxyInfo()
96-
if not result["OK"]:
97-
return S_ERROR("No proxy available")
98-
if "path" in result["Value"]:
99-
os.environ["X509_USER_PROXY"] = result["Value"]["path"]
100-
return S_OK()
101-
else:
94+
if self.proxy:
10295
result = gProxyManager.dumpProxyToFile(self.proxy, requiredTimeLeft=self.minProxyTime)
10396
if not result["OK"]:
10497
return result
10598
os.environ["X509_USER_PROXY"] = result["Value"]
10699

107-
self.log.debug(f"Set proxy variable X509_USER_PROXY to {os.environ['X509_USER_PROXY']}")
100+
self.log.debug(f"Set proxy variable X509_USER_PROXY to {os.environ['X509_USER_PROXY']}")
108101
return S_OK()
109102

110103
def isProxyValid(self, valid=1000):

src/DIRAC/WorkloadManagementSystem/Service/PilotManagerHandler.py

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
from DIRAC.WorkloadManagementSystem.Client import PilotStatus
1515
from DIRAC.WorkloadManagementSystem.Service.WMSUtilities import (
1616
getPilotCE,
17-
getPilotProxy,
17+
setPilotCredentials,
1818
getPilotRef,
1919
killPilotsInQueues,
2020
)
@@ -135,13 +135,10 @@ def export_getPilotOutput(self, pilotReference):
135135
if not hasattr(ce, "getJobOutput"):
136136
return S_ERROR(f"Pilot output not available for {pilotDict['GridType']} CEs")
137137

138-
result = getPilotProxy(pilotDict)
138+
result = setPilotCredentials(ce, pilotDict)
139139
if not result["OK"]:
140140
return result
141141

142-
proxy = result["Value"]
143-
ce.setProxy(proxy)
144-
145142
result = getPilotRef(pilotReference, pilotDict)
146143
if not result["OK"]:
147144
return result
@@ -213,13 +210,11 @@ def export_getPilotLoggingInfo(self, pilotReference):
213210
self.log.info("Pilot logging not available for", f"{pilotDict['GridType']} CEs")
214211
return S_ERROR(f"Pilot logging not available for {pilotDict['GridType']} CEs")
215212

216-
result = getPilotProxy(pilotDict)
213+
# Set proxy or token for the CE
214+
result = setPilotCredentials(ce, pilotDict)
217215
if not result["OK"]:
218216
return result
219217

220-
proxy = result["Value"]
221-
ce.setProxy(proxy)
222-
223218
result = getPilotRef(pilotReference, pilotDict)
224219
if not result["OK"]:
225220
return result

src/DIRAC/WorkloadManagementSystem/Service/WMSUtilities.py

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,9 @@
66

77
from DIRAC import S_OK, S_ERROR, gLogger, gConfig
88
from DIRAC.ConfigurationSystem.Client.Helpers.Resources import getQueue
9-
from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getGroupOption
9+
from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getGroupOption, getUsernameForDN
1010
from DIRAC.FrameworkSystem.Client.ProxyManagerClient import gProxyManager
11+
from DIRAC.FrameworkSystem.Client.TokenManagerClient import gTokenManager
1112
from DIRAC.Resources.Computing.ComputingElementFactory import ComputingElementFactory
1213

1314

@@ -49,7 +50,11 @@ def getPilotCE(pilotDict):
4950

5051

5152
def getPilotProxy(pilotDict):
52-
"""Get a proxy bound to a pilot"""
53+
"""Get a proxy bound to a pilot
54+
55+
:param dict pilotDict: pilot parameters
56+
:return: S_OK/S_ERROR with proxy as Value
57+
"""
5358
owner = pilotDict["OwnerDN"]
5459
group = pilotDict["OwnerGroup"]
5560

@@ -62,6 +67,47 @@ def getPilotProxy(pilotDict):
6267
return S_OK(proxy)
6368

6469

70+
def getPilotToken(pilotDict):
71+
"""Get a token corresponding to the pilot
72+
73+
:param dict pilotDict: pilot parameters
74+
:return: S_OK/S_ERROR with token as Value
75+
"""
76+
ownerDN = pilotDict["OwnerDN"]
77+
group = pilotDict["OwnerGroup"]
78+
79+
result = getUsernameForDN(ownerDN)
80+
if not result["OK"]:
81+
return result
82+
username = result["Value"]
83+
result = gTokenManager.getToken(
84+
username=username,
85+
userGroup=group,
86+
requiredTimeLeft=3600,
87+
)
88+
return result
89+
90+
91+
def setPilotCredentials(ce, pilotDict):
92+
"""Instrument the given CE with proxy or token
93+
94+
:param obj ce: CE object
95+
:param pilotDict: pilot parameter dictionary
96+
:return: S_OK/S_ERROR
97+
"""
98+
if "Token" in ce.ceParameters.get("Tag", []):
99+
result = getPilotToken(pilotDict)
100+
if not result["OK"]:
101+
return result
102+
ce.setToken(result["Value"], 3500)
103+
else:
104+
result = getPilotProxy(pilotDict)
105+
if not result["OK"]:
106+
return result
107+
ce.setProxy(result["Value"])
108+
return S_OK()
109+
110+
65111
def getPilotRef(pilotReference, pilotDict):
66112
"""Add the pilotStamp to the pilotReference, if the pilotStamp is in the dictionary,
67113
otherwise return unchanged pilotReference.

0 commit comments

Comments
 (0)