Skip to content

Commit 0b28e27

Browse files
authored
Merge pull request #5702 from aldbr/cherry-pick-2-0e31badcd-integration
[sweep:integration] Multi-node allocations: Embed executableFile in the srun Wrapper
2 parents 4fe4b98 + e1b8670 commit 0b28e27

File tree

10 files changed

+308
-425
lines changed

10 files changed

+308
-425
lines changed

src/DIRAC/Resources/Computing/BatchSystems/SLURM.py

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import re
1212
import subprocess
1313
import shlex
14+
import random
1415

1516
__RCSID__ = "$Id$"
1617

@@ -51,6 +52,10 @@ def submitJob(self, **kwargs):
5152
errFile = os.path.expandvars(errFile)
5253
executable = os.path.expandvars(executable)
5354

55+
# There are more than 1 node, we have to run the executable in parallel on different nodes using srun
56+
if numberOfNodes != "1":
57+
executable = self._generateSrunWrapper(executable)
58+
5459
jobIDs = []
5560
for _i in range(nJobs):
5661
jid = ""
@@ -95,6 +100,10 @@ def submitJob(self, **kwargs):
95100
jid = jid.strip()
96101
jobIDs.append(jid)
97102

103+
# Delete the srun wrapper
104+
if numberOfNodes != "1":
105+
os.remove(executable)
106+
98107
if jobIDs:
99108
resultDict["Status"] = 0
100109
resultDict["Jobs"] = jobIDs
@@ -103,6 +112,35 @@ def submitJob(self, **kwargs):
103112
resultDict["Message"] = error
104113
return resultDict
105114

115+
def _generateSrunWrapper(self, executableFile):
116+
"""
117+
Associate the executable with srun, to execute the same command in parallel on multiple nodes.
118+
Wrap it in a new executable file
119+
120+
:param str executableFile: name of the executable file to wrap
121+
:return str: name of the wrapper that runs the executable via srun
122+
"""
123+
suffix = random.randrange(1, 99999)
124+
wrapper = "srunExec_%s.sh" % suffix
125+
with open(executableFile, "r") as f:
126+
content = f.read()
127+
128+
# Build the script to run the executable in parallel multiple times
129+
# - Embed the content of executableFile inside the parallel library wrapper script
130+
# - srun is the command to execute a task multiple time in parallel
131+
# -l option: add the task ID to the output
132+
# -k option: do not kill the slurm job if one of the nodes is broken
133+
cmd = """#!/bin/bash
134+
cat > %(wrapper)s << EOFEXEC
135+
%(content)s
136+
EOFEXEC
137+
chmod 755 %(wrapper)s
138+
srun -l -k %(wrapper)s
139+
""" % dict(
140+
wrapper=wrapper, content=content
141+
)
142+
return cmd
143+
106144
def killJob(self, **kwargs):
107145
"""Delete a job from SLURM batch scheduler. Input: list of jobs output: int"""
108146

