Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 24 additions & 7 deletions src/DIRAC/Resources/Computing/AREXComputingElement.py
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ def _getProxyFromDelegationID(self, delegationID):

#############################################################################

def _writeXRSL(self, executableFile, inputs, outputs):
def _writeXRSL(self, executableFile, inputs, outputs, additionalEnv):
"""Create the JDL for submission

:param str executableFile: executable to wrap in a XRSL file
Expand Down Expand Up @@ -465,7 +465,7 @@ def _writeXRSL(self, executableFile, inputs, outputs):
(inputFiles=({executable} "{executableFile}") {xrslInputAdditions})
(stdout="{diracStamp}.out")
(stderr="{diracStamp}.err")
(environment=("DIRAC_PILOT_STAMP" "{diracStamp}"))
(environment=("DIRAC_PILOT_STAMP" "{diracStamp}") {additionalEnv})
(outputFiles={xrslOutputFiles})
(queue={queue})
{xrslMPAdditions}
Expand All @@ -476,6 +476,7 @@ def _writeXRSL(self, executableFile, inputs, outputs):
executable=os.path.basename(executableFile),
xrslInputAdditions=xrslInputs,
diracStamp=diracStamp,
additionalEnv=additionalEnv,
queue=self.queue,
xrslOutputFiles=xrslOutputs,
xrslMPAdditions=xrslMPAdditions,
Expand All @@ -501,7 +502,7 @@ def _bundlePreamble(self, executableFile):
bundleFile.write(wrapperContent)
return bundleFile.name

def _getArcJobID(self, executableFile, inputs, outputs, delegation):
def _getArcJobID(self, executableFile, inputs, outputs, delegation, additionalEnv):
"""Get an ARC JobID endpoint to upload executables and inputs.

:param str executableFile: executable to submit
Expand All @@ -516,7 +517,7 @@ def _getArcJobID(self, executableFile, inputs, outputs, delegation):
query = self._urlJoin("jobs")

# Get the job into the ARC way
xrslString, diracStamp = self._writeXRSL(executableFile, inputs, outputs)
xrslString, diracStamp = self._writeXRSL(executableFile, inputs, outputs, additionalEnv)
xrslString += delegation
self.log.debug("XRSL string submitted", f"is {xrslString}")
self.log.debug("DIRAC stamp for job", f"is {diracStamp}")
Expand Down Expand Up @@ -569,10 +570,12 @@ def _uploadJobDependencies(self, arcJobID, executableFile, inputs):
self.log.verbose("Input correctly uploaded", fileToSubmit)
return S_OK()

def submitJob(self, executableFile, proxy, numberOfJobs=1, inputs=None, outputs=None):
def submitJob(self, executableFile, proxy, numberOfJobs=1, inputs=None, outputs=None, additionalEnv=[]):
"""Method to submit job
Assume that the ARC queues are always of the format nordugrid-<batchSystem>-<queue>
And none of our supported batch systems have a "-" in their name

For V9+: will also give back a {"stamp": "secret"} dictionary.
"""
result = self._checkSession()
if not result["OK"]:
Expand Down Expand Up @@ -650,8 +653,16 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1, inputs=None, outputs=
# Also : https://bugzilla.nordugrid.org/show_bug.cgi?id=4069
batchIDList = []
stampDict = {}
for _ in range(numberOfJobs):
result = self._getArcJobID(executableFile, inputs, outputs, delegation)
additionalEnvMappingResponse = {}
for i in range(numberOfJobs):
if i > len(additionalEnv):
currentEnv = ""
else:
# AdditionalEnv[i] format:
# {"secret": "1_l0v3_1c3cr34m", ...}
# We merge them to have the right format: '("key" "value") (...)'
currentEnv = " ".join([f"({key} {value})" for key, value in additionalEnv[i]])
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

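Check whether the keys and values also need to be wrapped in escaped double quotes (\"), as is done for "DIRAC_PILOT_STAMP" in the XRSL template.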

result = self._getArcJobID(executableFile, inputs, outputs, delegation, currentEnv)
if not result["OK"]:
break
arcJobID, diracStamp = result["Value"]
Expand All @@ -665,6 +676,11 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1, inputs=None, outputs=
jobReference = self._arcIDToJobReference(arcJobID)
batchIDList.append(jobReference)
stampDict[jobReference] = diracStamp

# Add all env variables we added into additionalEnvMappingResponse so we have:
# { "Stamp1": { "SECRET": "I_luv_strawberries", "...": "..." }, "Stamp2": {...} }
additionalEnvMappingResponse[diracStamp] = additionalEnv[i]

self.log.debug(
"Successfully submitted job",
f"{jobReference} to CE {self.ceName}",
Expand All @@ -677,6 +693,7 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1, inputs=None, outputs=
if batchIDList:
result = S_OK(batchIDList)
result["PilotStampDict"] = stampDict
result["EnvMapping"] = additionalEnvMappingResponse
else:
result = S_ERROR("No ID obtained from the ARC job submission")
return result
Expand Down
2 changes: 1 addition & 1 deletion src/DIRAC/Resources/Computing/CloudComputingElement.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ def __init__(self, *args, **kwargs):
self.ceType = CE_NAME
self._cloudDriver = None

def submitJob(self, executableFile, proxy, numberOfJobs=1):
def submitJob(self, executableFile, proxy, numberOfJobs=1, diracXSecrets=[]):
"""Creates VM instances

:param str executableFile: Path to pilot job wrapper file to use
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ def _executeCondorCommand(self, cmd, keepTokenFile=False):
return S_OK(stdout.strip())

#############################################################################
def submitJob(self, executableFile, proxy, numberOfJobs=1):
def submitJob(self, executableFile, proxy, numberOfJobs=1, diracXSecrets=[]):
"""Method to submit job"""

self.log.verbose("Executable file path:", executableFile)
Expand Down
2 changes: 1 addition & 1 deletion src/DIRAC/Resources/Computing/LocalComputingElement.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ def _prepareHost(self):

return S_OK()

def submitJob(self, executableFile, proxy=None, numberOfJobs=1):
def submitJob(self, executableFile, proxy=None, numberOfJobs=1, diracXSecrets=[]):
if not os.access(executableFile, 5):
os.chmod(executableFile, stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def _reset(self):
return S_OK()

#############################################################################
def submitJob(self, executableFile, proxy, numberOfJobs=1):
def submitJob(self, executableFile, proxy, numberOfJobs=1, diracXSecrets=[]):
"""Method to submit job"""

# Choose eligible hosts, rank them by the number of available slots
Expand Down
2 changes: 1 addition & 1 deletion src/DIRAC/Resources/Computing/SSHComputingElement.py
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,7 @@ def __executeHostCommand(self, command, options, ssh=None, host=None):
else:
return S_ERROR("\n".join([sshStdout, sshStderr]))

def submitJob(self, executableFile, proxy, numberOfJobs=1):
def submitJob(self, executableFile, proxy, numberOfJobs=1, diracXSecrets=[]):
# self.log.verbose( "Executable file path: %s" % executableFile )
if not os.access(executableFile, 5):
os.chmod(executableFile, stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH)
Expand Down
114 changes: 89 additions & 25 deletions src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
from DIRAC.WorkloadManagementSystem.Client.PilotScopes import PILOT_SCOPES
from DIRAC.WorkloadManagementSystem.Client.ServerUtils import getPilotAgentsDB
from DIRAC.WorkloadManagementSystem.private.ConfigHelper import findGenericPilotCredentials
from DIRAC.WorkloadManagementSystem.Client.PilotManagerClient import PilotManagerClient
from DIRAC.WorkloadManagementSystem.Utilities.PilotWrapper import (
_writePilotWrapperFile,
getPilotFilesCompressedEncodedDict,
Expand Down Expand Up @@ -103,6 +104,7 @@ def initialize(self):
self.rssClient = ResourceStatus()
self.pilotAgentsDB = getPilotAgentsDB()
self.matcherClient = MatcherClient()
self.pilotManagementClient = PilotManagerClient()

return S_OK()

Expand Down Expand Up @@ -348,15 +350,15 @@ def _submitPilotsPerQueue(self, queueName: str):
if not result["OK"]:
self.log.info("Failed pilot submission", f"Queue: {queueName}")
return result
pilotList, stampDict = result["Value"]
stampDict, secretDict = result["Value"]

# updating the pilotAgentsDB... done by default but maybe not strictly necessary
result = self._addPilotReferences(queueName, pilotList, stampDict)
submittedPilots = len(stampDict)
self.log.info("Total number of pilots submitted", f"to {queueName}: {submittedPilots}")

result = self._addPilotReferences(queueName, stampDict, secretDict)
if not result["OK"]:
return result

submittedPilots = len(pilotList)
self.log.info("Total number of pilots submitted", f"to {queueName}: {submittedPilots}")
return S_OK(submittedPilots)

def _getQueueSlots(self, queue: str):
Expand Down Expand Up @@ -460,8 +462,32 @@ def _submitPilotsToQueue(self, pilotsToSubmit: int, ce: ComputingElement, queue:
jobProxy = result["Value"]
executable = self._getExecutable(queue, proxy=jobProxy, jobExecDir=jobExecDir, envVariables=envVariables)

# ----------- Generate additional env variables -----------

additionalEnv = []

# First, create secrets
resultSecrets = self.pilotManagementClient.createNSecrets(vo=self.vo, n=pilotsToSubmit)
if not resultSecrets["OK"]:
self.log.error("Couldn't fetch secrets.")
secrets = []
else:
secrets = [{"DIRACX_SECRET": secret["pilot_secret"]} for secret in resultSecrets["Value"]]

# ---
# More env?
# ---

# Could be secrets, but also other things, increase in the for loop...
for i, el in enumerate(secrets):
additionalEnv[i].update(el)

# ----------------------------------------------------------

# Submit the job
submitResult = ce.submitJob(executable, "", pilotsToSubmit)
# NOTE FOR DIRACX /!\ : We need in each CE to create a secret
submitResult = ce.submitJob(executable, "", pilotsToSubmit, additionalEnv=additionalEnv)

# In case the CE does not need the executable after the submission, we delete it
# Else, we keep it, the CE will delete it after the end of the pilot execution
if submitResult.get("ExecutableToKeep") != executable:
Expand Down Expand Up @@ -531,34 +557,66 @@ def _submitPilotsToQueue(self, pilotsToSubmit: int, ce: ComputingElement, queue:
if not result["OK"]:
self.log.error("Failure submitting Monitoring report", result["Message"])

return S_OK((pilotList, stampDict))
secrets_request_body = {}
if "EnvMapping" in submitResult:
# TODO: Update this comment as we add DiracX support
# V9+, only for:
# 1. Arex

def _addPilotReferences(self, queue: str, pilotList: list[str], stampDict: dict[str, str]):
# EnvMapping body: { "Stamp1": { "DIRACX_SECRET": "I_luv_strawberries", "...": "..." }, "Stamp2": {...} }
# Request body for secrets: {"I_luv_strawberries": ["Stamp1", "..."]}
envMapping = submitResult["SecretDict"]
secrets_request_body = {}

for stamp, envVariables in envMapping.items():
diracx_secret = envVariables.get("DIRACX_SECRET")
if diracx_secret:
if not diracx_secret in secrets_request_body:
secrets_request_body[diracx_secret] = []
secrets_request_body[diracx_secret].append(stamp)

# We reverse the ref-stamp dictionary (DiracX behaviour)
references = stampDict.keys()
stamps = stampDict.values()
stampDict = dict(zip(stamps, references))

return S_OK((stampDict, secrets_request_body))

def _addPilotReferences(self, queue: str, stampDict: dict[str, str], secretDict: dict[str, str]):
"""Add pilotReference to pilotAgentsDB

:param queue: the queue name
:param pilotList: list of pilots
:param stampDict: dictionary of pilots timestamps
:param refDict: dictionary {"pilotstamp":"pilotref"}
:param secretDict: dictionary {"pilotstamp":"secret"}
"""
result = self.pilotAgentsDB.addPilotReferences(
pilotList,
self.vo,
self.queueDict[queue]["CEType"],
stampDict,
# FIXME: Change for a client or at least request to DiracX

# First, create pilots
stamps = stampDict.keys()
result = self.pilotManagementClient.addPilotReferences(
stamps, self.vo, self.queueDict[queue]["CEType"], stampDict
)
if not result["OK"]:
self.log.error("Failed add pilots to the PilotAgentsDB", result["Message"])
return result

for pilot in pilotList:
result = self.pilotAgentsDB.setPilotStatus(
pilot,
PilotStatus.SUBMITTED,
self.queueDict[queue]["CEName"],
"Successfully submitted by the SiteDirector",
self.queueDict[queue]["Site"],
self.queueDict[queue]["QueueName"],
# We associate all of the pilots with their secrets
if secretDict:
result = self.pilotManagementClient.associatePilotWithSecret(secretDict)
if not result["OK"]:
return result

for stamp in stamps:
result = self.pilotManagementClient.set_pilot_field(
stamp,
{
"DestinationSite": self.queueDict[queue]["CEName"],
"StatusReason": "Successfully submitted by the SiteDirector",
"GridSite": self.queueDict[queue]["Site"],
"Queue": self.queueDict[queue]["QueueName"],
},
)

if not result["OK"]:
self.log.error("Failed to set pilot status", result["Message"])
return result
Expand Down Expand Up @@ -591,14 +649,13 @@ def _getExecutable(self, queue: str, proxy: X509Chain, jobExecDir: str = "", env
ce = self.queueCECache[queue]["CE"]
workingDirectory = getattr(ce, "workingDirectory", self.workingDirectory)

executable = self._writePilotScript(
return self._writePilotScript(
workingDirectory=workingDirectory,
pilotOptions=pilotOptions,
proxy=proxy,
pilotExecDir=jobExecDir,
envVariables=envVariables,
)
return executable

def _getPilotOptions(self, queue: str) -> list[str]:
"""Prepare pilot options
Expand Down Expand Up @@ -680,6 +737,13 @@ def _getPilotOptions(self, queue: str) -> list[str]:
if "PipInstallOptions" in queueDict:
pilotOptions.append(f"--pipInstallOptions={queueDict['PipInstallOptions']}")

# FIXME: Get secret
# if "secret" in queueDict:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be removed

# pilotOptions.append(f"--pilotSecret={queueDict['...']}")
# FIXME: Get clientID
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If someone has an idea.

# pilotOptions.append(f"--clientID={opsHelper.getValue('TO CHANGE')})
pilotOptions.append(f"--diracx_URL={DIRAC.gConfig.getValue('/DiracX/URL')}")

return pilotOptions

def _writePilotScript(
Expand Down
Loading
Loading