Skip to content

Commit 6f18538

Browse files
authored
Merge pull request DIRACGrid#7906 from fstagni/cherry-pick-2-da6ec1987-integration
[sweep:integration] the TransformationCleaningAgent forces jobs to be killed
2 parents 4b5f728 + 76d9d45 commit 6f18538

File tree

4 files changed

+28
-22
lines changed

4 files changed

+28
-22
lines changed

src/DIRAC/TransformationSystem/Agent/TransformationCleaningAgent.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,7 @@
1212
import errno
1313
import os
1414
import re
15-
import time
1615
from datetime import datetime, timedelta
17-
from hashlib import md5
1816

1917
# # from DIRAC
2018
from DIRAC import S_ERROR, S_OK
@@ -613,7 +611,7 @@ def __removeWMSTasks(self, transJobIDs):
613611
jobIDs = [int(j) for j in transJobIDs if int(j)]
614612
allRemove = True
615613
for jobList in breakListIntoChunks(jobIDs, 500):
616-
res = self.wmsClient.killJob(jobList)
614+
res = self.wmsClient.killJob(jobList, force=True)
617615
if res["OK"]:
618616
self.log.info(f"Successfully killed {len(jobList)} jobs from WMS")
619617
elif ("InvalidJobIDs" in res) and ("NonauthorizedJobIDs" not in res) and ("FailedJobIDs" not in res):
@@ -679,6 +677,11 @@ def __submitRemovalRequests(self, lfns, transID=0):
679677
:param int transID: transformationID, only used in RequestName
680678
:returns: S_ERROR/S_OK
681679
"""
680+
681+
# These imports are used only in this function
682+
import time
683+
from hashlib import md5
684+
682685
for index, lfnList in enumerate(breakListIntoChunks(lfns, 300)):
683686
oRequest = Request()
684687
requestName = "TCA_{transID}_{index}_{md5(repr(time.time()).encode()).hexdigest()[:5]}"

src/DIRAC/WorkloadManagementSystem/Client/WMSClient.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -212,17 +212,17 @@ def submitJob(self, jdl, jobDescriptionObject=None):
212212

213213
return result
214214

215-
def killJob(self, jobID):
215+
def killJob(self, jobID, force=False):
216216
"""Kill running job.
217217
jobID can be an integer representing a single DIRAC job ID or a list of IDs
218218
"""
219-
return self.jobManager.killJob(jobID)
219+
return self.jobManager.killJob(jobID, force=force)
220220

221-
def deleteJob(self, jobID):
221+
def deleteJob(self, jobID, force=False):
222222
"""Delete job(s) (set their status to DELETED) from the WMS Job database.
223223
jobID can be an integer representing a single DIRAC job ID or a list of IDs
224224
"""
225-
return self.jobManager.deleteJob(jobID)
225+
return self.jobManager.deleteJob(jobID, force=force)
226226

227227
def removeJob(self, jobID):
228228
"""Fully remove job(s) from the WMS Job database.

src/DIRAC/WorkloadManagementSystem/DB/JobDB.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -601,7 +601,7 @@ def setJobsMajorStatus(self, jIDList, candidateStatus, force=False):
601601

602602
return self._update(cmd)
603603

604-
def setJobStatus(self, jobID, status="", minorStatus="", applicationStatus=""):
604+
def setJobStatus(self, jobID, status="", minorStatus="", applicationStatus="", force=False):
605605
"""Set status of the job specified by its jobID"""
606606
# Do not update the LastUpdate time stamp if setting the Stalled status
607607
update_flag = True
@@ -620,7 +620,7 @@ def setJobStatus(self, jobID, status="", minorStatus="", applicationStatus=""):
620620
attrNames.append("ApplicationStatus")
621621
attrValues.append(applicationStatus[:255])
622622

623-
result = self.setJobAttributes(jobID, attrNames, attrValues, update=update_flag)
623+
result = self.setJobAttributes(jobID, attrNames, attrValues, update=update_flag, force=force)
624624
if not result["OK"]:
625625
return result
626626

