Skip to content

Commit 3cd62e8

Browse files
author
Robin VAN DE MERGHEL
committed
feat: Allow to add envMapping into a wrapper
1 parent b96dbdd commit 3cd62e8

File tree

2 files changed

+57
-21
lines changed

2 files changed

+57
-21
lines changed

src/DIRAC/Resources/Computing/AREXComputingElement.py

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -417,7 +417,7 @@ def _getProxyFromDelegationID(self, delegationID):
417417

418418
#############################################################################
419419

420-
def _writeXRSL(self, executableFile, inputs, outputs, diracXSecret):
420+
def _writeXRSL(self, executableFile, inputs, outputs, additionalEnv):
421421
"""Create the JDL for submission
422422
423423
:param str executableFile: executable to wrap in a XRSL file
@@ -465,7 +465,7 @@ def _writeXRSL(self, executableFile, inputs, outputs, diracXSecret):
465465
(inputFiles=({executable} "{executableFile}") {xrslInputAdditions})
466466
(stdout="{diracStamp}.out")
467467
(stderr="{diracStamp}.err")
468-
(environment=("DIRAC_PILOT_STAMP" "{diracStamp}") ("DIRACX_SECRET" "{diracXSecret}"))
468+
(environment=("DIRAC_PILOT_STAMP" "{diracStamp}") {additionalEnv})
469469
(outputFiles={xrslOutputFiles})
470470
(queue={queue})
471471
{xrslMPAdditions}
@@ -476,7 +476,7 @@ def _writeXRSL(self, executableFile, inputs, outputs, diracXSecret):
476476
executable=os.path.basename(executableFile),
477477
xrslInputAdditions=xrslInputs,
478478
diracStamp=diracStamp,
479-
diracXSecret=diracXSecret,
479+
additionalEnv=additionalEnv,
480480
queue=self.queue,
481481
xrslOutputFiles=xrslOutputs,
482482
xrslMPAdditions=xrslMPAdditions,
@@ -502,7 +502,7 @@ def _bundlePreamble(self, executableFile):
502502
bundleFile.write(wrapperContent)
503503
return bundleFile.name
504504

505-
def _getArcJobID(self, executableFile, inputs, outputs, delegation, diracXSecret):
505+
def _getArcJobID(self, executableFile, inputs, outputs, delegation, additionalEnv):
506506
"""Get an ARC JobID endpoint to upload executables and inputs.
507507
508508
:param str executableFile: executable to submit
@@ -517,7 +517,7 @@ def _getArcJobID(self, executableFile, inputs, outputs, delegation, diracXSecret
517517
query = self._urlJoin("jobs")
518518

519519
# Get the job into the ARC way
520-
xrslString, diracStamp = self._writeXRSL(executableFile, inputs, outputs, diracXSecret)
520+
xrslString, diracStamp = self._writeXRSL(executableFile, inputs, outputs, additionalEnv)
521521
xrslString += delegation
522522
self.log.debug("XRSL string submitted", f"is {xrslString}")
523523
self.log.debug("DIRAC stamp for job", f"is {diracStamp}")
@@ -570,7 +570,7 @@ def _uploadJobDependencies(self, arcJobID, executableFile, inputs):
570570
self.log.verbose("Input correctly uploaded", fileToSubmit)
571571
return S_OK()
572572

573-
def submitJob(self, executableFile, proxy, numberOfJobs=1, inputs=None, outputs=None, diracXSecrets=[]):
573+
def submitJob(self, executableFile, proxy, numberOfJobs=1, inputs=None, outputs=None, additionalEnv=[]):
574574
"""Method to submit job
575575
Assume that the ARC queues are always of the format nordugrid-<batchSystem>-<queue>
576576
And none of our supported batch systems have a "-" in their name
@@ -653,13 +653,16 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1, inputs=None, outputs=
653653
# Also : https://bugzilla.nordugrid.org/show_bug.cgi?id=4069
654654
batchIDList = []
655655
stampDict = {}
656-
secretDict = {}
656+
additionalEnvMappingResponse = {}
657657
for i in range(numberOfJobs):
658-
if i > len(diracXSecrets):
659-
currentSecret = ""
658+
if i > len(additionalEnv):
659+
currentEnv = ""
660660
else:
661-
currentSecret = diracXSecrets[i]
662-
result = self._getArcJobID(executableFile, inputs, outputs, delegation, currentSecret)
661+
# AdditionalEnv[i] format:
662+
# {"secret": "1_l0v3_1c3cr34m", ...}
663+
# We merge them to have the right format: '("key" "value") (...)'
664+
currentEnv = " ".join([f"({key} {value})" for key, value in additionalEnv[i]])
665+
result = self._getArcJobID(executableFile, inputs, outputs, delegation, currentEnv)
663666
if not result["OK"]:
664667
break
665668
arcJobID, diracStamp = result["Value"]
@@ -673,8 +676,11 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1, inputs=None, outputs=
673676
jobReference = self._arcIDToJobReference(arcJobID)
674677
batchIDList.append(jobReference)
675678
stampDict[jobReference] = diracStamp
676-
secretDict[currentSecret] = {}
677-
secretDict[currentSecret]["PilotStamps"] = [diracStamp] # Used by DiracX to associate secrets and pilots
679+
680+
# Add all env variables we added into additionalEnvMappingResponse so we have:
681+
# { "Stamp1": { "SECRET": "I_luv_strawberries", "...": "..." }, "Stamp2": {...} }
682+
additionalEnvMappingResponse[diracStamp] = additionalEnv[i]
683+
678684
self.log.debug(
679685
"Successfully submitted job",
680686
f"{jobReference} to CE {self.ceName}",
@@ -687,7 +693,7 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1, inputs=None, outputs=
687693
if batchIDList:
688694
result = S_OK(batchIDList)
689695
result["PilotStampDict"] = stampDict
690-
result["SecretDict"] = secretDict
696+
result["EnvMapping"] = additionalEnvMappingResponse
691697
else:
692698
result = S_ERROR("No ID obtained from the ARC job submission")
693699
return result

src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py

Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -462,11 +462,31 @@ def _submitPilotsToQueue(self, pilotsToSubmit: int, ce: ComputingElement, queue:
462462
jobProxy = result["Value"]
463463
executable = self._getExecutable(queue, proxy=jobProxy, jobExecDir=jobExecDir, envVariables=envVariables)
464464

465-
secrets = self.pilotManagementClient.createNSecrets(vo=self.vo, n=pilotsToSubmit)
465+
# ----------- Generate additionnal env variables -----------
466+
467+
additionalEnv = []
468+
469+
# First, create secrets
470+
resultSecrets = self.pilotManagementClient.createNSecrets(vo=self.vo, n=pilotsToSubmit)
471+
if not resultSecrets["OK"]:
472+
self.log.error("Couldn't fetch secrets.")
473+
secrets = []
474+
else:
475+
secrets = [{"DIRACX_SECRET": secret["pilot_secret"]} for secret in resultSecrets["Value"]]
476+
477+
# ---
478+
# More env?
479+
# ---
480+
481+
# Could be secrets, but also other things, increase in the for loop...
482+
for i, el in enumerate(secrets):
483+
additionalEnv[i].update(el)
484+
485+
# ----------------------------------------------------------
466486

467487
# Submit the job
468488
# NOTE FOR DIRACX /!\ : We need in each CE to create a secret
469-
submitResult = ce.submitJob(executable, "", pilotsToSubmit, diracXSecrets=secrets)
489+
submitResult = ce.submitJob(executable, "", pilotsToSubmit, additionalEnv=additionalEnv)
470490

471491
# In case the CE does not need the executable after the submission, we delete it
472492
# Else, we keep it, the CE will delete it after the end of the pilot execution
@@ -537,20 +557,30 @@ def _submitPilotsToQueue(self, pilotsToSubmit: int, ce: ComputingElement, queue:
537557
if not result["OK"]:
538558
self.log.error("Failure submitting Monitoring report", result["Message"])
539559

540-
secretDict = {}
541-
if "SecretDict" in submitResult:
560+
secrets_request_body = {}
561+
if "EnvMapping" in submitResult:
542562
# TODO: Update this comment as we add DiracX support
543563
# V9+, only for:
544564
# 1. Arex
545565

546-
# Result body: {"secret": "PilotStamps": ["stamp"]}
547-
secretDict = submitResult["SecretDict"]
566+
# EnvMapping body: { "Stamp1": { "DIRACX_SECRET": "I_luv_strawberries", "...": "..." }, "Stamp2": {...} }
567+
# Request body for secrets: {"I_luv_stawberries": ["Stamp1", "..."]}
568+
envMapping = submitResult["SecretDict"]
569+
secrets_request_body = {}
570+
571+
for stamp, envVariables in envMapping.items():
572+
diracx_secret = envVariables.get("DIRACX_SECRET")
573+
if diracx_secret:
574+
if not diracx_secret in secrets_request_body:
575+
secrets_request_body[diracx_secret] = []
576+
secrets_request_body[diracx_secret].append(stamp)
548577

578+
# We reverse the ref-stamp dictionnary (DiracX behaviour)
549579
references = stampDict.keys()
550580
stamps = stampDict.values()
551581
stampDict = dict(zip(stamps, references))
552582

553-
return S_OK((stampDict, secretDict))
583+
return S_OK((stampDict, secrets_request_body))
554584

555585
def _addPilotReferences(self, queue: str, stampDict: dict[str, str], secretDict: dict[str, str]):
556586
"""Add pilotReference to pilotAgentsDB

0 commit comments

Comments
 (0)