diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml index 99690b2369b..d05cf4aea6f 100644 --- a/.github/workflows/integration.yml +++ b/.github/workflows/integration.yml @@ -38,7 +38,7 @@ jobs: - TEST_NAME: "Backward Compatibility" ARGS: CLIENT_INSTALLATION_BRANCH=rel-v8r0 PILOT_INSTALLATION_BRANCH=rel-v8r0 - TEST_NAME: "Test DiracX latest" - ARGS: TEST_DIRACX=Yes + ARGS: TEST_DIRACX=Yes --diracx-dist-dir $GITHUB_WORKSPACE/diracx-dist steps: - uses: actions/checkout@v4 @@ -57,7 +57,22 @@ jobs: packaging \ pyyaml \ requests \ - typer + typer \ + build + - name: Building wheels + run: | + # Clone diracx + git clone --single-branch --branch robin-pilot-registrations https://github.com/Robin-Van-de-Merghel/diracx.git $GITHUB_WORKSPACE/diracx + + # Create dist dir + mkdir -p $GITHUB_WORKSPACE/diracx-dist + + # Building diracx + for pkg_dir in $GITHUB_WORKSPACE/diracx/diracx-* $GITHUB_WORKSPACE/diracx; do + echo "Building $pkg_dir" + python -m build --outdir "$GITHUB_WORKSPACE/diracx-dist" $pkg_dir + done + - name: Prepare environment run: ./integration_tests.py prepare-environment ${{ matrix.ARGS }} - name: Install server diff --git a/src/DIRAC/Interfaces/API/DiracAdmin.py b/src/DIRAC/Interfaces/API/DiracAdmin.py index a86158cd5ab..58171344b8f 100755 --- a/src/DIRAC/Interfaces/API/DiracAdmin.py +++ b/src/DIRAC/Interfaces/API/DiracAdmin.py @@ -380,63 +380,6 @@ def getJobPilotOutput(self, jobID, directory=""): self.log.always(f"Outputs retrieved in {outputPath}") return result - ############################################################################# - def getPilotOutput(self, gridReference, directory=""): - """Retrieve the pilot output (std.out and std.err) for an existing pilot reference. - - >>> gLogger.notice(dirac.getJobPilotOutput(12345)) - {'OK': True, 'Value': {}} - - :param str gridReference: pilot reference - :param str directory: a directory to download logs to. - :return: S_OK,S_ERROR - """ - if not isinstance(gridReference, str): - return self._errorReport("Expected string for pilot reference") - - if not directory: - directory = self.currentDir - - if not os.path.exists(directory): - return self._errorReport(f"Directory {directory} does not exist") - - result = PilotManagerClient().getPilotOutput(gridReference) - if not result["OK"]: - return result - - gridReferenceSmall = gridReference.split("/")[-1] - if not gridReferenceSmall: - gridReferenceSmall = "reference" - outputPath = f"{directory}/pilot_{gridReferenceSmall}" - - if os.path.exists(outputPath): - self.log.info(f"Remove {outputPath} and retry to continue") - return S_ERROR(f"Remove {outputPath} and retry to continue") - - if not os.path.exists(outputPath): - self.log.verbose(f"Creating directory {outputPath}") - os.mkdir(outputPath) - - outputs = result["Value"] - if "StdOut" in outputs: - stdout = f"{outputPath}/std.out" - with open(stdout, "w") as fopen: - fopen.write(outputs["StdOut"]) - self.log.info(f"Standard output written to {stdout}") - else: - self.log.warn("No standard output returned") - - if "StdErr" in outputs: - stderr = f"{outputPath}/std.err" - with open(stderr, "w") as fopen: - fopen.write(outputs["StdErr"]) - self.log.info(f"Standard error written to {stderr}") - else: - self.log.warn("No standard error returned") - - self.log.always(f"Outputs retrieved in {outputPath}") - return result - ############################################################################# def getPilotInfo(self, gridReference): """Retrieve info relative to a pilot reference diff --git a/src/DIRAC/Interfaces/scripts/dirac_admin_get_pilot_output.py b/src/DIRAC/Interfaces/scripts/dirac_admin_get_pilot_output.py index fed8f0bf111..cc0035f0843 100755 --- a/src/DIRAC/Interfaces/scripts/dirac_admin_get_pilot_output.py +++ b/src/DIRAC/Interfaces/scripts/dirac_admin_get_pilot_output.py @@ -21,22 +21,10 @@ def main(): _, args = Script.parseCommandLine(ignoreErrors=True) from DIRAC import exit as DIRACExit - from DIRAC.Interfaces.API.DiracAdmin import DiracAdmin - diracAdmin = DiracAdmin() - exitCode = 0 - errorList = [] + print("This command is not supported anymore with DIRAV V9.") - for gridID in args: - result = diracAdmin.getPilotOutput(gridID) - if not result["OK"]: - errorList.append((gridID, result["Message"])) - exitCode = 2 - - for error in errorList: - print("ERROR %s: %s" % error) - - DIRACExit(exitCode) + DIRACExit(0) if __name__ == "__main__": diff --git a/src/DIRAC/WorkloadManagementSystem/Client/PilotManagerClient.py b/src/DIRAC/WorkloadManagementSystem/Client/PilotManagerClient.py index 5cd731f2bea..b09418fc9fd 100644 --- a/src/DIRAC/WorkloadManagementSystem/Client/PilotManagerClient.py +++ b/src/DIRAC/WorkloadManagementSystem/Client/PilotManagerClient.py @@ -4,10 +4,17 @@ from DIRAC.Core.Base.Client import Client, createClient +from DIRAC.WorkloadManagementSystem.FutureClient.PilotManagerClient import ( + PilotManagerClient as futurePilotManagerClient, +) + + @createClient("WorkloadManagement/PilotManager") class PilotManagerClient(Client): """PilotManagerClient sets url for the PilotManagerHandler.""" + diracxClient = futurePilotManagerClient + def __init__(self, url=None, **kwargs): """ Sets URL for PilotManager handler diff --git a/src/DIRAC/WorkloadManagementSystem/FutureClient/PilotManagerClient.py b/src/DIRAC/WorkloadManagementSystem/FutureClient/PilotManagerClient.py new file mode 100644 index 00000000000..6aa15ebeb72 --- /dev/null +++ b/src/DIRAC/WorkloadManagementSystem/FutureClient/PilotManagerClient.py @@ -0,0 +1,173 @@ +from DIRAC.Core.Utilities.ReturnValues import convertToReturnValue +from DIRAC.Core.Security.DiracX import DiracXClient, FutureClient + + +class PilotManagerClient(FutureClient): + def get_pilot_stamps_from_refs(self, pilot_references) -> list[str]: + with DiracXClient() as api: + search = [{"parameter": "PilotJobReference", "operator": "in", "values": pilot_references}] + pilots = api.pilots.search(parameters=["PilotStamp"], search=search, sort=[]) # type: ignore + + return [pilot["PilotStamp"] for pilot in pilots] + + @convertToReturnValue + def addPilotReferences(self, pilot_references, VO, gridType="DIRAC", pilot_stamps_dict={}): + with DiracXClient() as api: + pilot_stamps = [pilot_stamps_dict.get(ref, ref) for ref in pilot_references] + pilot_ref_dict = dict(zip(pilot_stamps, pilot_references)) + + # We will move toward a stamp as identifier for the pilot + return api.pilots.add_pilot_stamps( + pilot_stamps=pilot_stamps, vo=VO, grid_type=gridType, pilot_references=pilot_ref_dict + ) + + def set_pilot_field(self, pilot_stamp, values_dict): + with DiracXClient() as api: + values_dict["PilotStamp"] = pilot_stamp + return api.pilots.update_pilot_fields(pilot_stamps_to_fields_mapping=[values_dict]) # type: ignore + + @convertToReturnValue + def setPilotStatus(self, pilot_reference, status, destination=None, reason=None, grid_site=None, queue=None): + # Translate ref to stamp (DiracX relies on stamps whereas DIRAC relies on refs) + pilot_stamps = self.get_pilot_stamps_from_refs([pilot_reference]) + pilot_stamp = pilot_stamps[0] # We might raise an error. This is so that we spot the error + + return self.set_pilot_field( + pilot_stamp, + { + "Status": status, + "DestinationSite": destination, + "StatusReason": reason, + "GridSite": grid_site, + "Queue": queue, + }, + ) + + @convertToReturnValue + def deletePilot(self, pilot_reference): + # Translate ref to stamp (DiracX relies on stamps whereas DIRAC relies on refs) + pilot_stamps = self.get_pilot_stamps_from_refs([pilot_reference]) + # We don't want to raise an error. + if not pilot_stamps: + return None + pilot_stamp = pilot_stamps[0] + + with DiracXClient() as api: + pilot_stamps = [pilot_stamp] + return api.pilots.delete_pilots(pilot_stamps=pilot_stamps) + + @convertToReturnValue + def getJobsForPilotByStamp(self, pilotStamp): + with DiracXClient() as api: + return api.pilots.get_pilot_jobs(pilot_stamp=pilotStamp) + + @convertToReturnValue + def getPilots(self, job_id): + with DiracXClient() as api: + pilot_ids = api.pilots.get_pilot_jobs(job_id=job_id) + search = [{"parameter": "PilotID", "operator": "in", "value": pilot_ids}] + return api.pilots.search(parameters=[], search=search, sort=[]) # type: ignore + + @convertToReturnValue + def getPilotInfo(self, pilot_reference): + """Important: We assume that to one stamp is mapped one pilot.""" + with DiracXClient() as api: + search = [{"parameter": "PilotJobReference", "operator": "eq", "value": pilot_reference}] + pilot = api.pilots.search(parameters=[], search=search, sort=[])[0] # type: ignore + + if not pilot: + # Return an error as in the legacy code + return [] + + # Convert all bools in pilot to str + for k, v in pilot.items(): + if isinstance(v, bool): + pilot[k] = str(v) + + # Transform the list of pilots into a dict keyed by PilotJobReference + resDict = {} + + pilotRef = pilot.get("PilotJobReference", None) + assert pilot_reference == pilotRef + pilotStamp = pilot.get("PilotStamp", None) + + if pilotRef is not None: + resDict[pilotRef] = pilot + else: + # Fallback: use PilotStamp or another key if PilotJobReference is missing + resDict[pilotStamp] = pilot + + jobIDs = self.getJobsForPilotByStamp(pilotStamp) + if jobIDs: # Only add if jobs exist + for pilotRef, pilotInfo in resDict.items(): + pilotInfo["Jobs"] = jobIDs # Attach the entire list + + return resDict + + @convertToReturnValue + def getGroupedPilotSummary(self, column_list): + with DiracXClient() as api: + return api.pilots.summary(grouping=column_list) + + @convertToReturnValue + def getPilotSummary(self, startdate="", enddate=""): + with DiracXClient() as api: + search_filters = [] + if startdate: + search_filters.append({"parameter": "SubmissionTime", "operator": "gt", "value": startdate}) + if enddate: + search_filters.append({"parameter": "SubmissionTime", "operator": "lt", "value": enddate}) + + rows = api.pilots.summary(grouping=["DestinationSite", "Status"], search=search_filters) + + # Build nested result: { site: { status: count }, Total: { status: total_count } } + summary_dict = {"Total": {}} + for row in rows: + site = row["DestinationSite"] + status = row["Status"] + count = row["count"] + + if site not in summary_dict: + summary_dict[site] = {} + + summary_dict[site][status] = count + summary_dict["Total"].setdefault(status, 0) + summary_dict["Total"][status] += count + + return summary_dict + + @convertToReturnValue + def deletePilots(self, pilot_references): + pilot_ids = [] + if pilot_references and isinstance(pilot_references, list): + if isinstance(pilot_references[0], int): + # Multiple elements (int) + pilot_ids = pilot_references # Semantic + elif isinstance(pilot_references, int): + # Only one element (int) + pilot_ids = [pilot_references] + elif isinstance(pilot_references, str): + # Only one element (str) + pilot_references = [pilot_references] + # Else: pilot_stamps should be list[str] (or the input is random) + + pilot_stamps = [] + + # Used by no one, but we won't raise `UnimplementedError` because we still use it in tests. + with DiracXClient() as api: + search = [] + if pilot_ids: + # If we have defined pilot_ids, then we have to change them to pilot_stamps + search = [{"parameter": "PilotID", "operator": "in", "values": pilot_ids}] + else: + # If we have defined pilot_ids, then we have to change them to pilot_stamps + search = [{"parameter": "PilotJobReference", "operator": "in", "values": pilot_references}] + + pilots = api.pilots.search(parameters=["PilotStamp"], search=search, sort=[]) # type: ignore + pilot_stamps = [pilot["PilotStamp"] for pilot in pilots] + + if not pilot_stamps: + # Avoid useless requests + return None + + return api.pilots.delete_pilots(pilot_stamps=pilot_stamps) # type: ignore diff --git a/src/DIRAC/WorkloadManagementSystem/Service/PilotManagerHandler.py b/src/DIRAC/WorkloadManagementSystem/Service/PilotManagerHandler.py index d02574687bf..a6dfbd15ded 100644 --- a/src/DIRAC/WorkloadManagementSystem/Service/PilotManagerHandler.py +++ b/src/DIRAC/WorkloadManagementSystem/Service/PilotManagerHandler.py @@ -4,12 +4,10 @@ import datetime import shutil -import DIRAC.Core.Utilities.TimeUtilities as TimeUtilities from DIRAC import S_ERROR, S_OK from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations from DIRAC.Core.DISET.RequestHandler import RequestHandler, getServiceOption from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader -from DIRAC.WorkloadManagementSystem.Client import PilotStatus from DIRAC.WorkloadManagementSystem.Service.WMSUtilities import ( getPilotCE, getPilotRef, @@ -38,46 +36,7 @@ def initializeHandler(cls, serviceInfoDict): return S_OK() ############################################################################## - types_getCurrentPilotCounters = [dict] - @classmethod - def export_getCurrentPilotCounters(cls, attrDict={}): - """Get pilot counters per Status with attrDict selection. Final statuses are given for - the last day. - """ - - result = cls.pilotAgentsDB.getCounters("PilotAgents", ["Status"], attrDict, timeStamp="LastUpdateTime") - if not result["OK"]: - return result - last_update = datetime.datetime.utcnow() - TimeUtilities.day - resultDay = cls.pilotAgentsDB.getCounters( - "PilotAgents", ["Status"], attrDict, newer=last_update, timeStamp="LastUpdateTime" - ) - if not resultDay["OK"]: - return resultDay - - resultDict = {} - for statusDict, count in result["Value"]: - status = statusDict["Status"] - resultDict[status] = count - if status in PilotStatus.PILOT_FINAL_STATES: - resultDict[status] = 0 - for statusDayDict, ccount in resultDay["Value"]: - if status == statusDayDict["Status"]: - resultDict[status] = ccount - break - - return S_OK(resultDict) - - ########################################################################################## - types_addPilotReferences = [list, str] - - @classmethod - def export_addPilotReferences(cls, pilotRef, VO, gridType="DIRAC", pilotStampDict={}): - """Add a new pilot job reference""" - return cls.pilotAgentsDB.addPilotReferences(pilotRef, VO, gridType, pilotStampDict) - - ############################################################################## types_getPilotOutput = [str] def export_getPilotOutput(self, pilotReference): @@ -205,30 +164,6 @@ def _getRemotePilotOutput(self, pilotReference, pilotDict): # return res, correct or not return res - ############################################################################## - types_getPilotInfo = [[list, str]] - - @classmethod - def export_getPilotInfo(cls, pilotReference): - """Get the info about a given pilot job reference""" - return cls.pilotAgentsDB.getPilotInfo(pilotReference) - - ############################################################################## - types_selectPilots = [dict] - - @classmethod - def export_selectPilots(cls, condDict): - """Select pilots given the selection conditions""" - return cls.pilotAgentsDB.selectPilots(condDict) - - ############################################################################## - types_storePilotOutput = [str, str, str] - - @classmethod - def export_storePilotOutput(cls, pilotReference, output, error): - """Store the pilot output and error""" - return cls.pilotAgentsDB.storePilotOutput(pilotReference, output, error) - ############################################################################## types_getPilotLoggingInfo = [str] @@ -278,6 +213,9 @@ def export_getPilotSummary(cls, startdate="", enddate=""): return cls.pilotAgentsDB.getPilotSummary(startdate, enddate) ############################################################################## + + # --------------- Moved to DiracX --------------- + types_getGroupedPilotSummary = [list] @classmethod @@ -291,53 +229,15 @@ def export_getGroupedPilotSummary(cls, columnList): """ return cls.pilotAgentsDB.getGroupedPilotSummary(columnList) - ############################################################################## - types_getPilots = [[str, int]] - - @classmethod - def export_getPilots(cls, jobID): - """Get pilots executing/having executed the Job""" - result = cls.pilotAgentsDB.getPilotsForJobID(int(jobID)) - if not result["OK"] or not result["Value"]: - return S_ERROR(f"Failed to get pilot for Job {int(jobID)}: {result.get('Message', '')}") - - return cls.pilotAgentsDB.getPilotInfo(pilotID=result["Value"]) - - ############################################################################## - types_setJobForPilot = [[str, int], str] - - @classmethod - def export_setJobForPilot(cls, jobID, pilotRef, destination=None): - """Report the DIRAC job ID which is executed by the given pilot job""" - - result = cls.pilotAgentsDB.setJobForPilot(int(jobID), pilotRef) - if not result["OK"]: - return result - result = cls.pilotAgentsDB.setCurrentJobID(pilotRef, int(jobID)) - if not result["OK"]: - return result - if destination: - result = cls.pilotAgentsDB.setPilotDestinationSite(pilotRef, destination) - - return result - - ########################################################################################## - types_setPilotBenchmark = [str, float] - - @classmethod - def export_setPilotBenchmark(cls, pilotRef, mark): - """Set the pilot agent benchmark""" - return cls.pilotAgentsDB.setPilotBenchmark(pilotRef, mark) - - ########################################################################################## - types_setAccountingFlag = [str] + ############################################# + types_addPilotReferences = [list, str] + # Moved to DiracX @classmethod - def export_setAccountingFlag(cls, pilotRef, mark="True"): - """Set the pilot AccountingSent flag""" - return cls.pilotAgentsDB.setAccountingFlag(pilotRef, mark) + def export_addPilotReferences(cls, pilotReferences, VO, gridType="DIRAC", pilotStampDict={}): + """Add a new pilot job reference""" + return cls.pilotAgentsDB.addPilotReferences(pilotReferences, VO, gridType, pilotStampDict) - ########################################################################################## types_setPilotStatus = [str, str] @classmethod @@ -348,23 +248,18 @@ def export_setPilotStatus(cls, pilotRef, status, destination=None, reason=None, pilotRef, status, destination=destination, statusReason=reason, gridSite=gridSite, queue=queue ) - ########################################################################################## - types_countPilots = [dict] - - @classmethod - def export_countPilots(cls, condDict, older=None, newer=None, timeStamp="SubmissionTime"): - """Set the pilot agent status""" - - return cls.pilotAgentsDB.countPilots(condDict, older, newer, timeStamp) - - ########################################################################################## + ############################################# types_deletePilots = [[list, str, int]] @classmethod def export_deletePilots(cls, pilotIDs): + # Used by no one. We keep it for tests. if isinstance(pilotIDs, str): return cls.pilotAgentsDB.deletePilot(pilotIDs) + # And list[str]???? + # pilot_id>>>S<<< + if isinstance(pilotIDs, int): pilotIDs = [ pilotIDs, @@ -376,9 +271,22 @@ def export_deletePilots(cls, pilotIDs): return S_OK() - ############################################################################## - types_clearPilots = [int, int] + ############################################# + types_getPilots = [[str, int]] @classmethod - def export_clearPilots(cls, interval=30, aborted_interval=7): - return cls.pilotAgentsDB.clearPilots(interval, aborted_interval) + def export_getPilots(cls, jobID): + """Get pilots executing/having executed the Job""" + result = cls.pilotAgentsDB.getPilotsForJobID(int(jobID)) + if not result["OK"] or not result["Value"]: + return S_ERROR(f"Failed to get pilot for Job {int(jobID)}: {result.get('Message', '')}") + + return cls.pilotAgentsDB.getPilotInfo(pilotID=result["Value"]) + + ############################################# + types_getPilotInfo = [[list, str]] + + @classmethod + def export_getPilotInfo(cls, pilotReference): + """Get the info about a given pilot job reference""" + return cls.pilotAgentsDB.getPilotInfo(pilotReference) diff --git a/src/DIRAC/WorkloadManagementSystem/Service/WMSAdministratorHandler.py b/src/DIRAC/WorkloadManagementSystem/Service/WMSAdministratorHandler.py index b1ef5ad80a9..b93abddc913 100755 --- a/src/DIRAC/WorkloadManagementSystem/Service/WMSAdministratorHandler.py +++ b/src/DIRAC/WorkloadManagementSystem/Service/WMSAdministratorHandler.py @@ -178,8 +178,6 @@ def export_getJobPilotOutput(self, jobID): pilotReference = res["Value"][cycle]["Pilot_Reference"] c = cycle - if pilotReference: - return self.pilotManager.getPilotOutput(pilotReference) return S_ERROR("No pilot job reference found") diff --git a/tests/.dirac-ci-config.yaml b/tests/.dirac-ci-config.yaml index 87fd088a218..5000938566a 100644 --- a/tests/.dirac-ci-config.yaml +++ b/tests/.dirac-ci-config.yaml @@ -4,7 +4,7 @@ config: CLIENT_UPLOAD_BASE64: SSBsaWtlIHBpenphIQo= CLIENT_UPLOAD_LFN: LFN:/vo/test_lfn.txt CLIENT_UPLOAD_FILE: test_lfn.txt - PILOT_INSTALLATION_COMMAND: dirac-pilot.py --modules /home/dirac/LocalRepo/ALTERNATIVE_MODULES/DIRAC -M 2 -N jenkins.cern.ch -Q jenkins-queue_not_important -n DIRAC.Jenkins.ch --pilotUUID=whatever12345 --CVMFS_locations=/home/dirac/ -o diracInstallOnly --wnVO=vo --debug + PILOT_INSTALLATION_COMMAND: dirac-pilot.py --modules /home/dirac/LocalRepo/ALTERNATIVE_MODULES/DIRAC -M 2 -N jenkins.cern.ch -Q jenkins-queue_not_important -n DIRAC.Jenkins.ch --pilotUUID=whatever12345 --CVMFS_locations=/home/dirac/ -o diracInstallOnly --wnVO=vo --debug --diracx_URL=http://diracx:8000 --clientID=072afab5-ed92-46e0-a61d-4ecbc96e0770 PILOT_JSON: "{ \"timestamp\": \"2023-02-13T14:34:26.725499\", \"CEs\": { @@ -37,7 +37,7 @@ config: \"https://server:9135/Configuration/Server\" ] }" - PILOT_DOWNLOAD_COMMAND: "git clone --single-branch --branch master https://github.com/DIRACGrid/Pilot.git && mv Pilot/Pilot/*.py . && rm -rf Pilot" + PILOT_DOWNLOAD_COMMAND: "git clone --single-branch --branch adding-jwt-support https://github.com/Robin-Van-de-Merghel/Pilot.git && mv Pilot/Pilot/*.py . && rm -rf Pilot" # List of feature variables which must be passed when preparing required-feature-flags: [] diff --git a/tests/CI/install_client.sh b/tests/CI/install_client.sh index 324902c4868..2c8ebddfb35 100755 --- a/tests/CI/install_client.sh +++ b/tests/CI/install_client.sh @@ -94,4 +94,14 @@ if [[ -z "${INSTALLATION_BRANCH}" ]]; then echo " JobName = \"${GITHUB_JOB}_$(date +"%Y-%m-%d_%T" | sed 's/://g')\"" >> test_dl.jdl echo "]" >> test_dl.jdl dirac-wms-job-submit test_dl.jdl "${DEBUG}" |& tee -a clientTestOutputs.txt + + #-------------------------------------------------------------------------------# + if [[ "${TEST_DIRACX:-}" = "Yes" ]]; then + echo -e "*** $(date -u) **** Creates DiracX credentials to run commands ****\n" + installDIRACX cli + setDiracXCreds prod + # Generate secrets + secret=$(dirac pilots generate-pilot-secrets vo 1 | jq -r '.[0].pilot_secret') + echo "$secret" > /ca/certs/pilot_secret.txt + fi fi diff --git a/tests/CI/run_pilot.sh b/tests/CI/run_pilot.sh index f7827b8503f..301e6aa27d3 100755 --- a/tests/CI/run_pilot.sh +++ b/tests/CI/run_pilot.sh @@ -42,5 +42,13 @@ elif command -v python2 &> /dev/null; then py='python2' fi +additional_params="" + +if [[ -n "$TEST_DIRACX" && "$TEST_DIRACX" == "Yes" ]]; then + # Read the pilot secret from file + secret=$(cat /ca/certs/pilot_secret.txt) + additional_params="--pilotSecret $secret" +fi + # shellcheck disable=SC2086 -$py ${PILOT_INSTALLATION_COMMAND} +$py ${PILOT_INSTALLATION_COMMAND} ${additional_params} diff --git a/tests/Integration/WorkloadManagementSystem/Test_PilotsClient.py b/tests/Integration/WorkloadManagementSystem/Test_PilotsClient.py index a030e9f70e4..d0c0e7ddafa 100644 --- a/tests/Integration/WorkloadManagementSystem/Test_PilotsClient.py +++ b/tests/Integration/WorkloadManagementSystem/Test_PilotsClient.py @@ -16,76 +16,55 @@ from DIRAC import gLogger from DIRAC.MonitoringSystem.Client.WebAppClient import WebAppClient from DIRAC.WorkloadManagementSystem.Client.PilotManagerClient import PilotManagerClient +import os gLogger.setLevel("VERBOSE") def test_PilotsDB(): pilots = PilotManagerClient() - webapp = WebAppClient() + is_diracx_enabled = os.getenv("TEST_DIRACX") == "Yes" + # webapp = WebAppClient() # ? # This will allow you to run the test again if necessary - for jobID in ["aPilot", "anotherPilot"]: - pilots.deletePilots(jobID) + for pilot_ref in ["aPilot", "anotherPilot"]: + res = pilots.deletePilot(pilot_ref) - res = pilots.addPilotReferences(["aPilot"], "VO") + res = pilots.addPilotReferences(["aPilot"], "vo") assert res["OK"], res["Message"] - res = pilots.getCurrentPilotCounters({}) - assert res["OK"], res["Message"] - assert "Submitted" in res["Value"] + res = pilots.addPilotReferences(["aPilot"], "vo") + + # Duplicates to see if we have a conflict + # If supports diracx, then it should be detected + # But old DIRAC doesn't. + if is_diracx_enabled: + assert not res["OK"], res + assert "Conflict" in res["Message"] + res = pilots.deletePilots("aPilot") assert res["OK"], res["Message"] - res = pilots.getCurrentPilotCounters({}) - assert res["OK"], res["Message"] - res = pilots.addPilotReferences(["anotherPilot"], "VO") + res = pilots.addPilotReferences(["anotherPilot"], "vo") assert res["OK"], res["Message"] - res = pilots.storePilotOutput("anotherPilot", "This is an output", "this is an error") - assert res["OK"], res["Message"] - res = pilots.getPilotOutput("anotherPilot") - assert res["OK"], res["Message"] - assert res["Value"] == { - "VO": "VO", - "StdErr": "this is an error", - "FileList": [], - "StdOut": "This is an output", - } + res = pilots.getPilotInfo("anotherPilot") assert res["OK"], res["Message"] assert res["Value"]["anotherPilot"]["AccountingSent"] == "False" assert res["Value"]["anotherPilot"]["PilotJobReference"] == "anotherPilot" - res = pilots.selectPilots({}) - assert res["OK"], res["Message"] res = pilots.getPilotSummary("", "") assert res["OK"], res["Message"] assert res["Value"]["Total"]["Submitted"] >= 1 - res = webapp.getPilotMonitorWeb({}, [], 0, 100) - assert res["OK"], res["Message"] - assert res["Value"]["TotalRecords"] >= 1 - res = webapp.getPilotMonitorSelectors() - assert res["OK"], res["Message"] - res = webapp.getPilotSummaryWeb({}, [], 0, 100) - assert res["OK"], res["Message"] - assert res["Value"]["TotalRecords"] >= 1 - res = pilots.setAccountingFlag("anotherPilot", "True") - assert res["OK"], res["Message"] res = pilots.setPilotStatus("anotherPilot", "Running") assert res["OK"], res["Message"] res = pilots.getPilotInfo("anotherPilot") assert res["OK"], res["Message"] - assert res["Value"]["anotherPilot"]["AccountingSent"] == "True" assert res["Value"]["anotherPilot"]["Status"] == "Running" - res = pilots.setJobForPilot(123, "anotherPilot") - assert res["OK"], res["Message"] - res = pilots.setPilotBenchmark("anotherPilot", 12.3) - assert res["OK"], res["Message"] - res = pilots.countPilots({}) - assert res["OK"], res["Message"] + if is_diracx_enabled: + res = pilots.getGroupedPilotSummary(["GridSite", "DestinationSite", "VO"]) + assert res["OK"], res["Message"] # We won't test result (hopefully tested in DiracX) res = pilots.deletePilots("anotherPilot") assert res["OK"], res["Message"] - res = pilots.getCurrentPilotCounters({}) - assert res["OK"], res["Message"] diff --git a/tests/Integration/all_integration_client_tests.sh b/tests/Integration/all_integration_client_tests.sh old mode 100644 new mode 100755 index 332d30a07ea..8992a344f87 --- a/tests/Integration/all_integration_client_tests.sh +++ b/tests/Integration/all_integration_client_tests.sh @@ -57,8 +57,9 @@ echo -e "*** $(date -u) **** WMS TESTS ****\n" pytest --no-check-dirac-environment "${THIS_DIR}/WorkloadManagementSystem/Test_SandboxStoreClient.py" |& tee -a clientTestOutputs.txt; (( ERR |= "${?}" )) if [[ -z "${INSTALLATION_BRANCH}" ]]; then - pytest --no-check-dirac-environment "${THIS_DIR}/WorkloadManagementSystem/Test_PilotsClient.py" |& tee -a clientTestOutputs.txt; (( ERR |= "${?}" )) pytest --no-check-dirac-environment "${THIS_DIR}/WorkloadManagementSystem/Test_Client_WMS.py" |& tee -a clientTestOutputs.txt; (( ERR |= "${?}" )) + dirac-proxy-init -g prod "${DEBUG}" |& tee -a clientTestOutputs.txt + pytest --no-check-dirac-environment "${THIS_DIR}/WorkloadManagementSystem/Test_PilotsClient.py" |& tee -a clientTestOutputs.txt; (( ERR |= "${?}" )) fi # Make sure we have the prod role for these tests to get the VmRpcOperator permission diff --git a/tests/Jenkins/utilities.sh b/tests/Jenkins/utilities.sh index a911f95dd12..c4382631bad 100644 --- a/tests/Jenkins/utilities.sh +++ b/tests/Jenkins/utilities.sh @@ -729,3 +729,45 @@ stopRunsv() { echo '==> [Done stopRunsv]' } + + + +#............................................................................. +# +# setDiracXCreds +# +# gets creds from x509, extract token, and put it in the right place +# +#............................................................................. +setDiracXCreds() { + # $1 = DIRAC group + local group="$1" + local cache_dir="$HOME/.cache/diracx" + local creds_file="$cache_dir/credentials.json" + local tmpfile + + if [[ -z "$group" ]]; then + echo "Usage: setDiracXCreds " >&2 + return 1 + fi + + # 1. Init DIRAC proxy + dirac-proxy-init -g "$group" + + # 2. Extract DiracX token from proxy PEM + tmpfile=$(mktemp) + python - <<'EOF' > "$tmpfile" +from DIRAC.Core.Security.DiracX import diracxTokenFromPEM +from DIRAC.Core.Security.Locations import getProxyLocation +import json +pem_location = getProxyLocation() +token = diracxTokenFromPEM(pem_location) +if token: + print(json.dumps(token)) +EOF + + # 3. Move to ~/.cache/diracx/credentials.json + mkdir -p "$cache_dir" + mv "$tmpfile" "$creds_file" + echo "DiracX credentials updated at $creds_file" +}