Skip to content

Commit 068aa3a

Browse files
authored
Merge pull request #6489 from chrisburr/cherry-pick-2-d3e0a8171-integration
[sweep:integration] fix (wms): additional fixes for the PushJobAgent
2 parents 140f9c7 + 4a3a339 commit 068aa3a

File tree

5 files changed

+136
-60
lines changed

5 files changed

+136
-60
lines changed

src/DIRAC/WorkloadManagementSystem/Agent/PushJobAgent.py

Lines changed: 3 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -68,22 +68,6 @@ def initialize(self):
6868
self.resourcesModule = res["Value"]
6969
self.opsHelper = Operations()
7070

71-
# Disable Watchdog: we don't need it as pre/post processing occurs locally
72-
setup = gConfig.getValue("/DIRAC/Setup", "")
73-
if not setup:
74-
return S_ERROR("Cannot get the DIRAC Setup value")
75-
wms_instance = getSystemInstance("WorkloadManagement")
76-
if not wms_instance:
77-
return S_ERROR("Cannot get the WorkloadManagement system instance")
78-
section = "/Systems/WorkloadManagement/%s/JobWrapper" % wms_instance
79-
self._updateConfiguration("CheckWallClockFlag", 0, path=section)
80-
self._updateConfiguration("CheckDiskSpaceFlag", 0, path=section)
81-
self._updateConfiguration("CheckLoadAvgFlag", 0, path=section)
82-
self._updateConfiguration("CheckCPUConsumedFlag", 0, path=section)
83-
self._updateConfiguration("CheckCPULimitFlag", 0, path=section)
84-
self._updateConfiguration("CheckMemoryLimitFlag", 0, path=section)
85-
self._updateConfiguration("CheckTimeLeftFlag", 0, path=section)
86-
8771
return S_OK()
8872

8973
def beginExecution(self):
@@ -193,13 +177,6 @@ def execute(self):
193177
ceDict["NumberOfProcessors"] = ce.ceParameters.get("NumberOfProcessors")
194178
self._setCEDict(ceDict)
195179

196-
# Update the configuration with the names of the Site, CE and queue to target
197-
# This is used in the next stages
198-
self._updateConfiguration("Site", queueDictionary["Site"])
199-
self._updateConfiguration("GridCE", queueDictionary["CEName"])
200-
self._updateConfiguration("CEQueue", queueDictionary["QueueName"])
201-
self._updateConfiguration("RemoteExecution", True)
202-
203180
# Try to match a job
204181
jobRequest = self._matchAJob(ceDictList)
205182
while jobRequest["OK"]:
@@ -397,6 +374,9 @@ def _setCEDict(self, ceDict):
397374
if project:
398375
ceDict["ReleaseProject"] = project
399376

