Skip to content

Commit be5dc8f

Browse files
authored
Merge pull request #5999 from rupozzi/pilothistory
[integration] PilotsHistory Monitoring
2 parents c68af61 + 827487b commit be5dc8f

File tree

5 files changed

+141
-17
lines changed

5 files changed

+141
-17
lines changed

docs/source/AdministratorGuide/Systems/MonitoringSystem/index.rst

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,10 @@ Monitoring System
1010
Overview
1111
=========
1212

13-
The Monitoring system is used to monitor various components of DIRAC. Currently, we have five monitoring types:
13+
The Monitoring system is used to monitor various components of DIRAC. Currently, we have several monitoring types:
1414

1515
- WMSHistory: for monitoring the DIRAC WMS
16+
- PilotsHistory: for monitoring of DIRAC pilots
1617
- Component Monitoring: for monitoring DIRAC components such as services, agents, etc.
1718
- RMS Monitoring: for monitoring the DIRAC RequestManagement System (mostly the Request Executing Agent).
1819
- PilotSubmission Monitoring: for monitoring the DIRAC pilot submission statistics from SiteDirector agents
@@ -122,6 +123,11 @@ You can configure the MQ in the local dirac.cfg file where the agent is running:
122123
Note: the JSON file already contains the index patterns needed for the visualizations. You may need to adapt the index patterns to your existing ones.
123124

124125

126+
Enable PilotsHistory monitoring
127+
===============================
128+
In order to enable PilotsHistory monitoring you need to set the flag ``MonitoringEnabled = True`` in Operations/Defaults.
129+
130+
125131
Enable Component monitoring
126132
===========================
127133

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
""" Definition for PilotsHistory Monitoring type.
2+
Filled by the agent "WorkloadManagement/StatesAccountingAgent"
3+
"""
4+
5+
from DIRAC.MonitoringSystem.Client.Types.BaseType import BaseType
6+
7+
8+
class PilotsHistory(BaseType):
    """Monitoring type holding the number of pilots per task queue, site, grid type and status.

    Filled by the agent "WorkloadManagement/StatesAccountingAgent".
    """

    def __init__(self):
        """Declare the key fields, the monitored fields, the index name and the field mapping."""
        super().__init__()

        # Fields that identify a record; all of them are mapped as keywords
        keywordFields = ["TaskQueueID", "GridSite", "GridType", "Status"]
        self.keyFields = keywordFields

        self.monitoringFields = ["NumOfPilots"]

        self.index = "pilotshistory_index"

        # Same mapping content and key order as spelling the dictionary out by hand
        fieldMapping = {fieldName: {"type": "keyword"} for fieldName in keywordFields}
        fieldMapping["NumOfPilots"] = {"type": "long"}
        self.addMapping(fieldMapping)

        self.checkType()

src/DIRAC/WorkloadManagementSystem/Agent/StatesAccountingAgent.py

