Skip to content

Commit 99f68cc

Browse files
authored
Merge pull request #7415 from aldbr/v8.0_FIX_PJA-failover
[8.0] PushJobAgent do not send failover request anymore
2 parents 2d5ab55 + 15ff95a commit 99f68cc

File tree

3 files changed

+234
-131
lines changed

3 files changed

+234
-131
lines changed

src/DIRAC/WorkloadManagementSystem/Agent/JobAgent.py

Lines changed: 88 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,12 @@ def __init__(self, agentName, loadName, baseAgentName=False, properties=None):
8888
self.timeLeftUtil = None
8989
self.pilotInfoReportedFlag = False
9090

91-
# Submission results
92-
self.submissionDict = {}
91+
# Attributes related to the processed jobs, it should take the following form:
92+
# {"<jobID>": {"JobReport": JobReport(), "TaskID": "<taskID>"}}
93+
# where TaskID is the ID of the job as seen by the CE
94+
# and JobReport is the JobReport instance for the job
95+
# (one instance per job to avoid any discrepancy when communicating with the WMS)
96+
self.jobs = {}
9397

9498
#############################################################################
9599
def initialize(self):
@@ -135,7 +139,6 @@ def initialize(self):
135139

136140
# Utilities
137141
self.timeLeftUtil = TimeLeft()
138-
self.jobReport = JobReport(0, f"{self.__class__.__name__}@{self.siteName}")
139142
return S_OK()
140143

141144
def _initializeComputingElement(self, localCE):
@@ -211,8 +214,11 @@ def execute(self):
211214
matcherParams = ["JDL", "DN", "Group"]
212215
matcherInfo = jobRequest["Value"]
213216
jobID = matcherInfo["JobID"]
214-
self.jobReport.setJob(jobID)
215-
result = self._checkMatcherInfo(matcherInfo, matcherParams)
217+
218+
self.jobs[jobID] = {}
219+
self.jobs[jobID]["JobReport"] = JobReport(jobID, f"{self.__class__.__name__}@{self.siteName}")
220+
221+
result = self._checkMatcherInfo(jobID, matcherInfo, matcherParams)
216222
if not result["OK"]:
217223
return self._finish(result["Message"])
218224

@@ -235,30 +241,35 @@ def execute(self):
235241
# Get JDL parameters
236242
parameters = self._getJDLParameters(jobJDL)
237243
if not parameters["OK"]:
238-
self.jobReport.setJobStatus(status=JobStatus.FAILED, minorStatus="Could Not Extract JDL Parameters")
244+
self.jobs[jobID]["JobReport"].setJobStatus(
245+
status=JobStatus.FAILED, minorStatus="Could Not Extract JDL Parameters"
246+
)
239247
self.log.warn("Could Not Extract JDL Parameters", parameters["Message"])
240-
return self._finish("JDL Problem")
248+
return self._finish("JDL Problem", self.stopOnApplicationFailure)
241249

242250
params = parameters["Value"]
243251
result = self._extractValuesFromJobParams(params)
244252
if not result["OK"]:
245-
return self._finish(result["Value"])
253+
self.jobs[jobID]["JobReport"].setJobStatus(status=JobStatus.FAILED, minorStatus=result["Message"])
254+
return self._finish(result["Value"], self.stopOnApplicationFailure)
246255
submissionParams = result["Value"]
247256
jobID = submissionParams["jobID"]
248257
jobType = submissionParams["jobType"]
249258