@@ -267,3 +305,86 @@ def getCEStatus(self, **kwargs):
267305
resultDict["Waiting"] = waitingJobs
268306
resultDict["Running"] = runningJobs
269307
return resultDict
308+
309+
def getJobOutputFiles(self, **kwargs):
310+
"""Get output file names and templates for the specific CE
311+
312+
Reorder the content of the output files according to the node identifier
313+
if multiple nodes were involved.
314+
315+
From:
316+
>>> 1: line1
317+
>>> 2: line1
318+
>>> 1: line2
319+
To:
320+
>>> # On node 1
321+
>>> line1
322+
>>> line2
323+
>>> # On node 2
324+
>>> line1
325+
"""
326+
resultDict = {}
327+
328+
MANDATORY_PARAMETERS = ["JobIDList", "OutputDir", "ErrorDir"]
329+
for argument in MANDATORY_PARAMETERS:
330+
if argument not in kwargs:
331+
resultDict["Status"] = -1
332+
resultDict["Message"] = "No %s" % argument
333+
return resultDict
334+
335+
outputDir = kwargs["OutputDir"]
336+
errorDir = kwargs["ErrorDir"]
337+
jobIDList = kwargs["JobIDList"]
338+
numberOfNodes = kwargs.get("NumberOfNodes", "1")
339+
340+
jobDict = {}
341+
for jobID in jobIDList:
342+
output = "%s/%s.out" % (outputDir, jobID)
343+
error = "%s/%s.err" % (errorDir, jobID)
344+
345+
if numberOfNodes != "1":
346+
self._openFileAndSortOutput(output)
347+
self._openFileAndSortOutput(error)
348+
349+
jobDict[jobID] = {}
350+
jobDict[jobID]["Output"] = output
351+
jobDict[jobID]["Error"] = error
352+
353+
resultDict["Status"] = 0
354+
resultDict["Jobs"] = jobDict
355+
return resultDict
356+
357+
def _openFileAndSortOutput(self, outputFile):
358+
"""
359+
Open a file, get its content and reorder it according to the node identifiers
360+
361+
:param str outputFile: name of the file to sort
362+
"""
363+
with open(outputFile, "r") as f:
364+
outputContent = f.read()
365+
366+
sortedContent = self._sortOutput(outputContent)
367+
368+
with open(outputFile, "w") as f:
369+
f.write(sortedContent)
370+
371+
def _sortOutput(self, outputContent):
372+
"""
373+
Reorder the content of the output file according to the node identifiers
374+
375+
:param str outputContent: content to sort
376+
:return str: content sorted
377+
"""
378+
outputLines = outputContent.split("\n")
379+
nodes = {}
380+
for line in outputLines:
381+
node, line_content = line.split(":", 1)
382+
if node not in nodes:
383+
nodes[node] = []
384+
nodes[node].append(line_content)
385+
386+
content = ""
387+
for node, lines in nodes.items():
388+
content += "# On node %s\n\n" % node
389+
content += "\n".join(lines) + "\n"
390+
return content
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
""" Test SLURM
2+
"""
3+
from __future__ import absolute_import
4+
from __future__ import division
5+
from __future__ import print_function
6+
import pytest
7+
import os
8+
from six.moves import reload_module
9+
10+
from DIRAC import S_OK, gLogger
11+
from DIRAC.Resources.Computing.BatchSystems.SLURM import SLURM
12+
13+
14+
gLogger.setLevel("DEBUG")
15+
16+
executableContent = """
17+
#!/bin/bash
18+
19+
echo "hello world"
20+
"""
21+
22+
expectedContent = """#!/bin/bash
23+
cat > srunExec_1.sh << EOFEXEC
24+
25+
#!/bin/bash
26+
27+
echo "hello world"
28+
29+
EOFEXEC
30+
chmod 755 srunExec_1.sh
31+
srun -l -k srunExec_1.sh
32+
"""
33+
34+
srunOutput = """
35+
1: line1
36+
1: line2
37+
2: line1
38+
1: line3
39+
3: line1
40+
2: line2
41+
3: line2
42+
2: line3
43+
3: line3
44+
"""
45+
46+
srunExpected1 = """
47+
# On node 1
48+
49+
line1
50+
line2
51+
line3
52+
"""
53+
54+
55+
srunExpected2 = """
56+
# On node 3
57+
58+
line1
59+
line2
60+
line3
61+
"""
62+
63+
64+
srunExpected3 = """
65+
# On node 2
66+
67+
line1
68+
line2
69+
line3
70+
"""
71+
72+
73+
srunExpected = [srunExpected1, srunExpected2, srunExpected3]
74+
75+
76+
normalOutput = """
77+
line1
78+
line2
79+
line3
80+
"""
81+
82+
83+
normalExpected = """
84+
line1
85+
line2
86+
line3
87+
"""
88+
89+
90+
@pytest.mark.parametrize(
91+
"expectedContent",
92+
[
93+
(expectedContent),
94+
],
95+
)
96+
def test_generateWrapper(mocker, expectedContent):
97+
"""Test generateWrapper()"""
98+
mocker.patch("DIRAC.Resources.Computing.BatchSystems.SLURM.random.randrange", return_value=1)
99+
slurm = SLURM()
100+
101+
executableFile = "executableFile.sh"
102+
with open(executableFile, "w") as f:
103+
f.write(executableContent)
104+
105+
res = slurm._generateSrunWrapper(executableFile)
106+
# Make sure a wrapper file has been generated and is executable
107+
assert res == expectedContent
108+
109+
os.remove(executableFile)
110+
111+
112+
@pytest.mark.parametrize(
113+
"numberOfNodes, outputContent, expectedContent",
114+
[
115+
("3-5", srunOutput, srunExpected),
116+
("1", normalOutput, normalExpected),
117+
],
118+
)
119+
def test_getJobOutputFiles(numberOfNodes, outputContent, expectedContent):
120+
"""Test getJobOutputFiles()"""
121+
slurm = SLURM()
122+
123+
# We remove the '\n' at the beginning/end of the file because there are not present in reality
124+
outputContent = outputContent.strip()
125+
# We only remove the '\n' at the beginning because processOutput adds a '\n' at the end
126+
expectedContent = [i.lstrip() for i in expectedContent]
127+
128+
outputFile = "./1234.out"
129+
with open(outputFile, "w") as f:
130+
f.write(outputContent)
131+
132+
errorFile = "./1234.err"
133+
with open(errorFile, "w") as f:
134+
f.write(outputContent)
135+
136+
batchDict = {
137+
"JobIDList": ["1234"],
138+
"OutputDir": ".",
139+
"ErrorDir": ".",
140+
"NumberOfNodes": numberOfNodes,
141+
}
142+
result = slurm.getJobOutputFiles(**batchDict)
143+
assert result["Status"] == 0
144+
145+
output = result["Jobs"]["1234"]["Output"]
146+
error = result["Jobs"]["1234"]["Error"]
147+
assert output == outputFile
148+
assert error == errorFile
149+
150+
with open(outputFile, "r") as f:
151+
wrapperContent = f.read()
152+
for srunLines in expectedContent:
153+
assert srunLines in wrapperContent
154+
155+
os.remove(outputFile)
156+
os.remove(errorFile)
File renamed without changes.

