Skip to content

Commit db595fe

Browse files
committed
fix(resources): Embed executableFile in SrunWrapper
1 parent f0b2839 commit db595fe

File tree

6 files changed

+69
-127
lines changed

6 files changed

+69
-127
lines changed

src/DIRAC/Resources/Computing/ComputingElement.py

Lines changed: 10 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,6 @@ def __init__(self, ceName):
7979
self.valid = None
8080
self.mandatoryParameters = []
8181
self.batchSystem = None
82-
self.parallelLibrary = None
8382
self.taskResults = {}
8483
self.minProxyTime = gConfig.getValue("/Registry/MinProxyLifeTime", 10800) # secs
8584
self.defaultProxyTime = gConfig.getValue("/Registry/DefaultProxyLifeTime", 43200) # secs
@@ -197,52 +196,19 @@ def loadBatchSystem(self, batchSystemName):
197196
198197
:param str batchSystemName: name of the batch system
199198
"""
200-
result = self._loadCEAttribute("BatchSystem", batchSystemName, "Resources.Computing.BatchSystems")
201-
if not result["OK"]:
202-
return result
203-
self.batchSystem = result["Value"]
204-
return S_OK()
205-
206-
def loadParallelLibrary(self, parallelLibraryName, workingDirectory="."):
207-
"""Instantiate object representing the parallel library that will generate a script to wrap the executable
208-
209-
:param str parallelLibraryName: name of the parallel library
210-
"""
211-
result = self._loadCEAttribute(
212-
"ParallelLibrary",
213-
parallelLibraryName,
214-
"Resources.Computing.ParallelLibraries",
215-
workingDirectory=workingDirectory,
216-
)
217-
if not result["OK"]:
218-
return result
219-
self.parallelLibrary = result["Value"]
220-
return S_OK()
221-
222-
def _loadCEAttribute(self, ceAttribute, typeAttribute, moduleLocation, **kwargs):
223-
"""Instantiate specific CE attributes
224-
225-
:param str ceAttribute: name of the CE attribute
226-
:param str typeAttribute: class of the CE attribute
227-
:param str moduleLocation: location of the class definition in the source code
228-
:param dict kwargs: parameters to pass to the constructor of the ceAttribute instance
229-
"""
230-
if typeAttribute is None:
231-
typeAttribute = self.ceParameters[ceAttribute]
199+
if batchSystemName is None:
200+
batchSystemName = self.ceParameters['BatchSystem']
232201

233202
objectLoader = ObjectLoader()
234-
moduleLocation += ".%s" % typeAttribute
235-
result = objectLoader.loadObject(moduleLocation, typeAttribute)
236-
if not result["OK"]:
237-
self.log.error("Failed to load %s object: %s" % (ceAttribute, result["Message"]))
203+
result = objectLoader.loadObject('Resources.Computing.BatchSystems.%s' % batchSystemName, batchSystemName)
204+
if not result['OK']:
205+
self.log.error('Failed to load batch object: %s' % result['Message'])
238206
return result
239-
ceAttributeClass = result["Value"]
240-
ceAttributeModuleFile = result["ModuleFile"]
241-
242-
ceAttributeObject = ceAttributeClass(**kwargs)
243-
self.log.info("Class from module: ", ceAttributeModuleFile)
244-
245-
return S_OK(ceAttributeObject)
207+
batchClass = result['Value']
208+
batchModuleFile = result['ModuleFile']
209+
self.batchSystem = batchClass()
210+
self.log.info("Batch system class from module: ", batchModuleFile)
211+
return S_OK()
246212

247213
def setParameters(self, ceOptions):
248214
"""Add parameters from the given dictionary overriding the previous values

src/DIRAC/Resources/Computing/LocalComputingElement.py

