Skip to content

Commit 73673ee

Browse files
committed
refactor: extracted utilities from WMSClient
1 parent c94e03c commit 73673ee

File tree

6 files changed

+187
-176
lines changed

6 files changed

+187
-176
lines changed

src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_JobCleaningAgent.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ def jca(mocker):
3232
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.JobDB.selectJobs", side_effect=mockReply)
3333
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.JobDB.__init__", side_effect=mockNone)
3434
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.ReqClient", return_value=mockNone)
35-
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.JobMonitoringClient", return_value=mockJMC)
3635

3736
jca = JobCleaningAgent()
3837
jca.log = gLogger
@@ -127,7 +126,6 @@ def test_deleteJobOversizedSandbox(mocker, inputs, params, expected):
127126
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.AgentModule.am_getOption", return_value=mockAM)
128127
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.JobDB", return_value=mockNone)
129128
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.ReqClient", return_value=mockNone)
130-
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.JobMonitoringClient", return_value=mockJMC)
131129
mocker.patch(
132130
"DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.getDNForUsername", return_value=S_OK(["/bih/boh/DN"])
133131
)

src/DIRAC/WorkloadManagementSystem/Client/WMSClient.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,10 @@
22
methods necessary to communicate with the Workload Management System
33
"""
44
import os
5-
from io import StringIO
65
import time
6+
from io import StringIO
77

8-
from DIRAC import S_OK, S_ERROR, gLogger
9-
8+
from DIRAC import S_ERROR, S_OK, gLogger
109
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
1110
from DIRAC.Core.Utilities import File
1211
from DIRAC.Core.Utilities.ClassAd.ClassAdLight import ClassAd

src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py

Lines changed: 29 additions & 128 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,7 @@
2121
from DIRAC.Core.Utilities.JEncode import strToIntDict
2222
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
2323
from DIRAC.FrameworkSystem.Client.ProxyManagerClient import gProxyManager
24-
from DIRAC.StorageManagementSystem.DB.StorageManagementDB import StorageManagementDB
2524
from DIRAC.WorkloadManagementSystem.Client import JobStatus
26-
from DIRAC.WorkloadManagementSystem.Client.JobStatus import filterJobStateTransition
2725
from DIRAC.WorkloadManagementSystem.Service.JobPolicy import (
2826
RIGHT_DELETE,
2927
RIGHT_KILL,
@@ -32,6 +30,7 @@
3230
RIGHT_SUBMIT,
3331
JobPolicy,
3432
)
33+
from DIRAC.WorkloadManagementSystem.Utilities.jobAdministration import kill_delete_jobs
3534
from DIRAC.WorkloadManagementSystem.Utilities.JobModel import JobDescriptionModel
3635
from DIRAC.WorkloadManagementSystem.Utilities.ParametricJob import generateParametricJobs, getParameterVectorLength
3736
from DIRAC.WorkloadManagementSystem.Utilities.Utils import rescheduleJobs
@@ -379,6 +378,8 @@ def export_removeJob(self, jobIDs):
379378
:return: S_OK()/S_ERROR() -- confirmed job IDs
380379
"""
381380

381+
# FIXME: extract the logic to a utility function
382+
382383
jobList = self.__getJobList(jobIDs)
383384
if not jobList:
384385
return S_ERROR("Invalid job specification: " + str(jobIDs))
@@ -430,131 +431,28 @@ def export_removeJob(self, jobIDs):
430431

431432
return S_OK(validJobList)
432433

433-
def __deleteJob(self, jobID, force=False):
434-
"""Set the job status to "Deleted"
435-
and remove the pilot that ran and its logging info if the pilot is finished.
436-
437-
:param int jobID: job ID
438-
:return: S_OK()/S_ERROR()
439-
"""
440-
if not (result := self.jobDB.setJobStatus(jobID, JobStatus.DELETED, "Checking accounting", force=force))["OK"]:
441-
return result
442-
443-
if not (result := self.taskQueueDB.deleteJob(jobID))["OK"]:
444-
self.log.warn("Failed to delete job from the TaskQueue")
445-
446-
# if it was the last job for the pilot
447-
result = self.pilotAgentsDB.getPilotsForJobID(jobID)
448-
if not result["OK"]:
449-
self.log.error("Failed to get Pilots for JobID", result["Message"])
450-
return result
451-
for pilot in result["Value"]:
452-
res = self.pilotAgentsDB.getJobsForPilot(pilot)
453-
if not res["OK"]:
454-
self.log.error("Failed to get jobs for pilot", res["Message"])
455-
return res
456-
if not res["Value"]: # if list of jobs for pilot is empty, delete pilot
457-
result = self.pilotAgentsDB.getPilotInfo(pilotID=pilot)
458-
if not result["OK"]:
459-
self.log.error("Failed to get pilot info", result["Message"])
460-
return result
461-
ret = self.pilotAgentsDB.deletePilot(result["Value"]["PilotJobReference"])
462-
if not ret["OK"]:
463-
self.log.error("Failed to delete pilot from PilotAgentsDB", ret["Message"])
464-
return ret
465-
466-
return S_OK()
434+
###########################################################################
435+
types_deleteJob = []
467436

