Skip to content

Commit 6d2c3f1

Browse files
authored
Merge pull request #8414 from fstagni/90_more_RAM_matching
[9.0] more on RAM matching
2 parents cbb4b91 + d3c8663 commit 6d2c3f1

File tree

9 files changed

+520
-110
lines changed

9 files changed

+520
-110
lines changed

docs/source/DeveloperGuide/DevelopmentEnvironment/DeveloperInstallation/stuffThatRun.rst

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ Now you can run the test with:
4747

4848
.. code-block:: bash
4949
50-
pytest --no-check-dirac-environment LocalRepo/ALTERNATIVE_MODULES/DIRAC/tests/Integration/WorkloadManagementSystem/Test_JobDB.py
50+
pytest --no-check-dirac-environment LocalRepo/ALTERNATIVE_MODULES/DIRAC/tests/Integration/WorkloadManagementSystem/Test_JobDB.py
5151
5252
You can find the logs of the services in `/home/dirac/ServerInstallDIR/diracos/runit/`
5353

src/DIRAC/Core/Utilities/MySQL.py

Lines changed: 51 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -903,6 +903,21 @@ def _createViews(self, viewsDict, force=False):
903903
return createView
904904
return S_OK()
905905

906+
def _parseForeignKeyReference(self, auxTable, defaultKey):
907+
"""
908+
Parse foreign key reference in format 'Table' or 'Table.key'
909+
910+
:param str auxTable: Foreign key reference (e.g., 'MyTable' or 'MyTable.id')
911+
:param str defaultKey: Default key name if not specified in auxTable
912+
:return: tuple (table_name, key_name)
913+
"""
914+
if "." in auxTable:
915+
parts = auxTable.split(".", 1)
916+
if len(parts) != 2:
917+
raise ValueError(f"Invalid foreign key reference format: {auxTable}")
918+
return parts[0], parts[1]
919+
return auxTable, defaultKey
920+
906921
def _createTables(self, tableDict, force=False):
907922
"""
908923
tableDict:
@@ -957,55 +972,67 @@ def _createTables(self, tableDict, force=False):
957972
if "Fields" not in thisTable:
958973
return S_ERROR(DErrno.EMYSQL, f"Missing `Fields` key in `{table}` table dictionary")
959974

960-
tableCreationList = [[]]
961-
975+
# Build dependency-ordered list of tables to create
976+
# Tables with foreign keys must be created after their referenced tables
977+
tableCreationList = []
962978
auxiliaryTableList = []
963979

964-
i = 0
980+
# Get list of existing tables in the database to handle migrations
981+
existingTablesResult = self._query("SHOW TABLES")
982+
if not existingTablesResult["OK"]:
983+
return existingTablesResult
984+
existingTables = [t[0] for t in existingTablesResult["Value"]]
985+
965986
extracted = True
966987
while tableList and extracted:
967988
# iterate extracting tables from list if they only depend on
968989
# already extracted tables.
969990
extracted = False
970-
auxiliaryTableList += tableCreationList[i]
971-
i += 1
972-
tableCreationList.append([])
991+
currentLevelTables = []
992+
973993
for table in list(tableList):
974994
toBeExtracted = True
975995
thisTable = tableDict[table]
976996
if "ForeignKeys" in thisTable:
977997
thisKeys = thisTable["ForeignKeys"]
978998
for key, auxTable in thisKeys.items():
979-
forTable = auxTable.split(".")[0]
980-
forKey = key
981-
if forTable != auxTable:
982-
forKey = auxTable.split(".")[1]
983-
if forTable not in auxiliaryTableList:
999+
try:
1000+
forTable, forKey = self._parseForeignKeyReference(auxTable, key)
1001+
except ValueError as e:
1002+
return S_ERROR(DErrno.EMYSQL, str(e))
1003+
1004+
# Check if the referenced table is either being created or already exists
1005+
if forTable not in auxiliaryTableList and forTable not in existingTables:
9841006
toBeExtracted = False
9851007
break
9861008
if key not in thisTable["Fields"]:
9871009
return S_ERROR(
9881010
DErrno.EMYSQL,
9891011
f"ForeignKey `{key}` -> `{forKey}` not defined in Primary table `{table}`.",
9901012
)
991-
if forKey not in tableDict[forTable]["Fields"]:
1013+
# Only validate field existence if the referenced table is in tableDict
1014+
if forTable in tableDict and forKey not in tableDict[forTable]["Fields"]:
9921015
return S_ERROR(
9931016
DErrno.EMYSQL,
994-
"ForeignKey `%s` -> `%s` not defined in Auxiliary table `%s`."
995-
% (key, forKey, forTable),
1017+
f"ForeignKey `{key}` -> `{forKey}` not defined in Auxiliary table `{forTable}`.",
9961018
)
9971019

9981020
if toBeExtracted:
9991021
# self.log.debug('Table %s ready to be created' % table)
10001022
extracted = True
10011023
tableList.remove(table)
1002-
tableCreationList[i].append(table)
1024+
currentLevelTables.append(table)
1025+
1026+
if currentLevelTables:
1027+
tableCreationList.append(currentLevelTables)
1028+
auxiliaryTableList.extend(currentLevelTables)
10031029

10041030
if tableList:
10051031
return S_ERROR(DErrno.EMYSQL, f"Recursive Foreign Keys in {', '.join(tableList)}")
10061032

1007-
for tableList in tableCreationList:
1008-
for table in tableList:
1033+
# Create tables level by level
1034+
for levelTables in tableCreationList:
1035+
for table in levelTables:
10091036
# Check if Table exist
10101037
retDict = self.__checkTable(table, force=force)
10111038
if not retDict["OK"]:
@@ -1035,18 +1062,17 @@ def _createTables(self, tableDict, force=False):
10351062
for index in indexDict:
10361063
indexedFields = "`, `".join(indexDict[index])
10371064
cmdList.append(f"UNIQUE INDEX `{index}` ( `{indexedFields}` )")
1065+
10381066
if "ForeignKeys" in thisTable:
10391067
thisKeys = thisTable["ForeignKeys"]
10401068
for key, auxTable in thisKeys.items():
1041-
forTable = auxTable.split(".")[0]
1042-
forKey = key
1043-
if forTable != auxTable:
1044-
forKey = auxTable.split(".")[1]
1069+
try:
1070+
forTable, forKey = self._parseForeignKeyReference(auxTable, key)
1071+
except ValueError as e:
1072+
return S_ERROR(DErrno.EMYSQL, str(e))
10451073

1046-
# cmdList.append( '`%s` %s' % ( forTable, tableDict[forTable]['Fields'][forKey] )
10471074
cmdList.append(
1048-
"FOREIGN KEY ( `%s` ) REFERENCES `%s` ( `%s` )"
1049-
" ON DELETE RESTRICT" % (key, forTable, forKey)
1075+
f"FOREIGN KEY ( `{key}` ) REFERENCES `{forTable}` ( `{forKey}` ) ON DELETE RESTRICT"
10501076
)
10511077

10521078
engine = thisTable.get("Engine", "InnoDB")
@@ -1058,7 +1084,7 @@ def _createTables(self, tableDict, force=False):
10581084
engine,
10591085
charset,
10601086
)
1061-
retDict = self._transaction([cmd])
1087+
retDict = self._update(cmd)
10621088
if not retDict["OK"]:
10631089
return retDict
10641090
# self.log.debug('Table %s created' % table)

src/DIRAC/WorkloadManagementSystem/Client/JobState/JobState.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,16 @@
1-
""" This object is a wrapper for setting and getting jobs states
2-
"""
1+
"""This object is a wrapper for setting and getting jobs states"""
2+
33
from DIRAC import S_ERROR, S_OK, gLogger
44
from DIRAC.WorkloadManagementSystem.Client import JobStatus
55
from DIRAC.WorkloadManagementSystem.Client.JobState.JobManifest import JobManifest
66
from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB
77
from DIRAC.WorkloadManagementSystem.DB.JobLoggingDB import JobLoggingDB
8-
from DIRAC.WorkloadManagementSystem.DB.TaskQueueDB import TaskQueueDB, multiValueDefFields, singleValueDefFields
8+
from DIRAC.WorkloadManagementSystem.DB.TaskQueueDB import (
9+
TaskQueueDB,
10+
multiValueDefFields,
11+
singleValueDefFields,
12+
rangeValueDefFields,
13+
)
914
from DIRAC.WorkloadManagementSystem.Service.JobPolicy import (
1015
RIGHT_CHANGE_STATUS,
1116
RIGHT_GET_INFO,
@@ -351,6 +356,10 @@ def insertIntoTQ(self, manifest=None):
351356
if name in reqCfg:
352357
jobReqDict[name] = reqCfg.getOption(name, [])
353358

359+
for name in rangeValueDefFields:
360+
if name in reqCfg:
361+
jobReqDict[name] = int(reqCfg[name])
362+
354363
jobPriority = reqCfg.getOption("UserPriority", 1)
355364

356365
result = self.__retryFunction(2, JobState.__db.tqDB.insertJob, (self.__jid, jobReqDict, jobPriority))

src/DIRAC/WorkloadManagementSystem/Client/Matcher.py

Lines changed: 14 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,11 @@
1717
from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB
1818
from DIRAC.WorkloadManagementSystem.DB.JobLoggingDB import JobLoggingDB
1919
from DIRAC.WorkloadManagementSystem.DB.PilotAgentsDB import PilotAgentsDB
20-
from DIRAC.WorkloadManagementSystem.DB.TaskQueueDB import TaskQueueDB, multiValueMatchFields, singleValueDefFields
20+
from DIRAC.WorkloadManagementSystem.DB.TaskQueueDB import (
21+
TaskQueueDB,
22+
multiValueMatchFields,
23+
singleValueDefFields,
24+
)
2125

2226

2327
class PilotVersionError(Exception):
@@ -69,8 +73,8 @@ def selectJob(self, resourceDescription, credDict):
6973

7074
# Make a nice print of the resource matching parameters
7175
toPrintDict = dict(resourceDict)
72-
if "MaxRAM" in resourceDescription:
73-
toPrintDict["MaxRAM"] = resourceDescription["MaxRAM"]
76+
if "MaxRAM" in resourceDict:
77+
toPrintDict["MaxRAM"] = resourceDict["MaxRAM"]
7478
if "NumberOfProcessors" in resourceDescription:
7579
toPrintDict["NumberOfProcessors"] = resourceDescription["NumberOfProcessors"]
7680
toPrintDict["Tag"] = []
@@ -167,11 +171,7 @@ def _processResourceDescription(self, resourceDescription):
167171
"""
168172

