Skip to content

Commit f23b11e

Browse files
author
Vasileios Karakasis
authored
Merge pull request #2458 from ekouts/feat/alt_flex_alloc
[feat] Add new command line option to distribute single node jobs on multiple cluster nodes
2 parents 8c178ab + 631be2e commit f23b11e

File tree

11 files changed

+374
-28
lines changed

11 files changed

+374
-28
lines changed

docs/manpage.rst

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -379,6 +379,36 @@ Options controlling ReFrame execution
379379

380380
.. versionadded:: 3.2
381381

382+
.. option:: --distribute[=NODESTATE]
383+
384+
Distribute the selected tests on all the nodes in state ``NODESTATE`` in their respective valid partitions.
385+
386+
ReFrame will parameterize and run the tests on the selected nodes.
387+
Effectively, it will dynamically create new tests that inherit from the original tests and add a new parameter named ``$nid`` which contains the list of nodes that the test must run on.
388+
The new tests are named with the following pattern ``{orig_test_basename}_{partition_fullname}``.
389+
390+
When determining the list of nodes to distribute the selected tests, ReFrame will take into account any job options passed through the :option:`-J` option.
391+
392+
You can optionally specify the state of the nodes to consider when distributing the test through the ``NODESTATE`` argument:
393+
394+
- ``all``: Tests will run on all the nodes of their respective valid partitions regardless of the nodes' state.
395+
- ``idle``: Tests will run on all *idle* nodes of their respective valid partitions.
396+
- ``NODESTATE``: Tests will run on all the nodes in state ``NODESTATE`` of their respective valid partitions.
397+
If ``NODESTATE`` is not specified, ``idle`` will be assumed.
398+
399+
The state of the nodes will be determined once, before beginning the
400+
execution of the tests, so it might be different at the time the tests are actually submitted.
401+
402+
.. note::
403+
Currently, only single-node jobs can be distributed, and only the local and Slurm-based backends support this feature.
404+
405+
.. note::
406+
Distributing tests with dependencies is not supported.
407+
However, you can distribute tests that use fixtures.
408+
409+
410+
.. versionadded:: 3.11.0
411+
382412
.. option:: --exec-policy=POLICY
383413

384414
The execution policy to be used for running tests.

reframe/core/meta.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -705,7 +705,7 @@ class MyTest(rfm.RegressionTest):
705705
...
706706
707707
# Get the raw info for variant 0
708-
MyTest.get_variant_info(0, recursive=True)
708+
MyTest.get_variant_info(0, recurse=True)
709709
# {
710710
# 'params': {'p1': 'a'},
711711
# 'fixtures': {
@@ -846,7 +846,7 @@ def make_test(name, bases, body, methods=None, **kwargs):
846846
class HelloTest(rfm.RunOnlyRegressionTest):
847847
valid_systems = ['*']
848848
valid_prog_environs = ['*']
849-
executable = 'echo',
849+
executable = 'echo'
850850
sanity_patterns: sn.assert_true(1)
851851
852852
hello_cls = HelloTest

reframe/core/schedulers/__init__.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,18 @@ class Job(jsonext.JSONSerializable, metaclass=JobMeta):
265265
#: :type: :class:`reframe.core.launchers.JobLauncher`
266266
launcher = variable(JobLauncher)
267267

268+
#: Pin the jobs on the given nodes.
269+
#:
270+
#: The list of nodes will be transformed to a suitable string and be
271+
#: passed to the scheduler's options. Currently it will have an effect
272+
#: only for the Slurm scheduler.
273+
#:
274+
#: :type: :class:`List[str]`
275+
#: :default: ``[]``
276+
#:
277+
#: .. versionadded:: 3.11.0
278+
pin_nodes = variable(typ.List[str], value=[])
279+
268280
# The sched_* arguments are exposed also to the frontend
269281
def __init__(self,
270282
name,

reframe/core/schedulers/local.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,5 +202,9 @@ class _LocalNode(sched.Node):
202202
def __init__(self, name):
203203
self._name = name
204204

205+
@property
206+
def name(self):
207+
return self._name
208+
205209
def in_state(self, state):
206210
return state.casefold() == 'idle'

reframe/core/schedulers/slurm.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import glob
88
import itertools
99
import re
10+
import shlex
1011
import time
1112
from argparse import ArgumentParser
1213
from contextlib import suppress
@@ -19,7 +20,7 @@
1920
JobBlockedError,
2021
JobError,
2122
JobSchedulerError)
22-
from reframe.utility import seconds_to_hms
23+
from reframe.utility import nodelist_abbrev, seconds_to_hms
2324

2425

2526
def slurm_state_completed(state):
@@ -192,6 +193,14 @@ def emit_preamble(self, job):
192193
else:
193194
hint = 'multithread' if job.use_smt else 'nomultithread'
194195

