Skip to content

Commit 78710ea

Browse files
committed
refactor: move DIRAC WMS WebApp related RPC calls to dedicated service
1 parent fa43318 commit 78710ea

File tree

10 files changed

+349
-311
lines changed

10 files changed

+349
-311
lines changed

src/DIRAC/MonitoringSystem/ConfigTemplate.cfg

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,4 +28,13 @@ Services
2828
}
2929
}
3030
##END
31+
##BEGIN WebApp
32+
WebApp
33+
{
34+
Port = 9199
35+
Authorization
36+
{
37+
Default = authenticated
38+
}
39+
}
3140
}
Lines changed: 322 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,322 @@
1+
"""
2+
The WebAppHandler module provides a class to handle web requests from the DIRAC WebApp.
3+
It is not intended to be used in diracx
4+
"""
5+
from DIRAC import S_ERROR, S_OK
6+
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
7+
from DIRAC.ConfigurationSystem.Client.Helpers.Resources import getSites
8+
from DIRAC.Core.DISET.RequestHandler import RequestHandler
9+
from DIRAC.Core.Utilities.JEncode import strToIntDict
10+
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
11+
from DIRAC.WorkloadManagementSystem.Service.JobPolicy import RIGHT_GET_INFO, JobPolicy
12+
13+
14+
class WebAppHandler(RequestHandler):
    """DISET request handler grouping the WMS queries issued by the DIRAC WebApp.

    The handler holds two class-level DB objects, ``pilotAgentsDB`` and ``jobDB``,
    created once in :py:meth:`initializeHandler`. Every ``export_*`` method is
    paired with a ``types_*`` list describing its mandatory positional arguments.
    """

    @classmethod
    def initializeHandler(cls, serviceInfoDict):
        """Load and instantiate the DB objects used by the handler.

        :param dict serviceInfoDict: service information (not used here)

        :return: S_OK()/S_ERROR()
        """
        try:
            result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.PilotAgentsDB", "PilotAgentsDB")
            if not result["OK"]:
                return result
            cls.pilotAgentsDB = result["Value"](parentLogger=cls.log)

            result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.JobDB", "JobDB")
            if not result["OK"]:
                return result
            cls.jobDB = result["Value"](parentLogger=cls.log)

        except RuntimeError as excp:
            return S_ERROR(f"Can't connect to DB: {excp}")

        return S_OK()

    @staticmethod
    def _popTimeWindow(selectDict):
        """Remove the time selectors from ``selectDict`` and return them.

        ``FromDate`` has priority; ``LastUpdate`` is only used as the start date
        when ``FromDate`` is absent (backward compatibility). All three keys are
        always removed so that none of them is later interpreted as a table column.

        :param dict selectDict: selectors, modified in place

        :return: tuple (startDate, endDate), each possibly None
        """
        startDate = selectDict.pop("FromDate", None)
        legacyStartDate = selectDict.pop("LastUpdate", None)
        if startDate is None:
            startDate = legacyStartDate
        endDate = selectDict.pop("ToDate", None)
        return startDate, endDate

    ##############################################################################
    # PilotAgents
    ##############################################################################

    types_getPilotMonitorWeb = [dict, list, int, int]

    @classmethod
    def export_getPilotMonitorWeb(cls, selectDict, sortList, startItem, maxItems):
        """Get the summary of the pilot information for a given page in the
        pilot monitor in a generic format

        :param dict selectDict: selectors
        :param list sortList: sorting list
        :param int startItem: start item number
        :param int maxItems: maximum of items

        :return: whatever PilotAgentsDB.getPilotMonitorWeb returns
        """

        return cls.pilotAgentsDB.getPilotMonitorWeb(selectDict, sortList, startItem, maxItems)

    types_getPilotMonitorSelectors = []

    @classmethod
    def export_getPilotMonitorSelectors(cls):
        """Get all the distinct selector values for the Pilot Monitor web portal page"""

        return cls.pilotAgentsDB.getPilotMonitorSelectors()

    types_getPilotSummaryWeb = [dict, list, int, int]

    @classmethod
    def export_getPilotSummaryWeb(cls, selectDict, sortList, startItem, maxItems):
        """Get the pilot summary for a given page in a generic format

        :param dict selectDict: selectors
        :param list sortList: sorting list
        :param int startItem: start item number
        :param int maxItems: maximum of items

        :return: whatever PilotAgentsDB.getPilotSummaryWeb returns
        """

        return cls.pilotAgentsDB.getPilotSummaryWeb(selectDict, sortList, startItem, maxItems)

    types_getPilotStatistics = [str, dict]

    @classmethod
    def export_getPilotStatistics(cls, attribute, selectDict):
        """Get pilot statistics distribution per attribute value with a given selection

        :param str attribute: PilotAgents attribute to count on
        :param dict selectDict: selectors; FromDate/LastUpdate/ToDate define the time window

        :return: S_OK(dict attribute value -> count)/S_ERROR()
        """
        startDate, endDate = cls._popTimeWindow(selectDict)

        result = cls.pilotAgentsDB.getCounters(
            "PilotAgents", [attribute], selectDict, newer=startDate, older=endDate, timeStamp="LastUpdateTime"
        )
        if not result["OK"]:
            # Report the failure instead of presenting it as an empty statistics
            cls.log.error("Failed to get pilot counters", result["Message"])
            return result

        statistics = {status[attribute]: count for status, count in result["Value"]}
        return S_OK(statistics)

    types_getPilotsCounters = [str, list, dict]

    # This was PilotManagerHandler.getCounters
    @classmethod
    def export_getPilotsCounters(cls, table, keys, condDict, newer=None, timeStamp="SubmissionTime"):
        """Count the entries of a pilot table grouped by the given keys

        :param str table: table name
        :param list keys: attributes to group by
        :param dict condDict: selection conditions
        :param newer: only consider entries newer than this value of ``timeStamp``
        :param str timeStamp: name of the time attribute ``newer`` is compared with

        :return: whatever PilotAgentsDB.getCounters returns
        """

        return cls.pilotAgentsDB.getCounters(table, keys, condDict, newer=newer, timeStamp=timeStamp)

    ##############################################################################
    # Jobs
    ##############################################################################

    types_getJobPageSummaryWeb = [dict, list, int, int]

    def export_getJobPageSummaryWeb(self, selectDict, sortList, startItem, maxItems, selectJobs=True):
        """Get the summary of the job information for a given page in the
        job monitor in a generic format

        :param dict selectDict: selectors
        :param list sortList: sorting list, only the first (attribute, direction) pair is used
        :param int startItem: start item number
        :param int maxItems: maximum of items
        :param bool selectJobs: if False only the totals ("TotalRecords", "Extras") are returned

        :return: S_OK(dict)/S_ERROR() -- dict with "TotalRecords" and, when there are jobs,
                 "Extras" (count per Status) plus "ParameterNames" and "Records" for the page
        """

        resultDict = {}

        startDate, endDate, selectDict = self.parseSelectors(selectDict)

        # initialize jobPolicy
        credDict = self.getRemoteCredentials()
        owner = credDict["username"]
        ownerGroup = credDict["group"]
        operations = Operations(group=ownerGroup)
        globalJobsInfo = operations.getValue("/Services/JobMonitoring/GlobalJobsInfo", True)
        jobPolicy = JobPolicy(owner, ownerGroup, globalJobsInfo)
        jobPolicy.jobDB = self.jobDB
        result = jobPolicy.getControlledUsers(RIGHT_GET_INFO)
        if not result["OK"]:
            return result
        if not result["Value"]:
            return S_ERROR(f"User and group combination has no job rights ({owner!r}, {ownerGroup!r})")
        if result["Value"] != "ALL":
            selectDict[("Owner", "OwnerGroup")] = result["Value"]

        # Sorting instructions. Only one for the moment.
        if sortList:
            orderAttribute = sortList[0][0] + ":" + sortList[0][1]
        else:
            orderAttribute = None

        result = self.jobDB.getCounters(
            "Jobs", ["Status"], selectDict, newer=startDate, older=endDate, timeStamp="LastUpdateTime"
        )
        if not result["OK"]:
            return result

        statusDict = {}
        nJobs = 0
        for stDict, count in result["Value"]:
            nJobs += count
            statusDict[stDict["Status"]] = count

        resultDict["TotalRecords"] = nJobs
        if nJobs == 0:
            return S_OK(resultDict)

        resultDict["Extras"] = statusDict

        if selectJobs:
            iniJob = startItem
            if iniJob >= nJobs:
                return S_ERROR("Item number out of range")

            result = self.jobDB.selectJobs(
                selectDict, orderAttribute=orderAttribute, newer=startDate, older=endDate, limit=(maxItems, iniJob)
            )
            if not result["OK"]:
                return result

            summaryJobList = result["Value"]
            if not globalJobsInfo:
                validJobs, _invalidJobs, _nonauthJobs, _ownJobs = jobPolicy.evaluateJobRights(
                    summaryJobList, RIGHT_GET_INFO
                )
                summaryJobList = validJobs

            res = self.jobDB.getJobsAttributes(summaryJobList)
            if not res["OK"]:
                return res
            # Keep processing the attributes here: returning them directly would skip
            # the "ParameterNames"/"Records" formatting below.
            summaryDict = strToIntDict(res["Value"])

            # If no jobs can be selected after the properties check
            if not summaryDict:
                return S_OK(resultDict)

            # Evaluate last sign of life time
            for jobDict in summaryDict.values():
                if not jobDict.get("HeartBeatTime") or jobDict["HeartBeatTime"] == "None":
                    jobDict["LastSignOfLife"] = jobDict["LastUpdateTime"]
                else:
                    jobDict["LastSignOfLife"] = jobDict["HeartBeatTime"]

            # prepare the standard structure now:
            # the parameter names are taken from the first job only
            paramNames = list(next(iter(summaryDict.values())))
            records = [list(jobDict.values()) for jobDict in summaryDict.values()]

            resultDict["ParameterNames"] = paramNames
            resultDict["Records"] = records

        return S_OK(resultDict)

    types_getJobStats = [str, dict]

    @classmethod
    def export_getJobStats(cls, attribute, selectDict):
        """Get job statistics distribution per attribute value with a given selection

        :param str attribute: Jobs attribute to count on
        :param dict selectDict: selectors

        :return: S_OK(dict attribute value -> count)/S_ERROR()
        """
        startDate, endDate, selectDict = cls.parseSelectors(selectDict)
        result = cls.jobDB.getCounters(
            "Jobs", [attribute], selectDict, newer=startDate, older=endDate, timeStamp="LastUpdateTime"
        )
        if not result["OK"]:
            return result

        return S_OK({cDict[attribute]: count for cDict, count in result["Value"]})

    @classmethod
    def parseSelectors(cls, selectDict=None):
        """Parse selectors before DB query

        The time selectors (FromDate, LastUpdate, ToDate) and PilotJobReference are
        removed from ``selectDict``, which is modified in place when provided.

        :param dict selectDict: selectors

        :return: str, str, dict -- start/end date, selectors
        """
        selectDict = selectDict or {}

        # Get time period
        startDate, endDate = cls._popTimeWindow(selectDict)

        # Provide JobID bound to a specific PilotJobReference
        # There is no reason to have both PilotJobReference and JobID in selectDict
        # If that occurs, use the JobID instead of the PilotJobReference
        pilotJobRefs = selectDict.pop("PilotJobReference", None)
        if pilotJobRefs and not selectDict.get("JobID"):
            if isinstance(pilotJobRefs, str):
                pilotJobRefs = [pilotJobRefs]

            jobIDs = None
            for pilotJobRef in pilotJobRefs:
                res = cls.pilotAgentsDB.getPilotInfo(pilotJobRef)
                if not res["OK"]:
                    continue
                jobs = res["Value"].get(pilotJobRef, {}).get("Jobs")
                if jobs is None:
                    continue
                if jobIDs is None:
                    jobIDs = []
                jobIDs.extend(jobs)

            # Only set JobID when at least one pilot reported its jobs;
            # a falsy placeholder value possibly present is then replaced.
            if jobIDs is not None:
                selectDict["JobID"] = jobIDs

        return startDate, endDate, selectDict

    types_getJobsCounters = [list]

    # This was JobManagerHandler.getCounters
    @classmethod
    def export_getJobsCounters(cls, attrList, attrDict=None, cutDate=""):
        """
        Retrieve list of distinct attributes values from attrList
        with attrDict as condition.
        For each set of distinct values, count number of occurrences.
        Return a list. Each item is a list with 2 items, the list of distinct
        attribute values and the counter

        :param list attrList: attributes to group by
        :param dict attrDict: selection conditions, passed through parseSelectors
        :param cutDate: only consider jobs with LastUpdateTime newer than this value

        :return: whatever JobDB.getCounters returns
        """

        _, _, attrDict = cls.parseSelectors(attrDict)
        return cls.jobDB.getCounters("Jobs", attrList, attrDict, newer=str(cutDate), timeStamp="LastUpdateTime")

    types_getSiteSummaryWeb = [dict, list, int, int]

    @classmethod
    def export_getSiteSummaryWeb(cls, selectDict, sortList, startItem, maxItems):
        """Get the summary of the jobs running on sites in a generic format

        :param dict selectDict: selectors
        :param list sortList: sorting list
        :param int startItem: start item number
        :param int maxItems: maximum of items

        :return: S_OK(dict)/S_ERROR()
        """
        return cls.jobDB.getSiteSummaryWeb(selectDict, sortList, startItem, maxItems)

    types_getSiteSummarySelectors = []

    @classmethod
    def export_getSiteSummarySelectors(cls):
        """Get all the distinct selector values for the site summary web portal page

        :return: S_OK(dict)/S_ERROR() -- dict with keys "Status", "MaskStatus", "Country", "Site"
        """
        resultDict = {
            "Status": ["Good", "Fair", "Poor", "Bad", "Idle"],
            "MaskStatus": ["Active", "Banned", "NoMask", "Reduced"],
        }

        res = getSites()
        if not res["OK"]:
            return res
        siteList = res["Value"]

        # The country is the third dot-separated component of the site name;
        # names with fewer components contribute no country.
        countries = set()
        for site in siteList:
            nameParts = site.split(".")
            if len(nameParts) > 2:
                countries.add(nameParts[2].lower())

        resultDict["Country"] = sorted(countries)
        resultDict["Site"] = sorted(siteList)

        return S_OK(resultDict)

