Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,13 @@ def export_rescheduleJob(self, jobIDs):
validJobList, invalidJobList, nonauthJobList, ownerJobList = self.jobPolicy.evaluateJobRights(
jobList, RIGHT_RESCHEDULE
)
res = rescheduleJobs(validJobList, source="JobManager")
res = rescheduleJobs(
validJobList,
source="JobManager",
jobDB=self.jobDB,
taskQueueDB=self.taskQueueDB,
jobLoggingDB=self.jobLoggingDB,
)
if not res["OK"]:
self.log.error(res["Message"])

Expand Down
25 changes: 21 additions & 4 deletions src/DIRAC/WorkloadManagementSystem/Utilities/Utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,26 +118,43 @@ def createJobWrapper(
return S_OK(generatedFiles)


def rescheduleJobs(jobIDs: list[int], source: str = "") -> dict:
def rescheduleJobs(
jobIDs: list[int],
source: str = "",
jobDB: JobDB | None = None,
taskQueueDB: TaskQueueDB | None = None,
jobLoggingDB: JobLoggingDB | None = None,
) -> dict:
"""Utility to reschedule jobs (not atomic, nor bulk)
Requires direct access to the JobDB and TaskQueueDB

:param jobIDs: list of jobIDs
:param source: source of the reschedule
:param jobDB: optional JobDB instance to reuse (creates new if not provided)
:param taskQueueDB: optional TaskQueueDB instance to reuse (creates new if not provided)
:param jobLoggingDB: optional JobLoggingDB instance to reuse (creates new if not provided)
:return: S_OK/S_ERROR
:rtype: dict

"""

failedJobs = []

# Reuse provided DB instances or create new ones
if jobDB is None:
jobDB = JobDB()
if taskQueueDB is None:
taskQueueDB = TaskQueueDB()
if jobLoggingDB is None:
jobLoggingDB = JobLoggingDB()

for jobID in jobIDs:
result = JobDB().rescheduleJob(jobID)
result = jobDB.rescheduleJob(jobID)
if not result["OK"]:
failedJobs.append(jobID)
continue
TaskQueueDB().deleteJob(jobID)
JobLoggingDB().addLoggingRecord(
taskQueueDB.deleteJob(jobID)
jobLoggingDB.addLoggingRecord(
result["JobID"],
status=result["Status"],
minorStatus=result["MinorStatus"],
Expand Down
Loading