Skip to content

Commit 0914453

Browse files
committed
fix: Removing MonitorClient in more files
1 parent 81cc095 commit 0914453

File tree

14 files changed

+55
-290
lines changed

14 files changed

+55
-290
lines changed

src/DIRAC/Core/Utilities/ExecutorDispatcher.py

Lines changed: 1 addition & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
from DIRAC import S_OK, S_ERROR, gLogger
77
from DIRAC.Core.Utilities.ReturnValues import isReturnStructure
88
from DIRAC.Core.Utilities.ThreadScheduler import gThreadScheduler
9-
from DIRAC.FrameworkSystem.Client.MonitoringClient import MonitoringClient
109
from DIRAC.MonitoringSystem.Client.MonitoringReporter import MonitoringReporter
1110

1211

@@ -310,9 +309,7 @@ def __init__(self, monitor=None):
310309
self.__states = ExecutorState(self.__log)
311310
self.__cbHolder = ExecutorDispatcherCallbacks()
312311
self.__monitor = None
313-
if isinstance(monitor, MonitoringClient):
314-
self.__monitor = monitor
315-
elif isinstance(monitor, MonitoringReporter):
312+
if isinstance(monitor, MonitoringReporter):
316313
self.__monitoringReporter = monitor
317314
gThreadScheduler.addPeriodicTask(60, self.__doPeriodicStuff)
318315
# If a task is frozen too many times, send error or forget task?
@@ -321,16 +318,6 @@ def __init__(self, monitor=None):
321318
self.__freezeOnFailedDispatch = True
322319
# If a task needs to go to an executor that has not connected. Freeze or forget the task?
323320
self.__freezeOnUnknownExecutor = True
324-
if self.__monitor:
325-
self.__monitor.registerActivity(
326-
"executors", "Executor reactors connected", "Executors", "executors", self.__monitor.OP_MEAN, 300
327-
)
328-
self.__monitor.registerActivity(
329-
"tasks", "Tasks processed", "Executors", "tasks", self.__monitor.OP_RATE, 300
330-
)
331-
self.__monitor.registerActivity(
332-
"taskTime", "Task processing time", "Executors", "seconds", self.__monitor.OP_MEAN, 300
333-
)
334321

335322
def setFailedOnTooFrozen(self, value):
336323
self.__failedOnTooFrozen = value
@@ -369,12 +356,6 @@ def __doPeriodicStuff(self):
369356
if not self.__monitor:
370357
return
371358
eTypes = self.__execTypes
372-
for eType in eTypes:
373-
try:
374-
self.__monitor.addMark("executors-%s" % eType, self.__execTypes[eType])
375-
except KeyError:
376-
pass
377-
self.__monitor.addMark("executors", len(self.__idMap))
378359

379360
def addExecutor(self, eId, eTypes, maxTasks=1):
380361
self.__log.verbose("Adding new %s executor to the pool %s" % (eId, ", ".join(eTypes)))
@@ -389,31 +370,6 @@ def addExecutor(self, eId, eTypes, maxTasks=1):
389370
for eType in eTypes:
390371
if eType not in self.__execTypes:
391372
self.__execTypes[eType] = 0
392-
if self.__monitor:
393-
self.__monitor.registerActivity(
394-
"executors-%s" % eType,
395-
"%s executor modules connected" % eType,
396-
"Executors",
397-
"executors",
398-
self.__monitor.OP_MEAN,
399-
300,
400-
)
401-
self.__monitor.registerActivity(
402-
"tasks-%s" % eType,
403-
"Tasks processed by %s" % eType,
404-
"Executors",
405-
"tasks",
406-
self.__monitor.OP_RATE,
407-
300,
408-
)
409-
self.__monitor.registerActivity(
410-
"taskTime-%s" % eType,
411-
"Task processing time for %s" % eType,
412-
"Executors",
413-
"seconds",
414-
self.__monitor.OP_MEAN,
415-
300,
416-
)
417373
self.__execTypes[eType] += 1
418374
finally:
419375
self.__executorsLock.release()
@@ -690,12 +646,6 @@ def __taskReceived(self, taskId, eId):
690646
self.removeExecutor(eId)
691647
self.__dispatchTask(taskId)
692648
return S_ERROR(errMsg)
693-
if self.__monitor:
694-
tTime = time.time() - self.__tasks[taskId].sendTime
695-
self.__monitor.addMark("taskTime-%s" % eTask.eType, tTime)
696-
self.__monitor.addMark("taskTime", tTime)
697-
self.__monitor.addMark("tasks-%s" % eTask.eType, 1)
698-
self.__monitor.addMark("tasks", 1)
699649
return S_OK(eTask.eType)
700650