src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@
2222
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
2323
from DIRAC.FrameworkSystem.Client.ProxyManagerClient import gProxyManager
2424
from DIRAC.StorageManagementSystem.Client.StorageManagerClient import StorageManagerClient
25-
from DIRAC.WorkloadManagementSystem.Client.JobStatus import filterJobStateTransition
2625
from DIRAC.WorkloadManagementSystem.Client import JobStatus
26+
from DIRAC.WorkloadManagementSystem.Client.JobStatus import filterJobStateTransition
2727
from DIRAC.WorkloadManagementSystem.Service.JobPolicy import (
2828
RIGHT_DELETE,
2929
RIGHT_KILL,
@@ -435,13 +435,14 @@ def export_removeJob(self, jobIDs):
435435

436436
return S_OK(validJobList)
437437

438-
def __deleteJob(self, jobID):
439-
"""Set the job status to "Deleted" and remove the pilot that ran.
438+
def __deleteJob(self, jobID, force=False):
439+
"""Set the job status to "Deleted"
440+
and remove the pilot that ran and its logging info if the pilot is finished.
440441
441442
:param int jobID: job ID
442443
:return: S_OK()/S_ERROR()
443444
"""
444-
if not (result := self.jobDB.setJobStatus(jobID, JobStatus.DELETED, "Checking accounting"))["OK"]:
445+
if not (result := self.jobDB.setJobStatus(jobID, JobStatus.DELETED, "Checking accounting", force=force))["OK"]:
445446
return result
446447

447448
if not (result := self.taskQueueDB.deleteJob(jobID))["OK"]:
@@ -469,7 +470,7 @@ def __deleteJob(self, jobID):
469470

470471
return S_OK()
471472

472-
def __killJob(self, jobID, sendKillCommand=True):
473+
def __killJob(self, jobID, sendKillCommand=True, force=False):
473474
"""Kill one job
474475
475476
:param int jobID: job ID
@@ -482,14 +483,16 @@ def __killJob(self, jobID, sendKillCommand=True):
482483
return result
483484

484485
self.log.info("Job marked for termination", jobID)
485-
if not (result := self.jobDB.setJobStatus(jobID, JobStatus.KILLED, "Marked for termination"))["OK"]:
486+
if not (result := self.jobDB.setJobStatus(jobID, JobStatus.KILLED, "Marked for termination", force=force))[
487+
"OK"
488+
]:
486489
self.log.warn("Failed to set job Killed status", result["Message"])
487490
if not (result := self.taskQueueDB.deleteJob(jobID))["OK"]:
488491
self.log.warn("Failed to delete job from the TaskQueue", result["Message"])
489492

490493
return S_OK()
491494

492-
def _kill_delete_jobs(self, jobIDList, right):
495+
def _kill_delete_jobs(self, jobIDList, right, force=False):
493496
"""Kill (== set the status to "KILLED") or delete (== set the status to "DELETED") jobs as necessary
494497
495498
:param list jobIDList: job IDs
@@ -529,12 +532,12 @@ def _kill_delete_jobs(self, jobIDList, right):
529532
stagingJobList = [jobID for jobID, sDict in result["Value"].items() if sDict["Status"] == JobStatus.STAGING]
530533

531534
for jobID in killJobList:
532-
result = self.__killJob(jobID)
535+
result = self.__killJob(jobID, force=force)
533536
if not result["OK"]:
534537
badIDs.append(jobID)
535538

536539
for jobID in deleteJobList:
537-
result = self.__deleteJob(jobID)
540+
result = self.__deleteJob(jobID, force=force)
538541
if not result["OK"]:
539542
badIDs.append(jobID)
540543

@@ -567,28 +570,28 @@ def _kill_delete_jobs(self, jobIDList, right):
567570
###########################################################################
568571
types_deleteJob = []
569572

570-
def export_deleteJob(self, jobIDs):
573+
def export_deleteJob(self, jobIDs, force=False):
571574
"""Delete jobs specified in the jobIDs list
572575
573576
:param list jobIDs: list of job IDs
574577
575578
:return: S_OK/S_ERROR
576579
"""
577580

578-
return self._kill_delete_jobs(jobIDs, RIGHT_DELETE)
581+
return self._kill_delete_jobs(jobIDs, RIGHT_DELETE, force=force)
579582

580583
###########################################################################
581584
types_killJob = []
582585

583-
def export_killJob(self, jobIDs):
586+
def export_killJob(self, jobIDs, force=False):
584587
"""Kill jobs specified in the jobIDs list
585588
586589
:param list jobIDs: list of job IDs
587590
588591
:return: S_OK/S_ERROR
589592
"""
590593

591-
return self._kill_delete_jobs(jobIDs, RIGHT_KILL)
594+
return self._kill_delete_jobs(jobIDs, RIGHT_KILL, force=force)
592595

593596
###########################################################################
594597
types_resetJob = []

0 commit comments

Comments
 (0)