Skip to content

Commit 2fad6e0

Browse files
committed
fix: Improve performance of job delete/kill/reschedule API
1 parent 71c456a commit 2fad6e0

File tree

2 files changed

+52
-20
lines changed

2 files changed

+52
-20
lines changed

src/DIRAC/Interfaces/API/Dirac.py

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1652,14 +1652,15 @@ def deleteJob(self, jobID):
16521652
return ret
16531653
jobIDs = ret["Value"]
16541654

1655-
jobIDsToDelete = []
1656-
for jobID in jobIDs:
1657-
can_kill = JobStatus.checkJobStateTransition(jobID, JobStatus.KILLED)["OK"]
1658-
can_del = JobStatus.checkJobStateTransition(jobID, JobStatus.DELETED)["OK"]
1659-
if can_kill or can_del:
1660-
jobIDsToDelete.append(jobID)
1661-
1662-
result = WMSClient(useCertificates=self.useCertificates).deleteJob(jobIDsToDelete)
1655+
# Remove any job IDs that can't change to the Killed or Deleted states
1656+
filteredJobs = set()
1657+
for filterState in (JobStatus.KILLED, JobStatus.DELETED):
1658+
filterRes = JobStatus.filterJobStateTransition(jobIDs, filterState)
1659+
if not filterRes["OK"]:
1660+
return filterRes
1661+
filteredJobs.update(filterRes["Value"])
1662+
1663+
result = WMSClient(useCertificates=self.useCertificates).deleteJob(list(filteredJobs))
16631664
if result["OK"]:
16641665
if self.jobRepo:
16651666
for jID in result["Value"]:
@@ -1689,11 +1690,11 @@ def rescheduleJob(self, jobID):
16891690
return ret
16901691
jobIDs = ret["Value"]
16911692

1692-
jobIDsToReschedule = []
1693-
for jobID in jobIDs:
1694-
res = JobStatus.checkJobStateTransition(jobID, JobStatus.RESCHEDULED)
1695-
if res["OK"]:
1696-
jobIDsToReschedule.append(jobID)
1693+
# Remove any job IDs that can't change to the rescheduled state
1694+
filterRes = JobStatus.filterJobStateTransition(jobIDs, JobStatus.RESCHEDULED)
1695+
if not filterRes["OK"]:
1696+
return filterRes
1697+
jobIDsToReschedule = filterRes["Value"]
16971698

16981699
result = WMSClient(useCertificates=self.useCertificates).rescheduleJob(jobIDsToReschedule)
16991700
if result["OK"]:
@@ -1724,14 +1725,15 @@ def killJob(self, jobID):
17241725
return ret
17251726
jobIDs = ret["Value"]
17261727

1727-
jobIDsToKill = []
1728-
for jobID in jobIDs:
1729-
can_kill = JobStatus.checkJobStateTransition(jobID, JobStatus.KILLED)["OK"]
1730-
can_del = JobStatus.checkJobStateTransition(jobID, JobStatus.DELETED)["OK"]
1731-
if can_kill or can_del:
1732-
jobIDsToKill.append(jobID)
1728+
# Remove any job IDs that can't change to the Killed or Deleted states
1729+
filteredJobs = set()
1730+
for filterState in (JobStatus.KILLED, JobStatus.DELETED):
1731+
filterRes = JobStatus.filterJobStateTransition(jobIDs, filterState)
1732+
if not filterRes["OK"]:
1733+
return filterRes
1734+
filteredJobs.update(filterRes["Value"])
17331735

1734-
result = WMSClient(useCertificates=self.useCertificates).killJob(jobIDsToKill)
1736+
result = WMSClient(useCertificates=self.useCertificates).killJob(list(filteredJobs))
17351737
if result["OK"]:
17361738
if self.jobRepo:
17371739
for jID in result["Value"]:

src/DIRAC/WorkloadManagementSystem/Client/JobStatus.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
from DIRAC import gLogger, S_OK, S_ERROR
66
from DIRAC.Core.Utilities.StateMachine import State, StateMachine
7+
from DIRAC.Core.Utilities.Decorators import deprecated
78

89

910
#:
@@ -97,6 +98,7 @@ def __init__(self, state):
9798
}
9899

99100

101+
@deprecated("Use filterJobStateTransition instead")
100102
def checkJobStateTransition(jobID, candidateState, currentStatus=None, jobMonitoringClient=None):
101103
"""Utility to check if a job state transition is allowed"""
102104
if not currentStatus:
@@ -125,3 +127,31 @@ def checkJobStateTransition(jobID, candidateState, currentStatus=None, jobMonito
125127
)
126128
return S_ERROR("Job state transition not allowed")
127129
return S_OK()
130+
131+
132+
def filterJobStateTransition(jobIDs, candidateState, jobMonitoringClient=None):
    """Return the subset of jobIDs that are allowed to transition to candidateState.

    The current status of every job is fetched with a single ``getJobsStatus``
    call, instead of issuing one status request per job.

    :param jobIDs: a single job ID, or a list/tuple/set of job IDs
    :param candidateState: the state the jobs are meant to move to
    :param jobMonitoringClient: optional client used to query the job statuses;
        a JobMonitoringClient is created when none is given
    :return: S_OK(list of the job IDs allowed to make the transition, in the
        order they were iterated), or the failed result of the status query
    """
    # Accept any common container of IDs; anything else is treated as one ID.
    # Checking for ``list`` only would wrap a tuple or set whole into a
    # one-element list and send that container to the service as a job ID.
    if isinstance(jobIDs, (list, tuple, set, frozenset)):
        jobIDs = list(jobIDs)
    else:
        jobIDs = [jobIDs]

    # Nothing to filter: skip creating a client and the remote status query.
    if not jobIDs:
        return S_OK([])

    if not jobMonitoringClient:
        # Imported here rather than at module level, as in the original code.
        from DIRAC.WorkloadManagementSystem.Client.JobMonitoringClient import JobMonitoringClient

        jobMonitoringClient = JobMonitoringClient()

    res = jobMonitoringClient.getJobsStatus(jobIDs)
    if not res["OK"]:
        return res
    jobStatuses = res["Value"]

    allowedJobs = []
    for jobID in jobIDs:
        # Jobs missing from the status reply are silently left out.
        if jobID not in jobStatuses:
            continue
        curState = jobStatuses[jobID]["Status"]
        stateRes = JobsStateMachine(curState).getNextState(candidateState)
        # Keep the job only when the state machine confirms that the next
        # state is exactly the requested one.
        if stateRes["OK"] and stateRes["Value"] == candidateState:
            allowedJobs.append(jobID)
    return S_OK(allowedJobs)

0 commit comments

Comments
 (0)