Skip to content

Commit 770ffcb

Browse files
committed
delegate: allow delegation of a job to a different flux instance
Problem: Delegating a submitted job to a different flux instance is currently not supported. This feature is useful for users that have jobs that need to be executed in a multi-cluster setup, or for workflows where a certain sub-job needs to be executed on a peer cluster. Allow delegation of jobs to a different, peer-level flux instance by utilizing the URI of the other instance. Currently, explicit checking for feasibility of the job on the delegated instance is not performed, and it is assumed that both the instances belong to the same user.
1 parent ea37596 commit 770ffcb

File tree

2 files changed

+237
-1
lines changed

2 files changed

+237
-1
lines changed

src/modules/job-manager/Makefile.am

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ jobtap_plugin_LTLIBRARIES = \
2020
plugins/submit-hold.la \
2121
plugins/alloc-bypass.la \
2222
plugins/alloc-check.la \
23-
plugins/perilog.la
23+
plugins/perilog.la \
24+
plugins/delegate.la
2425

2526
libjob_manager_la_SOURCES = \
2627
job-manager.c \
@@ -121,6 +122,14 @@ plugins_perilog_la_LDFLAGS = \
121122
$(fluxplugin_ldflags) \
122123
-module
123124

125+
plugins_delegate_la_SOURCES = \
126+
plugins/delegate.c
127+
plugins_delegate_la_LIBADD = \
128+
$(top_builddir)/src/common/libflux-core.la
129+
$(JANSSON_LIBS)
130+
plugins_delegate_la_LDFLAGS = \
131+
$(fluxplugin_ldflags) \
132+
-module
124133

