Commit 5380875

Merge pull request #4529 from garlick/jobfrob
job-ingest: assign jobspec defaults before scheduler feasibility check
2 parents 9f628ee + a9297df commit 5380875

43 files changed, 2870 additions and 1390 deletions

doc/man5/flux-config-ingest.rst

Lines changed: 30 additions & 10 deletions
@@ -6,18 +6,22 @@ flux-config-ingest(5)
 DESCRIPTION
 ===========
 
-The Flux **job-ingest** service verifies and validates job requests
+The Flux **job-ingest** service optionally modifies and validates job requests
 before announcing new jobs to the **job-manager**. Configuration of the
-**job-ingest** module can be accomplished either via the module command
-line or an ``ingest`` TOML table. See the KEYS section below for supported
-``ingest`` table keys.
+**job-ingest** module can be accomplished via the ``ingest`` TOML table.
+See the KEYS section below for supported ``ingest`` table keys.
 
-The **job-ingest** module validates jobspec using a work crew of
-``flux job-validator`` processes. The validator supports a set of plugins,
-and each plugin may consume additional arguments from the command line
-for specific configuration. The validator plugins and any arguments are
-configured in the ``ingest.validator`` TOML table. See the VALIDATOR KEYS
-section below for supported ``ingest.validator`` keys.
+The **job-ingest** module implements a two stage pipeline for job requests.
+The first stage modifies jobspec and is implemented as a work crew of
+``flux job-frobnicator`` processes. The second stage validates the modified
+requests and is implemented as a work crew of ``flux job-validator`` processes.
+The frobnicator is disabled by default, and the validator is enabled by default.
+
+The frobnicator and validator each supports a set of plugins, and each plugin
+may consume additional arguments from the command line for specific
+configuration. The plugins and any arguments are configured in the
+``ingest.frobnicator`` and ``ingest.validator`` TOML tables, respectively.
+See the FROBNICATOR KEYS and VALIDATOR KEYS sections for supported keys.
 
 KEYS
 ====
@@ -28,6 +32,19 @@ batch-count
    ``batch-count`` key is nonzero then jobs are batched based on a counter
    instead. This is mostly useful for testing.
 
+FROBNICATOR KEYS
+================
+
+plugins
+   (optional) An array of frobnicator plugins to use.
+   For a list of supported plugins on your system run
+   ``flux job-frobnicator --list-plugins``
+
+args
+   (optional) An array of extra arguments to pass on the frobnicator
+   command line. Valid arguments can be found by running
+   ``flux job-frobnicator --plugins=LIST --help``
+
 VALIDATOR KEYS
 ==============
 
@@ -52,6 +69,9 @@ EXAMPLE
 
 ::
 
+   [ingest.frobnicator]
+   plugins = [ "defaults" ]
+
    [ingest.validator]
    plugins = [ "jobspec", "feasibility" ]
    args = [ "--require-version=1" ]
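The ordering of the two stages described in the man page change above can be sketched in plain Python. This is an illustration only: `apply_defaults`, `validate`, and `ingest` are hypothetical helpers, and the duration check stands in for whatever the configured validator plugins actually enforce. The real module runs `flux job-frobnicator` and `flux job-validator` as worker processes. The point of the sketch is that defaults are filled in first, so the validation stage sees the final request.

```python
import copy


def apply_defaults(jobspec, defaults):
    """Stage 1 (hypothetical): fill in system attributes the request omitted."""
    result = copy.deepcopy(jobspec)
    system = result.setdefault("attributes", {}).setdefault("system", {})
    for key, value in defaults.items():
        # Values supplied by the user always win over configured defaults.
        system.setdefault(key, value)
    return result


def validate(jobspec):
    """Stage 2 (hypothetical check): reject requests that lack a duration."""
    system = jobspec.get("attributes", {}).get("system", {})
    if "duration" not in system:
        raise ValueError("job request does not specify a duration")


def ingest(jobspec, defaults=None):
    """Run the optional modify stage, then the validate stage."""
    if defaults is not None:  # the modify stage is disabled by default
        jobspec = apply_defaults(jobspec, defaults)
    validate(jobspec)
    return jobspec
```

With the modify stage enabled, a request that omits its duration passes validation because the default was assigned before the check ran. With it disabled, the same request is rejected.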

doc/man5/flux-config-policy.rst

Lines changed: 9 additions & 9 deletions
@@ -20,9 +20,9 @@ DEFAULTS
 Default values for job requests may be configured in the
 ``policy.jobspec.defaults.system`` table. If a job request does not specify
 a value for a system attribute that is configured in the table, the configured
-value is substituted. Some common examples are:
+value is substituted. Currently the following values are allowed:
 
-policy.jobspec.defaults.system.duration (float seconds or FSD string)
+policy.jobspec.defaults.system.duration (FSD string)
    (optional) If a job is submitted without specifying a job duration,
    this value is substituted.
 
@@ -42,8 +42,8 @@ rejected at submission time if they violate configured limits.
 queue. The actual value used to indicate `unlimited` varies by limit
 type and is noted below.
 
-policy.limits.duration (float seconds or FSD string)
-   (optional) Maximum duration that a job may request (0 = unlimited).
+policy.limits.duration (FSD string)
+   (optional) Maximum duration that a job may request ("0" = unlimited).
 
 .. note::
    Limit checks take place before the scheduler sees the request, so it is
@@ -79,18 +79,18 @@ EXAMPLE
 
 ::
 
-   [policy.defaults]
+   [policy.jobspec.defaults.system]
    duration = "1h"
-   queue = "pbatch"
+   queue = "batch"
 
    [policy.limits]
    duration = "4h"
    job-size.max.nnodes = 8
-   job-size.max.gpus = 4
+   job-size.max.ngpus = 4
 
-   [queues.pdebug.policy.limits]
+   [queues.debug.policy.limits]
    duration = "30m"
-   job-size.max.gpus = -1 # unlimited
+   job-size.max.ngpus = -1 # unlimited
 
 
 RESOURCES
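The duration limits in the example configuration above can be checked with a short Python sketch. Everything here is an assumption made for illustration: `parse_fsd` handles only a bare number of seconds or a number with an `s`, `m`, `h`, or `d` suffix, which is a simplified subset of FSD, and `duration_limit` assumes that a queue-specific limit overrides the general one. `CONFIG` mirrors the TOML example as a Python dictionary. None of these names come from Flux.

```python
# Seconds per unit for the simplified FSD subset handled here (assumption).
FSD_UNITS = {"s": 1.0, "m": 60.0, "h": 3600.0, "d": 86400.0}

# The TOML example above, written as a Python dictionary.
CONFIG = {
    "policy": {
        "jobspec": {"defaults": {"system": {"duration": "1h", "queue": "batch"}}},
        "limits": {"duration": "4h"},
    },
    "queues": {"debug": {"policy": {"limits": {"duration": "30m"}}}},
}


def parse_fsd(text):
    """Convert a simplified FSD string such as "30m" to seconds."""
    text = text.strip()
    if text and text[-1] in FSD_UNITS:
        return float(text[:-1]) * FSD_UNITS[text[-1]]
    return float(text)  # a bare number is taken as seconds


def duration_limit(config, queue):
    """Return the limit string for a queue, falling back to the general limit."""
    general = config.get("policy", {}).get("limits", {}).get("duration", "0")
    queue_cfg = config.get("queues", {}).get(queue, {})
    queue_limits = queue_cfg.get("policy", {}).get("limits", {})
    return queue_limits.get("duration", general)


def check_duration(requested, limit):
    """Return True if the request is within the limit; "0" means unlimited."""
    limit_seconds = parse_fsd(limit)
    if limit_seconds == 0:
        return True
    return parse_fsd(requested) <= limit_seconds
```

Under these assumptions, the default duration of `"1h"` fits within the general `"4h"` limit but exceeds the `"30m"` limit configured for the `debug` queue.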

doc/test/spell.en.pws

Lines changed: 1 addition & 0 deletions
@@ -635,3 +635,4 @@ pdebug
 parentof
 bg
 norestrict
+frobnicator

src/bindings/python/flux/Makefile.am

Lines changed: 3 additions & 0 deletions
@@ -34,6 +34,9 @@ nobase_fluxpy_PYTHON = \
 	job/validator/plugins/schema.py \
 	job/validator/plugins/feasibility.py \
 	job/validator/plugins/require-instance.py \
+	job/frobnicator/__init__.py \
+	job/frobnicator/frobnicator.py \
+	job/frobnicator/plugins/defaults.py \
 	resource/Rlist.py \
 	resource/__init__.py \
 	resource/ResourceSetImplementation.py \

src/bindings/python/flux/job/Jobspec.py

Lines changed: 19 additions & 0 deletions
@@ -371,6 +371,25 @@ def duration(self, duration):
             raise ValueError("duration must be a normal, finite value")
         self.setattr("system.duration", time)
 
+    @property
+    def queue(self):
+        """
+        Target queue of job submission
+        """
+        try:
+            return self.jobspec["attributes"]["system"]["queue"]
+        except KeyError:
+            return None
+
+    @queue.setter
+    def queue(self, queue):
+        """
+        Set target submission queue
+        """
+        if not isinstance(queue, str):
+            raise TypeError("queue must be a string")
+        self.setattr("system.queue", queue)
+
     @property
     def cwd(self):
         """
Lines changed: 11 additions & 0 deletions
@@ -0,0 +1,11 @@
+###############################################################
+# Copyright 2022 Lawrence Livermore National Security, LLC
+# (c.f. AUTHORS, NOTICE.LLNS, COPYING)
+#
+# This file is part of the Flux resource manager framework.
+# For details, see https://github.com/flux-framework.
+#
+# SPDX-License-Identifier: LGPL-3.0
+###############################################################
+
+from flux.job.frobnicator.frobnicator import JobFrobnicator, FrobnicatorPlugin
Lines changed: 151 additions & 0 deletions
@@ -0,0 +1,151 @@
+###############################################################
+# Copyright 2022 Lawrence Livermore National Security, LLC
+# (c.f. AUTHORS, NOTICE.LLNS, COPYING)
+#
+# This file is part of the Flux resource manager framework.
+# For details, see https://github.com/flux-framework.
+#
+# SPDX-License-Identifier: LGPL-3.0
+###############################################################
+
+import os
+import argparse
+from abc import abstractmethod
+
+import flux
+from flux.importer import import_plugins, import_path
+from flux.job import Jobspec
+
+
+class FrobnicatorPlugin:
+    """Base class for plugins which modify jobspec in place"""
+
+    def __init__(self, parser):
+        """Initialize a FrobnicatorPlugin"""
+
+    def configure(self, args, config):
+        """Configure a FrobnicatorPlugin. Run after arguments are parsed
+
+        Args:
+            args (:obj:`Namespace`): The resulting namespace after calling
+                argparse.parse_args()
+
+            config (:obj:`dict`): The current broker config, stored as a Python
+                dictionary.
+        """
+
+    @abstractmethod
+    def frob(self, jobspec, userid, urgency, flags):
+        """Modify jobspec. A FrobnicatorPlugin must implement this method.
+
+        The plugin should modify the jobspec parameter directly. Extra
+        job information (user, urgency, flags) are available in the
+        ``info`` parameter.
+
+        Args:
+            jobspec (:obj:`Jobspec`): The jobspec to modify
+
+            userid (:obj:`int`): Submitting user
+
+            urgency (:obj:`int`): Initial job urgency
+
+            flags (:obj:`int`): Job submission flags
+
+        Returns:
+            None or raises exception.
+        """
+        raise NotImplementedError
+
+
+# pylint: disable=too-many-instance-attributes
+class JobFrobnicator:
+    """A plugin-based job modification class
+
+    JobFrobnicator loads an ordered stack of plugins that implement the
+    FrobnicatorPlugin interface from the 'flux.job.frobnicator.plugins'
+    namespace.
+    """
+
+    plugin_namespace = "flux.job.frobnicator.plugins"
+
+    def __init__(self, argv, pluginpath=None, parser=None):
+
+        self.frobnicators = []
+        self.config = {}
+
+        if pluginpath is None:
+            pluginpath = []
+
+        if parser is None:
+            parser = argparse.ArgumentParser(
+                formatter_class=flux.util.help_formatter(), add_help=False
+            )
+
+        self.parser = parser
+        self.parser_group = self.parser.add_argument_group("Options")
+        self.plugins_group = self.parser.add_argument_group(
+            "Options provided by plugins"
+        )
+
+        self.parser_group.add_argument("--plugins", action="append", default=[])
+
+        args, self.remaining_args = self.parser.parse_known_args(argv)
+        if args.plugins:
+            args.plugins = [x for xs in args.plugins for x in xs.split(",")]
+
+        # Load all available frobnicator plugins
+        self.plugins = import_plugins(self.plugin_namespace, pluginpath)
+        self.args = args
+
+    def start(self):
+        """Read broker config, select and configure frobnicator plugins"""
+
+        self.config = flux.Flux().rpc("config.get").get()
+
+        for name in self.args.plugins:
+            if name not in self.plugins:
+                try:
+                    self.plugins[name] = import_path(name)
+                except:
+                    raise ValueError(f"frobnicator plugin '{name}' not found")
+            plugin = self.plugins[name].Frobnicator(parser=self.plugins_group)
+            self.frobnicators.append(plugin)
+
+        # Parse remaining args and pass result to loaded plugins
+        args = self.parser.parse_args(self.remaining_args)
+        for frobnicator in self.frobnicators:
+            frobnicator.configure(args, config=self.config)
+
+    def frob(self, jobspec, user=None, flags=None, urgency=16):
+        """Modify jobspec using stack of loaded frobnicator plugins
+
+        Args:
+            jobspec (:obj:`Jobspec`): A Jobspec or JobspecV1 object
+                which will be modified in place
+
+            userid (:obj:`int`): Submitting user
+
+            flags (:obj:`int`): Job submission flags
+
+            urgency (:obj:`int`): Initial job urgency
+
+        Returns:
+            :obj:`dict`: A dictionary containing a result object,
+            including keys::
+
+                {
+                    'errnum': 0,
+                    'errmsg': "An error message",
+                    'data': jobspec or None
+                }
+
+        """
+        if not isinstance(jobspec, Jobspec):
+            raise ValueError("jobspec not an instance of Jobspec")
+
+        if user is None:
+            user = os.getuid()
+
+        for frob in self.frobnicators:
+            frob.frob(jobspec, user, flags, urgency)
+        return {"errnum": 0, "data": jobspec}
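The plugin stack in this file can be approximated without a running Flux instance. The sketch below is a stand-in and does not reproduce the module. `DefaultQueue`, `RecordUser`, `split_plugins`, and `run_stack` are hypothetical names, the `submitted_by` attribute is invented for the example, and jobspecs are plain dictionaries here, not `Jobspec` objects. It shows two things from the code above: how repeated and comma-separated `--plugins` options flatten into one ordered list, and how each plugin's `frob()` is applied in turn to the same jobspec. The sketch passes `userid`, `urgency`, and `flags` by keyword so their order cannot be confused.

```python
import argparse


class DefaultQueue:
    """Hypothetical plugin: assign a queue when the request names none."""

    def __init__(self, queue):
        self.queue = queue

    def frob(self, jobspec, userid, urgency, flags):
        system = jobspec.setdefault("attributes", {}).setdefault("system", {})
        system.setdefault("queue", self.queue)


class RecordUser:
    """Hypothetical plugin: record the submitting userid (invented attribute)."""

    def frob(self, jobspec, userid, urgency, flags):
        # Relies on an earlier plugin having created attributes.system,
        # which is why the order of the stack matters.
        jobspec["attributes"]["system"]["submitted_by"] = userid


def split_plugins(argv):
    """Flatten repeated, comma-separated --plugins options into one list."""
    parser = argparse.ArgumentParser(add_help=False)
    parser.add_argument("--plugins", action="append", default=[])
    args, remaining = parser.parse_known_args(argv)
    names = [x for xs in args.plugins for x in xs.split(",")]
    return names, remaining


def run_stack(plugins, jobspec, userid, urgency=16, flags=0):
    """Apply each plugin in order, modifying the jobspec in place."""
    for plugin in plugins:
        plugin.frob(jobspec, userid=userid, urgency=urgency, flags=flags)
    return {"errnum": 0, "data": jobspec}
```

Arguments that the first parsing pass does not recognize, such as options intended for plugins, are returned in `remaining` so they can be parsed after the plugins have registered their own options.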
