Skip to content

Commit 6dcba10

Browse files
committed
job-exec: always use stdio for exec barrier
Problem: job-exec uses either stdio or a subprocess channel to implement the exec barrier, but now that we require a minimum flux-security version of 0.9.0, stdio may be used in all cases. The critical change in flux-security is that the IMP can now use a helper program to obtain J in lieu of reading it from stdin. Simplify the code in job-exec so that stdio is always used for the exec barrier and the IMP always uses the helper. This also makes it easier to integrate pending work to execute jobs as systemd transient units, since subprocess channels will not work in that environment.
1 parent 70ec65f commit 6dcba10

File tree

6 files changed

+21
-107
lines changed

6 files changed

+21
-107
lines changed

src/modules/job-exec/exec.c

Lines changed: 10 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -88,27 +88,6 @@ static void start_cb (struct bulk_exec *exec, void *arg)
8888
{
8989
struct jobinfo *job = arg;
9090
jobinfo_started (job);
91-
/* This is going to be really slow. However, it should at least
92-
* work for now. We wait for all imp's to start, then send input
93-
*/
94-
if (job->multiuser && !config_use_imp_helper ()) {
95-
char *input = NULL;
96-
json_t *o = json_pack ("{s:s}", "J", job->J);
97-
if (!o || !(input = json_dumps (o, JSON_COMPACT))) {
98-
jobinfo_fatal_error (job, errno, "Failed to get input to IMP");
99-
goto out;
100-
}
101-
if (bulk_exec_write (exec, "stdin", input, strlen (input)) < 0)
102-
jobinfo_fatal_error (job,
103-
errno,
104-
"Failed to write %ld bytes input to IMP",
105-
strlen (input));
106-
(void) bulk_exec_close (exec, "stdin");
107-
out:
108-
json_decref (o);
109-
free (input);
110-
}
111-
11291
}
11392