250259
self.log.verbose("Job request successful: \n", jobRequest["Value"])
251260
self.log.info("Received", f"JobID={jobID}, JobType={jobType}, OwnerDN={ownerDN}, JobGroup={jobGroup}")
252261
self.jobCount += 1
253-
self.jobReport.setJobParameter(par_name="MatcherServiceTime", par_value=str(matchTime), sendFlag=False)
262+
self.jobs[jobID]["JobReport"].setJobParameter(
263+
par_name="MatcherServiceTime", par_value=str(matchTime), sendFlag=False
264+
)
254265
if "BOINC_JOB_ID" in os.environ:
255266
# Report BOINC environment
256267
for thisp in ("BoincUserID", "BoincHostID", "BoincHostPlatform", "BoincHostName"):
257-
self.jobReport.setJobParameter(
268+
self.jobs[jobID]["JobReport"].setJobParameter(
258269
par_name=thisp, par_value=gConfig.getValue(f"/LocalSite/{thisp}", "Unknown"), sendFlag=False
259270
)
260271

261-
self.jobReport.setJobStatus(minorStatus="Job Received by Agent", sendFlag=False)
272+
self.jobs[jobID]["JobReport"].setJobStatus(minorStatus="Job Received by Agent", sendFlag=False)
262273
result_setupProxy = self._setupProxy(ownerDN, jobGroup)
263274
if not result_setupProxy["OK"]:
264275
result = self._rescheduleFailedJob(jobID, result_setupProxy["Message"])
@@ -269,7 +280,8 @@ def execute(self):
269280
self._saveJobJDLRequest(jobID, jobJDL)
270281

271282
# Check software and install them if required
272-
software = self._checkInstallSoftware(jobID, params, ceDict)
283+
self.jobs[jobID]["JobReport"].setJobStatus(minorStatus="Installing Software", sendFlag=False)
284+
software = self._checkInstallSoftware(params, ceDict)
273285
if not software["OK"]:
274286
self.log.error("Failed to install software for job", f"{jobID}")
275287
errorMsg = software["Message"]
@@ -280,14 +292,14 @@ def execute(self):
280292

281293
gridCE = gConfig.getValue("/LocalSite/GridCE", "")
282294
if gridCE:
283-
self.jobReport.setJobParameter(par_name="GridCE", par_value=gridCE, sendFlag=False)
295+
self.jobs[jobID]["JobReport"].setJobParameter(par_name="GridCE", par_value=gridCE, sendFlag=False)
284296

285297
queue = gConfig.getValue("/LocalSite/CEQueue", "")
286298
if queue:
287-
self.jobReport.setJobParameter(par_name="CEQueue", par_value=queue, sendFlag=False)
299+
self.jobs[jobID]["JobReport"].setJobParameter(par_name="CEQueue", par_value=queue, sendFlag=False)
288300

289301
if batchSystem := gConfig.getValue("/LocalSite/BatchSystem/Type", ""):
290-
self.jobReport.setJobParameter(par_name="BatchSystem", par_value=batchSystem, sendFlag=False)
302+
self.jobs[jobID]["JobReport"].setJobParameter(par_name="BatchSystem", par_value=batchSystem, sendFlag=False)
291303

292304
self.log.debug(f"Before self._submitJob() ({self.ceName}CE)")
293305
result = self._submitJob(
@@ -307,32 +319,18 @@ def execute(self):
307319
return self._finish(result["Message"])
308320
self.log.debug(f"After {self.ceName}CE submitJob()")
309321

310-
# Committing the JobReport before evaluating the result of job submission
311-
res = self.jobReport.commit()
312-
if not res["OK"]:
313-
resFD = self.jobReport.generateForwardDISET()
314-
if not resFD["OK"]:
315-
self.log.error("Error generating ForwardDISET operation", resFD["Message"])
316-
elif resFD["Value"]:
317-
# Here we create the Request.
318-
op = resFD["Value"]
319-
request = Request()
320-
requestName = f"jobAgent_{jobID}"
321-
request.RequestName = requestName.replace('"', "")
322-
request.JobID = jobID
323-
request.SourceComponent = f"JobAgent_{jobID}"
324-
request.addOperation(op)
325-
# This might fail, but only a message would be printed.
326-
self._sendFailoverRequest(request)
327-
328322
# Checking errors that could have occurred during the job submission and/or execution
329323
result = self._checkSubmittedJobs()
330324
if not result["OK"]:
331325
return result
326+
332327
submissionErrors = result["Value"][0]
333328
payloadErrors = result["Value"][1]
334329
if submissionErrors:
335-
return self._finish("Error during the submission process")
330+
# Stop the JobAgent if too many CE errors occurred
331+
return self._finish(
332+
"Error during the submission process", self.hostFailureCount > self.stopAfterHostFailures
333+
)
336334
if payloadErrors:
337335
return self._finish("Error during a payload execution", self.stopOnApplicationFailure)
338336

@@ -525,7 +523,7 @@ def _requestProxyFromProxyManager(self, ownerDN, ownerGroup):
525523
return S_OK(chain)
526524

527525
#############################################################################
528-
def _checkInstallSoftware(self, jobID, jobParams, resourceParams):
526+
def _checkInstallSoftware(self, jobParams, resourceParams):
529527
"""Checks software requirement of job and whether this is already present
530528
before installing software locally.
531529
"""
@@ -534,7 +532,6 @@ def _checkInstallSoftware(self, jobID, jobParams, resourceParams):
534532
self.log.verbose(msg)
535533
return S_OK(msg)
536534

537-
self.jobReport.setJobStatus(minorStatus="Installing Software", sendFlag=False)
538535
softwareDist = jobParams["SoftwareDistModule"]
539536
self.log.verbose("Found VO Software Distribution module", f": {softwareDist}")
540537
argumentsDict = {"Job": jobParams, "CE": resourceParams}
@@ -586,15 +583,19 @@ def _checkMatchingIssues(self, jobRequest):
586583
return self._finish("Nothing to do for more than %d cycles" % self.stopAfterFailedMatches)
587584
return S_OK()
588585

589-
def _checkMatcherInfo(self, matcherInfo, matcherParams):
586+
def _checkMatcherInfo(self, jobID, matcherInfo, matcherParams):
590587
"""Check that all relevant information about the job is available"""
591588
for param in matcherParams:
592589
if param not in matcherInfo:
593-
self.jobReport.setJobStatus(status=JobStatus.FAILED, minorStatus=f"Matcher did not return {param}")
590+
self.jobs[jobID]["JobReport"].setJobStatus(
591+
status=JobStatus.FAILED, minorStatus=f"Matcher did not return {param}"
592+
)
594593
return S_ERROR("Matcher Failed")
595594

596595
if not matcherInfo[param]:
597-
self.jobReport.setJobStatus(status=JobStatus.FAILED, minorStatus=f"Matcher returned null {param}")
596+
self.jobs[jobID]["JobReport"].setJobStatus(
597+
status=JobStatus.FAILED, minorStatus=f"Matcher returned null {param}"
598+
)
598599
return S_ERROR("Matcher Failed")
599600

600601
self.log.verbose("Matcher returned", f"{param} = {matcherInfo[param]} ")
@@ -636,7 +637,7 @@ def _submitJob(
636637

637638
wrapperFile = result["Value"][0]
638639
inputs = list(result["Value"][1:])
639-
self.jobReport.setJobStatus(minorStatus="Submitting To CE")
640+
self.jobs[jobID]["JobReport"].setJobStatus(minorStatus="Submitting To CE")
640641

641642
self.log.info("Submitting JobWrapper", f"{os.path.basename(wrapperFile)} to {self.ceName}CE")
642643

@@ -666,7 +667,7 @@ def _submitJob(
666667
taskID = 0
667668
# We create a S_ERROR from the exception to compute it as a normal error
668669
self.computingElement.taskResults[taskID] = S_ERROR(unexpectedSubmitException)
669-
self.submissionDict[jobID] = taskID
670+
self.jobs[jobID]["TaskID"] = taskID
670671
return S_OK()
671672

672673
# Submission results are processed in _checkSubmittedJobs
@@ -684,7 +685,7 @@ def _submitJob(
684685

685686
self.log.info("Job being submitted", f"(DIRAC JobID: {jobID}; Task ID: {taskID})")
686687

687-
self.submissionDict[jobID] = taskID
688+
self.jobs[jobID]["TaskID"] = taskID
688689
time.sleep(self.jobSubmissionDelay)
689690
return S_OK()
690691

@@ -693,31 +694,26 @@ def _checkSubmittedJobs(self):
693694
# We expect the computingElement to have a taskResult dictionary.
694695
submissionErrors = []
695696
payloadErrors = []
696-
originalJobID = self.jobReport.jobID
697697
# Loop over the jobIDs submitted to the CE
698698
# Here we iterate over a copy of the keys because we are modifying the dictionary within the loop
699-
for jobID in list(self.submissionDict.keys()):
700-
taskID = self.submissionDict[jobID]
701-
if taskID not in self.computingElement.taskResults:
699+
for jobID in list(self.jobs.keys()):
700+
taskID = self.jobs[jobID].get("TaskID")
701+
if taskID is None or taskID not in self.computingElement.taskResults:
702702
continue
703703

704704
result = self.computingElement.taskResults[taskID]
705-
# jobReport will handle different jobIDs
706-
# setJobParameter() and setJobStatus() should send status immediately (sendFlag=True by default)
707-
self.jobReport.setJob(jobID)
708705

709706
# The submission process failed
710707
if not result["OK"]:
711708
self.log.error("Job submission failed", jobID)
712-
self.jobReport.setJobParameter(par_name="ErrorMessage", par_value=f"{self.ceName} CE Submission Error")
709+
self.jobs[jobID]["JobReport"].setJobParameter(
710+
par_name="ErrorMessage", par_value=f"{self.ceName} CE Submission Error", sendFlag=False
711+
)
713712

714713
self.log.error("Error in DIRAC JobWrapper or inner CE execution:", result["Message"])
715714
submissionErrors.append(result["Message"])
716715
self._rescheduleFailedJob(jobID, result["Message"])
717-
# Stop the JobAgent if too many CE errors
718716
self.hostFailureCount += 1
719-
if self.hostFailureCount > self.stopAfterHostFailures:
720-
return self._finish(result["Message"], self.stopAfterHostFailures)
721717

722718
# The payload failed (if result["Value"] is not 0)
723719
elif result["Value"]:
@@ -726,19 +722,38 @@ def _checkSubmittedJobs(self):
726722
if not res["OK"]:
727723
return res
728724
if res["Value"][int(jobID)]["Status"] == JobStatus.RUNNING:
729-
self.jobReport.setJobStatus(status=JobStatus.FAILED, minorStatus="Payload failed")
725+
self.jobs[jobID]["JobReport"].setJobStatus(
726+
status=JobStatus.FAILED, minorStatus="Payload failed", sendFlag=False
727+
)
730728

731729
# Do not keep running and do not overwrite the Payload error
732730
message = f"Payload execution failed with error code {result['Value']}"
733731
payloadErrors.append(message)
734732
self.log.info(message)
735733

734+
# The job has been treated, we can commit the JobReport
735+
res = self.jobs[jobID]["JobReport"].commit()
736+
if not res["OK"]:
737+
resFD = self.jobs[jobID]["JobReport"].generateForwardDISET()
738+
if not resFD["OK"]:
739+
self.log.error("Error generating ForwardDISET operation", resFD["Message"])
740+
elif resFD["Value"]:
741+
# Here we create the Request.
742+
op = resFD["Value"]
743+
request = Request()
744+
requestName = f"jobAgent_{jobID}"
745+
request.RequestName = requestName.replace('"', "")
746+
request.JobID = jobID
747+
request.SourceComponent = f"JobAgent_{jobID}"
748+
request.addOperation(op)
749+
# This might fail, but only a message would be printed.
750+
self._sendFailoverRequest(request)
751+
736752
# Remove taskID from computingElement.taskResults as it has been treated
737-
# Remove jobID from submissionDict as it has been treated
753+
# Remove jobID from jobs as it has been treated
738754
del self.computingElement.taskResults[taskID]
739-
del self.submissionDict[jobID]
755+
del self.jobs[jobID]
740756

741-
self.jobReport.setJob(originalJobID)
742757
return S_OK((submissionErrors, payloadErrors))
743758

744759
#############################################################################
@@ -777,9 +792,8 @@ def _extractValuesFromJobParams(self, params):
777792
submissionDict["jobID"] = params.get("JobID")
778793
if not submissionDict["jobID"]:
779794
msg = "Job has not JobID defined in JDL parameters"
780-
self.jobReport.setJobStatus(status=JobStatus.FAILED, minorStatus=msg)
781795
self.log.warn(msg)
782-
return S_ERROR("JDL Problem")
796+
return S_ERROR(msg)
783797

784798
submissionDict["jobType"] = params.get("JobType", "Unknown")
785799
if submissionDict["jobType"] == "Unknown":
@@ -816,25 +830,19 @@ def _finish(self, message, stop=True):
816830
return S_OK(message)
817831

818832
#############################################################################
819-
def _rescheduleFailedJob(self, jobID, message, direct=False):
833+
def _rescheduleFailedJob(self, jobID, message):
820834
"""
821835
Set Job Status to "Rescheduled" and issue a reschedule command to the Job Manager
822836
"""
823837

824838
self.log.warn("Failure ==> rescheduling", f"(during {message})")
825839

826-
if direct:
827-
JobStateUpdateClient().setJobStatus(
828-
int(jobID), status=JobStatus.RESCHEDULED, applicationStatus=message, source="JobAgent@%s", force=True
829-
)
830-
else:
831-
originalJobID = self.jobReport.jobID
832-
self.jobReport.setJob(jobID)
833-
# Setting a job parameter does not help since the job will be rescheduled,
834-
# instead set the status with the cause and then another status showing the
835-
# reschedule operation.
836-
self.jobReport.setJobStatus(status=JobStatus.RESCHEDULED, applicationStatus=message, sendFlag=True)
837-
self.jobReport.setJob(originalJobID)
840+
# Setting a job parameter does not help since the job will be rescheduled,
841+
# instead set the status with the cause and then another status showing the
842+
# reschedule operation.
843+
self.jobs[jobID]["JobReport"].setJobStatus(
844+
status=JobStatus.RESCHEDULED, applicationStatus=message, sendFlag=True
845+
)
838846

839847
self.log.info("Job will be rescheduled")
840848
result = JobManagerClient().rescheduleJob(jobID)
@@ -882,11 +890,15 @@ def finalize(self):
882890
if not res["OK"]:
883891
self.log.error("CE could not be properly shut down", res["Message"])
884892

885-
# Check the submitted jobs a last time
886-
result = self._checkSubmittedJobs()
887-
if not result["OK"]:
888-
self.log.error("Problem while trying to get status of the last submitted jobs")
893+
# Check the latest submitted jobs
894+
while self.jobs:
895+
result = self._checkSubmittedJobs()
896+
if not result["OK"]:
897+
self.log.error("Problem while trying to get status of the last submitted jobs")
898+
break
899+
time.sleep(int(self.am_getOption("PollingTime")))
889900

901+
# Set the pilot status to Done
890902
gridCE = gConfig.getValue("/LocalSite/GridCE", "")
891903
queue = gConfig.getValue("/LocalSite/CEQueue", "")
892904
result = PilotManagerClient().setPilotStatus(

0 commit comments

Comments
 (0)