Skip to content

Commit 047d02c

Browse files
committed
feat: use directly the utilities instead of going through the service
1 parent 73673ee commit 047d02c

File tree

7 files changed

+40
-64
lines changed

7 files changed

+40
-64
lines changed

src/DIRAC/TransformationSystem/Agent/TransformationCleaningAgent.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,12 @@
3333
from DIRAC.TransformationSystem.Client import TransformationStatus
3434
from DIRAC.TransformationSystem.Client.TransformationClient import TransformationClient
3535
from DIRAC.WorkloadManagementSystem.Client.JobMonitoringClient import JobMonitoringClient
36-
from DIRAC.WorkloadManagementSystem.Client.WMSClient import WMSClient
3736
from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB
37+
from DIRAC.WorkloadManagementSystem.Service.JobPolicy import (
38+
RIGHT_DELETE,
39+
RIGHT_KILL,
40+
)
41+
from DIRAC.WorkloadManagementSystem.Utilities.jobAdministration import kill_delete_jobs
3842

3943
# # agent's name
4044
AGENT_NAME = "Transformation/TransformationCleaningAgent"
@@ -58,8 +62,6 @@ def __init__(self, *args, **kwargs):
5862

5963
# # transformation client
6064
self.transClient = None
61-
# # wms client
62-
self.wmsClient = None
6365
# # request client
6466
self.reqClient = None
6567
# # file catalog client
@@ -120,8 +122,6 @@ def initialize(self):
120122

121123
# # transformation client
122124
self.transClient = TransformationClient()
123-
# # wms client
124-
self.wmsClient = WMSClient()
125125
# # request client
126126
self.reqClient = ReqClient()
127127
# # file catalog client
@@ -613,8 +613,8 @@ def __removeWMSTasks(self, transJobIDs):
613613
# Prevent 0 job IDs
614614
jobIDs = [int(j) for j in transJobIDs if int(j)]
615615
allRemove = True
616-
for jobList in breakListIntoChunks(jobIDs, 500):
617-
res = self.wmsClient.killJob(jobList, force=True)
616+
for jobList in breakListIntoChunks(jobIDs, 1000):
617+
res = kill_delete_jobs(RIGHT_KILL, jobList, force=True)
618618
if res["OK"]:
619619
self.log.info(f"Successfully killed {len(jobList)} jobs from WMS")
620620
elif ("InvalidJobIDs" in res) and ("NonauthorizedJobIDs" not in res) and ("FailedJobIDs" not in res):
@@ -626,7 +626,7 @@ def __removeWMSTasks(self, transJobIDs):
626626
self.log.error("Failed to kill jobs", f"(n={len(res['FailedJobIDs'])})")
627627
allRemove = False
628628

629-
res = self.wmsClient.deleteJob(jobList)
629+
res = kill_delete_jobs(RIGHT_DELETE, jobList, force=True)
630630
if res["OK"]:
631631
self.log.info("Successfully deleted jobs from WMS", f"(n={len(jobList)})")
632632
elif ("InvalidJobIDs" in res) and ("NonauthorizedJobIDs" not in res) and ("FailedJobIDs" not in res):

src/DIRAC/WorkloadManagementSystem/Agent/JobCleaningAgent.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,12 @@
3535
from DIRAC.RequestManagementSystem.Client.ReqClient import ReqClient
3636
from DIRAC.RequestManagementSystem.Client.Request import Request
3737
from DIRAC.WorkloadManagementSystem.Client import JobStatus
38-
from DIRAC.WorkloadManagementSystem.Client.JobMonitoringClient import JobMonitoringClient
3938
from DIRAC.WorkloadManagementSystem.Client.WMSClient import WMSClient
4039
from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB
4140
from DIRAC.WorkloadManagementSystem.DB.SandboxMetadataDB import SandboxMetadataDB
41+
from DIRAC.WorkloadManagementSystem.Service.JobPolicy import RIGHT_DELETE
42+
from DIRAC.WorkloadManagementSystem.Utilities.jobAdministration import kill_delete_jobs
43+
from DIRAC.WorkloadManagementSystem.Utilities.JobParameters import getJobParameters
4244

4345

