Skip to content

Commit 27b77aa

Browse files
committed
feat: use summary table and triggers (JobDB.Jobs, summaries for StatesAccounting)
1 parent f555ccd commit 27b77aa

File tree

3 files changed

+109
-20
lines changed

3 files changed

+109
-20
lines changed

src/DIRAC/WorkloadManagementSystem/Agent/StatesAccountingAgent.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ class StatesAccountingAgent(AgentModule):
3434
"User",
3535
"UserGroup",
3636
"JobGroup",
37+
"VO",
3738
"JobType",
3839
"ApplicationStatus",
3940
"MinorStatus",
@@ -110,14 +111,14 @@ def execute(self):
110111

111112
# WMSHistory to Monitoring or Accounting
112113
self.log.info(f"Committing WMSHistory to {'and '.join(self.jobMonitoringOption)} backend")
113-
result = JobDB().getSummarySnapshot(self.__jobDBFields)
114-
now = datetime.datetime.utcnow()
114+
result = JobDB()._query("SELECT * FROM JobsHistorySummary")
115115
if not result["OK"]:
116116
self.log.error("Can't get the JobDB summary", f"{result['Message']}: won't commit WMSHistory at this cycle")
117117
return S_ERROR()
118118

119119
values = result["Value"][1]
120120

121+
now = datetime.datetime.utcnow()
121122
self.log.info("Start sending WMSHistory records")
122123
for record in values:
123124
rD = {}

src/DIRAC/WorkloadManagementSystem/DB/JobDB.py

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,10 @@
2626
from DIRAC.Core.Utilities.ReturnValues import (
2727
S_ERROR,
2828
S_OK,
29+
DReturnType,
30+
SErrorException,
2931
convertToReturnValue,
3032
returnValueOrRaise,
31-
SErrorException,
32-
DReturnType,
3333
)
3434
from DIRAC.FrameworkSystem.Client.Logger import contextLogger
3535
from DIRAC.ResourceStatusSystem.Client.SiteStatus import SiteStatus
@@ -107,7 +107,7 @@ def __getAttributeNames(self):
107107
def getDistinctJobAttributes(self, attribute, condDict=None, older=None, newer=None, timeStamp="LastUpdateTime"):
108108
"""Get distinct values of the job attribute under specified conditions"""
109109
return self.getDistinctAttributeValues(
110-
"Jobs", attribute, condDict=condDict, older=older, newer=newer, timeStamp=timeStamp
110+
"JobsHistorySummary", attribute, condDict=condDict, older=older, newer=newer, timeStamp=timeStamp
111111
)
112112

113113
#############################################################################
@@ -1538,19 +1538,6 @@ def setJobCommandStatus(self, jobID, command, status):
15381538

15391539
return self._update(f"UPDATE JobCommands SET Status={status} WHERE JobID={jobID} AND Command={command}")
15401540

