Skip to content

Commit 1cf4240

Browse files
committed
refactor: extracted rescheduleJobs in utility, use it in agents
1 parent 9391e01 commit 1cf4240

File tree

7 files changed

+52
-39
lines changed

7 files changed

+52
-39
lines changed

src/DIRAC/WorkloadManagementSystem/Agent/PushJobAgent.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,9 @@
4141
transferInputSandbox,
4242
)
4343
from DIRAC.WorkloadManagementSystem.private.ConfigHelper import findGenericPilotCredentials
44+
from DIRAC.WorkloadManagementSystem.Utilities.JobParameters import getJobParameters
4445
from DIRAC.WorkloadManagementSystem.Utilities.QueueUtilities import getQueuesResolved
4546
from DIRAC.WorkloadManagementSystem.Utilities.Utils import createJobWrapper
46-
from DIRAC.WorkloadManagementSystem.Utilities.JobParameters import getJobParameters
47-
4847

4948
MAX_JOBS_MANAGED = 100
5049

src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,12 @@
2020
from DIRAC.Core.Utilities.ClassAd.ClassAdLight import ClassAd
2121
from DIRAC.Core.Utilities.TimeUtilities import fromString, second, toEpoch
2222
from DIRAC.WorkloadManagementSystem.Client import JobMinorStatus, JobStatus
23-
from DIRAC.WorkloadManagementSystem.Client.JobManagerClient import JobManagerClient
2423
from DIRAC.WorkloadManagementSystem.Client.WMSClient import WMSClient
2524
from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB
2625
from DIRAC.WorkloadManagementSystem.DB.JobLoggingDB import JobLoggingDB
2726
from DIRAC.WorkloadManagementSystem.DB.PilotAgentsDB import PilotAgentsDB
2827
from DIRAC.WorkloadManagementSystem.Utilities.JobParameters import getJobParameters
28+
from DIRAC.WorkloadManagementSystem.Utilities.JobStatusUtility import rescheduleJobs
2929

3030

3131
class StalledJobAgent(AgentModule):
@@ -529,17 +529,12 @@ def _kickStuckJobs(self):
529529
return result
530530

531531
jobIDs = result["Value"]
532-
jobManagerClient = JobManagerClient()
533532
if jobIDs:
534533
self.log.info(f"Rescheduling {len(jobIDs)} jobs stuck in {JobStatus.MATCHED} status")
535-
result = jobManagerClient.rescheduleJob(jobIDs)
534+
result = rescheduleJobs(jobIDs)
536535
if not result["OK"]:
537536
message = f"Failed to reschedule jobs stuck in {JobStatus.MATCHED} status"
538537
message += "\n" + result["Message"]
539-
if "InvalidJobIDs" in result:
540-
message += "\n" + "\tInvalid job IDs: " + str(result["InvalidJobIDs"])
541-
if "NonauthorizedJobIDs" in result:
542-
message += "\n" + "\tNon authorized job IDs: " + str(result["NonauthorizedJobIDs"])
543538

544539
checkTime = datetime.datetime.utcnow() - self.rescheduledTime * second
545540
result = self.jobDB.selectJobs({"Status": JobStatus.RESCHEDULED}, older=checkTime)
@@ -550,17 +545,13 @@ def _kickStuckJobs(self):
550545
jobIDs = result["Value"]
551546
if jobIDs:
552547
self.log.info(f"Rescheduling {len(jobIDs)} jobs stuck in {JobStatus.RESCHEDULED} status")
553-
result = jobManagerClient.rescheduleJob(jobIDs)
548+
result = rescheduleJobs(jobIDs)
554549
if not result["OK"]:
555550
message = f"Failed to reschedule jobs stuck in {JobStatus.RESCHEDULED} status"
556551
message += "\n" + result["Message"]
557-
if "InvalidJobIDs" in result:
558-
message += "\n" + "\tInvalid job IDs: " + str(result["InvalidJobIDs"])
559-
if "NonauthorizedJobIDs" in result:
560-
message += "\n" + "\tNon authorized job IDs: " + str(result["NonauthorizedJobIDs"])
561552

562553
if message:
563-
return S_ERROR(message)
554+
self.log.error(message)
564555
return S_OK()
565556

566557
def _failSubmittingJobs(self):

src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_StalledJobAgent.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ def sja(mocker):
2525
)
2626
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.StalledJobAgent.JobDB")
2727
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.StalledJobAgent.JobLoggingDB")
28-
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.StalledJobAgent.JobManagerClient", return_value=MagicMock())
28+
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.StalledJobAgent.rescheduleJobs", return_value=MagicMock())
2929
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.StalledJobAgent.PilotAgentsDB", return_value=MagicMock())
3030
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.StalledJobAgent.getJobParameters", return_value=MagicMock())
3131
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.StalledJobAgent.WMSClient", return_value=MagicMock())

src/DIRAC/WorkloadManagementSystem/DB/PilotAgentsDB.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -447,9 +447,10 @@ def getJobsForPilot(self, pilotID):
447447
"""Get IDs of Jobs that were executed by a pilot"""
448448
cmd = "SELECT pilotID,JobID FROM JobToPilotMapping "
449449
if isinstance(pilotID, list):
450-
cmd = cmd + " WHERE pilotID IN (%s)" % ",".join(["%s" % x for x in pilotID])
450+
pilotIDs_string = ",".join(str(int(x)) for x in pilotID)
451+
cmd = f"{cmd} WHERE pilotID IN ({pilotIDs_string})"
451452
else:
452-
cmd = cmd + f" WHERE pilotID = {pilotID}"
453+
cmd = f"{cmd} WHERE pilotID = {pilotID}"
453454

