Skip to content

Commit 9c5a319

Browse files
committed
feat: Added method for retrieving snapshot of PilotsHistory
1 parent b17cd7a commit 9c5a319

File tree

4 files changed

+66
-69
lines changed

4 files changed

+66
-69
lines changed

src/DIRAC/MonitoringSystem/Client/Types/PilotsHistory.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
""" Definition for PilotsHistory Monitoring type.
2-
Filled by the agent "WorkloadManagement/PilotsHistoryAgent"
2+
Filled by the agent "WorkloadManagement/StatesAccountingAgent"
33
"""
44

55
from DIRAC.MonitoringSystem.Client.Types.BaseType import BaseType

src/DIRAC/WorkloadManagementSystem/Agent/PilotsHistoryAgent.py

Lines changed: 0 additions & 56 deletions
This file was deleted.

src/DIRAC/WorkloadManagementSystem/Agent/StatesAccountingAgent.py

Lines changed: 54 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
""" StatesAccountingAgent sends periodically numbers of jobs in various states for various
1+
""" StatesAccountingAgent periodically sends the numbers of jobs and pilots in various states for various
22
sites to the Monitoring system to create historical plots.
33
44
.. literalinclude:: ../ConfigTemplate.cfg
@@ -8,20 +8,24 @@
88
:caption: StatesAccountingAgent options
99
"""
1010
from DIRAC import S_OK, S_ERROR
11+
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
1112
from DIRAC.Core.Base.AgentModule import AgentModule
1213
from DIRAC.Core.Utilities import Time
1314
from DIRAC.AccountingSystem.Client.Types.WMSHistory import WMSHistory
1415
from DIRAC.AccountingSystem.Client.DataStoreClient import DataStoreClient
1516
from DIRAC.MonitoringSystem.Client.MonitoringReporter import MonitoringReporter
1617
from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB
18+
from DIRAC.WorkloadManagementSystem.DB.PilotAgentsDB import PilotAgentsDB
1719

1820

1921
class StatesAccountingAgent(AgentModule):
2022
"""Agent that every 15 minutes will report
2123
to the AccountingDB (MySQL) or the Monitoring DB (ElasticSearch), or both,
2224
a snapshot of the JobDB.
25+
Also sends a snapshot of PilotAgentsDB to Monitoring.
2326
"""
2427

28+
# WMSHistory fields
2529
__summaryKeyFieldsMapping = [
2630
"Status",
2731
"Site",
@@ -36,24 +40,29 @@ class StatesAccountingAgent(AgentModule):
3640
__summaryValueFieldsMapping = ["Jobs", "Reschedules"]
3741
__renameFieldsMapping = {"JobType": "JobSplitType"}
3842

43+
# PilotsHistory fields
44+
__pilotKeyFields = ["TaskQueueID", "GridSite", "GridType", "Status"]
45+
__pilotValueFields = ["Pilots"]
46+
3947
def initialize(self):
4048
"""Standard initialization"""
4149
# This agent will always loop every 15 minutes
4250
self.am_setOption("PollingTime", 900)
4351

4452
self.backends = self.am_getOption("Backends", "Accounting").replace(" ", "").split(",")
45-
messageQueue = self.am_getOption("MessageQueue", "dirac.wmshistory")
53+
self.monitoringEnabled = Operations().getValue("monitoringEnabled", "False")
4654

47-
self.log.info("Committing to %s backend" % "and ".join(self.backends))
55+
messageQueue = self.am_getOption("MessageQueue", "dirac.wmshistory")
4856

4957
self.datastores = {} # For storing the clients to Accounting and Monitoring
5058

5159
if "Accounting" in self.backends:
5260
self.datastores["Accounting"] = DataStoreClient(retryGraceTime=900)
53-
if "Monitoring" in self.backends:
61+
if "Monitoring" in self.backends or self.monitoringEnabled:
5462
self.datastores["Monitoring"] = MonitoringReporter(
5563
monitoringType="WMSHistory", failoverQueueName=messageQueue
5664
)
65+
self.pilotReporter = MonitoringReporter(monitoringType="PilotsHistory")
5766

5867
self.__jobDBFields = []
5968
for field in self.__summaryKeyFieldsMapping:
@@ -66,17 +75,50 @@ def initialize(self):
6675

6776
def execute(self):
6877
"""Main execution method"""
69-
# Get the WMS Snapshot!
78+
79+
# PilotsHistory to Monitoring
80+
if self.monitoringEnabled:
81+
result = PilotAgentsDB.getSummarySnapshot(self.__pilotKeyFields)
82+
now = Time.dateTime()
83+
if not result["OK"]:
84+
self.log.error(
85+
"Can't get the PilotAgentsDB summary", "%s: won't commit at this cycle" % result["Message"]
86+
)
87+
return S_ERROR()
88+
89+
values = result["Value"][1]
90+
for record in values:
91+
record = record[1:]
92+
rD = {}
93+
for iP in range(len(self.__pilotKeyFields)):
94+
rD[self.__pilotKeyFields[iP]] = record[iP]
95+
record = record[len(self.__pilotKeyFields) :]
96+
for iP in range(len(self.__pilotValueFields)):
97+
rD[self.__pilotValueFields[iP]] = int(record[iP])
98+
rD["timestamp"] = int(Time.toEpoch(now))
99+
self.log.verbose("Adding following PilotsHistory record to Reporter: \n", rD)
100+
self.pilotReporter.addRecord(rD)
101+
102+
self.log.info("Committing PilotsHistory to Monitoring")
103+
result = self.pilotReporter.commit()
104+
if not result["OK"]:
105+
self.log.error("Could not commit PilotsHistory to Monitoring")
106+
return S_ERROR()
107+
self.log.verbose("Done committing PilotsHistory to Monitoring")
108+
109+
# WMSHistory to Monitoring or Accounting
110+
self.log.info("Committing WMSHistory to %s backend" % "and ".join(self.backends))
70111
result = JobDB().getSummarySnapshot(self.__jobDBFields)
71112
now = Time.dateTime()
72113
if not result["OK"]:
73-
self.log.error("Can't get the JobDB summary", "%s: won't commit at this cycle" % result["Message"])
114+
self.log.error(
115+
"Can't get the JobDB summary", "%s: won't commit WMSHistory at this cycle" % result["Message"]
116+
)
74117
return S_ERROR()
75118

76-
# Now we try to commit
77119
values = result["Value"][1]
78120

79-
self.log.info("Start sending records")
121+
self.log.info("Start sending WMSHistory records")
80122
for record in values:
81123
record = record[1:]
82124
rD = {}
@@ -101,16 +143,16 @@ def execute(self):
101143
acWMS.setValuesFromDict(rD)
102144
retVal = acWMS.checkValues()
103145
if not retVal["OK"]:
104-
self.log.error("Invalid accounting record ", "%s -> %s" % (retVal["Message"], rD))
146+
self.log.error("Invalid WMSHistory accounting record ", "%s -> %s" % (retVal["Message"], rD))
105147
else:
106148
self.datastores["Accounting"].addRegister(acWMS)
107149

108150
for backend, datastore in self.datastores.items():
109-
self.log.info("Committing to %s backend" % backend)
151+
self.log.info("Committing WMSHistory records to %s backend" % backend)
110152
result = datastore.commit()
111153
if not result["OK"]:
112-
self.log.error("Couldn't commit WMS history to %s" % backend, result["Message"])
154+
self.log.error("Couldn't commit WMSHistory to %s" % backend, result["Message"])
113155
return S_ERROR()
114-
self.log.verbose("Done committing to %s backend" % backend)
156+
self.log.verbose("Done committing WMSHistory to %s backend" % backend)
115157

116158
return S_OK()

src/DIRAC/WorkloadManagementSystem/DB/PilotAgentsDB.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1173,6 +1173,17 @@ def getPilotMonitorWeb(self, selectDict, sortList, startItem, maxItems):
11731173

11741174
return S_OK(resultDict)
11751175

1176+
def getSummarySnapshot(self, requestedFields=False):
    """Get a summary snapshot of the PilotAgents table.

    Counts the pilots (``COUNT(PilotID)``) for each distinct combination of the requested fields.

    :param requestedFields: column names of PilotAgents to group by. When not given (or empty),
        defaults to ["TaskQueueID", "GridSite", "GridType", "Status"].
    :type requestedFields: list or False
    :return: S_OK((fieldNames, rows)), where fieldNames is the requested fields followed by
        "COUNT(PilotID)" and rows is the "Value" returned by ``self._query``.
        If the query fails, the failed result of ``self._query`` is returned unchanged.
    """
    # Honour the caller's selection: the argument used to be silently overwritten
    # by the default list, which made the parameter meaningless.
    if not requestedFields:
        requestedFields = ["TaskQueueID", "GridSite", "GridType", "Status"]
    # Work on a list copy so that any sequence type can be concatenated below
    # and the caller's object is never shared with the returned tuple.
    requestedFields = list(requestedFields)
    valueFields = ["COUNT(PilotID)"]
    defString = ", ".join(requestedFields)
    valueString = ", ".join(valueFields)
    # NOTE: the field names are interpolated into the SQL text (column names cannot be
    # bound as query parameters), so they must only ever come from trusted code.
    result = self._query(f"SELECT {defString}, {valueString} FROM PilotAgents GROUP BY {defString}")
    if not result["OK"]:
        return result
    return S_OK(((requestedFields + valueFields), result["Value"]))
1186+
11761187

11771188
class PivotedPilotSummaryTable:
11781189
"""

0 commit comments

Comments
 (0)