169173
resourceDict = {}
170-
for name in singleValueDefFields:
171-
if name in resourceDescription:
172-
resourceDict[name] = resourceDescription[name]
173-
174-
for name in multiValueMatchFields:
174+
for name in singleValueDefFields + multiValueMatchFields + ("MaxRAM",):
175175
if name in resourceDescription:
176176
resourceDict[name] = resourceDescription[name]
177177

@@ -192,25 +192,18 @@ def _processResourceDescription(self, resourceDescription):
192192
if "JobID" in resourceDescription:
193193
resourceDict["JobID"] = resourceDescription["JobID"]
194194

195-
# Convert MaxRAM and NumberOfProcessors parameters into a list of tags
196-
maxRAM = resourceDescription.get("MaxRAM")
197-
if maxRAM:
198-
try:
199-
maxRAM = int(maxRAM)
200-
except ValueError:
201-
maxRAM = None
195+
# Convert NumberOfProcessors parameters into a list of tags
202196
nProcessors = resourceDescription.get("NumberOfProcessors")
203197
if nProcessors:
204198
try:
205199
nProcessors = int(nProcessors)
206200
except ValueError:
207201
nProcessors = None
208-
for param, key, limit, increment in [(maxRAM, "MB", 1024 * 1024, 256), (nProcessors, "Processors", 1024, 1)]:
209-
if param and param <= limit:
210-
paramList = list(range(increment, param + increment, increment))
211-
paramTags = ["%d%s" % (par, key) for par in paramList]
212-
if paramTags:
213-
resourceDict.setdefault("Tag", []).extend(paramTags)
202+
if nProcessors and nProcessors <= 1024:
203+
paramList = list(range(1, nProcessors + 1, 1))
204+
paramTags = ["%d%s" % (par, "Processors") for par in paramList]
205+
if paramTags:
206+
resourceDict.setdefault("Tag", []).extend(paramTags)
214207

215208
# Add 'MultiProcessor' to the list of tags
216209
if nProcessors and nProcessors > 1:

0 commit comments

Comments
 (0)