Skip to content

Commit b96dbdd

Browse files
author
Robin VAN DE MERGHEL
committed
feat: Add secrets in Arex CE
1 parent e413b2b commit b96dbdd

File tree

10 files changed

+110
-43
lines changed

10 files changed

+110
-43
lines changed

src/DIRAC/Resources/Computing/AREXComputingElement.py

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -417,7 +417,7 @@ def _getProxyFromDelegationID(self, delegationID):
417417

418418
#############################################################################
419419

420-
def _writeXRSL(self, executableFile, inputs, outputs):
420+
def _writeXRSL(self, executableFile, inputs, outputs, diracXSecret):
421421
"""Create the JDL for submission
422422
423423
:param str executableFile: executable to wrap in a XRSL file
@@ -465,7 +465,7 @@ def _writeXRSL(self, executableFile, inputs, outputs):
465465
(inputFiles=({executable} "{executableFile}") {xrslInputAdditions})
466466
(stdout="{diracStamp}.out")
467467
(stderr="{diracStamp}.err")
468-
(environment=("DIRAC_PILOT_STAMP" "{diracStamp}"))
468+
(environment=("DIRAC_PILOT_STAMP" "{diracStamp}") ("DIRACX_SECRET" "{diracXSecret}"))
469469
(outputFiles={xrslOutputFiles})
470470
(queue={queue})
471471
{xrslMPAdditions}
@@ -476,6 +476,7 @@ def _writeXRSL(self, executableFile, inputs, outputs):
476476
executable=os.path.basename(executableFile),
477477
xrslInputAdditions=xrslInputs,
478478
diracStamp=diracStamp,
479+
diracXSecret=diracXSecret,
479480
queue=self.queue,
480481
xrslOutputFiles=xrslOutputs,
481482
xrslMPAdditions=xrslMPAdditions,
@@ -501,7 +502,7 @@ def _bundlePreamble(self, executableFile):
501502
bundleFile.write(wrapperContent)
502503
return bundleFile.name
503504

504-
def _getArcJobID(self, executableFile, inputs, outputs, delegation):
505+
def _getArcJobID(self, executableFile, inputs, outputs, delegation, diracXSecret):
505506
"""Get an ARC JobID endpoint to upload executables and inputs.
506507
507508
:param str executableFile: executable to submit
@@ -516,7 +517,7 @@ def _getArcJobID(self, executableFile, inputs, outputs, delegation):
516517
query = self._urlJoin("jobs")
517518

518519
# Get the job into the ARC way
519-
xrslString, diracStamp = self._writeXRSL(executableFile, inputs, outputs)
520+
xrslString, diracStamp = self._writeXRSL(executableFile, inputs, outputs, diracXSecret)
520521
xrslString += delegation
521522
self.log.debug("XRSL string submitted", f"is {xrslString}")
522523
self.log.debug("DIRAC stamp for job", f"is {diracStamp}")
@@ -569,10 +570,12 @@ def _uploadJobDependencies(self, arcJobID, executableFile, inputs):
569570
self.log.verbose("Input correctly uploaded", fileToSubmit)
570571
return S_OK()
571572

