From 301496f01bbed04a6d84dc735a8c68fb99a12a67 Mon Sep 17 00:00:00 2001 From: Federico Stagni Date: Tue, 6 Jan 2026 14:17:01 +0100 Subject: [PATCH 1/7] test: adding the input files to test job definitions --- src/DIRAC/tests/Utilities/testJobDefinitions.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/DIRAC/tests/Utilities/testJobDefinitions.py b/src/DIRAC/tests/Utilities/testJobDefinitions.py index 8d5708155f2..082e3de0280 100644 --- a/src/DIRAC/tests/Utilities/testJobDefinitions.py +++ b/src/DIRAC/tests/Utilities/testJobDefinitions.py @@ -91,6 +91,7 @@ def helloWorld_input(): except IndexError: # we are in Jenkins J.setInputSandbox([find_all("exe-script-with-input.py", "/home/dirac", "DIRAC/tests/Workflow")[0]]) J.setExecutable("exe-script-with-input.py", "", "helloWorld.log") + J.setInputData(["/dteam/user/f/fstagni/test/testInputFile.txt"]) return endOfAllJobs(J) @@ -108,6 +109,7 @@ def helloWorld_input_single(): [find_all("exe-script-with-input-single-location.py", "/home/dirac", "DIRAC/tests/Workflow")[0]] ) J.setExecutable("exe-script-with-input-single-location.py", "", "helloWorld.log") + J.setInputData(["/dteam/user/f/fstagni/test/testInputFileSingleLocation.txt"]) return endOfAllJobs(J) From d55bd714488ad277b4a986b41d48c97c4f377a03 Mon Sep 17 00:00:00 2001 From: Federico Stagni Date: Tue, 6 Jan 2026 15:43:12 +0100 Subject: [PATCH 2/7] docs: fix -- right tabulation --- .../DeveloperInstallation/stuffThatRun.rst | 2 +- .../Integration/WorkloadManagementSystem/Test_TaskQueueDB.py | 5 ++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/docs/source/DeveloperGuide/DevelopmentEnvironment/DeveloperInstallation/stuffThatRun.rst b/docs/source/DeveloperGuide/DevelopmentEnvironment/DeveloperInstallation/stuffThatRun.rst index df19860b3c7..de8cd6c1119 100644 --- a/docs/source/DeveloperGuide/DevelopmentEnvironment/DeveloperInstallation/stuffThatRun.rst +++ 
b/docs/source/DeveloperGuide/DevelopmentEnvironment/DeveloperInstallation/stuffThatRun.rst @@ -47,7 +47,7 @@ Now you can run the test with: .. code-block:: bash -pytest --no-check-dirac-environment LocalRepo/ALTERNATIVE_MODULES/DIRAC/tests/Integration/WorkloadManagementSystem/Test_JobDB.py + pytest --no-check-dirac-environment LocalRepo/ALTERNATIVE_MODULES/DIRAC/tests/Integration/WorkloadManagementSystem/Test_JobDB.py You can find the logs of the services in `/home/dirac/ServerInstallDIR/diracos/runit/` diff --git a/tests/Integration/WorkloadManagementSystem/Test_TaskQueueDB.py b/tests/Integration/WorkloadManagementSystem/Test_TaskQueueDB.py index 80c26faffd7..461a86a0b08 100644 --- a/tests/Integration/WorkloadManagementSystem/Test_TaskQueueDB.py +++ b/tests/Integration/WorkloadManagementSystem/Test_TaskQueueDB.py @@ -577,7 +577,7 @@ def test_chainWithTags(): # Matching - # Matching Everything with Tag = "ANY" + # Matching Everything with Tag = "" result = tqDB.matchAndGetTaskQueue({"CPUTime": 50000, "Tag": "ANY"}, numQueuesToGet=6) assert result["OK"] # this should match whatever @@ -682,8 +682,7 @@ def test_chainWithTags(): res = {int(x[0]) for x in result["Value"]} assert res == {tq_job1, tq_job6} - # NumberOfProcessors and MaxRAM - # This is translated to "#Processors" by the SiteDirector + # This is translated to "#Processors" by the Matcher result = tqDB.matchAndGetTaskQueue({"CPUTime": 50000, "Tag": "4Processors"}, numQueuesToGet=4) assert result["OK"] From 057f8a519ae9513f33c779c9684b402b440beb6a Mon Sep 17 00:00:00 2001 From: Federico Stagni Date: Thu, 8 Jan 2026 17:27:13 +0100 Subject: [PATCH 3/7] feat: added a new table to TaskQueueDB for RAM requirements and matching --- .../Client/Matcher.py | 35 ++--- .../DB/TaskQueueDB.py | 144 +++++++++++++++++- .../Test_TaskQueueDB.py | 130 ++++++++++++++++ 3 files changed, 281 insertions(+), 28 deletions(-) diff --git a/src/DIRAC/WorkloadManagementSystem/Client/Matcher.py 
b/src/DIRAC/WorkloadManagementSystem/Client/Matcher.py index ec01894a7ad..e6f5eab9671 100644 --- a/src/DIRAC/WorkloadManagementSystem/Client/Matcher.py +++ b/src/DIRAC/WorkloadManagementSystem/Client/Matcher.py @@ -17,7 +17,11 @@ from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB from DIRAC.WorkloadManagementSystem.DB.JobLoggingDB import JobLoggingDB from DIRAC.WorkloadManagementSystem.DB.PilotAgentsDB import PilotAgentsDB -from DIRAC.WorkloadManagementSystem.DB.TaskQueueDB import TaskQueueDB, multiValueMatchFields, singleValueDefFields +from DIRAC.WorkloadManagementSystem.DB.TaskQueueDB import ( + TaskQueueDB, + multiValueMatchFields, + singleValueDefFields, +) class PilotVersionError(Exception): @@ -69,8 +73,8 @@ def selectJob(self, resourceDescription, credDict): # Make a nice print of the resource matching parameters toPrintDict = dict(resourceDict) - if "MaxRAM" in resourceDescription: - toPrintDict["MaxRAM"] = resourceDescription["MaxRAM"] + if "RAM" in resourceDict: + toPrintDict["RAM"] = resourceDict["RAM"] if "NumberOfProcessors" in resourceDescription: toPrintDict["NumberOfProcessors"] = resourceDescription["NumberOfProcessors"] toPrintDict["Tag"] = [] @@ -167,11 +171,7 @@ def _processResourceDescription(self, resourceDescription): """ resourceDict = {} - for name in singleValueDefFields: - if name in resourceDescription: - resourceDict[name] = resourceDescription[name] - - for name in multiValueMatchFields: + for name in singleValueDefFields + multiValueMatchFields + ["RAM"]: if name in resourceDescription: resourceDict[name] = resourceDescription[name] @@ -192,25 +192,18 @@ def _processResourceDescription(self, resourceDescription): if "JobID" in resourceDescription: resourceDict["JobID"] = resourceDescription["JobID"] - # Convert MaxRAM and NumberOfProcessors parameters into a list of tags - maxRAM = resourceDescription.get("MaxRAM") - if maxRAM: - try: - maxRAM = int(maxRAM) - except ValueError: - maxRAM = None + # Convert NumberOfProcessors 
parameters into a list of tags nProcessors = resourceDescription.get("NumberOfProcessors") if nProcessors: try: nProcessors = int(nProcessors) except ValueError: nProcessors = None - for param, key, limit, increment in [(maxRAM, "MB", 1024 * 1024, 256), (nProcessors, "Processors", 1024, 1)]: - if param and param <= limit: - paramList = list(range(increment, param + increment, increment)) - paramTags = ["%d%s" % (par, key) for par in paramList] - if paramTags: - resourceDict.setdefault("Tag", []).extend(paramTags) + if nProcessors and nProcessors <= 1024: + paramList = list(range(1, nProcessors + 1, 1)) + paramTags = ["%d%s" % (par, "Processors") for par in paramList] + if paramTags: + resourceDict.setdefault("Tag", []).extend(paramTags) # Add 'MultiProcessor' to the list of tags if nProcessors and nProcessors > 1: diff --git a/src/DIRAC/WorkloadManagementSystem/DB/TaskQueueDB.py b/src/DIRAC/WorkloadManagementSystem/DB/TaskQueueDB.py index f235c50ac2a..ad45941a3c1 100755 --- a/src/DIRAC/WorkloadManagementSystem/DB/TaskQueueDB.py +++ b/src/DIRAC/WorkloadManagementSystem/DB/TaskQueueDB.py @@ -1,5 +1,5 @@ -""" TaskQueueDB class is a front-end to the task queues db -""" +"""TaskQueueDB class is a front-end to the task queues db""" + import random import string from collections import defaultdict @@ -22,12 +22,13 @@ # For checks at insertion time, and not only singleValueDefFields = ("Owner", "OwnerGroup", "CPUTime") multiValueDefFields = ("Sites", "GridCEs", "BannedSites", "Platforms", "JobTypes", "Tags") +rangeValueDefFields = ("MinRAM", "MaxRAM") # Used for matching multiValueMatchFields = ("GridCE", "Site", "Platform", "JobType", "Tag") bannedJobMatchFields = ("Site",) mandatoryMatchFields = ("CPUTime",) -priorityIgnoredFields = ("Sites", "BannedSites") +priorityIgnoredFields = ("Sites", "BannedSites", "MinRAM", "MaxRAM") def _lowerAndRemovePunctuation(s): @@ -129,6 +130,16 @@ def __initializeDB(self): "ForeignKeys": {"TQId": "tq_TaskQueues.TQId"}, } + 
self.__tablesDesc["tq_RAM_requirements"] = { + "Fields": { + "TQId": "INTEGER(11) UNSIGNED NOT NULL", + "MinRAM": "INTEGER UNSIGNED NOT NULL DEFAULT 0", + "MaxRAM": "INTEGER UNSIGNED NOT NULL DEFAULT 0", + }, + "PrimaryKey": "TQId", + "ForeignKeys": {"TQId": "tq_TaskQueues.TQId"}, + } + for multiField in multiValueDefFields: tableName = f"tq_TQTo{multiField}" self.__tablesDesc[tableName] = { @@ -206,6 +217,20 @@ def _checkTaskQueueDefinition(self, tqDefDict): return result tqDefDict[field] = result["Value"] + # Check range value fields (RAM requirements) + for field in rangeValueDefFields: + if field not in tqDefDict: + continue + if not isinstance(tqDefDict[field], int): + return S_ERROR(f"Range value field {field} value type is not valid: {type(tqDefDict[field])}") + if tqDefDict[field] < 0: + return S_ERROR(f"Range value field {field} must be non-negative: {tqDefDict[field]}") + + # Validate that MinRAM <= MaxRAM if both are specified + if "MinRAM" in tqDefDict and "MaxRAM" in tqDefDict: + if tqDefDict["MaxRAM"] > 0 and tqDefDict["MinRAM"] > tqDefDict["MaxRAM"]: + return S_ERROR(f"MinRAM ({tqDefDict['MinRAM']}) cannot be greater than MaxRAM ({tqDefDict['MaxRAM']})") + return S_OK(tqDefDict) def _checkMatchDefinition(self, tqMatchDict): @@ -251,6 +276,13 @@ def travelAndCheckType(value, validTypes, escapeValues=True): return S_ERROR(f"Match definition field {field} failed : {result['Message']}") tqMatchDict[field] = result["Value"] + # Check range value fields (RAM requirements for matching) + if "RAM" in tqMatchDict: + result = travelAndCheckType(tqMatchDict["RAM"], int, escapeValues=False) + if not result["OK"]: + return S_ERROR(f"Match definition field RAM failed : {result['Message']}") + tqMatchDict["RAM"] = result["Value"] + return S_OK(tqMatchDict) def __createTaskQueue(self, tqDefDict, priority=1, connObj=False): @@ -303,6 +335,20 @@ def __createTaskQueue(self, tqDefDict, priority=1, connObj=False): self.log.error("Failed to insert condition", f"{field} : 
{result['Message']}") self.cleanOrphanedTaskQueues(connObj=connObj) return S_ERROR(f"Can't insert values {values} for field {field}: {result['Message']}") + + # Insert RAM requirements if specified and not both zero + if "MinRAM" in tqDefDict or "MaxRAM" in tqDefDict: + minRAM = tqDefDict.get("MinRAM", 0) + maxRAM = tqDefDict.get("MaxRAM", 0) + # Only insert if at least one value is non-zero (optimization: avoid unnecessary rows) + if minRAM > 0 or maxRAM > 0: + cmd = f"INSERT INTO `tq_RAM_requirements` (TQId, MinRAM, MaxRAM) VALUES ({tqId}, {minRAM}, {maxRAM})" + result = self._update(cmd, conn=connObj) + if not result["OK"]: + self.log.error("Failed to insert RAM requirements", result["Message"]) + self.cleanOrphanedTaskQueues(connObj=connObj) + return S_ERROR(f"Can't insert RAM requirements: {result['Message']}") + self.log.info("Created TQ", tqId) return S_OK(tqId) @@ -327,6 +373,13 @@ def cleanOrphanedTaskQueues(self, connObj=False): if not result["OK"]: return result + # Delete RAM requirements for orphaned TQs + result = self._update( + f"DELETE FROM `tq_RAM_requirements` WHERE TQId in ( {','.join(orphanedTQs)} )", conn=connObj + ) + if not result["OK"]: + return result + result = self._update(f"DELETE FROM `tq_TaskQueues` WHERE TQId in ( {','.join(orphanedTQs)} )", conn=connObj) if not result["OK"]: return result @@ -473,6 +526,26 @@ def __generateTQFindSQL( sqlCondList.append(f"{numValues} = ({secondQuery} {grouping})") else: sqlCondList.append(f"`tq_TaskQueues`.TQId not in ( SELECT DISTINCT {tableName}.TQId from {tableName} )") + + # Handle RAM requirements matching + hasRAMRequirements = "MinRAM" in tqDefDict or "MaxRAM" in tqDefDict + if hasRAMRequirements: + minRAM = tqDefDict.get("MinRAM", 0) + maxRAM = tqDefDict.get("MaxRAM", 0) + # Only match TQs with the same RAM requirements if at least one is non-zero + if minRAM > 0 or maxRAM > 0: + # Match TQs that have the exact same RAM requirements + sqlCondList.append( + f"`tq_TaskQueues`.TQId IN ( SELECT 
TQId FROM `tq_RAM_requirements` " + f"WHERE MinRAM = {minRAM} AND MaxRAM = {maxRAM} )" + ) + else: + # Both are 0, so match TQs with no RAM requirements row + sqlCondList.append("`tq_TaskQueues`.TQId NOT IN ( SELECT DISTINCT TQId FROM `tq_RAM_requirements` )") + else: + # Match TQs that have no RAM requirements + sqlCondList.append("`tq_TaskQueues`.TQId NOT IN ( SELECT DISTINCT TQId FROM `tq_RAM_requirements` )") + # END MAGIC: That was easy ;) return S_OK(" AND ".join(sqlCondList)) @@ -722,6 +795,19 @@ def __generateTQMatchSQL(self, tqMatchDict, numQueuesToGet=1, negativeCond=None) if "CPUTime" in tqMatchDict: sqlCondList.append(self.__generateSQLSubCond("tq.%s <= %%s" % "CPUTime", tqMatchDict["CPUTime"])) + # RAM matching logic + if "RAM" in tqMatchDict: + ram = tqMatchDict["RAM"] + # Join with tq_RAM_requirements table + sqlTables["tq_RAM_requirements"] = "ram_req" + # Match if: + # 1. No RAM requirement exists for this TQ (LEFT JOIN will give NULL) + # 2. OR the resource has at least MinRAM + # Note: MinRAM is used for matching, MaxRAM is informational for post-match scheduling + # A job requiring MinRAM=2GB can run on any machine with 2GB or more + ramCond = f"( ram_req.TQId IS NULL OR {ram} >= ram_req.MinRAM )" + sqlCondList.append(ramCond) + tag_fv = [] # Match multi value fields @@ -844,10 +930,14 @@ def __generateTQMatchSQL(self, tqMatchDict, numQueuesToGet=1, negativeCond=None) if negativeCond: sqlCondList.append(self.__generateNotSQL(negativeCond)) - # Generate the final query string - tqSqlCmd = "SELECT tq.TQId, tq.Owner, tq.OwnerGroup FROM `tq_TaskQueues` tq WHERE %s" % ( - " AND ".join(sqlCondList) - ) + # Generate the final query string with proper JOINs + fromClause = "`tq_TaskQueues` tq" + + # Add LEFT JOIN for RAM requirements if needed + if "tq_RAM_requirements" in sqlTables: + fromClause += " LEFT JOIN `tq_RAM_requirements` ram_req ON tq.TQId = ram_req.TQId" + + tqSqlCmd = "SELECT tq.TQId, tq.Owner, tq.OwnerGroup FROM %s WHERE %s" % (fromClause, 
" AND ".join(sqlCondList)) # Apply priorities tqSqlCmd = f"{tqSqlCmd} ORDER BY RAND() / tq.Priority ASC" @@ -994,6 +1084,12 @@ def deleteTaskQueueIfEmpty(self, tqId, tqOwner=False, tqOwnerGroup=False, connOb retVal = self._update(f"DELETE FROM `tq_TQTo{mvField}` WHERE TQId = {tqId}", conn=connObj) if not retVal["OK"]: return retVal + + # Delete RAM requirements if they exist + retVal = self._update(f"DELETE FROM `tq_RAM_requirements` WHERE TQId = {tqId}", conn=connObj) + if not retVal["OK"]: + return retVal + retVal = self._update(f"DELETE FROM `tq_TaskQueues` WHERE TQId = {tqId}", conn=connObj) if not retVal["OK"]: return retVal @@ -1065,6 +1161,40 @@ def retrieveTaskQueues(self, tqIdList=None): if field not in tqData[tqId]: tqData[tqId][field] = [] tqData[tqId][field].append(value) + + # Retrieve RAM requirements (if table exists) + # Note: The table should be auto-created by __initializeDB, but we check for safety + sqlCmd = "SELECT TQId, MinRAM, MaxRAM FROM `tq_RAM_requirements`" + if tqIdList is not None: + if tqIdList: + # Only retrieve RAM requirements for specific TQIds + sqlCmd += f" WHERE TQId IN ( {', '.join([str(id_) for id_ in tqIdList])} )" + # else: empty list was already handled earlier with fast-track return + retVal = self._query(sqlCmd) + if not retVal["OK"]: + # If table doesn't exist (e.g., old installation), log a warning but continue + # This provides backward compatibility + if "doesn't exist" in retVal["Message"] or "Table" in retVal["Message"]: + self.log.warn("RAM requirements table not found, skipping RAM data retrieval", retVal["Message"]) + else: + self.log.error("Can't retrieve RAM requirements", retVal["Message"]) + return retVal + else: + for record in retVal["Value"]: + tqId = record[0] + minRAM = record[1] + maxRAM = record[2] + if tqId not in tqData: + if tqIdList is None or tqId in tqIdList: + self.log.verbose( + "Task Queue has RAM requirements but does not exist: triggering a cleaning", + f"TQID: {tqId}", + ) + tqNeedCleaning 
= True + else: + tqData[tqId]["MinRAM"] = minRAM + tqData[tqId]["MaxRAM"] = maxRAM + if tqNeedCleaning: self.cleanOrphanedTaskQueues() return S_OK(tqData) diff --git a/tests/Integration/WorkloadManagementSystem/Test_TaskQueueDB.py b/tests/Integration/WorkloadManagementSystem/Test_TaskQueueDB.py index 461a86a0b08..d5689cb0dc4 100644 --- a/tests/Integration/WorkloadManagementSystem/Test_TaskQueueDB.py +++ b/tests/Integration/WorkloadManagementSystem/Test_TaskQueueDB.py @@ -1089,6 +1089,136 @@ def test_ComplexMatching(): assert result["OK"] +def test_chainWithRAM(): + """put - remove with parameters including RAM requirements + + Note: MinRAM is the minimum RAM required for matching (resource must have at least this) + MaxRAM is informational only, used after matching for scheduling/allocation decisions + """ + + # Job 1: MinRAM=2048, MaxRAM=8192 (requires at least 2GB for matching, may use up to 8GB) + tqDefDict = { + "Owner": "userName", + "OwnerGroup": "myGroup", + "CPUTime": 5000, + "MinRAM": 2048, + "MaxRAM": 8192, + } + result = tqDB.insertJob(301, tqDefDict, 10) + assert result["OK"] + result = tqDB.getTaskQueueForJob(301) + tq_job1 = result["Value"] + assert tq_job1 > 0 + + # Job 2: MinRAM=4096, MaxRAM=0 (requires at least 4GB, MaxRAM unknown/unspecified) + tqDefDict = { + "Owner": "userName", + "OwnerGroup": "myGroup", + "CPUTime": 5000, + "MinRAM": 4096, + "MaxRAM": 0, + } + result = tqDB.insertJob(302, tqDefDict, 10) + assert result["OK"] + result = tqDB.getTaskQueueForJob(302) + tq_job2 = result["Value"] + assert tq_job2 > tq_job1 + + # Job 3: No RAM requirements (can run on any RAM) + tqDefDict = { + "Owner": "userName", + "OwnerGroup": "myGroup", + "CPUTime": 5000, + } + result = tqDB.insertJob(303, tqDefDict, 10) + assert result["OK"] + result = tqDB.getTaskQueueForJob(303) + tq_job3 = result["Value"] + assert tq_job3 > tq_job2 + + # Job 4: MinRAM=1024, MaxRAM=2048 (requires at least 1GB, may use up to 2GB) + tqDefDict = { + "Owner": "userName", + 
"OwnerGroup": "myGroup", + "CPUTime": 5000, + "MinRAM": 1024, + "MaxRAM": 2048, + } + result = tqDB.insertJob(304, tqDefDict, 10) + assert result["OK"] + result = tqDB.getTaskQueueForJob(304) + tq_job4 = result["Value"] + assert tq_job4 > tq_job3 + + # Verify RAM requirements are stored correctly + result = tqDB.retrieveTaskQueues([tq_job1, tq_job2, tq_job3, tq_job4]) + assert result["OK"] + tqData = result["Value"] + assert tqData[tq_job1]["MinRAM"] == 2048 + assert tqData[tq_job1]["MaxRAM"] == 8192 + assert tqData[tq_job2]["MinRAM"] == 4096 + assert tqData[tq_job2]["MaxRAM"] == 0 + assert "MinRAM" not in tqData[tq_job3] # No RAM requirements + assert "MaxRAM" not in tqData[tq_job3] + assert tqData[tq_job4]["MinRAM"] == 1024 + assert tqData[tq_job4]["MaxRAM"] == 2048 + + # Matching tests + # Remember: Matching is based on MinRAM only (resource_RAM >= MinRAM) + + # Resource with 1536 MB RAM (1.5 GB) + result = tqDB.matchAndGetTaskQueue({"CPUTime": 50000, "RAM": 1536}, numQueuesToGet=5) + assert result["OK"] + res = {int(x[0]) for x in result["Value"]} + # Should match: tq_job3 (no requirement), tq_job4 (MinRAM=1024, 1536 >= 1024) + assert res == {tq_job3, tq_job4} + + # Resource with 3072 MB RAM (3 GB) + result = tqDB.matchAndGetTaskQueue({"CPUTime": 50000, "RAM": 3072}, numQueuesToGet=5) + assert result["OK"] + res = {int(x[0]) for x in result["Value"]} + # Should match: tq_job1 (MinRAM=2048), tq_job3 (no requirement), tq_job4 (MinRAM=1024) + # tq_job2 has MinRAM=4096, so 3072 is not enough + assert res == {tq_job1, tq_job3, tq_job4} + + # Resource with 6144 MB RAM (6 GB) + result = tqDB.matchAndGetTaskQueue({"CPUTime": 50000, "RAM": 6144}, numQueuesToGet=5) + assert result["OK"] + res = {int(x[0]) for x in result["Value"]} + # Should match: all jobs (6144 >= all MinRAM values: 2048, 4096, 0, 1024) + assert res == {tq_job1, tq_job2, tq_job3, tq_job4} + + # Resource with 10240 MB RAM (10 GB) + result = tqDB.matchAndGetTaskQueue({"CPUTime": 50000, "RAM": 10240}, 
numQueuesToGet=5) + assert result["OK"] + res = {int(x[0]) for x in result["Value"]} + # Should match: all jobs (10GB is enough for all MinRAM requirements) + assert res == {tq_job1, tq_job2, tq_job3, tq_job4} + + # Resource with 512 MB RAM + result = tqDB.matchAndGetTaskQueue({"CPUTime": 50000, "RAM": 512}, numQueuesToGet=5) + assert result["OK"] + res = {int(x[0]) for x in result["Value"]} + # Should only match: tq_job3 (no requirement) - 512MB is below all other MinRAM values + assert res == {tq_job3} + + # No RAM specified in match (should match all) + result = tqDB.matchAndGetTaskQueue({"CPUTime": 50000}, numQueuesToGet=5) + assert result["OK"] + res = {int(x[0]) for x in result["Value"]} + # Should match: all jobs (no RAM constraint is applied when RAM is not specified) + assert res == {tq_job1, tq_job2, tq_job3, tq_job4} + + # Clean up + for jobID in [301, 302, 303, 304]: + result = tqDB.deleteJob(jobID) + assert result["OK"] + + for tqID in [tq_job1, tq_job2, tq_job3, tq_job4]: + result = tqDB.deleteTaskQueueIfEmpty(tqID) + assert result["OK"] + + def test_TQ(): """test of various functions""" tqDefDict = {"Owner": "userName", "OwnerGroup": "myGroup", "CPUTime": 50000} From e3a310d49ea9efb6a11f22fad0b9bf2601076e5d Mon Sep 17 00:00:00 2001 From: Federico Stagni Date: Fri, 9 Jan 2026 10:47:24 +0100 Subject: [PATCH 4/7] fix: use MaxRAM instead of RAM for consistency --- src/DIRAC/WorkloadManagementSystem/Client/Matcher.py | 6 +++--- src/DIRAC/WorkloadManagementSystem/DB/TaskQueueDB.py | 12 ++++++------ .../WorkloadManagementSystem/Test_TaskQueueDB.py | 10 +++++----- 3 files changed, 14 insertions(+), 14 deletions(-) diff --git a/src/DIRAC/WorkloadManagementSystem/Client/Matcher.py b/src/DIRAC/WorkloadManagementSystem/Client/Matcher.py index e6f5eab9671..c76e0fa89e8 100644 --- a/src/DIRAC/WorkloadManagementSystem/Client/Matcher.py +++ b/src/DIRAC/WorkloadManagementSystem/Client/Matcher.py @@ -73,8 +73,8 @@ def selectJob(self, resourceDescription, credDict): # Make a nice print of the resource
matching parameters toPrintDict = dict(resourceDict) - if "RAM" in resourceDict: - toPrintDict["RAM"] = resourceDict["RAM"] + if "MaxRAM" in resourceDict: + toPrintDict["MaxRAM"] = resourceDict["MaxRAM"] if "NumberOfProcessors" in resourceDescription: toPrintDict["NumberOfProcessors"] = resourceDescription["NumberOfProcessors"] toPrintDict["Tag"] = [] @@ -171,7 +171,7 @@ def _processResourceDescription(self, resourceDescription): """ resourceDict = {} - for name in singleValueDefFields + multiValueMatchFields + ["RAM"]: + for name in singleValueDefFields + multiValueMatchFields + ["MaxRAM"]: if name in resourceDescription: resourceDict[name] = resourceDescription[name] diff --git a/src/DIRAC/WorkloadManagementSystem/DB/TaskQueueDB.py b/src/DIRAC/WorkloadManagementSystem/DB/TaskQueueDB.py index ad45941a3c1..96f664ec180 100755 --- a/src/DIRAC/WorkloadManagementSystem/DB/TaskQueueDB.py +++ b/src/DIRAC/WorkloadManagementSystem/DB/TaskQueueDB.py @@ -277,11 +277,11 @@ def travelAndCheckType(value, validTypes, escapeValues=True): tqMatchDict[field] = result["Value"] # Check range value fields (RAM requirements for matching) - if "RAM" in tqMatchDict: - result = travelAndCheckType(tqMatchDict["RAM"], int, escapeValues=False) + if "MaxRAM" in tqMatchDict: + result = travelAndCheckType(tqMatchDict["MaxRAM"], int, escapeValues=False) if not result["OK"]: return S_ERROR(f"Match definition field RAM failed : {result['Message']}") - tqMatchDict["RAM"] = result["Value"] + tqMatchDict["MaxRAM"] = result["Value"] return S_OK(tqMatchDict) @@ -796,8 +796,8 @@ def __generateTQMatchSQL(self, tqMatchDict, numQueuesToGet=1, negativeCond=None) sqlCondList.append(self.__generateSQLSubCond("tq.%s <= %%s" % "CPUTime", tqMatchDict["CPUTime"])) # RAM matching logic - if "RAM" in tqMatchDict: - ram = tqMatchDict["RAM"] + if "MaxRAM" in tqMatchDict: + ram = tqMatchDict["MaxRAM"] # Join with tq_RAM_requirements table sqlTables["tq_RAM_requirements"] = "ram_req" # Match if: @@ -937,7 +937,7 @@ def 
__generateTQMatchSQL(self, tqMatchDict, numQueuesToGet=1, negativeCond=None) if "tq_RAM_requirements" in sqlTables: fromClause += " LEFT JOIN `tq_RAM_requirements` ram_req ON tq.TQId = ram_req.TQId" - tqSqlCmd = "SELECT tq.TQId, tq.Owner, tq.OwnerGroup FROM %s WHERE %s" % (fromClause, " AND ".join(sqlCondList)) + tqSqlCmd = f"SELECT tq.TQId, tq.Owner, tq.OwnerGroup FROM {fromClause} WHERE {' AND '.join(sqlCondList)}" # Apply priorities tqSqlCmd = f"{tqSqlCmd} ORDER BY RAND() / tq.Priority ASC" diff --git a/tests/Integration/WorkloadManagementSystem/Test_TaskQueueDB.py b/tests/Integration/WorkloadManagementSystem/Test_TaskQueueDB.py index d5689cb0dc4..f438db99d76 100644 --- a/tests/Integration/WorkloadManagementSystem/Test_TaskQueueDB.py +++ b/tests/Integration/WorkloadManagementSystem/Test_TaskQueueDB.py @@ -1167,14 +1167,14 @@ def test_chainWithRAM(): # Remember: Matching is based on MinRAM only (resource_RAM >= MinRAM) # Resource with 1536 MB RAM (1.5 GB) - result = tqDB.matchAndGetTaskQueue({"CPUTime": 50000, "RAM": 1536}, numQueuesToGet=5) + result = tqDB.matchAndGetTaskQueue({"CPUTime": 50000, "MaxRAM": 1536}, numQueuesToGet=5) assert result["OK"] res = {int(x[0]) for x in result["Value"]} # Should match: tq_job3 (no requirement), tq_job4 (MinRAM=1024, 1536 >= 1024) assert res == {tq_job3, tq_job4} # Resource with 3072 MB RAM (3 GB) - result = tqDB.matchAndGetTaskQueue({"CPUTime": 50000, "RAM": 3072}, numQueuesToGet=5) + result = tqDB.matchAndGetTaskQueue({"CPUTime": 50000, "MaxRAM": 3072}, numQueuesToGet=5) assert result["OK"] res = {int(x[0]) for x in result["Value"]} # Should match: tq_job1 (MinRAM=2048), tq_job3 (no requirement), tq_job4 (MinRAM=1024) @@ -1182,21 +1182,21 @@ def test_chainWithRAM(): assert res == {tq_job1, tq_job3, tq_job4} # Resource with 6144 MB RAM (6 GB) - result = tqDB.matchAndGetTaskQueue({"CPUTime": 50000, "RAM": 6144}, numQueuesToGet=5) + result = tqDB.matchAndGetTaskQueue({"CPUTime": 50000, "MaxRAM": 6144}, numQueuesToGet=5) 
assert result["OK"] res = {int(x[0]) for x in result["Value"]} # Should match: all jobs (6144 >= all MinRAM values: 2048, 4096, 0, 1024) assert res == {tq_job1, tq_job2, tq_job3, tq_job4} # Resource with 10240 MB RAM (10 GB) - result = tqDB.matchAndGetTaskQueue({"CPUTime": 50000, "RAM": 10240}, numQueuesToGet=5) + result = tqDB.matchAndGetTaskQueue({"CPUTime": 50000, "MaxRAM": 10240}, numQueuesToGet=5) assert result["OK"] res = {int(x[0]) for x in result["Value"]} # Should match: all jobs (10GB is enough for all MinRAM requirements) assert res == {tq_job1, tq_job2, tq_job3, tq_job4} # Resource with 512 MB RAM - result = tqDB.matchAndGetTaskQueue({"CPUTime": 50000, "RAM": 512}, numQueuesToGet=5) + result = tqDB.matchAndGetTaskQueue({"CPUTime": 50000, "MaxRAM": 512}, numQueuesToGet=5) assert result["OK"] res = {int(x[0]) for x in result["Value"]} # Should only match: tq_job3 (no requirement) - 512MB is below all other MinRAM values From 8b7fef4e8d9ba593a586e1f43fd4b4c01302e1a8 Mon Sep 17 00:00:00 2001 From: Federico Stagni Date: Fri, 9 Jan 2026 14:29:47 +0100 Subject: [PATCH 5/7] test: use fixture for removing jobs and tqs --- .../Client/Matcher.py | 2 +- .../Test_TaskQueueDB.py | 231 ++++++++++++++---- 2 files changed, 181 insertions(+), 52 deletions(-) diff --git a/src/DIRAC/WorkloadManagementSystem/Client/Matcher.py b/src/DIRAC/WorkloadManagementSystem/Client/Matcher.py index c76e0fa89e8..6a81b8f85d2 100644 --- a/src/DIRAC/WorkloadManagementSystem/Client/Matcher.py +++ b/src/DIRAC/WorkloadManagementSystem/Client/Matcher.py @@ -171,7 +171,7 @@ def _processResourceDescription(self, resourceDescription): """ resourceDict = {} - for name in singleValueDefFields + multiValueMatchFields + ["MaxRAM"]: + for name in singleValueDefFields + multiValueMatchFields + ("MaxRAM",): if name in resourceDescription: resourceDict[name] = resourceDescription[name] diff --git a/tests/Integration/WorkloadManagementSystem/Test_TaskQueueDB.py 
b/tests/Integration/WorkloadManagementSystem/Test_TaskQueueDB.py index f438db99d76..a2892a192a9 100644 --- a/tests/Integration/WorkloadManagementSystem/Test_TaskQueueDB.py +++ b/tests/Integration/WorkloadManagementSystem/Test_TaskQueueDB.py @@ -23,14 +23,67 @@ tqDB = TaskQueueDB() -def test_basiChain(): +import pytest + + +@pytest.fixture +def tq_cleanup(): + """ + Fixture to track and cleanup task queues and jobs after test completion. + + Returns: + tuple: (jobs_to_delete: list, tqs_to_delete: list) + Tests should append job IDs and TQ IDs to these lists as they create them. + + Example: + def test_something(tq_cleanup): + jobs, tqs = tq_cleanup + + result = tqDB.insertJob(123, tqDefDict, 10) + jobs.append(123) + + result = tqDB.getTaskQueueForJob(123) + tq = result["Value"] + tqs.append(tq) + + # Test assertions... + # Cleanup happens automatically even if assertions fail + """ + jobs_to_delete = [] + tqs_to_delete = [] + + yield (jobs_to_delete, tqs_to_delete) + + # Cleanup phase - runs even if test fails + gLogger.debug(f"Cleaning up {len(jobs_to_delete)} jobs and {len(set(tqs_to_delete))} task queues") + + # Delete jobs first + for job_id in jobs_to_delete: + result = tqDB.deleteJob(job_id) + if not result["OK"]: + gLogger.error(f"Failed to cleanup job {job_id}: {result.get('Message', 'Unknown error')}") + + # Delete task queues (use set to avoid duplicates) + for tq_id in set(tqs_to_delete): + result = tqDB.deleteTaskQueueIfEmpty(tq_id) + if not result["OK"]: + gLogger.error(f"Failed to cleanup task queue {tq_id}: {result.get('Message', 'Unknown error')}") + + +def test_basiChain(tq_cleanup): """a basic put - remove""" + jobs, tqs = tq_cleanup + tqDefDict = {"Owner": "userName", "OwnerGroup": "myGroup", "CPUTime": 50000} result = tqDB.insertJob(123, tqDefDict, 10) assert result["OK"] + jobs.append(123) + result = tqDB.getTaskQueueForJob(123) assert result["OK"] tq = result["Value"] + tqs.append(tq) + result = tqDB.deleteJob(123) assert result["OK"] result = 
tqDB.cleanOrphanedTaskQueues() @@ -39,16 +92,21 @@ def test_basiChain(): assert result["OK"] -def test_chainWithParameter(): +def test_chainWithParameter(tq_cleanup): """put - remove with parameters""" + jobs, tqs = tq_cleanup + tqDefDict = {"Owner": "userName", "OwnerGroup": "myGroup", "CPUTime": 50000} # first job result = tqDB.insertJob(123, tqDefDict, 10) assert result["OK"] + jobs.append(123) + result = tqDB.getTaskQueueForJob(123) assert result["OK"] tq = result["Value"] + tqs.append(tq) result = tqDB.cleanOrphanedTaskQueues() assert result["OK"] @@ -58,8 +116,12 @@ def test_chainWithParameter(): # second job result = tqDB.insertJob(125, tqDefDict, 10) assert result["OK"] + jobs.append(125) + result = tqDB.getTaskQueueForJob(125) tq = result["Value"] + tqs.append(tq) + result = tqDB.deleteTaskQueueIfEmpty(tq) # this won't delete anything, as both 123 and 125 are in assert result["OK"] # but still it won't fail assert result["Value"] is False @@ -77,8 +139,10 @@ def test_chainWithParameter(): assert result["OK"] -def test_chainWithSites(): +def test_chainWithSites(tq_cleanup): """put - remove with parameters including sites""" + jobs, tqs = tq_cleanup + tqDefDict = { "Owner": "userName", "OwnerGroup": "myGroup", @@ -87,13 +151,19 @@ def test_chainWithSites(): } result = tqDB.insertJob(201, tqDefDict, 10) assert result["OK"] + jobs.append(201) + result = tqDB.getTaskQueueForJob(201) tq_job1 = result["Value"] + tqs.append(tq_job1) result = tqDB.insertJob(2011, tqDefDict, 10) assert result["OK"] + jobs.append(2011) + result = tqDB.getTaskQueueForJob(2011) tq_job11 = result["Value"] + tqs.append(tq_job11) tqDefDict = { "Owner": "userName", @@ -103,8 +173,11 @@ def test_chainWithSites(): } result = tqDB.insertJob(203, tqDefDict, 10) assert result["OK"] + jobs.append(203) + result = tqDB.getTaskQueueForJob(203) tq_job2 = result["Value"] + tqs.append(tq_job2) tqDefDict = { "Owner": "userName", @@ -114,8 +187,10 @@ def test_chainWithSites(): } result = 
tqDB.insertJob(203, tqDefDict, 10) assert result["OK"] + result = tqDB.getTaskQueueForJob(203) tq_job3 = result["Value"] + tqs.append(tq_job3) # matching # this should match everything @@ -151,8 +226,10 @@ def test_chainWithSites(): assert result["OK"] -def test_chainWithBannedSites(): +def test_chainWithBannedSites(tq_cleanup): """put - remove with parameters including Banned sites""" + jobs, tqs = tq_cleanup + tqDefDict = { "Owner": "userName", "OwnerGroup": "myGroup", @@ -161,8 +238,11 @@ def test_chainWithBannedSites(): } result = tqDB.insertJob(127, tqDefDict, 10) assert result["OK"] + jobs.append(127) + result = tqDB.getTaskQueueForJob(127) tq_job1 = result["Value"] + tqs.append(tq_job1) tqDefDict = { "Owner": "userName", @@ -172,8 +252,11 @@ def test_chainWithBannedSites(): } result = tqDB.insertJob(128, tqDefDict, 10) assert result["OK"] + jobs.append(128) + result = tqDB.getTaskQueueForJob(128) tq_job2 = result["Value"] + tqs.append(tq_job2) # matching # this should match everything @@ -225,8 +308,9 @@ def test_chainWithBannedSites(): assert tqId not in result["Value"] -def test_chainWithPlatforms(): +def test_chainWithPlatforms(tq_cleanup): """put - remove with parameters including a platform""" + jobs, tqs = tq_cleanup # We'll try the following case # @@ -244,14 +328,20 @@ def test_chainWithPlatforms(): } result = tqDB.insertJob(1, tqDefDict, 10) assert result["OK"] + jobs.append(1) + result = tqDB.getTaskQueueForJob(1) tq_job1 = result["Value"] + tqs.append(tq_job1) assert tq_job1 > 0 result = tqDB.insertJob(2, tqDefDict, 10) assert result["OK"] + jobs.append(2) + result = tqDB.getTaskQueueForJob(2) tq_job2 = result["Value"] + tqs.append(tq_job2) assert tq_job1 == tq_job2 tqDefDict = { @@ -262,8 +352,11 @@ def test_chainWithPlatforms(): } result = tqDB.insertJob(3, tqDefDict, 10) assert result["OK"] + jobs.append(3) + result = tqDB.getTaskQueueForJob(3) tq_job3 = result["Value"] + tqs.append(tq_job3) assert tq_job3 == tq_job1 + 1 tqDefDict = { @@ -274,8 
+367,11 @@ def test_chainWithPlatforms(): } result = tqDB.insertJob(4, tqDefDict, 10) assert result["OK"] + jobs.append(4) + result = tqDB.getTaskQueueForJob(4) tq_job4 = result["Value"] + tqs.append(tq_job4) assert tq_job4 == tq_job3 + 1 tqDefDict = { @@ -286,8 +382,11 @@ def test_chainWithPlatforms(): } result = tqDB.insertJob(5, tqDefDict, 10) assert result["OK"] + jobs.append(5) + result = tqDB.getTaskQueueForJob(5) tq_job5 = result["Value"] + tqs.append(tq_job5) assert tq_job5 == tq_job4 + 1 # We should be in this situation (TQIds are obviously invented): @@ -358,8 +457,11 @@ def test_chainWithPlatforms(): tqDefDict = {"Owner": "userName", "OwnerGroup": "myGroup", "CPUTime": 5000} result = tqDB.insertJob(6, tqDefDict, 10) assert result["OK"] + jobs.append(6) + result = tqDB.getTaskQueueForJob(6) tq_job6 = result["Value"] + tqs.append(tq_job6) assert tq_job6 == tq_job5 + 1 # matching for this one @@ -414,8 +516,11 @@ def test_chainWithPlatforms(): tqDefDict = {"Owner": "userName", "OwnerGroup": "myGroup", "CPUTime": 5000, "Platform": "ANY"} result = tqDB.insertJob(7, tqDefDict, 10) assert result["OK"] + jobs.append(7) + result = tqDB.getTaskQueueForJob(7) tq_job7 = result["Value"] + tqs.append(tq_job7) assert tq_job7 == tq_job6 # would be inserted in the same TQ # matching for this one @@ -456,17 +561,10 @@ def test_chainWithPlatforms(): # but here it returns only 1 (those for ANY), by construction # so, this should be in theory improved - for jobId in range(1, 8): - result = tqDB.deleteJob(jobId) - assert result["OK"] - - for tqId in [tq_job1, tq_job2, tq_job3, tq_job4, tq_job5, tq_job6, tq_job7]: - result = tqDB.deleteTaskQueueIfEmpty(tqId) - assert result["OK"] - -def test_chainWithTags(): +def test_chainWithTags(tq_cleanup): """put - remove with parameters including one or more Tag(s) and/or RequiredTag(s)""" + jobs, tqs = tq_cleanup # We'll try the following case # @@ -488,8 +586,11 @@ def test_chainWithTags(): } result = tqDB.insertJob(1, tqDefDict, 10) 
assert result["OK"] + jobs.append(1) + result = tqDB.getTaskQueueForJob(1) tq_job1 = result["Value"] + tqs.append(tq_job1) assert tq_job1 > 0 tqDefDict = { @@ -500,8 +601,11 @@ def test_chainWithTags(): } result = tqDB.insertJob(2, tqDefDict, 10) assert result["OK"] + jobs.append(2) + result = tqDB.getTaskQueueForJob(2) tq_job2 = result["Value"] + tqs.append(tq_job2) assert tq_job2 > tq_job1 tqDefDict = { @@ -512,8 +616,11 @@ def test_chainWithTags(): } result = tqDB.insertJob(3, tqDefDict, 10) assert result["OK"] + jobs.append(3) + result = tqDB.getTaskQueueForJob(3) tq_job3 = result["Value"] + tqs.append(tq_job3) assert tq_job3 > tq_job2 tqDefDict = { @@ -524,15 +631,21 @@ def test_chainWithTags(): } result = tqDB.insertJob(4, tqDefDict, 10) assert result["OK"] + jobs.append(4) + result = tqDB.getTaskQueueForJob(4) tq_job4 = result["Value"] + tqs.append(tq_job4) assert tq_job4 > tq_job3 tqDefDict = {"Owner": "userName", "OwnerGroup": "myGroup", "CPUTime": 5000} result = tqDB.insertJob(5, tqDefDict, 10) assert result["OK"] + jobs.append(5) + result = tqDB.getTaskQueueForJob(5) tq_job5 = result["Value"] + tqs.append(tq_job5) assert tq_job5 > tq_job4 tqDefDict = { @@ -543,8 +656,11 @@ def test_chainWithTags(): } result = tqDB.insertJob(6, tqDefDict, 10) assert result["OK"] + jobs.append(6) + result = tqDB.getTaskQueueForJob(6) tq_job6 = result["Value"] + tqs.append(tq_job6) assert tq_job6 > tq_job5 # We should be in this situation (TQIds are obviously invented): @@ -686,17 +802,10 @@ def test_chainWithTags(): result = tqDB.matchAndGetTaskQueue({"CPUTime": 50000, "Tag": "4Processors"}, numQueuesToGet=4) assert result["OK"] - for jobId in range(1, 8): - result = tqDB.deleteJob(jobId) - assert result["OK"] - - for tqId in [tq_job1, tq_job2, tq_job3, tq_job4, tq_job5, tq_job6]: - result = tqDB.deleteTaskQueueIfEmpty(tqId) - assert result["OK"] - -def test_chainWithTagsAndPlatforms(): +def test_chainWithTagsAndPlatforms(tq_cleanup): """put - remove with parameters 
including one or more Tag(s) and platforms""" + jobs, tqs = tq_cleanup # platform only tqDefDict = { @@ -707,8 +816,11 @@ def test_chainWithTagsAndPlatforms(): } result = tqDB.insertJob(1, tqDefDict, 10) assert result["OK"] + jobs.append(1) + result = tqDB.getTaskQueueForJob(1) tq_job1 = result["Value"] + tqs.append(tq_job1) assert tq_job1 > 0 # Tag only @@ -720,8 +832,11 @@ def test_chainWithTagsAndPlatforms(): } result = tqDB.insertJob(2, tqDefDict, 10) assert result["OK"] + jobs.append(2) + result = tqDB.getTaskQueueForJob(2) tq_job2 = result["Value"] + tqs.append(tq_job2) assert tq_job2 > tq_job1 # Platforms and Tag @@ -734,8 +849,11 @@ def test_chainWithTagsAndPlatforms(): } result = tqDB.insertJob(3, tqDefDict, 10) assert result["OK"] + jobs.append(3) + result = tqDB.getTaskQueueForJob(3) tq_job3 = result["Value"] + tqs.append(tq_job3) assert tq_job3 > tq_job2 # Tag and another platform @@ -748,8 +866,11 @@ def test_chainWithTagsAndPlatforms(): } result = tqDB.insertJob(4, tqDefDict, 10) assert result["OK"] + jobs.append(4) + result = tqDB.getTaskQueueForJob(4) tq_job4 = result["Value"] + tqs.append(tq_job4) assert tq_job4 > tq_job3 # We should be in this situation (TQIds are obviously invented): @@ -812,16 +933,8 @@ def test_chainWithTagsAndPlatforms(): res = {int(x[0]) for x in result["Value"]} assert res == {tq_job1, tq_job2, tq_job3} - for jobId in range(1, 8): - result = tqDB.deleteJob(jobId) - assert result["OK"] - - for tqId in [tq_job1, tq_job2, tq_job3, tq_job4]: - result = tqDB.deleteTaskQueueIfEmpty(tqId) - assert result["OK"] - -def test_ComplexMatching(): +def test_ComplexMatching(tq_cleanup): """test of a complex (realistic) matching. 
Something like: {'NumberOfProcessors': 1, @@ -833,6 +946,7 @@ def test_ComplexMatching(): 'Tag': [], 'CPUTime': 9999999} """ + jobs, tqs = tq_cleanup # Let's first insert few jobs (no tags, for now, and always a platform) @@ -845,8 +959,11 @@ def test_ComplexMatching(): } result = tqDB.insertJob(1, tqDefDict, 10) assert result["OK"] + jobs.append(1) + result = tqDB.getTaskQueueForJob(1) tq_job1 = result["Value"] + tqs.append(tq_job1) tqDefDict = { "Owner": "userName", @@ -857,8 +974,11 @@ def test_ComplexMatching(): } result = tqDB.insertJob(2, tqDefDict, 10) assert result["OK"] + jobs.append(2) + result = tqDB.getTaskQueueForJob(2) tq_job2 = result["Value"] + tqs.append(tq_job2) tqDefDict = { "Owner": "userName", @@ -869,8 +989,11 @@ def test_ComplexMatching(): } result = tqDB.insertJob(3, tqDefDict, 10) assert result["OK"] + jobs.append(3) + result = tqDB.getTaskQueueForJob(3) tq_job3 = result["Value"] + tqs.append(tq_job3) tqDefDict = { "Owner": "userName", @@ -881,8 +1004,11 @@ def test_ComplexMatching(): } result = tqDB.insertJob(4, tqDefDict, 10) assert result["OK"] + jobs.append(4) + result = tqDB.getTaskQueueForJob(4) tq_job4 = result["Value"] + tqs.append(tq_job4) # now let's try some matching @@ -1009,8 +1135,11 @@ def test_ComplexMatching(): } result = tqDB.insertJob(5, tqDefDict, 10) assert result["OK"] + jobs.append(5) + result = tqDB.getTaskQueueForJob(5) tq_job5 = result["Value"] + tqs.append(tq_job5) result = tqDB.matchAndGetTaskQueue( { @@ -1080,21 +1209,14 @@ def test_ComplexMatching(): res = {int(x[0]) for x in result["Value"]} assert res == {tq_job4, tq_job5} - for jobId in range(1, 8): - result = tqDB.deleteJob(jobId) - assert result["OK"] - - for tqId in [tq_job1, tq_job2, tq_job3, tq_job4, tq_job5]: - result = tqDB.deleteTaskQueueIfEmpty(tqId) - assert result["OK"] - -def test_chainWithRAM(): +def test_chainWithRAM(tq_cleanup): """put - remove with parameters including RAM requirements Note: MinRAM is the minimum RAM required for matching 
(resource must have at least this) MaxRAM is informational only, used after matching for scheduling/allocation decisions """ + jobs, tqs = tq_cleanup # Job 1: MinRAM=2048, MaxRAM=8192 (requires at least 2GB for matching, may use up to 8GB) tqDefDict = { @@ -1106,8 +1228,11 @@ def test_chainWithRAM(): } result = tqDB.insertJob(301, tqDefDict, 10) assert result["OK"] + jobs.append(301) + result = tqDB.getTaskQueueForJob(301) tq_job1 = result["Value"] + tqs.append(tq_job1) assert tq_job1 > 0 # Job 2: MinRAM=4096, MaxRAM=0 (requires at least 4GB, MaxRAM unknown/unspecified) @@ -1120,8 +1245,11 @@ def test_chainWithRAM(): } result = tqDB.insertJob(302, tqDefDict, 10) assert result["OK"] + jobs.append(302) + result = tqDB.getTaskQueueForJob(302) tq_job2 = result["Value"] + tqs.append(tq_job2) assert tq_job2 > tq_job1 # Job 3: No RAM requirements (can run on any RAM) @@ -1132,8 +1260,11 @@ def test_chainWithRAM(): } result = tqDB.insertJob(303, tqDefDict, 10) assert result["OK"] + jobs.append(303) + result = tqDB.getTaskQueueForJob(303) tq_job3 = result["Value"] + tqs.append(tq_job3) assert tq_job3 > tq_job2 # Job 4: MinRAM=1024, MaxRAM=2048 (requires at least 1GB, may use up to 2GB) @@ -1146,8 +1277,11 @@ def test_chainWithRAM(): } result = tqDB.insertJob(304, tqDefDict, 10) assert result["OK"] + jobs.append(304) + result = tqDB.getTaskQueueForJob(304) tq_job4 = result["Value"] + tqs.append(tq_job4) assert tq_job4 > tq_job3 # Verify RAM requirements are stored correctly @@ -1209,20 +1343,14 @@ def test_chainWithRAM(): # Should only match: tq_job3 (no requirement) assert res == {tq_job1, tq_job2, tq_job3, tq_job4} - # Clean up - for jobID in [301, 302, 303, 304]: - result = tqDB.deleteJob(jobID) - assert result["OK"] - - for tqID in [tq_job1, tq_job2, tq_job3, tq_job4]: - result = tqDB.deleteTaskQueueIfEmpty(tqID) - assert result["OK"] - -def test_TQ(): +def test_TQ(tq_cleanup): """test of various functions""" + jobs, tqs = tq_cleanup + tqDefDict = {"Owner": "userName", 
"OwnerGroup": "myGroup", "CPUTime": 50000} tqDB.insertJob(123, tqDefDict, 10) + jobs.append(123) result = tqDB.retrieveTaskQueues() assert result["OK"] @@ -1244,6 +1372,7 @@ def test_TQ(): assert result["Value"]["matchFound"] is True assert result["Value"]["jobId"] in [123, 125] tq = result["Value"]["taskQueueId"] + tqs.append(tq) result = tqDB.deleteTaskQueueIfEmpty(tq) assert result["OK"] From 4eaa121348bbfa2c95add69348fc2bd4abad2097 Mon Sep 17 00:00:00 2001 From: Federico Stagni Date: Mon, 12 Jan 2026 15:03:00 +0100 Subject: [PATCH 6/7] fix: fixed table creations when other tables already exists --- src/DIRAC/Core/Utilities/MySQL.py | 76 +++++++++++++++++++++---------- 1 file changed, 51 insertions(+), 25 deletions(-) diff --git a/src/DIRAC/Core/Utilities/MySQL.py b/src/DIRAC/Core/Utilities/MySQL.py index e69af1a5fbc..823bac69b15 100755 --- a/src/DIRAC/Core/Utilities/MySQL.py +++ b/src/DIRAC/Core/Utilities/MySQL.py @@ -903,6 +903,21 @@ def _createViews(self, viewsDict, force=False): return createView return S_OK() + def _parseForeignKeyReference(self, auxTable, defaultKey): + """ + Parse foreign key reference in format 'Table' or 'Table.key' + + :param str auxTable: Foreign key reference (e.g., 'MyTable' or 'MyTable.id') + :param str defaultKey: Default key name if not specified in auxTable + :return: tuple (table_name, key_name) + """ + if "." 
in auxTable: + parts = auxTable.split(".", 1) + if len(parts) != 2: + raise ValueError(f"Invalid foreign key reference format: {auxTable}") + return parts[0], parts[1] + return auxTable, defaultKey + def _createTables(self, tableDict, force=False): """ tableDict: @@ -957,30 +972,37 @@ def _createTables(self, tableDict, force=False): if "Fields" not in thisTable: return S_ERROR(DErrno.EMYSQL, f"Missing `Fields` key in `{table}` table dictionary") - tableCreationList = [[]] - + # Build dependency-ordered list of tables to create + # Tables with foreign keys must be created after their referenced tables + tableCreationList = [] auxiliaryTableList = [] - i = 0 + # Get list of existing tables in the database to handle migrations + existingTablesResult = self._query("SHOW TABLES") + if not existingTablesResult["OK"]: + return existingTablesResult + existingTables = [t[0] for t in existingTablesResult["Value"]] + extracted = True while tableList and extracted: # iterate extracting tables from list if they only depend on # already extracted tables. 
extracted = False - auxiliaryTableList += tableCreationList[i] - i += 1 - tableCreationList.append([]) + currentLevelTables = [] + for table in list(tableList): toBeExtracted = True thisTable = tableDict[table] if "ForeignKeys" in thisTable: thisKeys = thisTable["ForeignKeys"] for key, auxTable in thisKeys.items(): - forTable = auxTable.split(".")[0] - forKey = key - if forTable != auxTable: - forKey = auxTable.split(".")[1] - if forTable not in auxiliaryTableList: + try: + forTable, forKey = self._parseForeignKeyReference(auxTable, key) + except ValueError as e: + return S_ERROR(DErrno.EMYSQL, str(e)) + + # Check if the referenced table is either being created or already exists + if forTable not in auxiliaryTableList and forTable not in existingTables: toBeExtracted = False break if key not in thisTable["Fields"]: @@ -988,24 +1010,29 @@ def _createTables(self, tableDict, force=False): DErrno.EMYSQL, f"ForeignKey `{key}` -> `{forKey}` not defined in Primary table `{table}`.", ) - if forKey not in tableDict[forTable]["Fields"]: + # Only validate field existence if the referenced table is in tableDict + if forTable in tableDict and forKey not in tableDict[forTable]["Fields"]: return S_ERROR( DErrno.EMYSQL, - "ForeignKey `%s` -> `%s` not defined in Auxiliary table `%s`." 
- % (key, forKey, forTable), + f"ForeignKey `{key}` -> `{forKey}` not defined in Auxiliary table `{forTable}`.", ) if toBeExtracted: # self.log.debug('Table %s ready to be created' % table) extracted = True tableList.remove(table) - tableCreationList[i].append(table) + currentLevelTables.append(table) + + if currentLevelTables: + tableCreationList.append(currentLevelTables) + auxiliaryTableList.extend(currentLevelTables) if tableList: return S_ERROR(DErrno.EMYSQL, f"Recursive Foreign Keys in {', '.join(tableList)}") - for tableList in tableCreationList: - for table in tableList: + # Create tables level by level + for levelTables in tableCreationList: + for table in levelTables: # Check if Table exist retDict = self.__checkTable(table, force=force) if not retDict["OK"]: @@ -1035,18 +1062,17 @@ def _createTables(self, tableDict, force=False): for index in indexDict: indexedFields = "`, `".join(indexDict[index]) cmdList.append(f"UNIQUE INDEX `{index}` ( `{indexedFields}` )") + if "ForeignKeys" in thisTable: thisKeys = thisTable["ForeignKeys"] for key, auxTable in thisKeys.items(): - forTable = auxTable.split(".")[0] - forKey = key - if forTable != auxTable: - forKey = auxTable.split(".")[1] + try: + forTable, forKey = self._parseForeignKeyReference(auxTable, key) + except ValueError as e: + return S_ERROR(DErrno.EMYSQL, str(e)) - # cmdList.append( '`%s` %s' % ( forTable, tableDict[forTable]['Fields'][forKey] ) cmdList.append( - "FOREIGN KEY ( `%s` ) REFERENCES `%s` ( `%s` )" - " ON DELETE RESTRICT" % (key, forTable, forKey) + f"FOREIGN KEY ( `{key}` ) REFERENCES `{forTable}` ( `{forKey}` ) ON DELETE RESTRICT" ) engine = thisTable.get("Engine", "InnoDB") @@ -1058,7 +1084,7 @@ def _createTables(self, tableDict, force=False): engine, charset, ) - retDict = self._transaction([cmd]) + retDict = self._update(cmd) if not retDict["OK"]: return retDict # self.log.debug('Table %s created' % table) From d3c8663820ca1b5e3f0359c78b4010fd915ab7c6 Mon Sep 17 00:00:00 2001 From: 
Federico Stagni Date: Mon, 12 Jan 2026 18:29:38 +0100 Subject: [PATCH 7/7] fix: RAM is not anymore a Tag --- .../Client/JobState/JobState.py | 15 ++++++++++++--- .../Executor/JobScheduling.py | 10 ++-------- .../Executor/test/Test_Executor.py | 4 +--- 3 files changed, 15 insertions(+), 14 deletions(-) diff --git a/src/DIRAC/WorkloadManagementSystem/Client/JobState/JobState.py b/src/DIRAC/WorkloadManagementSystem/Client/JobState/JobState.py index b91bb470852..bc18deab8a4 100644 --- a/src/DIRAC/WorkloadManagementSystem/Client/JobState/JobState.py +++ b/src/DIRAC/WorkloadManagementSystem/Client/JobState/JobState.py @@ -1,11 +1,16 @@ -""" This object is a wrapper for setting and getting jobs states -""" +"""This object is a wrapper for setting and getting jobs states""" + from DIRAC import S_ERROR, S_OK, gLogger from DIRAC.WorkloadManagementSystem.Client import JobStatus from DIRAC.WorkloadManagementSystem.Client.JobState.JobManifest import JobManifest from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB from DIRAC.WorkloadManagementSystem.DB.JobLoggingDB import JobLoggingDB -from DIRAC.WorkloadManagementSystem.DB.TaskQueueDB import TaskQueueDB, multiValueDefFields, singleValueDefFields +from DIRAC.WorkloadManagementSystem.DB.TaskQueueDB import ( + TaskQueueDB, + multiValueDefFields, + singleValueDefFields, + rangeValueDefFields, +) from DIRAC.WorkloadManagementSystem.Service.JobPolicy import ( RIGHT_CHANGE_STATUS, RIGHT_GET_INFO, @@ -351,6 +356,10 @@ def insertIntoTQ(self, manifest=None): if name in reqCfg: jobReqDict[name] = reqCfg.getOption(name, []) + for name in rangeValueDefFields: + if name in reqCfg: + jobReqDict[name] = int(reqCfg[name]) + jobPriority = reqCfg.getOption("UserPriority", 1) result = self.__retryFunction(2, JobState.__db.tqDB.insertJob, (self.__jid, jobReqDict, jobPriority)) diff --git a/src/DIRAC/WorkloadManagementSystem/Executor/JobScheduling.py b/src/DIRAC/WorkloadManagementSystem/Executor/JobScheduling.py index ea05dbb4a9a..30c81800a2c 
100755 --- a/src/DIRAC/WorkloadManagementSystem/Executor/JobScheduling.py +++ b/src/DIRAC/WorkloadManagementSystem/Executor/JobScheduling.py @@ -249,7 +249,7 @@ def optimizeJob(self, jid, jobState): # Get stageSites[0] because it has already been randomized and it's as good as any in stageSites stageSite = stageSites[0] - self.jobLog.verbose(" Staging site will be", stageSite) + self.jobLog.verbose("Staging site will be", stageSite) stageData = idSites[stageSite] # Set as if everything has already been staged stageData["disk"] += stageData["tape"] @@ -351,12 +351,6 @@ def _getTagsFromManifest(self, jobManifest): tagList.append("WholeNode") tagList.append("MultiProcessor") - # sorting out the RAM (this should be probably coded ~same as number of processors) - if "MaxRAM" in jobManifest: - maxRAM = jobManifest.getOption("MaxRAM", 0) - if maxRAM: - tagList.append(f"{maxRAM}MB") - # other tags? Just add them if "Tags" in jobManifest: tagList.extend(jobManifest.getOption("Tags", [])) @@ -391,7 +385,7 @@ def __sendToTQ(self, jobState, jobManifest, sites, bannedSites, onlineSites=None # Job multivalue requirement keys are specified as singles in the job descriptions # but for backward compatibility can be also plurals - for key in ("JobType", "GridRequiredCEs", "GridCE", "Tags"): + for key in ("JobType", "GridRequiredCEs", "GridCE", "MinRAM", "MaxRAM", "Tags"): reqKey = key if key == "JobType": reqKey = "JobTypes" diff --git a/src/DIRAC/WorkloadManagementSystem/Executor/test/Test_Executor.py b/src/DIRAC/WorkloadManagementSystem/Executor/test/Test_Executor.py index 732cbffc6ff..899dbb1db74 100644 --- a/src/DIRAC/WorkloadManagementSystem/Executor/test/Test_Executor.py +++ b/src/DIRAC/WorkloadManagementSystem/Executor/test/Test_Executor.py @@ -54,9 +54,7 @@ def test__applySiteFilter(sites, banned, expected): ({}, []), ({"Tag": "bof"}, ["bof"]), ({"Tags": "bof, bif"}, ["bof", "bif"]), - ({"MaxRAM": 2500}, ["2500MB"]), - ({"Tags": "bof, bif", "MaxRAM": 2048}, ["bof", "bif", 
"2048MB"]), - ({"WholeNode": "yes", "MaxRAM": 2048}, ["WholeNode", "MultiProcessor", "2048MB"]), + ({"WholeNode": "yes"}, ["WholeNode", "MultiProcessor"]), ({"NumberOfProcessors": 1}, []), ({"NumberOfProcessors": 4}, ["MultiProcessor", "4Processors"]), ({"NumberOfProcessors": 4, "MinNumberOfProcessors": 2}, ["MultiProcessor", "4Processors"]),