Skip to content

Commit 87977f4

Browse files
grondogarlick
authored andcommitted
python: add JobFrobnicator class and flux job-frobnicator cmd
Problem: There is a need for a command and Python class similar to what exists for job validation -- implemented as a command which can filter and modify jobspec, driven as a worker by the job-ingest module. Add a JobFrobnicator class, very similar to the JobValidator class, which loads a set of plugins. However, instead of running all plugins in parallel, jobspec is passed through a stack of plugins which can modify jobspec in place. A flux-job-frobnicator.py command is provided which, similar to flux-job-validator.py, can act as a job-ingest worker. Given jobspec, it will return the modified jobspec to the caller using the job-ingest worker protocol by default. Includes a `defaults` plugin which will set defaults in jobspec based on defaults set by configuration policy.
1 parent 360d6cb commit 87977f4

File tree

6 files changed

+384
-0
lines changed

6 files changed

+384
-0
lines changed

src/bindings/python/flux/Makefile.am

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ nobase_fluxpy_PYTHON = \
3434
job/validator/plugins/schema.py \
3535
job/validator/plugins/feasibility.py \
3636
job/validator/plugins/require-instance.py \
37+
job/frobnicator/__init__.py \
38+
job/frobnicator/frobnicator.py \
39+
job/frobnicator/plugins/defaults.py \
3740
resource/Rlist.py \
3841
resource/__init__.py \
3942
resource/ResourceSetImplementation.py \
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
###############################################################
2+
# Copyright 2022 Lawrence Livermore National Security, LLC
3+
# (c.f. AUTHORS, NOTICE.LLNS, COPYING)
4+
#
5+
# This file is part of the Flux resource manager framework.
6+
# For details, see https://github.com/flux-framework.
7+
#
8+
# SPDX-License-Identifier: LGPL-3.0
9+
###############################################################
10+
11+
from flux.job.frobnicator.frobnicator import JobFrobnicator, FrobnicatorPlugin
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
###############################################################
2+
# Copyright 2022 Lawrence Livermore National Security, LLC
3+
# (c.f. AUTHORS, NOTICE.LLNS, COPYING)
4+
#
5+
# This file is part of the Flux resource manager framework.
6+
# For details, see https://github.com/flux-framework.
7+
#
8+
# SPDX-License-Identifier: LGPL-3.0
9+
###############################################################
10+
11+
import os
12+
import argparse
13+
from abc import abstractmethod
14+
15+
import flux
16+
from flux.importer import import_plugins, import_path
17+
from flux.job import Jobspec
18+
19+
20+
class FrobnicatorPlugin:
21+
"""Base class for plugins which modify jobspec in place"""
22+
23+
def __init__(self, parser):
24+
"""Initialize a FrobnicatorPlugin"""
25+
26+
def configure(self, args, config):
27+
"""Configure a FrobnicatorPlugin. Run after arguments are parsed
28+
29+
Args:
30+
args (:obj:`Namespace`): The resulting namespace after calling
31+
argparse.parse_args()
32+
33+
config (:obj:`dict`): The current broker config, stored as a Python
34+
dictionary.
35+
"""
36+
37+
@abstractmethod
38+
def frob(self, jobspec, userid, urgency, flags):
39+
"""Modify jobspec. A FrobnicatorPlugin must implement this method.
40+
41+
The plugin should modify the jobspec parameter directly. Extra
42+
job information (user, urgency, flags) are available in the
43+
``info`` parameter.
44+
45+
Args:
46+
jobspec (:obj:`Jobspec`): The jobspec to modify
47+
48+
userid (:obj:`int`): Submitting user
49+
50+
urgency (:obj:`int`): Initial job urgency
51+
52+
flags (:obj:`int`): Job submission flags
53+
54+
Returns:
55+
None or raises exception.
56+
"""
57+
raise NotImplementedError
58+
59+
60+
# pylint: disable=too-many-instance-attributes
61+
class JobFrobnicator:
62+
"""A plugin-based job modification class
63+
64+
JobFrobnicator loads an ordered stack of plugins that implement the
65+
FrobnicatorPlugin interface from the 'flux.job.frobnicator.plugins'
66+
namespace.
67+
"""
68+
69+
plugin_namespace = "flux.job.frobnicator.plugins"
70+
71+
def __init__(self, argv, pluginpath=None, parser=None):
72+
73+
self.frobnicators = []
74+
self.config = {}
75+
76+
if pluginpath is None:
77+
pluginpath = []
78+
79+
if parser is None:
80+
parser = argparse.ArgumentParser(
81+
formatter_class=flux.util.help_formatter(), add_help=False
82+
)
83+
84+
self.parser = parser
85+
self.parser_group = self.parser.add_argument_group("Options")
86+
self.plugins_group = self.parser.add_argument_group(
87+
"Options provided by plugins"
88+
)
89+
90+
self.parser_group.add_argument("--plugins", action="append", default=[])
91+
92+
args, self.remaining_args = self.parser.parse_known_args(argv)
93+
if args.plugins:
94+
args.plugins = [x for xs in args.plugins for x in xs.split(",")]
95+
96+
# Load all available frobnicator plugins
97+
self.plugins = import_plugins(self.plugin_namespace, pluginpath)
98+
self.args = args
99+
100+
def start(self):
101+
"""Read broker config, select and configure frobnicator plugins"""
102+
103+
self.config = flux.Flux().rpc("config.get").get()
104+
105+
for name in self.args.plugins:
106+
if name not in self.plugins:
107+
try:
108+
self.plugins[name] = import_path(name)
109+
except:
110+
raise ValueError(f"frobnicator plugin '{name}' not found")
111+
plugin = self.plugins[name].Frobnicator(parser=self.plugins_group)
112+
self.frobnicators.append(plugin)
113+
114+
# Parse remaining args and pass result to loaded plugins
115+
args = self.parser.parse_args(self.remaining_args)
116+
for frobnicator in self.frobnicators:
117+
frobnicator.configure(args, config=self.config)
118+
119+
def frob(self, jobspec, user=None, flags=None, urgency=16):
120+
"""Modify jobspec using stack of loaded frobnicator plugins
121+
122+
Args:
123+
jobspec (:obj:`Jobspec`): A Jobspec or JobspecV1 object
124+
which will be modified in place
125+
126+
userid (:obj:`int`): Submitting user
127+
128+
flags (:obj:`int`): Job submission flags
129+
130+
urgency (:obj:`int`): Initial job urgency
131+
132+
Returns:
133+
:obj:`dict`: A dictionary containing a result object,
134+
including keys::
135+
136+
{
137+
'errnum': 0,
138+
'errmsg': "An error message",
139+
'data': jobspec or None
140+
}
141+
142+
"""
143+
if not isinstance(jobspec, Jobspec):
144+
raise ValueError("jobspec not an instance of Jobspec")
145+
146+
if user is None:
147+
user = os.getuid()
148+
149+
for frob in self.frobnicators:
150+
frob.frob(jobspec, user, flags, urgency)
151+
return {"errnum": 0, "data": jobspec}
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
##############################################################
2+
# Copyright 2021 Lawrence Livermore National Security, LLC
3+
# (c.f. AUTHORS, NOTICE.LLNS, COPYING)
4+
#
5+
# This file is part of the Flux resource manager framework.
6+
# For details, see https://github.com/flux-framework.
7+
#
8+
# SPDX-License-Identifier: LGPL-3.0
9+
##############################################################
10+
11+
"""Apply defaults to incoming jobspec based on broker config.
12+
13+
"""
14+
15+
from flux.job.frobnicator import FrobnicatorPlugin
16+
17+
18+
class DefaultsConfig:
19+
"""Convenience class for handling jobspec defaults configuration"""
20+
21+
def __init__(self, config={}):
22+
self.defaults = {}
23+
self.queues = {}
24+
self.default_queue = None
25+
26+
try:
27+
self.defaults = config["policy"]["jobspec"]["defaults"]["system"]
28+
self.default_queue = self.defaults["queue"]
29+
self.queues = config["queues"]
30+
except KeyError:
31+
pass
32+
33+
self.validate_config()
34+
35+
def validate_config(self):
36+
if self.queues and not isinstance(self.queues, dict):
37+
raise ValueError("queues must be a table")
38+
39+
if self.default_queue and self.default_queue not in self.queues:
40+
raise ValueError(
41+
f"default queue '{self.default_queue}' must be in [queues]"
42+
)
43+
44+
for queue in self.queues:
45+
self.queue_defaults(queue)
46+
47+
def queue_defaults(self, name):
48+
if name and self.queues:
49+
if name not in self.queues:
50+
raise ValueError(f"invalid queue {name} specified")
51+
qconf = self.queues[name]
52+
try:
53+
return qconf["policy"]["jobspec"]["defaults"]["system"]
54+
except KeyError:
55+
return None
56+
return None
57+
58+
def setattr_default(self, jobspec, attr, value):
59+
if attr == "duration" and jobspec.duration == 0:
60+
jobspec.duration = value
61+
elif attr not in jobspec.attributes["system"]:
62+
jobspec.setattr(f"system.{attr}", value)
63+
64+
def apply_defaults(self, jobspec):
65+
"""Apply general defaults then queue-specific defaults to jobspec"""
66+
67+
for attr in self.defaults:
68+
self.setattr_default(jobspec, attr, self.defaults[attr])
69+
70+
if jobspec.queue:
71+
if jobspec.queue not in self.queues:
72+
raise ValueError(f"Invalid queue '{jobspec.queue}' specified")
73+
queue_defaults = self.queue_defaults(jobspec.queue)
74+
if queue_defaults:
75+
for attr in queue_defaults:
76+
self.setattr_default(jobspec, attr, queue_defaults[attr])
77+
78+
79+
class Frobnicator(FrobnicatorPlugin):
80+
def __init__(self, parser):
81+
self.config = DefaultsConfig()
82+
super().__init__(parser)
83+
84+
def configure(self, args, config):
85+
self.config = DefaultsConfig(config)
86+
87+
def frob(self, jobspec, user, urgency, flags):
88+
self.config.apply_defaults(jobspec)

src/cmd/Makefile.am

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ dist_fluxcmd_SCRIPTS = \
7777
flux-admin.py \
7878
flux-jobtap.py \
7979
flux-job-validator.py \
80+
flux-job-frobnicator.py \
8081
flux-job-exec-override.py \
8182
flux-perilog-run.py \
8283
flux-uri.py \

0 commit comments

Comments
 (0)