src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,10 @@
2222
from DIRAC.WorkloadManagementSystem.Client import JobMinorStatus, JobStatus
2323
from DIRAC.WorkloadManagementSystem.Client.JobManagerClient import JobManagerClient
2424
from DIRAC.WorkloadManagementSystem.Client.JobMonitoringClient import JobMonitoringClient
25-
from DIRAC.WorkloadManagementSystem.Client.PilotManagerClient import PilotManagerClient
2625
from DIRAC.WorkloadManagementSystem.Client.WMSClient import WMSClient
2726
from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB
2827
from DIRAC.WorkloadManagementSystem.DB.JobLoggingDB import JobLoggingDB
28+
from DIRAC.WorkloadManagementSystem.DB.PilotAgentsDB import PilotAgentsDB
2929

3030

3131
class StalledJobAgent(AgentModule):
@@ -262,7 +262,7 @@ def _getJobPilotStatus(self, jobID):
262262
# There is no pilot reference, hence its status is unknown
263263
return S_OK("NoPilot")
264264

265-
result = PilotManagerClient().getPilotInfo(pilotReference)
265+
result = PilotAgentsDB().getPilotInfo(pilotReference)
266266
if not result["OK"]:
267267
if DErrno.cmpError(result, DErrno.EWMSNOPILOT):
268268
self.log.warn("No pilot found", f"for job {jobID}: {result['Message']}")

src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_StalledJobAgent.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ def sja(mocker):
2727
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.StalledJobAgent.JobLoggingDB")
2828
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.StalledJobAgent.JobMonitoringClient", return_value=MagicMock())
2929
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.StalledJobAgent.JobManagerClient", return_value=MagicMock())
30-
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.StalledJobAgent.PilotManagerClient", return_value=MagicMock())
30+
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.StalledJobAgent.PilotAgentsDB", return_value=MagicMock())
3131
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.StalledJobAgent.WMSClient", return_value=MagicMock())
3232
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.StalledJobAgent.getDNForUsername", return_value=MagicMock())
3333

0 commit comments

Comments
 (0)