572-
def submitJob(self, executableFile, proxy, numberOfJobs=1, inputs=None, outputs=None):
573+
def submitJob(self, executableFile, proxy, numberOfJobs=1, inputs=None, outputs=None, diracXSecrets=[]):
573574
"""Method to submit job
574575
Assume that the ARC queues are always of the format nordugrid-<batchSystem>-<queue>
575576
And none of our supported batch systems have a "-" in their name
577+
578+
For V9+: the result also carries a "SecretDict" entry, a dictionary of the form {"secret": {"PilotStamps": ["stamp"]}}.
576579
"""
577580
result = self._checkSession()
578581
if not result["OK"]:
@@ -650,8 +653,13 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1, inputs=None, outputs=
650653
# Also : https://bugzilla.nordugrid.org/show_bug.cgi?id=4069
651654
batchIDList = []
652655
stampDict = {}
653-
for _ in range(numberOfJobs):
654-
result = self._getArcJobID(executableFile, inputs, outputs, delegation)
656+
secretDict = {}
657+
for i in range(numberOfJobs):
658+
if i > len(diracXSecrets):
659+
currentSecret = ""
660+
else:
661+
currentSecret = diracXSecrets[i]
662+
result = self._getArcJobID(executableFile, inputs, outputs, delegation, currentSecret)
655663
if not result["OK"]:
656664
break
657665
arcJobID, diracStamp = result["Value"]
@@ -665,6 +673,8 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1, inputs=None, outputs=
665673
jobReference = self._arcIDToJobReference(arcJobID)
666674
batchIDList.append(jobReference)
667675
stampDict[jobReference] = diracStamp
676+
secretDict[currentSecret] = {}
677+
secretDict[currentSecret]["PilotStamps"] = [diracStamp] # Used by DiracX to associate secrets and pilots
668678
self.log.debug(
669679
"Successfully submitted job",
670680
f"{jobReference} to CE {self.ceName}",
@@ -677,6 +687,7 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1, inputs=None, outputs=
677687
if batchIDList:
678688
result = S_OK(batchIDList)
679689
result["PilotStampDict"] = stampDict
690+
result["SecretDict"] = secretDict
680691
else:
681692
result = S_ERROR("No ID obtained from the ARC job submission")
682693
return result

src/DIRAC/Resources/Computing/CloudComputingElement.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -372,7 +372,7 @@ def __init__(self, *args, **kwargs):
372372
self.ceType = CE_NAME
373373
self._cloudDriver = None
374374

375-
def submitJob(self, executableFile, proxy, numberOfJobs=1):
375+
def submitJob(self, executableFile, proxy, numberOfJobs=1, diracXSecrets=[]):
376376
"""Creates VM instances
377377
378378
:param str executableFile: Path to pilot job wrapper file to use

src/DIRAC/Resources/Computing/HTCondorCEComputingElement.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -308,7 +308,7 @@ def _executeCondorCommand(self, cmd, keepTokenFile=False):
308308
return S_OK(stdout.strip())
309309

310310
#############################################################################
311-
def submitJob(self, executableFile, proxy, numberOfJobs=1):
311+
def submitJob(self, executableFile, proxy, numberOfJobs=1, diracXSecrets=[]):
312312
"""Method to submit job"""
313313

314314
self.log.verbose("Executable file path:", executableFile)

src/DIRAC/Resources/Computing/LocalComputingElement.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ def _prepareHost(self):
148148

149149
return S_OK()
150150

151-
def submitJob(self, executableFile, proxy=None, numberOfJobs=1):
151+
def submitJob(self, executableFile, proxy=None, numberOfJobs=1, diracXSecrets=[]):
152152
if not os.access(executableFile, 5):
153153
os.chmod(executableFile, stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH)
154154

src/DIRAC/Resources/Computing/SSHBatchComputingElement.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ def _reset(self):
5050
return S_OK()
5151

5252
#############################################################################
53-
def submitJob(self, executableFile, proxy, numberOfJobs=1):
53+
def submitJob(self, executableFile, proxy, numberOfJobs=1, diracXSecrets=[]):
5454
"""Method to submit job"""
5555

5656
# Choose eligible hosts, rank them by the number of available slots

src/DIRAC/Resources/Computing/SSHComputingElement.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -526,7 +526,7 @@ def __executeHostCommand(self, command, options, ssh=None, host=None):
526526
else:
527527
return S_ERROR("\n".join([sshStdout, sshStderr]))
528528

529-
def submitJob(self, executableFile, proxy, numberOfJobs=1):
529+
def submitJob(self, executableFile, proxy, numberOfJobs=1, diracXSecrets=[]):
530530
# self.log.verbose( "Executable file path: %s" % executableFile )
531531
if not os.access(executableFile, 5):
532532
os.chmod(executableFile, stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH)

src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py

Lines changed: 59 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
from DIRAC.WorkloadManagementSystem.Client.PilotScopes import PILOT_SCOPES
3838
from DIRAC.WorkloadManagementSystem.Client.ServerUtils import getPilotAgentsDB
3939
from DIRAC.WorkloadManagementSystem.private.ConfigHelper import findGenericPilotCredentials
40+
from DIRAC.WorkloadManagementSystem.Client.PilotManagerClient import PilotManagerClient
4041
from DIRAC.WorkloadManagementSystem.Utilities.PilotWrapper import (
4142
_writePilotWrapperFile,
4243
getPilotFilesCompressedEncodedDict,
@@ -103,6 +104,7 @@ def initialize(self):
103104
self.rssClient = ResourceStatus()
104105
self.pilotAgentsDB = getPilotAgentsDB()
105106
self.matcherClient = MatcherClient()
107+
self.pilotManagementClient = PilotManagerClient()
106108

107109
return S_OK()
108110

@@ -348,15 +350,15 @@ def _submitPilotsPerQueue(self, queueName: str):
348350
if not result["OK"]:
349351
self.log.info("Failed pilot submission", f"Queue: {queueName}")
350352
return result
351-
pilotList, stampDict = result["Value"]
353+
stampDict, secretDict = result["Value"]
352354

353-
# updating the pilotAgentsDB... done by default but maybe not strictly necessary
354-
result = self._addPilotReferences(queueName, pilotList, stampDict)
355+
submittedPilots = len(stampDict)
356+
self.log.info("Total number of pilots submitted", f"to {queueName}: {submittedPilots}")
357+
358+
result = self._addPilotReferences(queueName, stampDict, secretDict)
355359
if not result["OK"]:
356360
return result
357361

358-
submittedPilots = len(pilotList)
359-
self.log.info("Total number of pilots submitted", f"to {queueName}: {submittedPilots}")
360362
return S_OK(submittedPilots)
361363

362364
def _getQueueSlots(self, queue: str):
@@ -460,8 +462,12 @@ def _submitPilotsToQueue(self, pilotsToSubmit: int, ce: ComputingElement, queue:
460462
jobProxy = result["Value"]
461463
executable = self._getExecutable(queue, proxy=jobProxy, jobExecDir=jobExecDir, envVariables=envVariables)
462464

465+
secrets = self.pilotManagementClient.createNSecrets(vo=self.vo, n=pilotsToSubmit)
466+
463467
# Submit the job
464-
submitResult = ce.submitJob(executable, "", pilotsToSubmit)
468+
# NOTE FOR DIRACX /!\ : We need in each CE to create a secret
469+
submitResult = ce.submitJob(executable, "", pilotsToSubmit, diracXSecrets=secrets)
470+
465471
# In case the CE does not need the executable after the submission, we delete it
466472
# Else, we keep it, the CE will delete it after the end of the pilot execution
467473
if submitResult.get("ExecutableToKeep") != executable:
@@ -531,34 +537,56 @@ def _submitPilotsToQueue(self, pilotsToSubmit: int, ce: ComputingElement, queue:
531537
if not result["OK"]:
532538
self.log.error("Failure submitting Monitoring report", result["Message"])
533539

534-
return S_OK((pilotList, stampDict))
540+
secretDict = {}
541+
if "SecretDict" in submitResult:
542+
# TODO: Update this comment as we add DiracX support
543+
# V9+, only for:
544+
# 1. Arex
545+
546+
# Result body: {"secret": {"PilotStamps": ["stamp"]}}
547+
secretDict = submitResult["SecretDict"]
548+
549+
references = stampDict.keys()
550+
stamps = stampDict.values()
551+
stampDict = dict(zip(stamps, references))
552+
553+
return S_OK((stampDict, secretDict))
535554

536-
def _addPilotReferences(self, queue: str, pilotList: list[str], stampDict: dict[str, str]):
555+
def _addPilotReferences(self, queue: str, stampDict: dict[str, str], secretDict: dict[str, str]):
537556
"""Add pilotReference to pilotAgentsDB
538557
539558
:param queue: the queue name
540559
:param pilotList: list of pilots
541-
:param stampDict: dictionary of pilots timestamps
560+
:param stampDict: dictionary {"pilotstamp":"pilotref"}
561+
:param secretDict: dictionary {"secret": {"PilotStamps": ["pilotstamp"]}}
542562
"""
543-
result = self.pilotAgentsDB.addPilotReferences(
544-
pilotList,
545-
self.vo,
546-
self.queueDict[queue]["CEType"],
547-
stampDict,
563+
# FIXME: Change for a client or at least request to DiracX
564+
565+
# First, create pilots
566+
stamps = stampDict.keys()
567+
result = self.pilotManagementClient.addPilotReferences(
568+
stamps, self.vo, self.queueDict[queue]["CEType"], stampDict
548569
)
549570
if not result["OK"]:
550-
self.log.error("Failed add pilots to the PilotAgentsDB", result["Message"])
551571
return result
552572

553-
for pilot in pilotList:
554-
result = self.pilotAgentsDB.setPilotStatus(
555-
pilot,
556-
PilotStatus.SUBMITTED,
557-
self.queueDict[queue]["CEName"],
558-
"Successfully submitted by the SiteDirector",
559-
self.queueDict[queue]["Site"],
560-
self.queueDict[queue]["QueueName"],
573+
# We associate all of the pilots with their secrets
574+
if secretDict:
575+
result = self.pilotManagementClient.associatePilotWithSecret(secretDict)
576+
if not result["OK"]:
577+
return result
578+
579+
for stamp in stamps:
580+
result = self.pilotManagementClient.set_pilot_field(
581+
stamp,
582+
{
583+
"DestinationSite": self.queueDict[queue]["CEName"],
584+
"StatusReason": "Successfully submitted by the SiteDirector",
585+
"GridSite": self.queueDict[queue]["Site"],
586+
"Queue": self.queueDict[queue]["QueueName"],
587+
},
561588
)
589+
562590
if not result["OK"]:
563591
self.log.error("Failed to set pilot status", result["Message"])
564592
return result
@@ -591,14 +619,13 @@ def _getExecutable(self, queue: str, proxy: X509Chain, jobExecDir: str = "", env
591619
ce = self.queueCECache[queue]["CE"]
592620
workingDirectory = getattr(ce, "workingDirectory", self.workingDirectory)
593621

594-
executable = self._writePilotScript(
622+
return self._writePilotScript(
595623
workingDirectory=workingDirectory,
596624
pilotOptions=pilotOptions,
597625
proxy=proxy,
598626
pilotExecDir=jobExecDir,
599627
envVariables=envVariables,
600628
)
601-
return executable
602629

603630
def _getPilotOptions(self, queue: str) -> list[str]:
604631
"""Prepare pilot options
@@ -680,6 +707,13 @@ def _getPilotOptions(self, queue: str) -> list[str]:
680707
if "PipInstallOptions" in queueDict:
681708
pilotOptions.append(f"--pipInstallOptions={queueDict['PipInstallOptions']}")
682709

710+
# FIXME: Get secret
711+
# if "secret" in queueDict:
712+
# pilotOptions.append(f"--pilotSecret={queueDict['...']}")
713+
# FIXME: Get clientID
714+
# pilotOptions.append(f"--clientID={opsHelper.getValue('TO CHANGE')})
715+
pilotOptions.append(f"--diracx_URL={DIRAC.gConfig.getValue('/DiracX/URL')}")
716+
683717
return pilotOptions
684718

685719
def _writePilotScript(

src/DIRAC/WorkloadManagementSystem/FutureClient/PilotManagerClient.py

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,13 @@ def addPilotReferences(self, pilot_stamps, VO, gridType="DIRAC", pilot_reference
99
with DiracXClient() as api:
1010
# We will move toward a stamp as identifier for the pilot
1111
return api.pilots.add_pilot_stamps(
12-
{"pilot_stamps": pilot_stamps, "vo": VO, "grid_type": gridType, "pilot_references": pilot_references}
13-
)
12+
{"pilot_stamps": pilot_stamps, "vo": VO, "grid_type": gridType, "pilot_references": pilot_references, "generate_secrets": False} # type: ignore
13+
) # type: ignore
1414

1515
def set_pilot_field(self, pilot_stamp, values_dict):
1616
with DiracXClient() as api:
1717
values_dict["PilotStamp"] = pilot_stamp
18-
return api.pilots.update_pilot_fields(values_dict)
18+
return api.pilots.update_pilot_fields({"pilot_stamps_to_fields_mapping": [values_dict]}) # type: ignore
1919

2020
@convertToReturnValue
2121
def setPilotBenchmark(self, pilotStamp, mark):
@@ -48,7 +48,7 @@ def clearPilots(self, interval=30, aborted_interval=7):
4848
def deletePilots(self, pilot_stamps):
4949
with DiracXClient() as api:
5050
pilot_ids = None
51-
if isinstance(pilot_stamps, list[int]):
51+
if isinstance(pilot_stamps, list[int]): # type: ignore
5252
# Multiple elements (int)
5353
pilot_ids = pilot_stamps # Semantic
5454
elif isinstance(pilot_stamps, int):
@@ -66,12 +66,12 @@ def deletePilots(self, pilot_stamps):
6666
pilots = api.pilots.search(parameters=["PilotStamp"], search=query, sort=[])
6767
pilot_stamps = [pilot["PilotStamp"] for pilot in pilots]
6868

69-
api.pilots.delete_pilots(pilot_stamps=pilot_stamps)
69+
api.pilots.delete_pilots(pilot_stamps=pilot_stamps) # type: ignore
7070

7171
@convertToReturnValue
7272
def setJobForPilot(self, job_id, pilot_stamp, destination=None):
7373
with DiracXClient() as api:
74-
api.pilots.add_jobs_to_pilot({"pilot_stamp": pilot_stamp, "job_ids": [job_id]})
74+
api.pilots.add_jobs_to_pilot({"pilot_stamp": pilot_stamp, "job_ids": [job_id]}) # type: ignore
7575

7676
self.set_pilot_field(
7777
pilot_stamp,
@@ -95,3 +95,21 @@ def getPilotInfo(self, pilot_stamp):
9595
query = [{"parameter": "PilotStamp", "operator": "eq", "value": pilot_stamp}]
9696

9797
return api.pilots.search(parameters=[], search=query, sort=[])
98+
99+
@convertToReturnValue
100+
def associatePilotWithSecret(self, secretDict):
101+
# secretDict format: {"secret": {"PilotStamps": ["stamp"]}}
102+
with DiracXClient() as api:
103+
return api.pilots.update_secrets_constraints(secretDict) # type: ignore
104+
105+
@convertToReturnValue
106+
def createNSecrets(self, vo, n=100, expiration_minutes=120, pilot_secret_use_count_max=1):
107+
with DiracXClient() as api:
108+
return api.pilots.create_pilot_secrets(
109+
{
110+
"n": n,
111+
"expiration_minutes": expiration_minutes,
112+
"pilot_secret_use_count_max": pilot_secret_use_count_max,
113+
"vo": vo,
114+
}
115+
) # type: ignore

src/DIRAC/WorkloadManagementSystem/Utilities/PilotWrapper.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,8 @@ def pilotWrapperScript(
182182

183183
if envVariables is None:
184184
envVariables = {}
185+
elif "DIRACX_SECRET" in envVariables:
186+
pilotOptions += f" --pilotSecret={envVariables['DIRACX_SECRET']}"
185187

186188
if not CVMFS_locations:
187189
# What is in this location is almost certainly incorrect, especially the pilot.json

src/DIRAC/WorkloadManagementSystem/scripts/dirac_admin_add_pilot.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ def main():
5151
"""
5252
params = Params()
5353

54+
# TODO: Also add the site here.
55+
# Later deprecated in V9.
5456
Script.registerSwitches(params.switches)
5557
Script.registerArgument("pilotRef: pilot reference")
5658
Script.registerArgument("VO: VO, or pilot owner group")

0 commit comments

Comments
 (0)