1541-
#####################################################################################
1542-
def getSummarySnapshot(self, requestedFields=False):
1543-
"""Get the summary snapshot for a given combination"""
1544-
if not requestedFields:
1545-
requestedFields = ["Status", "MinorStatus", "Site", "Owner", "OwnerGroup", "JobGroup"]
1546-
valueFields = ["COUNT(JobID)", "SUM(RescheduleCounter)"]
1547-
defString = ", ".join(requestedFields)
1548-
valueString = ", ".join(valueFields)
1549-
result = self._query(f"SELECT {defString}, {valueString} FROM Jobs GROUP BY {defString}")
1550-
if not result["OK"]:
1551-
return result
1552-
return S_OK(((requestedFields + valueFields), result["Value"]))
1553-
15541541
def removeInfoFromHeartBeatLogging(self, status, delTime, maxLines):
15551542
"""Remove HeartBeatLoggingInfo from DB.
15561543

src/DIRAC/WorkloadManagementSystem/DB/JobDB.sql

Lines changed: 103 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ CREATE TABLE `Jobs` (
4040
`JobName` VARCHAR(128) NOT NULL DEFAULT 'Unknown',
4141
`Owner` VARCHAR(64) NOT NULL DEFAULT 'Unknown',
4242
`OwnerGroup` VARCHAR(128) NOT NULL DEFAULT 'Unknown',
43-
`VO` VARCHAR(32) NOT NULL DEFAULT 'Unknown',
43+
`VO` VARCHAR(128) NOT NULL DEFAULT 'Unknown',
4444
`SubmissionTime` DATETIME DEFAULT NULL,
4545
`RescheduleTime` DATETIME DEFAULT NULL,
4646
`LastUpdateTime` DATETIME DEFAULT NULL,
@@ -126,9 +126,110 @@ CREATE TABLE `JobCommands` (
126126
`JobID` INT(11) UNSIGNED NOT NULL,
127127
`Command` VARCHAR(100) NOT NULL,
128128
`Arguments` VARCHAR(100) NOT NULL,
129-
`Status` VARCHAR(64) NOT NULL DEFAULT 'Received',
129+
`Status` VARCHAR(32) NOT NULL DEFAULT 'Received',
130130
`ReceptionTime` DATETIME NOT NULL,
131131
`ExecutionTime` DATETIME DEFAULT NULL,
132132
PRIMARY KEY (`JobID`,`Arguments`,`ReceptionTime`),
133133
FOREIGN KEY (`JobID`) REFERENCES `Jobs`(`JobID`)
134134
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
135+
136+
137+
-- ------------------------------------------------------------------------------
138+
-- summary table and triggers
139+
-- ------------------------------------------------------------------------------
140+
141+
-- summary for JobsHistory
142+
143+
DROP TABLE IF EXISTS `JobsHistorySummary`;
144+
CREATE TABLE `JobsHistorySummary` (
145+
`ID` INT AUTO_INCREMENT PRIMARY KEY,
146+
`Status` VARCHAR(32),
147+
`Site` VARCHAR(100),
148+
`Owner` VARCHAR(32),
149+
`OwnerGroup` VARCHAR(128),
150+
`VO` VARCHAR(128),
151+
`JobGroup` VARCHAR(32),
152+
`JobType` VARCHAR(32),
153+
`ApplicationStatus` VARCHAR(255),
154+
`MinorStatus` VARCHAR(128),
155+
`JobCount` INT,
156+
RescheduleSum INT,
157+
UNIQUE KEY uq_summary (
158+
`Status`,
159+
`Site`,
160+
`Owner`,
161+
`OwnerGroup`(32),
162+
`VO`,
163+
`JobGroup`,
164+
`JobType`,
165+
`ApplicationStatus`(128),
166+
`MinorStatus`
167+
)
168+
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
169+
170+
171+
-- now the triggers
172+
173+
DELIMITER //
174+
175+
CREATE TRIGGER trg_Jobs_insert
176+
AFTER INSERT ON Jobs
177+
FOR EACH ROW
178+
BEGIN
179+
INSERT INTO JobsHistorySummary (Status, Site, Owner, OwnerGroup, VO, JobGroup, JobType, ApplicationStatus, MinorStatus, JobCount, RescheduleSum)
180+
VALUES (NEW.Status, NEW.Site, NEW.Owner, NEW.OwnerGroup, NEW.VO, NEW.JobGroup, NEW.JobType, NEW.ApplicationStatus, NEW.MinorStatus, 1, NEW.RescheduleCounter)
181+
ON DUPLICATE KEY UPDATE JobCount = JobCount + 1, RescheduleSum = RescheduleSum + NEW.RescheduleCounter;
182+
END;
183+
//
184+
185+
CREATE TRIGGER trg_Jobs_delete
186+
AFTER DELETE ON Jobs
187+
FOR EACH ROW
188+
BEGIN
189+
UPDATE JobsHistorySummary
190+
SET JobCount = JobCount - 1, RescheduleSum = RescheduleSum - OLD.RescheduleCounter
191+
WHERE Status = OLD.Status
192+
AND Site = OLD.Site
193+
AND Owner = OLD.Owner
194+
AND OwnerGroup = OLD.OwnerGroup
195+
AND VO = OLD.VO
196+
AND JobGroup = OLD.JobGroup
197+
AND JobType = OLD.JobType
198+
AND ApplicationStatus = OLD.ApplicationStatus
199+
AND MinorStatus = OLD.MinorStatus;
200+
201+
-- Optional cleanup (remove zero rows)
202+
DELETE FROM JobsHistorySummary WHERE JobCount = 0;
203+
END;
204+
//
205+
206+
CREATE TRIGGER trg_Jobs_update_status
207+
AFTER UPDATE ON Jobs
208+
FOR EACH ROW
209+
BEGIN
210+
IF OLD.Status != NEW.Status THEN
211+
212+
-- Decrease count from old status
213+
UPDATE JobsHistorySummary
214+
SET JobCount = JobCount - 1, RescheduleSum = RescheduleSum - OLD.RescheduleCounter
215+
WHERE Status = OLD.Status
216+
AND Site = OLD.Site
217+
AND Owner = OLD.Owner
218+
AND OwnerGroup = OLD.OwnerGroup
219+
AND VO = OLD.VO
220+
AND JobGroup = OLD.JobGroup
221+
AND JobType = OLD.JobType
222+
AND ApplicationStatus = OLD.ApplicationStatus
223+
AND MinorStatus = OLD.MinorStatus;
224+
225+
-- Delete row if count drops to zero
226+
DELETE FROM JobsHistorySummary WHERE JobCount = 0;
227+
228+
-- Increase count for new status
229+
INSERT INTO JobsHistorySummary (Status, Site, Owner, OwnerGroup, JobGroup, VO, JobType, ApplicationStatus, MinorStatus, JobCount, RescheduleSum)
230+
VALUES (NEW.Status, NEW.Site, NEW.Owner, NEW.OwnerGroup, NEW.JobGroup, NEW.VO, NEW.JobType, NEW.ApplicationStatus, NEW.MinorStatus, 1, NEW.RescheduleCounter)
231+
ON DUPLICATE KEY UPDATE JobCount = JobCount + 1, RescheduleSum = RescheduleSum + NEW.RescheduleCounter;
232+
233+
END IF;
234+
END;
235+
//

0 commit comments

Comments
 (0)