Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ Now you can run the test with:

.. code-block:: bash

pytest --no-check-dirac-environment LocalRepo/ALTERNATIVE_MODULES/DIRAC/tests/Integration/WorkloadManagementSystem/Test_JobDB.py
pytest --no-check-dirac-environment LocalRepo/ALTERNATIVE_MODULES/DIRAC/tests/Integration/WorkloadManagementSystem/Test_JobDB.py

You can find the logs of the services in `/home/dirac/ServerInstallDIR/diracos/runit/`

Expand Down
76 changes: 51 additions & 25 deletions src/DIRAC/Core/Utilities/MySQL.py
Original file line number Diff line number Diff line change
Expand Up @@ -903,6 +903,21 @@ def _createViews(self, viewsDict, force=False):
return createView
return S_OK()

def _parseForeignKeyReference(self, auxTable, defaultKey):
"""
Parse foreign key reference in format 'Table' or 'Table.key'

:param str auxTable: Foreign key reference (e.g., 'MyTable' or 'MyTable.id')
:param str defaultKey: Default key name if not specified in auxTable
:return: tuple (table_name, key_name)
"""
if "." in auxTable:
parts = auxTable.split(".", 1)
if len(parts) != 2:
raise ValueError(f"Invalid foreign key reference format: {auxTable}")
return parts[0], parts[1]
return auxTable, defaultKey

def _createTables(self, tableDict, force=False):
"""
tableDict:
Expand Down Expand Up @@ -957,55 +972,67 @@ def _createTables(self, tableDict, force=False):
if "Fields" not in thisTable:
return S_ERROR(DErrno.EMYSQL, f"Missing `Fields` key in `{table}` table dictionary")

tableCreationList = [[]]

# Build dependency-ordered list of tables to create
# Tables with foreign keys must be created after their referenced tables
tableCreationList = []
auxiliaryTableList = []

i = 0
# Get list of existing tables in the database to handle migrations
existingTablesResult = self._query("SHOW TABLES")
if not existingTablesResult["OK"]:
return existingTablesResult
existingTables = [t[0] for t in existingTablesResult["Value"]]

extracted = True
while tableList and extracted:
# iterate extracting tables from list if they only depend on
# already extracted tables.
extracted = False
auxiliaryTableList += tableCreationList[i]
i += 1
tableCreationList.append([])
currentLevelTables = []

for table in list(tableList):
toBeExtracted = True
thisTable = tableDict[table]
if "ForeignKeys" in thisTable:
thisKeys = thisTable["ForeignKeys"]
for key, auxTable in thisKeys.items():
forTable = auxTable.split(".")[0]
forKey = key
if forTable != auxTable:
forKey = auxTable.split(".")[1]
if forTable not in auxiliaryTableList:
try:
forTable, forKey = self._parseForeignKeyReference(auxTable, key)
except ValueError as e:
return S_ERROR(DErrno.EMYSQL, str(e))

# Check if the referenced table is either being created or already exists
if forTable not in auxiliaryTableList and forTable not in existingTables:
toBeExtracted = False
break
if key not in thisTable["Fields"]:
return S_ERROR(
DErrno.EMYSQL,
f"ForeignKey `{key}` -> `{forKey}` not defined in Primary table `{table}`.",
)
if forKey not in tableDict[forTable]["Fields"]:
# Only validate field existence if the referenced table is in tableDict
if forTable in tableDict and forKey not in tableDict[forTable]["Fields"]:
return S_ERROR(
DErrno.EMYSQL,
"ForeignKey `%s` -> `%s` not defined in Auxiliary table `%s`."
% (key, forKey, forTable),
f"ForeignKey `{key}` -> `{forKey}` not defined in Auxiliary table `{forTable}`.",
)

if toBeExtracted:
# self.log.debug('Table %s ready to be created' % table)
extracted = True
tableList.remove(table)
tableCreationList[i].append(table)
currentLevelTables.append(table)

if currentLevelTables:
tableCreationList.append(currentLevelTables)
auxiliaryTableList.extend(currentLevelTables)

if tableList:
return S_ERROR(DErrno.EMYSQL, f"Recursive Foreign Keys in {', '.join(tableList)}")

for tableList in tableCreationList:
for table in tableList:
# Create tables level by level
for levelTables in tableCreationList:
for table in levelTables:
# Check if Table exist
retDict = self.__checkTable(table, force=force)
if not retDict["OK"]:
Expand Down Expand Up @@ -1035,18 +1062,17 @@ def _createTables(self, tableDict, force=False):
for index in indexDict:
indexedFields = "`, `".join(indexDict[index])
cmdList.append(f"UNIQUE INDEX `{index}` ( `{indexedFields}` )")

if "ForeignKeys" in thisTable:
thisKeys = thisTable["ForeignKeys"]
for key, auxTable in thisKeys.items():
forTable = auxTable.split(".")[0]
forKey = key
if forTable != auxTable:
forKey = auxTable.split(".")[1]
try:
forTable, forKey = self._parseForeignKeyReference(auxTable, key)
except ValueError as e:
return S_ERROR(DErrno.EMYSQL, str(e))

# cmdList.append( '`%s` %s' % ( forTable, tableDict[forTable]['Fields'][forKey] )
cmdList.append(
"FOREIGN KEY ( `%s` ) REFERENCES `%s` ( `%s` )"
" ON DELETE RESTRICT" % (key, forTable, forKey)
f"FOREIGN KEY ( `{key}` ) REFERENCES `{forTable}` ( `{forKey}` ) ON DELETE RESTRICT"
)

engine = thisTable.get("Engine", "InnoDB")
Expand All @@ -1058,7 +1084,7 @@ def _createTables(self, tableDict, force=False):
engine,
charset,
)
retDict = self._transaction([cmd])
retDict = self._update(cmd)
if not retDict["OK"]:
return retDict
# self.log.debug('Table %s created' % table)
Expand Down
15 changes: 12 additions & 3 deletions src/DIRAC/WorkloadManagementSystem/Client/JobState/JobState.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
""" This object is a wrapper for setting and getting jobs states
"""
"""This object is a wrapper for setting and getting jobs states"""

from DIRAC import S_ERROR, S_OK, gLogger
from DIRAC.WorkloadManagementSystem.Client import JobStatus
from DIRAC.WorkloadManagementSystem.Client.JobState.JobManifest import JobManifest
from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB
from DIRAC.WorkloadManagementSystem.DB.JobLoggingDB import JobLoggingDB
from DIRAC.WorkloadManagementSystem.DB.TaskQueueDB import TaskQueueDB, multiValueDefFields, singleValueDefFields
from DIRAC.WorkloadManagementSystem.DB.TaskQueueDB import (
TaskQueueDB,
multiValueDefFields,
singleValueDefFields,
rangeValueDefFields,
)
from DIRAC.WorkloadManagementSystem.Service.JobPolicy import (
RIGHT_CHANGE_STATUS,
RIGHT_GET_INFO,
Expand Down Expand Up @@ -351,6 +356,10 @@ def insertIntoTQ(self, manifest=None):
if name in reqCfg:
jobReqDict[name] = reqCfg.getOption(name, [])

for name in rangeValueDefFields:
if name in reqCfg:
jobReqDict[name] = int(reqCfg[name])

jobPriority = reqCfg.getOption("UserPriority", 1)

result = self.__retryFunction(2, JobState.__db.tqDB.insertJob, (self.__jid, jobReqDict, jobPriority))
Expand Down
35 changes: 14 additions & 21 deletions src/DIRAC/WorkloadManagementSystem/Client/Matcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@
from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB
from DIRAC.WorkloadManagementSystem.DB.JobLoggingDB import JobLoggingDB
from DIRAC.WorkloadManagementSystem.DB.PilotAgentsDB import PilotAgentsDB
from DIRAC.WorkloadManagementSystem.DB.TaskQueueDB import TaskQueueDB, multiValueMatchFields, singleValueDefFields
from DIRAC.WorkloadManagementSystem.DB.TaskQueueDB import (
TaskQueueDB,
multiValueMatchFields,
singleValueDefFields,
)


class PilotVersionError(Exception):
Expand Down Expand Up @@ -69,8 +73,8 @@ def selectJob(self, resourceDescription, credDict):

# Make a nice print of the resource matching parameters
toPrintDict = dict(resourceDict)
if "MaxRAM" in resourceDescription:
toPrintDict["MaxRAM"] = resourceDescription["MaxRAM"]
if "MaxRAM" in resourceDict:
toPrintDict["MaxRAM"] = resourceDict["MaxRAM"]
if "NumberOfProcessors" in resourceDescription:
toPrintDict["NumberOfProcessors"] = resourceDescription["NumberOfProcessors"]
toPrintDict["Tag"] = []
Expand Down Expand Up @@ -167,11 +171,7 @@ def _processResourceDescription(self, resourceDescription):
"""

