Skip to content

Commit 54e3f3f

Browse files
authored
Merge pull request #5598 from fstagni/v7r2-fixes71
[v7r2] Fix bug in TransformationCleaningAgent, that was NOT removing jobs but only deleting them
2 parents 7a39a0d + 9bc116d commit 54e3f3f

File tree

4 files changed

+46
-39
lines changed

4 files changed

+46
-39
lines changed

src/DIRAC/Interfaces/API/Dirac.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1651,8 +1651,9 @@ def getOutputSandbox(self, jobID, outputDir=None, oversized=True, noJobDir=False
16511651
#############################################################################
16521652

16531653
def deleteJob(self, jobID):
1654-
"""Delete job or list of jobs from the WMS, if running these jobs will
1655-
also be killed.
1654+
"""
1655+
Delete (i.e. set status=DELETED) a job or a list of jobs from the WMS.
1656+
If running, these jobs will first be killed.
16561657
16571658
Example Usage:
16581659

src/DIRAC/TransformationSystem/Agent/TransformationCleaningAgent.py

Lines changed: 35 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -283,26 +283,30 @@ def _executeClean(self, transDict):
283283
res = self.archiveTransformation(transDict["TransformationID"])
284284
if not res["OK"]:
285285
self.log.error(
286-
"Problems archiving transformation %s: %s" % (transDict["TransformationID"], res["Message"])
286+
"Problems archiving transformation", "%s: %s" % (transDict["TransformationID"], res["Message"])
287287
)
288288
else:
289289
res = self.cleanTransformation(transDict["TransformationID"])
290290
if not res["OK"]:
291291
self.log.error(
292-
"Problems cleaning transformation %s: %s" % (transDict["TransformationID"], res["Message"])
292+
"Problems cleaning transformation", "%s: %s" % (transDict["TransformationID"], res["Message"])
293293
)
294294

295295
def _executeRemoval(self, transDict):
296296
"""Remove files from given transformation."""
297297
res = self.removeTransformationOutput(transDict["TransformationID"])
298298
if not res["OK"]:
299-
self.log.error("Problems removing transformation %s: %s" % (transDict["TransformationID"], res["Message"]))
299+
self.log.error(
300+
"Problems removing transformation", "%s: %s" % (transDict["TransformationID"], res["Message"])
301+
)
300302

301303
def _executeArchive(self, transDict):
302304
"""Archive the given transformation."""
303305
res = self.archiveTransformation(transDict["TransformationID"])
304306
if not res["OK"]:
305-
self.log.error("Problems archiving transformation %s: %s" % (transDict["TransformationID"], res["Message"]))
307+
self.log.error(
308+
"Problems archiving transformation", "%s: %s" % (transDict["TransformationID"], res["Message"])
309+
)
306310

307311
return S_OK()
308312

@@ -385,7 +389,7 @@ def cleanContent(self, directory):
385389
if not filesFound:
386390
self.log.info("No files are registered in the catalog directory %s" % directory)
387391
return S_OK()
388-
self.log.info("Attempting to remove %d possible remnants from the catalog and storage" % len(filesFound))
392+
self.log.info("Attempting to remove possible remnants from the catalog and storage", "(n=%d)" % len(filesFound))
389393

390394
# Executing with shifter proxy
391395
gConfigurationData.setOptionInCFG("/DIRAC/Security/UseServerCertificate", "false")
@@ -427,12 +431,12 @@ def __getCatalogDirectoryContents(self, directories):
427431
if "No such file or directory" in res["Message"]:
428432
self.log.info("%s: %s" % (currentDir, res["Message"]))
429433
else:
430-
self.log.error("Failed to get directory %s content: %s" % (currentDir, res["Message"]))
434+
self.log.error("Failed to get directory %s content" % currentDir, res["Message"])
431435
else:
432436
dirContents = res["Value"]
433437
activeDirs.extend(dirContents["SubDirs"])
434438
allFiles.update(dirContents["Files"])
435-
self.log.info("Found %d files" % len(allFiles))
439+
self.log.info("", "Found %d files" % len(allFiles))
436440
return S_OK(list(allFiles))
437441

438442
def cleanTransformationLogFiles(self, directory):
@@ -441,7 +445,7 @@ def cleanTransformationLogFiles(self, directory):
441445
:param self: self reference
442446
:param str directory: folder name
443447
"""
444-
self.log.verbose("Removing log files found in the directory %s" % directory)
448+
self.log.verbose("Removing log files found in the directory", directory)
445449
res = returnSingleResult(StorageElement(self.logSE).removeDirectory(directory, recursive=True))
446450
if not res["OK"]:
447451
if cmpError(res, errno.ENOENT): # No such file or directory
@@ -462,7 +466,7 @@ def removeTransformationOutput(self, transID):
462466
self.log.info("Removing output data for transformation %s" % transID)
463467
res = self.getTransformationDirectories(transID)
464468
if not res["OK"]:
465-
self.log.error('Problem obtaining directories for transformation %s with result "%s"' % (transID, res))
469+
self.log.error("Problem obtaining directories for transformation", "%s with result '%s'" % (transID, res))
466470
return S_OK()
467471
directories = res["Value"]
468472
for directory in directories:
@@ -480,7 +484,7 @@ def removeTransformationOutput(self, transID):
480484
res = self.cleanMetadataCatalogFiles(transID)
481485
if not res["OK"]:
482486
return res
483-
self.log.info("Successfully removed output of transformation %d" % transID)
487+
self.log.info("Successfully removed output of transformation", transID)
484488
# Change the status of the transformation to RemovedFiles
485489
res = self.transClient.setTransformationParameter(transID, "Status", "RemovedFiles")
486490
if not res["OK"]:
@@ -517,10 +521,12 @@ def cleanTransformation(self, transID):
517521
"""This removes what was produced by the supplied transformation,
518522
leaving only some info and log in the transformation DB.
519523
"""
520-
self.log.info("Cleaning transformation %s" % transID)
524+
self.log.info("Cleaning transformation", transID)
521525
res = self.getTransformationDirectories(transID)
522526
if not res["OK"]:
523-
self.log.error('Problem obtaining directories for transformation %s with result "%s"' % (transID, res))
527+
self.log.error(
528+
"Problem obtaining directories for transformation", "%s with result '%s'" % (transID, res["Message"])
529+
)
524530
return S_OK()
525531
directories = res["Value"]
526532
# Clean the jobs in the WMS and any failover requests found
@@ -545,12 +551,12 @@ def cleanTransformation(self, transID):
545551
res = self.transClient.cleanTransformation(transID)
546552
if not res["OK"]:
547553
return res
548-
self.log.info("Successfully cleaned transformation %d" % transID)
554+
self.log.info("Successfully cleaned transformation", transID)
549555
res = self.transClient.setTransformationParameter(transID, "Status", "Cleaned")
550556
if not res["OK"]:
551557
self.log.error("Failed to update status of transformation %s to Cleaned" % (transID), res["Message"])
552558
return res
553-
self.log.info("Updated status of transformation %s to Cleaned" % (transID))
559+
self.log.info("Updated status of transformation", "%s to Cleaned" % (transID))
554560
return S_OK()
555561

556562
def cleanMetadataCatalogFiles(self, transID):
@@ -560,7 +566,7 @@ def cleanMetadataCatalogFiles(self, transID):
560566
return res
561567
fileToRemove = res["Value"]
562568
if not fileToRemove:
563-
self.log.info("No files found for transID %s" % transID)
569+
self.log.info("No files found for transID", transID)
564570
return S_OK()
565571

566572
# Executing with shifter proxy
@@ -574,7 +580,7 @@ def cleanMetadataCatalogFiles(self, transID):
574580
self.log.error("Failed to remove file found in metadata catalog", "%s %s" % (lfn, reason))
575581
if res["Value"]["Failed"]:
576582
return S_ERROR("Failed to remove all files found in the metadata catalog")
577-
self.log.info("Successfully removed all files found in the BK")
583+
self.log.info("Successfully removed all files found in the DFC")
578584
return S_OK()
579585

580586
#############################################################################
@@ -584,7 +590,7 @@ def cleanMetadataCatalogFiles(self, transID):
584590

585591
def cleanTransformationTasks(self, transID):
586592
"""clean tasks from WMS, or from the RMS if it is a DataManipulation transformation"""
587-
self.log.verbose("Cleaning Transformation tasks of transformation %d" % transID)
593+
self.log.verbose("Cleaning Transformation tasks of transformation", transID)
588594
res = self.__getTransformationExternalIDs(transID)
589595
if not res["OK"]:
590596
return res
@@ -640,24 +646,26 @@ def __removeWMSTasks(self, transJobIDs):
640646
if res["OK"]:
641647
self.log.info("Successfully killed %d jobs from WMS" % len(jobList))
642648
elif ("InvalidJobIDs" in res) and ("NonauthorizedJobIDs" not in res) and ("FailedJobIDs" not in res):
643-
self.log.info("Found %s jobs which did not exist in the WMS" % len(res["InvalidJobIDs"]))
649+
self.log.info("Found jobs which did not exist in the WMS", "(n=%d)" % len(res["InvalidJobIDs"]))
644650
elif "NonauthorizedJobIDs" in res:
645-
self.log.error("Failed to kill %s jobs because not authorized" % len(res["NonauthorizedJobIDs"]))
651+
self.log.error("Failed to kill jobs because not authorized", "(n=%d)" % len(res["NonauthorizedJobIDs"]))
646652
allRemove = False
647653
elif "FailedJobIDs" in res:
648-
self.log.error("Failed to kill %s jobs" % len(res["FailedJobIDs"]))
654+
self.log.error("Failed to kill jobs", "(n=%d)" % len(res["FailedJobIDs"]))
649655
allRemove = False
650656

651-
res = self.wmsClient.deleteJob(jobList)
657+
res = self.wmsClient.removeJob(jobList)
652658
if res["OK"]:
653-
self.log.info("Successfully removed %d jobs from WMS" % len(jobList))
659+
self.log.info("Successfully removed jobs from WMS", "(n=%d)" % len(jobList))
654660
elif ("InvalidJobIDs" in res) and ("NonauthorizedJobIDs" not in res) and ("FailedJobIDs" not in res):
655-
self.log.info("Found %s jobs which did not exist in the WMS" % len(res["InvalidJobIDs"]))
661+
self.log.info("Found jobs which did not exist in the WMS", "(n=%d)" % len(res["InvalidJobIDs"]))
656662
elif "NonauthorizedJobIDs" in res:
657-
self.log.error("Failed to remove %s jobs because not authorized" % len(res["NonauthorizedJobIDs"]))
663+
self.log.error(
664+
"Failed to remove jobs because not authorized", "(n=%d)" % len(res["NonauthorizedJobIDs"])
665+
)
658666
allRemove = False
659667
elif "FailedJobIDs" in res:
660-
self.log.error("Failed to remove %s jobs" % len(res["FailedJobIDs"]))
668+
self.log.error("Failed to remove jobs", "(n=%d)" % len(res["FailedJobIDs"]))
661669
allRemove = False
662670

663671
if not allRemove:
@@ -689,8 +697,8 @@ def __removeWMSTasks(self, transJobIDs):
689697
self.log.verbose("Removed request %s associated to job %d." % (requestID, jobID))
690698

691699
if failed:
692-
self.log.info("Successfully removed %s requests" % (len(failoverRequests) - failed))
693-
self.log.info("Failed to remove %s requests" % failed)
700+
self.log.info("Successfully removed requests", "(n=%d)" % (len(failoverRequests) - failed))
701+
self.log.info("Failed to remove requests", "(n=%d)" % failed)
694702
return S_ERROR("Failed to remove all the request from RequestDB")
695703
self.log.info("Successfully removed all the associated failover requests")
696704
return S_OK()

src/DIRAC/WorkloadManagementSystem/Agent/JobCleaningAgent.py

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -103,8 +103,6 @@ def _getAllowedJobTypes(self):
103103
def execute(self):
104104
"""Remove or delete jobs in various status"""
105105

106-
# TODO: check the WMS SM before calling the functions below (v7r3)
107-
108106
# First, fully remove jobs in JobStatus.DELETED state
109107
result = self.removeDeletedJobs()
110108
if not result["OK"]:
@@ -143,14 +141,13 @@ def execute(self):
143141

144142
return S_OK()
145143

146-
def removeDeletedJobs(self, delay=False):
144+
def removeDeletedJobs(self):
147145
"""Fully remove jobs that are already in status "DELETED", unless there are still requests.
148146
149-
:param int delay: days of delay
150147
:returns: S_OK/S_ERROR
151148
"""
152149

153-
res = self._getJobsList({"Status": JobStatus.DELETED}, delay)
150+
res = self._getJobsList({"Status": JobStatus.DELETED})
154151
if not res["OK"]:
155152
return res
156153
jobList = res["Value"]
@@ -270,7 +267,7 @@ def _getJobsList(self, condDict, delay=None):
270267
:returns: S_OK with jobsList
271268
"""
272269
jobIDsS = set()
273-
delayStr = "and older than %s day(s)" % delay if delay else ""
270+
delayStr = "and older than %s" % delay if delay else ""
274271
self.log.info("Get jobs with %s %s" % (str(condDict), delayStr))
275272
for order in ["JobID:ASC", "JobID:DESC"]:
276273
result = self.jobDB.selectJobs(condDict, older=delay, orderAttribute=order, limit=self.maxJobsAtOnce)
@@ -282,8 +279,9 @@ def _getJobsList(self, condDict, delay=None):
282279

283280
def _getOwnerJobsDict(self, jobList):
284281
"""
285-
gets in input a list of int(JobID) and return a dict with a grouping of them by owner, e.g.
286-
{'dn;group': [1, 3, 4], 'dn;group_1': [5], 'dn_1;group': [2]}
282+
:param list jobList: list of int(JobID)
283+
284+
:returns: a dict grouping the job IDs by owner, e.g. {'dn;group': [1, 3, 4], 'dn;group_1': [5], 'dn_1;group': [2]}
287285
"""
288286
res = self.jobDB.getJobsAttributes(jobList, ["OwnerDN", "OwnerGroup"])
289287
if not res["OK"]:

src/DIRAC/WorkloadManagementSystem/Client/WMSClient.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -205,8 +205,8 @@ def submitJob(self, jdl, jobDescriptionObject=None):
205205
break
206206
time.sleep(1)
207207
if not confirmed:
208-
# The bulk submission failed, try to delete the created jobs
209-
resultDelete = self.jobManager.deleteJob(jobIDList)
208+
# The bulk submission failed, try to remove the created jobs
209+
resultDelete = self.jobManager.removeJob(jobIDList)
210210
error = "Job submission failed to confirm bulk transaction"
211211
if not resultDelete["OK"]:
212212
error += "; removal of created jobs failed"

0 commit comments

Comments
 (0)