4446
class JobCleaningAgent(AgentModule):
@@ -230,11 +232,11 @@ def _deleteRemoveJobs(self, jobList, remove=False):
230232
if not res["OK"]:
231233
self.log.error("No DN found", f"for {user}")
232234
return res
233-
wmsClient = WMSClient(useCertificates=True, delegatedDN=res["Value"][0], delegatedGroup=ownerGroup)
234235
if remove:
236+
wmsClient = WMSClient(useCertificates=True, delegatedDN=res["Value"][0], delegatedGroup=ownerGroup)
235237
result = wmsClient.removeJob(jobsList)
236238
else:
237-
result = wmsClient.deleteJob(jobsList)
239+
result = kill_delete_jobs(RIGHT_DELETE, jobsList)
238240
if not result["OK"]:
239241
self.log.error(
240242
f"Could not {'remove' if remove else 'delete'} jobs",
@@ -294,7 +296,8 @@ def deleteJobOversizedSandbox(self, jobIDList):
294296
failed = {}
295297
successful = {}
296298

297-
result = JobMonitoringClient().getJobParameters(jobIDList, ["OutputSandboxLFN"])
299+
jobIDs = [int(jobID) for jobID in jobIDList]
300+
result = getJobParameters(jobIDs, "OutputSandboxLFN")
298301
if not result["OK"]:
299302
return result
300303
osLFNDict = result["Value"]

src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py

Lines changed: 3 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,16 @@
1414
from DIRAC import S_ERROR, S_OK, gConfig
1515
from DIRAC.AccountingSystem.Client.Types.Job import Job
1616
from DIRAC.ConfigurationSystem.Client.Helpers import cfgPath
17-
from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getDNForUsername
1817
from DIRAC.Core.Base.AgentModule import AgentModule
1918
from DIRAC.Core.Utilities import DErrno
2019
from DIRAC.Core.Utilities.ClassAd.ClassAdLight import ClassAd
2120
from DIRAC.Core.Utilities.TimeUtilities import fromString, second, toEpoch
2221
from DIRAC.WorkloadManagementSystem.Client import JobMinorStatus, JobStatus
23-
from DIRAC.WorkloadManagementSystem.Client.WMSClient import WMSClient
2422
from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB
2523
from DIRAC.WorkloadManagementSystem.DB.JobLoggingDB import JobLoggingDB
2624
from DIRAC.WorkloadManagementSystem.DB.PilotAgentsDB import PilotAgentsDB
25+
from DIRAC.WorkloadManagementSystem.Service.JobPolicy import RIGHT_KILL
26+
from DIRAC.WorkloadManagementSystem.Utilities.jobAdministration import kill_delete_jobs
2727
from DIRAC.WorkloadManagementSystem.Utilities.JobParameters import getJobParameters
2828
from DIRAC.WorkloadManagementSystem.Utilities.Utils import rescheduleJobs
2929

@@ -235,7 +235,7 @@ def _failStalledJobs(self, jobID):
235235
# Set the jobs Failed, send them a kill signal in case they are not really dead
236236
# and send accounting info
237237
if setFailed:
238-
res = self._sendKillCommand(jobID)
238+
res = kill_delete_jobs(RIGHT_KILL, [jobID], nonauthJobList=[], force=True)
239239
if not res["OK"]:
240240
self.log.error("Failed to kill job", jobID)
241241

@@ -574,26 +574,3 @@ def _failSubmittingJobs(self):
574574
continue
575575

576576
return S_OK()
577-
578-
def _sendKillCommand(self, job):
579-
"""Send a kill signal to the job such that it cannot continue running.
580-
581-
:param int job: ID of job to send kill command
582-
"""
583-
584-
res = self.jobDB.getJobAttribute(job, "Owner")
585-
if not res["OK"]:
586-
return res
587-
owner = res["Value"]
588-
589-
res = self.jobDB.getJobAttribute(job, "OwnerGroup")
590-
if not res["OK"]:
591-
return res
592-
ownerGroup = res["Value"]
593-
594-
wmsClient = WMSClient(
595-
useCertificates=True,
596-
delegatedDN=getDNForUsername(owner)["Value"][0] if owner else None,
597-
delegatedGroup=ownerGroup,
598-
)
599-
return wmsClient.killJob(job)

src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_JobCleaningAgent.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
""" Test class for Job Cleaning Agent
22
"""
3-
import pytest
43
from unittest.mock import MagicMock
54

5+
import pytest
6+
67
# DIRAC Components
7-
from DIRAC import gLogger, S_OK
8+
from DIRAC import S_OK, gLogger
89
from DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent import JobCleaningAgent
910

1011
gLogger.setLevel("DEBUG")
@@ -97,7 +98,7 @@ def test_deleteJobsByStatus(jca, conditions, mockReplyInput, expected):
9798
"inputs, params, expected",
9899
[
99100
([], {"OK": True, "Value": {}}, {"OK": True, "Value": {"Failed": {}, "Successful": {}}}),
100-
(["a", "b"], {"OK": True, "Value": {}}, {"OK": True, "Value": {"Failed": {}, "Successful": {}}}),
101+
(["123", "456"], {"OK": True, "Value": {}}, {"OK": True, "Value": {"Failed": {}, "Successful": {}}}),
101102
(
102103
[],
103104
{"OK": True, "Value": {1: {"OutputSandboxLFN": "/some/lfn/1.txt"}}},
@@ -112,11 +113,11 @@ def test_deleteJobsByStatus(jca, conditions, mockReplyInput, expected):
112113
{"OK": True, "Value": {"Failed": {}, "Successful": {1: "/some/lfn/1.txt", 2: "/some/other/lfn/2.txt"}}},
113114
),
114115
(
115-
["a", "b"],
116+
["123", "456"],
116117
{"OK": True, "Value": {1: {"OutputSandboxLFN": "/some/lfn/1.txt"}}},
117118
{"OK": True, "Value": {"Failed": {}, "Successful": {1: "/some/lfn/1.txt"}}},
118119
),
119-
(["a", "b"], {"OK": False}, {"OK": False}),
120+
(["123", "456"], {"OK": False}, {"OK": False}),
120121
],
121122
)
122123
def test_deleteJobOversizedSandbox(mocker, inputs, params, expected):
@@ -129,15 +130,14 @@ def test_deleteJobOversizedSandbox(mocker, inputs, params, expected):
129130
mocker.patch(
130131
"DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.getDNForUsername", return_value=S_OK(["/bih/boh/DN"])
131132
)
133+
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.getJobParameters", return_value=params)
132134

