Skip to content

Commit a066a22

Browse files
committed
feat: moving from OwnerDN to Owner for TaskQueueDB
1 parent 4340797 commit a066a22

File tree

3 files changed

+76
-80
lines changed

3 files changed

+76
-80
lines changed

src/DIRAC/WorkloadManagementSystem/DB/TaskQueueDB.py

Lines changed: 41 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
TQ_MIN_SHARE = 0.001
1818

1919
# For checks at insertion time, and not only
20-
singleValueDefFields = ("OwnerDN", "OwnerGroup", "CPUTime")
20+
singleValueDefFields = ("Owner", "OwnerGroup", "CPUTime")
2121
multiValueDefFields = ("Sites", "GridCEs", "BannedSites", "Platforms", "JobTypes", "Tags")
2222

2323
# Used for matching
@@ -94,14 +94,15 @@ def __initializeDB(self):
9494
self.__tablesDesc["tq_TaskQueues"] = {
9595
"Fields": {
9696
"TQId": "INTEGER(11) UNSIGNED AUTO_INCREMENT NOT NULL",
97-
"OwnerDN": "VARCHAR(255) NOT NULL",
97+
"Owner": "VARCHAR(255) NOT NULL",
98+
"OwnerDN": "VARCHAR(255)",
9899
"OwnerGroup": "VARCHAR(32) NOT NULL",
99100
"CPUTime": "BIGINT(20) UNSIGNED NOT NULL",
100101
"Priority": "FLOAT NOT NULL",
101102
"Enabled": "TINYINT(1) NOT NULL DEFAULT 0",
102103
},
103104
"PrimaryKey": "TQId",
104-
"Indexes": {"TQOwner": ["OwnerDN", "OwnerGroup", "CPUTime"]},
105+
"Indexes": {"TQOwner": ["Owner", "OwnerGroup", "CPUTime"]},
105106
}
106107

