From a519e5b77744d2b8214bafbe9b3ad9cbb4abdd90 Mon Sep 17 00:00:00 2001 From: Robin VAN DE MERGHEL Date: Fri, 1 Aug 2025 09:06:15 +0200 Subject: [PATCH] feat: Migrate JobMonitoring toward DiracX --- .github/workflows/integration.yml | 17 +- .../Client/JobMonitoringClient.py | 2 + .../FutureClient/JobMonitoringClient.py | 184 +++++++++++++++++- 3 files changed, 201 insertions(+), 2 deletions(-) diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml index 99690b2369b..41a8e886676 100644 --- a/.github/workflows/integration.yml +++ b/.github/workflows/integration.yml @@ -57,7 +57,22 @@ jobs: packaging \ pyyaml \ requests \ - typer + typer \ + build + - name: Building wheels + run: | + # Clone diracx + git clone --single-branch --branch robin-update-job-monitoring https://github.com/Robin-Van-de-Merghel/diracx.git $GITHUB_WORKSPACE/diracx + + # Create dist dir + mkdir -p $GITHUB_WORKSPACE/diracx-dist + + # Building diracx + for pkg_dir in $GITHUB_WORKSPACE/diracx/diracx-* $GITHUB_WORKSPACE/diracx; do + echo "Building $pkg_dir" + python -m build --outdir "$GITHUB_WORKSPACE/diracx-dist" $pkg_dir + done + - name: Prepare environment run: ./integration_tests.py prepare-environment ${{ matrix.ARGS }} - name: Install server diff --git a/src/DIRAC/WorkloadManagementSystem/Client/JobMonitoringClient.py b/src/DIRAC/WorkloadManagementSystem/Client/JobMonitoringClient.py index ed8b304be3b..a6b75fc82d4 100755 --- a/src/DIRAC/WorkloadManagementSystem/Client/JobMonitoringClient.py +++ b/src/DIRAC/WorkloadManagementSystem/Client/JobMonitoringClient.py @@ -22,6 +22,8 @@ def __init__(self, **kwargs): super().__init__(**kwargs) self.setServer("WorkloadManagement/JobMonitoring") + diracxClient = futureJobMonitoringClient + @ignoreEncodeWarning def getJobsStatus(self, jobIDs): res = self._getRPC().getJobsStatus(jobIDs) diff --git a/src/DIRAC/WorkloadManagementSystem/FutureClient/JobMonitoringClient.py b/src/DIRAC/WorkloadManagementSystem/FutureClient/JobMonitoringClient.py index e08b150954b..badf006132d 100644 --- a/src/DIRAC/WorkloadManagementSystem/FutureClient/JobMonitoringClient.py +++ b/src/DIRAC/WorkloadManagementSystem/FutureClient/JobMonitoringClient.py @@ -1,14 +1,17 @@ from DIRAC.Core.Utilities.ReturnValues import convertToReturnValue from DIRAC.Core.Security.DiracX import DiracXClient +from DIRAC.WorkloadManagementSystem.DB.JobDBUtils import extractJDL class JobMonitoringClient: def fetch(self, parameters, jobIDs): + if not isinstance(jobIDs, list): + jobIDs = [jobIDs] with DiracXClient() as api: jobs = api.jobs.search( parameters=["JobID"] + parameters, - search=[{"parameter": "JobID", "operator": "in", "values": jobIDs}], + search=[{"parameter": "JobID", "operator": "in", "values": jobIDs}], # type: ignore ) return {j["JobID"]: {param: j[param] for param in parameters} for j in jobs} @@ -20,6 +23,185 @@ def getJobsMinorStatus(self, jobIDs): def getJobsStates(self, jobIDs): return self.fetch(["Status", "MinorStatus", "ApplicationStatus"], jobIDs) + @convertToReturnValue + def getJobsApplicationStatus(self, jobIDs): + return self.fetch(["ApplicationStatus"], jobIDs) + @convertToReturnValue def getJobsSites(self, jobIDs): return self.fetch(["Site"], jobIDs) + + @convertToReturnValue + def getJobsStatus(self, jobIDs): + return self.fetch(["Status"], jobIDs) + + @convertToReturnValue + def getJobParameter(self, jobID, parName=""): + with DiracXClient() as api: + par = [] if not parName else [parName] + return api.jobs.get_job_parameters(job_id=jobID, parameters=[par]) + + # @convertToReturnValue + # def getJobParameters(self, jobIDs, parName=""): + # with DiracXClient() as api: + # par = [] if not parName else [parName] + # return api.jobs.get_job_parameters(job_id=jobID, parameters=[parName]) + + @convertToReturnValue + def getInputData(self, jobIDs): + with DiracXClient() as api: + # It's a mess + if isinstance(jobIDs, int): + jobIDs = [jobIDs] + elif isinstance(jobIDs, str): + jobIDs = [int(jobIDs)] + elif isinstance(jobIDs, list): + if isinstance(jobIDs[0], str): + jobIDs = [int(jobID) for jobID in jobIDs] + + reqDict = {} + + for jobID in jobIDs: + reqDict[jobID] = api.jobs.get_input_data(jobID) # type: ignore + + def cleanLFN(data): + lfn = data["LFN"] + if lfn.lower().startswith("lfn:"): + return lfn[4:] + return lfn + + # WARNING /!\: Old code is super bizarre. + # It can return list as well as dict... + # By default, it's better to return a dict. + + return_dict = {} + for data in reqDict: + job_id = data["JobID"] + if not job_id in return_dict: + return_dict[job_id] = [] + + return_dict[job_id].append(cleanLFN(data)) + + return return_dict + + def convert_condDict_to_searchSpec(self, condDict): + """ + Convert condDict to list of SearchSpec dicts. + + - If key is str: one SearchSpec + - If key is tuple: multiple SearchSpec (one per key in tuple) + - If value is dict: skip this condDict entry + - list value -> operator 'in' + - scalar value -> operator 'eq' + """ + search_list = [] + + if not condDict: + return search_list + + for key, val in condDict.items(): + if isinstance(val, dict): + # abandon this condition entry + continue + + # Normalize keys: if tuple, treat each element as a separate key + keys = [key] if isinstance(key, str) else list(key) + + for k in keys: + if not isinstance(k, str): + # skip non-str keys silently (or raise error if you want) + continue + + if isinstance(val, list): + search_list.append({"parameter": k, "operator": "in", "values": val}) + else: + search_list.append({"parameter": k, "operator": "eq", "value": val}) + + return search_list + + @convertToReturnValue + def getJobs(self, attrDict={}, cutDate=None): + # Get job >>>IDS<<< + with DiracXClient() as api: + search = self.convert_condDict_to_searchSpec(attrDict) + + if cutDate: + # TODO: Verify the date format! + search.append({"parameter": "LastUpdateTime", "operator": "gt", "value": str(cutDate)}) + + jobs = api.jobs.search(parameters=["JobID"], search=search) + + # Returns a **string**??, cf test_JobStateUpdateAndJobMonitoringMultiple + return [str(job["JobID"]) for job in jobs] + + @convertToReturnValue + def getJobJDL(self, jobID, original): + with DiracXClient() as api: + jdl = api.jobs.get_job_jdl(job_id=jobID) + + if not jdl: + return "" + + if original: + return extractJDL(jdl.original_jdl) + + return extractJDL(jdl.jdl) + + @convertToReturnValue + def getJobLoggingInfo(self, jobID): + fields = ["DateTime", "Source", "Status", "MinorStatus", "ApplicationStatus"] # CF Dirac.py + + jobs = self.fetch(parameters=["LoggingInfo"] + fields, jobIDs=[jobID]) + + # Normally, only one job + if len(jobs) == 0: + return [] + job = jobs[0] + + # Rearrange into a list, CF Dirac.py + res = [] + for field in fields: + res.append(job.get(field, "")) + + return res + + @convertToReturnValue + def getJobSummary(self, jobID): + with DiracXClient() as api: + # TODO: Fix diracx client... + jobs = api.jobs.summary( + grouping=[], search=[{"parameter": "JobID", "operator": "eq", "value": jobID}] # type: ignore + ) + + if jobs: + return jobs[0] + return [] + + @convertToReturnValue + def getJobsSummary(self, jobIDs): + return self.fetch(parameters=[], jobIDs=jobIDs) + + @convertToReturnValue + def getJobAttributes(self, jobID, attrList=None): + if not attrList: + attrList = [] + + return self.fetch(parameters=attrList, jobIDs=[jobID]) + + @convertToReturnValue + def getJobAttribute(self, jobID, attribute): + return self.fetch(parameters=[attribute], jobIDs=[jobID]) + + @convertToReturnValue + def getJobHeartBeatData(self, jobID): + with DiracXClient() as api: + res = api.jobs.get_job_heartbeat_info(job_id=jobID) + + result = [] + for row in res: + name = row["Name"] + value = str(row["Value"]) + heartbeattime = row["HeartBeatTime"] + # ('name', '"0.1"', 'time') + # :) + result.append((str(name), "%.01f" % (float(value.replace('"', ""))), str(heartbeattime)))