Skip to content

Commit d8af1ef

Browse files
author
Vasileios Karakasis
authored
Merge pull request #2598 from researchapps/add/flux-framework-scheduler
[feat] Add support for the Flux scheduler
2 parents 2076f04 + 949a933 commit d8af1ef

File tree

14 files changed

+491
-20
lines changed

14 files changed

+491
-20
lines changed

.github/workflows/test-flux.yaml

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
name: Test Flux Scheduler
2+
on:
3+
pull_request: []
4+
5+
jobs:
6+
build:
7+
runs-on: ubuntu-latest
8+
permissions:
9+
packages: read
10+
strategy:
11+
fail-fast: false
12+
matrix:
13+
container: ['fluxrm/flux-sched:focal']
14+
15+
container:
16+
image: ${{ matrix.container }}
17+
options: "--platform=linux/amd64 --user root -it --init"
18+
19+
name: ${{ matrix.container }}
20+
steps:
21+
- name: Make Space
22+
run: |
23+
rm -rf /usr/share/dotnet
24+
rm -rf /opt/ghc
25+
26+
- name: Checkout
27+
uses: actions/checkout@v3
28+
29+
- name: Install Reframe
30+
run: |
31+
/bin/bash ./bootstrap.sh
32+
export PATH=$PWD/bin:$PATH
33+
which reframe
34+
35+
# Any additional examples added here will be tested
36+
- name: Start Flux and Run Test
37+
run: |
38+
export PATH=$PWD/bin:$PATH
39+
which reframe
40+
flux start reframe -c tutorials/flux -C tutorials/flux/settings.py -l
41+
flux start reframe -c tutorials/flux -C tutorials/flux/settings.py --run
42+
flux start python3 ./test_reframe.py --rfm-user-config=tutorials/flux/settings.py -vvvv

docs/config_reference.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,7 @@ System Partition Configuration
223223
Supported schedulers are the following:
224224

225225
- ``local``: Jobs will be launched locally without using any job scheduler.
226+
- ``flux``: Jobs will be launched using the `Flux Framework <https://flux-framework.org/>`_ scheduler.
226227
- ``oar``: Jobs will be launched using the `OAR <https://oar.imag.fr/>`__ scheduler.
227228
- ``pbs``: Jobs will be launched using the `PBS Pro <https://en.wikipedia.org/wiki/Portable_Batch_System>`__ scheduler.
228229
- ``sge``: Jobs will be launched using the `Sun Grid Engine <https://arc.liv.ac.uk/SGE/htmlman/manuals.html>`__ scheduler.

docs/tutorial_flux.rst

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
========================================
2+
Tutorial 7: The Flux Framework Scheduler
3+
========================================
4+
5+
This is a tutorial that will show how to use refame with `Flux
6+
Framework <https://github.com/flux-framework/>`__. First, build the
7+
container here from the root of reframe.
8+
9+
.. code:: bash
10+
11+
$ docker build -f tutorials/flux/Dockerfile -t flux-reframe .
12+
13+
Then shell inside, optionally binding the present working directory if
14+
you want to develop.
15+
16+
.. code:: bash
17+
18+
$ docker run -it -v $PWD:/code flux-reframe
19+
$ docker run -it flux-reframe
20+
21+
Note that if you build the local repository, you’ll need to bootstrap
22+
and install again, as we have over-written the bin!
23+
24+
.. code:: bash
25+
26+
./bootstrap.sh
27+
28+
And then reframe will again be in the local ``bin`` directory:
29+
30+
.. code:: bash
31+
32+
# which reframe
33+
/code/bin/reframe
34+
35+
Then we can run ReFrame with the custom config `config.py <config.py>`__
36+
for flux.
37+
38+
.. code:: bash
39+
40+
# What tests are under tutorials/flux?
41+
$ cd tutorials/flux
42+
$ reframe -c . -C settings.py -l
43+
44+
.. code:: console
45+
46+
[ReFrame Setup]
47+
version: 4.0.0-dev.1
48+
command: '/code/bin/reframe -c tutorials/flux -C tutorials/flux/settings.py -l'
49+
launched by: root@b1f6650222bc
50+
working directory: '/code'
51+
settings file: 'tutorials/flux/settings.py'
52+
check search path: '/code/tutorials/flux'
53+
stage directory: '/code/stage'
54+
output directory: '/code/output'
55+
56+
[List of matched checks]
57+
- EchoRandTest /66b93401
58+
Found 1 check(s)
59+
60+
Log file(s) saved in '/tmp/rfm-ilqg7fqg.log'
61+
62+
This also works
63+
64+
.. code:: bash
65+
66+
$ reframe -c tutorials/flux -C tutorials/flux/settings.py -l
67+
68+
And then to run tests, just replace ``-l`` (for list) with ``-r`` or
69+
``--run`` (for run):
70+
71+
.. code:: bash
72+
73+
$ reframe -c tutorials/flux -C tutorials/flux/settings.py --run
74+
75+
.. code:: console
76+
77+
root@b1f6650222bc:/code# reframe -c tutorials/flux -C tutorials/flux/settings.py --run
78+
[ReFrame Setup]
79+
version: 4.0.0-dev.1
80+
command: '/code/bin/reframe -c tutorials/flux -C tutorials/flux/settings.py --run'
81+
launched by: root@b1f6650222bc
82+
working directory: '/code'
83+
settings file: 'tutorials/flux/settings.py'
84+
check search path: '/code/tutorials/flux'
85+
stage directory: '/code/stage'
86+
output directory: '/code/output'
87+
88+
[==========] Running 1 check(s)
89+
[==========] Started on Fri Sep 16 20:47:15 2022
90+
91+
[----------] start processing checks
92+
[ RUN ] EchoRandTest /66b93401 @generic:default+builtin
93+
[ OK ] (1/1) EchoRandTest /66b93401 @generic:default+builtin
94+
[----------] all spawned checks have finished
95+
96+
[ PASSED ] Ran 1/1 test case(s) from 1 check(s) (0 failure(s), 0 skipped)
97+
[==========] Finished on Fri Sep 16 20:47:15 2022
98+
Run report saved in '/root/.reframe/reports/run-report.json'
99+
Log file(s) saved in '/tmp/rfm-0avso9nb.log'
100+
101+
For advanced users or developers, here is how to run tests within the container:
102+
103+
Testing
104+
-------
105+
106+
.. code:: console
107+
108+
./test_reframe.py --rfm-user-config=tutorials/flux/settings.py unittests/test_schedulers.py -xs

