Skip to content

Commit 47f7282

Browse files
authored
Merge pull request #8254 from fstagni/90_jm_simplifications
[9.0] fix: removed several unused RPC calls from JobMonitoring
2 parents 5011800 + 22af916 commit 47f7282

File tree

10 files changed

+125
-360
lines changed

10 files changed

+125
-360
lines changed

src/DIRAC/MonitoringSystem/Service/WebAppHandler.py

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
77
from DIRAC.ConfigurationSystem.Client.Helpers.Resources import getSites
88
from DIRAC.Core.DISET.RequestHandler import RequestHandler
9-
from DIRAC.Core.Utilities.JEncode import strToIntDict
109
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
1110
from DIRAC.RequestManagementSystem.Client.Operation import Operation
1211
from DIRAC.RequestManagementSystem.Client.Request import Request
@@ -334,6 +333,74 @@ def export_getSiteSummarySelectors(cls):
334333

335334
return S_OK(resultDict)
336335

336+
types_getApplicationStates = []
337+
338+
@classmethod
339+
def export_getApplicationStates(cls, condDict=None, older=None, newer=None):
340+
"""Return Distinct Values of ApplicationStatus job Attribute in WMS"""
341+
return cls.jobDB.getDistinctJobAttributes("ApplicationStatus", condDict, older, newer)
342+
343+
types_getJobTypes = []
344+
345+
@classmethod
346+
def export_getJobTypes(cls, condDict=None, older=None, newer=None):
347+
"""Return Distinct Values of JobType job Attribute in WMS"""
348+
return cls.jobDB.getDistinctJobAttributes("JobType", condDict, older, newer)
349+
350+
types_getOwners = []
351+
352+
@classmethod
353+
def export_getOwners(cls, condDict=None, older=None, newer=None):
354+
"""
355+
Return Distinct Values of Owner job Attribute in WMS
356+
"""
357+
return cls.jobDB.getDistinctJobAttributes("Owner", condDict, older, newer)
358+
359+
types_getOwnerGroup = []
360+
361+
@classmethod
362+
def export_getOwnerGroup(cls):
363+
"""
364+
Return Distinct Values of OwnerGroup from the JobDB
365+
"""
366+
return cls.jobDB.getDistinctJobAttributes("OwnerGroup")
367+
368+
types_getJobGroups = []
369+
370+
@classmethod
371+
def export_getJobGroups(cls, condDict=None, older=None, cutDate=None):
372+
"""
373+
Return Distinct Values of ProductionId job Attribute in WMS
374+
"""
375+
return cls.jobDB.getDistinctJobAttributes("JobGroup", condDict, older, newer=cutDate)
376+
377+
types_getSites = []
378+
379+
@classmethod
380+
def export_getSites(cls, condDict=None, older=None, newer=None):
381+
"""
382+
Return Distinct Values of Site job Attribute in WMS
383+
"""
384+
return cls.jobDB.getDistinctJobAttributes("Site", condDict, older, newer)
385+
386+
types_getStates = []
387+
388+
@classmethod
389+
def export_getStates(cls, condDict=None, older=None, newer=None):
390+
"""
391+
Return Distinct Values of Status job Attribute in WMS
392+
"""
393+
return cls.jobDB.getDistinctJobAttributes("Status", condDict, older, newer)
394+
395+
types_getMinorStates = []
396+
397+
@classmethod
398+
def export_getMinorStates(cls, condDict=None, older=None, newer=None):
399+
"""
400+
Return Distinct Values of Minor Status job Attribute in WMS
401+
"""
402+
return cls.jobDB.getDistinctJobAttributes("MinorStatus", condDict, older, newer)
403+
337404
##############################################################################
338405
# Transformations
339406
##############################################################################

