Skip to content

Commit 627c344

Browse files
fstagniweb-flow
authored andcommitted
sweep: #5744 WMS job status fixes
1 parent fa9c15a commit 627c344

File tree

4 files changed

+19
-16
lines changed

4 files changed

+19
-16
lines changed

src/DIRAC/WorkloadManagementSystem/Agent/JobAgent.py

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
from DIRAC.WorkloadManagementSystem.Client.MatcherClient import MatcherClient
3737
from DIRAC.WorkloadManagementSystem.Client.PilotManagerClient import PilotManagerClient
3838
from DIRAC.WorkloadManagementSystem.Client.JobManagerClient import JobManagerClient
39+
from DIRAC.WorkloadManagementSystem.Client.JobStateUpdateClient import JobStateUpdateClient
3940
from DIRAC.WorkloadManagementSystem.Client.JobReport import JobReport
4041
from DIRAC.WorkloadManagementSystem.Client import JobStatus
4142
from DIRAC.WorkloadManagementSystem.Utilities.Utils import createJobWrapper
@@ -315,7 +316,7 @@ def execute(self):
315316
self.log.debug("After %sCE submitJob()" % (self.ceName))
316317
except Exception as subExcept: # pylint: disable=broad-except
317318
self.log.exception("Exception in submission", "", lException=subExcept, lExcInfo=True)
318-
result = self._rescheduleFailedJob(jobID, "Job processing failed with exception")
319+
result = self._rescheduleFailedJob(jobID, "Job processing failed with exception", direct=True)
319320
return self._finish(result["Message"], self.stopOnApplicationFailure)
320321

321322
return S_OK("Job Agent cycle complete")
@@ -751,20 +752,23 @@ def _finish(self, message, stop=True):
751752
return S_OK(message)
752753

753754
#############################################################################
754-
def _rescheduleFailedJob(self, jobID, message):
755+
def _rescheduleFailedJob(self, jobID, message, direct=False):
755756
"""
756757
Set Job Status to "Rescheduled" and issue a reschedule command to the Job Manager
757758
"""
758759

759760
self.log.warn("Failure ==> rescheduling", "(during %s)" % (message))
760761

761-
jobReport = JobReport(int(jobID), "JobAgent@%s" % self.siteName)
762-
763-
# Setting a job parameter does not help since the job will be rescheduled,
764-
# instead set the status with the cause and then another status showing the
765-
# reschedule operation.
766-
767-
jobReport.setJobStatus(status=JobStatus.RESCHEDULED, applicationStatus=message, sendFlag=True)
762+
if direct:
763+
JobStateUpdateClient().setJobStatus(
764+
int(jobID), status=JobStatus.RESCHEDULED, applicationStatus=message, source="JobAgent@%s", force=True
765+
)
766+
else:
767+
jobReport = JobReport(int(jobID), "JobAgent@%s" % self.siteName)
768+
# Setting a job parameter does not help since the job will be rescheduled,
769+
# instead set the status with the cause and then another status showing the
770+
# reschedule operation.
771+
jobReport.setJobStatus(status=JobStatus.RESCHEDULED, applicationStatus=message, sendFlag=True)
768772

769773
self.log.info("Job will be rescheduled")
770774
result = JobManagerClient().rescheduleJob(jobID)

src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ def _failStalledJobs(self, jobID):
239239
return result
240240
site = result["Value"]
241241
if site in self.stalledJobsToRescheduleSites:
242-
return self._updateJobStatus(jobID, JobStatus.RESCHEDULED, minorStatus=setFailed)
242+
return self._updateJobStatus(jobID, JobStatus.RESCHEDULED, minorStatus=setFailed, force=True)
243243

244244
return self._updateJobStatus(jobID, JobStatus.FAILED, minorStatus=setFailed)
245245

@@ -314,7 +314,7 @@ def _getLatestUpdateTime(self, job):
314314
return S_OK(latestUpdate)
315315

316316
#############################################################################
317-
def _updateJobStatus(self, job, status, minorStatus=None):
317+
def _updateJobStatus(self, job, status, minorStatus=None, force=False):
318318
"""This method updates the job status in the JobDB"""
319319

320320
if not self.am_getOption("Enable", True):
@@ -323,7 +323,7 @@ def _updateJobStatus(self, job, status, minorStatus=None):
323323
toRet = S_OK()
324324

325325
self.log.debug("self.jobDB.setJobAttribute(%s,'Status','%s',update=True)" % (job, status))
326-
result = self.jobDB.setJobAttribute(job, "Status", status, update=True)
326+
result = self.jobDB.setJobAttribute(job, "Status", status, update=True, force=force)
327327
if not result["OK"]:
328328
self.log.error("Failed setting Status", "%s for job %d: %s" % (status, job, result["Message"]))
329329
toRet = result

src/DIRAC/WorkloadManagementSystem/Client/JobStatus.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,9 +80,9 @@ def __init__(self, state):
8080
COMPLETED: State(10, [DONE, FAILED], defState=COMPLETED),
8181
COMPLETING: State(9, [DONE, FAILED, COMPLETED, STALLED, KILLED], defState=COMPLETING),
8282
STALLED: State(8, [RUNNING, FAILED, KILLED], defState=STALLED),
83-
RUNNING: State(7, [STALLED, DONE, FAILED, COMPLETING, KILLED, RECEIVED], defState=RUNNING),
83+
RUNNING: State(7, [STALLED, DONE, FAILED, RESCHEDULED, COMPLETING, KILLED, RECEIVED], defState=RUNNING),
8484
RESCHEDULED: State(6, [WAITING, RECEIVED, DELETED], defState=RESCHEDULED),
85-
MATCHED: State(5, [RUNNING, FAILED, KILLED], defState=MATCHED),
85+
MATCHED: State(5, [RUNNING, FAILED, RESCHEDULED, KILLED], defState=MATCHED),
8686
WAITING: State(4, [MATCHED, RESCHEDULED, DELETED], defState=WAITING),
8787
STAGING: State(3, [WAITING, FAILED, KILLED], defState=STAGING),
8888
CHECKING: State(2, [STAGING, WAITING, RESCHEDULED, FAILED, DELETED], defState=CHECKING),

src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapper.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1518,8 +1518,7 @@ def rescheduleFailedJob(jobID, minorStatus, jobReport=None):
15181518

15191519
gLogger.info("Job will be rescheduled after exception during execution of the JobWrapper")
15201520

1521-
jobManager = JobManagerClient()
1522-
result = jobManager.rescheduleJob(int(jobID))
1521+
result = JobManagerClient().rescheduleJob(int(jobID))
15231522
if not result["OK"]:
15241523
gLogger.warn(result["Message"])
15251524
if "Maximum number of reschedulings is reached" in result["Message"]:

0 commit comments

Comments
 (0)