468-
def __killJob(self, jobID, sendKillCommand=True, force=False):
469-
"""Kill one job
437+
def export_deleteJob(self, jobIDs, force=False):
438+
"""Delete jobs specified in the jobIDs list
470439
471-
:param int jobID: job ID
472-
:param bool sendKillCommand: send kill command
440+
:param list jobIDs: list of job IDs
473441
474-
:return: S_OK()/S_ERROR()
442+
:return: S_OK/S_ERROR
475443
"""
476-
if sendKillCommand:
477-
if not (result := self.jobDB.setJobCommand(jobID, "Kill"))["OK"]:
478-
return result
479-
480-
self.log.info("Job marked for termination", jobID)
481-
if not (result := self.jobDB.setJobStatus(jobID, JobStatus.KILLED, "Marked for termination", force=force))[
482-
"OK"
483-
]:
484-
self.log.warn("Failed to set job Killed status", result["Message"])
485-
if not (result := self.taskQueueDB.deleteJob(jobID))["OK"]:
486-
self.log.warn("Failed to delete job from the TaskQueue", result["Message"])
487-
488-
return S_OK()
489444

490-
def _kill_delete_jobs(self, jobIDList, right, force=False):
491-
"""Kill (== set the status to "KILLED") or delete (== set the status to "DELETED") jobs as necessary
492-
493-
:param list jobIDList: job IDs
494-
:param str right: RIGHT_KILL or RIGHT_DELETE
495-
496-
:return: S_OK()/S_ERROR()
497-
"""
498-
jobList = self.__getJobList(jobIDList)
445+
jobList = self.__getJobList(jobIDs)
499446
if not jobList:
500447
self.log.warn("No jobs specified")
501448
return S_OK([])
502449

503-
validJobList, invalidJobList, nonauthJobList, ownerJobList = self.jobPolicy.evaluateJobRights(jobList, right)
504-
505-
badIDs = []
506-
507-
killJobList = []
508-
deleteJobList = []
509-
if validJobList:
510-
# Get the jobs allowed to transition to the Killed state
511-
filterRes = filterJobStateTransition(validJobList, JobStatus.KILLED)
512-
if not filterRes["OK"]:
513-
return filterRes
514-
killJobList.extend(filterRes["Value"])
515-
516-
if not right == RIGHT_KILL:
517-
# Get the jobs allowed to transition to the Deleted state
518-
filterRes = filterJobStateTransition(validJobList, JobStatus.DELETED)
519-
if not filterRes["OK"]:
520-
return filterRes
521-
deleteJobList.extend(filterRes["Value"])
522-
523-
# Look for jobs that are in the Staging state to send kill signal to the stager
524-
result = self.jobDB.getJobsAttributes(killJobList, ["Status"])
525-
if not result["OK"]:
526-
return result
527-
stagingJobList = [jobID for jobID, sDict in result["Value"].items() if sDict["Status"] == JobStatus.STAGING]
528-
529-
for jobID in killJobList:
530-
result = self.__killJob(jobID, force=force)
531-
if not result["OK"]:
532-
badIDs.append(jobID)
533-
534-
for jobID in deleteJobList:
535-
result = self.__deleteJob(jobID, force=force)
536-
if not result["OK"]:
537-
badIDs.append(jobID)
538-
539-
if stagingJobList:
540-
stagerDB = StorageManagementDB()
541-
self.log.info("Going to send killing signal to stager as well!")
542-
result = stagerDB.killTasksBySourceTaskID(stagingJobList)
543-
if not result["OK"]:
544-
self.log.warn("Failed to kill some Stager tasks", result["Message"])
450+
validJobList, invalidJobList, nonauthJobList, ownerJobList = self.jobPolicy.evaluateJobRights(
451+
jobList, RIGHT_DELETE
452+
)
545453

546-
if nonauthJobList or badIDs:
547-
result = S_ERROR("Some jobs failed deletion")
548-
if nonauthJobList:
549-
self.log.warn("Non-authorized JobIDs won't be deleted", str(nonauthJobList))
550-
result["NonauthorizedJobIDs"] = nonauthJobList
551-
if badIDs:
552-
self.log.warn("JobIDs failed to be deleted", str(badIDs))
553-
result["FailedJobIDs"] = badIDs
554-
return result
454+
result = kill_delete_jobs(RIGHT_DELETE, validJobList, nonauthJobList, force=force)
555455

556-
jobsList = killJobList if right == RIGHT_KILL else deleteJobList
557-
result = S_OK(jobsList)
558456
result["requireProxyUpload"] = len(ownerJobList) > 0 and self.__checkIfProxyUploadIsRequired()
559457

560458
if invalidJobList:
@@ -563,30 +461,33 @@ def _kill_delete_jobs(self, jobIDList, right, force=False):
563461
return result
564462

565463
###########################################################################
566-
types_deleteJob = []
464+
types_killJob = []
567465

568-
def export_deleteJob(self, jobIDs, force=False):
569-
"""Delete jobs specified in the jobIDs list
466+
def export_killJob(self, jobIDs, force=False):
467+
"""Kill jobs specified in the jobIDs list
570468
571469
:param list jobIDs: list of job IDs
572470
573471
:return: S_OK/S_ERROR
574472
"""
575473

