Skip to content

Commit a7760fc

Browse files
committed
broker: validate RFC 33 config
Problem: the [queues] and [policy] configuration defined in RFC 33 is not "owned" by any one plugin or flux component, and the frobnicator (one consumer) is not in a good position to raise a fatal error if it is incorrect. Validate the RFC 33 configuration in the broker so it can be checked before it is distributed to other parts of Flux. In this position, the instance will fail to start with a bad config, and flux config reload will fail on a bad config. Consumers of these config tables thus need not exhaustively validate them. Update t2222-job-manager-limit-job-size.t to expect immediate config failures instead of failures when the jobtap plugin is loaded.
1 parent ec6fe65 commit a7760fc

File tree

2 files changed

+282
-13
lines changed

2 files changed

+282
-13
lines changed

src/broker/brokercfg.c

Lines changed: 281 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
#include "src/common/libutil/log.h"
2222
#include "src/common/libutil/errprintf.h"
23+
#include "src/common/libutil/fsd.h"
2324

2425
#include "attr.h"
2526
#include "module.h"
@@ -34,6 +35,278 @@ struct brokercfg {
3435
flux_future_t *reload_f;
3536
};
3637

38+
static int validate_policy_jobspec (json_t *o,
39+
const char *key,
40+
flux_error_t *error)
41+
{
42+
json_error_t jerror;
43+
json_t *duration = NULL;
44+
json_t *queue = NULL;
45+
46+
if (json_unpack_ex (o,
47+
&jerror,
48+
0,
49+
"{s?{s?{s?o s?o !} !} !}",
50+
"defaults",
51+
"system",
52+
"duration", &duration,
53+
"queue", &queue) < 0) {
54+
errprintf (error, "%s: %s", key, jerror.text);
55+
goto inval;
56+
}
57+
if (duration) {
58+
double d;
59+
60+
if (!json_is_string (duration)
61+
|| fsd_parse_duration (json_string_value (duration), &d) < 0) {
62+
errprintf (error,
63+
"%s.defaults.system.duration is not a valid FSD",
64+
key);
65+
goto inval;
66+
}
67+
}
68+
if (queue) {
69+
if (!json_is_string (queue)) {
70+
errprintf (error, "%s.defaults.system.queue is not a string", key);
71+
goto inval;
72+
}
73+
}
74+
return 0;
75+
inval:
76+
errno = EINVAL;
77+
return -1;
78+
}
79+
80+
static int validate_policy_limits_job_size (json_t *o,
81+
const char *key,
82+
const char *key2,
83+
flux_error_t *error)
84+
{
85+
json_error_t jerror;
86+
int nnodes = -1;
87+
int ncores = -1;
88+
int ngpus = -1;
89+
90+
if (json_unpack_ex (o,
91+
&jerror,
92+
0,
93+
"{s?i s?i s?i !}",
94+
"nnodes", &nnodes,
95+
"ncores", &ncores,
96+
"ngpus", &ngpus) < 0) {
97+
errprintf (error, "%s.%s: %s", key, key2, jerror.text);
98+
goto inval;
99+
}
100+
if (nnodes < -1 || ncores < -1 || ngpus < -1) {
101+
errprintf (error, "%s.%s: values must be >= -1", key, key2);
102+
goto inval;
103+
}
104+
return 0;
105+
inval:
106+
errno = EINVAL;
107+
return -1;
108+
}
109+
110+
static int validate_policy_limits (json_t *o,
111+
const char *key,
112+
flux_error_t *error)
113+
{
114+
json_error_t jerror;
115+
json_t *job_size = NULL;
116+
json_t *duration = NULL;
117+
118+
if (json_unpack_ex (o,
119+
&jerror,
120+
0,
121+
"{s?o s?o !}",
122+
"job-size", &job_size,
123+
"duration", &duration) < 0) {
124+
errprintf (error, "%s: %s", key, jerror.text);
125+
goto inval;
126+
}
127+
if (duration) {
128+
double d;
129+
130+
if (!json_is_string (duration)
131+
|| fsd_parse_duration (json_string_value (duration), &d) < 0) {
132+
errprintf (error, "%s.duration is not a valid FSD", key);
133+
goto inval;
134+
}
135+
}
136+
if (job_size) {
137+
json_t *min = NULL;
138+
json_t *max = NULL;
139+
140+
if (json_unpack_ex (job_size,
141+
&jerror,
142+
0,
143+
"{s?o s?o !}",
144+
"min", &min,
145+
"max", &max) < 0) {
146+
errprintf (error, "%s.job-size: %s", key, jerror.text);
147+
goto inval;
148+
}
149+
if (min) {
150+
if (validate_policy_limits_job_size (min, key, "min", error) < 0)
151+
goto inval;
152+
}
153+
if (max) {
154+
if (validate_policy_limits_job_size (max, key, "max", error) < 0)
155+
goto inval;
156+
}
157+
}
158+
return 0;
159+
inval:
160+
errno = EINVAL;
161+
return -1;
162+
}
163+
164+
static bool is_string_array (json_t *o)
165+
{
166+
size_t index;
167+
json_t *val;
168+
169+
if (!json_is_array (o))
170+
return false;
171+
json_array_foreach (o, index, val) {
172+
if (!json_is_string (val))
173+
return false;
174+
}
175+
return true;
176+
}
177+
178+
static int validate_policy_access (json_t *o,
179+
const char *key,
180+
flux_error_t *error)
181+
{
182+
json_error_t jerror;
183+
json_t *allow_user = NULL;
184+
json_t *allow_group = NULL;
185+
186+
if (json_unpack_ex (o,
187+
&jerror,
188+
0,
189+
"{s?o s?o !}",
190+
"allow-user", &allow_user,
191+
"allow-group", &allow_group) < 0) {
192+
errprintf (error, "%s: %s", key, jerror.text);
193+
goto inval;
194+
}
195+
if (allow_user) {
196+
if (!is_string_array (allow_user)) {
197+
errprintf (error, "%s.allow-user must be a string array", key);
198+
goto inval;
199+
}
200+
}
201+
if (allow_group) {
202+
if (!is_string_array (allow_group)) {
203+
errprintf (error, "%s.allow-group must be a string array", key);
204+
goto inval;
205+
}
206+
}
207+
return 0;
208+
inval:
209+
errno = EINVAL;
210+
return -1;
211+
}
212+
213+
/* Validate the policy table as defined by RFC 33. The table can appear at
214+
* the top level of the config or within a queues entry.
215+
*/
216+
static int validate_policy_json (json_t *policy,
217+
const char *key,
218+
flux_error_t *error)
219+
{
220+
json_error_t jerror;
221+
json_t *jobspec = NULL;
222+
json_t *limits = NULL;
223+
json_t *access = NULL;
224+
json_t *scheduler = NULL;
225+
char key2[1024];
226+
227+
if (json_unpack_ex (policy,
228+
&jerror,
229+
0,
230+
"{s?o s?o s?o s?o !}",
231+
"jobspec", &jobspec,
232+
"limits", &limits,
233+
"access", &access,
234+
"scheduler", &scheduler) < 0) {
235+
errprintf (error, "%s: %s", key, jerror.text);
236+
errno = EINVAL;
237+
return -1;
238+
}
239+
if (jobspec) {
240+
snprintf (key2, sizeof (key2), "%s.jobspec", key);
241+
if (validate_policy_jobspec (jobspec, key2, error) < 0)
242+
return -1;
243+
}
244+
if (limits) {
245+
snprintf (key2, sizeof (key2), "%s.limits", key);
246+
if (validate_policy_limits (limits, key2, error) < 0)
247+
return -1;
248+
}
249+
if (access) {
250+
snprintf (key2, sizeof (key2), "%s.access", key);
251+
if (validate_policy_access (access, key2, error) < 0)
252+
return -1;
253+
}
254+
return 0;
255+
}
256+
257+
static int validate_policy_config (flux_conf_t *conf, flux_error_t *error)
258+
{
259+
json_t *policy = NULL;
260+
261+
if (flux_conf_unpack (conf,
262+
error,
263+
"{s?o}",
264+
"policy", &policy) < 0)
265+
return -1;
266+
if (policy) {
267+
if (validate_policy_json (policy, "policy", error) < 0)
268+
return -1;
269+
}
270+
return 0;
271+
}
272+
273+
static int validate_queues_config (flux_conf_t *conf, flux_error_t *error)
274+
{
275+
json_t *queues = NULL;
276+
277+
if (flux_conf_unpack (conf,
278+
error,
279+
"{s?o}",
280+
"queues", &queues) < 0)
281+
return -1;
282+
if (queues) {
283+
const char *name;
284+
json_t *entry;
285+
286+
json_object_foreach (queues, name, entry) {
287+
json_error_t jerror;
288+
json_t *policy = NULL;
289+
290+
if (json_unpack_ex (entry,
291+
&jerror,
292+
0,
293+
"{s?o !}",
294+
"policy", &policy) < 0) {
295+
errprintf (error, "queues.%s: %s", name, jerror.text);
296+
errno = EINVAL;
297+
return -1;
298+
}
299+
if (policy) {
300+
char key[1024];
301+
snprintf (key, sizeof (key), "queues.%s.policy", name);
302+
if (validate_policy_json (policy, key, error) < 0)
303+
return -1;
304+
}
305+
}
306+
}
307+
return 0;
308+
}
309+
37310
/* Parse config object from TOML config files if path is set;
38311
* otherwise, create an empty config object. Store the object
39312
* in ctx->h for later access by flux_conf_get().
@@ -59,13 +332,18 @@ static int brokercfg_parse (flux_t *h,
59332
return -1;
60333
}
61334
}
335+
if (validate_policy_config (conf, errp) < 0)
336+
goto error;
337+
if (validate_queues_config (conf, errp) < 0)
338+
goto error;
62339
if (flux_set_conf (h, conf) < 0) {
63340
errprintf (errp, "Error caching config object");
64-
flux_conf_decref (conf);
65-
return -1;
341+
goto error;
66342
}
67-
68343
return 0;
344+
error:
345+
flux_conf_decref (conf);
346+
return -1;
69347
}
70348

71349
/* Now that all modules have responded to '<name>.config-reload' request,

t/t2222-job-manager-limit-job-size.t

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,18 +10,12 @@ test_under_flux 2 full -o,--config-path=$(pwd)/config
1010

1111
flux setattr log-stderr-level 1
1212

13-
test_expect_success 'unload all built-in plugins' '
14-
flux jobtap remove ".*"
15-
'
1613
test_expect_success 'configure an invalid job-size limit' '
1714
cat >config/policy.toml <<-EOT &&
1815
[policy.limits]
1916
job-size.max.nnodes = -42
2017
EOT
21-
flux config reload
22-
'
23-
test_expect_success 'cannot load the limit-job-size plugin' '
24-
test_must_fail flux jobtap load .limit-job-size
18+
test_must_fail flux config reload
2519
'
2620
test_expect_success 'configure valid job-size.*.nnodes limits' '
2721
cat >config/policy.toml <<-EOT &&
@@ -31,9 +25,6 @@ test_expect_success 'configure valid job-size.*.nnodes limits' '
3125
EOT
3226
flux config reload
3327
'
34-
test_expect_success 'and plugin can now be loaded' '
35-
flux jobtap load .limit-job-size
36-
'
3728
test_expect_success 'a job that exceeds job-size.max.nnodes is rejected' '
3829
test_must_fail flux mini submit -N 3 /bin/true 2>max-nnodes.err &&
3930
grep "exceeds policy limit of 2" max-nnodes.err

0 commit comments

Comments
 (0)