src/DIRAC/TransformationSystem/Agent/TaskManagerAgentBase.py

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,21 +7,20 @@
77
In case you want to further extend it you are required to follow the note on the
88
initialize method and on the _getClients method.
99
"""
10-
import time
11-
import datetime
1210
import concurrent.futures
11+
import datetime
12+
import time
1313

14-
from DIRAC import S_OK
15-
14+
from DIRAC import S_OK, gConfig
15+
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
1616
from DIRAC.Core.Base.AgentModule import AgentModule
1717
from DIRAC.Core.Security.ProxyInfo import getProxyInfo
18-
from DIRAC.Core.Utilities.List import breakListIntoChunks
1918
from DIRAC.Core.Utilities.Dictionaries import breakDictionaryIntoChunks
20-
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
19+
from DIRAC.Core.Utilities.List import breakListIntoChunks
20+
from DIRAC.TransformationSystem.Agent.TransformationAgentsUtilities import TransformationAgentsUtilities
2121
from DIRAC.TransformationSystem.Client.FileReport import FileReport
22-
from DIRAC.TransformationSystem.Client.WorkflowTasks import WorkflowTasks
2322
from DIRAC.TransformationSystem.Client.TransformationClient import TransformationClient
24-
from DIRAC.TransformationSystem.Agent.TransformationAgentsUtilities import TransformationAgentsUtilities
23+
from DIRAC.TransformationSystem.Client.WorkflowTasks import WorkflowTasks
2524
from DIRAC.WorkloadManagementSystem.Client import JobStatus
2625
from DIRAC.WorkloadManagementSystem.Client.JobManagerClient import JobManagerClient
2726

@@ -193,11 +192,9 @@ def execute(self):
193192
else:
194193
# Get the transformations which should be submitted
195194
self.tasksPerLoop = self.am_getOption("TasksPerLoop", self.tasksPerLoop)
196-
res = self.jobManagerClient.getMaxParametricJobs()
197-
if not res["OK"]:
198-
self.log.warn("Could not get the maxParametricJobs from JobManager", res["Message"])
199-
else:
200-
self.maxParametricJobs = res["Value"]
195+
self.maxParametricJobs = gConfig.getValue(
196+
"/Systems/WorkloadManagement/Services/JobManager/MaxParametricJobs", self.maxParametricJobs
197+
)
201198

202199
self._addOperationForTransformations(
203200
self.operationsOnTransformationDict,

src/DIRAC/TransformationSystem/Agent/TransformationCleaningAgent.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,12 @@
1616

1717
# # from DIRAC
1818
from DIRAC import S_ERROR, S_OK
19-
from DIRAC.ConfigurationSystem.Client.ConfigurationData import gConfigurationData
2019
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
2120
from DIRAC.Core.Base.AgentModule import AgentModule
2221
from DIRAC.Core.Utilities.DErrno import cmpError
2322
from DIRAC.Core.Utilities.List import breakListIntoChunks
2423
from DIRAC.Core.Utilities.Proxy import executeWithUserProxy
2524
from DIRAC.Core.Utilities.ReturnValues import returnSingleResult
26-
from DIRAC.DataManagementSystem.Client.DataManager import DataManager
2725
from DIRAC.RequestManagementSystem.Client.File import File
2826
from DIRAC.RequestManagementSystem.Client.Operation import Operation
2927
from DIRAC.RequestManagementSystem.Client.ReqClient import ReqClient
@@ -36,6 +34,7 @@
3634
from DIRAC.TransformationSystem.Client.TransformationClient import TransformationClient
3735
from DIRAC.WorkloadManagementSystem.Client.JobMonitoringClient import JobMonitoringClient
3836
from DIRAC.WorkloadManagementSystem.Client.WMSClient import WMSClient
37+
from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB
3938

4039
# # agent's name
4140
AGENT_NAME = "Transformation/TransformationCleaningAgent"
@@ -65,6 +64,8 @@ def __init__(self, *args, **kwargs):
6564
self.reqClient = None
6665
# # file catalog client
6766
self.metadataClient = None
67+
# # JobDB
68+
self.jobDB = None
6869

6970
# # transformations types
7071
self.transformationTypes = None
@@ -127,6 +128,8 @@ def initialize(self):
127128
self.metadataClient = FileCatalogClient()
128129
# # job monitoring client
129130
self.jobMonitoringClient = JobMonitoringClient()
131+
# # job DB
132+
self.jobDB = JobDB()
130133

131134
return S_OK()
132135

@@ -224,7 +227,7 @@ def finalize(self):
224227
So, we should just clean from time to time.
225228
What I added here is done only when the agent finalize, and it's quite light-ish operation anyway.
226229
"""
227-
res = self.jobMonitoringClient.getJobGroups(None, datetime.utcnow() - timedelta(days=365))
230+
res = self.jobDB.getDistinctJobAttributes("JobGroup", None, datetime.utcnow() - timedelta(days=365))
228231
if not res["OK"]:
229232
self.log.error("Failed to get job groups", res["Message"])
230233
return res

