Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/DIRAC/WorkloadManagementSystem/Agent/JobAgent.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@
from DIRAC.WorkloadManagementSystem.Client.PilotManagerClient import PilotManagerClient
from DIRAC.WorkloadManagementSystem.Client.JobManagerClient import JobManagerClient
from DIRAC.WorkloadManagementSystem.Client.JobMonitoringClient import JobMonitoringClient
from DIRAC.WorkloadManagementSystem.Client.JobStateUpdateClient import JobStateUpdateClient
from DIRAC.WorkloadManagementSystem.Client.JobReport import JobReport
from DIRAC.WorkloadManagementSystem.Client import JobStatus
from DIRAC.WorkloadManagementSystem.Utilities.Utils import createJobWrapper
from DIRAC.WorkloadManagementSystem.Client import PilotStatus
from DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper import RESCHEDULED


class JobAgent(AgentModule):
Expand Down Expand Up @@ -729,9 +729,9 @@ def _checkSubmittedJobs(self):
self._rescheduleFailedJob(jobID, result["Message"])
self.hostFailureCount += 1

# The payload failed (if result["Value"] is not 0)
elif result["Value"]:
# In order to avoid overriding perfectly valid states, the status is updated iff the job was running
# The payload failed (if result["Value"] is not 0 and the job was not rescheduled)
elif result["Value"] and result["Value"] != RESCHEDULED:
# In order to avoid overriding perfectly valid states, the status is updated if the job was running
res = JobMonitoringClient().getJobsStatus(jobID)
if not res["OK"]:
return res
Expand Down
8 changes: 8 additions & 0 deletions src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,14 @@

EXECUTION_RESULT = {}

SUBMISSION_FAILED = -1
SUBMISSION_REPORT_FAILED = -2
JOBWRAPPER_EXCEPTION = -3
INITIALIZATION_FAILED = 1
PAYLOAD_FAILED = 2
FINALIZATION_FAILED = 3
RESCHEDULED = 4


class JobWrapper:
"""The only user of the JobWrapper is the JobWrapperTemplate"""
Expand Down
29 changes: 15 additions & 14 deletions src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperTemplate.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from DIRAC.Core.Utilities import DErrno

from DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper import JobWrapper, rescheduleFailedJob
from DIRAC.WorkloadManagementSystem.JobWrapper import JobWrapper as JW
from DIRAC.WorkloadManagementSystem.Client.JobReport import JobReport
from DIRAC.WorkloadManagementSystem.Client import JobStatus
from DIRAC.WorkloadManagementSystem.Client import JobMinorStatus
Expand Down Expand Up @@ -101,7 +102,7 @@ def execute(arguments):
else:
gLogger.exception("JobWrapperTemplate could not create working directory")
rescheduleResult = rescheduleFailedJob(jobID, "Could Not Create Working Directory")
return 1
return JW.RESCHEDULED if rescheduleResult == JobStatus.RESCHEDULED else JW.INITIALIZATION_FAILED

gJobReport = JobReport(jobID, "JobWrapper")

Expand All @@ -114,7 +115,7 @@ def execute(arguments):
jobID=jobID, minorStatus=JobMinorStatus.JOB_WRAPPER_INITIALIZATION, jobReport=gJobReport
)
job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.JOB_WRAPPER_INITIALIZATION)
return 1
return JW.RESCHEDULED if rescheduleResult == JobStatus.RESCHEDULED else JW.INITIALIZATION_FAILED

if "InputSandbox" in arguments["Job"]:
gJobReport.commit()
Expand All @@ -129,14 +130,14 @@ def execute(arguments):
jobID=jobID, minorStatus=JobMinorStatus.DOWNLOADING_INPUT_SANDBOX, jobReport=gJobReport
)
job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.DOWNLOADING_INPUT_SANDBOX)
return 1
return JW.RESCHEDULED if rescheduleResult == JobStatus.RESCHEDULED else JW.INITIALIZATION_FAILED
except Exception as exc: # pylint: disable=broad-except
gLogger.exception("JobWrapper raised exception while downloading input sandbox", lException=exc)
rescheduleResult = rescheduleFailedJob(
jobID=jobID, minorStatus=JobMinorStatus.DOWNLOADING_INPUT_SANDBOX, jobReport=gJobReport
)
job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.DOWNLOADING_INPUT_SANDBOX)
return 1
return JW.RESCHEDULED if rescheduleResult == JobStatus.RESCHEDULED else JW.INITIALIZATION_FAILED
else:
gLogger.verbose("Job has no InputSandbox requirement")