11493
static void complete_cb (struct bulk_exec *exec, void *arg)
@@ -122,13 +101,11 @@ static void complete_cb (struct bulk_exec *exec, void *arg)
122101
static int exec_barrier_enter (struct bulk_exec *exec)
123102
{
124103
struct exec_ctx *ctx = bulk_exec_aux_get (exec, "ctx");
125-
const char *stream = config_use_imp_helper () ?
126-
"stdin" :
127-
"FLUX_EXEC_PROTOCOL_FD";
104+
128105
if (!ctx)
129106
return -1;
130107
if (++ctx->barrier_enter_count == bulk_exec_total (exec)) {
131-
if (bulk_exec_write (exec, stream, "exit=0\n", 7) < 0)
108+
if (bulk_exec_write (exec, "stdin", "exit=0\n", 7) < 0)
132109
return -1;
133110
ctx->barrier_enter_count = 0;
134111
ctx->barrier_completion_count++;
@@ -140,7 +117,7 @@ static int exec_barrier_enter (struct bulk_exec *exec)
140117
* case where a shell exits while a barrier is already in progress
141118
* is handled in exit_cb().
142119
*/
143-
if (bulk_exec_write (exec, stream, "exit=1\n", 7) < 0)
120+
if (bulk_exec_write (exec, "stdin", "exit=1\n", 7) < 0)
144121
return -1;
145122
}
146123
return 0;
@@ -155,8 +132,7 @@ static void output_cb (struct bulk_exec *exec, flux_subprocess_t *p,
155132
struct jobinfo *job = arg;
156133
const char *cmd = flux_cmd_arg (flux_subprocess_get_cmd (p), 0);
157134

158-
if (streq (stream, "FLUX_EXEC_PROTOCOL_FD")
159-
|| (config_use_imp_helper () && streq (stream, "stdout"))) {
135+
if (streq (stream, "stdout")) {
160136
if (streq (data, "enter\n")
161137
&& exec_barrier_enter (exec) < 0) {
162138
jobinfo_fatal_error (job,
@@ -296,10 +272,7 @@ static void exit_cb (struct bulk_exec *exec,
296272
*/
297273
if (ctx->barrier_completion_count == 0
298274
|| ctx->barrier_enter_count > 0) {
299-
const char *stream = config_use_imp_helper () ?
300-
"stdin" :
301-
"FLUX_EXEC_PROTOCOL_FD";
302-
if (bulk_exec_write (exec, stream, "exit=1\n", 7) < 0)
275+
if (bulk_exec_write (exec, "stdin", "exit=1\n", 7) < 0)
303276
jobinfo_fatal_error (job, 0,
304277
"failed to terminate barrier: %s",
305278
strerror (errno));
@@ -355,12 +328,11 @@ static int exec_init (struct jobinfo *job)
355328
goto err;
356329
}
357330
if (job->multiuser) {
358-
if (config_use_imp_helper ()
359-
&& flux_cmd_setenvf (cmd,
360-
1,
361-
"FLUX_IMP_EXEC_HELPER",
362-
"flux imp-exec-helper %ju",
363-
(uintmax_t) job->id) < 0) {
331+
if (flux_cmd_setenvf (cmd,
332+
1,
333+
"FLUX_IMP_EXEC_HELPER",
334+
"flux imp-exec-helper %ju",
335+
(uintmax_t) job->id) < 0) {
364336
flux_log_error (job->h, "exec_init: flux_cmd_setenvf");
365337
goto err;
366338
}
@@ -375,19 +347,6 @@ static int exec_init (struct jobinfo *job)
375347
flux_log_error (job->h, "exec_init: flux_cmd_argv_append");
376348
goto err;
377349
}
378-
379-
/* If more than one shell is involved in this job, set up a channel
380-
* for exec system based barrier:
381-
*/
382-
if (idset_count (ranks) > 1 && !config_use_imp_helper ()) {
383-
if (flux_cmd_add_channel (cmd, "FLUX_EXEC_PROTOCOL_FD") < 0
384-
|| flux_cmd_setopt (cmd,
385-
"FLUX_EXEC_PROTOCOL_FD_LINE_BUFFER",
386-
"true") < 0) {
387-
flux_log_error (job->h, "exec_init: flux_cmd_add_channel");
388-
goto err;
389-
}
390-
}
391350
if (bulk_exec_push_cmd (exec, ranks, cmd, 0) < 0) {
392351
flux_log_error (job->h, "exec_init: bulk_exec_push_cmd");
393352
goto err;

src/modules/job-exec/exec_config.c

Lines changed: 2 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
static const char *default_cwd = "/tmp";
2828
static const char *default_job_shell = NULL;
2929
static const char *flux_imp_path = NULL;
30-
static bool use_imp_helper = false;
3130

3231
static const char *jobspec_get_job_shell (json_t *jobspec)
3332
{
@@ -74,11 +73,6 @@ const char *config_get_imp_path (void)
7473
return flux_imp_path;
7574
}
7675

77-
bool config_use_imp_helper (void)
78-
{
79-
return use_imp_helper;
80-
}
81-
8276
/* Initialize common configurations for use by job-exec exec modules.
8377
*/
8478
int config_init (flux_t *h, int argc, char **argv)
@@ -114,33 +108,22 @@ int config_init (flux_t *h, int argc, char **argv)
114108
return -1;
115109
}
116110

117-
#if HAVE_FLUX_SECURITY
118-
/* Use IMP helper by default for flux-security >= 0.9.0
119-
*/
120-
if (FLUX_SECURITY_VERSION_MAJOR >= 0
121-
&& FLUX_SECURITY_VERSION_MINOR >= 9)
122-
use_imp_helper = true;
123-
#endif /* HAVE_FLUX_SECURITY */
124-
125111
if (argv && argc) {
126112
/* Finally, override values on cmdline */
127113
for (int i = 0; i < argc; i++) {
128114
if (strstarts (argv[i], "job-shell="))
129115
default_job_shell = argv[i]+10;
130116
else if (strstarts (argv[i], "imp="))
131117
flux_imp_path = argv[i]+4;
132-
else if (streq (argv[i], "no-imp-helper"))
133-
use_imp_helper = false;
134118
}
135119
}
136120

137121
flux_log (h, LOG_DEBUG, "using default shell path %s", default_job_shell);
138122
if (flux_imp_path) {
139123
flux_log (h,
140124
LOG_DEBUG,
141-
"using imp path %s (%s helper)",
142-
flux_imp_path,
143-
use_imp_helper ? "with" : "without");
125+
"using imp path %s (with helper)",
126+
flux_imp_path);
144127
}
145128
return 0;
146129
}

src/modules/job-exec/exec_config.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@ const char *config_get_cwd (struct jobinfo *job);
2121

2222
const char *config_get_imp_path (void);
2323

24-
bool config_use_imp_helper (void);
25-
2624
int config_init (flux_t *h, int argc, char **argv);
2725

2826
#endif /* !HAVE_JOB_EXEC_CONFIG_EXEC_H */

src/modules/job-exec/job-exec.c

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,6 @@ void jobinfo_decref (struct jobinfo *job)
147147
job->ctx = NULL;
148148
flux_msg_decref (job->req);
149149
job->req = NULL;
150-
free (job->J);
151150
resource_set_destroy (job->R);
152151
json_decref (job->jobspec);
153152
free (job->rootref);
@@ -892,13 +891,6 @@ static void jobinfo_start_continue (flux_future_t *f, void *arg)
892891
jobinfo_fatal_error (job, errno, "initializing critical ranks");
893892
goto done;
894893
}
895-
if (job->multiuser && !config_use_imp_helper ()) {
896-
const char *J = jobinfo_kvs_lookup_get (f, "J");
897-
if (!J || !(job->J = strdup (J))) {
898-
jobinfo_fatal_error (job, errno, "reading J: %s", error.text);
899-
goto done;
900-
}
901-
}
902894
if (jobinfo_load_implementation (job) < 0) {
903895
jobinfo_fatal_error (job, errno, "failed to initialize implementation");
904896
goto done;
@@ -1057,12 +1049,6 @@ static flux_future_t *jobinfo_start_init (struct jobinfo *job)
10571049
if (!(f_kvs = flux_jobid_kvs_lookup (h, job->id, 0, "R"))
10581050
|| flux_future_push (f, "R", f_kvs) < 0)
10591051
goto err;
1060-
if (job->multiuser
1061-
&& !config_use_imp_helper ()
1062-
&& (!(f_kvs = flux_jobid_kvs_lookup (h, job->id, 0, "J"))
1063-
|| flux_future_push (f, "J", f_kvs) < 0)) {
1064-
goto err;
1065-
}
10661052
if (job->reattach)
10671053
f_kvs = ns_get_rootref (h, job, 0);
10681054
else

src/modules/job-exec/job-exec.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,6 @@ struct jobinfo {
6868

6969
struct resource_set * R; /* Fetched and parsed resource set R */
7070
json_t * jobspec; /* Fetched jobspec */
71-
char * J; /* Signed jobspec */
7271

7372
struct idset * critical_ranks; /* critical shell ranks */
7473

src/modules/job-exec/sdexec.c

Lines changed: 9 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,14 @@ static struct sdexec *sdexec_create (flux_t *h,
210210
flux_log_error (job->h, "flux_cmd_argv_append");
211211
goto cleanup;
212212
}
213+
if (flux_cmd_setenvf (se->cmd,
214+
1,
215+
"FLUX_IMP_EXEC_HELPER",
216+
"flux imp-exec-helper %ju",
217+
(uintmax_t) job->id) < 0) {
218+
flux_log_error (job->h, "flux_cmd_setenvf");
219+
goto cleanup;
220+
}
213221
}
214222

215223
if (flux_cmd_argv_append (se->cmd, job_shell) < 0
@@ -357,26 +365,7 @@ static void state_cb (sdprocess_t *sdp, sdprocess_state_t state, void *arg)
357365
if (!se->job->reattach && !se->job->running)
358366
jobinfo_started (se->job);
359367

360-
if (state == SDPROCESS_ACTIVE) {
361-
/* Don't try to write J to stdin_fd of -1
362-
* This probably indicates we've reattached to this job
363-
*/
364-
if (se->job->multiuser && se->stdin_fds[0] >= 0) {
365-
char *input = NULL;
366-
json_t *o = json_pack ("{s:s}", "J", se->job->J);
367-
if (!o || !(input = json_dumps (o, JSON_COMPACT))) {
368-
jobinfo_fatal_error (se->job, errno, "Failed to get input to IMP");
369-
return;
370-
}
371-
if (write (se->stdin_fds[0], input, strlen (input)) < 0) {
372-
jobinfo_fatal_error (se->job, errno, "write");
373-
return;
374-
}
375-
close (se->stdin_fds[0]);
376-
se->stdin_fds[0] = -1;
377-
}
378-
}
379-
else if (state == SDPROCESS_EXITED
368+
if (state == SDPROCESS_EXITED
380369
&& !se->jobinfo_tasks_complete_called) {
381370

382371
/* Since we are calling jobinfo_tasks_complete(), the

0 commit comments

Comments
 (0)