src/DIRAC/WorkloadManagementSystem/Client/JobMonitoringClient.py

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -38,15 +38,6 @@ def getJobParameters(self, jobIDs, parName=None):
3838
res["Value"] = strToIntDict(res["Value"])
3939
return res
4040

41-
@ignoreEncodeWarning
42-
def getJobsParameters(self, jobIDs, parameters):
43-
res = self._getRPC().getJobsParameters(jobIDs, parameters)
44-
45-
# Cast the str keys to int
46-
if res["OK"]:
47-
res["Value"] = strToIntDict(res["Value"])
48-
return res
49-
5041
@ignoreEncodeWarning
5142
def getJobsMinorStatus(self, jobIDs):
5243
res = self._getRPC().getJobsMinorStatus(jobIDs)

src/DIRAC/WorkloadManagementSystem/DB/JobDB.py

Lines changed: 0 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -1190,64 +1190,6 @@ def rescheduleJob(self, jobID):
11901190

11911191
return retVal
11921192

1193-
#############################################################################
1194-
def getSiteSummary(self):
1195-
"""Get the summary of jobs in a given status on all the sites"""
1196-
1197-
waitingList = ['"Submitted"', '"Assigned"', '"Waiting"', '"Matched"']
1198-
waitingString = ",".join(waitingList)
1199-
1200-
result = self.getDistinctJobAttributes("Site")
1201-
if not result["OK"]:
1202-
return result
1203-
1204-
siteList = result["Value"]
1205-
siteDict = {}
1206-
totalDict = {
1207-
JobStatus.WAITING: 0,
1208-
JobStatus.RUNNING: 0,
1209-
JobStatus.STALLED: 0,
1210-
JobStatus.DONE: 0,
1211-
JobStatus.FAILED: 0,
1212-
}
1213-
1214-
for site in siteList:
1215-
if site == "ANY":
1216-
continue
1217-
# Waiting
1218-
siteDict[site] = {}
1219-
ret = self._escapeString(site)
1220-
if not ret["OK"]:
1221-
return ret
1222-
e_site = ret["Value"]
1223-
1224-
req = f"SELECT COUNT(JobID) FROM Jobs WHERE Status IN ({waitingString}) AND Site={e_site}"
1225-
result = self._query(req)
1226-
if result["OK"]:
1227-
count = result["Value"][0][0]
1228-
else:
1229-
return S_ERROR("Failed to get Site data from the JobDB")
1230-
siteDict[site][JobStatus.WAITING] = count
1231-
totalDict[JobStatus.WAITING] += count
1232-
# Running,Stalled,Done,Failed
1233-
for status in [
1234-
f'"{JobStatus.RUNNING}"',
1235-
f'"{JobStatus.STALLED}"',
1236-
f'"{JobStatus.DONE}"',
1237-
f'"{JobStatus.FAILED}"',
1238-
]:
1239-
req = f"SELECT COUNT(JobID) FROM Jobs WHERE Status={status} AND Site={e_site}"
1240-
result = self._query(req)
1241-
if result["OK"]:
1242-
count = result["Value"][0][0]
1243-
else:
1244-
return S_ERROR("Failed to get Site data from the JobDB")
1245-
siteDict[site][status.replace('"', "")] = count
1246-
totalDict[status.replace('"', "")] += count
1247-
1248-
siteDict["Total"] = totalDict
1249-
return S_OK(siteDict)
1250-
12511193
#################################################################################
12521194
def getSiteSummaryWeb(self, selectDict, sortList, startItem, maxItems):
12531195
"""Get the summary of jobs in a given status on all the sites in the standard Web form"""

src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -105,16 +105,6 @@ def __sendJobsToOptimizationMind(self, jids):
105105
return
106106
self.log.info("Optimize msg sent", f"for {len(jids)} jobs")
107107

108-
###########################################################################
109-
types_getMaxParametricJobs = []
110-
111-
def export_getMaxParametricJobs(self):
112-
"""Get the maximum number of parametric jobs
113-
114-
:return: S_OK()/S_ERROR()
115-
"""
116-
return S_OK(self.maxParametricJobs)
117-
118108
types_submitJob = [str]
119109

120110
def export_submitJob(self, jobDesc):

0 commit comments

Comments
 (0)