Skip to content

Commit 6465d5e

Browse files
committed
feat: add a test for JobAgent.checkSubmittedJobs()
1 parent e7062de commit 6465d5e

File tree

1 file changed

+107
-4
lines changed

1 file changed

+107
-4
lines changed

src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_JobAgent.py

Lines changed: 107 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
""" Test class for Job Agent
22
"""
3-
3+
import os
44
import pytest
5+
import time
56

7+
from DIRAC import gLogger, S_OK, S_ERROR
8+
from DIRAC.Core.Security.X509Chain import X509Chain # pylint: disable=import-error
9+
from DIRAC.Resources.Computing.BatchSystems.TimeLeft.TimeLeft import TimeLeft
10+
from DIRAC.Resources.Computing.ComputingElementFactory import ComputingElementFactory
11+
from DIRAC.Resources.Computing.test.Test_PoolComputingElement import badJobScript, jobScript
612
from DIRAC.WorkloadManagementSystem.Agent.JobAgent import JobAgent
713
from DIRAC.WorkloadManagementSystem.Client.JobReport import JobReport
8-
from DIRAC.Resources.Computing.ComputingElementFactory import ComputingElementFactory
9-
from DIRAC.Resources.Computing.BatchSystems.TimeLeft.TimeLeft import TimeLeft
10-
from DIRAC import gLogger, S_ERROR
1114

1215
gLogger.setLevel("DEBUG")
1316

@@ -465,3 +468,103 @@ def test_submitJob(mocker, mockJWInput, expected):
465468

466469
if not result["OK"]:
467470
assert result["Message"] == expected["Message"]
471+
472+
473+
@pytest.mark.slow
@pytest.mark.parametrize(
    "localCE, job, expectedResult1, expectedResult2",
    [
        # Synchronous submission of a valid job: nothing to report on either check
        ("InProcess", jobScript % "1", ([], []), ([], [])),
        # Asynchronous submission of a valid job: nothing to report on either check
        ("Pool/InProcess", jobScript % "1", ([], []), ([], [])),
        # Synchronous submission of a failing job: the failure is reported by the first check,
        # the second check is clean since the job has already been processed
        ("InProcess", badJobScript, ([], ["Payload execution failed with error code 5"]), ([], [])),
        # Asynchronous submission of a failing job: the job has not failed yet at the first check,
        # the failure is reported by the second one
        ("Pool/InProcess", badJobScript, ([], []), ([], ["Payload execution failed with error code 5"])),
        # Synchronous submission failing because of a problem in the Singularity CE
        ("Singularity", jobScript % "1", (["Failed to find singularity"], []), ([], [])),
        # Asynchronous submission failing because of a problem in the Singularity CE
        ("Pool/Singularity", jobScript % "1", (["Failed to find singularity"], []), ([], [])),
    ],
)
def test_submitAndCheckJob(mocker, localCE, job, expectedResult1, expectedResult2):
    """Submit one job through the JobAgent and follow it with two calls to _checkSubmittedJobs().

    :param mocker: pytest-mock fixture used to patch the JobAgent dependencies
    :param str localCE: type of the computing element handed to _initializeComputingElement()
    :param str job: content of the executable script written to disk and submitted
    :param tuple expectedResult1: expected "Value" of the first _checkSubmittedJobs() call;
        a non-empty first element is treated as an error raised during the submission
    :param tuple expectedResult2: expected "Value" of the second _checkSubmittedJobs() call
    """
    # The mocked createJobWrapper returns this path as the executable to run
    scriptPath = "testJob.py"
    with open(scriptPath, "w") as scriptFile:
        scriptFile.write(job)
    os.chmod(scriptPath, 0o755)

    mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobAgent.AgentModule.__init__")
    mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobAgent.JobAgent.am_stopExecution")
    mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobAgent.createJobWrapper", return_value=S_OK([scriptPath]))
    mocker.patch("DIRAC.Core.Security.X509Chain.X509Chain.dumpAllToString", return_value=S_OK())

    jobID = 123

    jobAgent = JobAgent("JobAgent", "Test")
    jobAgent.log = gLogger.getSubLogger("JobAgent")
    jobAgent._initializeComputingElement(localCE)
    jobAgent.jobReport = JobReport(jobID)

    def isAsyncSubmission():
        """Tell whether the computing element declares the AsyncSubmission parameter."""
        return bool(jobAgent.computingElement.ceParameters.get("AsyncSubmission", False))

    def nbTaskResults():
        """Count the results currently stored by the computing element."""
        return len(jobAgent.computingElement.taskResults)

    # --- Submission ---
    submission = jobAgent._submitJob(
        jobID=jobID, jobParams={}, resourceParams={}, optimizerParams={}, proxyChain=X509Chain()
    )
    # No error is expected at the level of the JobAgent itself
    assert submission["OK"]

    # The job must have been registered in jobAgent.submissionDict
    assert len(jobAgent.submissionDict) == 1
    assert jobID in jobAgent.submissionDict

    # A result is already available when the submission is synchronous, or when an
    # asynchronous submission ended with an error; otherwise the job is still running
    if not isAsyncSubmission() or expectedResult1[0]:
        assert nbTaskResults() == 1
    else:
        assert nbTaskResults() == 0

    # --- First check of the errors that could have occurred in the innerCE ---
    firstCheck = jobAgent._checkSubmittedJobs()
    assert firstCheck["OK"]
    assert firstCheck["Value"] == expectedResult1

    # The job must still be registered in jobAgent.submissionDict
    assert len(jobAgent.submissionDict) == 1
    assert jobID in jobAgent.submissionDict

    if isAsyncSubmission() and not expectedResult1[0]:
        # The asynchronous job is still running: poll every 0.1s, at most 1200 times,
        # until its result shows up
        for _ in range(1200):
            if nbTaskResults() >= 1:
                break
            time.sleep(0.1)
        assert nbTaskResults() == 1
    else:
        # The result has already been processed by the first _checkSubmittedJobs() call
        assert nbTaskResults() == 0

    # --- Second check of the errors that could have occurred in the innerCE ---
    secondCheck = jobAgent._checkSubmittedJobs()
    assert secondCheck["OK"]
    assert secondCheck["Value"] == expectedResult2

    # From here, taskResults should be empty
    assert jobID in jobAgent.submissionDict
    assert nbTaskResults() == 0

0 commit comments

Comments
 (0)