docs/tutorials.rst

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ ReFrame Tutorials
1111
tutorial_fixtures
1212
tutorial_build_automation
1313
tutorial_tips_tricks
14-
14+
tutorial_flux
1515

1616
Online Tutorials
1717
----------------

reframe/core/backends.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
]
1818
_launchers = {}
1919
_scheduler_backend_modules = [
20+
'reframe.core.schedulers.flux',
2021
'reframe.core.schedulers.local',
2122
'reframe.core.schedulers.lsf',
2223
'reframe.core.schedulers.pbs',
@@ -27,7 +28,7 @@
2728
_schedulers = {}
2829

2930

30-
def _register_backend(name, local=False, *, backend_type):
31+
def _register_backend(name, local=False, error=None, *, backend_type):
3132
def do_register(cls):
3233
registry = globals()[f'_{backend_type}s']
3334
if name in registry:
@@ -37,7 +38,7 @@ def do_register(cls):
3738

3839
cls.is_local = fields.ConstantField(bool(local))
3940
cls.registered_name = fields.ConstantField(name)
40-
registry[name] = cls
41+
registry[name] = (cls, error)
4142
return cls
4243

4344
return do_register
@@ -49,9 +50,15 @@ def _get_backend(name, *, backend_type):
4950
importlib.import_module(mod)
5051

5152
try:
52-
return globals()[f'_{backend_type}s'][name]
53+
cls, error = globals()[f'_{backend_type}s'][name]
54+
if error:
55+
raise ConfigError(
56+
f'could not register {backend_type} backend: {error}'
57+
)
5358
except KeyError:
54-
raise ConfigError(f"no such {backend_type}: '{name}'")
59+
raise ConfigError(f'no such {backend_type}: {name!r}')
60+
else:
61+
return cls
5562

5663

5764
register_scheduler = functools.partial(

reframe/core/schedulers/flux.py

Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
# Copyright 2016-2022 Swiss National Supercomputing Centre (CSCS/ETH Zurich)
2+
# ReFrame Project Developers. See the top-level LICENSE file for details.
3+
#
4+
# SPDX-License-Identifier: BSD-3-Clause
5+
6+
#
7+
# Flux-Framework backend
8+
#
9+
# - Initial version submitted by Vanessa Sochat,
10+
# Lawrence Livermore National Lab
11+
#
12+
13+
import itertools
14+
import os
15+
import time
16+
17+
import reframe.core.runtime as rt
18+
from reframe.core.backends import register_scheduler
19+
from reframe.core.exceptions import JobError
20+
from reframe.core.schedulers import JobScheduler, Job
21+
22+
# Just import flux once
23+
try:
24+
import flux
25+
import flux.job
26+
from flux.job import JobspecV1
27+
except ImportError:
28+
error = 'no flux Python bindings found'
29+
else:
30+
error = None
31+
32+
WAITING_STATES = ('QUEUED', 'HELD', 'WAITING', 'PENDING')
33+
34+
35+
class _FluxJob(Job):
36+
def __init__(self, *args, **kwargs):
37+
'''Create the flux job (and future) to watch.'''
38+
super().__init__(*args, **kwargs)
39+
40+
# Generate the flux job
41+
self.fluxjob = JobspecV1.from_command(
42+
command=['/bin/bash', self.script_filename],
43+
num_tasks=self.num_tasks_per_core or 1,
44+
cores_per_task=self.num_cpus_per_task or 1,
45+
)
46+
47+
# We must use absolute paths for Flux
48+
out = os.path.join(os.path.abspath(self.workdir), self.stdout)
49+
err = os.path.join(os.path.abspath(self.workdir), self.stderr)
50+
51+
# A duration of zero (the default) means unlimited
52+
self.fluxjob.duration = self.time_limit or 0
53+
self.fluxjob.stdout = out
54+
self.fluxjob.stderr = err
55+
self.fluxjob.cwd = os.path.abspath(self.workdir)
56+
self.fluxjob.environment = dict(os.environ)
57+
self._completed = False
58+
59+
@property
60+
def completed(self):
61+
return self._completed
62+
63+
64+
@register_scheduler('flux', error=error)
65+
class FluxJobScheduler(JobScheduler):
66+
def __init__(self):
67+
self._fexecutor = flux.job.FluxExecutor()
68+
self._submit_timeout = rt.runtime().get_option(
69+
f'schedulers/@{self.registered_name}/job_submit_timeout'
70+
)
71+
72+
def emit_preamble(self, job):
73+
# We don't need to submit with a file, so we don't need a preamble.
74+
return []
75+
76+
def make_job(self, *args, **kwargs):
77+
return _FluxJob(*args, **kwargs)
78+
79+
def submit(self, job):
80+
'''Submit a job to the flux executor.'''
81+
82+
flux_future = self._fexecutor.submit(job.fluxjob)
83+
job._jobid = str(flux_future.jobid())
84+
job._submit_time = time.time()
85+
job._flux_future = flux_future
86+
87+
def cancel(self, job):
88+
'''Cancel a running Flux job.'''
89+
90+
# Job future cannot cancel once running or completed
91+
if not job._flux_future.cancel():
92+
# This will raise JobException with event=cancel (on poll)
93+
flux.job.cancel(flux.Flux(), job._flux_future.jobid())
94+
95+
def poll(self, *jobs):
96+
'''Poll running Flux jobs for updated states.'''
97+
98+
if jobs:
99+
# filter out non-jobs
100+
jobs = [job for job in jobs if job is not None]
101+
102+
if not jobs:
103+
return
104+
105+
# Loop through active jobs and act on status
106+
for job in jobs:
107+
if job._flux_future.done():
108+
try:
109+
# The exit code can help us determine if the job was
110+
# successful
111+
exit_code = job._flux_future.result(0)
112+
except flux.job.JobException:
113+
# Currently the only state we see is cancelled here
114+
self.log(f'Job {job.jobid} was likely cancelled.')
115+
job._state = 'CANCELLED'
116+
except RuntimeError:
117+
# Assume some runtime issue (suspended)
118+
self.log(f'Job {job.jobid} was likely suspended.')
119+
job._state = 'SUSPENDED'
120+
else:
121+
# the job finished (but possibly with nonzero exit code)
122+
job._state = 'COMPLETED'
123+
if exit_code != 0:
124+
self.log(
125+
f'Job {job.jobid} did not finish successfully'
126+
)
127+
128+
job._completed = True
129+
elif job.state in WAITING_STATES and job.max_pending_time:
130+
if time.time() - job.submit_time >= job.max_pending_time:
131+
self.cancel(job)
132+
job._exception = JobError(
133+
'maximum pending time exceeded', job.jobid
134+
)
135+
else:
136+
# Otherwise, we are still running
137+
job._state = 'RUNNING'
138+
139+
def allnodes(self):
140+
raise NotImplementedError('flux backend does not support node listing')
141+
142+
def filternodes(self, job, nodes):
143+
raise NotImplementedError(
144+
'flux backend does not support node filtering'
145+
)
146+
147+
def wait(self, job):
148+
'''Wait until a job is finished.'''
149+
150+
intervals = itertools.cycle([1, 2, 3])
151+
while not self.finished(job):
152+
self.poll(job)
153+
time.sleep(next(intervals))
154+
155+
def finished(self, job):
156+
if job.exception:
157+
raise job.exception
158+
159+
return job.completed

reframe/core/schedulers/registry.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ def getscheduler(name):
3535

3636

3737
# Import the schedulers modules to trigger their registration
38+
import reframe.core.schedulers.flux # noqa: F401, F403
3839
import reframe.core.schedulers.local # noqa: F401, F403
3940
import reframe.core.schedulers.lsf # noqa: F401, F403
4041
import reframe.core.schedulers.oar # noqa: F401, F403

0 commit comments

Comments
 (0)