Skip to content

Commit f891345

Browse files
committed
fix: Move rescheduleJobs into Utils
1 parent 015f82f commit f891345

File tree

4 files changed

+42
-38
lines changed

4 files changed

+42
-38
lines changed

src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
from DIRAC.WorkloadManagementSystem.DB.JobLoggingDB import JobLoggingDB
2626
from DIRAC.WorkloadManagementSystem.DB.PilotAgentsDB import PilotAgentsDB
2727
from DIRAC.WorkloadManagementSystem.Utilities.JobParameters import getJobParameters
28-
from DIRAC.WorkloadManagementSystem.Utilities.JobStatusUtility import rescheduleJobs
28+
from DIRAC.WorkloadManagementSystem.Utilities.Utils import rescheduleJobs
2929

3030

3131
class StalledJobAgent(AgentModule):

src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
)
3535
from DIRAC.WorkloadManagementSystem.Utilities.JobModel import JobDescriptionModel
3636
from DIRAC.WorkloadManagementSystem.Utilities.ParametricJob import generateParametricJobs, getParameterVectorLength
37-
from DIRAC.WorkloadManagementSystem.Utilities.JobStatusUtility import rescheduleJobs
37+
from DIRAC.WorkloadManagementSystem.Utilities.Utils import rescheduleJobs
3838

3939
MAX_PARAMETRIC_JOBS = 20
4040

src/DIRAC/WorkloadManagementSystem/Utilities/JobStatusUtility.py

Lines changed: 4 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,10 @@
99
from DIRAC.Core.Utilities import TimeUtilities
1010
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
1111
from DIRAC.WorkloadManagementSystem.Client import JobStatus
12-
from DIRAC.WorkloadManagementSystem.DB.JobLoggingDB import JobLoggingDB
13-
from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB
14-
from DIRAC.WorkloadManagementSystem.DB.TaskQueueDB import TaskQueueDB
12+
13+
if TYPE_CHECKING:
14+
from DIRAC.WorkloadManagementSystem.DB.JobLoggingDB import JobLoggingDB
15+
from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB
1516

1617

1718
class JobStatusUtility:
@@ -242,35 +243,3 @@ def getNewStatus(
242243
minor = sDict.get("MinorStatus", minor)
243244
application = sDict.get("ApplicationStatus", application)
244245
return S_OK((status, minor, application))
245-
246-
247-
def rescheduleJobs(jobIDs: list[int], source: str = "") -> dict:
248-
"""Utility to reschedule jobs (not atomic, nor bulk)
249-
Requires direct access to the JobDB and TaskQueueDB
250-
251-
:param jobIDs: list of jobIDs
252-
:param source: source of the reschedule
253-
:return: S_OK/S_ERROR
254-
:rtype: dict
255-
256-
"""
257-
258-
failedJobs = []
259-
260-
for jobID in jobIDs:
261-
result = JobDB().rescheduleJob(jobID)
262-
if not result["OK"]:
263-
failedJobs.append(jobID)
264-
continue
265-
TaskQueueDB().deleteJob(jobID)
266-
JobLoggingDB().addLoggingRecord(
267-
result["JobID"],
268-
status=result["Status"],
269-
minorStatus=result["MinorStatus"],
270-
applicationStatus="Unknown",
271-
source=source,
272-
)
273-
274-
if failedJobs:
275-
return S_ERROR(f"Failed to reschedule jobs {failedJobs}")
276-
return S_OK()

src/DIRAC/WorkloadManagementSystem/Utilities/Utils.py

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,12 @@
44
import sys
55
import json
66

7-
from DIRAC import gLogger, S_OK
7+
from DIRAC import gLogger, S_OK, S_ERROR
88
from DIRAC.Core.Utilities.File import mkDir
99
from DIRAC.FrameworkSystem.private.standardLogging.Logging import Logging
10+
from DIRAC.WorkloadManagementSystem.DB.JobLoggingDB import JobLoggingDB
11+
from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB
12+
from DIRAC.WorkloadManagementSystem.DB.TaskQueueDB import TaskQueueDB
1013

1114

1215
def createJobWrapper(
@@ -113,3 +116,35 @@ def createJobWrapper(
113116
if rootLocation != wrapperPath:
114117
generatedFiles["JobExecutableRelocatedPath"] = os.path.join(rootLocation, os.path.basename(jobExeFile))
115118
return S_OK(generatedFiles)
119+
120+
121+
def rescheduleJobs(jobIDs: list[int], source: str = "") -> dict:
122+
"""Utility to reschedule jobs (not atomic, nor bulk)
123+
Requires direct access to the JobDB and TaskQueueDB
124+
125+
:param jobIDs: list of jobIDs
126+
:param source: source of the reschedule
127+
:return: S_OK/S_ERROR
128+
:rtype: dict
129+
130+
"""
131+
132+
failedJobs = []
133+
134+
for jobID in jobIDs:
135+
result = JobDB().rescheduleJob(jobID)
136+
if not result["OK"]:
137+
failedJobs.append(jobID)
138+
continue
139+
TaskQueueDB().deleteJob(jobID)
140+
JobLoggingDB().addLoggingRecord(
141+
result["JobID"],
142+
status=result["Status"],
143+
minorStatus=result["MinorStatus"],
144+
applicationStatus="Unknown",
145+
source=source,
146+
)
147+
148+
if failedJobs:
149+
return S_ERROR(f"Failed to reschedule jobs {failedJobs}")
150+
return S_OK()

0 commit comments

Comments
 (0)