resourceDict = {}
for name in singleValueDefFields:
if name in resourceDescription:
resourceDict[name] = resourceDescription[name]

for name in multiValueMatchFields:
for name in singleValueDefFields + multiValueMatchFields + ("MaxRAM",):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason for not having a rangeValueMatchFields in TaskQueueDB that would contain MaxRAM?
So that we avoid having hardcoded value like MaxRAM spread throughout the code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rangeValueMatchFields also includes MinRAM, which here is not needed.

if name in resourceDescription:
resourceDict[name] = resourceDescription[name]

Expand All @@ -192,25 +192,18 @@ def _processResourceDescription(self, resourceDescription):
if "JobID" in resourceDescription:
resourceDict["JobID"] = resourceDescription["JobID"]

# Convert MaxRAM and NumberOfProcessors parameters into a list of tags
maxRAM = resourceDescription.get("MaxRAM")
if maxRAM:
try:
maxRAM = int(maxRAM)
except ValueError:
maxRAM = None
# Convert NumberOfProcessors parameters into a list of tags
nProcessors = resourceDescription.get("NumberOfProcessors")
if nProcessors:
try:
nProcessors = int(nProcessors)
except ValueError:
nProcessors = None
for param, key, limit, increment in [(maxRAM, "MB", 1024 * 1024, 256), (nProcessors, "Processors", 1024, 1)]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm curious, don't you need that mechanism anymore?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update: I think I get it, it's because you now define range values, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The mechanism is still there, just it's only needed for Processors (since RAM requirements are not stored anymore as tags).

if param and param <= limit:
paramList = list(range(increment, param + increment, increment))
paramTags = ["%d%s" % (par, key) for par in paramList]
if paramTags:
resourceDict.setdefault("Tag", []).extend(paramTags)
if nProcessors and nProcessors <= 1024:
paramList = list(range(1, nProcessors + 1, 1))
paramTags = ["%d%s" % (par, "Processors") for par in paramList]
if paramTags:
resourceDict.setdefault("Tag", []).extend(paramTags)

# Add 'MultiProcessor' to the list of tags
if nProcessors and nProcessors > 1:
Expand Down
Loading
Loading