Lines changed: 2 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,6 @@
1515
Underlying batch system that is going to be used to orchestrate executable files. The Batch System has to be
1616
accessible from the LocalCE. By default, the LocalComputingElement submits directly on the host via the Host class.
1717
18-
ParallelLibrary:
19-
Underlying parallel library used to generate a wrapper around the executable files to run them in parallel on
20-
multiple nodes.
21-
2218
SharedArea:
2319
Area used to store executable/output/error files if they are not aready defined via BatchOutput, BatchError,
2420
InfoArea, ExecutableArea and/or WorkArea. The path should be absolute.
@@ -80,7 +76,7 @@ def _reset(self):
8076
batchSystemName = self.ceParameters.get("BatchSystem", "Host")
8177
result = self.loadBatchSystem(batchSystemName)
8278
if not result["OK"]:
83-
self.log.error("Failed to load the batch system plugin %s", self.batchSystem)
79+
self.log.error("Failed to load the batch system plugin %s", batchSystemName)
8480
return result
8581

8682
self.queue = self.ceParameters["Queue"]
@@ -106,13 +102,6 @@ def _reset(self):
106102
if not self.workArea.startswith("/"):
107103
self.workArea = os.path.join(self.sharedArea, self.workArea)
108104

109-
parallelLibraryName = self.ceParameters.get("ParallelLibrary")
110-
if parallelLibraryName:
111-
result = self.loadParallelLibrary(parallelLibraryName, self.executableArea)
112-
if not result["OK"]:
113-
self.log.error("Failed to load the parallel library plugin %s", parallelLibraryName)
114-
return result
115-
116105
result = self._prepareHost()
117106
if not result["OK"]:
118107
self.log.error("Failed to initialize CE", self.ceName)
@@ -181,14 +170,6 @@ def _prepareHost(self):
181170
return S_OK()
182171

183172
def submitJob(self, executableFile, proxy=None, numberOfJobs=1):
184-
copyExecutable = os.path.join(self.executableArea, os.path.basename(executableFile))
185-
if self.parallelLibrary and executableFile != copyExecutable:
186-
# Because we use a parallel library, the executable will become a dependency of the parallel library script
187-
# Thus, it has to be defined in a specific area (executableArea) to be found and executed properly
188-
# For this reason, we copy the executable from its location to executableArea
189-
shutil.copy(executableFile, copyExecutable)
190-
executableFile = copyExecutable
191-
192173
if not os.access(executableFile, 5):
193174
os.chmod(executableFile, stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH)
194175

@@ -206,10 +187,6 @@ def submitJob(self, executableFile, proxy=None, numberOfJobs=1):
206187
else: # no proxy
207188
submitFile = executableFile
208189

209-
if self.parallelLibrary:
210-
# Wrap the executable to be executed multiple times in parallel via a parallel library
211-
submitFile = self.parallelLibrary.generateWrapper(submitFile)
212-
213190
jobStamps = []
214191
for _i in range(numberOfJobs):
215192
jobStamps.append(makeGuid()[:8])
@@ -229,7 +206,7 @@ def submitJob(self, executableFile, proxy=None, numberOfJobs=1):
229206
"NumberOfGPUs": self.numberOfGPUs,
230207
}
231208
resultSubmit = self.batchSystem.submitJob(**batchDict)
232-
if proxy or self.parallelLibrary:
209+
if proxy:
233210
os.remove(submitFile)
234211

235212
if resultSubmit["Status"] == 0:
@@ -311,9 +288,6 @@ def getJobOutput(self, jobID, localDir=None):
311288
return result
312289

313290
jobStamp, _host, outputFile, errorFile = result["Value"]
314-
if self.parallelLibrary:
315-
# outputFile and errorFile are directly modified by parallelLib
316-
self.parallelLibrary.processOutput(outputFile, errorFile)
317291

318292
if not localDir:
319293
tempDir = tempfile.mkdtemp()

