Skip to content

Commit ba51709

Browse files
committed
job-manager: add dependency-singleton jobtap plugin
Problem: There's no support for a singleton job dependency scheme, i.e. one in which only one job of a given name is allowed to be running at time for the same userid. Add a dependency-singleton jobtap plugin which supports a singleton dependency scheme. This plugin tracks the count of all active jobs with an explicit job name per userid. Jobs submitted with the singleton dependency scheme are then placed in a list and held in the DEPEND state until other jobs with the same name and userid (and not already held in the singleton list) become inactive. Fixes #6804
1 parent d998edf commit ba51709

File tree

3 files changed

+372
-0
lines changed

3 files changed

+372
-0
lines changed

src/modules/job-manager/Makefile.am

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ libjob_manager_la_SOURCES = \
7474
plugins/limit-job-size.c \
7575
plugins/limit-duration.c \
7676
plugins/dependency-after.c \
77+
plugins/dependency-singleton.c \
7778
plugins/begin-time.c \
7879
plugins/update-duration.c \
7980
plugins/validate-duration.c \

src/modules/job-manager/jobtap.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ extern int priority_default_plugin_init (flux_plugin_t *p);
4848
extern int limit_job_size_plugin_init (flux_plugin_t *p);
4949
extern int limit_duration_plugin_init (flux_plugin_t *p);
5050
extern int after_plugin_init (flux_plugin_t *p);
51+
extern int singleton_plugin_init (flux_plugin_t *p);
5152
extern int begin_time_plugin_init (flux_plugin_t *p);
5253
extern int validate_duration_plugin_init (flux_plugin_t *p);
5354
extern int update_duration_plugin_init (flux_plugin_t *p);
@@ -70,6 +71,7 @@ static struct jobtap_builtin jobtap_builtins [] = {
7071
{ ".limit-job-size", limit_job_size_plugin_init },
7172
{ ".limit-duration", limit_duration_plugin_init },
7273
{ ".dependency-after", after_plugin_init },
74+
{ ".dependency-singleton", singleton_plugin_init },
7375
{ ".begin-time", &begin_time_plugin_init },
7476
{ ".validate-duration", &validate_duration_plugin_init },
7577
{ ".update-duration", &update_duration_plugin_init },
Lines changed: 369 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,369 @@
1+
/************************************************************\
2+
* Copyright 2025 Lawrence Livermore National Security, LLC
3+
* (c.f. AUTHORS, NOTICE.LLNS, COPYING)
4+
*
5+
* This file is part of the Flux resource manager framework.
6+
* For details, see https://github.com/flux-framework.
7+
*
8+
* SPDX-License-Identifier: LGPL-3.0
9+
\************************************************************/
10+
11+
/* dependency-singleton.c
12+
*
13+
* Support singleton dependency scheme which places a dependency
14+
* on a job which is only released when there are no other active
15+
* jobs of the same userid and job name which are not also already
16+
* held with a singleton dependency.
17+
*
18+
* Notes:
19+
* - counts of active jobs with the same userid/job-name are maintained
20+
* in a global hash
21+
* - jobs without an explicit name are ignored
22+
* - jobs submitted with dependency=singleton are placed on a list, also
23+
* hashed by userid/job-name
24+
* - when the active job count for a userid/job-name is decremented and
25+
* equals the count of singletons for that same pair, a singleton job
26+
* is released and removed from the list
27+
* - it is an error to submit a singleton job without an explicit job name
28+
*
29+
*/
30+
31+
#if HAVE_CONFIG_H
32+
#include "config.h"
33+
#endif
34+
#include <jansson.h>
35+
#include <flux/core.h>
36+
#include <flux/jobtap.h>
37+
38+
#include "src/common/libjob/idf58.h"
39+
#include "src/common/libczmqcontainers/czmq_containers.h"
40+
#include "src/common/libutil/hola.h"
41+
#include "ccan/str/str.h"
42+
#include "ccan/ptrint/ptrint.h"
43+
44+
struct singleton_ctx {
45+
zhashx_t *counts; // uid:job-name => ACTIVE job counts
46+
// (with or without singleton dep)
47+
struct hola *singletons; // uid:job-name => list of struct singleton
48+
// (jobs with singleton dep)
49+
};
50+
51+
struct singleton {
52+
flux_jobid_t id;
53+
};
54+
55+
static struct singleton_ctx *global_ctx = NULL;
56+
57+
static void singleton_ctx_destroy (struct singleton_ctx *sctx)
58+
{
59+
if (sctx) {
60+
int saved_errno = errno;
61+
zhashx_destroy (&sctx->counts);
62+
hola_destroy (sctx->singletons);
63+
free (sctx);
64+
errno = saved_errno;
65+
}
66+
}
67+
68+
static struct singleton_ctx *singleton_ctx_create (void)
69+
{
70+
struct singleton_ctx *sctx;
71+
int flags = HOLA_AUTOCREATE | HOLA_AUTODESTROY;
72+
73+
if (!(sctx = malloc (sizeof (*sctx)))
74+
|| !(sctx->counts = zhashx_new ())
75+
|| !(sctx->singletons = hola_create (flags)))
76+
goto error;
77+
return sctx;
78+
error:
79+
singleton_ctx_destroy (sctx);
80+
return NULL;
81+
}
82+
83+
static int singleton_key_create (char *key,
84+
size_t len,
85+
uint32_t userid,
86+
const char *name)
87+
{
88+
if (snprintf (key, len, "%u:%s", userid, name) >= len)
89+
return -1;
90+
return 0;
91+
}
92+
93+
static int singleton_list_push (flux_plugin_t *p,
94+
struct singleton_ctx *sctx,
95+
const char *key,
96+
flux_jobid_t id)
97+
{
98+
struct singleton *s;
99+
100+
if (!(s = calloc (1, sizeof (*s)))
101+
|| flux_jobtap_job_aux_set (p, id, NULL, s, (flux_free_f) free) < 0) {
102+
free (s);
103+
return -1;
104+
}
105+
s->id = id;
106+
if (!(hola_list_add_end (sctx->singletons, key, s)))
107+
return -1;
108+
109+
return 0;
110+
}
111+
112+
static struct singleton *singleton_list_pop (struct singleton_ctx *sctx,
113+
const char *key)
114+
{
115+
struct singleton *s = hola_list_first (sctx->singletons, key);
116+
if (s) {
117+
hola_list_delete (sctx->singletons,
118+
key,
119+
hola_list_cursor (sctx->singletons, key));
120+
}
121+
return s;
122+
}
123+
124+
static size_t singleton_list_count (struct singleton_ctx *sctx,
125+
const char *key)
126+
{
127+
return hola_list_size (sctx->singletons, key);
128+
}
129+
130+
/* Return the current count of ACTIVE jobs with same uid:job-name
131+
*/
132+
static int get_current_active_count (struct singleton_ctx *sctx,
133+
const char *key)
134+
{
135+
return ptr2int (zhashx_lookup (sctx->counts, key));
136+
}
137+
138+
/* Set the current count of ACTIVE jobs for key (uid:job-name)
139+
*/
140+
static void set_current_active_count (struct singleton_ctx *sctx,
141+
const char *key,
142+
int count)
143+
{
144+
if (count > 0)
145+
zhashx_update (sctx->counts, key, int2ptr (count));
146+
else
147+
zhashx_delete (sctx->counts, key);
148+
}
149+
150+
/* Update singleton userid:name hash counts with value (1 or -1)
151+
*/
152+
static int singleton_count_update (struct singleton_ctx *sctx,
153+
flux_plugin_t *p,
154+
flux_plugin_arg_t *args,
155+
int value)
156+
{
157+
flux_jobid_t id;
158+
struct singleton *s;
159+
uint32_t userid;
160+
const char *name = NULL;
161+
char key [1024];
162+
int count;
163+
164+
if (flux_plugin_arg_unpack (args,
165+
FLUX_PLUGIN_ARG_IN,
166+
"{s:I s:i s:{s:{s:{s?{s?s}}}}}",
167+
"id", &id,
168+
"userid", &userid,
169+
"jobspec",
170+
"attributes",
171+
"system",
172+
"job",
173+
"name", &name) < 0)
174+
return -1;
175+
176+
/* Only track jobs with an explicit name */
177+
if (name == NULL)
178+
return 0;
179+
180+
if (singleton_key_create (key, sizeof (key), userid, name) < 0)
181+
return -1;
182+
183+
count = get_current_active_count (sctx, key) + value;
184+
if (count == singleton_list_count (sctx, key)
185+
&& (s = singleton_list_pop (sctx, key))) {
186+
/* All active jobs of this uid/name pair are waiting on a singleton
187+
* dependency. Pop the next job on the list and release it:
188+
*/
189+
if (flux_jobtap_dependency_remove (p, s->id, "singleton") < 0) {
190+
flux_jobtap_raise_exception (p,
191+
s->id,
192+
"dependency",
193+
0,
194+
"failed to remove singleton dependency");
195+
}
196+
}
197+
set_current_active_count (sctx, key, count);
198+
return 0;
199+
}
200+
201+
static int new_cb (flux_plugin_t *p,
202+
const char *topic,
203+
flux_plugin_arg_t *args,
204+
void *data)
205+
{
206+
return singleton_count_update (global_ctx, p, args, 1);
207+
}
208+
209+
static int inactive_cb (flux_plugin_t *p,
210+
const char *topic,
211+
flux_plugin_arg_t *args,
212+
void *data)
213+
{
214+
return singleton_count_update (global_ctx, p, args, -1);
215+
}
216+
217+
218+
static int dependency_singleton_cb (flux_plugin_t *p,
219+
const char *topic,
220+
flux_plugin_arg_t *args,
221+
void *data)
222+
{
223+
struct singleton_ctx *sctx = global_ctx;
224+
flux_jobid_t id;
225+
uint32_t userid;
226+
const char *name = NULL;
227+
char key [1024];
228+
int count;
229+
230+
if (flux_plugin_arg_unpack (args,
231+
FLUX_PLUGIN_ARG_IN,
232+
"{s:I s:i s:{s:{s:{s?{s?s}}}}}",
233+
"id", &id,
234+
"userid", &userid,
235+
"jobspec",
236+
"attributes",
237+
"system",
238+
"job",
239+
"name", &name) < 0)
240+
return flux_jobtap_reject_job (p,
241+
args,
242+
"failed to unpack plugin args: %s",
243+
flux_plugin_arg_strerror (args));
244+
if (name == NULL)
245+
return flux_jobtap_reject_job (p,
246+
args,
247+
"singleton jobs require a job name");
248+
249+
if (singleton_key_create (key, sizeof (key), userid, name) < 0)
250+
return flux_jobtap_reject_job (p, args, "error creating singleton key");
251+
252+
/* Get current count of matching jobs in PRIORITY|SCHED|RUN|CLEANUP state.
253+
* If there are no other matching jobs then release this one immediately.
254+
*
255+
* Note: The current job is not included in the `counts` hash yet since
256+
* `job.dependency.*` callbacks run before `job.new`, which is only called
257+
* on valid jobs.
258+
*/
259+
if ((count = get_current_active_count (sctx, key)) == 0)
260+
return 0;
261+
262+
if (flux_jobtap_dependency_add (p, id, "singleton") < 0)
263+
return flux_jobtap_reject_job (p, args, "failed to add dependency");
264+
265+
return singleton_list_push (p, sctx, key, id);
266+
}
267+
268+
static json_t *singleton_list_to_json (const char *key)
269+
{
270+
json_t *o;
271+
zlistx_t *l;
272+
struct singleton *s;
273+
274+
if (!(o = json_array ()))
275+
return NULL;
276+
277+
if (!(l = hola_hash_lookup (global_ctx->singletons, key)))
278+
return o;
279+
280+
s = zlistx_first (l);
281+
while (s) {
282+
json_t *id = json_integer (s->id);
283+
if (!id || json_array_append_new (o, id)) {
284+
json_decref (id);
285+
goto error;
286+
}
287+
s = zlistx_next (l);
288+
}
289+
return o;
290+
error:
291+
json_decref (o);
292+
return NULL;
293+
}
294+
295+
static json_t *singletons_to_json (void)
296+
{
297+
json_t *o = NULL;
298+
zlistx_t *keys = NULL;
299+
const char *key;
300+
301+
if (!(keys = zhashx_keys (global_ctx->counts))
302+
|| !(o = json_object ()))
303+
goto error;
304+
305+
key = zlistx_first (keys);
306+
while (key) {
307+
json_t *singletons;
308+
json_t *entry = NULL;
309+
int count = get_current_active_count (global_ctx, key);
310+
311+
if (!(singletons = singleton_list_to_json (key))
312+
|| !(entry = json_pack ("{s:i s:O}",
313+
"count", count,
314+
"singletons", singletons))
315+
|| json_object_set_new (o, key, entry) < 0) {
316+
json_decref (entry);
317+
json_decref (singletons);
318+
goto error;
319+
}
320+
json_decref (singletons);
321+
key = zlistx_next (keys);
322+
}
323+
zlistx_destroy (&keys);
324+
return o;
325+
error:
326+
json_decref (o);
327+
zlistx_destroy (&keys);
328+
return NULL;
329+
}
330+
331+
static int query_cb (flux_plugin_t *p,
332+
const char *topic,
333+
flux_plugin_arg_t *args,
334+
void *data)
335+
{
336+
int rc;
337+
json_t *o;
338+
if (!(o = singletons_to_json ()))
339+
return -1;
340+
rc = flux_plugin_arg_pack (args, FLUX_PLUGIN_ARG_OUT, "O", o);
341+
json_decref (o);
342+
return rc;
343+
}
344+
345+
static const struct flux_plugin_handler tab[] = {
346+
{ "job.dependency.singleton", dependency_singleton_cb, NULL },
347+
{ "job.new", new_cb, NULL },
348+
{ "job.state.inactive", inactive_cb, NULL },
349+
{ "plugin.query", query_cb, NULL },
350+
{ 0 },
351+
};
352+
353+
int singleton_plugin_init (flux_plugin_t *p)
354+
{
355+
if (!(global_ctx = singleton_ctx_create ())
356+
|| flux_plugin_aux_set (p,
357+
NULL,
358+
global_ctx,
359+
(flux_free_f) singleton_ctx_destroy) < 0) {
360+
singleton_ctx_destroy (global_ctx);
361+
return -1;
362+
}
363+
return flux_plugin_register (p, ".dependency-singleton", tab);
364+
}
365+
366+
/*
367+
* vi:tabstop=4 shiftwidth=4 expandtab
368+
*/
369+

0 commit comments

Comments
 (0)