Skip to content

Commit 1394f44

Browse files
feat: Migrate JobMonitoring toward DiracX
1 parent 5fd7250 commit 1394f44

File tree

3 files changed

+186
-2
lines changed

3 files changed

+186
-2
lines changed

.github/workflows/integration.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ jobs:
6262
- name: Building wheels
6363
run: |
6464
# Clone diracx
65-
git clone --single-branch --branch robin-pilot-legacy-logging https://github.com/Robin-Van-de-Merghel/diracx.git $GITHUB_WORKSPACE/diracx
65+
git clone --single-branch --branch robin-update-job-monitoring https://github.com/Robin-Van-de-Merghel/diracx.git $GITHUB_WORKSPACE/diracx
6666
6767
# Create dist dir
6868
mkdir -p $GITHUB_WORKSPACE/diracx-dist

src/DIRAC/WorkloadManagementSystem/Client/JobMonitoringClient.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ def __init__(self, **kwargs):
2222
super().__init__(**kwargs)
2323
self.setServer("WorkloadManagement/JobMonitoring")
2424

25+
diracxClient = futureJobMonitoringClient
26+
2527
@ignoreEncodeWarning
2628
def getJobsStatus(self, jobIDs):
2729
res = self._getRPC().getJobsStatus(jobIDs)

src/DIRAC/WorkloadManagementSystem/FutureClient/JobMonitoringClient.py

Lines changed: 183 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,17 @@
11
from DIRAC.Core.Utilities.ReturnValues import convertToReturnValue
22

33
from DIRAC.Core.Security.DiracX import DiracXClient
4+
from DIRAC.WorkloadManagementSystem.DB.JobDBUtils import extractJDL
45

56

67
class JobMonitoringClient:
78
def fetch(self, parameters, jobIDs):
    """Fetch the requested parameters for a set of jobs through DiracX.

    :param parameters: names of the job parameters to retrieve. "JobID" is
        always requested in addition so the result can be keyed by job.
    :param jobIDs: a single job ID, or a list/tuple/set of job IDs.
    :return: dict mapping each returned JobID to ``{parameter: value}`` for
        the requested parameters.
    """
    # Collections of IDs are flattened to a list; only a scalar is wrapped.
    # Wrapping a tuple or set as-is would send one nested value to the "in"
    # operator instead of the individual IDs.
    if isinstance(jobIDs, (list, tuple, set, frozenset)):
        jobIDs = list(jobIDs)
    else:
        jobIDs = [jobIDs]

    with DiracXClient() as api:
        jobs = api.jobs.search(
            parameters=["JobID"] + list(parameters),
            search=[{"parameter": "JobID", "operator": "in", "values": jobIDs}],  # type: ignore
        )
        return {j["JobID"]: {param: j[param] for param in parameters} for j in jobs}
1417

@@ -20,6 +23,185 @@ def getJobsMinorStatus(self, jobIDs):
2023
def getJobsStates(self, jobIDs):
    """Look up the three status fields of the given jobs.

    :param jobIDs: job ID or list of job IDs, handed unchanged to ``fetch``.
    :return: the ``fetch`` result for Status, MinorStatus and ApplicationStatus.
    """
    state_fields = ["Status", "MinorStatus", "ApplicationStatus"]
    return self.fetch(state_fields, jobIDs)
2225

26+
@convertToReturnValue
27+
def getJobsApplicationStatus(self, jobIDs):
    """Look up the ApplicationStatus of the given jobs.

    :param jobIDs: job ID or list of job IDs, handed unchanged to ``fetch``.
    :return: the ``fetch`` result for the single "ApplicationStatus" field.
    """
    wanted = ["ApplicationStatus"]
    return self.fetch(wanted, jobIDs)
29+
2330
@convertToReturnValue
2431
def getJobsSites(self, jobIDs):
    """Look up the Site of the given jobs.

    :param jobIDs: job ID or list of job IDs, handed unchanged to ``fetch``.
    :return: the ``fetch`` result for the single "Site" field.
    """
    wanted = ["Site"]
    return self.fetch(wanted, jobIDs)
33+
34+
@convertToReturnValue
35+
def getJobsStatus(self, jobIDs):
    """Look up the Status of the given jobs.

    :param jobIDs: job ID or list of job IDs, handed unchanged to ``fetch``.
    :return: the ``fetch`` result for the single "Status" field.
    """
    wanted = ["Status"]
    return self.fetch(wanted, jobIDs)