133135
jobCleaningAgent = JobCleaningAgent()
134136
jobCleaningAgent.log = gLogger
135137
jobCleaningAgent.log.setLevel("DEBUG")
136138
jobCleaningAgent._AgentModule__configDefaults = mockAM
137139
jobCleaningAgent.initialize()
138140

139-
mockJMC.getJobParameters.return_value = params
140-
141141
result = jobCleaningAgent.deleteJobOversizedSandbox(inputs)
142142

143143
assert result == expected

src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_StalledJobAgent.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,6 @@ def sja(mocker):
2828
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.StalledJobAgent.rescheduleJobs", return_value=MagicMock())
2929
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.StalledJobAgent.PilotAgentsDB", return_value=MagicMock())
3030
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.StalledJobAgent.getJobParameters", return_value=MagicMock())
31-
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.StalledJobAgent.WMSClient", return_value=MagicMock())
32-
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.StalledJobAgent.getDNForUsername", return_value=MagicMock())
3331

3432
stalledJobAgent = StalledJobAgent()
3533
stalledJobAgent._AgentModule__configDefaults = mockAM

src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -378,8 +378,6 @@ def export_removeJob(self, jobIDs):
378378
:return: S_OK()/S_ERROR() -- confirmed job IDs
379379
"""
380380

381-
# FIXME: extract the logic to a utility function
382-
383381
jobList = self.__getJobList(jobIDs)
384382
if not jobList:
385383
return S_ERROR("Invalid job specification: " + str(jobIDs))

src/DIRAC/WorkloadManagementSystem/Utilities/test/Test_JobManager.py

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,35 +2,35 @@
22
"""
33

44
from unittest.mock import MagicMock
5-
import pytest
65

6+
import pytest
77

88
# sut
99
from DIRAC.WorkloadManagementSystem.Utilities.jobAdministration import kill_delete_jobs
1010

1111

1212
@pytest.mark.parametrize(
13-
"jobIDs_list, right, expected_res, expected_value",
13+
"jobIDs_list, right, filtered_jobs, expected_res, expected_value",
1414
[
15-
([], 'Kill', True, []),
16-
([], 'Delete', True, []),
17-
(1, 'Kill', True, []),
18-
(1, 'Kill', True, []),
19-
([1, 2], 'Kill', True, []),
20-
([1, 2], 'Kill', True, []),
21-
(1, 'Kill', True, [1]),
22-
([1, 2], 'Kill', True, [1]),
23-
([1, 2], 'Kill', True, [1]),
24-
([1, 2], 'Kill', True, []),
25-
([1, 2], 'Kill', True, [1, 2]),
15+
([], "Kill", [], True, []),
16+
([], "Delete", [], True, []),
17+
(1, "Kill", [], True, []),
18+
(1, "Kill", [1], True, [1]),
19+
([1, 2], "Kill", [], True, []),
20+
([1, 2], "Kill", [1], True, [1]),
21+
(1, "Kill", [1], True, [1]),
22+
([1, 2], "Kill", [1], True, [1]),
23+
([1, 2], "Kill", [2], True, [2]),
24+
([1, 2], "Kill", [], True, []),
25+
([1, 2], "Kill", [1,2], True, [1, 2]),
2626
],
2727
)
28-
def test___kill_delete_jobs(mocker, jobIDs_list, right, expected_res, expected_value):
28+
def test___kill_delete_jobs(mocker, jobIDs_list, right, filtered_jobs, expected_res, expected_value):
2929
mocker.patch("DIRAC.WorkloadManagementSystem.Utilities.jobAdministration.JobDB", MagicMock())
3030
mocker.patch("DIRAC.WorkloadManagementSystem.Utilities.jobAdministration.TaskQueueDB", MagicMock())
3131
mocker.patch("DIRAC.WorkloadManagementSystem.Utilities.jobAdministration.PilotAgentsDB", MagicMock())
3232
mocker.patch("DIRAC.WorkloadManagementSystem.Utilities.jobAdministration.StorageManagementDB", MagicMock())
33-
mocker.patch("DIRAC.WorkloadManagementSystem.Utilities.jobAdministration.filterJobStateTransition", MagicMock())
33+
mocker.patch("DIRAC.WorkloadManagementSystem.Utilities.jobAdministration.filterJobStateTransition", return_value={"OK": True, "Value": filtered_jobs})
3434

3535
res = kill_delete_jobs(right, jobIDs_list)
3636
assert res["OK"] == expected_res

0 commit comments

Comments
 (0)