src/DIRAC/Resources/Computing/ComputingElement.py

Lines changed: 9 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,6 @@ def __init__(self, ceName):
7979
self.valid = None
8080
self.mandatoryParameters = []
8181
self.batchSystem = None
82-
self.parallelLibrary = None
8382
self.taskResults = {}
8483
self.minProxyTime = gConfig.getValue("/Registry/MinProxyLifeTime", 10800) # secs
8584
self.defaultProxyTime = gConfig.getValue("/Registry/DefaultProxyLifeTime", 43200) # secs
@@ -195,52 +194,19 @@ def loadBatchSystem(self, batchSystemName):
195194
196195
:param str batchSystemName: name of the batch system
197196
"""
198-
result = self._loadCEAttribute("BatchSystem", batchSystemName, "Resources.Computing.BatchSystems")
199-
if not result["OK"]:
200-
return result
201-
self.batchSystem = result["Value"]
202-
return S_OK()
203-
204-
def loadParallelLibrary(self, parallelLibraryName, workingDirectory="."):
205-
"""Instantiate object representing the parallel library that will generate a script to wrap the executable
206-
207-
:param str parallelLibraryName: name of the parallel library
208-
"""
209-
result = self._loadCEAttribute(
210-
"ParallelLibrary",
211-
parallelLibraryName,
212-
"Resources.Computing.ParallelLibraries",
213-
workingDirectory=workingDirectory,
214-
)
215-
if not result["OK"]:
216-
return result
217-
self.parallelLibrary = result["Value"]
218-
return S_OK()
219-
220-
def _loadCEAttribute(self, ceAttribute, typeAttribute, moduleLocation, **kwargs):
221-
"""Instantiate specific CE attributes
222-
223-
:param str ceAttribute: name of the CE attribute
224-
:param str typeAttribute: class of the CE attribute
225-
:param str moduleLocation: location of the class definition in the source code
226-
:param dict kwargs: parameters to pass to the constructor of the ceAttribute instance
227-
"""
228-
if typeAttribute is None:
229-
typeAttribute = self.ceParameters[ceAttribute]
197+
if batchSystemName is None:
198+
batchSystemName = self.ceParameters["BatchSystem"]
230199

231200
objectLoader = ObjectLoader()
232-
moduleLocation += ".%s" % typeAttribute
233-
result = objectLoader.loadObject(moduleLocation, typeAttribute)
201+
result = objectLoader.loadObject("Resources.Computing.BatchSystems.%s" % batchSystemName, batchSystemName)
234202
if not result["OK"]:
235-
self.log.error("Failed to load %s object: %s" % (ceAttribute, result["Message"]))
203+
self.log.error("Failed to load batch object: %s" % result["Message"])
236204
return result
237-
ceAttributeClass = result["Value"]
238-
ceAttributeModuleFile = result["ModuleFile"]
239-
240-
ceAttributeObject = ceAttributeClass(**kwargs)
241-
self.log.info("Class from module: ", ceAttributeModuleFile)
242-
243-
return S_OK(ceAttributeObject)
205+
batchClass = result["Value"]
206+
batchModuleFile = result["ModuleFile"]
207+
self.batchSystem = batchClass()
208+
self.log.info("Batch system class from module: ", batchModuleFile)
209+
return S_OK()
244210

245211
def setParameters(self, ceOptions):
246212
"""Add parameters from the given dictionary overriding the previous values

0 commit comments

Comments
 (0)