701651
def freezeTask(self, eId, taskId, freezeTime, taskObj=False):

src/DIRAC/DataManagementSystem/Service/FileCatalogHandler.py

Lines changed: 4 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010

1111
from DIRAC.Core.DISET.RequestHandler import RequestHandler, getServiceOption
1212
from DIRAC import S_OK, S_ERROR
13-
from DIRAC.FrameworkSystem.Client.MonitoringClient import gMonitor
1413
from DIRAC.DataManagementSystem.DB.FileCatalogDB import FileCatalogDB
1514

1615

@@ -65,54 +64,6 @@ def initializeHandler(cls, serviceInfo):
6564
databaseConfig[configKey] = configValue
6665
res = cls.fileCatalogDB.setConfig(databaseConfig)
6766

68-
gMonitor.registerActivity(
69-
"AddFile", "Amount of addFile calls", "FileCatalogHandler", "calls/min", gMonitor.OP_SUM
70-
)
71-
gMonitor.registerActivity(
72-
"AddFileSuccessful", "Files successfully added", "FileCatalogHandler", "files/min", gMonitor.OP_SUM
73-
)
74-
gMonitor.registerActivity(
75-
"AddFileFailed", "Files failed to add", "FileCatalogHandler", "files/min", gMonitor.OP_SUM
76-
)
77-
78-
gMonitor.registerActivity(
79-
"RemoveFile", "Amount of removeFile calls", "FileCatalogHandler", "calls/min", gMonitor.OP_SUM
80-
)
81-
gMonitor.registerActivity(
82-
"RemoveFileSuccessful", "Files successfully removed", "FileCatalogHandler", "files/min", gMonitor.OP_SUM
83-
)
84-
gMonitor.registerActivity(
85-
"RemoveFileFailed", "Files failed to remove", "FileCatalogHandler", "files/min", gMonitor.OP_SUM
86-
)
87-
88-
gMonitor.registerActivity(
89-
"AddReplica", "Amount of addReplica calls", "FileCatalogHandler", "calls/min", gMonitor.OP_SUM
90-
)
91-
gMonitor.registerActivity(
92-
"AddReplicaSuccessful", "Replicas successfully added", "FileCatalogHandler", "replicas/min", gMonitor.OP_SUM
93-
)
94-
gMonitor.registerActivity(
95-
"AddReplicaFailed", "Replicas failed to add", "FileCatalogHandler", "replicas/min", gMonitor.OP_SUM
96-
)
97-
98-
gMonitor.registerActivity(
99-
"RemoveReplica", "Amount of removeReplica calls", "FileCatalogHandler", "calls/min", gMonitor.OP_SUM
100-
)
101-
gMonitor.registerActivity(
102-
"RemoveReplicaSuccessful",
103-
"Replicas successfully removed",
104-
"FileCatalogHandler",
105-
"replicas/min",
106-
gMonitor.OP_SUM,
107-
)
108-
gMonitor.registerActivity(
109-
"RemoveReplicaFailed", "Replicas failed to remove", "FileCatalogHandler", "replicas/min", gMonitor.OP_SUM
110-
)
111-
112-
gMonitor.registerActivity(
113-
"ListDirectory", "Amount of listDirectory calls", "FileCatalogHandler", "calls/min", gMonitor.OP_SUM
114-
)
115-
11667
return res
11768

11869
########################################################################
@@ -243,25 +194,13 @@ def export_exists(self, lfns):
243194

244195
def export_addFile(self, lfns):
245196
"""Register supplied files"""
246-
gMonitor.addMark("AddFile", 1)
247-
res = self.fileCatalogDB.addFile(lfns, self.getRemoteCredentials())
248-
if res["OK"]:
249-
gMonitor.addMark("AddFileSuccessful", len(res.get("Value", {}).get("Successful", [])))
250-
gMonitor.addMark("AddFileFailed", len(res.get("Value", {}).get("Failed", [])))
251-
252-
return res
197+
return self.fileCatalogDB.addFile(lfns, self.getRemoteCredentials())
253198

254199
types_removeFile = [[list, dict, str]]
255200

