Skip to content

Commit 7291fc5

Browse files
committed
fix: use directly the DB instead of going through the service
1 parent a7612df commit 7291fc5

File tree

2 files changed

+30
-27
lines changed

2 files changed

+30
-27
lines changed

src/DIRAC/WorkloadManagementSystem/Utilities/jobAdministration.py

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
11
from DIRAC import S_ERROR, S_OK, gLogger
22
from DIRAC.StorageManagementSystem.DB.StorageManagementDB import StorageManagementDB
33
from DIRAC.WorkloadManagementSystem.Client import JobStatus
4-
from DIRAC.WorkloadManagementSystem.Client.JobStatus import filterJobStateTransition
54
from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB
65
from DIRAC.WorkloadManagementSystem.DB.PilotAgentsDB import PilotAgentsDB
76
from DIRAC.WorkloadManagementSystem.DB.TaskQueueDB import TaskQueueDB
8-
from DIRAC.WorkloadManagementSystem.Service.JobPolicy import RIGHT_KILL, RIGHT_DELETE
7+
from DIRAC.WorkloadManagementSystem.Service.JobPolicy import RIGHT_DELETE, RIGHT_KILL
98

109

1110
def _deleteJob(jobID, force=False):
@@ -79,15 +78,20 @@ def kill_delete_jobs(right, validJobList, nonauthJobList=[], force=False):
7978
killJobList = []
8079
deleteJobList = []
8180
if validJobList:
81+
result = JobDB().getJobsAttributes(killJobList, ["Status"])
82+
if not result["OK"]:
83+
return result
84+
jobStates = result["Value"]
85+
8286
# Get the jobs allowed to transition to the Killed state
83-
filterRes = filterJobStateTransition(validJobList, JobStatus.KILLED)
87+
filterRes = _filterJobStateTransition(jobStates, JobStatus.KILLED)
8488
if not filterRes["OK"]:
8589
return filterRes
8690
killJobList.extend(filterRes["Value"])
8791

8892
if right == RIGHT_DELETE:
8993
# Get the jobs allowed to transition to the Deleted state
90-
filterRes = filterJobStateTransition(validJobList, JobStatus.DELETED)
94+
filterRes = _filterJobStateTransition(jobStates, JobStatus.DELETED)
9195
if not filterRes["OK"]:
9296
return filterRes
9397
deleteJobList.extend(filterRes["Value"])
@@ -103,10 +107,7 @@ def kill_delete_jobs(right, validJobList, nonauthJobList=[], force=False):
103107
badIDs.append(jobID)
104108

105109
# Look for jobs that are in the Staging state to send kill signal to the stager
106-
result = JobDB().getJobsAttributes(killJobList, ["Status"])
107-
if not result["OK"]:
108-
return result
109-
stagingJobList = [jobID for jobID, sDict in result["Value"].items() if sDict["Status"] == JobStatus.STAGING]
110+
stagingJobList = [jobID for jobID, sDict in jobStates.items() if sDict["Status"] == JobStatus.STAGING]
110111

111112
if stagingJobList:
112113
stagerDB = StorageManagementDB()
@@ -127,3 +128,17 @@ def kill_delete_jobs(right, validJobList, nonauthJobList=[], force=False):
127128

128129
jobsList = killJobList if right == RIGHT_KILL else deleteJobList
129130
return S_OK(jobsList)
131+
132+
133+
def _filterJobStateTransition(jobStates, candidateState):
134+
"""Given a dictionary of jobs states,
135+
return a list of jobs that are allowed to transition to the given candidate state.
136+
"""
137+
allowedJobs = []
138+
139+
for js in jobStates.items():
140+
stateRes = JobStatus.JobsStateMachine(js[1]["Status"]).getNextState(candidateState)
141+
if stateRes["OK"]:
142+
if stateRes["Value"] == candidateState:
143+
allowedJobs.append(js[0])
144+
return S_OK(allowedJobs)

src/DIRAC/WorkloadManagementSystem/Utilities/test/Test_JobAdministration.py

Lines changed: 7 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -10,31 +10,19 @@
1010

1111

1212
@pytest.mark.parametrize(
13-
"jobIDs_list, right, filtered_jobs, expected_res, expected_value",
13+
"jobIDs_list, right",
1414
[
15-
([], "Kill", [], True, []),
16-
([], "Delete", [], True, []),
17-
(1, "Kill", [], True, []),
18-
(1, "Kill", [1], True, [1]),
19-
([1, 2], "Kill", [], True, []),
20-
([1, 2], "Kill", [1], True, [1]),
21-
(1, "Kill", [1], True, [1]),
22-
([1, 2], "Kill", [1], True, [1]),
23-
([1, 2], "Kill", [2], True, [2]),
24-
([1, 2], "Kill", [], True, []),
25-
([1, 2], "Kill", [1, 2], True, [1, 2]),
15+
([], "Kill"),
16+
([], "Delete"),
17+
(1, "Kill"),
18+
([1, 2], "Kill"),
2619
],
2720
)
28-
def test___kill_delete_jobs(mocker, jobIDs_list, right, filtered_jobs, expected_res, expected_value):
21+
def test___kill_delete_jobs(mocker, jobIDs_list, right):
2922
mocker.patch("DIRAC.WorkloadManagementSystem.Utilities.jobAdministration.JobDB", MagicMock())
3023
mocker.patch("DIRAC.WorkloadManagementSystem.Utilities.jobAdministration.TaskQueueDB", MagicMock())
3124
mocker.patch("DIRAC.WorkloadManagementSystem.Utilities.jobAdministration.PilotAgentsDB", MagicMock())
3225
mocker.patch("DIRAC.WorkloadManagementSystem.Utilities.jobAdministration.StorageManagementDB", MagicMock())
33-
mocker.patch(
34-
"DIRAC.WorkloadManagementSystem.Utilities.jobAdministration.filterJobStateTransition",
35-
return_value={"OK": True, "Value": filtered_jobs},
36-
)
3726

3827
res = kill_delete_jobs(right, jobIDs_list)
39-
assert res["OK"] == expected_res
40-
assert res["Value"] == expected_value
28+
assert res["OK"]

0 commit comments

Comments
 (0)