576-
return self._kill_delete_jobs(jobIDs, RIGHT_DELETE, force=force)
474+
jobList = self.__getJobList(jobIDs)
475+
if not jobList:
476+
self.log.warn("No jobs specified")
477+
return S_OK([])
577478

578-
###########################################################################
579-
types_killJob = []
479+
validJobList, invalidJobList, nonauthJobList, ownerJobList = self.jobPolicy.evaluateJobRights(
480+
jobList, RIGHT_KILL
481+
)
580482

581-
def export_killJob(self, jobIDs, force=False):
582-
"""Kill jobs specified in the jobIDs list
483+
result = kill_delete_jobs(RIGHT_KILL, validJobList, nonauthJobList, force=force)
583484

584-
:param list jobIDs: list of job IDs
485+
result["requireProxyUpload"] = len(ownerJobList) > 0 and self.__checkIfProxyUploadIsRequired()
585486

586-
:return: S_OK/S_ERROR
587-
"""
487+
if invalidJobList:
488+
result["InvalidJobIDs"] = invalidJobList
588489

589-
return self._kill_delete_jobs(jobIDs, RIGHT_KILL, force=force)
490+
return result
590491

591492
###########################################################################
592493
types_resetJob = []
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
from DIRAC import S_ERROR, S_OK, gLogger
2+
from DIRAC.StorageManagementSystem.DB.StorageManagementDB import StorageManagementDB
3+
from DIRAC.WorkloadManagementSystem.Client import JobStatus
4+
from DIRAC.WorkloadManagementSystem.Client.JobStatus import filterJobStateTransition
5+
from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB
6+
from DIRAC.WorkloadManagementSystem.DB.PilotAgentsDB import PilotAgentsDB
7+
from DIRAC.WorkloadManagementSystem.DB.TaskQueueDB import TaskQueueDB
8+
from DIRAC.WorkloadManagementSystem.Service.JobPolicy import RIGHT_KILL
9+
10+
11+
def _deleteJob(jobID, force=False):
12+
"""Set the job status to "Deleted"
13+
and remove the pilot that ran and its logging info if the pilot is finished.
14+
15+
:param int jobID: job ID
16+
:return: S_OK()/S_ERROR()
17+
"""
18+
if not (result := JobDB().setJobStatus(jobID, JobStatus.DELETED, "Checking accounting", force=force))["OK"]:
19+
gLogger.warn("Failed to set job Deleted status", result["Message"])
20+
return result
21+
22+
if not (result := TaskQueueDB().deleteJob(jobID))["OK"]:
23+
gLogger.warn("Failed to delete job from the TaskQueue")
24+
25+
# if it was the last job for the pilot
26+
result = PilotAgentsDB().getPilotsForJobID(jobID)
27+
if not result["OK"]:
28+
gLogger.error("Failed to get Pilots for JobID", result["Message"])
29+
return result
30+
for pilot in result["Value"]:
31+
res = PilotAgentsDB().getJobsForPilot(pilot)
32+
if not res["OK"]:
33+
gLogger.error("Failed to get jobs for pilot", res["Message"])
34+
return res
35+
if not res["Value"]: # if list of jobs for pilot is empty, delete pilot
36+
result = PilotAgentsDB().getPilotInfo(pilotID=pilot)
37+
if not result["OK"]:
38+
gLogger.error("Failed to get pilot info", result["Message"])
39+
return result
40+
ret = PilotAgentsDB().deletePilot(result["Value"]["PilotJobReference"])
41+
if not ret["OK"]:
42+
gLogger.error("Failed to delete pilot from PilotAgentsDB", ret["Message"])
43+
return ret
44+
45+
return S_OK()
46+
47+
48+
def _killJob(jobID, sendKillCommand=True, force=False):
49+
"""Kill one job
50+
51+
:param int jobID: job ID
52+
:param bool sendKillCommand: send kill command
53+
54+
:return: S_OK()/S_ERROR()
55+
"""
56+
if sendKillCommand:
57+
if not (result := JobDB().setJobCommand(jobID, "Kill"))["OK"]:
58+
gLogger.warn("Failed to set job Kill command", result["Message"])
59+
return result
60+
61+
gLogger.info("Job marked for termination", jobID)
62+
if not (result := JobDB().setJobStatus(jobID, JobStatus.KILLED, "Marked for termination", force=force))["OK"]:
63+
gLogger.warn("Failed to set job Killed status", result["Message"])
64+
if not (result := TaskQueueDB().deleteJob(jobID))["OK"]:
65+
gLogger.warn("Failed to delete job from the TaskQueue", result["Message"])
66+
67+
return S_OK()
68+
69+
70+
def kill_delete_jobs(right, validJobList, nonauthJobList=[], force=False):
71+
"""Kill (== set the status to "KILLED") or delete (== set the status to "DELETED") jobs as necessary
72+
73+
:param str right: RIGHT_KILL or RIGHT_DELETE
74+
75+
:return: S_OK()/S_ERROR()
76+
"""
77+
badIDs = []
78+
79+
killJobList = []
80+
deleteJobList = []
81+
if validJobList:
82+
# Get the jobs allowed to transition to the Killed state
83+
filterRes = filterJobStateTransition(validJobList, JobStatus.KILLED)
84+
if not filterRes["OK"]:
85+
return filterRes
86+
killJobList.extend(filterRes["Value"])
87+
88+
if not right == RIGHT_KILL:
89+
# Get the jobs allowed to transition to the Deleted state
90+
filterRes = filterJobStateTransition(validJobList, JobStatus.DELETED)
91+
if not filterRes["OK"]:
92+
return filterRes
93+
deleteJobList.extend(filterRes["Value"])
94+
95+
# Look for jobs that are in the Staging state to send kill signal to the stager
96+
result = JobDB().getJobsAttributes(killJobList, ["Status"])
97+
if not result["OK"]:
98+
return result
99+
stagingJobList = [jobID for jobID, sDict in result["Value"].items() if sDict["Status"] == JobStatus.STAGING]
100+
101+
for jobID in killJobList:
102+
result = _killJob(jobID, force=force)
103+
if not result["OK"]:
104+
badIDs.append(jobID)
105+
106+
for jobID in deleteJobList:
107+
result = _deleteJob(jobID, force=force)
108+
if not result["OK"]:
109+
badIDs.append(jobID)
110+
111+
if stagingJobList:
112+
stagerDB = StorageManagementDB()
113+
gLogger.info("Going to send killing signal to stager as well!")
114+
result = stagerDB.killTasksBySourceTaskID(stagingJobList)
115+
if not result["OK"]:
116+
gLogger.warn("Failed to kill some Stager tasks", result["Message"])
117+
118+
if nonauthJobList or badIDs:
119+
result = S_ERROR("Some jobs failed deletion")
120+
if nonauthJobList:
121+
gLogger.warn("Non-authorized JobIDs won't be deleted", str(nonauthJobList))
122+
result["NonauthorizedJobIDs"] = nonauthJobList
123+
if badIDs:
124+
gLogger.warn("JobIDs failed to be deleted", str(badIDs))
125+
result["FailedJobIDs"] = badIDs
126+
return result
127+
128+
jobsList = killJobList if right == RIGHT_KILL else deleteJobList
129+
return S_OK(jobsList)

0 commit comments

Comments
 (0)