
Commit 025eca8

Merge pull request #5736 from aldbr/cherry-pick-2-b0f53a3cf-integration
[sweep:integration] Add PushJobAgent: supports Sites with no external connectivity
2 parents 77841b8 + 3fa1638 commit 025eca8

File tree

17 files changed: +1481 -344 lines changed

docs/source/AdministratorGuide/Resources/index.rst

Lines changed: 1 addition & 0 deletions
@@ -23,6 +23,7 @@ contributing with their computing and storage capacity, available as conventiona
    agents2CS
    proxyprovider
    identityprovider
+   supercomputers
 
 
 Site Names
Lines changed: 167 additions & 0 deletions
@@ -0,0 +1,167 @@
.. _supercomputers:


===========================
Dealing with Supercomputers
===========================

Supercomputers are highly heterogeneous infrastructures that can provide non-traditional architectures such as:

- non-x86 CPUs
- accelerators (GPUs, FPGAs)

They often rely on modern multi-core and many-core architectures, are built for applications that run fast on a large number of resources, and provide fast node interconnectivity.
Their policies often differ from those of HEP grid sites:

- no internet connectivity
- no CVMFS
- VPN access

This chapter aims to help administrators configure DIRAC to deal with supercomputers according to their features.


.. toctree::
   :maxdepth: 1
   :numbered:

.. contents:: Table of contents
   :depth: 4


--------
Overview
--------

.. image:: ../../_static/hpc_schema.png
   :alt: DIRAC projects interaction overview
   :align: center


We have identified several differences between a traditional HEP grid site and supercomputers.
To run workloads on supercomputers, administrators might need to perform and combine several changes.
Some are as simple as an update of the CS, while others require additional actions and analysis, such as delivering a subset of CVMFS.

---------------------
Outbound connectivity
---------------------

Multi-core allocations
----------------------

Supercomputers often privilege workloads that exploit many cores in parallel for a short time (HPC).
This means they allow a small number of large resource allocations.
Grid applications are usually not adapted to this model: they are embarrassingly parallel and run on a single core for a long period (HTC).

To exploit the many-core nodes of supercomputers and avoid wasting computing resources, DIRAC can leverage the fat-node partitioning mechanism.
This consists of submitting a Pilot-Job to a node, which can then fetch and run multiple workloads in parallel.
To set it up, one has to add two options to the CE configuration: ``LocalCEType=Pool`` and ``NumberOfProcessors=<N>``, ``N`` being the number of cores per worker node (see the sketch below).

For further details about the CE options, consult :ref:`the Sites section <cs-site>`.
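
For illustration only, a hypothetical CE description in the CS could then include the following options (the CE name, its type and the number of processors are placeholder values)::

   hpc-ce.example.org
   {
     CEType = ARC
     LocalCEType = Pool
     NumberOfProcessors = 64
   }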

Multi-node allocations
----------------------

In the same way, some supercomputers have specific partitions only accessible to applications exploiting multiple many-core nodes simultaneously in the same allocation.
To exploit the many-node allocations, DIRAC can generate one sub-pilot per allocated node.
Sub-pilots run in parallel and share the same identifier, output and status.

This option is currently only available via :mod:`~DIRAC.Resources.Computing.BatchSystems.SLURM`.
To use sub-pilots in many-node allocations, one has to add an additional option to the CE configuration: ``NumberOfNodes=<min-max>`` (see the sketch below).

For further details about the CE options, consult :ref:`the Sites section <cs-site>`.
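
As an illustrative sketch, a SLURM-based CE supporting many-node allocations could be described as follows (all names and values are placeholders)::

   hpc-ce.example.org
   {
     CEType = SSH
     BatchSystem = SLURM
     LocalCEType = Pool
     NumberOfProcessors = 64
     NumberOfNodes = 2-4
   }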

CVMFS not available
-------------------

Workloads having thousands of dependencies are generally delivered via CVMFS on grid sites.
By default, CVMFS is not mounted on the nodes of supercomputers.
One can talk with the system administrators to discuss the possibility of having it mounted on the worker nodes.
If this is not possible, then one has to use `cvmfsexec <https://github.com/cvmfs/cvmfsexec>`_.
It allows mounting CVMFS as an unprivileged user, without the CVMFS package being installed by a system administrator.

This action is purely a VO action: the package has to be installed on the worker node before the job starts.
The solution has not been integrated into DIRAC yet.

LRMS not accessible
-------------------

The LRMS, also called the Batch System, is the component that orchestrates the worker nodes and the workload on a site.
On grid sites, the LRMS is often accessible via a CE, and if a CE is not available, then one can interact with it directly via SSH: DIRAC handles both cases.

Nevertheless, supercomputers have more restrictive access policies than grid sites and may protect the facility access with a VPN.
In this situation, one can run a :mod:`~DIRAC.WorkloadManagementSystem.Agent.SiteDirector` combined with a :mod:`~DIRAC.Resources.Computing.LocalComputingElement` directly on the edge node of the supercomputer.
This allows submitting pilots from the edge of the supercomputer directly to the worker nodes (see the sketch below).

Supercomputers often do not allow users to execute a program on the edge node for a long time.
To address this problem, one can call the Site Director from a cron job, executed every N minutes for a single cycle.

Also, to generate pilot proxies, the Site Director has to rely on a host certificate: one has to contact a system administrator for that.
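
For illustration only, the CE targeted by such an edge-node Site Director could be declared in the CS along these lines (the CE name and batch system are placeholder values)::

   hpc-edge.example.org
   {
     CEType = Local
     BatchSystem = SLURM
   }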

----------------------------------
Only partial outbound connectivity
----------------------------------

This case has not been addressed yet.

------------------------
No outbound connectivity
------------------------

The solutions presented in the previous section cannot work in an environment without external connectivity.
The well-known Pilot-Job paradigm on which the DIRAC WMS is based does not apply in these circumstances: the Pilot-Jobs cannot fetch jobs from DIRAC.
Thus, such supercomputers require slight changes in the WMS: we reintroduced the push model.

To leverage the Push model, one has to add the :mod:`~DIRAC.WorkloadManagementSystem.Agent.PushJobAgent` to the ``Systems/WorkloadManagement/<Setup>/Agents``
CS section, such as::

   Systems
     PushJobAgent_<Name>
     {
       CEs = <CEs>
       Sites = <Sites>
       CETypes = <CETypes>
       MaxJobsToSubmit = 100
       Module = PushJobAgent
     }

:mod:`~DIRAC.WorkloadManagementSystem.Agent.PushJobAgent` inherits from :mod:`~DIRAC.WorkloadManagementSystem.Agent.JobAgent` and has a similar structure: it fetches a job from the :mod:`~DIRAC.WorkloadManagementSystem.Service.MatcherHandler` service and submits it to a :mod:`~DIRAC.Resources.Computing.PoolComputingElement`.

- It provides an additional parameter in ``/LocalSite`` named ``RemoteExecution`` that can be used later in the process to identify computing resources with no external connectivity.
- There is no ``timeLeft`` attribute: it runs on the DIRAC side as an ``Agent``.
- ``MaxJobsToSubmit`` corresponds to the maximum number of jobs the agent can handle at the same time.
- To fetch a job, :mod:`~DIRAC.WorkloadManagementSystem.Agent.PushJobAgent` sends the dictionary of the target CE to the :mod:`~DIRAC.WorkloadManagementSystem.Service.MatcherHandler` service.

:mod:`~DIRAC.WorkloadManagementSystem.Agent.PushJobAgent` does not inherit from :mod:`~DIRAC.WorkloadManagementSystem.Agent.SiteDirector` but embeds similar features:

- It supervises specific Sites/CEs/Queues.
- If there is an error with a queue, it puts the queue on hold and waits for a certain number of cycles before trying again.

Internally, the workflow modules originally in charge of executing the script/application (:mod:`~DIRAC.Workflow.Modules.Script`) check whether the workload should be sent to a remote location before acting.
:mod:`~DIRAC.WorkloadManagementSystem.Utilities.RemoteRunner` attempts to extract the value from the environment variable initialized by the :mod:`~DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper`.
If the variable is not set, the application is run locally via ``systemCall()``; otherwise it is submitted to a remote Computing Element such as ARC (see the simplified sketch below).
:mod:`~DIRAC.WorkloadManagementSystem.Utilities.RemoteRunner` wraps the script/application command in an executable, gathers all the files of the working directory that correspond to input files, and submits the executable along with these input files. It then checks the status of the submitted application every 2 minutes until it is finished, and finally retrieves the outputs.
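
The local-versus-remote decision can be pictured with the following simplified sketch (the environment variable and helper names are hypothetical; the actual logic lives in the Script module and RemoteRunner)::

   import os
   import subprocess

   def execute_payload(command):
       """Run the payload locally unless it was flagged for remote execution."""
       if os.environ.get("DIRAC_REMOTE_EXECUTION"):  # hypothetical variable name
           return submit_to_remote_ce(command)  # hypothetical helper wrapping a CE client
       # Local execution path, comparable to systemCall()
       return subprocess.run(command, shell=True, check=False).returncode

   def submit_to_remote_ce(command):
       # Placeholder: wrap the command in an executable, attach the input files,
       # submit them to a remote CE (e.g. ARC) and poll the status every 2 minutes.
       raise NotImplementedError("remote submission is only sketched here")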

Multi-core/node allocations
---------------------------

This case has not been addressed yet.

CVMFS not available
-------------------

Workloads depending on CVMFS cannot run on such infrastructures: the only possibility is to generate a subset of CVMFS, deploy it on the supercomputer,
and mount it in a container.
`subCVMFS-builder <https://gitlab.cern.ch/alboyer/subcvmfs-builder>`_ and `subCVMFS-builder-pipeline <https://gitlab.cern.ch/alboyer/subcvmfs-builder-pipeline>`_ are two projects aiming to assist VOs in this process.
They make it possible to trace applications of interest, build a subset of CVMFS, test it and deploy it to a remote location.

To integrate the subset of CVMFS with the DIRAC workflow, one can leverage the :mod:`~DIRAC.Resources.Computing.ARCComputingElement` ``XRSLExtraString`` option such as::

   XRSLExtraString = (runtimeEnvironment="ENV/SINGULARITY" "</path/to/singularity_container>" "" "</path/to/singularity_executable>")

To mount the subset of CVMFS in the singularity container, one has to contact the ARC administrators to fine-tune the configuration, or has to build a container image containing the subset with `subCVMFS-builder <https://gitlab.cern.ch/alboyer/subcvmfs-builder>`_.

LRMS not accessible
-------------------

In this case, nothing can be done.

docs/source/_static/hpc_schema.png

982 KB

src/DIRAC/Resources/Computing/ARCComputingElement.py

Lines changed: 74 additions & 13 deletions
@@ -31,6 +31,9 @@
    Emies is another protocol that allows to interact with A-REX services that provide additional features
    (support of OIDC tokens).
 
+   Preamble:
+      Line that should be executed just before the executable file.
+
    **Code Documentation**
    """
 from __future__ import absolute_import
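
The new Preamble option is read from the CE parameters in _reset (see below) and is prepended to the executable before submission. For illustration only, a hypothetical CE description using it could look like this (the CE name and the command are placeholder values):

   hpc-ce.example.org
   {
     CEType = ARC
     Preamble = source /etc/profile.d/setup_environment.sh
   }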
@@ -51,8 +54,10 @@
 from DIRAC.Core.Utilities.List import breakListIntoChunks
 from DIRAC.Core.Security.ProxyInfo import getVOfromProxyGroup
 from DIRAC.Resources.Computing.ComputingElement import ComputingElement
+from DIRAC.Resources.Computing.PilotBundle import writeScript
 from DIRAC.WorkloadManagementSystem.Client import PilotStatus
 
+
 # Uncomment the following 5 lines for getting verbose ARC api output (debugging)
 # import sys
 # logstdout = arc.LogStream(sys.stdout)
@@ -95,18 +100,13 @@ def __init__(self, ceUniqueID):
         self.ceHost = self.ceName
         self.endpointType = "Gridftp"
         self.usercfg = arc.common.UserConfig()
+        self.preamble = ""
+
         # set the timeout to the default 20 seconds in case the UserConfig constructor did not
         self.usercfg.Timeout(20)  # pylint: disable=pointless-statement
         self.ceHost = self.ceParameters.get("Host", self.ceName)
         self.gridEnv = self.ceParameters.get("GridEnv", self.gridEnv)
 
-        # ARC endpoint types (Gridftp, Emies)
-        endpointType = self.ceParameters.get("EndpointType", self.endpointType)
-        if endpointType not in ["Gridftp", "Emies"]:
-            self.log.warn("Unknown ARC endpoint, change to default", self.endpointType)
-        else:
-            self.endpointType = endpointType
-
         # Used in getJobStatus
         self.mapStates = STATES_MAP
         # Do these after all other initialisations, in case something barks
@@ -208,8 +208,14 @@ def _addCEConfigDefaults(self):
         ComputingElement._addCEConfigDefaults(self)
 
     #############################################################################
-    def __writeXRSL(self, executableFile):
-        """Create the JDL for submission"""
+    def __writeXRSL(self, executableFile, inputs=None, outputs=None, executables=None):
+        """Create the JDL for submission
+
+        :param str executableFile: executable to wrap in a XRSL file
+        :param str/list inputs: path of the dependencies to include along with the executable
+        :param str/list outputs: path of the outputs that we want to get at the end of the execution
+        :param str/list executables: path to inputs that should have execution mode on the remote worker node
+        """
         diracStamp = makeGuid()[:8]
         # Evaluate the number of processors to allocate
         nProcessors = self.ceParameters.get("NumberOfProcessors", 1)
@@ -226,35 +232,82 @@ def __writeXRSL(self, executableFile):
             "xrslMPExtraString": self.xrslMPExtraString,
         }
 
+        # Files that would need execution rights on the remote worker node
+        xrslExecutables = ""
+        if executables:
+            if not isinstance(executables, list):
+                executables = [executables]
+            xrslExecutables = "(executables=%s)" % " ".join(map(os.path.basename, executables))
+            # Add them to the inputFiles
+            if not inputs:
+                inputs = []
+            if not isinstance(inputs, list):
+                inputs = [inputs]
+            inputs += executables
+
+        # Dependencies that have to be embedded along with the executable
+        xrslInputs = ""
+        if inputs:
+            if not isinstance(inputs, list):
+                inputs = [inputs]
+            for inputFile in inputs:
+                xrslInputs += '(%s "%s")' % (os.path.basename(inputFile), inputFile)
+
+        # Output files to retrieve once the execution is complete
+        xrslOutputs = '("%s.out" "") ("%s.err" "")' % (diracStamp, diracStamp)
+        if outputs:
+            if not isinstance(outputs, list):
+                outputs = [outputs]
+            for outputFile in outputs:
+                xrslOutputs += '(%s "")' % (outputFile)
+
         xrsl = """
 &(executable="%(executable)s")
-(inputFiles=(%(executable)s "%(executableFile)s"))
+(inputFiles=(%(executable)s "%(executableFile)s") %(xrslInputAdditions)s)
 (stdout="%(diracStamp)s.out")
 (stderr="%(diracStamp)s.err")
-(outputFiles=("%(diracStamp)s.out" "") ("%(diracStamp)s.err" ""))
+(outputFiles=%(xrslOutputFiles)s)
 (queue=%(queue)s)
 %(xrslMPAdditions)s
+%(xrslExecutables)s
 %(xrslExtraString)s
 """ % {
             "executableFile": executableFile,
             "executable": os.path.basename(executableFile),
+            "xrslInputAdditions": xrslInputs,
             "diracStamp": diracStamp,
             "queue": self.arcQueue,
+            "xrslOutputFiles": xrslOutputs,
             "xrslMPAdditions": xrslMPAdditions,
+            "xrslExecutables": xrslExecutables,
             "xrslExtraString": self.xrslExtraString,
         }
 
         return xrsl, diracStamp
 
+    def _bundlePreamble(self, executableFile):
+        """Bundle the preamble with the executable file"""
+        wrapperContent = "%s\n%s" % (self.preamble, executableFile)
+        return writeScript(wrapperContent, os.getcwd())
+
     #############################################################################
     def _reset(self):
         self.queue = self.ceParameters.get("CEQueueName", self.ceParameters["Queue"])
         if "GridEnv" in self.ceParameters:
             self.gridEnv = self.ceParameters["GridEnv"]
+
+        self.preamble = self.ceParameters.get("Preamble", self.preamble)
+
+        # ARC endpoint types (Gridftp, Emies)
+        endpointType = self.ceParameters.get("EndpointType", self.endpointType)
+        if endpointType not in ["Gridftp", "Emies"]:
+            self.log.warn("Unknown ARC endpoint, change to default", self.endpointType)
+        else:
+            self.endpointType = endpointType
         return S_OK()
 
     #############################################################################
-    def submitJob(self, executableFile, proxy, numberOfJobs=1):
+    def submitJob(self, executableFile, proxy, numberOfJobs=1, inputs=None, outputs=None):
         """Method to submit job"""
 
         # Assume that the ARC queues are always of the format nordugrid-<batchSystem>-<queue>
@@ -270,6 +323,11 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1):
         if not os.access(executableFile, 5):
             os.chmod(executableFile, stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH + stat.S_IXOTH)
 
+        executables = None
+        if self.preamble:
+            executables = [executableFile]
+            executableFile = self._bundlePreamble(executableFile)
+
         batchIDList = []
         stampDict = {}
 
@@ -287,7 +345,7 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1):
             # The basic job description
             jobdescs = arc.JobDescriptionList()
             # Get the job into the ARC way
-            xrslString, diracStamp = self.__writeXRSL(executableFile)
+            xrslString, diracStamp = self.__writeXRSL(executableFile, inputs, outputs, executables)
             self.log.debug("XRSL string submitted : %s" % xrslString)
             self.log.debug("DIRAC stamp for job : %s" % diracStamp)
             # The arc bindings don't accept unicode objects in Python 2 so xrslString must be explicitly cast
@@ -332,6 +390,9 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1):
                 self.log.warn("%s ... maybe above messages will give a hint." % message)
                 break  # Boo hoo *sniff*
 
+        if self.preamble:
+            os.unlink(executableFile)
+
         if batchIDList:
             result = S_OK(batchIDList)
             result["PilotStampDict"] = stampDict
src/DIRAC/Resources/Computing/InProcessComputingElement.py

Lines changed: 9 additions & 1 deletion
@@ -39,7 +39,7 @@ def __init__(self, ceUniqueID):
         self.ceParameters["MaxTotalJobs"] = 1
 
     #############################################################################
-    def submitJob(self, executableFile, proxy=None, **kwargs):
+    def submitJob(self, executableFile, proxy=None, inputs=None, **kwargs):
         """Method to submit job (overriding base method).
 
         :param str executableFile: file to execute via systemCall.
@@ -83,6 +83,14 @@ def submitJob(self, executableFile, proxy=None, **kwargs):
 
         self.runningJobs -= 1
 
+        # Delete executable file and inputs in case space is limited
+        os.unlink(executableFile)
+        if inputs:
+            if not isinstance(inputs, list):
+                inputs = [inputs]
+            for inputFile in inputs:
+                os.unlink(inputFile)
+
         ret = S_OK()
 
         if not result["OK"]: