Skip to content

Commit b42aa1f

Browse files
committed
feat: RemoteRunner refactoring and new features
1 parent 8ad4e57 commit b42aa1f

File tree

3 files changed

+251
-38
lines changed

3 files changed

+251
-38
lines changed

src/DIRAC/Workflow/Modules/Script.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
import shlex
1414
import distutils.spawn # pylint: disable=no-name-in-module,no-member,import-error
1515

16-
from DIRAC import gLogger
16+
from DIRAC import gLogger, gConfig
1717
from DIRAC.Core.Utilities.Subprocess import systemCall
1818
from DIRAC.WorkloadManagementSystem.Utilities.RemoteRunner import RemoteRunner
1919
from DIRAC.Workflow.Modules.ModuleBase import ModuleBase
@@ -88,8 +88,14 @@ def _executeCommand(self):
8888
"""execute the self.command (uses systemCall)"""
8989
failed = False
9090

91-
remoteRunner = RemoteRunner()
92-
if remoteRunner.is_remote_execution():
91+
# Check whether the execution should be done remotely
92+
is_remote_execution = gConfig.getValue("/LocalSite/RemoteExecution", "false")
93+
if is_remote_execution.lower() in ["true", "yes"]:
94+
remoteRunner = RemoteRunner(
95+
gConfig.getValue("/LocalSite/Site"),
96+
gConfig.getValue("/LocalSite/GridCE"),
97+
gConfig.getValue("/LocalSite/CEQueue"),
98+
)
9399
retVal = remoteRunner.execute(self.command)
94100
else:
95101
retVal = systemCall(

src/DIRAC/WorkloadManagementSystem/Utilities/RemoteRunner.py

Lines changed: 132 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,62 +1,99 @@
11
""" RemoteRunner
22
3+
RemoteRunner has been designed to send scripts/applications and input files on remote worker nodes having
4+
no outbound connectivity (e.g. supercomputers)
5+
36
Mostly called by workflow modules, RemoteRunner is generally the last component to get through before
47
the script/application execution on a remote machine.
5-
Depending on an environment variable WORKLOADEXECLOCATION, it decides whether it should take care of the execution.
6-
RemoteRunner has been designed to send script/application on remote worker nodes having no outbound connectivity
7-
(e.g. supercomputers)
88
"""
99
import os
1010
import shlex
11+
from six.moves import shlex_quote
1112
import time
1213

13-
from DIRAC import gLogger, gConfig, S_OK
14+
from DIRAC import gLogger, gConfig, S_OK, S_ERROR
1415
from DIRAC.Core.Security.ProxyInfo import getProxyInfo
16+
from DIRAC.Core.Utilities.Decorators import deprecated
1517
from DIRAC.Resources.Computing.ComputingElementFactory import ComputingElementFactory
1618
from DIRAC.ConfigurationSystem.Client.Helpers.Resources import getQueue
1719
from DIRAC.WorkloadManagementSystem.Client import PilotStatus
1820

1921

2022
class RemoteRunner(object):
21-
def __init__(self):
23+
def __init__(self, siteName=None, ceName=None, queueName=None):
2224
self.log = gLogger.getSubLogger("RemoteRunner")
23-
self.remoteExecution = gConfig.getValue("/LocalSite/RemoteExecution", "false")
24-
25+
self._workloadSite = siteName
26+
if not self._workloadSite:
27+
self.log.warn("You are expected to provide a siteName in parameters from v8.0")
28+
self.log.warn("Trying to get workloadSite from /LocalSite/Site...")
29+
self._workloadSite = gConfig.getValue("/LocalSite/Site")
30+
self._workloadCE = ceName
31+
if not self._workloadCE:
32+
self.log.warn("You are expected to provide a ceName in parameters from v8.0")
33+
self.log.warn("Trying to get workloadSite from /LocalSite/GridCE...")
34+
self._workloadCE = gConfig.getValue("/LocalSite/GridCE")
35+
self._workloadQueue = queueName
36+
if not self._workloadQueue:
37+
self.log.warn("You are expected to provide a queueName in parameters from v8.0")
38+
self.log.warn("Trying to get workloadSite from /LocalSite/CEQueue...")
39+
self._workloadQueue = gConfig.getValue("/LocalSite/CEQueue")
40+
41+
@deprecated('Use gConfig.getValue("/LocalSite/RemoteExecution") instead.')
2542
def is_remote_execution(self):
2643
"""Main method: decides whether the execution will be done locally or remotely via a CE.
2744
45+
This method does not really make sense: if we use RemoteRunner, it means we want to perform a remote execution.
46+
Therefore, this should be checked before calling RemoteRunner by checking /LocalSite/RemoteExecution for instance.
47+
2848
:return: bool
2949
"""
50+
return gConfig.getValue("/LocalSite/RemoteExecution")
3051

31-
# if remoteExecution is true, this means the workload should be executed
32-
# in a different remote location. This mainly happens when the remote Site has no
33-
# external connectivity and can only execute the workload itself.
34-
return self.remoteExecution.lower() in ["true", "yes"]
35-
36-
def execute(self, command):
52+
def execute(self, command, workingDirectory=".", numberOfProcessors=1, cleanRemoteJob=True):
3753
"""Execute the command remotely via a CE
3854
3955
:param str command: command to execute remotely
56+
:param str workingDirectory: directory containing the inputs required by the command
57+
:param int numberOfProcessors: number of processors to allocate to the command
58+
:param bool cleanRemoteJob: clean the files related to the command on the remote host if True
59+
:return: (status, output, error)
4060
"""
61+
self.log.verbose("Command to submit:", command)
62+
63+
# Check whether CE parameters are set
64+
result = self._checkParameters()
65+
if not result["OK"]:
66+
result["Value"] = (-1, "", result["Message"])
67+
return result
68+
self.log.verbose(
69+
"The command will be sent to",
70+
"site %s, CE %s, queue %s" % (self._workloadSite, self._workloadCE, self._workloadQueue),
71+
)
72+
4173
# Set up Application Queue
42-
self.log.verbose("Remote application execution on:", self.remoteExecution)
43-
result = self._setUpworkloadCE()
74+
result = self._setUpWorkloadCE(numberOfProcessors)
4475
if not result["OK"]:
76+
result["Value"] = (-1, "", result["Message"])
4577
return result
4678
workloadCE = result["Value"]
79+
self.log.debug("The CE interface has been set up")
4780

4881
# Add the command in an executable file
49-
executable = self._wrapCommand(command)
50-
# get inputs file from the current working directory
51-
inputs = os.listdir(".")
82+
executable = "workloadExec.sh"
83+
self._wrapCommand(command, workingDirectory, executable)
84+
self.log.debug("The command has been wrapped into an executable")
85+
86+
# Get inputs from the current working directory
87+
inputs = os.listdir(workingDirectory)
5288
inputs.remove(os.path.basename(executable))
5389
self.log.verbose("The executable will be sent along with the following inputs:", ",".join(inputs))
54-
# request the whole directory as output
90+
# Request the whole directory as output
5591
outputs = ["/"]
5692

5793
# Submit the command as a job
5894
result = workloadCE.submitJob(executable, workloadCE.proxy, inputs=inputs, outputs=outputs)
5995
if not result["OK"]:
96+
result["Value"] = (-1, "", result["Message"])
6097
return result
6198
jobID = result["Value"][0]
6299
stamp = result["PilotStampDict"][jobID]
@@ -67,46 +104,83 @@ def execute(self, command):
67104
time.sleep(120)
68105
result = workloadCE.getJobStatus([jobID])
69106
if not result["OK"]:
107+
result["Value"] = (-1, "", result["Message"])
70108
return result
71109
jobStatus = result["Value"][jobID]
72110
self.log.verbose("The final status of the application/script is: ", jobStatus)
73111

74112
# Get job outputs
75113
result = workloadCE.getJobOutput("%s:::%s" % (jobID, stamp), os.path.abspath("."))
76114
if not result["OK"]:
115+
result["Value"] = (-1, "", result["Message"])
77116
return result
117+
output, error = result["Value"]
118+
119+
# Clean job on the remote resource
120+
if cleanRemoteJob:
121+
result = workloadCE.cleanJob(jobID)
122+
if not result["OK"]:
123+
result["Value"] = (-1, "", result["Message"])
124+
return result
78125

79126
commandStatus = {"Done": 0, "Failed": -1, "Killed": -2}
80-
output, error = result["Value"]
81-
outputDict = {"OK": True, "Value": [commandStatus[jobStatus], output, error]}
82-
return outputDict
127+
return S_OK((commandStatus[jobStatus], output, error))
128+
129+
def _checkParameters(self):
130+
"""Check that the remote site, CE and queue are all defined.
131+
:return: S_OK, S_ERROR
132+
"""
133+
if not self._workloadSite:
134+
return S_ERROR("The remote site is not defined")
135+
if not self._workloadCE:
136+
return S_ERROR("The remote CE is not defined")
137+
if not self._workloadQueue:
138+
return S_ERROR("The remote queue is not defined")
83139

84-
def _setUpworkloadCE(self):
140+
return S_OK()
141+
142+
def _setUpWorkloadCE(self, numberOfProcessorsPayload=1):
85143
"""Get application queue and configure it
86144
87145
:return: S_OK(ComputingElement instance) on success, an error result otherwise
88146
"""
89-
# Get CE parameters
90-
workloadSite = gConfig.getValue("/LocalSite/Site")
91-
workloadCE = gConfig.getValue("/LocalSite/GridCE")
92-
workloadQueue = gConfig.getValue("/LocalSite/CEQueue")
93-
94-
result = getQueue(workloadSite, workloadCE, workloadQueue)
147+
# Get CE Parameters
148+
result = getQueue(self._workloadSite, self._workloadCE, self._workloadQueue)
95149
if not result["OK"]:
96150
return result
97151
ceType = result["Value"]["CEType"]
98152
ceParams = result["Value"]
99153

100154
# Build CE
101155
ceFactory = ComputingElementFactory()
102-
result = ceFactory.getCE(ceName=workloadCE, ceType=ceType, ceParametersDict=ceParams)
156+
result = ceFactory.getCE(ceName=self._workloadCE, ceType=ceType, ceParametersDict=ceParams)
103157
if not result["OK"]:
104158
return result
105159
workloadCE = result["Value"]
106160

161+
# Set the number of processors available according to the need of the payload
162+
numberOfProcessorsCE = workloadCE.ceParameters.get("NumberOfProcessors", 1)
163+
if numberOfProcessorsCE < 1 or numberOfProcessorsPayload < 1:
164+
self.log.warn(
165+
"Inappropriate values:",
166+
"number of processors required for the payload %s - for the CE %s"
167+
% (numberOfProcessorsPayload, numberOfProcessorsCE),
168+
)
169+
return S_ERROR("Inappropriate NumberOfProcessors value")
170+
171+
if numberOfProcessorsPayload > numberOfProcessorsCE:
172+
self.log.warn(
173+
"Not enough processors to execute the payload: ",
174+
"number of processors required for the payload %s < %s the WN capacity"
175+
% (numberOfProcessorsPayload, numberOfProcessorsCE),
176+
)
177+
return S_ERROR("Not enough processors to execute the command")
178+
179+
workloadCE.ceParameters["NumberOfProcessors"] = numberOfProcessorsPayload
180+
107181
# Add a proxy to the CE
108182
result = getProxyInfo()
109-
if not result["OK"] and not result["Value"]["chain"]:
183+
if not result["OK"]:
110184
return result
111185
proxy = result["Value"]["chain"]
112186
result = proxy.getRemainingSecs()
@@ -117,13 +191,36 @@ def _setUpworkloadCE(self):
117191

118192
return S_OK(workloadCE)
119193

120-
def _wrapCommand(self, command):
194+
def _wrapCommand(self, command, workingDirectory, executable):
121195
"""Wrap the command in a file
122196
123197
:param str command: command line to write in the executable
124-
:return: name of the executable file
198+
:param str workingDirectory: directory containing the inputs required by the command
199+
:param str executable: path of the executable that should contain the command to submit
200+
:return: None (the command is written into the file located at the executable path)
125201
"""
126-
executable = "workloadExec.sh"
202+
# Check whether the command contains any absolute path: there would be no way to access them remotely
203+
# They need to be converted into relative path
204+
argumentsProcessed = []
205+
for argument in shlex.split(command):
206+
207+
argPath = os.path.dirname(argument)
208+
# The argument does not contain any path, not concerned
209+
if not argPath:
210+
argumentsProcessed.append(argument)
211+
continue
212+
213+
argPathAbsolutePath = os.path.abspath(argPath)
214+
workingDirAbsolutePath = os.path.abspath(workingDirectory)
215+
# The argument is not included in the workingDirectory, not concerned
216+
if not argPathAbsolutePath.startswith(workingDirAbsolutePath):
217+
argumentsProcessed.append(argument)
218+
continue
219+
220+
# The argument is included in the workingDirectory and should be converted
221+
argumentsProcessed.append(os.path.join(".", os.path.basename(argument)))
222+
223+
# From v8.0, use: shlex.join(argumentsProcessed)
224+
command = " ".join(shlex_quote(arg) for arg in argumentsProcessed)
127225
with open(executable, "w") as f:
128226
f.write(command)
129-
return executable
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
""" Test class for RemoteRunner
2+
"""
3+
4+
# imports
5+
from __future__ import absolute_import
6+
from __future__ import division
7+
from __future__ import print_function
8+
import pytest
9+
import os
10+
from diraccfg import CFG
11+
12+
# DIRAC Components
13+
from DIRAC import gLogger, gConfig, S_OK
14+
from DIRAC.ConfigurationSystem.Client.ConfigurationData import gConfigurationData
15+
from DIRAC.Core.Security.X509Chain import X509Chain # pylint: disable=import-error
16+
from DIRAC.WorkloadManagementSystem.Utilities.RemoteRunner import RemoteRunner
17+
18+
gLogger.setLevel("DEBUG")
19+
20+
21+
@pytest.mark.parametrize(
22+
"command, workingDirectory, expectedContent",
23+
[
24+
("/path/to/script.sh", "/path/to", "./script.sh"),
25+
("/path/to/script.sh", "/another/path/to", "/path/to/script.sh"),
26+
("/path/to/script.sh arg1", "/path/to", "./script.sh arg1"),
27+
("/path/to/script.sh /path/to/arg1", "/path/to", "./script.sh ./arg1"),
28+
("/path/to/script.sh /anotherpath/to/arg1", "/path/to", "./script.sh /anotherpath/to/arg1"),
29+
("/path/to/script.sh /another/path/to/arg1", "/path/to", "./script.sh /another/path/to/arg1"),
30+
("./script.sh", ".", "./script.sh"),
31+
("ls", "/path/to", "ls"),
32+
("echo 'Hello World'", "/path/to", "echo 'Hello World'"),
33+
(
34+
"lb-prod-run prodConf_Gauss_12345_12345.json --verbose",
35+
".",
36+
"lb-prod-run prodConf_Gauss_12345_12345.json --verbose",
37+
),
38+
],
39+
)
40+
def test__wrapCommand(command, workingDirectory, expectedContent):
41+
"""Test RemoteRunner()._wrapCommand()"""
42+
executable = "workloadExec.sh"
43+
44+
# Instantiate a RemoteRunner and wrap the command
45+
remoteRunner = RemoteRunner("Site1", "CE1", "queue1")
46+
remoteRunner._wrapCommand(command, workingDirectory, executable)
47+
48+
# Test the results
49+
assert os.path.isfile(executable)
50+
with open(executable, "r") as f:
51+
content = f.read()
52+
os.remove(executable)
53+
assert content == expectedContent
54+
55+
56+
@pytest.mark.parametrize(
57+
"payloadNumberOfProcessors, ceNumberOfProcessors, expectedResult, expectedNumberOfProcessors",
58+
[
59+
# CE has at least as many processors as the payload requests
60+
(1, 1, True, 1),
61+
(2, 2, True, 2),
62+
(1, 2, True, 1),
63+
# CE has fewer processors than the payload requests
64+
(2, 1, False, "Not enough processors to execute the command"),
65+
# Specific case: we should not have 0
66+
(0, 1, False, "Inappropriate NumberOfProcessors value"),
67+
(1, 0, False, "Inappropriate NumberOfProcessors value"),
68+
(-4, 1, False, "Inappropriate NumberOfProcessors value"),
69+
(1, -4, False, "Inappropriate NumberOfProcessors value"),
70+
(0, 0, False, "Inappropriate NumberOfProcessors value"),
71+
],
72+
)
73+
def test__setUpWorkloadCE(
74+
mocker, payloadNumberOfProcessors, ceNumberOfProcessors, expectedResult, expectedNumberOfProcessors
75+
):
76+
"""Test RemoteRunner()._setUpWorkloadCE()"""
77+
mocker.patch(
78+
"DIRAC.WorkloadManagementSystem.Utilities.RemoteRunner.getProxyInfo", return_value=S_OK({"chain": X509Chain()})
79+
)
80+
mocker.patch("DIRAC.Core.Security.X509Chain.X509Chain.getRemainingSecs", return_value=S_OK(1000))
81+
82+
# Configure the CS with the number of available processors in the CE
83+
siteName = "DIRAC.Site1.site"
84+
ceName = "CE1"
85+
queueName = "queue1"
86+
87+
config = {"Resources": {"Sites": {"DIRAC": {siteName: {"CEs": {ceName: {}}}}}}}
88+
ceConfig = config["Resources"]["Sites"]["DIRAC"][siteName]["CEs"][ceName]
89+
ceConfig["CEType"] = "HTCondorCE"
90+
ceConfig["Queues"] = {}
91+
ceConfig["Queues"][queueName] = {}
92+
ceConfig["Queues"][queueName]["NumberOfProcessors"] = ceNumberOfProcessors
93+
94+
# Load the configuration
95+
gConfigurationData.localCFG = CFG()
96+
cfg = CFG()
97+
cfg.loadFromDict(config)
98+
gConfig.loadCFG(cfg)
99+
100+
# Instantiate a RemoteRunner and set up the CE
101+
remoteRunner = RemoteRunner(siteName, ceName, queueName)
102+
result = remoteRunner._setUpWorkloadCE(payloadNumberOfProcessors)
103+
104+
# Test the results
105+
assert result["OK"] == expectedResult
106+
if expectedResult:
107+
workloadCE = result["Value"]
108+
assert workloadCE.ceParameters["NumberOfProcessors"] == expectedNumberOfProcessors
109+
else:
110+
assert result["Message"] == expectedNumberOfProcessors

0 commit comments

Comments
 (0)