Skip to content

Commit d179cbd

Browse files
committed
feat: added a new table to TaskQueueDB for RAM requirements and matching
1 parent 253a377 commit d179cbd

File tree

3 files changed

+281
-28
lines changed

3 files changed

+281
-28
lines changed

src/DIRAC/WorkloadManagementSystem/Client/Matcher.py

Lines changed: 14 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,11 @@
1717
from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB
1818
from DIRAC.WorkloadManagementSystem.DB.JobLoggingDB import JobLoggingDB
1919
from DIRAC.WorkloadManagementSystem.DB.PilotAgentsDB import PilotAgentsDB
20-
from DIRAC.WorkloadManagementSystem.DB.TaskQueueDB import TaskQueueDB, multiValueMatchFields, singleValueDefFields
20+
from DIRAC.WorkloadManagementSystem.DB.TaskQueueDB import (
21+
TaskQueueDB,
22+
multiValueMatchFields,
23+
singleValueDefFields,
24+
)
2125

2226

2327
class PilotVersionError(Exception):
@@ -69,8 +73,8 @@ def selectJob(self, resourceDescription, credDict):
6973

7074
# Make a nice print of the resource matching parameters
7175
toPrintDict = dict(resourceDict)
72-
if "MaxRAM" in resourceDescription:
73-
toPrintDict["MaxRAM"] = resourceDescription["MaxRAM"]
76+
if "RAM" in resourceDict:
77+
toPrintDict["RAM"] = resourceDict["RAM"]
7478
if "NumberOfProcessors" in resourceDescription:
7579
toPrintDict["NumberOfProcessors"] = resourceDescription["NumberOfProcessors"]
7680
toPrintDict["Tag"] = []
@@ -167,11 +171,7 @@ def _processResourceDescription(self, resourceDescription):
167171
"""
168172

169173
resourceDict = {}
170-
for name in singleValueDefFields:
171-
if name in resourceDescription:
172-
resourceDict[name] = resourceDescription[name]
173-
174-
for name in multiValueMatchFields:
174+
for name in singleValueDefFields + multiValueMatchFields + ["RAM"]:
175175
if name in resourceDescription:
176176
resourceDict[name] = resourceDescription[name]
177177

@@ -192,25 +192,18 @@ def _processResourceDescription(self, resourceDescription):
192192
if "JobID" in resourceDescription:
193193
resourceDict["JobID"] = resourceDescription["JobID"]
194194

195-
# Convert MaxRAM and NumberOfProcessors parameters into a list of tags
196-
maxRAM = resourceDescription.get("MaxRAM")
197-
if maxRAM:
198-
try:
199-
maxRAM = int(maxRAM)
200-
except ValueError:
201-
maxRAM = None
195+
# Convert the NumberOfProcessors parameter into a list of tags
202196
nProcessors = resourceDescription.get("NumberOfProcessors")
203197
if nProcessors:
204198
try:
205199
nProcessors = int(nProcessors)
206200
except ValueError:
207201
nProcessors = None
208-
for param, key, limit, increment in [(maxRAM, "MB", 1024 * 1024, 256), (nProcessors, "Processors", 1024, 1)]:
209-
if param and param <= limit:
210-
paramList = list(range(increment, param + increment, increment))
211-
paramTags = ["%d%s" % (par, key) for par in paramList]
212-
if paramTags:
213-
resourceDict.setdefault("Tag", []).extend(paramTags)
202+
if nProcessors and nProcessors <= 1024:
203+
paramList = list(range(1, nProcessors + 1, 1))
204+
paramTags = ["%d%s" % (par, "Processors") for par in paramList]
205+
if paramTags:
206+
resourceDict.setdefault("Tag", []).extend(paramTags)
214207

215208
# Add 'MultiProcessor' to the list of tags
216209
if nProcessors and nProcessors > 1:

src/DIRAC/WorkloadManagementSystem/DB/TaskQueueDB.py

Lines changed: 137 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
""" TaskQueueDB class is a front-end to the task queues db
2-
"""
1+
"""TaskQueueDB class is a front-end to the task queues db"""
2+
33
import random
44
import string
55
from collections import defaultdict
@@ -22,12 +22,13 @@
2222
# For checks at insertion time, and not only
2323
singleValueDefFields = ("Owner", "OwnerGroup", "CPUTime")
2424
multiValueDefFields = ("Sites", "GridCEs", "BannedSites", "Platforms", "JobTypes", "Tags")
25+
rangeValueDefFields = ("MinRAM", "MaxRAM")
2526

2627
# Used for matching
2728
multiValueMatchFields = ("GridCE", "Site", "Platform", "JobType", "Tag")
2829
bannedJobMatchFields = ("Site",)
2930
mandatoryMatchFields = ("CPUTime",)
30-
priorityIgnoredFields = ("Sites", "BannedSites")
31+
priorityIgnoredFields = ("Sites", "BannedSites", "MinRAM", "MaxRAM")
3132

3233

3334
def _lowerAndRemovePunctuation(s):
@@ -129,6 +130,16 @@ def __initializeDB(self):
129130
"ForeignKeys": {"TQId": "tq_TaskQueues.TQId"},
130131
}
131132

133+
self.__tablesDesc["tq_RAM_requirements"] = {
134+
"Fields": {
135+
"TQId": "INTEGER(11) UNSIGNED NOT NULL",
136+
"MinRAM": "INTEGER UNSIGNED NOT NULL DEFAULT 0",
137+
"MaxRAM": "INTEGER UNSIGNED NOT NULL DEFAULT 0",
138+
},
139+
"PrimaryKey": "TQId",
140+
"ForeignKeys": {"TQId": "tq_TaskQueues.TQId"},
141+
}
142+
132143
for multiField in multiValueDefFields:
133144
tableName = f"tq_TQTo{multiField}"
134145
self.__tablesDesc[tableName] = {
@@ -206,6 +217,20 @@ def _checkTaskQueueDefinition(self, tqDefDict):
206217
return result
207218
tqDefDict[field] = result["Value"]
208219

220+
# Check range value fields (RAM requirements)
221+
for field in rangeValueDefFields:
222+
if field not in tqDefDict:
223+
continue
224+
if not isinstance(tqDefDict[field], int):
225+
return S_ERROR(f"Range value field {field} value type is not valid: {type(tqDefDict[field])}")
226+
if tqDefDict[field] < 0:
227+
return S_ERROR(f"Range value field {field} must be non-negative: {tqDefDict[field]}")
228+
229+
# Validate that MinRAM <= MaxRAM if both are specified
230+
if "MinRAM" in tqDefDict and "MaxRAM" in tqDefDict:
231+
if tqDefDict["MaxRAM"] > 0 and tqDefDict["MinRAM"] > tqDefDict["MaxRAM"]:
232+
return S_ERROR(f"MinRAM ({tqDefDict['MinRAM']}) cannot be greater than MaxRAM ({tqDefDict['MaxRAM']})")
233+
209234
return S_OK(tqDefDict)
210235

211236
def _checkMatchDefinition(self, tqMatchDict):
@@ -251,6 +276,13 @@ def travelAndCheckType(value, validTypes, escapeValues=True):
251276
return S_ERROR(f"Match definition field {field} failed : {result['Message']}")
252277
tqMatchDict[field] = result["Value"]
253278

279+
# Check the single-valued RAM match field (matched against the task queues' RAM requirements)
280+
if "RAM" in tqMatchDict:
281+
result = travelAndCheckType(tqMatchDict["RAM"], int, escapeValues=False)
282+
if not result["OK"]:
283+
return S_ERROR(f"Match definition field RAM failed : {result['Message']}")
284+
tqMatchDict["RAM"] = result["Value"]
285+
254286
return S_OK(tqMatchDict)
255287

256288
def __createTaskQueue(self, tqDefDict, priority=1, connObj=False):
@@ -303,6 +335,20 @@ def __createTaskQueue(self, tqDefDict, priority=1, connObj=False):
303335
self.log.error("Failed to insert condition", f"{field} : {result['Message']}")
304336
self.cleanOrphanedTaskQueues(connObj=connObj)
305337
return S_ERROR(f"Can't insert values {values} for field {field}: {result['Message']}")
338+
339+
# Insert RAM requirements if specified and not both zero
340+
if "MinRAM" in tqDefDict or "MaxRAM" in tqDefDict:
341+
minRAM = tqDefDict.get("MinRAM", 0)
342+
maxRAM = tqDefDict.get("MaxRAM", 0)
343+
# Only insert if at least one value is non-zero (optimization: avoid unnecessary rows)
344+
if minRAM > 0 or maxRAM > 0:
345+
cmd = f"INSERT INTO `tq_RAM_requirements` (TQId, MinRAM, MaxRAM) VALUES ({tqId}, {minRAM}, {maxRAM})"
346+
result = self._update(cmd, conn=connObj)
347+
if not result["OK"]:
348+
self.log.error("Failed to insert RAM requirements", result["Message"])
349+
self.cleanOrphanedTaskQueues(connObj=connObj)
350+
return S_ERROR(f"Can't insert RAM requirements: {result['Message']}")
351+
306352
self.log.info("Created TQ", tqId)
307353
return S_OK(tqId)
308354

@@ -327,6 +373,13 @@ def cleanOrphanedTaskQueues(self, connObj=False):
327373
if not result["OK"]:
328374
return result
329375

376+
# Delete RAM requirements for orphaned TQs
377+
result = self._update(
378+
f"DELETE FROM `tq_RAM_requirements` WHERE TQId in ( {','.join(orphanedTQs)} )", conn=connObj
379+
)
380+
if not result["OK"]:
381+
return result
382+
330383
result = self._update(f"DELETE FROM `tq_TaskQueues` WHERE TQId in ( {','.join(orphanedTQs)} )", conn=connObj)
331384
if not result["OK"]:
332385
return result
@@ -473,6 +526,26 @@ def __generateTQFindSQL(
473526
sqlCondList.append(f"{numValues} = ({secondQuery} {grouping})")
474527
else:
475528
sqlCondList.append(f"`tq_TaskQueues`.TQId not in ( SELECT DISTINCT {tableName}.TQId from {tableName} )")
529+
530+
# Handle RAM requirements matching
531+
hasRAMRequirements = "MinRAM" in tqDefDict or "MaxRAM" in tqDefDict
532+
if hasRAMRequirements:
533+
minRAM = tqDefDict.get("MinRAM", 0)
534+
maxRAM = tqDefDict.get("MaxRAM", 0)
535+
# Only match TQs with the same RAM requirements if at least one is non-zero
536+
if minRAM > 0 or maxRAM > 0:
537+
# Match TQs that have the exact same RAM requirements
538+
sqlCondList.append(
539+
f"`tq_TaskQueues`.TQId IN ( SELECT TQId FROM `tq_RAM_requirements` "
540+
f"WHERE MinRAM = {minRAM} AND MaxRAM = {maxRAM} )"
541+
)
542+
else:
543+
# Both are 0, so match TQs with no RAM requirements row
544+
sqlCondList.append("`tq_TaskQueues`.TQId NOT IN ( SELECT DISTINCT TQId FROM `tq_RAM_requirements` )")
545+
else:
546+
# Match TQs that have no RAM requirements
547+
sqlCondList.append("`tq_TaskQueues`.TQId NOT IN ( SELECT DISTINCT TQId FROM `tq_RAM_requirements` )")
548+
476549
# END MAGIC: That was easy ;)
477550
return S_OK(" AND ".join(sqlCondList))
478551

@@ -722,6 +795,19 @@ def __generateTQMatchSQL(self, tqMatchDict, numQueuesToGet=1, negativeCond=None)
722795
if "CPUTime" in tqMatchDict:
723796
sqlCondList.append(self.__generateSQLSubCond("tq.%s <= %%s" % "CPUTime", tqMatchDict["CPUTime"]))
724797

798+
# RAM matching logic
799+
if "RAM" in tqMatchDict:
800+
ram = tqMatchDict["RAM"]
801+
# Join with tq_RAM_requirements table
802+
sqlTables["tq_RAM_requirements"] = "ram_req"
803+
# Match if:
804+
# 1. No RAM requirement exists for this TQ (LEFT JOIN will give NULL)
805+
# 2. OR the resource has at least MinRAM
806+
# Note: MinRAM is used for matching, MaxRAM is informational for post-match scheduling
807+
# A job requiring MinRAM=2GB can run on any machine with 2GB or more
808+
ramCond = f"( ram_req.TQId IS NULL OR {ram} >= ram_req.MinRAM )"
809+
sqlCondList.append(ramCond)
810+
725811
tag_fv = []
726812

727813
# Match multi value fields
@@ -844,10 +930,14 @@ def __generateTQMatchSQL(self, tqMatchDict, numQueuesToGet=1, negativeCond=None)
844930
if negativeCond:
845931
sqlCondList.append(self.__generateNotSQL(negativeCond))
846932

847-
# Generate the final query string
848-
tqSqlCmd = "SELECT tq.TQId, tq.Owner, tq.OwnerGroup FROM `tq_TaskQueues` tq WHERE %s" % (
849-
" AND ".join(sqlCondList)
850-
)
933+
# Generate the final query string with proper JOINs
934+
fromClause = "`tq_TaskQueues` tq"
935+
936+
# Add LEFT JOIN for RAM requirements if needed
937+
if "tq_RAM_requirements" in sqlTables:
938+
fromClause += " LEFT JOIN `tq_RAM_requirements` ram_req ON tq.TQId = ram_req.TQId"
939+
940+
tqSqlCmd = "SELECT tq.TQId, tq.Owner, tq.OwnerGroup FROM %s WHERE %s" % (fromClause, " AND ".join(sqlCondList))
851941

852942
# Apply priorities
853943
tqSqlCmd = f"{tqSqlCmd} ORDER BY RAND() / tq.Priority ASC"
@@ -994,6 +1084,12 @@ def deleteTaskQueueIfEmpty(self, tqId, tqOwner=False, tqOwnerGroup=False, connOb
9941084
retVal = self._update(f"DELETE FROM `tq_TQTo{mvField}` WHERE TQId = {tqId}", conn=connObj)
9951085
if not retVal["OK"]:
9961086
return retVal
1087+
1088+
# Delete RAM requirements if they exist
1089+
retVal = self._update(f"DELETE FROM `tq_RAM_requirements` WHERE TQId = {tqId}", conn=connObj)
1090+
if not retVal["OK"]:
1091+
return retVal
1092+
9971093
retVal = self._update(f"DELETE FROM `tq_TaskQueues` WHERE TQId = {tqId}", conn=connObj)
9981094
if not retVal["OK"]:
9991095
return retVal
@@ -1065,6 +1161,40 @@ def retrieveTaskQueues(self, tqIdList=None):
10651161
if field not in tqData[tqId]:
10661162
tqData[tqId][field] = []
10671163
tqData[tqId][field].append(value)
1164+
1165+
# Retrieve RAM requirements (if table exists)
1166+
# Note: The table should be auto-created by __initializeDB, but we check for safety
1167+
sqlCmd = "SELECT TQId, MinRAM, MaxRAM FROM `tq_RAM_requirements`"
1168+
if tqIdList is not None:
1169+
if tqIdList:
1170+
# Only retrieve RAM requirements for specific TQIds
1171+
sqlCmd += f" WHERE TQId IN ( {', '.join([str(id_) for id_ in tqIdList])} )"
1172+
# else: empty list was already handled earlier with fast-track return
1173+
retVal = self._query(sqlCmd)
1174+
if not retVal["OK"]:
1175+
# If the error looks like a missing table (message contains "doesn't exist" or "Table"; e.g., old installation), log a warning but continue
1176+
# This provides backward compatibility
1177+
if "doesn't exist" in retVal["Message"] or "Table" in retVal["Message"]:
1178+
self.log.warn("RAM requirements table not found, skipping RAM data retrieval", retVal["Message"])
1179+
else:
1180+
self.log.error("Can't retrieve RAM requirements", retVal["Message"])
1181+
return retVal
1182+
else:
1183+
for record in retVal["Value"]:
1184+
tqId = record[0]
1185+
minRAM = record[1]
1186+
maxRAM = record[2]
1187+
if tqId not in tqData:
1188+
if tqIdList is None or tqId in tqIdList:
1189+
self.log.verbose(
1190+
"Task Queue has RAM requirements but does not exist: triggering a cleaning",
1191+
f"TQID: {tqId}",
1192+
)
1193+
tqNeedCleaning = True
1194+
else:
1195+
tqData[tqId]["MinRAM"] = minRAM
1196+
tqData[tqId]["MaxRAM"] = maxRAM
1197+
10681198
if tqNeedCleaning:
10691199
self.cleanOrphanedTaskQueues()
10701200
return S_OK(tqData)

0 commit comments

Comments
 (0)