256201
def export_removeFile(self, lfns):
257202
"""Remove the supplied lfns"""
258-
gMonitor.addMark("RemoveFile", 1)
259-
res = self.fileCatalogDB.removeFile(lfns, self.getRemoteCredentials())
260-
if res["OK"]:
261-
gMonitor.addMark("RemoveFileSuccessful", len(res.get("Value", {}).get("Successful", [])))
262-
gMonitor.addMark("RemoveFileFailed", len(res.get("Value", {}).get("Failed", [])))
263-
264-
return res
203+
return self.fileCatalogDB.removeFile(lfns, self.getRemoteCredentials())
265204

266205
types_setFileStatus = [dict]
267206

@@ -273,25 +212,13 @@ def export_setFileStatus(self, lfns):
273212

274213
def export_addReplica(self, lfns):
275214
"""Register supplied replicas"""
276-
gMonitor.addMark("AddReplica", 1)
277-
res = self.fileCatalogDB.addReplica(lfns, self.getRemoteCredentials())
278-
if res["OK"]:
279-
gMonitor.addMark("AddReplicaSuccessful", len(res.get("Value", {}).get("Successful", [])))
280-
gMonitor.addMark("AddReplicaFailed", len(res.get("Value", {}).get("Failed", [])))
281-
282-
return res
215+
return self.fileCatalogDB.addReplica(lfns, self.getRemoteCredentials())
283216

284217
types_removeReplica = [[list, dict, str]]
285218

286219
def export_removeReplica(self, lfns):
287220
"""Remove the supplied replicas"""
288-
gMonitor.addMark("RemoveReplica", 1)
289-
res = self.fileCatalogDB.removeReplica(lfns, self.getRemoteCredentials())
290-
if res["OK"]:
291-
gMonitor.addMark("RemoveReplicaSuccessful", len(res.get("Value", {}).get("Successful", [])))
292-
gMonitor.addMark("RemoveReplicaFailed", len(res.get("Value", {}).get("Failed", [])))
293-
294-
return res
221+
return self.fileCatalogDB.removeReplica(lfns, self.getRemoteCredentials())
295222

296223
types_setReplicaStatus = [[list, dict, str]]
297224

@@ -398,7 +325,6 @@ def export_removeDirectory(self, lfns):
398325

399326
def export_listDirectory(self, lfns, verbose):
400327
"""List the contents of supplied directories"""
401-
gMonitor.addMark("ListDirectory", 1)
402328
return self.fileCatalogDB.listDirectory(lfns, self.getRemoteCredentials(), verbose=verbose)
403329

404330
types_isDirectory = [[list, dict, str]]

src/DIRAC/FrameworkSystem/Service/PlottingHandler.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
import hashlib
66

77
from DIRAC import S_OK, S_ERROR, rootPath, gConfig, gLogger
8-
from DIRAC.FrameworkSystem.Client.MonitoringClient import gMonitor
98
from DIRAC.ConfigurationSystem.Client import PathFinder
109
from DIRAC.Core.DISET.RequestHandler import RequestHandler
1110
from DIRAC.FrameworkSystem.Service.PlotCache import gPlotCache
@@ -34,7 +33,6 @@ def initializePlottingHandler(serviceInfo):
3433
return S_ERROR("Data location is not writable")
3534

3635
gPlotCache.setPlotsLocation(dataPath)
37-
gMonitor.registerActivity("plotsDrawn", "Drawn plot images", "Plotting requests", "plots", gMonitor.OP_SUM)
3836
return S_OK()
3937

4038