125134
TESTS = \
126135
test_job.t \
Lines changed: 227 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,227 @@
1+
/************************************************************\
2+
* Copyright 2024 Lawrence Livermore National Security, LLC
3+
* (c.f. AUTHORS, NOTICE.LLNS, COPYING)
4+
*
5+
* This file is part of the Flux resource manager framework.
6+
* For details, see https://github.com/flux-framework.
7+
*
8+
* SPDX-License-Identifier: LGPL-3.0
9+
\************************************************************/
10+
11+
/*
12+
* Jobtap plugin for delegating jobs to another Flux instance.
13+
*/
14+
15+
#if HAVE_CONFIG_H
16+
#include "config.h"
17+
#endif
18+
19+
#include <flux/core.h>
20+
#include <flux/jobtap.h>
21+
#include <inttypes.h>
22+
#include <jansson.h>
23+
#include <stdint.h>
24+
25+
/*
26+
* Callback firing when job has completed.
27+
*/
28+
static void wait_callback (flux_future_t *f, void *arg)
29+
{
30+
flux_plugin_t *p = arg;
31+
json_int_t *id;
32+
bool success;
33+
const char *errstr;
34+
35+
if (!(id = flux_future_aux_get (f, "flux::jobid"))) {
36+
return;
37+
}
38+
if (flux_job_wait_get_status (f, &success, &errstr) < 0) {
39+
flux_jobtap_raise_exception (p,
40+
*id,
41+
"DelegationFailure",
42+
0,
43+
"Could not fetch result of job");
44+
return;
45+
}
46+
if (success) {
47+
flux_jobtap_raise_exception (p, *id, "DelegationSuccess", 0, "");
48+
} else {
49+
flux_jobtap_raise_exception (p,
50+
*id,
51+
"DelegationFailure",
52+
0,
53+
"errstr %s",
54+
errstr);
55+
}
56+
flux_future_destroy (f);
57+
}
58+
59+
/*
60+
* Callback firing when job has been submitted and ID is ready.
61+
*/
62+
static void submit_callback (flux_future_t *f, void *arg)
63+
{
64+
flux_t *h, *delegated_h;
65+
flux_plugin_t *p = arg;
66+
json_int_t *orig_id;
67+
flux_jobid_t delegated_id;
68+
flux_future_t *wait_future = NULL;
69+
const char *errstr;
70+
71+
if (!(h = flux_jobtap_get_flux (p))) {
72+
flux_future_destroy (f);
73+
return;
74+
} else if (!(orig_id = flux_future_aux_get (f, "flux::jobid"))) {
75+
flux_log_error (h, "in submit callback: couldn't get jobid");
76+
flux_future_destroy (f);
77+
return;
78+
}
79+
if (!(delegated_h = flux_future_get_flux (f))
80+
|| flux_job_submit_get_id (f, &delegated_id) < 0
81+
|| !(wait_future = flux_job_wait (delegated_h, delegated_id))
82+
|| flux_future_aux_set (wait_future, "flux::jobid", orig_id, NULL) < 0
83+
|| flux_future_then (wait_future, -1, wait_callback, p) < 0
84+
|| flux_jobtap_event_post_pack (p,
85+
*orig_id,
86+
"delegated",
87+
"{s:I}",
88+
"jobid",
89+
(json_int_t)delegated_id)
90+
< 0) {
91+
if (!(errstr = flux_future_error_string (f))) {
92+
errstr = "";
93+
}
94+
flux_log_error (h,
95+
"%" JSON_INTEGER_FORMAT
96+
": submission to specified Flux instance failed",
97+
*orig_id);
98+
flux_jobtap_raise_exception (p, *orig_id, "DelegationFailure", 0, errstr);
99+
flux_future_destroy (wait_future);
100+
flux_future_destroy (f);
101+
return;
102+
}
103+
flux_future_destroy (f);
104+
}
105+
106+
/*
107+
* Remove all dependencies from jobspec.
108+
*
109+
* Dependencies may reference jobids that the instance the job is
110+
* being sent to does not recognize.
111+
*
112+
* Also, if the 'delegate' dependency in particular were not removed,
113+
* one of two things would happen. If the instance the job is sent to
114+
* does not have this jobtap plugin loaded, then the job would be
115+
* rejected. Otherwise, if the instance DOES have this jobtap plugin
116+
* loaded, it would attempt to delegate to itself in an infinite
117+
* loop.
118+
*/
119+
static char *remove_dependency_and_encode (json_t *jobspec)
120+
{
121+
char *encoded_jobspec;
122+
json_t *dependency_list = NULL;
123+
124+
if (!(jobspec = json_deep_copy (jobspec))) {
125+
return NULL;
126+
}
127+
if (json_unpack (jobspec,
128+
"{s:{s:{s:o}}}",
129+
"attributes",
130+
"system",
131+
"dependencies",
132+
&dependency_list)
133+
< 0
134+
|| json_array_clear (dependency_list) < 0) {
135+
json_decref (jobspec);
136+
return NULL;
137+
}
138+
encoded_jobspec = json_dumps (jobspec, 0);
139+
json_decref (jobspec);
140+
return encoded_jobspec;
141+
}
142+
143+
/*
144+
* Handle job.dependency.delegate requests
145+
*/
146+
static int depend_cb (flux_plugin_t *p,
147+
const char *topic,
148+
flux_plugin_arg_t *args,
149+
void *arg)
150+
{
151+
flux_t *h = flux_jobtap_get_flux (p);
152+
json_int_t *id;
153+
flux_t *delegated;
154+
const char *uri;
155+
json_t *jobspec;
156+
char *encoded_jobspec = NULL;
157+
flux_future_t *jobid_future = NULL;
158+
159+
if (!h || !(id = malloc (sizeof (json_int_t)))) {
160+
return flux_jobtap_reject_job (p,
161+
args,
162+
"error processing delegate: %s",
163+
flux_plugin_arg_strerror (args));
164+
}
165+
if (flux_plugin_arg_unpack (args,
166+
FLUX_PLUGIN_ARG_IN,
167+
"{s:I s:{s:s} s:o}",
168+
"id",
169+
id,
170+
"dependency",
171+
"value",
172+
&uri,
173+
"jobspec",
174+
&jobspec)
175+
< 0
176+
|| flux_jobtap_job_aux_set (p, *id, "flux::jobid", id, free) < 0) {
177+
free (id);
178+
return flux_jobtap_reject_job (p,
179+
args,
180+
"error processing delegate: %s",
181+
flux_plugin_arg_strerror (args));
182+
}
183+
if (!(delegated = flux_open (uri, 0))) {
184+
flux_log_error (h, "%" JSON_INTEGER_FORMAT ": could not open URI %s", *id, uri);
185+
return -1;
186+
}
187+
if (flux_jobtap_dependency_add (p, *id, "delegated") < 0
188+
|| flux_jobtap_job_aux_set (p,
189+
*id,
190+
"flux::delegated_handle",
191+
delegated,
192+
(flux_free_f)flux_close)
193+
< 0
194+
|| flux_set_reactor (delegated, flux_get_reactor (h)) < 0) {
195+
flux_log_error (h, "%" JSON_INTEGER_FORMAT ": flux_jobtap_dependency_add", *id);
196+
flux_close (delegated);
197+
return -1;
198+
}
199+
// submit the job to the specified instance and attach a callback for fetching the
200+
// ID
201+
if (!(encoded_jobspec = remove_dependency_and_encode (jobspec))
202+
|| !(jobid_future =
203+
flux_job_submit (delegated, encoded_jobspec, 16, FLUX_JOB_WAITABLE))
204+
|| flux_future_then (jobid_future, -1, submit_callback, p) < 0
205+
|| flux_future_aux_set (jobid_future, "flux::jobid", id, NULL) < 0) {
206+
flux_log_error (h,
207+
"%" JSON_INTEGER_FORMAT
208+
": could not delegate job to specified Flux "
209+
"instance",
210+
*id);
211+
flux_future_destroy (jobid_future);
212+
free (encoded_jobspec);
213+
return -1;
214+
}
215+
free (encoded_jobspec);
216+
return 0;
217+
}
218+
219+
static const struct flux_plugin_handler tab[] = {
220+
{"job.dependency.delegate", depend_cb, NULL},
221+
{0},
222+
};
223+
224+
int flux_plugin_init (flux_plugin_t *p)
225+
{
226+
return flux_plugin_register (p, "delegate", tab);
227+
}

0 commit comments

Comments
 (0)