37+
38+
@convertToReturnValue
39+
def getJobParameter(self, jobID, parName=""):
    """Retrieve job parameters of one job through DiracX.

    :param jobID: ID of the job to query.
    :param parName: name of the parameter to retrieve; when empty, an empty
        list of names is sent (presumably meaning "all parameters" -- confirm
        against the DiracX API).
    :return: whatever ``api.jobs.get_job_parameters`` returns.
    """
    # Pass a flat list of names. The previous code wrapped the already-built
    # list a second time, sending [[name]] (or [[]]) instead of [name] / [].
    wanted = [parName] if parName else []
    with DiracXClient() as api:
        return api.jobs.get_job_parameters(job_id=jobID, parameters=wanted)
43+
44+
# @convertToReturnValue
45+
# def getJobParameters(self, jobIDs, parName=""):
46+
# with DiracXClient() as api:
47+
# par = [] if not parName else [parName]
48+
# return api.jobs.get_job_parameters(job_id=jobID, parameters=[parName])
49+
50+
@convertToReturnValue
51+
def getInputData(self, jobIDs):
    """Return the input data LFNs of one or several jobs.

    :param jobIDs: a job ID (int or str) or a collection of job IDs (ints
        and/or strs); every ID is converted to int before being queried.
    :return: dict mapping JobID to the list of its LFNs, each with a leading
        "LFN:" prefix (matched case-insensitively) removed.
    """

    def cleanLFN(data):
        # Drop the optional "LFN:" scheme prefix, whatever its letter case.
        lfn = data["LFN"]
        if lfn.lower().startswith("lfn:"):
            return lfn[4:]
        return lfn

    # Normalise to a list of ints. Converting every element (rather than
    # peeking at the first one) also copes with empty and mixed lists.
    if isinstance(jobIDs, (int, str)):
        jobIDs = [jobIDs]
    jobIDs = [int(jobID) for jobID in jobIDs]

    with DiracXClient() as api:
        reqDict = {}
        for jobID in jobIDs:
            reqDict[jobID] = api.jobs.get_input_data(jobID)  # type: ignore

        # WARNING: the legacy call could return a list as well as a dict.
        # Here a dict is always returned.
        #
        # Walk the per-job results, not the keys of reqDict: iterating the
        # dict itself yields the integer job IDs, which cannot be subscripted.
        # NOTE(review): assumes each result is an iterable of dicts carrying
        # "JobID" and "LFN" keys -- confirm against the DiracX API.
        return_dict = {}
        for entries in reqDict.values():
            for data in entries:
                return_dict.setdefault(data["JobID"], []).append(cleanLFN(data))

        return return_dict
86+
87+
def convert_condDict_to_searchSpec(self, condDict):
    """Translate a condition dictionary into a list of search-spec dicts.

    Rules applied to each ``key: value`` entry:

    - a dict value makes the whole entry be ignored;
    - a str key produces one spec; any other key is expanded with ``list()``
      and each str element produces its own spec (non-str elements are
      dropped silently);
    - a list value uses operator "in" with the "values" field;
    - any other value uses operator "eq" with the "value" field.

    :param condDict: mapping of conditions; falsy input gives an empty list.
    :return: list of ``{"parameter", "operator", "value"/"values"}`` dicts.
    """
    if not condDict:
        return []

    specs = []
    for cond_key, cond_value in condDict.items():
        # Nested dict conditions are not translated at all.
        if isinstance(cond_value, dict):
            continue

        names = list(cond_key) if not isinstance(cond_key, str) else [cond_key]

        # The operator and the name of the payload field depend only on the
        # value, so decide once per entry rather than once per name.
        if isinstance(cond_value, list):
            operator, payload_field = "in", "values"
        else:
            operator, payload_field = "eq", "value"

        specs.extend(
            {"parameter": name, "operator": operator, payload_field: cond_value}
            for name in names
            if isinstance(name, str)
        )

    return specs
121+
122+
@convertToReturnValue
123+
def getJobs(self, attrDict=None, cutDate=None):
    """Return the IDs of the jobs matching the given conditions.

    :param attrDict: condition dictionary, translated with
        ``convert_condDict_to_searchSpec``; ``None`` or empty means no
        attribute condition. (``None`` replaces the former mutable ``{}``
        default; callers passing a dict are unaffected.)
    :param cutDate: when truthy, only jobs whose LastUpdateTime is greater
        than ``str(cutDate)`` are selected.
    :return: list of job IDs, each converted to ``str``.
    """
    with DiracXClient() as api:
        search = self.convert_condDict_to_searchSpec(attrDict)

        if cutDate:
            # TODO: Verify the date format!
            search.append({"parameter": "LastUpdateTime", "operator": "gt", "value": str(cutDate)})

        jobs = api.jobs.search(parameters=["JobID"], search=search)

        # IDs are returned as strings on purpose,
        # cf. test_JobStateUpdateAndJobMonitoringMultiple
        return [str(job["JobID"]) for job in jobs]
