Skip to content

Commit db98c79

Browse files
authored
Merge pull request #8171 from chrisburr/faster-job-cleaning
Faster job cleaning
2 parents 1a9245a + 026dc1a commit db98c79

File tree

6 files changed

+114
-110
lines changed

6 files changed

+114
-110
lines changed

src/DIRAC/FrameworkSystem/Client/ComponentInstaller.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2108,7 +2108,8 @@ def installDatabase(self, dbName):
21082108

21092109
perms = (
21102110
"SELECT,INSERT,LOCK TABLES,UPDATE,DELETE,CREATE,DROP,ALTER,REFERENCES,"
2111-
"CREATE VIEW,SHOW VIEW,INDEX,TRIGGER,ALTER ROUTINE,CREATE ROUTINE"
2111+
"CREATE VIEW,SHOW VIEW,INDEX,TRIGGER,ALTER ROUTINE,CREATE ROUTINE,"
2112+
"CREATE TEMPORARY TABLES"
21122113
)
21132114
cmd = f"GRANT {perms} ON `{dbName}`.* TO '{self.mysqlUser}'@'%'"
21142115
result = self.execMySQL(cmd)

src/DIRAC/WorkloadManagementSystem/Agent/JobCleaningAgent.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ def _deleteRemoveJobs(self, jobList, remove=False):
236236
result = wmsClient.deleteJob(jobsList)
237237
if not result["OK"]:
238238
self.log.error(
239-
"Could not {'remove' if remove else 'delete'} jobs",
239+
f"Could not {'remove' if remove else 'delete'} jobs",
240240
f"for {user} : {ownerGroup} (n={len(jobsList)}) : {result['Message']}",
241241
)
242242
fail = True

src/DIRAC/WorkloadManagementSystem/DB/JobDB.py