Lines changed: 59 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
""" StatesAccountingAgent sends periodically numbers of jobs in various states for various
1+
""" StatesAccountingAgent sends periodically numbers of jobs and pilots in various states for various
22
sites to the Monitoring system to create historical plots.
33
44
.. literalinclude:: ../ConfigTemplate.cfg
@@ -8,20 +8,24 @@
88
:caption: StatesAccountingAgent options
99
"""
1010
from DIRAC import S_OK, S_ERROR
11+
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
1112
from DIRAC.Core.Base.AgentModule import AgentModule
1213
from DIRAC.Core.Utilities import Time
1314
from DIRAC.AccountingSystem.Client.Types.WMSHistory import WMSHistory
1415
from DIRAC.AccountingSystem.Client.DataStoreClient import DataStoreClient
1516
from DIRAC.MonitoringSystem.Client.MonitoringReporter import MonitoringReporter
1617
from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB
18+
from DIRAC.WorkloadManagementSystem.DB.PilotAgentsDB import PilotAgentsDB
1719

1820

1921
class StatesAccountingAgent(AgentModule):
2022
"""Agent that every 15 minutes will report
2123
to the AccountingDB (MySQL) or the Monitoring DB (ElasticSearch), or both,
2224
a snapshot of the JobDB.
25+
Also sends a snapshot of PilotAgentsDB to Monitoring.
2326
"""
2427

28+
# WMSHistory fields
2529
__summaryKeyFieldsMapping = [
2630
"Status",
2731
"Site",
@@ -36,24 +40,29 @@ class StatesAccountingAgent(AgentModule):
3640
__summaryValueFieldsMapping = ["Jobs", "Reschedules"]
3741
__renameFieldsMapping = {"JobType": "JobSplitType"}
3842

43+
# PilotsHistory fields
44+
__pilotKeyFields = ["TaskQueueID", "GridSite", "GridType", "Status"]
45+
__pilotValueFields = ["NumOfPilots"]
46+
3947
def initialize(self):
4048
"""Standard initialization"""
4149
# This agent will always loop every 15 minutes
4250
self.am_setOption("PollingTime", 900)
4351

4452
self.backends = self.am_getOption("Backends", "Accounting").replace(" ", "").split(",")
45-
messageQueue = self.am_getOption("MessageQueue", "dirac.wmshistory")
46-
47-
self.log.info("Committing to %s backend" % "and ".join(self.backends))
53+
self.monitoringEnabled = Operations().getValue("MonitoringEnabled", False)
4854

55+
messageQueue = self.am_getOption("MessageQueue", "dirac.wmshistory")
56+
pilotMessageQueue = self.am_getOption("MessageQueue", "dirac.monitoring")
4957
self.datastores = {} # For storing the clients to Accounting and Monitoring
5058

5159
if "Accounting" in self.backends:
5260
self.datastores["Accounting"] = DataStoreClient(retryGraceTime=900)
53-
if "Monitoring" in self.backends:
61+
if "Monitoring" in self.backends or self.monitoringEnabled:
5462
self.datastores["Monitoring"] = MonitoringReporter(
5563
monitoringType="WMSHistory", failoverQueueName=messageQueue
5664
)
65+
self.pilotReporter = MonitoringReporter(monitoringType="PilotsHistory", failoverQueueName=pilotMessageQueue)
5766

5867
self.__jobDBFields = []
5968
for field in self.__summaryKeyFieldsMapping:
@@ -66,27 +75,62 @@ def initialize(self):
6675

6776
def execute(self):
6877
"""Main execution method"""
69-
# Get the WMS Snapshot!
78+
79+
# PilotsHistory to Monitoring
80+
if self.monitoringEnabled:
81+
self.log.info("Committing PilotsHistory to Monitoring")
82+
result = PilotAgentsDB.getSummarySnapshot(self.__pilotKeyFields)
83+
now = Time.dateTime()
84+
if not result["OK"]:
85+
self.log.error(
86+
"Can't get the PilotAgentsDB summary",
87+
"%s: won't commit PilotsHistory at this cycle" % result["Message"],
88+
)
89+
return S_ERROR()
90+
91+
values = result["Value"][1]
92+
for record in values:
93+
record = record[1:]
94+
rD = {}
95+
for iP, _ in enumerate(self.__pilotKeyFields):
96+
rD[self.__pilotKeyFields[iP]] = record[iP]
97+
record = record[len(self.__pilotKeyFields) :]
98+
for iP, _ in enumerate(self.__pilotValueFields):
99+
rD[self.__pilotValueFields[iP]] = int(record[iP])
100+
rD["timestamp"] = int(Time.toEpoch(now))
101+
self.log.debug("Adding following PilotsHistory record to Reporter: \n", rD)
102+
self.pilotReporter.addRecord(rD)
103+
104+
self.log.info("Committing to Monitoring...")
105+
result = self.pilotReporter.commit()
106+
if not result["OK"]:
107+
self.log.error("Could not commit to Monitoring", result["Message"])
108+
return result
109+
self.log.verbose("Done committing PilotsHistory to Monitoring")
110+
111+
# WMSHistory to Monitoring or Accounting
112+
self.log.info("Committing WMSHistory to %s backend" % "and ".join(self.backends))
70113
result = JobDB().getSummarySnapshot(self.__jobDBFields)
71114
now = Time.dateTime()
72115
if not result["OK"]:
73-
self.log.error("Can't get the JobDB summary", "%s: won't commit at this cycle" % result["Message"])
116+
self.log.error(
117+
"Can't get the JobDB summary", "%s: won't commit WMSHistory at this cycle" % result["Message"]
118+
)
74119
return S_ERROR()
75120

76-
# Now we try to commit
77121
values = result["Value"][1]
78122

79-
self.log.info("Start sending records")
123+
self.log.info("Start sending WMSHistory records")
80124
for record in values:
81125
record = record[1:]
82126
rD = {}
83127
for fV in self.__summaryDefinedFields:
84128
rD[fV[0]] = fV[1]
85-
for iP in range(len(self.__summaryKeyFieldsMapping)):
129+
for iP, _ in enumerate(self.__summaryKeyFieldsMapping):
86130
fieldName = self.__summaryKeyFieldsMapping[iP]
87131
rD[self.__renameFieldsMapping.get(fieldName, fieldName)] = record[iP]
88132
record = record[len(self.__summaryKeyFieldsMapping) :]
89-
for iP in range(len(self.__summaryValueFieldsMapping)):
133+
for iP, _ in enumerate(self.__summaryValueFieldsMapping):
90134
rD[self.__summaryValueFieldsMapping[iP]] = int(record[iP])
91135

92136
for backend in self.datastores:
@@ -101,16 +145,16 @@ def execute(self):
101145
acWMS.setValuesFromDict(rD)
102146
retVal = acWMS.checkValues()
103147
if not retVal["OK"]:
104-
self.log.error("Invalid accounting record ", "%s -> %s" % (retVal["Message"], rD))
148+
self.log.error("Invalid WMSHistory accounting record ", "%s -> %s" % (retVal["Message"], rD))
105149
else:
106150
self.datastores["Accounting"].addRegister(acWMS)
107151

108152
for backend, datastore in self.datastores.items():
109-
self.log.info("Committing to %s backend" % backend)
153+
self.log.info("Committing WMSHistory records to %s backend" % backend)
110154
result = datastore.commit()
111155
if not result["OK"]:
112-
self.log.error("Couldn't commit WMS history to %s" % backend, result["Message"])
156+
self.log.error("Couldn't commit WMSHistory to %s" % backend, result["Message"])
113157
return S_ERROR()
114-
self.log.verbose("Done committing to %s backend" % backend)
158+
self.log.verbose("Done committing WMSHistory to %s backend" % backend)
115159

116160
return S_OK()

src/DIRAC/WorkloadManagementSystem/DB/PilotAgentsDB.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1173,6 +1173,17 @@ def getPilotMonitorWeb(self, selectDict, sortList, startItem, maxItems):
11731173

11741174
return S_OK(resultDict)
11751175

1176+
def getSummarySnapshot(self, requestedFields=False):
    """Count the pilots in the PilotAgents table, grouped by the requested fields.

    :param requestedFields: iterable of PilotAgents column names to select and group by.
        When empty or False, the default combination
        TaskQueueID, GridSite, GridType, Status is used.
    :return: S_OK((fieldNames, rows)), where fieldNames is the list of grouping fields
        followed by "COUNT(PilotID)" and rows is the "Value" returned by ``self._query``.
        If the query fails, its result is returned unchanged.
    """
    # Honour the caller's selection instead of silently overwriting it;
    # copy into a list so the concatenation below works for any iterable.
    if requestedFields:
        requestedFields = list(requestedFields)
    else:
        requestedFields = ["TaskQueueID", "GridSite", "GridType", "Status"]
    valueFields = ["COUNT(PilotID)"]

    # NOTE: field names are interpolated directly into the SQL statement,
    # so callers must only pass trusted column names.
    defString = ", ".join(requestedFields)
    valueString = ", ".join(valueFields)
    result = self._query(f"SELECT {defString}, {valueString} FROM PilotAgents GROUP BY {defString}")
    if not result["OK"]:
        return result
    return S_OK(((requestedFields + valueFields), result["Value"]))
1186+
11761187

11771188
class PivotedPilotSummaryTable:
11781189
"""