107108
self.__tablesDesc["tq_Jobs"] = {
@@ -167,11 +168,9 @@ def _checkTaskQueueDefinition(self, tqDefDict):
167168
"""
168169

169170
for field in singleValueDefFields:
170-
if field not in tqDefDict:
171-
return S_ERROR(f"Missing mandatory field '{field}' in task queue definition")
172-
if field in ["CPUTime"]:
171+
if field == "CPUTime":
173172
if not isinstance(tqDefDict[field], int):
174-
return S_ERROR(f"Mandatory field {field} value type is not valid: {type(tqDefDict[field])}")
173+
return S_ERROR(f"Mandatory field 'CPUTime' value type is not valid: {type(tqDefDict['CPUTime'])}")
175174
else:
176175
if not isinstance(tqDefDict[field], str):
177176
return S_ERROR(f"Mandatory field {field} value type is not valid: {type(tqDefDict[field])}")
@@ -261,7 +260,7 @@ def __createTaskQueue(self, tqDefDict, priority=1, connObj=False):
261260
)
262261
result = self._update(cmd, conn=connObj)
263262
if not result["OK"]:
264-
self.log.error("Can't insert TQ in DB", result["Value"])
263+
self.log.error("Can't insert TQ in DB", result["Message"])
265264
return result
266265
if "lastRowId" in result:
267266
tqId = result["lastRowId"]
@@ -383,7 +382,7 @@ def insertJob(self, jobId, tqDefDict, jobPriority, skipTQDefCheck=False):
383382
self.log.error("Error inserting job in TQ", f"Job {jobId} TQ {tqId}: {result['Message']}")
384383
return result
385384
if newTQ:
386-
self.recalculateTQSharesForEntity(tqDefDict["OwnerDN"], tqDefDict["OwnerGroup"], connObj=connObj)
385+
self.recalculateTQSharesForEntity(tqDefDict["Owner"], tqDefDict["OwnerGroup"], connObj=connObj)
387386
finally:
388387
self.__setTaskQueueEnabled(tqId, True)
389388
return S_OK()
@@ -549,7 +548,7 @@ def matchAndGetJob(self, tqMatchDict, numJobsPerTry=50, numQueuesPerTry=10, nega
549548
if not tqList:
550549
self.log.info("No TQ matches requirements")
551550
return S_OK({"matchFound": False, "tqMatch": tqMatchDict})
552-
for tqId, tqOwnerDN, tqOwnerGroup in tqList:
551+
for tqId, tqOwner, tqOwnerGroup in tqList:
553552
self.log.verbose("Trying to extract jobs from TQ", tqId)
554553
retVal = self._query(prioSQL % tqId, conn=connObj)
555554
if not retVal["OK"]:
@@ -564,7 +563,7 @@ def matchAndGetJob(self, tqMatchDict, numJobsPerTry=50, numQueuesPerTry=10, nega
564563
jobTQList = [(row[0], row[1]) for row in retVal["Value"]]
565564
if not jobTQList:
566565
self.log.info("Task queue seems to be empty, triggering a cleaning of", tqId)
567-
self.__deleteTQWithDelay.add(tqId, 300, (tqId, tqOwnerDN, tqOwnerGroup))
566+
self.__deleteTQWithDelay.add(tqId, 300, (tqId, tqOwner, tqOwnerGroup))
568567
while jobTQList:
569568
jobId, tqId = jobTQList.pop(random.randint(0, len(jobTQList) - 1))
570569
self.log.verbose("Trying to extract job from TQ", f"{jobId} : {tqId}")
@@ -681,25 +680,22 @@ def __generateTQMatchSQL(self, tqMatchDict, numQueuesToGet=1, negativeCond=None)
681680
# Only enabled TQs
682681
sqlCondList = []
683682
sqlTables = {"tq_TaskQueues": "tq"}
684-
# If OwnerDN and OwnerGroup are defined only use those combinations that make sense
685-
if "OwnerDN" in tqMatchDict and "OwnerGroup" in tqMatchDict:
683+
# If Owner and OwnerGroup are defined only use those combinations that make sense
684+
if "Owner" in tqMatchDict and "OwnerGroup" in tqMatchDict:
686685
groups = tqMatchDict["OwnerGroup"]
687686
if not isinstance(groups, (list, tuple)):
688687
groups = [groups]
689-
dns = tqMatchDict["OwnerDN"]
690-
if not isinstance(dns, (list, tuple)):
691-
dns = [dns]
688+
owner = tqMatchDict["Owner"]
692689
ownerConds = []
693690
for group in groups:
694691
if Properties.JOB_SHARING in Registry.getPropertiesForGroup(group.replace('"', "")):
695692
ownerConds.append(f"tq.OwnerGroup = {group}")
696693
else:
697-
for dn in dns:
698-
ownerConds.append(f"( tq.OwnerDN = {dn} AND tq.OwnerGroup = {group} )")
694+
ownerConds.append(f"( tq.Owner = {owner} AND tq.OwnerGroup = {group} )")
699695
sqlCondList.append(" OR ".join(ownerConds))
700696
else:
701697
# If not both are defined, just add the ones that are defined
702-
for field in ("OwnerGroup", "OwnerDN"):
698+
for field in ("OwnerGroup", "Owner"):
703699
if field in tqMatchDict:
704700
sqlCondList.append(self.__generateSQLSubCond("tq.%s = %%s" % field, tqMatchDict[field]))
705701
# Type of single value conditions
@@ -829,7 +825,7 @@ def __generateTQMatchSQL(self, tqMatchDict, numQueuesToGet=1, negativeCond=None)
829825
sqlCondList.append(self.__generateNotSQL(negativeCond))
830826

831827
# Generate the final query string
832-
tqSqlCmd = "SELECT tq.TQId, tq.OwnerDN, tq.OwnerGroup FROM `tq_TaskQueues` tq WHERE %s" % (
828+
tqSqlCmd = "SELECT tq.TQId, tq.Owner, tq.OwnerGroup FROM `tq_TaskQueues` tq WHERE %s" % (
833829
" AND ".join(sqlCondList)
834830
)
835831

@@ -883,7 +879,7 @@ def deleteJob(self, jobId, connObj=False):
883879
return S_ERROR(f"Can't delete job: {retVal['Message']}")
884880
connObj = retVal["Value"]
885881
retVal = self._query(
886-
"SELECT t.TQId, t.OwnerDN, t.OwnerGroup \
882+
"SELECT t.TQId, t.Owner, t.OwnerGroup \
887883
FROM `tq_TaskQueues` t, `tq_Jobs` j \
888884
WHERE j.JobId = %s AND t.TQId = j.TQId"
889885
% jobId,
@@ -894,7 +890,7 @@ def deleteJob(self, jobId, connObj=False):
894890
data = retVal["Value"]
895891
if not data:
896892
return S_OK(False)
897-
tqId, tqOwnerDN, tqOwnerGroup = data[0]
893+
tqId, tqOwner, tqOwnerGroup = data[0]
898894
self.log.verbose("Deleting job", jobId)
899895
retVal = self._update(f"DELETE FROM `tq_Jobs` WHERE JobId = {jobId}", conn=connObj)
900896
if not retVal["OK"]:
@@ -903,7 +899,7 @@ def deleteJob(self, jobId, connObj=False):
903899
# No job deleted
904900
return S_OK(False)
905901
# Always return S_OK() because job has already been taken out from the TQ
906-
self.__deleteTQWithDelay.add(tqId, 300, (tqId, tqOwnerDN, tqOwnerGroup))
902+
self.__deleteTQWithDelay.add(tqId, 300, (tqId, tqOwner, tqOwnerGroup))
907903
return S_OK(True)
908904

909905
def getTaskQueueForJob(self, jobId, connObj=False):
@@ -928,7 +924,7 @@ def getTaskQueueForJob(self, jobId, connObj=False):
928924
return S_OK(retVal["Value"][0][0])
929925

930926
def __getOwnerForTaskQueue(self, tqId, connObj=False):
931-
retVal = self._query(f"SELECT OwnerDN, OwnerGroup from `tq_TaskQueues` WHERE TQId={tqId}", conn=connObj)
927+
retVal = self._query(f"SELECT Owner, OwnerGroup from `tq_TaskQueues` WHERE TQId={tqId}", conn=connObj)
932928
if not retVal["OK"]:
933929
return retVal
934930
data = retVal["Value"]
@@ -937,16 +933,16 @@ def __getOwnerForTaskQueue(self, tqId, connObj=False):
937933
return S_OK(retVal["Value"][0])
938934

939935
def __deleteTQIfEmpty(self, args):
940-
(tqId, tqOwnerDN, tqOwnerGroup) = args
936+
(tqId, tqOwner, tqOwnerGroup) = args
941937
retries = 3
942938
while retries:
943939
retries -= 1
944-
result = self.deleteTaskQueueIfEmpty(tqId, tqOwnerDN, tqOwnerGroup)
940+
result = self.deleteTaskQueueIfEmpty(tqId, tqOwner, tqOwnerGroup)
945941
if result["OK"]:
946942
return
947943
self.log.error("Could not delete TQ", f"{tqId}: {result['Message']}")
948944

949-
def deleteTaskQueueIfEmpty(self, tqId, tqOwnerDN=False, tqOwnerGroup=False, connObj=False):
945+
def deleteTaskQueueIfEmpty(self, tqId, tqOwner=False, tqOwnerGroup=False, connObj=False):
950946
"""
951947
Try to delete a task queue if its empty
952948
"""
@@ -956,14 +952,14 @@ def deleteTaskQueueIfEmpty(self, tqId, tqOwnerDN=False, tqOwnerGroup=False, conn
956952
self.log.error("Can't insert job", retVal["Message"])
957953
return retVal
958954
connObj = retVal["Value"]
959-
if not tqOwnerDN or not tqOwnerGroup:
955+
if not tqOwner or not tqOwnerGroup:
960956
retVal = self.__getOwnerForTaskQueue(tqId, connObj=connObj)
961957
if not retVal["OK"]:
962958
return retVal
963959
data = retVal["Value"]
964960
if not data:
965961
return S_OK(False)
966-
tqOwnerDN, tqOwnerGroup = data
962+
tqOwner, tqOwnerGroup = data
967963

968964
sqlCmd = f"SELECT TQId FROM `tq_TaskQueues` WHERE Enabled >= 1 AND `tq_TaskQueues`.TQId = {tqId} "
969965
sqlCmd += "AND `tq_TaskQueues`.TQId not in ( SELECT DISTINCT TQId from `tq_Jobs` )"
@@ -981,7 +977,7 @@ def deleteTaskQueueIfEmpty(self, tqId, tqOwnerDN=False, tqOwnerGroup=False, conn
981977
retVal = self._update(f"DELETE FROM `tq_TaskQueues` WHERE TQId = {tqId}", conn=connObj)
982978
if not retVal["OK"]:
983979
return retVal
984-
self.recalculateTQSharesForEntity(tqOwnerDN, tqOwnerGroup, connObj=connObj)
980+
self.recalculateTQSharesForEntity(tqOwner, tqOwnerGroup, connObj=connObj)
985981
self.log.info("Deleted empty and enabled TQ", tqId)
986982
return S_OK()
987983
return S_OK(False)
@@ -1093,20 +1089,20 @@ def recalculateTQSharesForAll(self):
10931089
self.recalculateTQSharesForEntity("all", group)
10941090
return S_OK()
10951091

1096-
def recalculateTQSharesForEntity(self, userDN, userGroup, connObj=False):
1092+
def recalculateTQSharesForEntity(self, user, userGroup, connObj=False):
10971093
"""
1098-
Recalculate the shares for a userDN/userGroup combo
1094+
Recalculate the shares for a user/userGroup combo
10991095
"""
1100-
self.log.info("Recalculating shares", f"for {userDN}@{userGroup} TQs")
1096+
self.log.info("Recalculating shares", f"for {user}@{userGroup} TQs")
11011097
if userGroup in self.__groupShares:
11021098
share = self.__groupShares[userGroup]
11031099
else:
11041100
share = float(DEFAULT_GROUP_SHARE)
11051101
if Properties.JOB_SHARING in Registry.getPropertiesForGroup(userGroup):
1106-
# If group has JobSharing just set prio for that entry, userDN is irrelevant
1107-
return self.__setPrioritiesForEntity(userDN, userGroup, share, connObj=connObj)
1102+
# If group has JobSharing just set prio for that entry, user is irrelevant
1103+
return self.__setPrioritiesForEntity(user, userGroup, share, connObj=connObj)
11081104

1109-
selSQL = "SELECT Owner, COUNT(Owner) FROM `tq_TaskQueues` WHERE OwnerGroup='%s' GROUP BY Owner" % (userGroup)
1105+
selSQL = f"SELECT Owner, COUNT(Owner) FROM `tq_TaskQueues` WHERE OwnerGroup='{userGroup}' GROUP BY Owner"
11101106
result = self._query(selSQL, conn=connObj)
11111107
if not result["OK"]:
11121108
return result
@@ -1126,26 +1122,26 @@ def recalculateTQSharesForEntity(self, userDN, userGroup, connObj=False):
11261122
owners = dict(data)
11271123
# IF the user is already known and has more than 1 tq, the rest of the users don't need to be modified
11281124
# (The number of owners didn't change)
1129-
if userDN in owners and owners[userDN] > 1:
1130-
return self.__setPrioritiesForEntity(userDN, userGroup, entitiesShares[userDN], connObj=connObj)
1125+
if user in owners and owners[user] > 1:
1126+
return self.__setPrioritiesForEntity(user, userGroup, entitiesShares[user], connObj=connObj)
11311127
# Oops the number of owners may have changed so we recalculate the prio for all owners in the group
1132-
for userDN in owners:
1133-
self.__setPrioritiesForEntity(userDN, userGroup, entitiesShares[userDN], connObj=connObj)
1128+
for user in owners:
1129+
self.__setPrioritiesForEntity(user, userGroup, entitiesShares[user], connObj=connObj)
11341130
return S_OK()
11351131

1136-
def __setPrioritiesForEntity(self, userDN, userGroup, share, connObj=False, consolidationFunc="AVG"):
1132+
def __setPrioritiesForEntity(self, user, userGroup, share, connObj=False, consolidationFunc="AVG"):
11371133
"""
1138-
Set the priority for a userDN/userGroup combo given a splitted share
1134+
Set the priority for a user/userGroup combo given a splitted share
11391135
"""
1140-
self.log.info("Setting priorities", f"to {userDN}@{userGroup} TQs")
1136+
self.log.info("Setting priorities", f"to {user}@{userGroup} TQs")
11411137
tqCond = [f"t.OwnerGroup='{userGroup}'"]
11421138
allowBgTQs = gConfig.getValue(f"/Registry/Groups/{userGroup}/AllowBackgroundTQs", False)
11431139
if Properties.JOB_SHARING not in Registry.getPropertiesForGroup(userGroup):
1144-
res = self._escapeString(userDN)
1140+
res = self._escapeString(user)
11451141
if not res["OK"]:
11461142
return res
11471143
userDN = res["Value"]
1148-
tqCond.append(f"t.OwnerDN= {userDN} ")
1144+
tqCond.append(f"t.Owner= {user} ")
11491145
tqCond.append("t.TQId = j.TQId")
11501146
if consolidationFunc == "AVG":
11511147
selectSQL = "SELECT j.TQId, SUM( j.RealPriority )/COUNT(j.RealPriority) \

tests/Integration/WorkloadManagementSystem/Test_JobParameters_MySQLandES.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -117,9 +117,9 @@ def test_MySQLandES_jobParameters():
117117
# So now we are using the ES backend
118118

119119
# This will still be in MySQL, but first it will look if it's in ES
120-
res = jobMonitoringClient.getJobParameter(jobID, "ParName-fromMySQL")
121-
assert res["OK"], res["Message"]
122-
assert res["Value"] == {"ParName-fromMySQL": "ParValue-fromMySQL"}, res["Value"]
120+
_checkWithRetries(
121+
jobMonitoringClient.getJobParameter, (jobID, "ParName-fromMySQL"), {"ParName-fromMySQL": "ParValue-fromMySQL"}
122+
)
123123

124124
# Now we insert (in ES)
125125
res = jobStateUpdateClient.setJobParameter(jobID, "ParName-fromES", "ParValue-fromES")

0 commit comments

Comments
 (0)