Lines changed: 50 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
from DIRAC.Core.Utilities.ClassAd.ClassAdLight import ClassAd
2121
from DIRAC.Core.Utilities.Decorators import deprecated
2222
from DIRAC.Core.Utilities.DErrno import EWMSJMAN, EWMSSUBM, cmpError
23-
from DIRAC.Core.Utilities.ReturnValues import S_ERROR, S_OK
23+
from DIRAC.Core.Utilities.ReturnValues import S_ERROR, S_OK, convertToReturnValue, returnValueOrRaise, SErrorException
2424
from DIRAC.FrameworkSystem.Client.Logger import contextLogger
2525
from DIRAC.ResourceStatusSystem.Client.SiteStatus import SiteStatus
2626
from DIRAC.WorkloadManagementSystem.Client import JobMinorStatus, JobStatus
@@ -106,18 +106,7 @@ def getJobParameters(self, jobID, paramList=None):
106106
Returns a dictionary with the Job Parameters.
107107
If parameterList is empty - all the parameters are returned.
108108
"""
109-
110-
if isinstance(jobID, (str, int)):
111-
jobID = [jobID]
112-
113-
jobIDList = []
114-
for jID in jobID:
115-
ret = self._escapeString(str(jID))
116-
if not ret["OK"]:
117-
return ret
118-
jobIDList.append(ret["Value"])
119-
120-
# self.log.debug('JobDB.getParameters: Getting Parameters for jobs %s' % ','.join(jobIDList))
109+
jobIDList = [jobID] if isinstance(jobID, (str, int)) else jobID
121110

122111
resultDict = {}
123112
if paramList:
@@ -130,7 +119,7 @@ def getJobParameters(self, jobID, paramList=None):
130119
return ret
131120
paramNameList.append(ret["Value"])
132121
cmd = "SELECT JobID, Name, Value FROM JobParameters WHERE JobID IN ({}) AND Name IN ({})".format(
133-
",".join(jobIDList),
122+
",".join(str(int(j)) for j in jobIDList),
134123
",".join(paramNameList),
135124
)
136125
result = self._query(cmd)
@@ -207,13 +196,13 @@ def getAtticJobParameters(self, jobID, paramList=None, rescheduleCounter=-1):
207196
return S_ERROR("JobDB.getAtticJobParameters: failed to retrieve parameters")
208197

209198
#############################################################################
199+
@convertToReturnValue
210200
def getJobsAttributes(self, jobIDs, attrList=None):
211201
"""Get all Job(s) Attributes for a given list of jobIDs.
212202
Return a dictionary with all Job Attributes as value pairs
213203
"""
214-
215204
if not jobIDs:
216-
return S_OK({})
205+
return {}
217206

218207
# If no list of attributes is given, return all attributes
219208
if not attrList:
@@ -229,28 +218,29 @@ def getJobsAttributes(self, jobIDs, attrList=None):
229218

230219
attrNameListS = []
231220
for x in attrList:
232-
ret = self._escapeString(x)
233-
if not ret["OK"]:
234-
return ret
235-
x = "`" + ret["Value"][1:-1] + "`"
221+
x = "`" + returnValueOrRaise(self._escapeString(x))[1:-1] + "`"
236222
attrNameListS.append(x)
237223
attrNames = "JobID," + ",".join(attrNameListS)
238224

239-
cmd = f"SELECT {attrNames} FROM Jobs WHERE JobID IN ({','.join(str(jobID) for jobID in jobIDs)})"
240-
res = self._query(cmd)
241-
if not res["OK"]:
242-
return res
243-
if not res["Value"]:
244-
return S_OK({})
225+
sqlCmd = "CREATE TEMPORARY TABLE to_select_Jobs (JobID INTEGER NOT NULL, PRIMARY KEY (JobID)) ENGINE=MEMORY;"
226+
returnValueOrRaise(self._update(sqlCmd))
227+
try:
228+
sqlCmd = "INSERT INTO to_select_Jobs (JobID) VALUES ( %s )"
229+
returnValueOrRaise(self._updatemany(sqlCmd, [(int(j),) for j in jobIDs]))
230+
sqlCmd = f"SELECT {attrNames} FROM Jobs JOIN to_select_Jobs USING (JobID)"
231+
result = returnValueOrRaise(self._query(sqlCmd))
232+
finally:
233+
sqlCmd = "DROP TEMPORARY TABLE to_select_Jobs"
234+
returnValueOrRaise(self._update(sqlCmd))
245235

246236
attributes = {}
247-
for t_att in res["Value"]:
237+
for t_att in result:
248238
jobID = int(t_att[0])
249239
attributes.setdefault(jobID, {})
250240
for tx, ax in zip(t_att[1:], attrList):
251241
attributes[jobID].setdefault(ax, tx)
252242

253-
return S_OK(attributes)
243+
return attributes
254244

255245
#############################################################################
256246
def getJobAttributes(self, jobID, attrList=None):
@@ -527,12 +517,10 @@ def setJobAttributes(self, jobID, attrNames, attrValues, update=False, myDate=No
527517
if not isinstance(jobID, (list, tuple)):
528518
jobIDList = [jobID]
529519

530-
jIDList = []
531-
for jID in jobIDList:
532-
ret = self._escapeString(jID)
533-
if not ret["OK"]:
534-
return ret
535-
jIDList.append(ret["Value"])
520+
try:
521+
jIDList = [int(jID) for jID in jobIDList]
522+
except ValueError as e:
523+
return S_ERROR(f"JobDB.setAttributes: {e}")
536524

537525
if len(attrNames) != len(attrValues):
538526
return S_ERROR("JobDB.setAttributes: incompatible Argument length")
@@ -561,7 +549,7 @@ def setJobAttributes(self, jobID, attrNames, attrValues, update=False, myDate=No
561549
if not attr:
562550
return S_ERROR("JobDB.setAttributes: Nothing to do")
563551

564-
cmd = f"UPDATE Jobs SET {', '.join(attr)} WHERE JobID in ( {', '.join(jIDList)} )"
552+
cmd = f"UPDATE Jobs SET {', '.join(attr)} WHERE JobID in ( {', '.join(str(int(j)) for j in jIDList)} )"
565553

566554
if myDate:
567555
cmd += f" AND LastUpdateTime < {myDate}"
@@ -987,44 +975,42 @@ def __checkAndPrepareJob(self, jobID, classAdJob, classAdReq, owner, ownerGroup,
987975
return S_OK()
988976

989977
#############################################################################
978+
@convertToReturnValue
990979
def removeJobFromDB(self, jobIDs):
991980
"""
992981
Remove jobs from the Job DB and clean up all the job related data in various tables
993982
"""
994-
995-
# ret = self._escapeString(jobID)
996-
# if not ret['OK']:
997-
# return ret
998-
# e_jobID = ret['Value']
999-
1000983
if not jobIDs:
1001-
return S_OK()
1002-
1003-
if not isinstance(jobIDs, list):
1004-
jobIDList = [jobIDs]
1005-
else:
1006-
jobIDList = jobIDs
984+
return None
985+
jobIDList = jobIDs if isinstance(jobIDs, list) else [jobIDs]
1007986

1008987
failedTablesList = []
1009-
for table in [
1010-
"InputData",
1011-
"JobParameters",
1012-
"AtticJobParameters",
1013-
"HeartBeatLoggingInfo",
1014-
"OptimizerParameters",
1015-
"JobCommands",
1016-
"Jobs",
1017-
"JobJDLs",
1018-
]:
1019-
cmd = f"DELETE FROM {table} WHERE JobID in ({','.join(str(j) for j in jobIDList)})"
1020-
result = self._update(cmd)
1021-
if not result["OK"]:
1022-
failedTablesList.append(table)
1023988

1024-
if failedTablesList:
1025-
return S_ERROR(f"Errors while job removal (tables {','.join(failedTablesList)})")
989+
sqlCmd = "CREATE TEMPORARY TABLE to_delete_Jobs (JobID INT(11) UNSIGNED NOT NULL, PRIMARY KEY (JobID)) ENGINE=MEMORY;"
990+
returnValueOrRaise(self._update(sqlCmd))
991+
try:
992+
sqlCmd = "INSERT INTO to_delete_Jobs (JobID) VALUES ( %s )"
993+
returnValueOrRaise(self._updatemany(sqlCmd, [(j,) for j in jobIDList]))
994+
995+
for table in [
996+
"InputData",
997+
"JobParameters",
998+
"AtticJobParameters",
999+
"HeartBeatLoggingInfo",
1000+
"OptimizerParameters",
1001+
"JobCommands",
1002+
"Jobs",
1003+
"JobJDLs",
1004+
]:
1005+
sqlCmd = f"DELETE m from `{table}` m JOIN to_delete_Jobs t USING (JobID)"
1006+
if not self._update(sqlCmd)["OK"]:
1007+
failedTablesList.append(table)
1008+
finally:
1009+
sqlCmd = "DROP TEMPORARY TABLE to_delete_Jobs"
1010+
returnValueOrRaise(self._update(sqlCmd))
10261011

1027-
return S_OK()
1012+
if failedTablesList:
1013+
raise SErrorException(f"Errors while job removal (tables {','.join(failedTablesList)})")
10281014

10291015
#############################################################################
10301016
def rescheduleJob(self, jobID):

src/DIRAC/WorkloadManagementSystem/DB/JobLoggingDB.py

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from DIRAC import S_ERROR, S_OK
1212
from DIRAC.Core.Base.DB import DB
1313
from DIRAC.Core.Utilities import TimeUtilities
14+
from DIRAC.Core.Utilities.ReturnValues import returnValueOrRaise, convertToReturnValue
1415
from DIRAC.FrameworkSystem.Client.Logger import contextLogger
1516

1617
MAGIC_EPOC_NUMBER = 1270000000
@@ -145,21 +146,33 @@ def getJobLoggingInfo(self, jobID):
145146
return S_OK(return_value)
146147

147148
#############################################################################
149+
@convertToReturnValue
148150
def deleteJob(self, jobID):
149151
"""Delete logging records for given jobs"""
150152
if not jobID:
151-
return S_OK()
153+
return None
152154

153-
# Make sure that we have a list of strings of jobIDs
154155
if isinstance(jobID, int):
155-
jobList = [str(jobID)]
156+
jobList = [jobID]
156157
elif isinstance(jobID, str):
157158
jobList = jobID.replace(" ", "").split(",")
158159
else:
159-
jobList = list(str(j) for j in jobID)
160+
jobList = jobID
160161

161-
req = f"DELETE FROM LoggingInfo WHERE JobID IN ({','.join(jobList)})"
162-
return self._update(req)
162+
sqlCmd = (
163+
"CREATE TEMPORARY TABLE to_delete_LoggingInfo (JobID INTEGER NOT NULL, PRIMARY KEY (JobID)) ENGINE=MEMORY;"
164+
)
165+
returnValueOrRaise(self._update(sqlCmd))
166+
try:
167+
sqlCmd = "INSERT INTO to_delete_LoggingInfo (JobID) VALUES ( %s )"
168+
returnValueOrRaise(self._updatemany(sqlCmd, [(j,) for j in jobList]))
169+
sqlCmd = "DELETE l from `LoggingInfo` l JOIN to_delete_LoggingInfo t USING (JobID)"
170+
result = returnValueOrRaise(self._update(sqlCmd))
171+
finally:
172+
sqlCmd = "DROP TEMPORARY TABLE to_delete_LoggingInfo"
173+
returnValueOrRaise(self._update(sqlCmd))
174+
175+
return result
163176

164177
#############################################################################
165178
def getWMSTimeStamps(self, jobID):

src/DIRAC/WorkloadManagementSystem/DB/PilotAgentsDB.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
from DIRAC.Core.Base.DB import DB
2929
from DIRAC.Core.Utilities import DErrno
3030
from DIRAC.Core.Utilities.MySQL import _quotedList
31+
from DIRAC.Core.Utilities.ReturnValues import returnValueOrRaise
3132
from DIRAC.FrameworkSystem.Client.Logger import contextLogger
3233
from DIRAC.ResourceStatusSystem.Client.SiteStatus import SiteStatus
3334
from DIRAC.WorkloadManagementSystem.Client import PilotStatus
@@ -401,9 +402,18 @@ def __getPilotID(self, pilotRef):
401402
if result["Value"]:
402403
return int(result["Value"][0][0])
403404
return 0
404-
refString = ",".join(["'" + ref + "'" for ref in pilotRef])
405-
req = f"SELECT PilotID from PilotAgents WHERE PilotJobReference in ( {refString} )"
406-
result = self._query(req)
405+
406+
sqlCmd = "CREATE TEMPORARY TABLE to_select_PilotAgents (PilotID VARCHAR(255) NOT NULL, PRIMARY KEY (PilotID)) ENGINE=MEMORY;"
407+
returnValueOrRaise(self._update(sqlCmd))
408+
try:
409+
sqlCmd = "INSERT INTO to_select_PilotAgents (PilotID) VALUES ( %s )"
410+
returnValueOrRaise(self._updatemany(sqlCmd, [(p,) for p in pilotRef]))
411+
sqlCmd = "SELECT PilotID FROM PilotAgents JOIN to_select_PilotAgents USING (PilotID)"
412+
result = self._query(sqlCmd)
413+
finally:
414+
sqlCmd = "DROP TEMPORARY TABLE to_select_PilotAgents"
415+
returnValueOrRaise(self._update(sqlCmd))
416+
407417
if not result["OK"]:
408418
return []
409419
if result["Value"]:

src/DIRAC/WorkloadManagementSystem/DB/SandboxMetadataDB.py

Lines changed: 29 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from DIRAC.Core.Base.DB import DB
66
from DIRAC.Core.Security import Properties
77
from DIRAC.Core.Utilities import List
8+
from DIRAC.Core.Utilities.ReturnValues import convertToReturnValue, returnValueOrRaise
89

910

1011
class SandboxMetadataDB(DB):
@@ -219,11 +220,8 @@ def assignSandboxesToEntities(self, enDict, requesterName, requesterGroup, owner
219220
return result
220221
return S_OK(assigned)
221222

222-
def __filterEntitiesByRequester(self, entitiesList, requesterName, requesterGroup):
223-
"""
224-
Given a list of entities and a requester, return the ones that the requester is allowed to modify
225-
"""
226-
sqlCond = ["s.OwnerId=o.OwnerId", "s.SBId=e.SBId"]
223+
def __entitiesByRequesterCond(self, requesterName, requesterGroup):
224+
sqlCond = []
227225
requesterProps = Registry.getPropertiesForEntity(requesterGroup, name=requesterName)
228226
if Properties.JOB_ADMINISTRATOR in requesterProps:
229227
# Do nothing, just ensure it doesn't fit in the other cases
@@ -235,44 +233,40 @@ def __filterEntitiesByRequester(self, entitiesList, requesterName, requesterGrou
235233
sqlCond.append(f"o.Owner='{requesterName}'")
236234
else:
237235
return S_ERROR("Not authorized to access sandbox")
238-
for i in range(len(entitiesList)):
239-
entitiesList[i] = self._escapeString(entitiesList[i])["Value"]
240-
if len(entitiesList) == 1:
241-
sqlCond.append(f"e.EntityId = {entitiesList[0]}")
242-
else:
243-
sqlCond.append(f"e.EntityId in ( {', '.join(entitiesList)} )")
244-
sqlCmd = "SELECT DISTINCT e.EntityId FROM `sb_EntityMapping` e, `sb_SandBoxes` s, `sb_Owners` o WHERE"
245-
sqlCmd = f"{sqlCmd} {' AND '.join(sqlCond)}"
246-
result = self._query(sqlCmd)
247-
if not result["OK"]:
248-
return result
249-
return S_OK([row[0] for row in result["Value"]])
236+
return sqlCond
250237

238+
@convertToReturnValue
251239
def unassignEntities(self, entities, requesterName, requesterGroup):
252240
"""
253241
Unassign jobs to sandboxes
254242
255243
:param list entities: list of entities to unassign
256244
"""
257-
updated = 0
258245
if not entities:
259-
return S_OK()
260-
result = self.__filterEntitiesByRequester(entities, requesterName, requesterGroup)
261-
if not result["OK"]:
262-
gLogger.error("Cannot filter entities", result["Message"])
263-
return result
264-
ids = result["Value"]
265-
if not ids:
266-
return S_OK(0)
267-
sqlCmd = "DELETE FROM `sb_EntityMapping` WHERE EntityId in ( %s )" % ", ".join(
268-
["'%s'" % str(eid) for eid in ids]
269-
)
270-
result = self._update(sqlCmd)
271-
if not result["OK"]:
272-
gLogger.error("Cannot unassign entities", result["Message"])
273-
else:
274-
updated += 1
275-
return S_OK(updated)
246+
return None
247+
conds = self.__entitiesByRequesterCond(requesterName, requesterGroup)
248+
249+
sqlCmd = "CREATE TEMPORARY TABLE to_delete_EntityId (EntityId VARCHAR(128) NOT NULL, PRIMARY KEY (EntityId)) ENGINE=MEMORY;"
250+
returnValueOrRaise(self._update(sqlCmd))
251+
try:
252+
sqlCmd = "INSERT INTO to_delete_EntityId (EntityId) VALUES ( %s )"
253+
returnValueOrRaise(self._updatemany(sqlCmd, [(e,) for e in entities]))
254+
sqlCmd = "DELETE m from `sb_EntityMapping` m JOIN to_delete_EntityId t USING (EntityId)"
255+
if conds:
256+
sqlCmd = " ".join(
257+
[
258+
sqlCmd,
259+
"JOIN `sb_SandBoxes` s ON s.SBId = m.SBId",
260+
"JOIN `sb_Owners` o ON s.OwnerId = o.OwnerId",
261+
"WHERE",
262+
" AND ".join(conds),
263+
]
264+
)
265+
returnValueOrRaise(self._update(sqlCmd))
266+
finally:
267+
sqlCmd = "DROP TEMPORARY TABLE to_delete_EntityId"
268+
returnValueOrRaise(self._update(sqlCmd))
269+
return 1
276270

277271
def getSandboxesAssignedToEntity(self, entityId, requesterName, requesterGroup, requestedVO):
278272
"""

0 commit comments

Comments
 (0)