diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml index 7432aa8ccc1..05ba05a16fa 100644 --- a/.github/workflows/integration.yml +++ b/.github/workflows/integration.yml @@ -37,6 +37,8 @@ jobs: ARGS: DIRAC_USE_JSON_ENCODE=NO MYSQL_VER=mysql:8.0.40 - TEST_NAME: "Backward Compatibility" ARGS: CLIENT_INSTALLATION_BRANCH=rel-v8r0 PILOT_INSTALLATION_BRANCH=rel-v8r0 + - TEST_NAME: "Test DiracX latest" + ARGS: TEST_DIRACX=Yes --diracx-dist-dir $GITHUB_WORKSPACE/diracx-dist steps: - uses: actions/checkout@v4 @@ -55,7 +57,22 @@ jobs: packaging \ pyyaml \ requests \ - typer + typer \ + build + - name: Building wheels + run: | + # Clone diracx + git clone --single-branch --branch robin-pilot-legacy-logging https://github.com/Robin-Van-de-Merghel/diracx.git $GITHUB_WORKSPACE/diracx + + # Create dist dir + mkdir -p $GITHUB_WORKSPACE/diracx-dist + + # Building diracx + for pkg_dir in $GITHUB_WORKSPACE/diracx/diracx-* $GITHUB_WORKSPACE/diracx; do + echo "Building $pkg_dir" + python -m build --outdir "$GITHUB_WORKSPACE/diracx-dist" $pkg_dir + done + - name: Prepare environment run: ./integration_tests.py prepare-environment ${{ matrix.ARGS }} - name: Install server @@ -79,3 +96,8 @@ jobs: if [ -f client-tests-failed ]; then has_error=1; echo "Client tests failed"; fi if [ -f pilot-tests-failed ]; then has_error=1; echo "pilot tests failed"; fi if [ ${has_error} = 1 ]; then exit 1; fi + - name: DiracX filtered logs + if: ${{ always() && contains(matrix.ARGS, 'TEST_DIRACX=Yes') }} + run: | + # Used to debug, and see which requests were done. + docker logs diracx | grep -Fv 'GET /.well-known/openid-configuration HTTP/1.1" 200 OK' diff --git a/docs/source/DeveloperGuide/CodeTesting/index.rst b/docs/source/DeveloperGuide/CodeTesting/index.rst index dab9d018f6c..1f60b1d102e 100644 --- a/docs/source/DeveloperGuide/CodeTesting/index.rst +++ b/docs/source/DeveloperGuide/CodeTesting/index.rst @@ -309,10 +309,8 @@ Running the above might take a while. Supposing you are interested in running on ./integration_tests.py prepare-environment [FLAGS] ./integration_tests.py install-server -which (in some minutes) will give you a fully dockerized server setup -(`docker container ls` will list the created container, and you can see what's going on inside with the standard `docker exec -it server /bin/bash`. -Now, suppose that you want to run `WorkloadManagementSystem/Test_JobDB.py`, -the first thing to do is that you should first login in the docker container, by doing: +which (in some minutes) will give you a fully dockerized server setup. `docker container ls` will list the created container, and you can see what's going on inside with the standard `docker exec -it server /bin/bash`. +Now, suppose that you want to run `WorkloadManagementSystem/Test_JobDB.py`, the first thing to do is that you should first login in the docker container, by doing: .. code-block:: bash @@ -326,7 +324,7 @@ Now you can run the test with: pytest --no-check-dirac-environment LocalRepo/ALTERNATIVE_MODULES/DIRAC/tests/Integration/WorkloadManagementSystem/Test_JobDB.py -You can find the logs of the services in `/home/dirac/ServerInstallDIR/diracos/runit/` +You can find the logs of the services in `/home/dirac/ServerInstallDIR/diracos/runit/`. You can also login in client and mysql with: @@ -335,7 +333,45 @@ You can also login in client and mysql with: ./integration_tests.py exec-client ./integration_tests.py exec-mysql +To restart a service, you can go into the server docker, and run `runsvctrl`: +.. code-block:: bash + + ./integration_tests.py exec-server + ... + runsvctrl t /home/dirac/ServerInstallDIR/diracos/runit/WorkloadManagement/JobMonitor/ + +And you can also restart all services (it can take some time): + +.. code-block:: bash + + ./integration_tests.py exec-server + ... + runsvctrl t /home/dirac/ServerInstallDIR/diracos/runit/*/* + + +You can also test DiracX in integration tests. To do that, you have to provide in the `prepare-environment` command the following flag: `TEST_DIRACX=Yes`. It will run DiracX alongside DIRAC, and use the available and activated legacy adapted services. + +To deactivate a service from being used with DiracX, you can add it in `integration_tests.py` in the `DIRACX_DISABLED_SERVICES` list: + +.. code-block:: python + + DIRACX_DISABLED_SERVICES = [ + "WorkloadManagement/JobMonitoring", + ] + +By setting `TEST_DIRACX=Yes` only, it will take the last version of DiracX by default. If you want to provide your own, you have to build your DiracX project, and provide the `dist` folder path when calling `prepare-client`. This path has to be absolute. + +.. code-block:: bash + + ./integration-tests.py prepare-client TEST_DIRACX=Yes --diracx-dist-dir my-dist-folder/ + +It will then mount your dist folder into DIRAC and DiracX (in `/diracx_sources`) to install the right dependencies. + +For MacOS, there are two bugs that can be fixed. + +- The first one is about `docker compose` not being recognized. To fix that, you can set in your environment variables `DOCKER_COMPOSE_CMD="docker-compose"`. +- The second one, is for macs with M1 (or more recent ones) chips which are not supported by the `opensearch` docker image. By setting the `ES_PLATFORM` flag to `linux/arm64` you will be able to start `opensearch` without issue. Validation and System tests --------------------------- diff --git a/integration_tests.py b/integration_tests.py index bdca32baaf0..619bf6b705c 100755 --- a/integration_tests.py +++ b/integration_tests.py @@ -25,6 +25,9 @@ DEFAULT_HOST_OS = "el9" DEFAULT_MYSQL_VER = "mysql:8.4.4" DEFAULT_ES_VER = "opensearchproject/opensearch:2.18.0" +# In MacOSX with Arm (MX), there's an issue with opensearch +# You *must* set the ES_PLATFORM flag to `linux/arm64` to make it work. +DEFAULT_ES_PLATFORM = "linux/amd64" DEFAULT_IAM_VER = "indigoiam/iam-login-service:v1.10.2" FEATURE_VARIABLES = { "DIRACOSVER": "master", @@ -36,8 +39,12 @@ "INSTALLATION_BRANCH": "", "DEBUG": "Yes", } -DIRACX_OPTIONS = () DEFAULT_MODULES = {"DIRAC": Path(__file__).parent.absolute()} +# All services that have a FutureClient, but we *explicitly* deactivate +# (for example if we did not finish to develop it) +DIRACX_DISABLED_SERVICES = [ + "WorkloadManagement/JobMonitoring", +] # Static configuration DB_USER = "Dirac" @@ -71,6 +78,10 @@ } LOG_PATTERN = re.compile(r"^[\d\-]{10} [\d:]{8} UTC [^\s]+ ([A-Z]+):") +# In niche cases where we use MacOSX with Orbstack, some commands may not work with docker compose +# If you're in that case, set in your environment `export DOCKER_COMPOSE_CMD="docker-compose"` +DOCKER_COMPOSE_CMD = shlex.split(os.environ.get("DOCKER_COMPOSE_CMD", "docker compose")) + class NaturalOrderGroup(typer.core.TyperGroup): """Group for showing subcommands in the correct order""" @@ -140,6 +151,19 @@ def list_commands(self, ctx): After restarting your terminal you command completion is available using: typer ./integration_tests.py run ... + +## DiracX + +If you want to activate DiracX, you have to set the flag TEST_DIRACX to "Yes". +It will search for legacy adapted services (services with a future client activated) +and do the necessary to make DIRAC work alongside DiracX. + +To deactivate a legacy adapted service (to pass CI for example), you have to add it in +the `DIRACX_DISABLED_SERVICES` list. If you don't, the program will set this service to be used +with DiracX, and if it is badly adapted, errors will be raised. + +> Note that you can provide a DiracX project (repository, branch) by building it and providing +the dist folder to the prepare-environment command. """, ) @@ -193,8 +217,8 @@ def destroy(): typer.secho("Shutting down and removing containers", err=True, fg=c.GREEN) with _gen_docker_compose(DEFAULT_MODULES) as docker_compose_fn: os.execvpe( - "docker", - ["docker", "compose", "-f", docker_compose_fn, "down", "--remove-orphans", "-t", "0", "--volumes"], + DOCKER_COMPOSE_CMD[0], + [*DOCKER_COMPOSE_CMD, "-f", docker_compose_fn, "down", "--remove-orphans", "-t", "0", "--volumes"], _make_env({}), ) @@ -253,7 +277,7 @@ def prepare_environment( typer.secho("Running docker compose to create containers", fg=c.GREEN) with _gen_docker_compose(modules, diracx_dist_dir=diracx_dist_dir) as docker_compose_fn: subprocess.run( - ["docker", "compose", "-f", docker_compose_fn, "up", "-d", "dirac-server", "dirac-client", "dirac-pilot"] + [*DOCKER_COMPOSE_CMD, "-f", docker_compose_fn, "up", "-d", "dirac-server", "dirac-client", "dirac-pilot"] + extra_services, check=True, env=docker_compose_env, @@ -360,7 +384,7 @@ def prepare_environment( subStderr = open(docker_compose_fn_final / "stderr", "w") subprocess.Popen( - ["docker", "compose", "-f", docker_compose_fn_final / "docker-compose.yml", "up", "-d", "diracx"], + [*DOCKER_COMPOSE_CMD, "-f", docker_compose_fn_final / "docker-compose.yml", "up", "-d", "diracx"], env=docker_compose_env, stdin=None, stdout=subStdout, @@ -569,7 +593,7 @@ def _gen_docker_compose(modules, *, diracx_dist_dir=None): # Load the docker compose configuration and mount the necessary volumes input_fn = Path(__file__).parent / "tests/CI/docker-compose.yml" docker_compose = yaml.safe_load(input_fn.read_text()) - # diracx-wait-for-db needs the volume to be able to run the witing script + # diracx-wait-for-db needs the volume to be able to run the waiting script for ctn in ("dirac-server", "dirac-client", "dirac-pilot", "diracx-wait-for-db"): if "volumes" not in docker_compose["services"][ctn]: docker_compose["services"][ctn]["volumes"] = [] @@ -619,7 +643,7 @@ def _gen_docker_compose(modules, *, diracx_dist_dir=None): def _check_containers_running(*, is_up=True): with _gen_docker_compose(DEFAULT_MODULES) as docker_compose_fn: running_containers = subprocess.run( - ["docker", "compose", "-f", docker_compose_fn, "ps", "-q", "-a"], + [*DOCKER_COMPOSE_CMD, "-f", docker_compose_fn, "ps", "-q", "-a"], stdout=subprocess.PIPE, env=_make_env({}), # docker compose ps has a non-zero exit code when no containers are running @@ -701,6 +725,7 @@ def _make_env(flags): else: env["MYSQL_ADMIN_COMMAND"] = "mysqladmin" env["ES_VER"] = flags.pop("ES_VER", DEFAULT_ES_VER) + env["ES_PLATFORM"] = flags.pop("ES_PLATFORM", DEFAULT_ES_PLATFORM) env["IAM_VER"] = flags.pop("IAM_VER", DEFAULT_IAM_VER) if "CVMFS_DIR" not in env or not Path(env["CVMFS_DIR"]).is_dir(): typer.secho(f"CVMFS_DIR environment value: {env.get('CVMFS_DIR', 'NOT SET')}", fg=c.YELLOW) @@ -1163,10 +1188,16 @@ def _make_config(modules, flags, release_var, editable): typer.secho(f"Required feature variable {key!r} is missing", err=True, fg=c.RED) raise typer.Exit(code=1) - # If we test DiracX, enable all the options + # If we test DiracX, add specific config if config["TEST_DIRACX"].lower() in ("yes", "true"): - for key in DIRACX_OPTIONS: - config[key] = "Yes" + if DIRACX_DISABLED_SERVICES: + # We link all disabled services + # config["DIRACX_DISABLED_SERVICES"] = "Service1 Service2 Service3 ..." + diracx_disabled_services = " ".join(DIRACX_DISABLED_SERVICES) + + typer.secho(f"The following services won't be legacy adapted: {diracx_disabled_services}", fg="yellow") + + config["DIRACX_DISABLED_SERVICES"] = diracx_disabled_services config["TESTREPO"] = [f"/home/dirac/LocalRepo/TestCode/{name}" for name in modules] config["ALTERNATIVE_MODULES"] = [f"/home/dirac/LocalRepo/ALTERNATIVE_MODULES/{name}" for name in modules] diff --git a/src/DIRAC/ConfigurationSystem/Client/Helpers/Registry.py b/src/DIRAC/ConfigurationSystem/Client/Helpers/Registry.py index ddc17fb306e..69c69dfbe35 100644 --- a/src/DIRAC/ConfigurationSystem/Client/Helpers/Registry.py +++ b/src/DIRAC/ConfigurationSystem/Client/Helpers/Registry.py @@ -457,7 +457,7 @@ def getVOForGroup(group): :return: str """ - return getVO() or gConfig.getValue(f"{gBaseRegistrySection}/Groups/{group}/VO", "") + return gConfig.getValue(f"{gBaseRegistrySection}/Groups/{group}/VO", "") or getVO() def getIdPForGroup(group): diff --git a/src/DIRAC/Core/Security/DiracX.py b/src/DIRAC/Core/Security/DiracX.py index 5576d9704df..8e866dd28d7 100644 --- a/src/DIRAC/Core/Security/DiracX.py +++ b/src/DIRAC/Core/Security/DiracX.py @@ -47,7 +47,7 @@ def addTokenToPEM(pemPath, group): from DIRAC.Core.Base.Client import Client - vo = Registry.getVOMSVOForGroup(group) + vo = Registry.getVOForGroup(group) if not vo: gLogger.error(f"ERROR: Could not find VO for group {group}, DiracX will not work!") disabledVOs = gConfig.getValue("/DiracX/DisabledVOs", []) diff --git a/src/DIRAC/Core/Tornado/Client/ClientSelector.py b/src/DIRAC/Core/Tornado/Client/ClientSelector.py index 741a6dc20bf..57fa05f3f3c 100644 --- a/src/DIRAC/Core/Tornado/Client/ClientSelector.py +++ b/src/DIRAC/Core/Tornado/Client/ClientSelector.py @@ -17,7 +17,6 @@ from DIRAC.Core.DISET.TransferClient import TransferClient from DIRAC.Core.Tornado.Client.TornadoClient import TornadoClient - sLog = gLogger.getSubLogger(__name__) @@ -82,6 +81,10 @@ def ClientSelector(disetClient, *args, **kwargs): # We use same interface as RP rpc = tornadoClient(*args, **kwargs) else: rpc = disetClient(*args, **kwargs) + except NotImplementedError as e: + # We catch explicitly NotImplementedError to avoid just printing "there's an error" + # If we mis-configured the CS for legacy adapted services, we MUST have an error. + raise e except Exception as e: # pylint: disable=broad-except # If anything went wrong in the resolution, we return default RPCClient # So the behaviour is exactly the same as before implementation of Tornado diff --git a/src/DIRAC/Core/Utilities/Extensions.py b/src/DIRAC/Core/Utilities/Extensions.py index 9a7d55331e9..774fe37da00 100644 --- a/src/DIRAC/Core/Utilities/Extensions.py +++ b/src/DIRAC/Core/Utilities/Extensions.py @@ -73,6 +73,15 @@ def findServices(modules): return findModules(modules, "Service", "*Handler") +def findFutureServices(modules): + """Find the legacy adapted services for one or more DIRAC extension(s) + + :param list/str/module module: One or more Python modules or Python module names + :returns: list of tuples of the form (SystemName, ServiceName) + """ + return findModules(modules, "FutureClient") + + @iterateThenSort def findDatabases(module): """Find the DB SQL schema defintions for one or more DIRAC extension(s) @@ -182,7 +191,7 @@ def parseArgs(): parser = argparse.ArgumentParser() subparsers = parser.add_subparsers(required=True, dest="function") defaultExtensions = extensionsByPriority() - for func in [findSystems, findAgents, findExecutors, findServices, findDatabases]: + for func in [findSystems, findAgents, findExecutors, findServices, findDatabases, findFutureServices]: subparser = subparsers.add_parser(func.__name__) subparser.add_argument("--extensions", nargs="+", default=defaultExtensions) subparser.set_defaults(func=func) diff --git a/src/DIRAC/WorkloadManagementSystem/Client/JobMonitoringClient.py b/src/DIRAC/WorkloadManagementSystem/Client/JobMonitoringClient.py index 33fc1be1003..ed8b304be3b 100755 --- a/src/DIRAC/WorkloadManagementSystem/Client/JobMonitoringClient.py +++ b/src/DIRAC/WorkloadManagementSystem/Client/JobMonitoringClient.py @@ -14,12 +14,14 @@ @createClient("WorkloadManagement/JobMonitoring") class JobMonitoringClient(Client): + # Set to None to raise an error if this service is set as "legacy adapted" + # See ClientSelector + diracxClient = None + def __init__(self, **kwargs): super().__init__(**kwargs) self.setServer("WorkloadManagement/JobMonitoring") - diracxClient = futureJobMonitoringClient - @ignoreEncodeWarning def getJobsStatus(self, jobIDs): res = self._getRPC().getJobsStatus(jobIDs) diff --git a/src/DIRAC/WorkloadManagementSystem/Client/PilotManagerClient.py b/src/DIRAC/WorkloadManagementSystem/Client/PilotManagerClient.py index 5cd731f2bea..b09418fc9fd 100644 --- a/src/DIRAC/WorkloadManagementSystem/Client/PilotManagerClient.py +++ b/src/DIRAC/WorkloadManagementSystem/Client/PilotManagerClient.py @@ -4,10 +4,17 @@ from DIRAC.Core.Base.Client import Client, createClient +from DIRAC.WorkloadManagementSystem.FutureClient.PilotManagerClient import ( + PilotManagerClient as futurePilotManagerClient, +) + + @createClient("WorkloadManagement/PilotManager") class PilotManagerClient(Client): """PilotManagerClient sets url for the PilotManagerHandler.""" + diracxClient = futurePilotManagerClient + def __init__(self, url=None, **kwargs): """ Sets URL for PilotManager handler diff --git a/src/DIRAC/WorkloadManagementSystem/FutureClient/JobStateUpdateClient.py b/src/DIRAC/WorkloadManagementSystem/FutureClient/JobStateUpdateClient.py index 52e3c5cdc62..7161b5274bb 100644 --- a/src/DIRAC/WorkloadManagementSystem/FutureClient/JobStateUpdateClient.py +++ b/src/DIRAC/WorkloadManagementSystem/FutureClient/JobStateUpdateClient.py @@ -77,11 +77,11 @@ def setJobApplicationStatus(self, jobID: str | int, appStatus: str, source: str def setJobAttribute(self, jobID: str | int, attribute: str, value: str): with DiracXClient() as api: if attribute == "Status": - api.jobs.set_job_statuses( + return api.jobs.set_job_statuses( {jobID: {datetime.now(tz=timezone.utc): {"Status": value}}}, ) else: - api.jobs.patch_metadata({jobID: {attribute: value}}) + return api.jobs.patch_metadata({jobID: {attribute: value}}) @stripValueIfOK @convertToReturnValue diff --git a/src/DIRAC/WorkloadManagementSystem/FutureClient/PilotManagerClient.py b/src/DIRAC/WorkloadManagementSystem/FutureClient/PilotManagerClient.py new file mode 100644 index 00000000000..fb7f85fa4e5 --- /dev/null +++ b/src/DIRAC/WorkloadManagementSystem/FutureClient/PilotManagerClient.py @@ -0,0 +1,145 @@ +from DIRAC.Core.Utilities.ReturnValues import convertToReturnValue +from DIRAC.Core.Security.DiracX import DiracXClient, FutureClient + + +class PilotManagerClient(FutureClient): + @convertToReturnValue + def addPilotReferences(self, pilot_references, VO, gridType="DIRAC", pilot_stamps_dict={}): + with DiracXClient() as api: + pilot_stamps = pilot_stamps_dict.values() + pilot_ref_dict = dict(zip(pilot_stamps, pilot_references)) + + # We will move toward a stamp as identifier for the pilot + return api.pilots.add_pilot_stamps( + {"pilot_stamps": pilot_stamps, "vo": VO, "grid_type": gridType, "pilot_references": pilot_ref_dict} # type: ignore + ) + + def set_pilot_field(self, pilot_stamp, values_dict): + with DiracXClient() as api: + values_dict["PilotStamp"] = pilot_stamp + return api.pilots.update_pilot_fields({"pilot_stamps_to_fields_mapping": [values_dict]}) # type: ignore + + @convertToReturnValue + def setPilotStatus(self, pilot_stamp, status, destination=None, reason=None, grid_site=None, queue=None): + return self.set_pilot_field( + pilot_stamp, + { + "Status": status, + "DestinationSite": destination, + "StatusReason": reason, + "GridSite": grid_site, + "Queue": queue, + }, + ) + + @convertToReturnValue + def deletePilot(self, pilot_stamp): + with DiracXClient() as api: + pilot_stamps = [pilot_stamp] + return api.pilots.delete_pilots(pilot_stamps=pilot_stamps) + + @convertToReturnValue + def getJobsForPilotByStamp(self, pilotStamp): + with DiracXClient() as api: + return api.pilots.get_pilot_jobs(pilot_stamp=pilotStamp) + + @convertToReturnValue + def getPilots(self, job_id): + with DiracXClient() as api: + pilot_ids = api.pilots.get_pilot_jobs(job_id=job_id) + search = [{"parameter": "PilotID", "operator": "in", "value": pilot_ids}] + return api.pilots.search(parameters=[], search=search, sort=[]) # type: ignore + + @convertToReturnValue + def getPilotInfo(self, pilot_reference): + """Important: We assume that to one stamp is mapped one pilot.""" + with DiracXClient() as api: + search = [{"parameter": "PilotJobReference", "operator": "eq", "value": pilot_reference}] + pilot = api.pilots.search(parameters=[], search=search, sort=[])[0] # type: ignore + + if not pilot: + # Return an error as in the legacy code + return [] + + # Convert all bools in pilot to str + for k, v in pilot.items(): + if isinstance(v, bool): + pilot[k] = str(v) + + # Transform the list of pilots into a dict keyed by PilotJobReference + resDict = {} + + pilotRef = pilot.get("PilotJobReference", None) + assert pilot_reference == pilotRef + pilotStamp = pilot.get("PilotStamp", None) + + if pilotRef is not None: + resDict[pilotRef] = pilot + else: + # Fallback: use PilotStamp or another key if PilotJobReference is missing + resDict[pilotStamp] = pilot + + jobIDs = self.getJobsForPilotByStamp(pilotStamp) + if jobIDs: # Only add if jobs exist + for pilotRef, pilotInfo in resDict.items(): + pilotInfo["Jobs"] = jobIDs # Attach the entire list + + return resDict + + @convertToReturnValue + def getGroupedPilotSummary(self, column_list): + with DiracXClient() as api: + return api.pilots.summary(grouping=column_list) + + @convertToReturnValue + def getPilotSummary(self, startdate="", enddate=""): + with DiracXClient() as api: + search_filters = [] + if startdate: + search_filters.append({"parameter": "SubmissionTime", "operator": "gt", "value": startdate}) + if enddate: + search_filters.append({"parameter": "SubmissionTime", "operator": "lt", "value": enddate}) + + rows = api.pilots.summary(grouping=["DestinationSite", "Status"], search=search_filters) + + # Build nested result: { site: { status: count }, Total: { status: total_count } } + summary_dict = {"Total": {}} + for row in rows: + site = row["DestinationSite"] + status = row["Status"] + count = row["count"] + + if site not in summary_dict: + summary_dict[site] = {} + + summary_dict[site][status] = count + summary_dict["Total"].setdefault(status, 0) + summary_dict["Total"][status] += count + + return summary_dict + + @convertToReturnValue + def deletePilots(self, pilot_stamps): + # Used by no one, but we won't raise `UnimplementedError` because we still use it in tests. + with DiracXClient() as api: + pilot_ids = None + if pilot_stamps and isinstance(pilot_stamps, list): + if isinstance(pilot_stamps[0], int): + # Multiple elements (int) + pilot_ids = pilot_stamps # Semantic + elif isinstance(pilot_stamps, int): + # Only one element (int) + pilot_ids = [pilot_stamps] + elif isinstance(pilot_stamps, str): + # Only one element (str) + pilot_stamps = [pilot_stamps] + # Else: pilot_stamps should be list[str] (or the input is random) + + if pilot_ids: + # If we have defined pilot_ids, then we have to change them to pilot_stamps + search = [{"parameter": "PilotID", "operator": "in", "value": pilot_ids}] + + pilots = api.pilots.search(parameters=["PilotStamp"], search=search, sort=[]) # type: ignore + pilot_stamps = [pilot["PilotStamp"] for pilot in pilots] + + return api.pilots.delete_pilots(pilot_stamps=pilot_stamps) # type: ignore diff --git a/src/DIRAC/WorkloadManagementSystem/Service/PilotManagerHandler.py b/src/DIRAC/WorkloadManagementSystem/Service/PilotManagerHandler.py index d02574687bf..a6dfbd15ded 100644 --- a/src/DIRAC/WorkloadManagementSystem/Service/PilotManagerHandler.py +++ b/src/DIRAC/WorkloadManagementSystem/Service/PilotManagerHandler.py @@ -4,12 +4,10 @@ import datetime import shutil -import DIRAC.Core.Utilities.TimeUtilities as TimeUtilities from DIRAC import S_ERROR, S_OK from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations from DIRAC.Core.DISET.RequestHandler import RequestHandler, getServiceOption from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader -from DIRAC.WorkloadManagementSystem.Client import PilotStatus from DIRAC.WorkloadManagementSystem.Service.WMSUtilities import ( getPilotCE, getPilotRef, @@ -38,46 +36,7 @@ def initializeHandler(cls, serviceInfoDict): return S_OK() ############################################################################## - types_getCurrentPilotCounters = [dict] - @classmethod - def export_getCurrentPilotCounters(cls, attrDict={}): - """Get pilot counters per Status with attrDict selection. Final statuses are given for - the last day. - """ - - result = cls.pilotAgentsDB.getCounters("PilotAgents", ["Status"], attrDict, timeStamp="LastUpdateTime") - if not result["OK"]: - return result - last_update = datetime.datetime.utcnow() - TimeUtilities.day - resultDay = cls.pilotAgentsDB.getCounters( - "PilotAgents", ["Status"], attrDict, newer=last_update, timeStamp="LastUpdateTime" - ) - if not resultDay["OK"]: - return resultDay - - resultDict = {} - for statusDict, count in result["Value"]: - status = statusDict["Status"] - resultDict[status] = count - if status in PilotStatus.PILOT_FINAL_STATES: - resultDict[status] = 0 - for statusDayDict, ccount in resultDay["Value"]: - if status == statusDayDict["Status"]: - resultDict[status] = ccount - break - - return S_OK(resultDict) - - ########################################################################################## - types_addPilotReferences = [list, str] - - @classmethod - def export_addPilotReferences(cls, pilotRef, VO, gridType="DIRAC", pilotStampDict={}): - """Add a new pilot job reference""" - return cls.pilotAgentsDB.addPilotReferences(pilotRef, VO, gridType, pilotStampDict) - - ############################################################################## types_getPilotOutput = [str] def export_getPilotOutput(self, pilotReference): @@ -205,30 +164,6 @@ def _getRemotePilotOutput(self, pilotReference, pilotDict): # return res, correct or not return res - ############################################################################## - types_getPilotInfo = [[list, str]] - - @classmethod - def export_getPilotInfo(cls, pilotReference): - """Get the info about a given pilot job reference""" - return cls.pilotAgentsDB.getPilotInfo(pilotReference) - - ############################################################################## - types_selectPilots = [dict] - - @classmethod - def export_selectPilots(cls, condDict): - """Select pilots given the selection conditions""" - return cls.pilotAgentsDB.selectPilots(condDict) - - ############################################################################## - types_storePilotOutput = [str, str, str] - - @classmethod - def export_storePilotOutput(cls, pilotReference, output, error): - """Store the pilot output and error""" - return cls.pilotAgentsDB.storePilotOutput(pilotReference, output, error) - ############################################################################## types_getPilotLoggingInfo = [str] @@ -278,6 +213,9 @@ def export_getPilotSummary(cls, startdate="", enddate=""): return cls.pilotAgentsDB.getPilotSummary(startdate, enddate) ############################################################################## + + # --------------- Moved to DiracX --------------- + types_getGroupedPilotSummary = [list] @classmethod @@ -291,53 +229,15 @@ def export_getGroupedPilotSummary(cls, columnList): """ return cls.pilotAgentsDB.getGroupedPilotSummary(columnList) - ############################################################################## - types_getPilots = [[str, int]] - - @classmethod - def export_getPilots(cls, jobID): - """Get pilots executing/having executed the Job""" - result = cls.pilotAgentsDB.getPilotsForJobID(int(jobID)) - if not result["OK"] or not result["Value"]: - return S_ERROR(f"Failed to get pilot for Job {int(jobID)}: {result.get('Message', '')}") - - return cls.pilotAgentsDB.getPilotInfo(pilotID=result["Value"]) - - ############################################################################## - types_setJobForPilot = [[str, int], str] - - @classmethod - def export_setJobForPilot(cls, jobID, pilotRef, destination=None): - """Report the DIRAC job ID which is executed by the given pilot job""" - - result = cls.pilotAgentsDB.setJobForPilot(int(jobID), pilotRef) - if not result["OK"]: - return result - result = cls.pilotAgentsDB.setCurrentJobID(pilotRef, int(jobID)) - if not result["OK"]: - return result - if destination: - result = cls.pilotAgentsDB.setPilotDestinationSite(pilotRef, destination) - - return result - - ########################################################################################## - types_setPilotBenchmark = [str, float] - - @classmethod - def export_setPilotBenchmark(cls, pilotRef, mark): - """Set the pilot agent benchmark""" - return cls.pilotAgentsDB.setPilotBenchmark(pilotRef, mark) - - ########################################################################################## - types_setAccountingFlag = [str] + ############################################# + types_addPilotReferences = [list, str] + # Moved to DiracX @classmethod - def export_setAccountingFlag(cls, pilotRef, mark="True"): - """Set the pilot AccountingSent flag""" - return cls.pilotAgentsDB.setAccountingFlag(pilotRef, mark) + def export_addPilotReferences(cls, pilotReferences, VO, gridType="DIRAC", pilotStampDict={}): + """Add a new pilot job reference""" + return cls.pilotAgentsDB.addPilotReferences(pilotReferences, VO, gridType, pilotStampDict) - ########################################################################################## types_setPilotStatus = [str, str] @classmethod @@ -348,23 +248,18 @@ def export_setPilotStatus(cls, pilotRef, status, destination=None, reason=None, pilotRef, status, destination=destination, statusReason=reason, gridSite=gridSite, queue=queue ) - ########################################################################################## - types_countPilots = [dict] - - @classmethod - def export_countPilots(cls, condDict, older=None, newer=None, timeStamp="SubmissionTime"): - """Set the pilot agent status""" - - return cls.pilotAgentsDB.countPilots(condDict, older, newer, timeStamp) - - ########################################################################################## + ############################################# types_deletePilots = [[list, str, int]] @classmethod def export_deletePilots(cls, pilotIDs): + # Used by no one. We keep it for tests. if isinstance(pilotIDs, str): return cls.pilotAgentsDB.deletePilot(pilotIDs) + # And list[str]???? + # pilot_id>>>S<<< + if isinstance(pilotIDs, int): pilotIDs = [ pilotIDs, @@ -376,9 +271,22 @@ def export_deletePilots(cls, pilotIDs): return S_OK() - ############################################################################## - types_clearPilots = [int, int] + ############################################# + types_getPilots = [[str, int]] @classmethod - def export_clearPilots(cls, interval=30, aborted_interval=7): - return cls.pilotAgentsDB.clearPilots(interval, aborted_interval) + def export_getPilots(cls, jobID): + """Get pilots executing/having executed the Job""" + result = cls.pilotAgentsDB.getPilotsForJobID(int(jobID)) + if not result["OK"] or not result["Value"]: + return S_ERROR(f"Failed to get pilot for Job {int(jobID)}: {result.get('Message', '')}") + + return cls.pilotAgentsDB.getPilotInfo(pilotID=result["Value"]) + + ############################################# + types_getPilotInfo = [[list, str]] + + @classmethod + def export_getPilotInfo(cls, pilotReference): + """Get the info about a given pilot job reference""" + return cls.pilotAgentsDB.getPilotInfo(pilotReference) diff --git a/src/DIRAC/WorkloadManagementSystem/Service/WMSAdministratorHandler.py b/src/DIRAC/WorkloadManagementSystem/Service/WMSAdministratorHandler.py index b1ef5ad80a9..b93abddc913 100755 --- a/src/DIRAC/WorkloadManagementSystem/Service/WMSAdministratorHandler.py +++ b/src/DIRAC/WorkloadManagementSystem/Service/WMSAdministratorHandler.py @@ -178,8 +178,6 @@ def export_getJobPilotOutput(self, jobID): pilotReference = res["Value"][cycle]["Pilot_Reference"] c = cycle - if pilotReference: - return self.pilotManager.getPilotOutput(pilotReference) return S_ERROR("No pilot job reference found") diff --git a/tests/.dirac-ci-config.yaml b/tests/.dirac-ci-config.yaml index 87fd088a218..b1dc78b5d93 100644 --- a/tests/.dirac-ci-config.yaml +++ b/tests/.dirac-ci-config.yaml @@ -4,7 +4,7 @@ config: CLIENT_UPLOAD_BASE64: SSBsaWtlIHBpenphIQo= CLIENT_UPLOAD_LFN: LFN:/vo/test_lfn.txt CLIENT_UPLOAD_FILE: test_lfn.txt - PILOT_INSTALLATION_COMMAND: dirac-pilot.py --modules /home/dirac/LocalRepo/ALTERNATIVE_MODULES/DIRAC -M 2 -N jenkins.cern.ch -Q jenkins-queue_not_important -n DIRAC.Jenkins.ch --pilotUUID=whatever12345 --CVMFS_locations=/home/dirac/ -o diracInstallOnly --wnVO=vo --debug + PILOT_INSTALLATION_COMMAND: dirac-pilot.py --modules /home/dirac/LocalRepo/ALTERNATIVE_MODULES/DIRAC -M 2 -N jenkins.cern.ch -Q jenkins-queue_not_important -n DIRAC.Jenkins.ch --pilotUUID=whatever12345 --CVMFS_locations=/home/dirac/ -o diracInstallOnly --wnVO=vo --debug --diracx_URL=http://diracx:8000/ -z --clientID=995ed3b9-d5bd-49d3-a7f4-7fc7dbd5a0cd PILOT_JSON: "{ \"timestamp\": \"2023-02-13T14:34:26.725499\", \"CEs\": { @@ -37,7 +37,7 @@ config: \"https://server:9135/Configuration/Server\" ] }" - PILOT_DOWNLOAD_COMMAND: "git clone --single-branch --branch master https://github.com/DIRACGrid/Pilot.git && mv Pilot/Pilot/*.py . && rm -rf Pilot" + PILOT_DOWNLOAD_COMMAND: "git clone --single-branch --branch robin-add-pilot-logging https://github.com/Robin-Van-de-Merghel/Pilot.git && mv Pilot/Pilot/*.py . && rm -rf Pilot" # List of feature variables which must be passed when preparing required-feature-flags: [] diff --git a/tests/CI/check_db_initialized.sh b/tests/CI/check_db_initialized.sh index 29d7b9f4af5..cea879afe40 100755 --- a/tests/CI/check_db_initialized.sh +++ b/tests/CI/check_db_initialized.sh @@ -1,22 +1,43 @@ #!/bin/bash -dbMissing=true; -allDBs=(AccountingDB FTS3DB JobDB JobLoggingDB PilotAgentsDB ProductionDB ProxyDB ReqDB ResourceManagementDB ResourceStatusDB SandboxMetadataDB StorageManagementDB TaskQueueDB TransformationDB) -while ${dbMissing}; -do - dbMissing=false; - allExistingDBs=$(mysql -uDirac -pDirac -h mysql -P 3306 -e "show databases;"); - for db in "${allDBs[@]}"; - do - if grep -q "${db}" <<< "${allExistingDBs}"; - then - echo "${db} OK"; - else - echo "${db} not created"; - dbMissing=true; - fi; - done; - if ${dbMissing}; - then - sleep 1; +DB_USER="Dirac" +DB_PASS="Dirac" +DB_HOST="mysql" +DB_PORT=3306 +DB_CMD="" + +# Detect available client: maria or mysql +if command -v mariadb >/dev/null 2>&1; then + DB_CMD="mariadb -u${DB_USER} -p${DB_PASS} -h${DB_HOST} -P${DB_PORT}" +elif command -v mysql >/dev/null 2>&1; then + DB_CMD="mysql -u${DB_USER} -p${DB_PASS} -h${DB_HOST} -P${DB_PORT}" +else + echo "❌ Neither mysql nor mariadb client found in PATH." + exit 1 +fi + +echo "Using client: ${DB_CMD%% *}" + +dbMissing=true +allDBs=( + AccountingDB FTS3DB JobDB JobLoggingDB PilotAgentsDB ProductionDB + ProxyDB ReqDB ResourceManagementDB ResourceStatusDB + SandboxMetadataDB StorageManagementDB TaskQueueDB TransformationDB +) + +while $dbMissing; do + dbMissing=false + allExistingDBs=$($DB_CMD -e "SHOW DATABASES;" 2>/dev/null) + + for db in "${allDBs[@]}"; do + if grep -q "^${db}$" <<<"$allExistingDBs"; then + echo "✅ ${db} exists" + else + echo "⚠️ ${db} not created yet" + dbMissing=true fi + done + + $dbMissing && sleep 1 done + +echo "🎉 All databases are present." diff --git a/tests/CI/docker-compose.yml b/tests/CI/docker-compose.yml index 9bd69cccfbc..e981544d5da 100644 --- a/tests/CI/docker-compose.yml +++ b/tests/CI/docker-compose.yml @@ -10,6 +10,7 @@ volumes: services: mysql: + platform: linux/amd64 image: ${MYSQL_VER} container_name: mysql environment: @@ -24,6 +25,7 @@ services: pull_policy: always opensearch: + platform: ${ES_PLATFORM} image: ${ES_VER} container_name: opensearch hostname: opensearch @@ -39,6 +41,7 @@ services: pull_policy: always iam-login-service: + platform: linux/amd64 image: ${IAM_VER} container_name: iam-login-service hostname: iam-login-service @@ -58,6 +61,7 @@ services: pull_policy: always iam-init-keystore: + platform: linux/amd64 image: alpine:latest container_name: iam-init-keystore volumes: @@ -83,6 +87,7 @@ services: # Mock of an S3 storage s3-direct: + platform: linux/amd64 # Fix the version until https://github.com/adobe/S3Mock/issues/2321 # is resolved image: adobe/s3mock:3.12.0 @@ -97,6 +102,7 @@ services: pull_policy: always diracx-wait-for-db: + platform: linux/amd64 image: ${MYSQL_VER} container_name: diracx-wait-for-db depends_on: @@ -105,6 +111,7 @@ services: command: /home/dirac/LocalRepo/ALTERNATIVE_MODULES/DIRAC/tests/CI/check_db_initialized.sh dirac-init-certificates: + platform: linux/amd64 image: ghcr.io/diracgrid/management/certificates-generation:latest container_name: dirac-init-certificates volumes: @@ -114,6 +121,7 @@ services: pull_policy: always dirac-server: + platform: linux/amd64 image: ${CI_REGISTRY_IMAGE}/${HOST_OS}-dirac container_name: server hostname: server @@ -148,6 +156,7 @@ services: dirac-client: + platform: linux/amd64 image: ${CI_REGISTRY_IMAGE}/${HOST_OS}-dirac container_name: client hostname: client @@ -162,6 +171,7 @@ services: pull_policy: always dirac-pilot: + platform: linux/amd64 image: ${CI_REGISTRY_IMAGE}/${HOST_OS}-dirac container_name: pilot hostname: pilot @@ -187,6 +197,7 @@ services: diracx-chmod: + platform: linux/amd64 image: ghcr.io/diracgrid/diracx/secret-generation:latest container_name: diracx-chmod volumes: @@ -201,6 +212,7 @@ services: diracx-init-keystore: + platform: linux/amd64 image: ghcr.io/diracgrid/diracx/services:dev container_name: diracx-init-keystore depends_on: @@ -215,6 +227,7 @@ services: pull_policy: always diracx-init-cs: + platform: linux/amd64 image: ghcr.io/diracgrid/diracx/client:dev container_name: diracx-init-cs depends_on: @@ -233,6 +246,7 @@ services: pull_policy: always diracx-init-db: + platform: linux/amd64 image: ghcr.io/diracgrid/diracx/services:dev container_name: diracx-init-db depends_on: @@ -245,6 +259,7 @@ services: pull_policy: always diracx: + platform: linux/amd64 image: ghcr.io/diracgrid/diracx/services:dev container_name: diracx environment: @@ -254,7 +269,7 @@ services: - DIRACX_DB_URL_JOBLOGGINGDB=mysql+aiomysql://Dirac:Dirac@mysql/JobLoggingDB - DIRACX_DB_URL_SANDBOXMETADATADB=mysql+aiomysql://Dirac:Dirac@mysql/SandboxMetadataDB - DIRACX_DB_URL_PILOTAGENTSDB=mysql+aiomysql://Dirac:Dirac@mysql/PilotAgentsDB - - 'DIRACX_OS_DB_PILOTLOGSDB={"sqlalchemy_dsn": "mysql+aiomysql://Dirac:Dirac@mysql/PilotLogsDB"}' + - DIRACX_DB_URL_TASKQUEUEDB=mysql+aiomysql://Dirac:Dirac@mysql/TaskQueueDB - DIRACX_SERVICE_AUTH_TOKEN_KEYSTORE=file:///keystore/jwks.json - DIRACX_SERVICE_AUTH_TOKEN_ISSUER=http://diracx:8000 - DIRACX_SERVICE_AUTH_ALLOWED_REDIRECTS=["http://diracx:8000/docs/oauth2-redirect"] @@ -262,7 +277,12 @@ services: - DIRACX_SERVICE_AUTH_STATE_KEY=uSNPPtZ1EbC5np13zOwmWJ84Duix753Hejzk/u/MQE4= # Obtained with echo 'InsecureChangeMe' | base64 -d | openssl sha256 - DIRACX_LEGACY_EXCHANGE_HASHED_API_KEY=07cddf6948d316ac9d186544dc3120c4c6697d8f994619665985c0a5bf76265a - - DIRACX_SERVICE_JOBS_ENABLED=false + - 'DIRACX_OS_DB_PILOTLOGSDB={"hosts": "elastic:changeme@opensearch:9200", "use_ssl": false, "verify_certs": false}' + - 'DIRACX_OS_DB_JOBPARAMETERSDB={"hosts": "elastic:changeme@opensearch:9200", "use_ssl": false, "verify_certs": false}' + - DIRACX_SANDBOX_STORE_BUCKET_NAME=sandboxes + - DIRACX_SANDBOX_STORE_AUTO_CREATE_BUCKET=true + - 'DIRACX_SANDBOX_STORE_S3_CLIENT_KWARGS={"endpoint_url": "http://s3-direct:9090", "aws_access_key_id": "console", "aws_secret_access_key": "console123"}' + ports: - 8000:8000 depends_on: diff --git a/tests/CI/install_client.sh b/tests/CI/install_client.sh index 4d58cafa081..324902c4868 100755 --- a/tests/CI/install_client.sh +++ b/tests/CI/install_client.sh @@ -54,10 +54,10 @@ echo -e "*** $(date -u) **** Got the DIRAC tests ****\n" source "${DIRAC_CI_SETUP_SCRIPT}" - if [[ -n "${INSTALLATION_BRANCH}" ]]; then +if [[ -n "${INSTALLATION_BRANCH}" ]]; then # shellcheck disable=SC2034 DIRACSETUP=$(< "${INSTALL_CFG_FILE}" grep "Setup = " | cut -f5 -d " ") - fi +fi echo -e "*** $(date -u) **** Client INSTALLATION START ****\n" diff --git a/tests/CI/run_pilot.sh b/tests/CI/run_pilot.sh index f7827b8503f..2f88dacb20f 100755 --- a/tests/CI/run_pilot.sh +++ b/tests/CI/run_pilot.sh @@ -29,6 +29,8 @@ touch /home/dirac/etc/grid-security/vomses/vomses # Copy over the pilot proxy cp /ca/certs/pilot_proxy /tmp/x509up_u$UID +export X509_USER_PROXY=/tmp/x509up_u$UID + eval "${PILOT_DOWNLOAD_COMMAND}" echo "${PILOT_JSON}" > pilot.json diff --git a/tests/Integration/WorkloadManagementSystem/Test_Client_WMS.py b/tests/Integration/WorkloadManagementSystem/Test_Client_WMS.py index fc3e5ec6c22..9a623500232 100644 --- a/tests/Integration/WorkloadManagementSystem/Test_Client_WMS.py +++ b/tests/Integration/WorkloadManagementSystem/Test_Client_WMS.py @@ -444,12 +444,13 @@ def test_JobStateUpdateAndJobMonitoringMultiple(lfn: str) -> None: assert res["Value"]["Status"] == JobStatus.MATCHED assert res["Value"]["MinorStatus"] == "MinorStatus-matched" - res = jobStateUpdateClient.setJobAttribute(jobID, "Status", JobStatus.RUNNING) + res = jobStateUpdateClient.setJobStatus(jobID, JobStatus.RUNNING) assert res["OK"], res["Message"] res = jobMonitoringClient.getJobSummary(int(jobID)) assert res["OK"], res["Message"] - assert res["Value"]["Status"] == JobStatus.RUNNING + assert res["Value"]["Status"] != JobStatus.RUNNING # NOT equal: timedelta = 0 compared to above + assert res["Value"]["Status"] == JobStatus.MATCHED # Keep previous result finally: res = jobManagerClient.killJob(jobIDs) assert res["OK"], res["Message"] diff --git a/tests/Integration/WorkloadManagementSystem/Test_PilotsClient.py b/tests/Integration/WorkloadManagementSystem/Test_PilotsClient.py index a030e9f70e4..8c9efeefe6f 100644 --- a/tests/Integration/WorkloadManagementSystem/Test_PilotsClient.py +++ b/tests/Integration/WorkloadManagementSystem/Test_PilotsClient.py @@ -16,76 +16,61 @@ from DIRAC import gLogger from DIRAC.MonitoringSystem.Client.WebAppClient import WebAppClient from DIRAC.WorkloadManagementSystem.Client.PilotManagerClient import PilotManagerClient +import os gLogger.setLevel("VERBOSE") def test_PilotsDB(): pilots = PilotManagerClient() - webapp = WebAppClient() + is_diracx_enabled = os.getenv("TEST_DIRACX") == "Yes" + # webapp = WebAppClient() # ? # This will allow you to run the test again if necessary - for jobID in ["aPilot", "anotherPilot"]: - pilots.deletePilots(jobID) + for pilot_ref in ["aPilot", "anotherPilot"]: + res = pilots.deletePilot(pilot_ref) res = pilots.addPilotReferences(["aPilot"], "VO") assert res["OK"], res["Message"] - res = pilots.getCurrentPilotCounters({}) - assert res["OK"], res["Message"] - assert "Submitted" in res["Value"] + res = pilots.addPilotReferences(["aPilot"], "VO") + + # Duplicates to see if we have a conflict + # If supports diracx, then it should be detected + # But old DIRAC doesn't. + if is_diracx_enabled: + assert not res["OK"], res + assert "Conflict" in res["Message"] + res = pilots.deletePilots("aPilot") assert res["OK"], res["Message"] - res = pilots.getCurrentPilotCounters({}) - assert res["OK"], res["Message"] res = pilots.addPilotReferences(["anotherPilot"], "VO") assert res["OK"], res["Message"] - res = pilots.storePilotOutput("anotherPilot", "This is an output", "this is an error") - assert res["OK"], res["Message"] - res = pilots.getPilotOutput("anotherPilot") - assert res["OK"], res["Message"] - assert res["Value"] == { - "VO": "VO", - "StdErr": "this is an error", - "FileList": [], - "StdOut": "This is an output", - } + res = pilots.getPilotInfo("anotherPilot") assert res["OK"], res["Message"] assert res["Value"]["anotherPilot"]["AccountingSent"] == "False" assert res["Value"]["anotherPilot"]["PilotJobReference"] == "anotherPilot" - res = pilots.selectPilots({}) - assert res["OK"], res["Message"] res = pilots.getPilotSummary("", "") assert res["OK"], res["Message"] assert res["Value"]["Total"]["Submitted"] >= 1 - res = webapp.getPilotMonitorWeb({}, [], 0, 100) - assert res["OK"], res["Message"] - assert res["Value"]["TotalRecords"] >= 1 - res = webapp.getPilotMonitorSelectors() - assert res["OK"], res["Message"] - res = webapp.getPilotSummaryWeb({}, [], 0, 100) - assert res["OK"], res["Message"] - assert res["Value"]["TotalRecords"] >= 1 - res = pilots.setAccountingFlag("anotherPilot", "True") - assert res["OK"], res["Message"] res = pilots.setPilotStatus("anotherPilot", "Running") assert res["OK"], res["Message"] res = pilots.getPilotInfo("anotherPilot") assert res["OK"], res["Message"] - assert res["Value"]["anotherPilot"]["AccountingSent"] == "True" assert res["Value"]["anotherPilot"]["Status"] == "Running" - res = pilots.setJobForPilot(123, "anotherPilot") - assert res["OK"], res["Message"] - res = pilots.setPilotBenchmark("anotherPilot", 12.3) - assert res["OK"], res["Message"] - res = pilots.countPilots({}) - assert res["OK"], res["Message"] + if is_diracx_enabled: + res = pilots.getGroupedPilotSummary(["GridSite", "DestinationSite", "VO"]) + assert res["OK"], res["Message"] # We won't test result (hopefully tested in DiracX) res = pilots.deletePilots("anotherPilot") assert res["OK"], res["Message"] - res = pilots.getCurrentPilotCounters({}) - assert res["OK"], res["Message"] + + # Delete twice, second time an error is raised + # DiracX feature + if is_diracx_enabled: + res = pilots.deletePilots("anotherPilot") + assert not res["OK"], res["Message"] diff --git a/tests/Integration/all_integration_client_tests.sh b/tests/Integration/all_integration_client_tests.sh old mode 100644 new mode 100755 index 332d30a07ea..8992a344f87 --- a/tests/Integration/all_integration_client_tests.sh +++ b/tests/Integration/all_integration_client_tests.sh @@ -57,8 +57,9 @@ echo -e "*** $(date -u) **** WMS TESTS ****\n" pytest --no-check-dirac-environment "${THIS_DIR}/WorkloadManagementSystem/Test_SandboxStoreClient.py" |& tee -a clientTestOutputs.txt; (( ERR |= "${?}" )) if [[ -z "${INSTALLATION_BRANCH}" ]]; then - pytest --no-check-dirac-environment "${THIS_DIR}/WorkloadManagementSystem/Test_PilotsClient.py" |& tee -a clientTestOutputs.txt; (( ERR |= "${?}" )) pytest --no-check-dirac-environment "${THIS_DIR}/WorkloadManagementSystem/Test_Client_WMS.py" |& tee -a clientTestOutputs.txt; (( ERR |= "${?}" )) + dirac-proxy-init -g prod "${DEBUG}" |& tee -a clientTestOutputs.txt + pytest --no-check-dirac-environment "${THIS_DIR}/WorkloadManagementSystem/Test_PilotsClient.py" |& tee -a clientTestOutputs.txt; (( ERR |= "${?}" )) fi # Make sure we have the prod role for these tests to get the VmRpcOperator permission diff --git a/tests/Jenkins/dirac-cfg-setup-diracx.py b/tests/Jenkins/dirac-cfg-setup-diracx.py index a20e4ece7cd..7d73683aa85 100755 --- a/tests/Jenkins/dirac-cfg-setup-diracx.py +++ b/tests/Jenkins/dirac-cfg-setup-diracx.py @@ -12,6 +12,7 @@ def parse_args(): parser.add_argument("--disable-vo", nargs="+", help="Disable a VO", default=[]) parser.add_argument("--url", help="URL of the DiracX services") parser.add_argument("--credentials-dir", help="Directory where hostcert.pem/hostkey.pem can be found") + parser.add_argument("--legacy-adapted-services", nargs="+", help="Services that have legacy adaptors.", default=[]) args = parser.parse_args() DIRAC.initialize( @@ -21,10 +22,10 @@ def parse_args(): ) ) - main(args.url, args.disable_vo) + main(args.url, args.disable_vo, args.legacy_adapted_services) -def main(url: str, disabled_vos: list[str]): +def main(url: str, disabled_vos: list[str], legacy_adapted_services: list[str]): from DIRAC.ConfigurationSystem.Client.CSAPI import CSAPI csAPI = CSAPI() @@ -72,6 +73,21 @@ def main(url: str, disabled_vos: list[str]): csSyncCFG = CFG().loadFromDict(csSync) returnValueOrRaise(csAPI.mergeCFGUnderSection("/DiracX/", csSyncCFG)) + # Add service with legacy adaptors. + legacy = {"LegacyClientEnabled": {}} + for service in legacy_adapted_services: + print(f"Adding legacy adaptor for service {service}") + # Service name such as: system/name + system, name = service.split("/") + + if not system in legacy["LegacyClientEnabled"]: + legacy["LegacyClientEnabled"][system] = {} + + legacy["LegacyClientEnabled"][system][name] = "yes" + + legacyCFG = CFG().loadFromDict(legacy) + returnValueOrRaise(csAPI.mergeCFGUnderSection("/DiracX/", legacyCFG)) + returnValueOrRaise(csAPI.commit()) diff --git a/tests/Jenkins/dirac_ci.sh b/tests/Jenkins/dirac_ci.sh index b6b05ddbcc9..db60c7b9f62 100644 --- a/tests/Jenkins/dirac_ci.sh +++ b/tests/Jenkins/dirac_ci.sh @@ -164,9 +164,22 @@ installSite() { diracxSetupArgs=("--credentials-dir" "$SERVERINSTALLDIR/etc/grid-security") if [[ "${TEST_DIRACX:-}" = "Yes" ]]; then diracxSetupArgs+=("--url=${DIRACX_URL}") + + # Only if we have TEST_DIRACX we can have a legacy_adapted service, or it will crash + # "Missing mandatory /DiracX/URL configuration" + # Call findFutureServices and read services into an array + findFutureServices 'exclude' $DIRACX_DISABLED_SERVICES + mapfile -t futureServices < futureServices + + # If there are any remaining services, add them to args + if [[ ${#futureServices[@]} -gt 0 ]]; then + diracxSetupArgs+=(--legacy-adapted-services "${futureServices[@]}") + fi + else diracxSetupArgs+=("--disable-vo" "vo") fi + if ! python "${TESTCODE}/DIRAC/tests/Jenkins/dirac-cfg-setup-diracx.py" "${diracxSetupArgs[@]}"; then echo "ERROR: dirac-cfg-setup-diracx.py failed" >&2 exit 1 diff --git a/tests/Jenkins/utilities.sh b/tests/Jenkins/utilities.sh index 57a8f2dc2ca..a911f95dd12 100644 --- a/tests/Jenkins/utilities.sh +++ b/tests/Jenkins/utilities.sh @@ -159,6 +159,56 @@ findAgents(){ } +#------------------------------------------------------------------------------- +# findServices: +# +# gets all *legacy adapted* service names from *DIRAC code and writes them to a file +# named futureServices. Needs an input for searching +# Little bit different from findAgents as we allow multiple exclusions +# +#------------------------------------------------------------------------------- + + +findFutureServices() { + echo '==> [findFutureServices]' + + # If first argument is "exclude", take the rest as the exclusion string + local disabledPattern="" + if [[ "${1:-}" == "exclude" ]]; then + shift + # Join all remaining arguments into a single string + local disabledServices="$*" + echo "==> excluding: $disabledServices" + + # Convert services into a grep-friendly pattern (fixed strings) + # We replace spaces with newlines to create multiple patterns + disabledPattern=$(printf "%s" "$disabledServices" | tr ' ' '\n') + fi + + if ! cd "${SERVERINSTALLDIR}"; then + echo 'ERROR: cannot change to ' "${SERVERINSTALLDIR}" >&2 + exit 1 + fi + + # Build the list of services + python -m DIRAC.Core.Utilities.Extensions findFutureServices \ + | sed -e 's/System / /g' \ + -e 's/Handler//g' \ + -e 's/Client//g' \ + -e 's/ /\//g' \ + | grep -v "JobAgent" > futureServices + + # Apply exclusion if any + if [[ -n "$disabledPattern" ]]; then + # Use grep -F for fixed strings (space-safe) + grep -Fv -f <(printf "%s\n" $disabledPattern) futureServices > futureServices.tmp + mv futureServices.tmp futureServices + fi + + echo "found $(wc -l < futureServices)" +} + + #------------------------------------------------------------------------------- # findExecutors: # @@ -255,12 +305,6 @@ installDIRAC() { done fi - echo "==> Installing main branch of diracx" - installDIRACX core client - - echo "$DIRAC" - echo "$PATH" - echo "==> Done installing, now configuring" if ! dirac-proxy-init --nocs --no-upload; then @@ -285,6 +329,13 @@ installDIRAC() { fi echo '==> Done installDIRAC' + + echo "==> Installing main branch of diracx" + installDIRACX core client + + echo "$DIRAC" + echo "$PATH" + } ############################################################################## @@ -292,17 +343,20 @@ installDIRAC() { # Arguments: list of DiracX submodule module names to install (core, client, etc.) function installDIRACX() { + # If we have no dist, we get the last tag to use it + if [[ -z "${DIRACX_CUSTOM_SOURCE_PREFIXES:-}" ]]; then + # Install latest by default + pip install diracx + return 0 + fi + for wheel_name in "$@"; do - if [[ -n "${DIRACX_CUSTOM_SOURCE_PREFIXES:-}" ]]; then - wheels=( $(find "${DIRACX_CUSTOM_SOURCE_PREFIXES}" -name "diracx_${wheel_name}-*.whl") ) - if [[ ! ${#wheels[@]} -eq 1 ]]; then - echo "ERROR: Multiple or no wheels found for ${wheel_name} in ${DIRACX_CUSTOM_SOURCE_PREFIXES}" - exit 1 - fi - pip install "${wheels[0]}" - else - pip install "git+https://github.com/DIRACGrid/diracx.git@main#egg=diracx-${wheel_name}&subdirectory=diracx-${wheel_name}" + wheels=( $(find "${DIRACX_CUSTOM_SOURCE_PREFIXES}" -name "diracx_${wheel_name}-*.whl") ) + if [[ ! ${#wheels[@]} -eq 1 ]]; then + echo "ERROR: Multiple or no wheels found for ${wheel_name} in ${DIRACX_CUSTOM_SOURCE_PREFIXES}" + exit 1 fi + pip install "${wheels[0]}" done }