Skip to content

Commit 104ab94

Browse files
feat: Improved JobMonitoring and rebase on top of legacy adaptor
1 parent 652750d commit 104ab94

File tree

3 files changed

+65
-14
lines changed

3 files changed

+65
-14
lines changed

.github/workflows/integration.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
name: Integration tests
22

33
env:
4-
DIRACX_REPO: default
5-
DIRACX_BRANCH: main
4+
DIRACX_REPO: https://github.com/Robin-Van-de-Merghel/diracx.git
5+
DIRACX_BRANCH: robin-update-job-monitoring
66

77
on:
88
push:

src/DIRAC/WorkloadManagementSystem/Client/JobMonitoringClient.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,7 @@ def __init__(self, **kwargs):
1818
super().__init__(**kwargs)
1919
self.setServer("WorkloadManagement/JobMonitoring")
2020

21-
# Set to NULL to avoid using it in ClientSelector
22-
diracxClient = None
21+
diracxClient = futureJobMonitoringClient
2322

2423
@ignoreEncodeWarning
2524
def getJobsStatus(self, jobIDs):

src/DIRAC/WorkloadManagementSystem/FutureClient/JobMonitoringClient.py

Lines changed: 62 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55

66
class JobMonitoringClient:
77
def fetch(self, parameters, jobIDs):
8+
if not isinstance(jobIDs, list):
9+
jobIDs = [jobIDs]
810
with DiracXClient() as api:
911
jobs = api.jobs.search(
1012
parameters=["JobID"] + parameters,
@@ -34,12 +36,10 @@ def getJobsStatus(self, jobIDs):
3436

3537
@convertToReturnValue
3638
def getJobParameter(self, jobID, parName):
37-
# with DiracXClient() as api:
38-
# parameters = api.jobs.get_job_parameters(
39-
# job_id=jobID
40-
# )
39+
with DiracXClient() as api:
40+
parameters = api.jobs.get_job_parameters(job_id=jobID)
4141

42-
# return parameters
42+
return parameters
4343
pass
4444

4545
@convertToReturnValue
@@ -94,11 +94,55 @@ def cleanLFN(data):
9494

9595
return return_dict
9696

97+
def convert_condDict_to_searchSpec(self, condDict):
98+
"""
99+
Convert condDict to list of SearchSpec dicts.
100+
101+
- If key is str: one SearchSpec
102+
- If key is tuple: multiple SearchSpec (one per key in tuple)
103+
- If value is dict: skip this condDict entry
104+
- list value -> operator 'in'
105+
- scalar value -> operator 'eq'
106+
"""
107+
search_list = []
108+
109+
if not condDict:
110+
return search_list
111+
112+
for key, val in condDict.items():
113+
if isinstance(val, dict):
114+
# abandon this condition entry
115+
continue
116+
117+
# Normalize keys: if tuple, treat each element as a separate key
118+
keys = [key] if isinstance(key, str) else list(key)
119+
120+
for k in keys:
121+
if not isinstance(k, str):
122+
# skip non-str keys silently (or raise error if you want)
123+
continue
124+
125+
if isinstance(val, list):
126+
search_list.append({"parameter": k, "operator": "in", "values": val})
127+
else:
128+
search_list.append({"parameter": k, "operator": "eq", "value": val})
129+
130+
return search_list
131+
97132
@convertToReturnValue
98-
def getJobs(self, attrDict, cutDate=None):
99-
# TODO: Check, selectJobs is awful...
100-
# See if we do it
101-
pass
133+
def getJobs(self, attrDict={}, cutDate=None):
134+
# Get job >>>IDS<<<
135+
with DiracXClient() as api:
136+
search = self.convert_condDict_to_searchSpec(attrDict)
137+
138+
if cutDate:
139+
# TODO: Verify the date format!
140+
search.append({"parameter": "LastUpdateTime", "operator": "gt", "value": str(cutDate)})
141+
142+
jobs = api.jobs.search(parameters=["JobID"], search=search)
143+
144+
# Returns a **string**??, cf test_JobStateUpdateAndJobMonitoringMultiple
145+
return [str(job["JobID"]) for job in jobs]
102146

103147
@convertToReturnValue
104148
def getJobJDL(self, jobID, original):
@@ -133,7 +177,15 @@ def getJobLoggingInfo(self, jobID):
133177

134178
@convertToReturnValue
135179
def getJobSummary(self, jobID):
136-
return self.fetch(parameters=[], jobIDs=[jobID])
180+
with DiracXClient() as api:
181+
# TODO: Fix diracx client...
182+
jobs = api.jobs.summary(
183+
grouping=[], search=[{"parameter": "JobID", "operator": "eq", "value": jobID}] # type: ignore
184+
)
185+
186+
if jobs:
187+
return jobs[0]
188+
return []
137189

138190
@convertToReturnValue
139191
def getJobsSummary(self, jobIDs):

0 commit comments

Comments
 (0)