tests/Integration/Monitoring/Test_MonitoringReporter.py

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@
6363
wmsMonitoringReporter = MonitoringReporter(monitoringType="WMSHistory")
6464
componentMonitoringReporter = MonitoringReporter(monitoringType="ComponentMonitoring")
6565
pilotMonitoringReporter = MonitoringReporter(monitoringType="PilotSubmissionMonitoring")
66-
66+
pilotsHistoryReporter = MonitoringReporter(monitoringType="PilotsHistory")
6767
data = [
6868
{
6969
"Status": "Waiting",
@@ -852,6 +852,25 @@
852852
},
853853
]
854854

855+
# Sample PilotsHistory records; the keys match the fields declared by the
# PilotsHistory monitoring type (TaskQueueID, GridSite, GridType, Status, NumOfPilots).
pilotsHistoryData = [
    {
        "TaskQueueID": "1240451",
        "GridSite": "LCG.CNAF.it",
        "GridType": "",
        "Status": "failed",
        "NumOfPilots": "7",
        "timestamp": 1649161714,
    },
    {
        "TaskQueueID": "12401",
        "GridSite": "LCG.CNAF.it",
        "GridType": "",
        "Status": "failed",
        "NumOfPilots": "7",
        "timestamp": 1649161714,
    },
]
873+
855874

856875
def test_addWMSRecords():
857876
for record in data:
@@ -875,3 +894,11 @@ def test_addPilotSubmissionRecords():
875894
result = pilotMonitoringReporter.commit()
876895
assert result["OK"]
877896
assert result["Value"] == len(pilotMonitoringData)
897+
898+
899+
def test_addPilotHistoryRecords():
    """Add every PilotsHistory sample record to the reporter and commit them.

    The commit must succeed and report as many records as were added.
    """
    expectedCount = len(pilotsHistoryData)
    for entry in pilotsHistoryData:
        pilotsHistoryReporter.addRecord(entry)

    outcome = pilotsHistoryReporter.commit()
    assert outcome["OK"]
    assert outcome["Value"] == expectedCount

0 commit comments

Comments
 (0)