diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/JobAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/JobAgent.py index c8fc48feb7c..9bf3177b01f 100755 --- a/src/DIRAC/WorkloadManagementSystem/Agent/JobAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/JobAgent.py @@ -34,11 +34,11 @@ from DIRAC.WorkloadManagementSystem.Client.PilotManagerClient import PilotManagerClient from DIRAC.WorkloadManagementSystem.Client.JobManagerClient import JobManagerClient from DIRAC.WorkloadManagementSystem.Client.JobMonitoringClient import JobMonitoringClient -from DIRAC.WorkloadManagementSystem.Client.JobStateUpdateClient import JobStateUpdateClient from DIRAC.WorkloadManagementSystem.Client.JobReport import JobReport from DIRAC.WorkloadManagementSystem.Client import JobStatus from DIRAC.WorkloadManagementSystem.Utilities.Utils import createJobWrapper from DIRAC.WorkloadManagementSystem.Client import PilotStatus +from DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper import RESCHEDULED class JobAgent(AgentModule): @@ -729,9 +729,9 @@ def _checkSubmittedJobs(self): self._rescheduleFailedJob(jobID, result["Message"]) self.hostFailureCount += 1 - # The payload failed (if result["Value"] is not 0) - elif result["Value"]: - # In order to avoid overriding perfectly valid states, the status is updated iff the job was running + # The payload failed (if result["Value"] is not 0 and the job was not rescheduled) + elif result["Value"] and result["Value"] != RESCHEDULED: + # In order to avoid overriding perfectly valid states, the status is updated if the job was running res = JobMonitoringClient().getJobsStatus(jobID) if not res["OK"]: return res diff --git a/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapper.py b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapper.py index 4ea8287951a..a4277f81289 100755 --- a/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapper.py +++ b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapper.py @@ -60,6 +60,14 @@ EXECUTION_RESULT = {} +SUBMISSION_FAILED = -1 +SUBMISSION_REPORT_FAILED = -2 +JOBWRAPPER_EXCEPTION = -3 +INITIALIZATION_FAILED = 1 +PAYLOAD_FAILED = 2 +FINALIZATION_FAILED = 3 +RESCHEDULED = 4 + class JobWrapper: """The only user of the JobWrapper is the JobWrapperTemplate""" diff --git a/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperTemplate.py b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperTemplate.py index 1284cbf5815..fb6213d1dfc 100644 --- a/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperTemplate.py +++ b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperTemplate.py @@ -31,6 +31,7 @@ from DIRAC.Core.Utilities import DErrno from DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper import JobWrapper, rescheduleFailedJob +from DIRAC.WorkloadManagementSystem.JobWrapper import JobWrapper as JW from DIRAC.WorkloadManagementSystem.Client.JobReport import JobReport from DIRAC.WorkloadManagementSystem.Client import JobStatus from DIRAC.WorkloadManagementSystem.Client import JobMinorStatus @@ -101,7 +102,7 @@ def execute(arguments): else: gLogger.exception("JobWrapperTemplate could not create working directory") rescheduleResult = rescheduleFailedJob(jobID, "Could Not Create Working Directory") - return 1 + return JW.RESCHEDULED if rescheduleResult == JobStatus.RESCHEDULED else JW.INITIALIZATION_FAILED gJobReport = JobReport(jobID, "JobWrapper") @@ -114,7 +115,7 @@ def execute(arguments): jobID=jobID, minorStatus=JobMinorStatus.JOB_WRAPPER_INITIALIZATION, jobReport=gJobReport ) job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.JOB_WRAPPER_INITIALIZATION) - return 1 + return JW.RESCHEDULED if rescheduleResult == JobStatus.RESCHEDULED else JW.INITIALIZATION_FAILED if "InputSandbox" in arguments["Job"]: gJobReport.commit() @@ -129,14 +130,14 @@ def execute(arguments): jobID=jobID, minorStatus=JobMinorStatus.DOWNLOADING_INPUT_SANDBOX, jobReport=gJobReport ) job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.DOWNLOADING_INPUT_SANDBOX) - return 1 + return JW.RESCHEDULED if rescheduleResult == JobStatus.RESCHEDULED else JW.INITIALIZATION_FAILED except Exception as exc: # pylint: disable=broad-except gLogger.exception("JobWrapper raised exception while downloading input sandbox", lException=exc) rescheduleResult = rescheduleFailedJob( jobID=jobID, minorStatus=JobMinorStatus.DOWNLOADING_INPUT_SANDBOX, jobReport=gJobReport ) job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.DOWNLOADING_INPUT_SANDBOX) - return 1 + return JW.RESCHEDULED if rescheduleResult == JobStatus.RESCHEDULED else JW.INITIALIZATION_FAILED else: gLogger.verbose("Job has no InputSandbox requirement") @@ -155,14 +156,14 @@ def execute(arguments): jobID=jobID, minorStatus=JobMinorStatus.INPUT_DATA_RESOLUTION, jobReport=gJobReport ) job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.INPUT_DATA_RESOLUTION) - return 1 + return JW.RESCHEDULED if rescheduleResult == JobStatus.RESCHEDULED else JW.INITIALIZATION_FAILED except Exception as exc: # pylint: disable=broad-except gLogger.exception("JobWrapper raised exception while resolving input data", lException=exc) rescheduleResult = rescheduleFailedJob( jobID=jobID, minorStatus=JobMinorStatus.INPUT_DATA_RESOLUTION, jobReport=gJobReport ) job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.INPUT_DATA_RESOLUTION) - return 1 + return JW.RESCHEDULED if rescheduleResult == JobStatus.RESCHEDULED else JW.INITIALIZATION_FAILED else: gLogger.verbose("Job has a null InputData requirement:") gLogger.verbose(arguments) @@ -185,7 +186,7 @@ def execute(arguments): jobID=jobID, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION, jobReport=gJobReport ) job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION) - return 1 + return JW.RESCHEDULED if rescheduleResult == JobStatus.RESCHEDULED else JW.PAYLOAD_FAILED gLogger.exception("Job failed in execution phase") gJobReport.setJobParameter("Error Message", repr(exc), sendFlag=False) gJobReport.setJobStatus( @@ -193,7 +194,7 @@ def execute(arguments): ) job.sendFailoverRequest() job.sendJobAccounting(status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC) - return 1 + return JW.PAYLOAD_FAILED except Exception as exc: # pylint: disable=broad-except gLogger.exception("Job raised exception during execution phase", lException=exc) gJobReport.setJobParameter("Error Message", repr(exc), sendFlag=False) @@ -202,7 +203,7 @@ def execute(arguments): ) job.sendFailoverRequest() job.sendJobAccounting(status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC) - return 1 + return JW.PAYLOAD_FAILED if "OutputSandbox" in arguments["Job"] or "OutputData" in arguments["Job"]: try: @@ -219,7 +220,7 @@ def execute(arguments): job.sendFailoverRequest() job.sendJobAccounting(status=JobStatus.FAILED, minorStatus=JobMinorStatus.UPLOADING_JOB_OUTPUTS) - return 2 + return JW.FINALIZATION_FAILED except Exception as exc: # pylint: disable=broad-except gLogger.exception("JobWrapper raised exception while processing output files", lException=exc) gJobReport.setJobParameter("Error Message", repr(exc), sendFlag=False) @@ -228,7 +229,7 @@ def execute(arguments): ) job.sendFailoverRequest() job.sendJobAccounting(status=JobStatus.FAILED, minorStatus=JobMinorStatus.UPLOADING_JOB_OUTPUTS) - return 2 + return JW.FINALIZATION_FAILED else: gLogger.verbose("Job has no OutputData or OutputSandbox requirement") @@ -243,7 +244,7 @@ def execute(arguments): ########################################################## -ret = -3 +ret = JW.JOBWRAPPER_EXCEPTION try: jsonFileName = os.path.realpath(__file__) + ".json" with open(jsonFileName) as f: @@ -259,9 +260,9 @@ def execute(arguments): gLogger.exception("JobWrapperTemplate exception", lException=exc) try: gJobReport.commit() - ret = -1 + ret = JW.SUBMISSION_FAILED except Exception as exc: # pylint: disable=broad-except gLogger.exception("Could not commit the job report", lException=exc) - ret = -2 + ret = JW.SUBMISSION_REPORT_FAILED sys.exit(ret)