377+
# Add a remoteExecution tag, which can be used in the next stages
378+
ceDict["RemoteExecution"] = True
379+
400380
def _checkMatchingIssues(self, jobRequest):
401381
"""Check the source of the matching issue
402382

src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_PushJobAgent.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -49,11 +49,11 @@ def test__allowedToSubmit(mocker, queue, failedQueues, failedQueueCycleFactor, e
4949
@pytest.mark.parametrize(
5050
"ceDict, pilotVersion, pilotProject, expected",
5151
[
52-
({}, None, None, {}),
53-
({}, "8.0.0", None, {"ReleaseVersion": "8.0.0"}),
54-
({}, ["8.0.0", "7.3.7"], None, {"ReleaseVersion": "8.0.0"}),
55-
({}, None, "Project", {"ReleaseProject": "Project"}),
56-
({}, "8.0.0", "Project", {"ReleaseVersion": "8.0.0", "ReleaseProject": "Project"}),
52+
({}, None, None, {"RemoteExecution": True}),
53+
({}, "8.0.0", None, {"ReleaseVersion": "8.0.0", "RemoteExecution": True}),
54+
({}, ["8.0.0", "7.3.7"], None, {"ReleaseVersion": "8.0.0", "RemoteExecution": True}),
55+
({}, None, "Project", {"ReleaseProject": "Project", "RemoteExecution": True}),
56+
({}, "8.0.0", "Project", {"ReleaseVersion": "8.0.0", "ReleaseProject": "Project", "RemoteExecution": True}),
5757
],
5858
)
5959
def test__setCEDict(mocker, ceDict, pilotVersion, pilotProject, expected):

src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapper.py

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,10 @@ def initialize(self, arguments):
203203
self.userGroup = self.jobArgs.get("OwnerGroup", self.userGroup)
204204
self.jobClass = self.jobArgs.get("JobSplitType", self.jobClass)
205205

206+
if not self.cpuNormalizationFactor:
207+
self.cpuNormalizationFactor = self.ceArgs.get("CPUNormalizationFactor", self.cpuNormalizationFactor)
208+
self.siteName = self.ceArgs.get("Site", self.siteName)
209+
206210
# Prepare the working directory, cd to there, and copying eventual extra arguments in it
207211
if self.jobID:
208212
if os.path.exists(str(self.jobID)):
@@ -302,6 +306,23 @@ def execute(self):
302306
# the argument should include the jobDescription.xml file
303307
jobArguments = self.jobArgs.get("Arguments", "")
304308

309+
# In case the executable is dirac-jobexec,
310+
# the configuration should include essential parameters related to the CE (which can be found in ceArgs)
311+
# we consider information from ceArgs more accurate than from LocalSite (especially when jobs are pushed)
312+
configOptions = ""
313+
if executable == "dirac-jobexec":
314+
configOptions = "-o /LocalSite/CPUNormalizationFactor=%s " % self.cpuNormalizationFactor
315+
configOptions += "-o /LocalSite/Site=%s " % self.siteName
316+
configOptions += "-o /LocalSite/GridCE=%s " % self.ceArgs.get(
317+
"GridCE", gConfig.getValue("/LocalSite/GridCE", "")
318+
)
319+
configOptions += "-o /LocalSite/CEQueue=%s " % self.ceArgs.get(
320+
"Queue", gConfig.getValue("/LocalSite/CEQueue", "")
321+
)
322+
configOptions += "-o /LocalSite/RemoteExecution=%s " % self.ceArgs.get(
323+
"RemoteExecution", gConfig.getValue("/LocalSite/RemoteExecution", False)
324+
)
325+
305326
executable = os.path.expandvars(executable)
306327
exeThread = None
307328
spObject = None
@@ -337,7 +358,9 @@ def execute(self):
337358
spObject = Subprocess(timeout=False, bufferLimit=int(self.bufferLimit))
338359
command = executable
339360
if jobArguments:
340-
command += " " + jobArguments
361+
command += " " + str(jobArguments)
362+
if configOptions:
363+
command += " " + configOptions
341364
self.log.verbose("Execution command: %s" % (command))
342365
maxPeekLines = self.maxPeekLines
343366
exeThread = ExecutionThread(spObject, command, maxPeekLines, outputFile, errorFile, exeEnv)
@@ -376,6 +399,16 @@ def execute(self):
376399
if "DisableCPUCheck" in self.jobArgs:
377400
watchdog.testCPUConsumed = False
378401

402+
# disable checks if remote execution: do not need it as pre/post processing occurs locally
403+
if self.ceArgs.get("RemoteExecution", False):
404+
watchdog.testWallClock = False
405+
watchdog.testDiskSpace = False
406+
watchdog.testLoadAvg = False
407+
watchdog.testCPUConsumed = False
408+
watchdog.testCPULimit = False
409+
watchdog.testMemoryLimit = False
410+
watchdog.testTimeLeft = False
411+
379412
if exeThread.is_alive():
380413
self.log.info("Application thread is started in Job Wrapper")
381414
watchdog.run()
@@ -1224,6 +1257,7 @@ def sendJobAccounting(self, status="", minorStatus=""):
12241257
"JobType": self.jobType,
12251258
"JobClass": self.jobClass,
12261259
"ProcessingType": self.processingType,
1260+
"Site": self.siteName,
12271261
"FinalMajorStatus": self.wmsMajorStatus,
12281262
"FinalMinorStatus": self.wmsMinorStatus,
12291263
"CPUTime": cpuTime,

src/DIRAC/WorkloadManagementSystem/JobWrapper/test/Test_JobWrapper.py

Lines changed: 35 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,31 @@ def test_performChecks():
6262

6363

6464
@pytest.mark.slow
65-
def test_execute(mocker):
65+
@pytest.mark.parametrize(
66+
"executable, args, src, expectedResult",
67+
[
68+
("/bin/ls", None, None, "Application Finished Successfully"),
69+
(
70+
"script-OK.sh",
71+
None,
72+
"src/DIRAC/WorkloadManagementSystem/JobWrapper/test/",
73+
"Application Finished Successfully",
74+
),
75+
("script.sh", "111", "src/DIRAC/WorkloadManagementSystem/JobWrapper/test/", "Application Finished With Errors"),
76+
("script.sh", 111, "src/DIRAC/WorkloadManagementSystem/JobWrapper/test/", "Application Finished With Errors"),
77+
("script-RESC.sh", None, "src/DIRAC/WorkloadManagementSystem/JobWrapper/test/", "Going to reschedule job"),
78+
(
79+
"src/DIRAC/WorkloadManagementSystem/scripts/dirac_jobexec.py",
80+
"src/DIRAC/WorkloadManagementSystem/JobWrapper/test/jobDescription.xml -o /DIRAC/Setup=Test",
81+
None,
82+
"Application Finished Successfully",
83+
),
84+
],
85+
)
86+
def test_execute(mocker, executable, args, src, expectedResult):
87+
"""Test the status of the job after JobWrapper.execute().
88+
The returned value of JobWrapper.execute() is not checked as it can apparently be wrong depending on the shell used.
89+
"""
6690

6791
mocker.patch(
6892
"DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper.getSystemSection", side_effect=getSystemSectionMock
@@ -71,41 +95,21 @@ def test_execute(mocker):
7195
"DIRAC.WorkloadManagementSystem.JobWrapper.Watchdog.getSystemInstance", side_effect=getSystemSectionMock
7296
)
7397

74-
jw = JobWrapper()
75-
jw.jobArgs = {"Executable": "/bin/ls"}
76-
res = jw.execute()
77-
print("jw.execute() returns", str(res))
78-
assert res["OK"]
98+
if src:
99+
shutil.copy(os.path.join(src, executable), executable)
79100

80-
shutil.copy("src/DIRAC/WorkloadManagementSystem/JobWrapper/test/script-OK.sh", "script-OK.sh")
81101
jw = JobWrapper()
82-
jw.jobArgs = {"Executable": "script-OK.sh"}
102+
jw.jobArgs = {"Executable": executable}
103+
if args:
104+
jw.jobArgs["Arguments"] = args
83105
res = jw.execute()
84-
assert res["OK"]
85-
os.remove("script-OK.sh")
106+
assert expectedResult in jw.jobReport.jobStatusInfo[-1]
86107

87-
shutil.copy("src/DIRAC/WorkloadManagementSystem/JobWrapper/test/script.sh", "script.sh")
88-
jw = JobWrapper()
89-
jw.jobArgs = {"Executable": "script.sh", "Arguments": "111"}
90-
res = jw.execute()
91-
assert res["OK"] # In this case the application finished with errors,
92-
# but the JobWrapper executed successfully
93-
os.remove("script.sh")
108+
if src:
109+
os.remove(executable)
94110

95-
# this will reschedule
96-
shutil.copy("src/DIRAC/WorkloadManagementSystem/JobWrapper/test/script-RESC.sh", "script-RESC.sh")
97-
jw = JobWrapper()
98-
jw.jobArgs = {"Executable": "script-RESC.sh"}
99-
res = jw.execute()
100-
if res["OK"]: # FIXME: This may happen depending on the shell - not the best test admittedly!
101-
print("We should not be here, unless the 'Execution thread status' is equal to 1")
102-
assert res["OK"]
103-
else:
104-
assert res["OK"] is False # In this case the application finished with an error code
105-
# that the JobWrapper interpreted as "to reschedule"
106-
# so in this case the "execute" is considered an error
107-
os.remove("script-RESC.sh")
108-
os.remove("std.out")
111+
if os.path.exists("std.out"):
112+
os.remove("std.out")
109113

110114

111115
@pytest.mark.parametrize(
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
<Workflow>
2+
<descr_short></descr_short>
3+
<description><![CDATA[]]></description>
4+
<name>List files</name>
5+
<origin></origin>
6+
<type></type>
7+
<version>0.0</version>
8+
<Parameter name="JobType" type="JDL" linked_module="" linked_parameter="" in="True" out="False" description="Job Type"><value><![CDATA[User]]></value></Parameter>
9+
<Parameter name="Priority" type="JDL" linked_module="" linked_parameter="" in="True" out="False" description="User Job Priority"><value><![CDATA[1]]></value></Parameter>
10+
<Parameter name="JobGroup" type="JDL" linked_module="" linked_parameter="" in="True" out="False" description="Name of the JobGroup"><value><![CDATA[dteam]]></value></Parameter>
11+
<Parameter name="JobName" type="JDL" linked_module="" linked_parameter="" in="True" out="False" description="User specified name"><value><![CDATA[helloWorldSSHBatch]]></value></Parameter>
12+
<Parameter name="StdOutput" type="JDL" linked_module="" linked_parameter="" in="True" out="False" description="Standard output file"><value><![CDATA[std.out]]></value></Parameter>
13+
<Parameter name="StdError" type="JDL" linked_module="" linked_parameter="" in="True" out="False" description="Standard error file"><value><![CDATA[std.err]]></value></Parameter>
14+
<Parameter name="InputData" type="JDL" linked_module="" linked_parameter="" in="True" out="False" description="Default null input data value"><value><![CDATA[]]></value></Parameter>
15+
<Parameter name="LogLevel" type="JDL" linked_module="" linked_parameter="" in="True" out="False" description="Job Logging Level"><value><![CDATA[INFO]]></value></Parameter>
16+
<Parameter name="arguments" type="string" linked_module="" linked_parameter="" in="True" out="False" description="Arguments to executable Step"><value><![CDATA[]]></value></Parameter>
17+
<Parameter name="ParametricInputData" type="string" linked_module="" linked_parameter="" in="True" out="False" description="Default null parametric input data value"><value><![CDATA[]]></value></Parameter>
18+
<Parameter name="ParametricInputSandbox" type="string" linked_module="" linked_parameter="" in="True" out="False" description="Default null parametric input sandbox value"><value><![CDATA[]]></value></Parameter>
19+
<Parameter name="CPUTime" type="JDL" linked_module="" linked_parameter="" in="True" out="False" description="CPU time in secs"><value><![CDATA[17800]]></value></Parameter>
20+
<Parameter name="InputSandbox" type="JDL" linked_module="" linked_parameter="" in="True" out="False" description="Input sandbox file list"><value><![CDATA[/tmp/cburr/CC7py3/v7.3/diracos/lib/python3.9/site-packages/DIRAC/tests/Workflow/Integration/exe-script.py]]></value></Parameter>
21+
<Parameter name="Site" type="JDL" linked_module="" linked_parameter="" in="True" out="False" description="User specified destination site"><value><![CDATA[DIRAC.Jenkins_SSHBatch.ch]]></value></Parameter>
22+
<ModuleDefinition>
23+
<body><![CDATA[
24+
from DIRAC.Workflow.Modules.Script import Script
25+
]]></body>
26+
<descr_short></descr_short>
27+
<description><![CDATA[ The Script class provides a simple way for users to specify an executable
28+
or file to run (and is also a simple example of a workflow module).
29+
]]></description>
30+
<origin></origin>
31+
<required></required>
32+
<type>Script</type>
33+
<version>0.0</version>
34+
</ModuleDefinition>
35+
<StepDefinition>
36+
<descr_short></descr_short>
37+
<description><![CDATA[]]></description>
38+
<origin></origin>
39+
<type>ScriptStep1</type>
40+
<version>0.0</version>
41+
<Parameter name="executable" type="string" linked_module="" linked_parameter="" in="True" out="False" description="Executable Script"><value><![CDATA[]]></value></Parameter>
42+
<Parameter name="arguments" type="string" linked_module="" linked_parameter="" in="True" out="False" description="Arguments for executable Script"><value><![CDATA[]]></value></Parameter>
43+
<Parameter name="applicationLog" type="string" linked_module="" linked_parameter="" in="True" out="False" description="Log file name"><value><![CDATA[]]></value></Parameter>
44+
<ModuleInstance>
45+
<descr_short></descr_short>
46+
<name>Script</name>
47+
<type>Script</type>
48+
</ModuleInstance>
49+
</StepDefinition>
50+
<StepInstance>
51+
<descr_short></descr_short>
52+
<name>RunScriptStep1</name>
53+
<type>ScriptStep1</type>
54+
<Parameter name="executable" type="string" linked_module="" linked_parameter="" in="True" out="False" description="Executable Script"><value><![CDATA[/bin/ls]]></value></Parameter>
55+
<Parameter name="arguments" type="string" linked_module="" linked_parameter="" in="True" out="False" description="Arguments for executable Script"><value><![CDATA[]]></value></Parameter>
56+
<Parameter name="applicationLog" type="string" linked_module="" linked_parameter="" in="True" out="False" description="Log file name"><value><![CDATA[std.out]]></value></Parameter>
57+
</StepInstance>
58+
</Workflow>

0 commit comments

Comments
 (0)