196+
if job.pin_nodes:
197+
preamble.append(
198+
self._format_option(
199+
nodelist_abbrev(job.pin_nodes),
200+
'--nodelist={0}'
201+
)
202+
)
203+
195204
for opt in job.sched_access:
196205
if not opt.strip().startswith(('-C', '--constraint')):
197206
preamble.append('%s %s' % (self._prefix, opt))
@@ -297,6 +306,11 @@ def filternodes(self, job, nodes):
297306
# create a mutable list out of the immutable SequenceView that
298307
# sched_access is
299308
options = job.sched_access + job.options + job.cli_options
309+
310+
# Properly split lexically all the arguments in the options list so as
311+
# to treat correctly entries such as '--option foo'.
312+
options = list(itertools.chain.from_iterable(shlex.split(opt)
313+
for opt in options))
300314
option_parser = ArgumentParser()
301315
option_parser.add_argument('--reservation')
302316
option_parser.add_argument('-p', '--partition')

reframe/frontend/cli.py

Lines changed: 41 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,12 @@
3030
import reframe.utility.typecheck as typ
3131

3232

33-
from reframe.frontend.printer import PrettyPrinter
34-
from reframe.frontend.loader import RegressionCheckLoader
33+
from reframe.frontend.distribute import distribute_tests, getallnodes
3534
from reframe.frontend.executors.policies import (SerialExecutionPolicy,
3635
AsynchronousExecutionPolicy)
3736
from reframe.frontend.executors import Runner, generate_testcases
37+
from reframe.frontend.loader import RegressionCheckLoader
38+
from reframe.frontend.printer import PrettyPrinter
3839

3940

4041
def format_env(envvars):
@@ -370,6 +371,12 @@ def main():
370371
'--disable-hook', action='append', metavar='NAME', dest='hooks',
371372
default=[], help='Disable a pipeline hook for this run'
372373
)
374+
run_options.add_argument(
375+
'--distribute', action='store', metavar='{all|STATE}',
376+
nargs='?', const='idle',
377+
help=('Distribute the selected single-node jobs on every node that '
378+
'is in STATE (default: "idle")')
379+
)
373380
run_options.add_argument(
374381
'--exec-policy', metavar='POLICY', action='store',
375382
choices=['async', 'serial'], default='async',
@@ -933,6 +940,19 @@ def print_infoline(param, value):
933940
print_infoline('output directory', repr(session_info['prefix_output']))
934941
printer.info('')
935942
try:
943+
# Need to parse the cli options before loading the tests
944+
parsed_job_options = []
945+
for opt in options.job_options:
946+
opt_split = opt.split('=', maxsplit=1)
947+
optstr = opt_split[0]
948+
valstr = opt_split[1] if len(opt_split) > 1 else ''
949+
if opt.startswith('-') or opt.startswith('#'):
950+
parsed_job_options.append(opt)
951+
elif len(optstr) == 1:
952+
parsed_job_options.append(f'-{optstr} {valstr}')
953+
else:
954+
parsed_job_options.append(f'--{optstr} {valstr}')
955+
936956
# Locate and load checks; `force=True` is not needed for normal
937957
# invocations from the command line and has practically no effect, but
938958
# it is needed to better emulate the behavior of running reframe's CLI
@@ -1015,6 +1035,22 @@ def _case_failed(t):
10151035
f'{len(testcases)} remaining'
10161036
)
10171037

1038+
if options.distribute:
1039+
node_map = getallnodes(options.distribute, parsed_job_options)
1040+
1041+
# Remove the job options that begin with '--nodelist' and '-w', so
1042+
# that they do not override those set from the distribute feature.
1043+
#
1044+
# NOTE: This is Slurm-specific. When support of distributing tests
1045+
# is added to other scheduler backends, this needs to be updated,
1046+
# too.
1047+
parsed_job_options = [
1048+
x for x in parsed_job_options
1049+
if (not x.startswith('-w') and not x.startswith('--nodelist'))
1050+
]
1051+
testcases = distribute_tests(testcases, node_map)
1052+
testcases_all = testcases
1053+
10181054
# Prepare for running
10191055
printer.debug('Building and validating the full test DAG')
10201056
testgraph, skipped_cases = dependencies.build_deps(testcases_all)
@@ -1194,18 +1230,6 @@ def module_unuse(*paths):
11941230
sched_flex_alloc_nodes = options.flex_alloc_nodes
11951231

