Skip to content

Commit f5be3bb

Browse files
committed
feat: use summary table and triggers (PilotAgentsDB.PilotAgents, summaries for StatesAccounting)
F
1 parent 44ac56e commit f5be3bb

File tree

4 files changed: +174 additions, -24 deletions

src/DIRAC/WorkloadManagementSystem/Agent/StatesAccountingAgent.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ class StatesAccountingAgent(AgentModule):
4343
__renameFieldsMapping = {"JobType": "JobSplitType"}
4444

4545
# PilotsHistory fields
46-
__pilotsMapping = ["GridSite", "GridType", "Status", "NumOfPilots"]
46+
__pilotsMapping = ["GridSite", "GridType", "Status", "VO", "NumOfPilots"]
4747

4848
def initialize(self):
4949
"""Standard initialization"""
@@ -85,7 +85,8 @@ def execute(self):
8585
# PilotsHistory to Monitoring
8686
if "Monitoring" in self.pilotMonitoringOption:
8787
self.log.info("Committing PilotsHistory to Monitoring")
88-
result = PilotAgentsDB().getSummarySnapshot()
88+
sql = "SELECT * FROM PilotsHistorySummary ORDER BY GridSite, DestinationSite, Status, VO;"
89+
result = PilotAgentsDB()._query(sql)
8990
now = datetime.datetime.utcnow()
9091
if not result["OK"]:
9192
self.log.error(

src/DIRAC/WorkloadManagementSystem/DB/PilotAgentsDB.py

Lines changed: 4 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -943,15 +943,12 @@ def getPilotSummaryWeb(self, selectDict, sortList, startItem, maxItems):
943943
def getPilotMonitorSelectors(self):
    """Get distinct values for the Pilot Monitor page selectors.

    The values are read from the ``PilotsHistorySummary`` table, with one
    ``getDistinctAttributeValues`` call per selector.

    :return: S_OK(dict) mapping each selector name ("VO", "Status",
        "DestinationSite", "GridSite") to the value returned by
        ``getDistinctAttributeValues``. A selector whose lookup failed is
        mapped to an empty list.
    """
    resultDict = {}
    for param in ["VO", "Status", "DestinationSite", "GridSite"]:
        result = self.getDistinctAttributeValues("PilotsHistorySummary", param)
        # A failed lookup must not break the whole call: rebinding resultDict
        # to a list and then indexing it by selector name raised an exception
        # instead of returning a usable (if partial) answer.
        resultDict[param] = result["Value"] if result["OK"] else []

    return S_OK(resultDict)
957954

@@ -1048,18 +1045,6 @@ def getPilotMonitorWeb(self, selectDict, sortList, startItem, maxItems):
10481045

10491046
return S_OK(resultDict)
10501047

1051-
def getSummarySnapshot(self, requestedFields=False):
1052-
"""Get the summary snapshot for a given combination"""
1053-
if not requestedFields:
1054-
requestedFields = ["GridSite", "GridType", "Status"]
1055-
valueFields = ["COUNT(PilotID)"]
1056-
defString = ", ".join(requestedFields)
1057-
valueString = ", ".join(valueFields)
1058-
result = self._query(f"SELECT {defString}, {valueString} FROM PilotAgents GROUP BY {defString}")
1059-
if not result["OK"]:
1060-
return result
1061-
return S_OK(((requestedFields + valueFields), result["Value"]))
1062-
10631048

10641049
class PivotedPilotSummaryTable:
10651050
"""

src/DIRAC/WorkloadManagementSystem/DB/PilotAgentsDB.sql

Lines changed: 74 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
-- $Header: /tmp/libdirac/tmp.stZoy15380/dirac/DIRAC3/DIRAC/WorkloadManagementSystem/DB/PilotAgentsDB.sql,v 1.20 2009/08/26 09:39:53 rgracian Exp $
2-
31
-- ------------------------------------------------------------------------------
42
--
53
-- Schema definition for the PilotAgentsDB database - containing the Pilots status
@@ -48,7 +46,6 @@ CREATE TABLE `PilotAgents` (
4846
KEY `Statuskey` (`GridSite`,`DestinationSite`,`Status`)
4947
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
5048

51-
5249
DROP TABLE IF EXISTS `JobToPilotMapping`;
5350
CREATE TABLE `JobToPilotMapping` (
5451
`PilotID` INT(11) UNSIGNED NOT NULL,
@@ -65,3 +62,77 @@ CREATE TABLE `PilotOutput` (
6562
`StdError` MEDIUMTEXT,
6663
PRIMARY KEY (`PilotID`)
6764
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
65+
66+
67+
-- ------------------------------------------------------------------------------
68+
-- summary table and triggers
69+
-- ------------------------------------------------------------------------------
70+
71+
-- summary for PilotsHistory
72+
73+
-- One row per (GridSite, DestinationSite, Status, VO) combination, holding the
-- number of PilotAgents rows with that combination. The key columns are part
-- of the primary key and therefore cannot be NULL; this is spelled out here.
-- PilotCount is NOT NULL so that "PilotCount + 1" / "PilotCount - 1" can never
-- silently evaluate to NULL.
DROP TABLE IF EXISTS `PilotsHistorySummary`;
CREATE TABLE `PilotsHistorySummary` (
  `GridSite` VARCHAR(128) NOT NULL,
  `DestinationSite` VARCHAR(128) NOT NULL,
  `Status` VARCHAR(32) NOT NULL,
  `VO` VARCHAR(128) NOT NULL,
  `PilotCount` INT NOT NULL DEFAULT 0,
  PRIMARY KEY (`GridSite`,`DestinationSite`,`Status`, `VO`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
82+
83+
84+
-- now the triggers
85+
86+
DELIMITER //

-- After each PilotAgents insert: create the summary row for the new pilot's
-- (GridSite, DestinationSite, Status, VO) with a count of 1, or increment the
-- count when that row already exists.
CREATE TRIGGER trg_PilotAgents_insert
AFTER INSERT ON PilotAgents
FOR EACH ROW
BEGIN
  INSERT INTO PilotsHistorySummary
  SET GridSite = NEW.GridSite,
      DestinationSite = NEW.DestinationSite,
      Status = NEW.Status,
      VO = NEW.VO,
      PilotCount = 1
  ON DUPLICATE KEY UPDATE PilotCount = PilotCount + 1;
END;
//
97+
98+
-- After each PilotAgents delete: decrement the counter of the summary row the
-- deleted pilot belonged to, and drop that row once it no longer counts any
-- pilot.
CREATE TRIGGER trg_PilotAgents_delete
AFTER DELETE ON PilotAgents
FOR EACH ROW
BEGIN
  UPDATE PilotsHistorySummary
  SET PilotCount = PilotCount - 1
  WHERE GridSite = OLD.GridSite
    AND DestinationSite = OLD.DestinationSite
    AND Status = OLD.Status
    AND VO = OLD.VO;

  -- Cleanup is restricted to the row just decremented, so it is resolved
  -- through the primary key instead of scanning the whole summary table for
  -- every deleted pilot. "<= 0" also removes a counter that went negative.
  DELETE FROM PilotsHistorySummary
  WHERE GridSite = OLD.GridSite
    AND DestinationSite = OLD.DestinationSite
    AND Status = OLD.Status
    AND VO = OLD.VO
    AND PilotCount <= 0;
END;
//
113+
114+
-- After each PilotAgents update: when any column of the summary key
-- (GridSite, DestinationSite, Status, VO) changed, move the pilot from its old
-- summary row to its new one. Reacting to Status alone would leave the summary
-- out of sync whenever one of the other three key columns is updated.
CREATE TRIGGER trg_PilotAgents_update_status
AFTER UPDATE ON PilotAgents
FOR EACH ROW
BEGIN
  -- "<=>" is the NULL-safe equality operator: it always yields 0 or 1, so the
  -- condition cannot evaluate to NULL and skip the bookkeeping.
  IF NOT (OLD.GridSite <=> NEW.GridSite
          AND OLD.DestinationSite <=> NEW.DestinationSite
          AND OLD.Status <=> NEW.Status
          AND OLD.VO <=> NEW.VO) THEN

    -- Decrease count for the old key
    UPDATE PilotsHistorySummary
    SET PilotCount = PilotCount - 1
    WHERE GridSite = OLD.GridSite
      AND DestinationSite = OLD.DestinationSite
      AND Status = OLD.Status
      AND VO = OLD.VO;

    -- Drop the old row once it no longer counts any pilot; restricted to that
    -- row so the primary key is used instead of a full table scan.
    DELETE FROM PilotsHistorySummary
    WHERE GridSite = OLD.GridSite
      AND DestinationSite = OLD.DestinationSite
      AND Status = OLD.Status
      AND VO = OLD.VO
      AND PilotCount <= 0;

    -- Increase count for the new key
    INSERT INTO PilotsHistorySummary (GridSite, DestinationSite, Status, VO, PilotCount)
    VALUES (NEW.GridSite, NEW.DestinationSite, NEW.Status, NEW.VO, 1)
    ON DUPLICATE KEY UPDATE PilotCount = PilotCount + 1;

  END IF;
END;
//

-- Restore the default statement delimiter for anything sourced after this file.
DELIMITER ;

tests/Integration/WorkloadManagementSystem/Test_PilotAgentsDB.py

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,3 +186,96 @@ def test_PivotedPilotSummaryTable():
186186
assert "Total" in columns
187187

188188
cleanUpPilots(pilotRef)
189+
190+
191+
# Parse date strings into datetime objects
192+
def _convert_field(value, parser, default):
    """Return ``parser(value)``, or ``default`` if value is empty or unparsable."""
    if not value:
        return default
    try:
        return parser(value)
    except ValueError:
        return default


def process_data(data):
    """Convert raw CSV rows into tuples usable as PilotAgents insert parameters.

    Positions follow the column list of the PilotAgents INSERT statement used
    in this test module:

    * 10 and 11 are parsed as ``"%Y-%m-%d %H:%M:%S"`` dates (``None`` when
      empty or invalid);
    * 0 and 1 are converted to ``int`` (``0`` when empty or invalid);
    * 9 is converted to ``float`` (``0`` when empty or invalid).

    All other positions are passed through unchanged. The input rows are not
    modified.

    :param data: iterable of rows, each a sequence with at least 12 items
    :return: list of tuples, one per input row
    """
    date_indices = (10, 11)
    int_indices = (0, 1)
    float_indices = (9,)

    def parse_date(text):
        return datetime.strptime(text, "%Y-%m-%d %H:%M:%S")

    converted_data = []
    for source_row in data:
        # Work on a copy so the caller's rows are left untouched.
        row = list(source_row)
        for i in date_indices:
            row[i] = _convert_field(row[i], parse_date, None)
        for i in int_indices:
            row[i] = _convert_field(row[i], int, 0)
        for i in float_indices:
            row[i] = _convert_field(row[i], float, 0)
        converted_data.append(tuple(row))
    return converted_data
230+
231+
232+
def test_summarySnapshot():
    """Check that PilotsHistorySummary follows inserts and deletes on PilotAgents.

    The pilots listed in ``pilots.csv`` are inserted and then deleted again;
    after each step the content of the summary table must equal the result of
    the equivalent ``GROUP BY`` query on PilotAgents.
    """
    summarySQL = "SELECT * FROM PilotsHistorySummary ORDER BY GridSite, DestinationSite, Status, VO;"
    groupedSQL = (
        "SELECT GridSite, DestinationSite, Status, VO, COUNT(*) FROM PilotAgents "
        "GROUP BY GridSite, DestinationSite, Status, VO "
        "ORDER BY GridSite, DestinationSite, Status, VO;"
    )

    def assertSummaryMatchesGroupBy():
        """Compare the trigger-maintained summary with the plain GROUP BY query."""
        resultGrouped = PilotAgentsDB()._query(groupedSQL)
        assert resultGrouped["OK"], resultGrouped["Message"]
        resultSummary = PilotAgentsDB()._query(summarySQL)
        assert resultSummary["OK"], resultSummary["Message"]
        assert resultGrouped["Value"] == resultSummary["Value"], "Summary and grouped query results differ"

    # first delete all pilots, and whatever is left in the summary
    for table in ("PilotAgents", "PilotsHistorySummary"):
        res = paDB._update(f"DELETE FROM {table}")
        assert res["OK"], res["Message"]

    # insert some predefined pilots to test the summary snapshot
    with open("pilots.csv", newline="", encoding="utf-8") as csvfile:
        processedData = process_data(list(csv.reader(csvfile)))
    assert processedData, "pilots.csv contains no pilots"

    placeholders = ",".join(["%s"] * len(processedData[0]))
    sql = (
        "INSERT INTO PilotAgents (InitialJobID, CurrentJobID, PilotJobReference, PilotStamp, "
        "DestinationSite, Queue, GridSite, VO, GridType, BenchMark, SubmissionTime, "
        f"LastUpdateTime, Status, StatusReason, AccountingSent) VALUES ({placeholders})"
    )
    res = paDB._updatemany(sql, processedData)
    assert res["OK"], res["Message"]

    result = PilotAgentsDB()._query(summarySQL)
    assert result["OK"], result["Message"]
    # Count the summary rows themselves. Indexing [1] first would measure the
    # number of columns of the second row, which is 5 for this table no matter
    # how many records exist.
    # NOTE(review): the expected value 5 is taken from the original assertion
    # message; confirm it against the content of pilots.csv.
    assert len(result["Value"]) == 5, "Expected 5 records in the summary"
    assertSummaryMatchesGroupBy()

    # deleting now: the stamps come from the rows inserted above
    pilotStampsStr = ",".join(f"'{row[3]}'" for row in processedData)
    res = paDB._update(f"DELETE FROM PilotAgents WHERE PilotStamp IN ({pilotStampsStr})")
    assert res["OK"], res["Message"]
    assertSummaryMatchesGroupBy()

0 commit comments

Comments
 (0)