Skip to content

Commit c11eb31

Browse files
authored
Merge pull request #7397 from aldbr/v8.0_FIX_JobAgent-ce-failures
[8.0] fix: JobAgent rescheduling wrong jobs
2 parents f47c2b9 + c1c49c4 commit c11eb31

File tree

2 files changed

+76
-5
lines changed

2 files changed

+76
-5
lines changed

src/DIRAC/WorkloadManagementSystem/Agent/JobAgent.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -694,7 +694,10 @@ def _checkSubmittedJobs(self):
694694
submissionErrors = []
695695
payloadErrors = []
696696
originalJobID = self.jobReport.jobID
697-
for jobID, taskID in self.submissionDict.items():
697+
# Loop over the jobIDs submitted to the CE
698+
# Here we iterate over a copy of the keys because we are modifying the dictionary within the loop
699+
for jobID in list(self.submissionDict.keys()):
700+
taskID = self.submissionDict[jobID]
698701
if taskID not in self.computingElement.taskResults:
699702
continue
700703

@@ -731,7 +734,9 @@ def _checkSubmittedJobs(self):
731734
self.log.info(message)
732735

733736
# Remove taskID from computingElement.taskResults as it has been processed
737+
# Remove jobID from submissionDict as it has been processed
734738
del self.computingElement.taskResults[taskID]
739+
del self.submissionDict[jobID]
735740

736741
self.jobReport.setJob(originalJobID)
737742
return S_OK((submissionErrors, payloadErrors))

src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_JobAgent.py

Lines changed: 70 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -517,6 +517,7 @@ def test_submitAndCheckJob(mocker, localCE, job, expectedResult1, expectedResult
517517
jobAgent.log = gLogger.getSubLogger("JobAgent")
518518
jobAgent._initializeComputingElement(localCE)
519519
jobAgent.jobReport = JobReport(jobID)
520+
jobAgent.jobSubmissionDelay = 3
520521

521522
# Submit a job
522523
result = jobAgent._submitJob(
@@ -547,10 +548,6 @@ def test_submitAndCheckJob(mocker, localCE, job, expectedResult1, expectedResult
547548
assert result["OK"]
548549
assert result["Value"] == expectedResult1
549550

550-
# Check that the job is still present in jobAgent.submissionDict
551-
assert len(jobAgent.submissionDict) == 1
552-
assert jobID in jobAgent.submissionDict
553-
554551
# If the submission is synchronous, jobAgent.computingElement.taskResults
555552
# should not contain the result anymore: already processed by checkSubmittedJobs
556553
if not jobAgent.computingElement.ceParameters.get("AsyncSubmission", False):
@@ -576,5 +573,74 @@ def test_submitAndCheckJob(mocker, localCE, job, expectedResult1, expectedResult
576573
assert result["Value"] == expectedResult2
577574

578575
# From here, taskResults should be empty
576+
assert len(jobAgent.computingElement.taskResults) == 0
577+
578+
579+
def test_submitAndCheck2Jobs(mocker):
580+
"""Test the submission and the management of the job status.
581+
582+
This time, a first job is successfully submitted, but the second submission fails.
583+
We want to make sure that both jobs are correctly managed.
584+
"""
585+
# Mock the JobAgent
586+
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobAgent.AgentModule.__init__")
587+
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobAgent.JobAgent.am_stopExecution")
588+
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobAgent.createJobWrapper", return_value=S_OK(["jobWrapper.py"]))
589+
mocker.patch("DIRAC.Core.Security.X509Chain.X509Chain.dumpAllToString", return_value=S_OK())
590+
mocker.patch(
591+
"DIRAC.Resources.Computing.InProcessComputingElement.InProcessComputingElement.submitJob",
592+
side_effect=[S_OK(), S_ERROR("ComputingElement error")],
593+
)
594+
595+
jobAgent = JobAgent("JobAgent", "Test")
596+
jobAgent.log = gLogger.getSubLogger("JobAgent")
597+
jobAgent._initializeComputingElement("InProcess")
598+
jobAgent.ceName = "InProcess"
599+
jobAgent.jobSubmissionDelay = 0
600+
601+
jobAgent.jobReport = JobReport(0)
602+
mocker.patch.object(jobAgent, "jobReport", autospec=True)
603+
mock_rescheduleFailedJob = mocker.patch.object(jobAgent, "_rescheduleFailedJob")
604+
605+
# Submit a first job: should be successful
606+
jobID = "123"
607+
result = jobAgent._submitJob(
608+
jobID=jobID, jobParams={}, resourceParams={}, optimizerParams={}, proxyChain=X509Chain()
609+
)
610+
# Check that no error occurred during the submission process
611+
# at the level of the JobAgent
612+
assert result["OK"]
613+
614+
# Check that the job was added to jobAgent.submissionDict
615+
assert len(jobAgent.submissionDict) == 1
579616
assert jobID in jobAgent.submissionDict
617+
618+
# The submission is synchronous: taskResults should already contain the result
619+
assert len(jobAgent.computingElement.taskResults) == 1
620+
621+
# Check errors that could have occurred in the innerCE
622+
result = jobAgent._checkSubmittedJobs()
623+
assert result["OK"]
624+
assert result["Value"] == ([], [])
625+
626+
mock_rescheduleFailedJob.assert_not_called()
627+
628+
# Submit a second job: should fail
629+
jobID = "456"
630+
result = jobAgent._submitJob(
631+
jobID=jobID, jobParams={}, resourceParams={}, optimizerParams={}, proxyChain=X509Chain()
632+
)
633+
# Check that no error occurred during the submission process
634+
# at the level of the JobAgent
635+
assert result["OK"]
636+
637+
# Check errors that could have occurred in the innerCE
638+
result = jobAgent._checkSubmittedJobs()
639+
assert result["OK"]
640+
assert result["Value"] == (["ComputingElement error"], [])
641+
642+
# Make sure that the correct job is rescheduled
643+
mock_rescheduleFailedJob.assert_called_with(jobID, "ComputingElement error")
644+
645+
# From here, taskResults should be empty
580646
assert len(jobAgent.computingElement.taskResults) == 0

0 commit comments

Comments
 (0)