diff --git a/docs/source/AdministratorGuide/Configuration/ConfReference/Systems/WorkloadManagement/Agents/StatesAccountingAgent/index.rst b/docs/source/AdministratorGuide/Configuration/ConfReference/Systems/WorkloadManagement/Agents/StatesAccountingAgent/index.rst index 4b2052d9d3d..05815957696 100644 --- a/docs/source/AdministratorGuide/Configuration/ConfReference/Systems/WorkloadManagement/Agents/StatesAccountingAgent/index.rst +++ b/docs/source/AdministratorGuide/Configuration/ConfReference/Systems/WorkloadManagement/Agents/StatesAccountingAgent/index.rst @@ -1,7 +1,7 @@ Systems / WorkloadManagement / / Agents / StatesAccountingAgent - Sub-subsection =========================================================================================== -StatesAccountingAgent sends periodically numbers of jobs in various states for various sites to the -Monitoring system to create historical plots. +StatesAccountingAgent sends exactly every 15 minutes counts of jobs in various states for various sites to the +Monitoring system to create monitoring and historical plots. This agent doesn't have special options to configure. diff --git a/docs/source/AdministratorGuide/Configuration/ConfReference/Systems/WorkloadManagement/Agents/index.rst b/docs/source/AdministratorGuide/Configuration/ConfReference/Systems/WorkloadManagement/Agents/index.rst index 6a6da82f59e..aad8c6ee6d4 100644 --- a/docs/source/AdministratorGuide/Configuration/ConfReference/Systems/WorkloadManagement/Agents/index.rst +++ b/docs/source/AdministratorGuide/Configuration/ConfReference/Systems/WorkloadManagement/Agents/index.rst @@ -3,13 +3,6 @@ Systems / WorkloadManagement / / Agents - Sub-subsection In this subsection each agent is described. -+----------+----------------------------------+----------------+ -| **Name** | **Description** | **Example** | -+----------+----------------------------------+----------------+ -| *Agent* | Subsection named as the agent is | InputDataAgent | -| | called. 
| | -+----------+----------------------------------+----------------+ - Common options for all the agents are described in the table below: +---------------------+---------------------------------------+------------------------------+ diff --git a/integration_tests.py b/integration_tests.py index bdca32baaf0..d4a3b1e3342 100755 --- a/integration_tests.py +++ b/integration_tests.py @@ -509,6 +509,20 @@ def exec_mysql(): os.execvp(cmd[0], cmd) +@app.command() +def exec_mysql_root(): + """Start an interactive session in the server container.""" + _check_containers_running() + cmd = _build_docker_cmd("mysql", use_root=True, cwd="/") + cmd += [ + "bash", + "-c", + f"exec mysql --user={DB_ROOTUSER} --password={DB_ROOTPWD}", + ] + typer.secho("Opening prompt inside server container", err=True, fg=c.GREEN) + os.execvp(cmd[0], cmd) + + @app.command() def list_services(): """List the services which have been running. diff --git a/src/DIRAC/AccountingSystem/Client/Types/BaseAccountingType.py b/src/DIRAC/AccountingSystem/Client/Types/BaseAccountingType.py index 7591bb6f948..c5475ae0efe 100644 --- a/src/DIRAC/AccountingSystem/Client/Types/BaseAccountingType.py +++ b/src/DIRAC/AccountingSystem/Client/Types/BaseAccountingType.py @@ -97,7 +97,9 @@ def setValuesFromDict(self, dataDict): if errKeys: return S_ERROR(f"Key(s) {', '.join(errKeys)} are not valid") for key in dataDict: - self.setValueByKey(key, dataDict[key]) + res = self.setValueByKey(key, dataDict[key]) + if not res["OK"]: + return res return S_OK() def getValue(self, key): diff --git a/src/DIRAC/FrameworkSystem/Client/ComponentInstaller.py b/src/DIRAC/FrameworkSystem/Client/ComponentInstaller.py index 8540636f77a..1569692c2e9 100644 --- a/src/DIRAC/FrameworkSystem/Client/ComponentInstaller.py +++ b/src/DIRAC/FrameworkSystem/Client/ComponentInstaller.py @@ -2132,14 +2132,39 @@ def installDatabase(self, dbName): try: cmdLines = self._createMySQLCMDLines(dbSql) - # We need to run one SQL cmd at once, mysql is much happier 
that way. - # Create a string of commands, ignoring comment lines - sqlString = "\n".join(x for x in cmdLines if not x.startswith("--")) - - # Now run each command (They are seperated by ;) - # Ignore any empty ones - cmds = [x.strip() for x in sqlString.split(";") if x.strip()] - for cmd in cmds: + # Now run each command (They are seperated by ;, or by a DELIMITER) + # We need to split the string into commands, and ignore any empty ones + + # Handle DELIMITER statements in SQL + delimiter = ";" + commands = [] + current_command = [] + for line in cmdLines: + if line.startswith("--"): + continue + if line.startswith("DELIMITER "): + delimiter = line.split("DELIMITER ", 1)[1].strip() + continue + if delimiter != ";": + if line == delimiter: + commands.append("\n".join(current_command).strip()) + current_command = [] + else: + current_command.append(line) + else: + if line.endswith(";"): + current_command.append(line[:-1]) + commands.append("\n".join(current_command).strip()) + current_command = [] + else: + current_command.append(line) + if current_command: + commands.append("\n".join(current_command).strip()) + + # Remove empty commands + commands = [cmd for cmd in commands if cmd] + + for cmd in commands: result = self.execMySQL(cmd, dbName) if not result["OK"]: error = "Failed to initialize Database" diff --git a/src/DIRAC/MonitoringSystem/Client/Types/PilotsHistory.py b/src/DIRAC/MonitoringSystem/Client/Types/PilotsHistory.py index ff101940541..2d41fa92a00 100644 --- a/src/DIRAC/MonitoringSystem/Client/Types/PilotsHistory.py +++ b/src/DIRAC/MonitoringSystem/Client/Types/PilotsHistory.py @@ -17,7 +17,7 @@ def __init__(self): super().__init__() - self.keyFields = ["GridSite", "GridType", "Status"] + self.keyFields = ["GridSite", "ComputingElement", "GridType", "Status", "VO"] self.monitoringFields = ["NumOfPilots"] @@ -26,8 +26,10 @@ def __init__(self): self.addMapping( { "GridSite": {"type": "keyword"}, + "ComputingElement": {"type": "keyword"}, "GridType": 
{"type": "keyword"}, "Status": {"type": "keyword"}, + "VO": {"type": "keyword"}, "NumOfPilots": {"type": "long"}, } ) diff --git a/src/DIRAC/MonitoringSystem/Client/Types/WMSHistory.py b/src/DIRAC/MonitoringSystem/Client/Types/WMSHistory.py index 3ff6e52e872..840dddceead 100644 --- a/src/DIRAC/MonitoringSystem/Client/Types/WMSHistory.py +++ b/src/DIRAC/MonitoringSystem/Client/Types/WMSHistory.py @@ -27,6 +27,7 @@ def __init__(self): "User", "UserGroup", "JobGroup", + "VO", "MinorStatus", "ApplicationStatus", "JobSplitType", @@ -47,6 +48,7 @@ def __init__(self): "MinorStatus": {"type": "keyword"}, "User": {"type": "keyword"}, "JobGroup": {"type": "keyword"}, + "VO": {"type": "keyword"}, "UserGroup": {"type": "keyword"}, "Tier": {"type": "keyword"}, "Type": {"type": "keyword"}, diff --git a/src/DIRAC/MonitoringSystem/Service/WebAppHandler.py b/src/DIRAC/MonitoringSystem/Service/WebAppHandler.py index 09762aeb044..8279a90e52b 100644 --- a/src/DIRAC/MonitoringSystem/Service/WebAppHandler.py +++ b/src/DIRAC/MonitoringSystem/Service/WebAppHandler.py @@ -336,25 +336,25 @@ def export_getSiteSummarySelectors(cls): types_getApplicationStates = [] @classmethod - def export_getApplicationStates(cls, condDict=None, older=None, newer=None): + def export_getApplicationStates(cls): """Return Distinct Values of ApplicationStatus job Attribute in WMS""" - return cls.jobDB.getDistinctJobAttributes("ApplicationStatus", condDict, older, newer) + return cls.jobDB._query("SELECT DISTINCT ApplicationStatus FROM JobsHistorySummary") types_getJobTypes = [] @classmethod - def export_getJobTypes(cls, condDict=None, older=None, newer=None): + def export_getJobTypes(cls): """Return Distinct Values of JobType job Attribute in WMS""" - return cls.jobDB.getDistinctJobAttributes("JobType", condDict, older, newer) + return cls.jobDB._query("SELECT DISTINCT JobType FROM JobsHistorySummary") types_getOwners = [] @classmethod - def export_getOwners(cls, condDict=None, older=None, newer=None): + def 
export_getOwners(cls): """ Return Distinct Values of Owner job Attribute in WMS """ - return cls.jobDB.getDistinctJobAttributes("Owner", condDict, older, newer) + return cls.jobDB._query("SELECT DISTINCT Owner FROM JobsHistorySummary") types_getOwnerGroup = [] @@ -363,43 +363,43 @@ def export_getOwnerGroup(cls): """ Return Distinct Values of OwnerGroup from the JobDB """ - return cls.jobDB.getDistinctJobAttributes("OwnerGroup") + return cls.jobDB._query("SELECT DISTINCT OwnerGroup FROM JobsHistorySummary") types_getJobGroups = [] @classmethod - def export_getJobGroups(cls, condDict=None, older=None, cutDate=None): + def export_getJobGroups(cls): """ Return Distinct Values of ProductionId job Attribute in WMS """ - return cls.jobDB.getDistinctJobAttributes("JobGroup", condDict, older, newer=cutDate) + return cls.jobDB._query("SELECT DISTINCT JobGroup FROM JobsHistorySummary") types_getSites = [] @classmethod - def export_getSites(cls, condDict=None, older=None, newer=None): + def export_getSites(cls): """ Return Distinct Values of Site job Attribute in WMS """ - return cls.jobDB.getDistinctJobAttributes("Site", condDict, older, newer) + return cls.jobDB._query("SELECT DISTINCT Site FROM JobsHistorySummary") types_getStates = [] @classmethod - def export_getStates(cls, condDict=None, older=None, newer=None): + def export_getStates(cls): """ Return Distinct Values of Status job Attribute in WMS """ - return cls.jobDB.getDistinctJobAttributes("Status", condDict, older, newer) + return cls.jobDB._query("SELECT DISTINCT Status FROM JobsHistorySummary") types_getMinorStates = [] @classmethod - def export_getMinorStates(cls, condDict=None, older=None, newer=None): + def export_getMinorStates(cls): """ Return Distinct Values of Minor Status job Attribute in WMS """ - return cls.jobDB.getDistinctJobAttributes("MinorStatus", condDict, older, newer) + return cls.jobDB._query("SELECT DISTINCT MinorStatus FROM JobsHistorySummary") 
############################################################################## # Transformations diff --git a/src/DIRAC/TransformationSystem/Agent/TransformationCleaningAgent.py b/src/DIRAC/TransformationSystem/Agent/TransformationCleaningAgent.py index 2d9f815262e..97063acdd6b 100644 --- a/src/DIRAC/TransformationSystem/Agent/TransformationCleaningAgent.py +++ b/src/DIRAC/TransformationSystem/Agent/TransformationCleaningAgent.py @@ -32,7 +32,6 @@ from DIRAC.Resources.Storage.StorageElement import StorageElement from DIRAC.TransformationSystem.Client import TransformationStatus from DIRAC.TransformationSystem.Client.TransformationClient import TransformationClient -from DIRAC.WorkloadManagementSystem.Client.JobMonitoringClient import JobMonitoringClient from DIRAC.WorkloadManagementSystem.Client.WMSClient import WMSClient from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB @@ -44,7 +43,6 @@ class TransformationCleaningAgent(AgentModule): """ .. class:: TransformationCleaningAgent - :param ~DIRAC.DataManagementSystem.Client.DataManager.DataManager dm: DataManager instance :param ~TransformationClient.TransformationClient transClient: TransformationClient instance :param ~FileCatalogClient.FileCatalogClient metadataClient: FileCatalogClient instance @@ -126,8 +124,6 @@ def initialize(self): self.reqClient = ReqClient() # # file catalog client self.metadataClient = FileCatalogClient() - # # job monitoring client - self.jobMonitoringClient = JobMonitoringClient() # # job DB self.jobDB = JobDB() @@ -227,7 +223,14 @@ def finalize(self): So, we should just clean from time to time. What I added here is done only when the agent finalize, and it's quite light-ish operation anyway. 
""" - res = self.jobDB.getDistinctJobAttributes("JobGroup", None, datetime.utcnow() - timedelta(days=365)) + + res = self.jobDB.getDistinctAttributeValues( + "Jobs", + "JobGroup", + older=datetime.utcnow() - timedelta(days=365), + timeStamp="LastUpdateTime", + ) + if not res["OK"]: self.log.error("Failed to get job groups", res["Message"]) return res @@ -271,7 +274,7 @@ def finalize(self): # Remove JobIDs that were unknown to the TransformationSystem jobGroupsToCheck = [str(transDict["TransformationID"]).zfill(8) for transDict in toClean + toArchive] - res = self.jobMonitoringClient.getJobs({"JobGroup": jobGroupsToCheck}) + res = self.jobDB.selectJobs({"JobGroup": jobGroupsToCheck}) if not res["OK"]: return res jobIDsToRemove = [int(jobID) for jobID in res["Value"]] diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/JobCleaningAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/JobCleaningAgent.py index 24728897175..7ca4eaf4d83 100644 --- a/src/DIRAC/WorkloadManagementSystem/Agent/JobCleaningAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/JobCleaningAgent.py @@ -87,7 +87,7 @@ def initialize(self): def _getAllowedJobTypes(self): """Get valid jobTypes""" - result = self.jobDB.getDistinctJobAttributes("JobType") + result = self.jobDB._query("SELECT DISTINCT JobType FROM JobsHistorySummary") if not result["OK"]: return result cleanJobTypes = [] diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/StatesAccountingAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/StatesAccountingAgent.py index 097f8d72aa4..c366982657d 100644 --- a/src/DIRAC/WorkloadManagementSystem/Agent/StatesAccountingAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/StatesAccountingAgent.py @@ -32,19 +32,25 @@ class StatesAccountingAgent(AgentModule): __summaryKeyFieldsMapping = [ "Status", "Site", - "User", - "UserGroup", + "Owner", + "OwnerGroup", "JobGroup", + "VO", "JobType", "ApplicationStatus", "MinorStatus", ] - __summaryDefinedFields = [("ApplicationStatus", "unset"), 
("MinorStatus", "unset")] - __summaryValueFieldsMapping = ["Jobs", "Reschedules"] - __renameFieldsMapping = {"JobType": "JobSplitType"} + __summaryValueFieldsMapping = ["JobCount", "RescheduleSum"] + __renameFieldsMapping = { + "Owner": "User", + "OwnerGroup": "UserGroup", + "JobType": "JobSplitType", + "JobCount": "Jobs", + "RescheduleSum": "Reschedules", + } # PilotsHistory fields - __pilotsMapping = ["GridSite", "GridType", "Status", "NumOfPilots"] + __pilotsMapping = ["GridSite", "ComputingElement", "GridType", "Status", "VO", "NumOfPilots"] def initialize(self): """Standard initialization""" @@ -88,7 +94,8 @@ def execute(self): # PilotsHistory to Monitoring if "Monitoring" in self.pilotMonitoringOption: self.log.info("Committing PilotsHistory to Monitoring") - result = PilotAgentsDB().getSummarySnapshot() + sql = "SELECT * FROM PilotsHistorySummary ORDER BY GridSite, ComputingElement, GridType, Status, VO;" + result = PilotAgentsDB()._query(sql) now = datetime.datetime.utcnow() if not result["OK"]: self.log.error( @@ -96,7 +103,7 @@ def execute(self): f"{result['Message']}: won't commit PilotsHistory at this cycle", ) - values = result["Value"][1] + values = result["Value"] for record in values: rD = {} for iP, _ in enumerate(self.__pilotsMapping): @@ -112,25 +119,20 @@ def execute(self): # WMSHistory to Monitoring or Accounting self.log.info(f"Committing WMSHistory to {'and '.join(self.jobMonitoringOption)} backend") - result = JobDB().getSummarySnapshot(self.__jobDBFields) - now = datetime.datetime.utcnow() + result = JobDB()._query( + f"SELECT {','.join(self.__summaryKeyFieldsMapping + self.__summaryValueFieldsMapping)} FROM JobsHistorySummary ORDER BY {','.join(self.__summaryKeyFieldsMapping)}" + ) if not result["OK"]: self.log.error("Can't get the JobDB summary", f"{result['Message']}: won't commit WMSHistory at this cycle") return S_ERROR() - values = result["Value"][1] - + values = result["Value"] + now = datetime.datetime.utcnow() self.log.info("Start 
sending WMSHistory records") for record in values: rD = {} - for fV in self.__summaryDefinedFields: - rD[fV[0]] = fV[1] - for iP, _ in enumerate(self.__summaryKeyFieldsMapping): - fieldName = self.__summaryKeyFieldsMapping[iP] + for iP, fieldName in enumerate(self.__summaryKeyFieldsMapping + self.__summaryValueFieldsMapping): rD[self.__renameFieldsMapping.get(fieldName, fieldName)] = record[iP] - record = record[len(self.__summaryKeyFieldsMapping) :] - for iP, _ in enumerate(self.__summaryValueFieldsMapping): - rD[self.__summaryValueFieldsMapping[iP]] = int(record[iP]) for backend in self.datastores: if backend.lower() == "monitoring": @@ -141,6 +143,8 @@ def execute(self): self.datastores["Monitoring"].addRecord(rD) elif backend.lower() == "accounting": + rD.pop("VO") # Remove VO field for Accounting + rD.pop("ApplicationStatus") # Remove ApplicationStatus for Accounting acWMS = WMSHistory() acWMS.setStartTime(now) acWMS.setEndTime(now) diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_JobCleaningAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_JobCleaningAgent.py index f885ce9071a..169c74ae79b 100644 --- a/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_JobCleaningAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_JobCleaningAgent.py @@ -26,9 +26,7 @@ def jca(mocker): create=True, ) mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.AgentModule.am_getOption", return_value=mockAM) - mocker.patch( - "DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.JobDB.getDistinctJobAttributes", side_effect=mockReply - ) + mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.JobDB._query", side_effect=mockReply) mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.JobDB.selectJobs", side_effect=mockReply) mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.JobDB.__init__", side_effect=mockNone) 
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.ReqClient", return_value=mockNone) diff --git a/src/DIRAC/WorkloadManagementSystem/DB/JobDB.py b/src/DIRAC/WorkloadManagementSystem/DB/JobDB.py index 829bda4a2c0..24f1747c4a9 100755 --- a/src/DIRAC/WorkloadManagementSystem/DB/JobDB.py +++ b/src/DIRAC/WorkloadManagementSystem/DB/JobDB.py @@ -26,10 +26,10 @@ from DIRAC.Core.Utilities.ReturnValues import ( S_ERROR, S_OK, + DReturnType, + SErrorException, convertToReturnValue, returnValueOrRaise, - SErrorException, - DReturnType, ) from DIRAC.FrameworkSystem.Client.Logger import contextLogger from DIRAC.ResourceStatusSystem.Client.SiteStatus import SiteStatus @@ -103,13 +103,6 @@ def __getAttributeNames(self): return S_OK() - ############################################################################# - def getDistinctJobAttributes(self, attribute, condDict=None, older=None, newer=None, timeStamp="LastUpdateTime"): - """Get distinct values of the job attribute under specified conditions""" - return self.getDistinctAttributeValues( - "Jobs", attribute, condDict=condDict, older=older, newer=newer, timeStamp=timeStamp - ) - ############################################################################# def getJobParameters(self, jobID, paramList=None): """Get Job Parameters defined for jobID. 
@@ -1480,19 +1473,6 @@ def setJobCommandStatus(self, jobID, command, status): return self._update(f"UPDATE JobCommands SET Status={status} WHERE JobID={jobID} AND Command={command}") - ##################################################################################### - def getSummarySnapshot(self, requestedFields=False): - """Get the summary snapshot for a given combination""" - if not requestedFields: - requestedFields = ["Status", "MinorStatus", "Site", "Owner", "OwnerGroup", "JobGroup"] - valueFields = ["COUNT(JobID)", "SUM(RescheduleCounter)"] - defString = ", ".join(requestedFields) - valueString = ", ".join(valueFields) - result = self._query(f"SELECT {defString}, {valueString} FROM Jobs GROUP BY {defString}") - if not result["OK"]: - return result - return S_OK(((requestedFields + valueFields), result["Value"])) - def removeInfoFromHeartBeatLogging(self, status, delTime, maxLines): """Remove HeartBeatLoggingInfo from DB. diff --git a/src/DIRAC/WorkloadManagementSystem/DB/JobDB.sql b/src/DIRAC/WorkloadManagementSystem/DB/JobDB.sql index 6b6704b364e..2d040de8489 100755 --- a/src/DIRAC/WorkloadManagementSystem/DB/JobDB.sql +++ b/src/DIRAC/WorkloadManagementSystem/DB/JobDB.sql @@ -40,7 +40,7 @@ CREATE TABLE `Jobs` ( `JobName` VARCHAR(128) NOT NULL DEFAULT 'Unknown', `Owner` VARCHAR(64) NOT NULL DEFAULT 'Unknown', `OwnerGroup` VARCHAR(128) NOT NULL DEFAULT 'Unknown', - `VO` VARCHAR(32) NOT NULL DEFAULT 'Unknown', + `VO` VARCHAR(64) NOT NULL DEFAULT 'Unknown', `SubmissionTime` DATETIME DEFAULT NULL, `RescheduleTime` DATETIME DEFAULT NULL, `LastUpdateTime` DATETIME DEFAULT NULL, @@ -126,9 +126,123 @@ CREATE TABLE `JobCommands` ( `JobID` INT(11) UNSIGNED NOT NULL, `Command` VARCHAR(100) NOT NULL, `Arguments` VARCHAR(100) NOT NULL, - `Status` VARCHAR(64) NOT NULL DEFAULT 'Received', + `Status` VARCHAR(32) NOT NULL DEFAULT 'Received', `ReceptionTime` DATETIME NOT NULL, `ExecutionTime` DATETIME DEFAULT NULL, PRIMARY KEY (`JobID`,`Arguments`,`ReceptionTime`), FOREIGN 
KEY (`JobID`) REFERENCES `Jobs`(`JobID`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; + + +-- ------------------------------------------------------------------------------ +-- summary table and triggers +-- ------------------------------------------------------------------------------ + +-- summary for JobsHistory + +DROP TABLE IF EXISTS `JobsHistorySummary`; +CREATE TABLE `JobsHistorySummary` ( + `ID` INT UNSIGNED NOT NULL AUTO_INCREMENT, + `Status` VARCHAR(32), + `Site` VARCHAR(100), + `Owner` VARCHAR(32), + `OwnerGroup` VARCHAR(128), + `VO` VARCHAR(64), + `JobGroup` VARCHAR(32), + `JobType` VARCHAR(32), + `ApplicationStatus` VARCHAR(255), + `MinorStatus` VARCHAR(128), + `JobCount` INT, + `RescheduleSum` INT, + PRIMARY KEY (`ID`), + UNIQUE KEY uq_summary ( + `Status`, + `Site`, + `Owner`, + `OwnerGroup`(32), + `VO`, + `JobGroup`, + `JobType`, + `ApplicationStatus`(128), + `MinorStatus` + ) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; + + +-- now the triggers + +DELIMITER // + +CREATE TRIGGER trg_Jobs_insert +AFTER INSERT ON Jobs +FOR EACH ROW +BEGIN + INSERT INTO JobsHistorySummary (Status, Site, Owner, OwnerGroup, VO, JobGroup, JobType, ApplicationStatus, MinorStatus, JobCount, RescheduleSum) + VALUES (NEW.Status, NEW.Site, NEW.Owner, NEW.OwnerGroup, NEW.VO, NEW.JobGroup, NEW.JobType, NEW.ApplicationStatus, NEW.MinorStatus, 1, NEW.RescheduleCounter) + ON DUPLICATE KEY UPDATE JobCount = JobCount + 1, RescheduleSum = RescheduleSum + NEW.RescheduleCounter; +END; +// + +CREATE TRIGGER trg_Jobs_delete +AFTER DELETE ON Jobs +FOR EACH ROW +BEGIN + UPDATE JobsHistorySummary + SET JobCount = JobCount - 1, RescheduleSum = RescheduleSum - OLD.RescheduleCounter + WHERE Status = OLD.Status + AND Site = OLD.Site + AND Owner = OLD.Owner + AND OwnerGroup = OLD.OwnerGroup + AND VO = OLD.VO + AND JobGroup = OLD.JobGroup + AND JobType = OLD.JobType + AND ApplicationStatus = OLD.ApplicationStatus + AND MinorStatus = OLD.MinorStatus; + + -- Remove zero rows + DELETE FROM 
JobsHistorySummary + WHERE JobCount = 0 + AND Status = OLD.Status + AND Site = OLD.Site + AND Owner = OLD.Owner + AND OwnerGroup = OLD.OwnerGroup + AND VO = OLD.VO + AND JobGroup = OLD.JobGroup + AND JobType = OLD.JobType + AND ApplicationStatus = OLD.ApplicationStatus + AND MinorStatus = OLD.MinorStatus; +END; +// + +CREATE TRIGGER trg_Jobs_update_status +AFTER UPDATE ON Jobs +FOR EACH ROW +BEGIN + IF OLD.Status != NEW.Status THEN + + -- Decrease count from old status + UPDATE JobsHistorySummary + SET JobCount = JobCount - 1, RescheduleSum = RescheduleSum - OLD.RescheduleCounter + WHERE Status = OLD.Status + AND Site = OLD.Site + AND Owner = OLD.Owner + AND OwnerGroup = OLD.OwnerGroup + AND VO = OLD.VO + AND JobGroup = OLD.JobGroup + AND JobType = OLD.JobType + AND ApplicationStatus = OLD.ApplicationStatus + AND MinorStatus = OLD.MinorStatus; + + -- Delete row if count drops to zero + DELETE FROM JobsHistorySummary WHERE JobCount = 0; + + -- Increase count for new status + INSERT INTO JobsHistorySummary (Status, Site, Owner, OwnerGroup, JobGroup, VO, JobType, ApplicationStatus, MinorStatus, JobCount, RescheduleSum) + VALUES (NEW.Status, NEW.Site, NEW.Owner, NEW.OwnerGroup, NEW.JobGroup, NEW.VO, NEW.JobType, NEW.ApplicationStatus, NEW.MinorStatus, 1, NEW.RescheduleCounter) + ON DUPLICATE KEY UPDATE JobCount = JobCount + 1, RescheduleSum = RescheduleSum + NEW.RescheduleCounter; + + END IF; +END; +// + +DELIMITER ; diff --git a/src/DIRAC/WorkloadManagementSystem/DB/PilotAgentsDB.py b/src/DIRAC/WorkloadManagementSystem/DB/PilotAgentsDB.py index 1a791ea54ef..53f6dc78c27 100755 --- a/src/DIRAC/WorkloadManagementSystem/DB/PilotAgentsDB.py +++ b/src/DIRAC/WorkloadManagementSystem/DB/PilotAgentsDB.py @@ -943,15 +943,13 @@ def getPilotSummaryWeb(self, selectDict, sortList, startItem, maxItems): def getPilotMonitorSelectors(self): """Get distinct values for the Pilot Monitor page selectors""" - paramNames = ["VO", "GridType", "Status", "DestinationSite", "GridSite"] - 
resultDict = {} - for param in paramNames: - result = self.getDistinctAttributeValues("PilotAgents", param) - if result["OK"]: - resultDict[param] = result["Value"] - else: + for param in ["VO", "Status", "ComputingElement", "GridSite"]: + result = self.getDistinctAttributeValues("PilotsHistorySummary", param) + if not result["OK"]: resultDict = [] + else: + resultDict[param] = result["Value"] return S_OK(resultDict) @@ -1048,18 +1046,6 @@ def getPilotMonitorWeb(self, selectDict, sortList, startItem, maxItems): return S_OK(resultDict) - def getSummarySnapshot(self, requestedFields=False): - """Get the summary snapshot for a given combination""" - if not requestedFields: - requestedFields = ["GridSite", "GridType", "Status"] - valueFields = ["COUNT(PilotID)"] - defString = ", ".join(requestedFields) - valueString = ", ".join(valueFields) - result = self._query(f"SELECT {defString}, {valueString} FROM PilotAgents GROUP BY {defString}") - if not result["OK"]: - return result - return S_OK(((requestedFields + valueFields), result["Value"])) - class PivotedPilotSummaryTable: """ diff --git a/src/DIRAC/WorkloadManagementSystem/DB/PilotAgentsDB.sql b/src/DIRAC/WorkloadManagementSystem/DB/PilotAgentsDB.sql index 7168e529f74..548484c7782 100755 --- a/src/DIRAC/WorkloadManagementSystem/DB/PilotAgentsDB.sql +++ b/src/DIRAC/WorkloadManagementSystem/DB/PilotAgentsDB.sql @@ -1,5 +1,3 @@ --- $Header: /tmp/libdirac/tmp.stZoy15380/dirac/DIRAC3/DIRAC/WorkloadManagementSystem/DB/PilotAgentsDB.sql,v 1.20 2009/08/26 09:39:53 rgracian Exp $ - -- ------------------------------------------------------------------------------ -- -- Schema definition for the PilotAgentsDB database - containing the Pilots status @@ -34,7 +32,7 @@ CREATE TABLE `PilotAgents` ( `DestinationSite` VARCHAR(128) NOT NULL DEFAULT 'NotAssigned', `Queue` VARCHAR(128) NOT NULL DEFAULT 'Unknown', `GridSite` VARCHAR(128) NOT NULL DEFAULT 'Unknown', - `VO` VARCHAR(128) NOT NULL, + `VO` VARCHAR(64) NOT NULL, `GridType` 
VARCHAR(32) NOT NULL DEFAULT 'LCG', `BenchMark` DOUBLE NOT NULL DEFAULT 0.0, `SubmissionTime` DATETIME DEFAULT NULL, @@ -48,7 +46,6 @@ CREATE TABLE `PilotAgents` ( KEY `Statuskey` (`GridSite`,`DestinationSite`,`Status`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; - DROP TABLE IF EXISTS `JobToPilotMapping`; CREATE TABLE `JobToPilotMapping` ( `PilotID` INT(11) UNSIGNED NOT NULL, @@ -65,3 +62,88 @@ CREATE TABLE `PilotOutput` ( `StdError` MEDIUMTEXT, PRIMARY KEY (`PilotID`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; + + +-- ------------------------------------------------------------------------------ +-- summary table and triggers +-- ------------------------------------------------------------------------------ + +-- summary for PilotsHistory + +DROP TABLE IF EXISTS `PilotsHistorySummary`; +CREATE TABLE `PilotsHistorySummary` ( + `GridSite` VARCHAR(128), + `ComputingElement` VARCHAR(128), + `GridType` VARCHAR(128), + `Status` VARCHAR(32), + `VO` VARCHAR(64), + `PilotCount` INT, + PRIMARY KEY (`GridSite`,`ComputingElement`,`GridType`,`Status`, `VO`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; + + +-- now the triggers + +DELIMITER // + +CREATE TRIGGER trg_PilotAgents_insert +AFTER INSERT ON PilotAgents +FOR EACH ROW +BEGIN + INSERT INTO PilotsHistorySummary (GridSite, ComputingElement, GridType, Status, VO, PilotCount) + VALUES (NEW.GridSite, NEW.DestinationSite, NEW.GridType, NEW.Status, NEW.VO, 1) + ON DUPLICATE KEY UPDATE PilotCount = PilotCount + 1; +END; +// + +CREATE TRIGGER trg_PilotAgents_delete +AFTER DELETE ON PilotAgents +FOR EACH ROW +BEGIN + UPDATE PilotsHistorySummary + SET PilotCount = PilotCount - 1 + WHERE GridSite = OLD.GridSite + AND ComputingElement = OLD.DestinationSite + AND GridType = OLD.GridType + AND Status = OLD.Status + AND VO = OLD.VO; + + -- Remove zero rows + DELETE FROM PilotsHistorySummary + WHERE PilotCount = 0 + AND GridSite = OLD.GridSite + AND ComputingElement = OLD.DestinationSite + AND GridType = OLD.GridType + AND Status = OLD.Status + 
AND VO = OLD.VO; +END; +// + +CREATE TRIGGER trg_PilotAgents_update_status +AFTER UPDATE ON PilotAgents +FOR EACH ROW +BEGIN + IF OLD.Status != NEW.Status THEN + + -- Decrease count from old status + UPDATE PilotsHistorySummary + SET PilotCount = PilotCount - 1 + WHERE GridSite = OLD.GridSite + AND ComputingElement = OLD.DestinationSite + AND GridType = OLD.GridType + AND Status = OLD.Status + AND VO = OLD.VO; + + -- Delete row if count drops to zero + DELETE FROM PilotsHistorySummary WHERE PilotCount = 0; + + -- Increase count for new status + INSERT INTO PilotsHistorySummary (GridSite, ComputingElement, GridType, Status, VO, PilotCount) + VALUES (NEW.GridSite, NEW.DestinationSite, NEW.GridType, NEW.Status, NEW.VO, 1) + ON DUPLICATE KEY UPDATE PilotCount = PilotCount + 1; + + END IF; +END; +// + +DELIMITER ; diff --git a/tests/Integration/Monitoring/Test_MonitoringSystem.py b/tests/Integration/Monitoring/Test_MonitoringSystem.py index 85e9d550740..38dfec1c72d 100644 --- a/tests/Integration/Monitoring/Test_MonitoringSystem.py +++ b/tests/Integration/Monitoring/Test_MonitoringSystem.py @@ -73,18 +73,7 @@ def test_listUniqueKeyValues(putAndDelete): assert "User" in result["Value"] assert "JobGroup" in result["Value"] assert "UserGroup" in result["Value"] - assert result["Value"] == { - "Status": [], - "JobSplitType": [], - "MinorStatus": [], - "Site": [], - "ApplicationStatus": [], - "User": [], - "JobGroup": [], - "UserGroup": [], - "Tier": [], - "Type": [], - } + assert "VO" in result["Value"] def test_generateDelayedPlot(putAndDelete): diff --git a/tests/Integration/Monitoring/Test_WebAppClient.py b/tests/Integration/Monitoring/Test_WebAppClient.py index aa76003fdb6..e4c48059e67 100644 --- a/tests/Integration/Monitoring/Test_WebAppClient.py +++ b/tests/Integration/Monitoring/Test_WebAppClient.py @@ -13,7 +13,6 @@ def test_WebAppClient(): res = WebAppClient().getSiteSummaryWeb({}, [], 0, 100) assert res["OK"], res["Message"] - assert res["Value"]["TotalRecords"] in 
[0, 1, 2, 34] res = WebAppClient().getSiteSummarySelectors() assert res["OK"], res["Message"] diff --git a/tests/Integration/WorkloadManagementSystem/Test_JobDB.py b/tests/Integration/WorkloadManagementSystem/Test_JobDB.py index 0987ebdc618..9d7308216b7 100644 --- a/tests/Integration/WorkloadManagementSystem/Test_JobDB.py +++ b/tests/Integration/WorkloadManagementSystem/Test_JobDB.py @@ -6,6 +6,7 @@ # pylint: disable=wrong-import-position, missing-docstring +import csv from datetime import datetime, timedelta from unittest.mock import MagicMock, patch @@ -443,3 +444,104 @@ def test_attributes(jobDB): res = jobDB.getJobsAttributes([jobID_1, jobID_2], ["Status"]) assert res["OK"], res["Message"] assert res["Value"] == {jobID_1: {"Status": JobStatus.DONE}, jobID_2: {"Status": JobStatus.RUNNING}} + + +# Parse date strings into datetime objects +def process_data(jobIDs, data): + converted_data = [] + + full_data = [] + + for j, d in zip(jobIDs, data): + row = list(d) + row.insert(0, j) # Insert JobID at the beginning of the row + full_data.append(row) + + for row in full_data: + # date fields + date_indices = [8, 9, 10, 11, 12, 13] # Positions of date fields + for i in date_indices: + if not row[i]: + row[i] = None + else: + try: + row[i] = datetime.strptime(row[i], "%Y-%m-%d %H:%M:%S") + except ValueError: + # Handle invalid dates + row[i] = None + # Convert other fields to appropriate types + int_indices = [17, 18] # Positions of integer fields + for i in int_indices: + if not row[i]: + row[i] = 0 + else: + try: + row[i] = int(row[i]) + except ValueError: + # Handle invalid integers + row[i] = 0 + converted_data.append(tuple(row)) + return converted_data + + +def test_summarySnapshot(): + # first delete all jobs + jobDB = JobDB() + for table in [ + "InputData", + "JobParameters", + "AtticJobParameters", + "HeartBeatLoggingInfo", + "OptimizerParameters", + "JobCommands", + "Jobs", + "JobJDLs", + ]: + sqlCmd = f"DELETE from `{table}`" + jobDB._update(sqlCmd) + sql = 
"DELETE FROM JobsHistorySummary" + res = jobDB._update(sql) + assert res["OK"], res["Message"] + + # insert some predefined jobs to test the summary snapshot + with open("jobs.csv", newline="", encoding="utf-8") as csvfile: + csvreader = csv.reader(csvfile) + data = list(csvreader) + + # First inserting the JDLs + jdlData = [(jdl, "", "")] * len(data) + res = jobDB._updatemany("INSERT INTO JobJDLs (JDL, JobRequirements, OriginalJDL) VALUES (%s,%s,%s)", jdlData) + assert res["OK"], res["Message"] + # Getting which JobIDs were inserted + res = jobDB._query("SELECT JobID FROM JobJDLs") + assert res["OK"], res["Message"] + jobIDs = [row[0] for row in res["Value"]][0 : len(data)] + + # Now inserting the jobs + processed_data = process_data(jobIDs, data) + placeholders = ",".join(["%s"] * len(processed_data[0])) + sql = f"INSERT INTO Jobs (JobID, JobType, JobGroup, Site, JobName, Owner, OwnerGroup, VO, SubmissionTime, RescheduleTime, LastUpdateTime, StartExecTime, HeartBeatTime, EndExecTime, Status, MinorStatus, ApplicationStatus, UserPriority, RescheduleCounter, VerifiedFlag, AccountedFlag) VALUES ({placeholders})" + res = jobDB._updatemany(sql, processed_data) + assert res["OK"], res["Message"] + + requestedFields = [ + "Status", + "MinorStatus", + "Site", + "Owner", + "OwnerGroup", + "VO", + "JobGroup", + "JobType", + "ApplicationStatus", + ] + defString = ", ".join(requestedFields) + + # Check it corresponds to the basic "GROUP BY" query + simple_query = f"SELECT {defString}, COUNT(JobID) AS JobCount, SUM(RescheduleCounter) AS RescheduleSum FROM Jobs GROUP BY {defString} ORDER BY {defString};" + res_sq = jobDB._query(simple_query) + assert res_sq["OK"], res_sq["Message"] + sql = f"SELECT {defString}, JobCount, RescheduleSum FROM JobsHistorySummary ORDER BY {defString};" + result_summary = jobDB._query(sql) + assert result_summary["OK"], result_summary["Message"] + assert sorted(res_sq["Value"]) == sorted(result_summary["Value"]) diff --git 
a/tests/Integration/WorkloadManagementSystem/Test_PilotAgentsDB.py b/tests/Integration/WorkloadManagementSystem/Test_PilotAgentsDB.py index 3243ae9ad54..5b836db16dd 100644 --- a/tests/Integration/WorkloadManagementSystem/Test_PilotAgentsDB.py +++ b/tests/Integration/WorkloadManagementSystem/Test_PilotAgentsDB.py @@ -5,6 +5,8 @@ """ # pylint: disable=wrong-import-position +import csv +from datetime import datetime, timedelta from unittest.mock import patch import DIRAC @@ -186,3 +188,96 @@ def test_PivotedPilotSummaryTable(): assert "Total" in columns cleanUpPilots(pilotRef) + + +# Parse date strings into datetime objects +def process_data(data): + converted_data = [] + + for row in data: + # date fields + date_indices = [10, 11] # Positions of date fields + for i in date_indices: + if not row[i]: + row[i] = None + else: + try: + row[i] = datetime.strptime(row[i], "%Y-%m-%d %H:%M:%S") + except ValueError: + # Handle invalid dates + row[i] = None + # Convert other fields to appropriate types + int_indices = [0, 1] # Positions of integer fields + for i in int_indices: + if not row[i]: + row[i] = 0 + else: + try: + row[i] = int(row[i]) + except ValueError: + # Handle invalid integers + row[i] = 0 + float_indices = [9] # Positions of float fields + for i in float_indices: + if not row[i]: + row[i] = 0 + else: + try: + row[i] = float(row[i]) + except ValueError: + # Handle invalid float + row[i] = 0 + converted_data.append(tuple(row)) + return converted_data + + +def test_summarySnapshot(): + # first delete all pilots + sql = "DELETE FROM PilotAgents" + res = paDB._update(sql) + assert res["OK"], res["Message"] + sql = "DELETE FROM PilotsHistorySummary" + res = paDB._update(sql) + assert res["OK"], res["Message"] + + # insert some predefined pilots to test the summary snapshot + with open("pilots.csv", newline="", encoding="utf-8") as csvfile: + csvreader = csv.reader(csvfile) + data = list(csvreader) + processed_data = process_data(data) + placeholders = ",".join(["%s"] 
* len(processed_data[0]))
+    sql = f"INSERT INTO PilotAgents (InitialJobID, CurrentJobID, PilotJobReference, PilotStamp, DestinationSite, Queue, GridSite, VO, GridType, BenchMark, SubmissionTime, LastUpdateTime, Status, StatusReason, AccountingSent) VALUES ({placeholders})"
+    res = paDB._updatemany(sql, processed_data)
+    assert res["OK"], res["Message"]
+    sql = "SELECT * FROM PilotsHistorySummary ORDER BY GridSite, ComputingElement, GridType, Status, VO;"
+    result = PilotAgentsDB()._query(sql)
+    assert result["OK"], result["Message"]
+    values = result["Value"][1]
+    assert len(values) == 6, "Expected 6 fields in the summary record"
+    # Check it corresponds to the basic "GROUP BY" query
+    sql = "SELECT GridSite, DestinationSite, GridType, Status, VO, COUNT(*) FROM PilotAgents GROUP BY GridSite, DestinationSite, GridType, Status, VO ORDER BY GridSite, DestinationSite, GridType, Status, VO;"
+    result_grouped = PilotAgentsDB()._query(sql)
+    assert result_grouped["OK"], result_grouped["Message"]
+    sql = "SELECT * FROM PilotsHistorySummary ORDER BY GridSite, ComputingElement, GridType, Status, VO;"
+    result_summary = PilotAgentsDB()._query(sql)
+    assert result_summary["OK"], result_summary["Message"]
+    assert result_grouped["Value"] == result_summary["Value"], "Summary and grouped query results differ"
+
+    # deleting now
+    with open("pilots.csv", newline="", encoding="utf-8") as csvfile:
+        csvreader = csv.reader(csvfile)
+        data = list(csvreader)
+    processed_data = process_data(data)
+    pilotStamps = [row[3] for row in processed_data]
+    pilotStampsStr = ",".join("'" + p + "'" for p in pilotStamps)
+    sql = "DELETE FROM PilotAgents WHERE PilotStamp IN (%s)" % pilotStampsStr
+    res = paDB._update(sql)
+    assert res["OK"], res["Message"]
+    # Check it corresponds to the basic "GROUP BY" query
+    sql = "SELECT GridSite, DestinationSite, GridType, Status, VO, COUNT(*) FROM PilotAgents GROUP BY GridSite, DestinationSite, GridType, 
Status, VO;" + result_grouped = PilotAgentsDB()._query(sql) + assert result_grouped["OK"], result_grouped["Message"] + sql = "select * FROM PilotsHistorySummary ORDER BY GridSite, ComputingElement, GridType, Status, VO;" + result_summary = PilotAgentsDB()._query(sql) + assert result_summary["OK"], result_summary["Message"] + assert result_grouped["Value"] == result_summary["Value"], "Summary and grouped query results differ" diff --git a/tests/Integration/WorkloadManagementSystem/examples.tgz b/tests/Integration/WorkloadManagementSystem/examples.tgz new file mode 100644 index 00000000000..d831aa82791 Binary files /dev/null and b/tests/Integration/WorkloadManagementSystem/examples.tgz differ diff --git a/tests/Integration/all_integration_server_tests.sh b/tests/Integration/all_integration_server_tests.sh index 71eb8f3ee66..09f035b7ca0 100644 --- a/tests/Integration/all_integration_server_tests.sh +++ b/tests/Integration/all_integration_server_tests.sh @@ -46,6 +46,8 @@ pytest --no-check-dirac-environment "${THIS_DIR}/ResourceStatusSystem/Test_FullC #-------------------------------------------------------------------------------# echo -e "*** $(date -u) **** WMS TESTS ****\n" +cp "${THIS_DIR}/WorkloadManagementSystem/examples.tgz" . +tar -xzf examples.tgz pytest --no-check-dirac-environment "${THIS_DIR}/WorkloadManagementSystem/Test_JobDB.py" |& tee -a "${SERVER_TEST_OUTPUT}"; (( ERR |= "${?}" )) pytest --no-check-dirac-environment "${THIS_DIR}/WorkloadManagementSystem/Test_JobLoggingDB.py" |& tee -a "${SERVER_TEST_OUTPUT}"; (( ERR |= "${?}" )) pytest --no-check-dirac-environment "${THIS_DIR}/WorkloadManagementSystem/Test_TaskQueueDB.py" |& tee -a "${SERVER_TEST_OUTPUT}"; (( ERR |= "${?}" ))