src/DIRAC/Resources/Computing/ParallelLibraries/ParallelLibrary.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,6 @@
1515
1616
The Parallel Libraries have to be defined in a CE section via the configuration system. See the page about
1717
configuring :ref:`resourcesComputing` for where the options can be placed.
18-
19-
WorkingDirectory:
20-
Generally defined in the ComputingElement source code depending on how they deal with the executables.
21-
Indicates where the parallel library wrapper script has to be defined and retrieved.
22-
2318
"""
2419
from __future__ import absolute_import
2520
from __future__ import division

src/DIRAC/Resources/Computing/ParallelLibraries/Srun.py

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@
1414

1515

1616
class Srun(ParallelLibrary):
17-
def __init__(self, workingDirectory):
18-
super(Srun, self).__init__("srun", workingDirectory)
17+
def __init__(self):
18+
super(Srun, self).__init__("srun")
1919

2020
def generateWrapper(self, executableFile):
2121
"""
@@ -25,21 +25,25 @@ def generateWrapper(self, executableFile):
2525
:param str executableFile: name of the executable file to wrap
2626
:return str: name of the wrapper that runs the executable via srun
2727
"""
28+
wrapper = 'srunExec.sh'
29+
with open(executableFile, 'r') as f:
30+
content = f.read()
2831

2932
# Build the script to run the executable in parallel multiple times
30-
# srun is the command to execute a task multiple time in parallel
31-
# -l option: add the task ID to the output
32-
# -k option: do not kill the slurm job if one of the nodes is broken
33-
cmd = "#!/bin/bash\n"
34-
cmd += "srun -l -k %s" % shlex_quote(executableFile)
35-
self.log.debug("Command generated by Srun:", "%s" % cmd)
36-
37-
srunWrapper = os.path.join(self.workingDirectory, "srunExec.sh")
38-
with open(srunWrapper, "w") as exe:
39-
exe.write(cmd)
40-
if not os.access(srunWrapper, os.R_OK | os.X_OK):
41-
os.chmod(srunWrapper, stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH + stat.S_IXOTH)
42-
return srunWrapper
33+
# - Embed the content of executableFile inside the parallel library wrapper script
34+
# - srun is the command to execute a task multiple time in parallel
35+
# -l option: add the task ID to the output
36+
# -k option: do not kill the slurm job if one of the nodes is broken
37+
cmd = """#!/bin/bash
38+
cat > %(wrapper)s << EOFEXEC
39+
%(content)s
40+
EOFEXEC
41+
chmod 755 %(wrapper)s
42+
srun -l -k %(wrapper)s
43+
""" % dict(wrapper=wrapper,
44+
content=content
45+
)
46+
return cmd
4347

4448
def processOutput(self, output, error, isFile=True):
4549
"""

src/DIRAC/Resources/Computing/ParallelLibraries/test/Test_ParallelLibraries.py

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,19 @@
1818
echo "hello world"
1919
"""
2020

21+
expectedContent = """#!/bin/bash
22+
23+
cat > srunExec.sh << EOFEXEC
24+
25+
#!/bin/bash
26+
27+
echo "hello world"
28+
29+
EOFEXEC
30+
chmod 755 srunExec.sh
31+
srun -l -k srunExec.sh
32+
"""
33+
2134
srunOutput = """
2235
1: line1
2336
1: line2
@@ -61,14 +74,14 @@
6174

6275

6376
@pytest.fixture
64-
def generateParallelLibrary(parallelLibrary, parameters):
77+
def generateParallelLibrary(parallelLibrary):
6578
"""Instantiate the requested Parallel Library"""
6679
# Instantiate an object from parallelLibrary class
6780
parallelLibraryPath = "DIRAC.Resources.Computing.ParallelLibraries.%s" % parallelLibrary
6881
plugin = __import__(parallelLibraryPath, globals(), locals(), [parallelLibrary]) # pylint: disable=unused-variable
6982
# Need to be reloaded to update the mock within the module, else, it will reuse the one when loaded the first time
7083
reload_module(plugin)
71-
parallelLibraryStr = "plugin.%s(%s)" % (parallelLibrary, parameters)
84+
parallelLibraryStr = "plugin.%s()" % (parallelLibrary)
7285
return eval(parallelLibraryStr)
7386

7487