454455
result = self._query(cmd)
455456
if not result["OK"]:

src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
)
3535
from DIRAC.WorkloadManagementSystem.Utilities.JobModel import JobDescriptionModel
3636
from DIRAC.WorkloadManagementSystem.Utilities.ParametricJob import generateParametricJobs, getParameterVectorLength
37+
from DIRAC.WorkloadManagementSystem.Utilities.JobStatusUtility import rescheduleJobs
3738

3839
MAX_PARAMETRIC_JOBS = 20
3940

@@ -345,8 +346,7 @@ def __getJobList(jobInput):
345346
types_rescheduleJob = []
346347

347348
def export_rescheduleJob(self, jobIDs):
348-
"""Reschedule a single job. If the optional proxy parameter is given
349-
it will be used to refresh the proxy in the Proxy Repository
349+
"""Reschedule a list of jobs.
350350
351351
:param list jobIDs: list of job IDs
352352
@@ -360,22 +360,12 @@ def export_rescheduleJob(self, jobIDs):
360360
validJobList, invalidJobList, nonauthJobList, ownerJobList = self.jobPolicy.evaluateJobRights(
361361
jobList, RIGHT_RESCHEDULE
362362
)
363-
for jobID in validJobList:
364-
self.taskQueueDB.deleteJob(jobID)
365-
result = self.jobDB.rescheduleJob(jobID)
366-
self.log.debug(str(result))
367-
if not result["OK"]:
368-
return result
369-
self.jobLoggingDB.addLoggingRecord(
370-
result["JobID"],
371-
status=result["Status"],
372-
minorStatus=result["MinorStatus"],
373-
applicationStatus="Unknown",
374-
source="JobManager",
375-
)
363+
res = rescheduleJobs(validJobList, source="JobManager")
364+
if not res["OK"]:
365+
self.log.error(res["Message"])
376366

377367
if invalidJobList or nonauthJobList:
378-
result = S_ERROR("Some jobs failed reschedule")
368+
result = S_ERROR("Some jobs can not be rescheduled")
379369
if invalidJobList:
380370
result["InvalidJobIDs"] = invalidJobList
381371
if nonauthJobList:

src/DIRAC/WorkloadManagementSystem/Utilities/JobParameters.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,8 +146,9 @@ def getNumberOfGPUs(siteName=None, gridCE=None, queue=None):
146146
def getJobParameters(jobIDs: list[int], parName: str | None, vo: str = "") -> dict:
147147
"""Utility to get a job parameter for a list of jobIDs pertaining to a VO.
148148
If the jobID is not in the JobParametersDB, it will be looked up in the JobDB.
149-
149+
150150
Requires direct access to the JobParametersDB and JobDB.
151+
151152
:param jobIDs: list of jobIDs
152153
:param parName: name of the parameter to be retrieved
153154
:param vo: VO of the jobIDs

src/DIRAC/WorkloadManagementSystem/Utilities/JobStatusUtility.py

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,9 @@
99
from DIRAC.Core.Utilities import TimeUtilities
1010
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
1111
from DIRAC.WorkloadManagementSystem.Client import JobStatus
12-
13-
if TYPE_CHECKING:
14-
from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB
15-
from DIRAC.WorkloadManagementSystem.DB.JobLoggingDB import JobLoggingDB
12+
from DIRAC.WorkloadManagementSystem.DB.JobLoggingDB import JobLoggingDB
13+
from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB
14+
from DIRAC.WorkloadManagementSystem.DB.TaskQueueDB import TaskQueueDB
1615

1716

1817
class JobStatusUtility:
@@ -243,3 +242,35 @@ def getNewStatus(
243242
minor = sDict.get("MinorStatus", minor)
244243
application = sDict.get("ApplicationStatus", application)
245244
return S_OK((status, minor, application))
245+
246+
247+
def rescheduleJobs(jobIDs: list[int], source: str = "") -> dict:
    """Utility to reschedule jobs (not atomic, nor bulk)

    Requires direct access to the JobDB, TaskQueueDB and JobLoggingDB.

    Jobs are processed one by one: a job that fails to be rescheduled is
    recorded and skipped, and the remaining jobs are still processed.

    :param jobIDs: list of jobIDs
    :param source: source of the reschedule, passed to the job logging record
    :return: S_OK if every job was rescheduled, otherwise S_ERROR with a
             message listing the job IDs that failed
    :rtype: dict
    """
    if not jobIDs:
        # Nothing to reschedule: do not instantiate the DB handlers at all
        return S_OK()

    # Instantiate the DB handlers once, rather than once per job in the loop
    jobDB = JobDB()
    taskQueueDB = TaskQueueDB()
    jobLoggingDB = JobLoggingDB()

    failedJobs = []

    for jobID in jobIDs:
        result = jobDB.rescheduleJob(jobID)
        if not result["OK"]:
            failedJobs.append(jobID)
            continue
        # Only once the JobDB accepted the reschedule, drop the job from the
        # task queue and record the new status
        taskQueueDB.deleteJob(jobID)
        jobLoggingDB.addLoggingRecord(
            result["JobID"],
            status=result["Status"],
            minorStatus=result["MinorStatus"],
            applicationStatus="Unknown",
            source=source,
        )

    if failedJobs:
        return S_ERROR(f"Failed to reschedule jobs {failedJobs}")
    return S_OK()

0 commit comments

Comments
 (0)