diff --git a/src/DIRAC/Resources/Computing/AREXComputingElement.py b/src/DIRAC/Resources/Computing/AREXComputingElement.py index a80a5a62f16..772585dad14 100755 --- a/src/DIRAC/Resources/Computing/AREXComputingElement.py +++ b/src/DIRAC/Resources/Computing/AREXComputingElement.py @@ -417,7 +417,7 @@ def _getProxyFromDelegationID(self, delegationID): ############################################################################# - def _writeXRSL(self, executableFile, inputs, outputs): + def _writeXRSL(self, executableFile, inputs, outputs, additionalEnv): """Create the JDL for submission :param str executableFile: executable to wrap in a XRSL file @@ -465,7 +465,7 @@ def _writeXRSL(self, executableFile, inputs, outputs): (inputFiles=({executable} "{executableFile}") {xrslInputAdditions}) (stdout="{diracStamp}.out") (stderr="{diracStamp}.err") -(environment=("DIRAC_PILOT_STAMP" "{diracStamp}")) +(environment=("DIRAC_PILOT_STAMP" "{diracStamp}") {additionalEnv}) (outputFiles={xrslOutputFiles}) (queue={queue}) {xrslMPAdditions} @@ -476,6 +476,7 @@ def _writeXRSL(self, executableFile, inputs, outputs): executable=os.path.basename(executableFile), xrslInputAdditions=xrslInputs, diracStamp=diracStamp, + additionalEnv=additionalEnv, queue=self.queue, xrslOutputFiles=xrslOutputs, xrslMPAdditions=xrslMPAdditions, @@ -501,7 +502,7 @@ def _bundlePreamble(self, executableFile): bundleFile.write(wrapperContent) return bundleFile.name - def _getArcJobID(self, executableFile, inputs, outputs, delegation): + def _getArcJobID(self, executableFile, inputs, outputs, delegation, additionalEnv): """Get an ARC JobID endpoint to upload executables and inputs. :param str executableFile: executable to submit @@ -516,7 +517,7 @@ def _getArcJobID(self, executableFile, inputs, outputs, delegation): query = self._urlJoin("jobs") # Get the job into the ARC way - xrslString, diracStamp = self._writeXRSL(executableFile, inputs, outputs) + xrslString, diracStamp = self._writeXRSL(executableFile, inputs, outputs, additionalEnv) xrslString += delegation self.log.debug("XRSL string submitted", f"is {xrslString}") self.log.debug("DIRAC stamp for job", f"is {diracStamp}") @@ -569,10 +570,12 @@ def _uploadJobDependencies(self, arcJobID, executableFile, inputs): self.log.verbose("Input correctly uploaded", fileToSubmit) return S_OK() - def submitJob(self, executableFile, proxy, numberOfJobs=1, inputs=None, outputs=None): + def submitJob(self, executableFile, proxy, numberOfJobs=1, inputs=None, outputs=None, additionalEnv=[]): """Method to submit job Assume that the ARC queues are always of the format nordugrid-- And none of our supported batch systems have a "-" in their name + + For V9+: Will give back also a {"stamp": "secret"} dictionnary. """ result = self._checkSession() if not result["OK"]: @@ -650,8 +653,16 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1, inputs=None, outputs= # Also : https://bugzilla.nordugrid.org/show_bug.cgi?id=4069 batchIDList = [] stampDict = {} - for _ in range(numberOfJobs): - result = self._getArcJobID(executableFile, inputs, outputs, delegation) + additionalEnvMappingResponse = {} + for i in range(numberOfJobs): + if i > len(additionalEnv): + currentEnv = "" + else: + # AdditionalEnv[i] format: + # {"secret": "1_l0v3_1c3cr34m", ...} + # We merge them to have the right format: '("key" "value") (...)' + currentEnv = " ".join([f"({key} {value})" for key, value in additionalEnv[i]]) + result = self._getArcJobID(executableFile, inputs, outputs, delegation, currentEnv) if not result["OK"]: break arcJobID, diracStamp = result["Value"] @@ -665,6 +676,11 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1, inputs=None, outputs= jobReference = self._arcIDToJobReference(arcJobID) batchIDList.append(jobReference) stampDict[jobReference] = diracStamp + + # Add all env variables we added into additionalEnvMappingResponse so we have: + # { "Stamp1": { "SECRET": "I_luv_strawberries", "...": "..." }, "Stamp2": {...} } + additionalEnvMappingResponse[diracStamp] = additionalEnv[i] + self.log.debug( "Successfully submitted job", f"{jobReference} to CE {self.ceName}", @@ -677,6 +693,7 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1, inputs=None, outputs= if batchIDList: result = S_OK(batchIDList) result["PilotStampDict"] = stampDict + result["EnvMapping"] = additionalEnvMappingResponse else: result = S_ERROR("No ID obtained from the ARC job submission") return result diff --git a/src/DIRAC/Resources/Computing/CloudComputingElement.py b/src/DIRAC/Resources/Computing/CloudComputingElement.py index 5d357a5d5c6..d6aa0854dd3 100644 --- a/src/DIRAC/Resources/Computing/CloudComputingElement.py +++ b/src/DIRAC/Resources/Computing/CloudComputingElement.py @@ -372,7 +372,7 @@ def __init__(self, *args, **kwargs): self.ceType = CE_NAME self._cloudDriver = None - def submitJob(self, executableFile, proxy, numberOfJobs=1): + def submitJob(self, executableFile, proxy, numberOfJobs=1, diracXSecrets=[]): """Creates VM instances :param str executableFile: Path to pilot job wrapper file to use diff --git a/src/DIRAC/Resources/Computing/HTCondorCEComputingElement.py b/src/DIRAC/Resources/Computing/HTCondorCEComputingElement.py index 25581a56550..281f627d816 100644 --- a/src/DIRAC/Resources/Computing/HTCondorCEComputingElement.py +++ b/src/DIRAC/Resources/Computing/HTCondorCEComputingElement.py @@ -308,7 +308,7 @@ def _executeCondorCommand(self, cmd, keepTokenFile=False): return S_OK(stdout.strip()) ############################################################################# - def submitJob(self, executableFile, proxy, numberOfJobs=1): + def submitJob(self, executableFile, proxy, numberOfJobs=1, diracXSecrets=[]): """Method to submit job""" self.log.verbose("Executable file path:", executableFile) diff --git a/src/DIRAC/Resources/Computing/LocalComputingElement.py b/src/DIRAC/Resources/Computing/LocalComputingElement.py index 5f23f3e5e0c..6d22c629597 100644 --- a/src/DIRAC/Resources/Computing/LocalComputingElement.py +++ b/src/DIRAC/Resources/Computing/LocalComputingElement.py @@ -148,7 +148,7 @@ def _prepareHost(self): return S_OK() - def submitJob(self, executableFile, proxy=None, numberOfJobs=1): + def submitJob(self, executableFile, proxy=None, numberOfJobs=1, diracXSecrets=[]): if not os.access(executableFile, 5): os.chmod(executableFile, stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH) diff --git a/src/DIRAC/Resources/Computing/SSHBatchComputingElement.py b/src/DIRAC/Resources/Computing/SSHBatchComputingElement.py index 38c70887d4e..3a94861e23c 100644 --- a/src/DIRAC/Resources/Computing/SSHBatchComputingElement.py +++ b/src/DIRAC/Resources/Computing/SSHBatchComputingElement.py @@ -50,7 +50,7 @@ def _reset(self): return S_OK() ############################################################################# - def submitJob(self, executableFile, proxy, numberOfJobs=1): + def submitJob(self, executableFile, proxy, numberOfJobs=1, diracXSecrets=[]): """Method to submit job""" # Choose eligible hosts, rank them by the number of available slots diff --git a/src/DIRAC/Resources/Computing/SSHComputingElement.py b/src/DIRAC/Resources/Computing/SSHComputingElement.py index dc353bbdcac..c4849edee42 100644 --- a/src/DIRAC/Resources/Computing/SSHComputingElement.py +++ b/src/DIRAC/Resources/Computing/SSHComputingElement.py @@ -526,7 +526,7 @@ def __executeHostCommand(self, command, options, ssh=None, host=None): else: return S_ERROR("\n".join([sshStdout, sshStderr])) - def submitJob(self, executableFile, proxy, numberOfJobs=1): + def submitJob(self, executableFile, proxy, numberOfJobs=1, diracXSecrets=[]): # self.log.verbose( "Executable file path: %s" % executableFile ) if not os.access(executableFile, 5): os.chmod(executableFile, stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH) diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py b/src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py index 5b06861bcd2..30685331a82 100644 --- a/src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py @@ -37,6 +37,7 @@ from DIRAC.WorkloadManagementSystem.Client.PilotScopes import PILOT_SCOPES from DIRAC.WorkloadManagementSystem.Client.ServerUtils import getPilotAgentsDB from DIRAC.WorkloadManagementSystem.private.ConfigHelper import findGenericPilotCredentials +from DIRAC.WorkloadManagementSystem.Client.PilotManagerClient import PilotManagerClient from DIRAC.WorkloadManagementSystem.Utilities.PilotWrapper import ( _writePilotWrapperFile, getPilotFilesCompressedEncodedDict, @@ -103,6 +104,7 @@ def initialize(self): self.rssClient = ResourceStatus() self.pilotAgentsDB = getPilotAgentsDB() self.matcherClient = MatcherClient() + self.pilotManagementClient = PilotManagerClient() return S_OK() @@ -348,15 +350,15 @@ def _submitPilotsPerQueue(self, queueName: str): if not result["OK"]: self.log.info("Failed pilot submission", f"Queue: {queueName}") return result - pilotList, stampDict = result["Value"] + stampDict, secretDict = result["Value"] - # updating the pilotAgentsDB... done by default but maybe not strictly necessary - result = self._addPilotReferences(queueName, pilotList, stampDict) + submittedPilots = len(stampDict) + self.log.info("Total number of pilots submitted", f"to {queueName}: {submittedPilots}") + + result = self._addPilotReferences(queueName, stampDict, secretDict) if not result["OK"]: return result - submittedPilots = len(pilotList) - self.log.info("Total number of pilots submitted", f"to {queueName}: {submittedPilots}") return S_OK(submittedPilots) def _getQueueSlots(self, queue: str): @@ -460,8 +462,32 @@ def _submitPilotsToQueue(self, pilotsToSubmit: int, ce: ComputingElement, queue: jobProxy = result["Value"] executable = self._getExecutable(queue, proxy=jobProxy, jobExecDir=jobExecDir, envVariables=envVariables) + # ----------- Generate additionnal env variables ----------- + + additionalEnv = [] + + # First, create secrets + resultSecrets = self.pilotManagementClient.createNSecrets(vo=self.vo, n=pilotsToSubmit) + if not resultSecrets["OK"]: + self.log.error("Couldn't fetch secrets.") + secrets = [] + else: + secrets = [{"DIRACX_SECRET": secret["pilot_secret"]} for secret in resultSecrets["Value"]] + + # --- + # More env? + # --- + + # Could be secrets, but also other things, increase in the for loop... + for i, el in enumerate(secrets): + additionalEnv[i].update(el) + + # ---------------------------------------------------------- + # Submit the job - submitResult = ce.submitJob(executable, "", pilotsToSubmit) + # NOTE FOR DIRACX /!\ : We need in each CE to create a secret + submitResult = ce.submitJob(executable, "", pilotsToSubmit, additionalEnv=additionalEnv) + # In case the CE does not need the executable after the submission, we delete it # Else, we keep it, the CE will delete it after the end of the pilot execution if submitResult.get("ExecutableToKeep") != executable: @@ -531,34 +557,66 @@ def _submitPilotsToQueue(self, pilotsToSubmit: int, ce: ComputingElement, queue: if not result["OK"]: self.log.error("Failure submitting Monitoring report", result["Message"]) - return S_OK((pilotList, stampDict)) + secrets_request_body = {} + if "EnvMapping" in submitResult: + # TODO: Update this comment as we add DiracX support + # V9+, only for: + # 1. Arex - def _addPilotReferences(self, queue: str, pilotList: list[str], stampDict: dict[str, str]): + # EnvMapping body: { "Stamp1": { "DIRACX_SECRET": "I_luv_strawberries", "...": "..." }, "Stamp2": {...} } + # Request body for secrets: {"I_luv_stawberries": ["Stamp1", "..."]} + envMapping = submitResult["SecretDict"] + secrets_request_body = {} + + for stamp, envVariables in envMapping.items(): + diracx_secret = envVariables.get("DIRACX_SECRET") + if diracx_secret: + if not diracx_secret in secrets_request_body: + secrets_request_body[diracx_secret] = [] + secrets_request_body[diracx_secret].append(stamp) + + # We reverse the ref-stamp dictionnary (DiracX behaviour) + references = stampDict.keys() + stamps = stampDict.values() + stampDict = dict(zip(stamps, references)) + + return S_OK((stampDict, secrets_request_body)) + + def _addPilotReferences(self, queue: str, stampDict: dict[str, str], secretDict: dict[str, str]): """Add pilotReference to pilotAgentsDB :param queue: the queue name :param pilotList: list of pilots - :param stampDict: dictionary of pilots timestamps + :param refDict: dictionary {"pilotstamp":"pilotref"} + :param secretDict: dictionary {"pilotstamp":"secret"} """ - result = self.pilotAgentsDB.addPilotReferences( - pilotList, - self.vo, - self.queueDict[queue]["CEType"], - stampDict, + # FIXME: Change for a client or at least request to DiracX + + # First, create pilots + stamps = stampDict.keys() + result = self.pilotManagementClient.addPilotReferences( + stamps, self.vo, self.queueDict[queue]["CEType"], stampDict ) if not result["OK"]: - self.log.error("Failed add pilots to the PilotAgentsDB", result["Message"]) return result - for pilot in pilotList: - result = self.pilotAgentsDB.setPilotStatus( - pilot, - PilotStatus.SUBMITTED, - self.queueDict[queue]["CEName"], - "Successfully submitted by the SiteDirector", - self.queueDict[queue]["Site"], - self.queueDict[queue]["QueueName"], + # We associate all of the pilots with their secrets + if secretDict: + result = self.pilotManagementClient.associatePilotWithSecret(secretDict) + if not result["OK"]: + return result + + for stamp in stamps: + result = self.pilotManagementClient.set_pilot_field( + stamp, + { + "DestinationSite": self.queueDict[queue]["CEName"], + "StatusReason": "Successfully submitted by the SiteDirector", + "GridSite": self.queueDict[queue]["Site"], + "Queue": self.queueDict[queue]["QueueName"], + }, ) + if not result["OK"]: self.log.error("Failed to set pilot status", result["Message"]) return result @@ -591,14 +649,13 @@ def _getExecutable(self, queue: str, proxy: X509Chain, jobExecDir: str = "", env ce = self.queueCECache[queue]["CE"] workingDirectory = getattr(ce, "workingDirectory", self.workingDirectory) - executable = self._writePilotScript( + return self._writePilotScript( workingDirectory=workingDirectory, pilotOptions=pilotOptions, proxy=proxy, pilotExecDir=jobExecDir, envVariables=envVariables, ) - return executable def _getPilotOptions(self, queue: str) -> list[str]: """Prepare pilot options @@ -680,6 +737,13 @@ def _getPilotOptions(self, queue: str) -> list[str]: if "PipInstallOptions" in queueDict: pilotOptions.append(f"--pipInstallOptions={queueDict['PipInstallOptions']}") + # FIXME: Get secret + # if "secret" in queueDict: + # pilotOptions.append(f"--pilotSecret={queueDict['...']}") + # FIXME: Get clientID + # pilotOptions.append(f"--clientID={opsHelper.getValue('TO CHANGE')}) + pilotOptions.append(f"--diracx_URL={DIRAC.gConfig.getValue('/DiracX/URL')}") + return pilotOptions def _writePilotScript( diff --git a/src/DIRAC/WorkloadManagementSystem/FutureClient/PilotManagerClient.py b/src/DIRAC/WorkloadManagementSystem/FutureClient/PilotManagerClient.py new file mode 100644 index 00000000000..a9eb56b097d --- /dev/null +++ b/src/DIRAC/WorkloadManagementSystem/FutureClient/PilotManagerClient.py @@ -0,0 +1,115 @@ +from DIRAC.Core.Utilities.ReturnValues import convertToReturnValue + +from DIRAC.Core.Security.DiracX import DiracXClient + + +class PilotManagerClient: + @convertToReturnValue + def addPilotReferences(self, pilot_stamps, VO, gridType="DIRAC", pilot_references={}): + with DiracXClient() as api: + # We will move toward a stamp as identifier for the pilot + return api.pilots.add_pilot_stamps( + {"pilot_stamps": pilot_stamps, "vo": VO, "grid_type": gridType, "pilot_references": pilot_references, "generate_secrets": False} # type: ignore + ) # type: ignore + + def set_pilot_field(self, pilot_stamp, values_dict): + with DiracXClient() as api: + values_dict["PilotStamp"] = pilot_stamp + return api.pilots.update_pilot_fields({"pilot_stamps_to_fields_mapping": [values_dict]}) # type: ignore + + @convertToReturnValue + def setPilotBenchmark(self, pilotStamp, mark): + return self.set_pilot_field(pilotStamp, {"BenchMark": mark}) + + @convertToReturnValue + def setAccountingFlag(self, pilotStamp, flag): + return self.set_pilot_field(pilotStamp, {"AccountingSent": flag}) + + @convertToReturnValue + def setPilotStatus(self, pilot_stamp, status, destination=None, reason=None, grid_site=None, queue=None): + return self.set_pilot_field( + pilot_stamp, + { + "Status": status, + "DestinationSite": destination, + "StatusReason": reason, + "GridSite": grid_site, + "Queue": queue, + }, + ) + + @convertToReturnValue + def clearPilots(self, interval=30, aborted_interval=7): + with DiracXClient() as api: + api.pilots.delete_pilots(age_in_days=interval, delete_only_aborted=False) + api.pilots.delete_pilots(age_in_days=aborted_interval, delete_only_aborted=True) + + @convertToReturnValue + def deletePilots(self, pilot_stamps): + with DiracXClient() as api: + pilot_ids = None + if isinstance(pilot_stamps, list[int]): # type: ignore + # Multiple elements (int) + pilot_ids = pilot_stamps # Semantic + elif isinstance(pilot_stamps, int): + # Only one element (int) + pilot_ids = [pilot_stamps] + elif isinstance(pilot_stamps, str): + # Only one element (str) + pilot_stamps = [pilot_stamps] + # Else: pilot_stamps should be list[str] (or the input is random) + + if pilot_ids: + # If we have defined pilot_ids, then we have to change them to pilot_stamps + query = [{"parameter": "PilotID", "operator": "in", "value": pilot_ids}] + + pilots = api.pilots.search(parameters=["PilotStamp"], search=query, sort=[]) + pilot_stamps = [pilot["PilotStamp"] for pilot in pilots] + + api.pilots.delete_pilots(pilot_stamps=pilot_stamps) # type: ignore + + @convertToReturnValue + def setJobForPilot(self, job_id, pilot_stamp, destination=None): + with DiracXClient() as api: + api.pilots.add_jobs_to_pilot({"pilot_stamp": pilot_stamp, "job_ids": [job_id]}) # type: ignore + + self.set_pilot_field( + pilot_stamp, + { + "DestinationSite": destination, + }, + ) + + @convertToReturnValue + def getPilots(self, job_id): + with DiracXClient() as api: + pilot_ids = api.pilots.get_pilot_jobs(job_id=job_id) + + query = [{"parameter": "PilotID", "operator": "in", "value": pilot_ids}] + + return api.pilots.search(parameters=[], search=query, sort=[]) + + @convertToReturnValue + def getPilotInfo(self, pilot_stamp): + with DiracXClient() as api: + query = [{"parameter": "PilotStamp", "operator": "eq", "value": pilot_stamp}] + + return api.pilots.search(parameters=[], search=query, sort=[]) + + @convertToReturnValue + def associatePilotWithSecret(self, secretDict): + # secretDict format: {"secret": ["stamp"]} + with DiracXClient() as api: + return api.pilots.update_secrets_constraints(secretDict) # type: ignore + + @convertToReturnValue + def createNSecrets(self, vo, n=100, expiration_minutes=120, pilot_secret_use_count_max=1): + with DiracXClient() as api: + return api.pilots.create_pilot_secrets( + { + "n": n, + "expiration_minutes": expiration_minutes, + "pilot_secret_use_count_max": pilot_secret_use_count_max, + "vo": vo, + } + ) # type: ignore diff --git a/src/DIRAC/WorkloadManagementSystem/Service/PilotManagerHandler.py b/src/DIRAC/WorkloadManagementSystem/Service/PilotManagerHandler.py index d02574687bf..e0948c3bae6 100644 --- a/src/DIRAC/WorkloadManagementSystem/Service/PilotManagerHandler.py +++ b/src/DIRAC/WorkloadManagementSystem/Service/PilotManagerHandler.py @@ -69,14 +69,6 @@ def export_getCurrentPilotCounters(cls, attrDict={}): return S_OK(resultDict) - ########################################################################################## - types_addPilotReferences = [list, str] - - @classmethod - def export_addPilotReferences(cls, pilotRef, VO, gridType="DIRAC", pilotStampDict={}): - """Add a new pilot job reference""" - return cls.pilotAgentsDB.addPilotReferences(pilotRef, VO, gridType, pilotStampDict) - ############################################################################## types_getPilotOutput = [str] @@ -205,17 +197,10 @@ def _getRemotePilotOutput(self, pilotReference, pilotDict): # return res, correct or not return res - ############################################################################## - types_getPilotInfo = [[list, str]] - - @classmethod - def export_getPilotInfo(cls, pilotReference): - """Get the info about a given pilot job reference""" - return cls.pilotAgentsDB.getPilotInfo(pilotReference) - ############################################################################## types_selectPilots = [dict] + # Won't be moved to DiracX: not used at all anywhere. @classmethod def export_selectPilots(cls, condDict): """Select pilots given the selection conditions""" @@ -291,19 +276,29 @@ def export_getGroupedPilotSummary(cls, columnList): """ return cls.pilotAgentsDB.getGroupedPilotSummary(columnList) - ############################################################################## - types_getPilots = [[str, int]] + types_countPilots = [dict] @classmethod - def export_getPilots(cls, jobID): - """Get pilots executing/having executed the Job""" - result = cls.pilotAgentsDB.getPilotsForJobID(int(jobID)) - if not result["OK"] or not result["Value"]: - return S_ERROR(f"Failed to get pilot for Job {int(jobID)}: {result.get('Message', '')}") + def export_countPilots(cls, condDict, older=None, newer=None, timeStamp="SubmissionTime"): + """Set the pilot agent status""" - return cls.pilotAgentsDB.getPilotInfo(pilotID=result["Value"]) + return cls.pilotAgentsDB.countPilots(condDict, older, newer, timeStamp) - ############################################################################## + # --------------- Moved to DiracX --------------- + + ############################################# + types_addPilotReferences = [list, str] + + # Moved to DiracX + @classmethod + def export_addPilotReferences(cls, pilotStamps, VO, gridType="DIRAC", pilotRefDict={}): + """Add a new pilot job reference""" + pilot_references = pilotRefDict.values() + pilot_stamp_dict = dict(zip(pilotStamps, pilot_references)) + + return cls.pilotAgentsDB.addPilotReferences(pilot_references, VO, gridType, pilot_stamp_dict) + + ############################################# types_setJobForPilot = [[str, int], str] @classmethod @@ -321,7 +316,7 @@ def export_setJobForPilot(cls, jobID, pilotRef, destination=None): return result - ########################################################################################## + ############################################# types_setPilotBenchmark = [str, float] @classmethod @@ -329,7 +324,7 @@ def export_setPilotBenchmark(cls, pilotRef, mark): """Set the pilot agent benchmark""" return cls.pilotAgentsDB.setPilotBenchmark(pilotRef, mark) - ########################################################################################## + ############################################# types_setAccountingFlag = [str] @classmethod @@ -337,7 +332,7 @@ def export_setAccountingFlag(cls, pilotRef, mark="True"): """Set the pilot AccountingSent flag""" return cls.pilotAgentsDB.setAccountingFlag(pilotRef, mark) - ########################################################################################## + ############################################# types_setPilotStatus = [str, str] @classmethod @@ -348,16 +343,7 @@ def export_setPilotStatus(cls, pilotRef, status, destination=None, reason=None, pilotRef, status, destination=destination, statusReason=reason, gridSite=gridSite, queue=queue ) - ########################################################################################## - types_countPilots = [dict] - - @classmethod - def export_countPilots(cls, condDict, older=None, newer=None, timeStamp="SubmissionTime"): - """Set the pilot agent status""" - - return cls.pilotAgentsDB.countPilots(condDict, older, newer, timeStamp) - - ########################################################################################## + ############################################# types_deletePilots = [[list, str, int]] @classmethod @@ -365,6 +351,9 @@ def export_deletePilots(cls, pilotIDs): if isinstance(pilotIDs, str): return cls.pilotAgentsDB.deletePilot(pilotIDs) + # And list[str]???? + # pilot_id>>>S<<< + if isinstance(pilotIDs, int): pilotIDs = [ pilotIDs, @@ -376,9 +365,29 @@ def export_deletePilots(cls, pilotIDs): return S_OK() - ############################################################################## + ############################################# types_clearPilots = [int, int] @classmethod def export_clearPilots(cls, interval=30, aborted_interval=7): return cls.pilotAgentsDB.clearPilots(interval, aborted_interval) + + ############################################# + types_getPilots = [[str, int]] + + @classmethod + def export_getPilots(cls, jobID): + """Get pilots executing/having executed the Job""" + result = cls.pilotAgentsDB.getPilotsForJobID(int(jobID)) + if not result["OK"] or not result["Value"]: + return S_ERROR(f"Failed to get pilot for Job {int(jobID)}: {result.get('Message', '')}") + + return cls.pilotAgentsDB.getPilotInfo(pilotID=result["Value"]) + + ############################################# + types_getPilotInfo = [[list, str]] + + @classmethod + def export_getPilotInfo(cls, pilotReference): + """Get the info about a given pilot job reference""" + return cls.pilotAgentsDB.getPilotInfo(pilotReference) diff --git a/src/DIRAC/WorkloadManagementSystem/Utilities/PilotWrapper.py b/src/DIRAC/WorkloadManagementSystem/Utilities/PilotWrapper.py index c39bbe26341..78a9bca9235 100644 --- a/src/DIRAC/WorkloadManagementSystem/Utilities/PilotWrapper.py +++ b/src/DIRAC/WorkloadManagementSystem/Utilities/PilotWrapper.py @@ -182,6 +182,8 @@ def pilotWrapperScript( if envVariables is None: envVariables = {} + elif "DIRACX_SECRET" in envVariables: + pilotOptions += f" --pilotSecret={envVariables['DIRACX_SECRET']}" if not CVMFS_locations: # What is in this location is almost certainly incorrect, especially the pilot.json diff --git a/src/DIRAC/WorkloadManagementSystem/scripts/dirac_admin_add_pilot.py b/src/DIRAC/WorkloadManagementSystem/scripts/dirac_admin_add_pilot.py index 39a5773024a..461744b6b34 100644 --- a/src/DIRAC/WorkloadManagementSystem/scripts/dirac_admin_add_pilot.py +++ b/src/DIRAC/WorkloadManagementSystem/scripts/dirac_admin_add_pilot.py @@ -51,6 +51,8 @@ def main(): """ params = Params() + # TODO: Add also site here. + # Later deprecated in V9. Script.registerSwitches(params.switches) Script.registerArgument("pilotRef: pilot reference") Script.registerArgument("VO: VO, or pilot owner group") @@ -74,7 +76,7 @@ def main(): if not DErrno.cmpError(res, DErrno.EWMSNOPILOT): gLogger.error(res["Message"]) DIRACExit(1) - res = pmc.addPilotReferences([pilotRef], VO, gridType, {pilotRef: pilotStamp}) + res = pmc.addPilotReferences([pilotStamp], VO, gridType, {pilotStamp: pilotRef}) if not res["OK"]: gLogger.error(res["Message"]) DIRACExit(1) diff --git a/tests/Integration/WorkloadManagementSystem/Test_PilotsClient.py b/tests/Integration/WorkloadManagementSystem/Test_PilotsClient.py index a030e9f70e4..c295adb3172 100644 --- a/tests/Integration/WorkloadManagementSystem/Test_PilotsClient.py +++ b/tests/Integration/WorkloadManagementSystem/Test_PilotsClient.py @@ -25,8 +25,8 @@ def test_PilotsDB(): webapp = WebAppClient() # This will allow you to run the test again if necessary - for jobID in ["aPilot", "anotherPilot"]: - pilots.deletePilots(jobID) + for pilot_stamp in ["aPilot", "anotherPilot"]: + pilots.deletePilot(pilot_stamp) res = pilots.addPilotReferences(["aPilot"], "VO") assert res["OK"], res["Message"]