Expand All @@ -155,14 +156,14 @@ def execute(arguments):
jobID=jobID, minorStatus=JobMinorStatus.INPUT_DATA_RESOLUTION, jobReport=gJobReport
)
job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.INPUT_DATA_RESOLUTION)
return 1
return JW.RESCHEDULED if rescheduleResult == JobStatus.RESCHEDULED else JW.INITIALIZATION_FAILED
except Exception as exc: # pylint: disable=broad-except
gLogger.exception("JobWrapper raised exception while resolving input data", lException=exc)
rescheduleResult = rescheduleFailedJob(
jobID=jobID, minorStatus=JobMinorStatus.INPUT_DATA_RESOLUTION, jobReport=gJobReport
)
job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.INPUT_DATA_RESOLUTION)
return 1
return JW.RESCHEDULED if rescheduleResult == JobStatus.RESCHEDULED else JW.INITIALIZATION_FAILED
else:
gLogger.verbose("Job has a null InputData requirement:")
gLogger.verbose(arguments)
Expand All @@ -185,15 +186,15 @@ def execute(arguments):
jobID=jobID, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION, jobReport=gJobReport
)
job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION)
return 1
return JW.RESCHEDULED if rescheduleResult == JobStatus.RESCHEDULED else JW.PAYLOAD_FAILED
gLogger.exception("Job failed in execution phase")
gJobReport.setJobParameter("Error Message", repr(exc), sendFlag=False)
gJobReport.setJobStatus(
status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC, sendFlag=False
)
job.sendFailoverRequest()
job.sendJobAccounting(status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC)
return 1
return JW.PAYLOAD_FAILED
except Exception as exc: # pylint: disable=broad-except
gLogger.exception("Job raised exception during execution phase", lException=exc)
gJobReport.setJobParameter("Error Message", repr(exc), sendFlag=False)
Expand All @@ -202,7 +203,7 @@ def execute(arguments):
)
job.sendFailoverRequest()
job.sendJobAccounting(status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC)
return 1
return JW.PAYLOAD_FAILED

if "OutputSandbox" in arguments["Job"] or "OutputData" in arguments["Job"]:
try:
Expand All @@ -219,7 +220,7 @@ def execute(arguments):
job.sendFailoverRequest()
job.sendJobAccounting(status=JobStatus.FAILED, minorStatus=JobMinorStatus.UPLOADING_JOB_OUTPUTS)

return 2
return JW.FINALIZATION_FAILED
except Exception as exc: # pylint: disable=broad-except
gLogger.exception("JobWrapper raised exception while processing output files", lException=exc)
gJobReport.setJobParameter("Error Message", repr(exc), sendFlag=False)
Expand All @@ -228,7 +229,7 @@ def execute(arguments):
)
job.sendFailoverRequest()
job.sendJobAccounting(status=JobStatus.FAILED, minorStatus=JobMinorStatus.UPLOADING_JOB_OUTPUTS)
return 2
return JW.FINALIZATION_FAILED
else:
gLogger.verbose("Job has no OutputData or OutputSandbox requirement")

Expand All @@ -243,7 +244,7 @@ def execute(arguments):
##########################################################


ret = -3
ret = JW.JOBWRAPPER_EXCEPTION
try:
jsonFileName = os.path.realpath(__file__) + ".json"
with open(jsonFileName) as f:
Expand All @@ -259,9 +260,9 @@ def execute(arguments):
gLogger.exception("JobWrapperTemplate exception", lException=exc)
try:
gJobReport.commit()
ret = -1
ret = JW.SUBMISSION_FAILED
except Exception as exc: # pylint: disable=broad-except
gLogger.exception("Could not commit the job report", lException=exc)
ret = -2
ret = JW.SUBMISSION_REPORT_FAILED

sys.exit(ret)
Loading