diff --git a/src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py b/src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py index 734e8f244da..0f1d82d4ee1 100755 --- a/src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py +++ b/src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py @@ -354,7 +354,13 @@ def export_rescheduleJob(self, jobIDs): validJobList, invalidJobList, nonauthJobList, ownerJobList = self.jobPolicy.evaluateJobRights( jobList, RIGHT_RESCHEDULE ) - res = rescheduleJobs(validJobList, source="JobManager") + res = rescheduleJobs( + validJobList, + source="JobManager", + jobDB=self.jobDB, + taskQueueDB=self.taskQueueDB, + jobLoggingDB=self.jobLoggingDB, + ) if not res["OK"]: self.log.error(res["Message"]) diff --git a/src/DIRAC/WorkloadManagementSystem/Utilities/Utils.py b/src/DIRAC/WorkloadManagementSystem/Utilities/Utils.py index 83f35fb5abf..07a1121eb88 100644 --- a/src/DIRAC/WorkloadManagementSystem/Utilities/Utils.py +++ b/src/DIRAC/WorkloadManagementSystem/Utilities/Utils.py @@ -118,12 +118,21 @@ def createJobWrapper( return S_OK(generatedFiles) -def rescheduleJobs(jobIDs: list[int], source: str = "") -> dict: +def rescheduleJobs( + jobIDs: list[int], + source: str = "", + jobDB: JobDB | None = None, + taskQueueDB: TaskQueueDB | None = None, + jobLoggingDB: JobLoggingDB | None = None, +) -> dict: """Utility to reschedule jobs (not atomic, nor bulk) Requires direct access to the JobDB and TaskQueueDB :param jobIDs: list of jobIDs :param source: source of the reschedule + :param jobDB: optional JobDB instance to reuse (creates new if not provided) + :param taskQueueDB: optional TaskQueueDB instance to reuse (creates new if not provided) + :param jobLoggingDB: optional JobLoggingDB instance to reuse (creates new if not provided) :return: S_OK/S_ERROR :rtype: dict @@ -131,13 +140,21 @@ def rescheduleJobs(jobIDs: list[int], source: str = "") -> dict: failedJobs = [] + # Reuse provided DB instances or create new ones + if jobDB is None: + jobDB = JobDB() + if taskQueueDB is None: + taskQueueDB = TaskQueueDB() + if jobLoggingDB is None: + jobLoggingDB = JobLoggingDB() + for jobID in jobIDs: - result = JobDB().rescheduleJob(jobID) + result = jobDB.rescheduleJob(jobID) if not result["OK"]: failedJobs.append(jobID) continue - TaskQueueDB().deleteJob(jobID) - JobLoggingDB().addLoggingRecord( + taskQueueDB.deleteJob(jobID) + jobLoggingDB.addLoggingRecord( result["JobID"], status=result["Status"], minorStatus=result["MinorStatus"],