Lines changed: 50 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1,49 +1,50 @@
1-
#!/usr/bin/env python
2-
import sys
3-
from DIRAC.Core.Base.Script import Script
4-
5-
6-
@Script()
7-
def main():
8-
Script.parseCommandLine(ignoreErrors=True)
9-
10-
fieldsToShow = ("ComponentName", "Type", "Host", "Port", "Status", "Message")
11-
12-
from DIRAC.FrameworkSystem.Client.MonitoringClient import gMonitor
13-
14-
result = gMonitor.getComponentsStatusWebFormatted(sortingList=[["ComponentName", "ASC"]])
15-
if not result["OK"]:
16-
print("ERROR: %s" % result["Message"])
17-
sys.exit(1)
18-
paramNames = result["Value"]["ParameterNames"]
19-
records = result["Value"]["Records"]
20-
fieldLengths = []
21-
for param in paramNames:
22-
fieldLengths.append(len(param))
23-
24-
for record in records:
25-
for i, _ in enumerate(record):
26-
if paramNames[i] in fieldsToShow:
27-
fieldLengths[i] = max(fieldLengths[i], len(str(record[i])))
28-
# Print time!
29-
line = []
30-
sepLine = []
31-
for i, param in enumerate(paramNames):
32-
if param in fieldsToShow:
33-
line.append("%s%s" % (param, " " * (fieldLengths[i] - len(param))))
34-
sepLine.append("-" * fieldLengths[i])
35-
print("|".join(line))
36-
sepLine = "+".join(sepLine)
37-
print(sepLine)
38-
for record in records:
39-
line = []
40-
for i, _ in enumerate(record):
41-
if paramNames[i] in fieldsToShow:
42-
val = str(record[i])
43-
line.append("%s%s" % (val, " " * (fieldLengths[i] - len(val))))
44-
print("|".join(line))
45-
# print sepLine
46-
47-
48-
if __name__ == "__main__":
49-
main()
1+
""""What to do with this?"""
2+
# #!/usr/bin/env python
3+
# import sys
4+
# from DIRAC.Core.Base.Script import Script
5+
6+
7+
# @Script()
8+
# def main():
9+
# Script.parseCommandLine(ignoreErrors=True)
10+
11+
# fieldsToShow = ("ComponentName", "Type", "Host", "Port", "Status", "Message")
12+
13+
# from DIRAC.FrameworkSystem.Client.MonitoringClient import gMonitor
14+
15+
# result = gMonitor.getComponentsStatusWebFormatted(sortingList=[["ComponentName", "ASC"]])
16+
# if not result["OK"]:
17+
# print("ERROR: %s" % result["Message"])
18+
# sys.exit(1)
19+
# paramNames = result["Value"]["ParameterNames"]
20+
# records = result["Value"]["Records"]
21+
# fieldLengths = []
22+
# for param in paramNames:
23+
# fieldLengths.append(len(param))
24+
25+
# for record in records:
26+
# for i, _ in enumerate(record):
27+
# if paramNames[i] in fieldsToShow:
28+
# fieldLengths[i] = max(fieldLengths[i], len(str(record[i])))
29+
# # Print time!
30+
# line = []
31+
# sepLine = []
32+
# for i, param in enumerate(paramNames):
33+
# if param in fieldsToShow:
34+
# line.append("%s%s" % (param, " " * (fieldLengths[i] - len(param))))
35+
# sepLine.append("-" * fieldLengths[i])
36+
# print("|".join(line))
37+
# sepLine = "+".join(sepLine)
38+
# print(sepLine)
39+
# for record in records:
40+
# line = []
41+
# for i, _ in enumerate(record):
42+
# if paramNames[i] in fieldsToShow:
43+
# val = str(record[i])
44+
# line.append("%s%s" % (val, " " * (fieldLengths[i] - len(val))))
45+
# print("|".join(line))
46+
# # print sepLine
47+
48+
49+
# if __name__ == "__main__":
50+
# main()

src/DIRAC/RequestManagementSystem/Agent/CleanReqDBAgent.py

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626

2727
# # from DIRAC
2828
from DIRAC import S_OK
29-
from DIRAC.FrameworkSystem.Client.MonitoringClient import gMonitor
3029
from DIRAC.Core.Base.AgentModule import AgentModule
3130
from DIRAC.RequestManagementSystem.Client.ReqClient import ReqClient
3231
from DIRAC.RequestManagementSystem.Client.Request import Request
@@ -75,14 +74,6 @@ def initialize(self):
7574
self.log.info("Kick assigned requests period = %s hours" % self.KICK_GRACE_HOURS)
7675
self.KICK_LIMIT = self.am_getOption("KickLimit", self.KICK_LIMIT)
7776
self.log.info("Kick limit = %s request/cycle" % self.KICK_LIMIT)
78-
79-
# # gMonitor stuff
80-
gMonitor.registerActivity(
81-
"DeletedRequests", "Deleted finished requests", "CleanReqDBAgent", "Requests/min", gMonitor.OP_SUM
82-
)
83-
gMonitor.registerActivity(
84-
"KickedRequests", "Assigned requests kicked", "CleanReqDBAgent", "Requests/min", gMonitor.OP_SUM
85-
)
8677
return S_OK()
8778

8879
def execute(self):
@@ -147,8 +138,6 @@ def execute(self):
147138
continue
148139
deleted += 1
149140

150-
gMonitor.addMark("KickedRequests", kicked)
151-
gMonitor.addMark("DeletedRequests", deleted)
152141
self.log.info("execute: kicked assigned requests = %s" % kicked)
153142
self.log.info("execute: deleted finished requests = %s" % deleted)
154143
return S_OK()

0 commit comments

Comments
 (0)