11961232
exec_policy.sched_flex_alloc_nodes = sched_flex_alloc_nodes
1197-
parsed_job_options = []
1198-
for opt in options.job_options:
1199-
opt_split = opt.split('=', maxsplit=1)
1200-
optstr = opt_split[0]
1201-
valstr = opt_split[1] if len(opt_split) > 1 else ''
1202-
if opt.startswith('-') or opt.startswith('#'):
1203-
parsed_job_options.append(opt)
1204-
elif len(optstr) == 1:
1205-
parsed_job_options.append(f'-{optstr} {valstr}')
1206-
else:
1207-
parsed_job_options.append(f'--{optstr} {valstr}')
1208-
12091233
exec_policy.sched_options = parsed_job_options
12101234
if options.maxfail < 0:
12111235
raise errors.ConfigError(
@@ -1236,7 +1260,9 @@ def module_unuse(*paths):
12361260
success = True
12371261
if runner.stats.failed():
12381262
success = False
1239-
runner.stats.print_failure_report(printer)
1263+
runner.stats.print_failure_report(
1264+
printer, not options.distribute
1265+
)
12401266
if options.failure_stats:
12411267
runner.stats.print_failure_stats(printer)
12421268

reframe/frontend/distribute.py

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
# Copyright 2016-2022 Swiss National Supercomputing Centre (CSCS/ETH Zurich)
2+
# ReFrame Project Developers. See the top-level LICENSE file for details.
3+
#
4+
# SPDX-License-Identifier: BSD-3-Clause
5+
6+
7+
import reframe.core.builtins as builtins
8+
import reframe.core.runtime as runtime
9+
import reframe.utility as util
10+
11+
from reframe.core.decorators import TestRegistry
12+
from reframe.core.logging import getlogger
13+
from reframe.core.meta import make_test
14+
from reframe.core.schedulers import Job
15+
from reframe.frontend.executors import generate_testcases
16+
17+
18+
def getallnodes(state='all', jobs_cli_options=None):
19+
rt = runtime.runtime()
20+
nodes = {}
21+
for part in rt.system.partitions:
22+
# This job will not be submitted, it's used only to filter
23+
# the nodes based on the partition configuration
24+
dummy_job = Job.create(part.scheduler,
25+
part.launcher_type(),
26+
name='placeholder-job',
27+
sched_access=part.access,
28+
sched_options=jobs_cli_options)
29+
30+
available_nodes = part.scheduler.allnodes()
31+
available_nodes = part.scheduler.filternodes(dummy_job,
32+
available_nodes)
33+
getlogger().debug(
34+
f'Total available nodes for {part.name}: {len(available_nodes)}'
35+
)
36+
37+
if state.casefold() != 'all':
38+
available_nodes = {n for n in available_nodes
39+
if n.in_state(state)}
40+
getlogger().debug(
41+
f'[F] Selecting nodes in state {state!r}: '
42+
f'available nodes now: {len(available_nodes)}'
43+
)
44+
45+
nodes[part.fullname] = [n.name for n in available_nodes]
46+
47+
return nodes
48+
49+
50+
def _rfm_pin_run_nodes(obj):
51+
nodelist = getattr(obj, '$nid')
52+
if not obj.local:
53+
obj.job.pin_nodes = nodelist
54+
55+
56+
def _rfm_pin_build_nodes(obj):
57+
pin_nodes = getattr(obj, '$nid')
58+
if not obj.local and not obj.build_locally:
59+
obj.build_job.pin_nodes = pin_nodes
60+
61+
62+
def make_valid_systems_hook(systems):
63+
'''Returns a function to be used as a hook that sets the
64+
valid systems.
65+
66+
Since valid_systems change for each generated test, we need to
67+
generate different post-init hooks for each one of them.
68+
'''
69+
def _rfm_set_valid_systems(obj):
70+
obj.valid_systems = systems
71+
72+
return _rfm_set_valid_systems
73+
74+
75+
def distribute_tests(testcases, node_map):
76+
'''Returns new test cases that are parameterized to run on the nodes of
77+
their respective partitions, based on the given node map.
78+
'''
79+
tmp_registry = TestRegistry()
80+
new_checks = []
81+
# We don't want to register the same check for every environment
82+
# per partition
83+
check_part_combs = set()
84+
for tc in testcases:
85+
check, partition, _ = tc
86+
candidate_comb = (check.unique_name, partition.fullname)
87+
if check.is_fixture() or candidate_comb in check_part_combs:
88+
continue
89+
90+
check_part_combs.add(candidate_comb)
91+
cls = type(check)
92+
variant_info = cls.get_variant_info(
93+
check.variant_num, recurse=True
94+
)
95+
nc = make_test(
96+
f'{cls.__name__}_{partition.fullname.replace(":", "_")}',
97+
(cls,),
98+
{
99+
'valid_systems': [partition.fullname],
100+
'$nid': builtins.parameter(
101+
[[n] for n in node_map[partition.fullname]],
102+
fmt=util.nodelist_abbrev
103+
)
104+
},
105+
methods=[
106+
builtins.run_before('run')(_rfm_pin_run_nodes),
107+
builtins.run_before('compile')(_rfm_pin_build_nodes),
108+
# We re-set the valid system in a hook to make sure that it
109+
# will not be overwritten by a parent post-init hook
110+
builtins.run_after('init')(
111+
make_valid_systems_hook([partition.fullname])
112+
),
113+
]
114+
)
115+
# We have to set the prefix manually
116+
nc._rfm_custom_prefix = check.prefix
117+
118+
for i in range(nc.num_variants):
119+
# Check if this variant should be instantiated
120+
vinfo = nc.get_variant_info(i, recurse=True)
121+
vinfo['params'].pop('$nid')
122+
if vinfo == variant_info:
123+
tmp_registry.add(nc, variant_num=i)
124+
125+
new_checks = tmp_registry.instantiate_all()
126+
return generate_testcases(new_checks)

0 commit comments

Comments
 (0)