Skip to content

Commit 026dc1a

Browse files
committed
fix: Optimize JobDB.removeJobFromDB
1 parent df6a61d commit 026dc1a

File tree

1 file changed

+28
-30
lines changed
  • src/DIRAC/WorkloadManagementSystem/DB

1 file changed

+28
-30
lines changed

src/DIRAC/WorkloadManagementSystem/DB/JobDB.py

Lines changed: 28 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
from DIRAC.Core.Utilities.ClassAd.ClassAdLight import ClassAd
2121
from DIRAC.Core.Utilities.Decorators import deprecated
2222
from DIRAC.Core.Utilities.DErrno import EWMSJMAN, EWMSSUBM, cmpError
23-
from DIRAC.Core.Utilities.ReturnValues import S_ERROR, S_OK, convertToReturnValue, returnValueOrRaise
23+
from DIRAC.Core.Utilities.ReturnValues import S_ERROR, S_OK, convertToReturnValue, returnValueOrRaise, SErrorException
2424
from DIRAC.FrameworkSystem.Client.Logger import contextLogger
2525
from DIRAC.ResourceStatusSystem.Client.SiteStatus import SiteStatus
2626
from DIRAC.WorkloadManagementSystem.Client import JobMinorStatus, JobStatus
@@ -975,44 +975,42 @@ def __checkAndPrepareJob(self, jobID, classAdJob, classAdReq, owner, ownerGroup,
975975
return S_OK()
976976

977977
#############################################################################
978+
@convertToReturnValue
978979
def removeJobFromDB(self, jobIDs):
979980
"""
980981
Remove jobs from the Job DB and clean up all the job related data in various tables
981982
"""
982-
983-
# ret = self._escapeString(jobID)
984-
# if not ret['OK']:
985-
# return ret
986-
# e_jobID = ret['Value']
987-
988983
if not jobIDs:
989-
return S_OK()
990-
991-
if not isinstance(jobIDs, list):
992-
jobIDList = [jobIDs]
993-
else:
994-
jobIDList = jobIDs
984+
return None
985+
jobIDList = jobIDs if isinstance(jobIDs, list) else [jobIDs]
995986

996987
failedTablesList = []
997-
for table in [
998-
"InputData",
999-
"JobParameters",
1000-
"AtticJobParameters",
1001-
"HeartBeatLoggingInfo",
1002-
"OptimizerParameters",
1003-
"JobCommands",
1004-
"Jobs",
1005-
"JobJDLs",
1006-
]:
1007-
cmd = f"DELETE FROM {table} WHERE JobID in ({','.join(str(j) for j in jobIDList)})"
1008-
result = self._update(cmd)
1009-
if not result["OK"]:
1010-
failedTablesList.append(table)
1011988

1012-
if failedTablesList:
1013-
return S_ERROR(f"Errors while job removal (tables {','.join(failedTablesList)})")
989+
sqlCmd = "CREATE TEMPORARY TABLE to_delete_Jobs (JobID INT(11) UNSIGNED NOT NULL, PRIMARY KEY (JobID)) ENGINE=MEMORY;"
990+
returnValueOrRaise(self._update(sqlCmd))
991+
try:
992+
sqlCmd = "INSERT INTO to_delete_Jobs (JobID) VALUES ( %s )"
993+
returnValueOrRaise(self._updatemany(sqlCmd, [(j,) for j in jobIDList]))
994+
995+
for table in [
996+
"InputData",
997+
"JobParameters",
998+
"AtticJobParameters",
999+
"HeartBeatLoggingInfo",
1000+
"OptimizerParameters",
1001+
"JobCommands",
1002+
"Jobs",
1003+
"JobJDLs",
1004+
]:
1005+
sqlCmd = f"DELETE m from `{table}` m JOIN to_delete_Jobs t USING (JobID)"
1006+
if not self._update(sqlCmd)["OK"]:
1007+
failedTablesList.append(table)
1008+
finally:
1009+
sqlCmd = "DROP TEMPORARY TABLE to_delete_Jobs"
1010+
returnValueOrRaise(self._update(sqlCmd))
10141011

1015-
return S_OK()
1012+
if failedTablesList:
1013+
raise SErrorException(f"Errors while job removal (tables {','.join(failedTablesList)})")
10161014

10171015
#############################################################################
10181016
def rescheduleJob(self, jobID):

0 commit comments

Comments
 (0)