Skip to content

Commit e487029

Browse files
committed
feat: Add new PilotsHistory agent and monitoring type
1 parent 3e312f8 commit e487029

File tree

2 files changed

+90
-0
lines changed

2 files changed

+90
-0
lines changed
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
""" Definition for PilotsHistory Monitoring type.
2+
Filled by the agent "WorkloadManagement/PilotsHistoryAgent"
3+
"""
4+
5+
from DIRAC.MonitoringSystem.Client.Types.BaseType import BaseType
6+
7+
8+
class PilotsHistory(BaseType):
    """Monitoring type holding the pilot-count history records.

    .. class:: PilotsHistory

    Each record is identified by the key fields (TaskQueueID, GridSite,
    GridType, Status) and carries a single monitored value, "Pilots".
    """

    def __init__(self):
        """Declare the key fields, the monitored field, the index name and
        the field type mapping, then validate the type definition.

        :param self: self reference
        """
        super().__init__()

        keyFieldNames = ["TaskQueueID", "GridSite", "GridType", "Status"]

        self.keyFields = keyFieldNames
        self.monitoringFields = ["Pilots"]
        self.index = "pilotshistory_index"

        # Every key field is mapped as a keyword; the monitored counter is a long.
        fieldMapping = {fieldName: {"type": "keyword"} for fieldName in keyFieldNames}
        fieldMapping["Pilots"] = {"type": "long"}
        self.addMapping(fieldMapping)

        self.checkType()
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
""" PilotsHistoryAgent sends the number of pilots retrieved from PilotAgentsDB
2+
every 15 min to the Monitoring system to create historical plots.
3+
"""
4+
5+
from DIRAC import S_OK, S_ERROR
6+
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
7+
from DIRAC.Core.Base.AgentModule import AgentModule
8+
from DIRAC.Core.Utilities import Time
9+
from DIRAC.MonitoringSystem.Client.MonitoringReporter import MonitoringReporter
10+
from DIRAC.WorkloadManagementSystem.DB.PilotAgentsDB import PilotAgentsDB
11+
12+
13+
class PilotsHistoryAgent(AgentModule):
    """Agent that every 15 minutes will report a snapshot of
    the PilotAgentsDB to the Monitoring DB (ElasticSearch).
    """

    # Column names of a snapshot row: the key columns come first, followed by
    # the value columns (this is the layout the record slicing in execute relies on).
    __summaryKeyFieldsMapping = ["TaskQueueID", "GridSite", "GridType", "Status"]
    __summaryValueFieldsMapping = ["Pilots"]

    def initialize(self):
        """Set the polling time and create the DB client and the monitoring reporter.

        :return: S_OK()
        """
        # Loop every 15m
        self.am_setOption("PollingTime", 900)
        # getSummarySnapshot is called on an instance, so create the DB client once here
        self.pilotDB = PilotAgentsDB()
        self.pilotReporter = MonitoringReporter(monitoringType="PilotsHistory")

        return S_OK()

    def execute(self):
        """Retrieve the pilots summary snapshot and send it to the Monitoring system.

        One record is added to the reporter per snapshot row, holding the key
        fields, the integer value fields and a common "timestamp" (epoch
        seconds of this cycle); all records are then committed together.

        :return: S_OK() on success, S_ERROR(message) if the snapshot cannot be
                 retrieved or the commit fails
        """
        # Retrieve the snapshot of the number of pilots
        result = self.pilotDB.getSummarySnapshot()
        now = Time.dateTime()
        if not result["OK"]:
            self.log.error("Can't get the PilotAgentsDB summary", "%s: won't commit at this cycle" % result["Message"])
            return S_ERROR(result["Message"])

        keyFields = self.__summaryKeyFieldsMapping
        valueFields = self.__summaryValueFieldsMapping
        # The same timestamp is shared by every record of this snapshot
        timestamp = int(Time.toEpoch(now))

        for record in result["Value"][1]:
            # NOTE(review): the leading column of each row is dropped, as in the
            # original code - confirm this against the row layout actually
            # returned by PilotAgentsDB.getSummarySnapshot.
            record = record[1:]
            # Key columns identify the record; without them the counts cannot be
            # attributed to a task queue / site / grid type / status.
            rD = {fieldName: record[iP] for iP, fieldName in enumerate(keyFields)}
            valueColumns = record[len(keyFields) :]
            for iP, fieldName in enumerate(valueFields):
                rD[fieldName] = int(valueColumns[iP])
            rD["timestamp"] = timestamp
            self.log.verbose("Adding following record to Reporter: \n", rD)
            self.pilotReporter.addRecord(rD)

        self.log.info("Committing to Monitoring")
        result = self.pilotReporter.commit()
        if not result["OK"]:
            self.log.error("Could not commit pilots history to Monitoring", result.get("Message", ""))
            return S_ERROR("Could not commit pilots history to Monitoring")
        self.log.verbose("Done committing to Monitoring")

        return S_OK()

0 commit comments

Comments
 (0)