@@ -100,6 +113,26 @@ def test_generateWrapper(generateParallelLibrary, parallelLibrary, parameters, e
100113
os.remove(executableFile)
101114
os.remove(res)
102115

116+
@pytest.mark.parametrize(
117+
"parallelLibrary, expectedContent",
118+
[
119+
('Srun', expectedContent),
120+
],
121+
)
122+
def test_generateWrapper(generateParallelLibrary, parallelLibrary, expectedContent):
123+
""" Test generateWrapper()"""
124+
parallelLibraryInstance = generateParallelLibrary
125+
126+
executableFile = 'executableFile.sh'
127+
with open(executableFile, 'w') as f:
128+
f.write(executableContent)
129+
130+
res = parallelLibraryInstance.generateWrapper(executableFile)
131+
# Make sure a wrapper file has been generated and is executable
132+
assert res == expectedContent
133+
134+
os.remove(executableFile)
135+
103136

104137
@pytest.mark.parametrize(
105138
"parallelLibrary, parameters, outputContent, expectedContent",

src/DIRAC/Resources/Computing/SSHComputingElement.py

Lines changed: 3 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,6 @@
1010
Underlying batch system that is going to be used to orchestrate executable files. The Batch System has to be
1111
accessible from the LocalCE. By default, the LocalComputingElement submits directly on the host via the Host class.
1212
13-
ParallelLibrary:
14-
Underlying parallel library used to generate a wrapper around the executable files to run them in parallel on
15-
multiple nodes.
16-
1713
SharedArea:
1814
Area used to store executable/output/error files if they are not aready defined via BatchOutput, BatchError,
1915
InfoArea, ExecutableArea and/or WorkArea. The path should be absolute.
@@ -412,13 +408,6 @@ def _reset(self):
412408
if not self.workArea.startswith("/"):
413409
self.workArea = os.path.join(self.sharedArea, self.workArea)
414410

415-
parallelLibraryName = self.ceParameters.get("ParallelLibrary")
416-
if parallelLibraryName:
417-
result = self.loadParallelLibrary(parallelLibraryName)
418-
if not result["OK"]:
419-
self.log.error("Failed to load the parallel library plugin %s", parallelLibraryName)
420-
return result
421-
422411
self.submitOptions = self.ceParameters.get("SubmitOptions", "")
423412
self.removeOutput = True
424413
if "RemoveOutput" in self.ceParameters:
@@ -599,21 +588,13 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1):
599588
else: # no proxy
600589
submitFile = executableFile
601590

602-
inputs = []
603-
if self.parallelLibrary:
604-
# In this case, the executable becomes an input of a parallel library script.
605-
# It needs to be submitted along with the submitFile, which is a parallel library wrapper.
606-
inputFile = os.path.join(self.executableArea, os.path.basename(submitFile))
607-
inputs.append(inputFile)
608-
submitFile = self.parallelLibrary.generateWrapper(inputFile)
609-
610-
result = self._submitJobToHost(submitFile, numberOfJobs, inputs=inputs)
611-
if proxy or self.parallelLibrary:
591+
result = self._submitJobToHost(submitFile, numberOfJobs)
592+
if proxy:
612593
os.remove(submitFile)
613594

614595
return result
615596

616-
def _submitJobToHost(self, executableFile, numberOfJobs, host=None, inputs=None):
597+
def _submitJobToHost(self, executableFile, numberOfJobs, host=None):
617598
"""Submit prepared executable to the given host"""
618599
ssh = SSH(host=host, parameters=self.ceParameters)
619600
# Copy the executable
@@ -622,14 +603,6 @@ def _submitJobToHost(self, executableFile, numberOfJobs, host=None, inputs=None)
622603
if not result["OK"]:
623604
return result
624605

625-
# Copy the executable dependencies if any
626-
if inputs:
627-
for localInput in inputs:
628-
remoteInput = os.path.join(self.executableArea, os.path.basename(localInput))
629-
result = ssh.scpCall(30, localInput, remoteInput, postUploadCommand="chmod +x %s" % remoteInput)
630-
if not result["OK"]:
631-
return result
632-
633606
jobStamps = []
634607
for _i in range(numberOfJobs):
635608
jobStamps.append(makeGuid()[:8])
@@ -849,7 +822,4 @@ def getJobOutput(self, jobID, localDir=None):
849822
output = result["Value"][1]
850823
error = result["Value"][1]
851824

852-
if self.parallelLibrary:
853-
output, error = self.parallelLibrary.processOutput(output, error, isFile=bool(localDir))
854-
855825
return S_OK((output, error))

0 commit comments

Comments
 (0)