136+
137+
@convertToReturnValue
138+
def getJobJDL(self, jobID, original):
    """Retrieve the JDL of a job through DiracX.

    :param jobID: ID of the job to query.
    :param original: when truthy, use the ``original_jdl`` attribute of the
        reply, otherwise its ``jdl`` attribute.
    :return: the selected JDL passed through ``extractJDL``, or an empty
        string when the reply is falsy.
    """
    with DiracXClient() as api:
        reply = api.jobs.get_job_jdl(job_id=jobID)

        if reply:
            # Only the selected attribute is read from the reply.
            return extractJDL(reply.original_jdl if original else reply.jdl)

        return ""
149+
150+
@convertToReturnValue
151+
def getJobLoggingInfo(self, jobID):
    """Return the logging fields of one job as a flat list.

    :param jobID: ID of the job to query.
    :return: ``[DateTime, Source, Status, MinorStatus, ApplicationStatus]``
        values for the job (a missing field becomes ""), or an empty list
        when the job is not found.
    """
    fields = ["DateTime", "Source", "Status", "MinorStatus", "ApplicationStatus"]  # CF Dirac.py

    jobs = self.fetch(parameters=["LoggingInfo"] + fields, jobIDs=[jobID])

    if not jobs:
        return []

    # fetch() returns a dict keyed by JobID, not a sequence, so indexing it
    # with 0 raises KeyError for any real job. Exactly one job was requested:
    # take its entry regardless of the key's type.
    job = next(iter(jobs.values()))

    # Rearrange into a list, CF Dirac.py
    return [job.get(field, "") for field in fields]
167+
168+
@convertToReturnValue
169+
def getJobSummary(self, jobID):
    """Request a summary restricted to one job through DiracX.

    :param jobID: ID of the job to query.
    :return: the first element of the summary reply, or an empty list when
        the reply is falsy.
    """
    with DiracXClient() as api:
        # TODO: Fix diracx client...
        job_filter = {"parameter": "JobID", "operator": "eq", "value": jobID}
        summaries = api.jobs.summary(grouping=[], search=[job_filter])  # type: ignore

        return summaries[0] if summaries else []
179+
180+
@convertToReturnValue
181+
def getJobsSummary(self, jobIDs):
    """Call ``fetch`` for the given jobs without requesting any parameter.

    NOTE(review): with an empty parameter list, ``fetch`` builds an empty
    dict for every job it returns -- confirm this is the intended summary.

    :param jobIDs: job ID or list of job IDs, handed unchanged to ``fetch``.
    :return: the ``fetch`` result.
    """
    no_parameters = []
    return self.fetch(parameters=no_parameters, jobIDs=jobIDs)
183+
184+
@convertToReturnValue
185+
def getJobAttributes(self, jobID, attrList=None):
    """Fetch a selection of attributes for one job.

    :param jobID: ID of the job to query; wrapped in a list for ``fetch``.
    :param attrList: names of the attributes to fetch; any falsy value is
        replaced by an empty list.
    :return: the ``fetch`` result.
    """
    return self.fetch(parameters=attrList or [], jobIDs=[jobID])
190+
191+
@convertToReturnValue
192+
def getJobAttribute(self, jobID, attribute):
    """Fetch a single attribute for one job.

    :param jobID: ID of the job to query; wrapped in a list for ``fetch``.
    :param attribute: name of the attribute to fetch.
    :return: the ``fetch`` result.
    """
    single_job = [jobID]
    single_attribute = [attribute]
    return self.fetch(parameters=single_attribute, jobIDs=single_job)
194+
195+
@convertToReturnValue
196+
def getJobHeartBeatData(self, jobID):
197+
with DiracXClient() as api:
198+
res = api.jobs.get_job_heartbeat_info(job_id=jobID)
199+
200+
result = []
201+
for row in res:
202+
name = row["Name"]
203+
value = str(row["Value"])
204+
heartbeattime = row["HeartBeatTime"]
205+
# ('name', '"0.1"', 'time')
206+
# :)
207+
result.append((str(name), "%.01f" % (float(value.replace('"', ""))), str(heartbeattime)))

0 commit comments

Comments
 (0)