Skip to content

Commit 7a64c5b

Browse files
authored
Merge pull request #5267 from garlick/job-exec-barrier
job-exec: always use stdio for exec barrier
2 parents 872253c + 6dcba10 commit 7a64c5b

File tree

6 files changed

+27
-108
lines changed

6 files changed

+27
-108
lines changed

src/modules/job-exec/exec.c

Lines changed: 16 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,11 @@ struct exec_ctx {
5656

5757
static void exec_ctx_destroy (struct exec_ctx *tc)
5858
{
59-
free (tc);
59+
if (tc) {
60+
int saved_errno = errno;
61+
free (tc);
62+
errno = saved_errno;
63+
}
6064
}
6165

6266
static struct exec_ctx *exec_ctx_create (json_t *jobspec)
@@ -84,27 +88,6 @@ static void start_cb (struct bulk_exec *exec, void *arg)
8488
{
8589
struct jobinfo *job = arg;
8690
jobinfo_started (job);
87-
/* This is going to be really slow. However, it should at least
88-
* work for now. We wait for all imp's to start, then send input
89-
*/
90-
if (job->multiuser && !config_use_imp_helper ()) {
91-
char *input = NULL;
92-
json_t *o = json_pack ("{s:s}", "J", job->J);
93-
if (!o || !(input = json_dumps (o, JSON_COMPACT))) {
94-
jobinfo_fatal_error (job, errno, "Failed to get input to IMP");
95-
goto out;
96-
}
97-
if (bulk_exec_write (exec, "stdin", input, strlen (input)) < 0)
98-
jobinfo_fatal_error (job,
99-
errno,
100-
"Failed to write %ld bytes input to IMP",
101-
strlen (input));
102-
(void) bulk_exec_close (exec, "stdin");
103-
out:
104-
json_decref (o);
105-
free (input);
106-
}
107-
10891
}
10992

11093
static void complete_cb (struct bulk_exec *exec, void *arg)
@@ -118,13 +101,11 @@ static void complete_cb (struct bulk_exec *exec, void *arg)
118101
static int exec_barrier_enter (struct bulk_exec *exec)
119102
{
120103
struct exec_ctx *ctx = bulk_exec_aux_get (exec, "ctx");
121-
const char *stream = config_use_imp_helper () ?
122-
"stdin" :
123-
"FLUX_EXEC_PROTOCOL_FD";
104+
124105
if (!ctx)
125106
return -1;
126107
if (++ctx->barrier_enter_count == bulk_exec_total (exec)) {
127-
if (bulk_exec_write (exec, stream, "exit=0\n", 7) < 0)
108+
if (bulk_exec_write (exec, "stdin", "exit=0\n", 7) < 0)
128109
return -1;
129110
ctx->barrier_enter_count = 0;
130111
ctx->barrier_completion_count++;
@@ -136,7 +117,7 @@ static int exec_barrier_enter (struct bulk_exec *exec)
136117
* case where a shell exits while a barrier is already in progress
137118
* is handled in exit_cb().
138119
*/
139-
if (bulk_exec_write (exec, stream, "exit=1\n", 7) < 0)
120+
if (bulk_exec_write (exec, "stdin", "exit=1\n", 7) < 0)
140121
return -1;
141122
}
142123
return 0;
@@ -151,8 +132,7 @@ static void output_cb (struct bulk_exec *exec, flux_subprocess_t *p,
151132
struct jobinfo *job = arg;
152133
const char *cmd = flux_cmd_arg (flux_subprocess_get_cmd (p), 0);
153134

154-
if (streq (stream, "FLUX_EXEC_PROTOCOL_FD")
155-
|| (config_use_imp_helper () && streq (stream, "stdout"))) {
135+
if (streq (stream, "stdout")) {
156136
if (streq (data, "enter\n")
157137
&& exec_barrier_enter (exec) < 0) {
158138
jobinfo_fatal_error (job,
@@ -292,10 +272,7 @@ static void exit_cb (struct bulk_exec *exec,
292272
*/
293273
if (ctx->barrier_completion_count == 0
294274
|| ctx->barrier_enter_count > 0) {
295-
const char *stream = config_use_imp_helper () ?
296-
"stdin" :
297-
"FLUX_EXEC_PROTOCOL_FD";
298-
if (bulk_exec_write (exec, stream, "exit=1\n", 7) < 0)
275+
if (bulk_exec_write (exec, "stdin", "exit=1\n", 7) < 0)
299276
jobinfo_fatal_error (job, 0,
300277
"failed to terminate barrier: %s",
301278
strerror (errno));
@@ -338,6 +315,7 @@ static int exec_init (struct jobinfo *job)
338315
}
339316
if (bulk_exec_aux_set (exec, "ctx", ctx,
340317
(flux_free_f) exec_ctx_destroy) < 0) {
318+
exec_ctx_destroy (ctx);
341319
flux_log_error (job->h, "exec_init: bulk_exec_aux_set");
342320
goto err;
343321
}
@@ -350,12 +328,11 @@ static int exec_init (struct jobinfo *job)
350328
goto err;
351329
}
352330
if (job->multiuser) {
353-
if (config_use_imp_helper ()
354-
&& flux_cmd_setenvf (cmd,
355-
1,
356-
"FLUX_IMP_EXEC_HELPER",
357-
"flux imp-exec-helper %ju",
358-
(uintmax_t) job->id) < 0) {
331+
if (flux_cmd_setenvf (cmd,
332+
1,
333+
"FLUX_IMP_EXEC_HELPER",
334+
"flux imp-exec-helper %ju",
335+
(uintmax_t) job->id) < 0) {
359336
flux_log_error (job->h, "exec_init: flux_cmd_setenvf");
360337
goto err;
361338
}
@@ -370,19 +347,6 @@ static int exec_init (struct jobinfo *job)
370347
flux_log_error (job->h, "exec_init: flux_cmd_argv_append");
371348
goto err;
372349
}
373-
374-
/* If more than one shell is involved in this job, set up a channel
375-
* for exec system based barrier:
376-
*/
377-
if (idset_count (ranks) > 1 && !config_use_imp_helper ()) {
378-
if (flux_cmd_add_channel (cmd, "FLUX_EXEC_PROTOCOL_FD") < 0
379-
|| flux_cmd_setopt (cmd,
380-
"FLUX_EXEC_PROTOCOL_FD_LINE_BUFFER",
381-
"true") < 0) {
382-
flux_log_error (job->h, "exec_init: flux_cmd_add_channel");
383-
goto err;
384-
}
385-
}
386350
if (bulk_exec_push_cmd (exec, ranks, cmd, 0) < 0) {
387351
flux_log_error (job->h, "exec_init: bulk_exec_push_cmd");
388352
goto err;

src/modules/job-exec/exec_config.c

Lines changed: 2 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
static const char *default_cwd = "/tmp";
2828
static const char *default_job_shell = NULL;
2929
static const char *flux_imp_path = NULL;
30-
static bool use_imp_helper = false;
3130

3231
static const char *jobspec_get_job_shell (json_t *jobspec)
3332
{
@@ -74,11 +73,6 @@ const char *config_get_imp_path (void)
7473
return flux_imp_path;
7574
}
7675

77-
bool config_use_imp_helper (void)
78-
{
79-
return use_imp_helper;
80-
}
81-
8276
/* Initialize common configurations for use by job-exec exec modules.
8377
*/
8478
int config_init (flux_t *h, int argc, char **argv)
@@ -114,33 +108,22 @@ int config_init (flux_t *h, int argc, char **argv)
114108
return -1;
115109
}
116110

117-
#if HAVE_FLUX_SECURITY
118-
/* Use IMP helper by default for flux-security >= 0.9.0
119-
*/
120-
if (FLUX_SECURITY_VERSION_MAJOR >= 0
121-
&& FLUX_SECURITY_VERSION_MINOR >= 9)
122-
use_imp_helper = true;
123-
#endif /* HAVE_FLUX_SECURITY */
124-
125111
if (argv && argc) {
126112
/* Finally, override values on cmdline */
127113
for (int i = 0; i < argc; i++) {
128114
if (strstarts (argv[i], "job-shell="))
129115
default_job_shell = argv[i]+10;
130116
else if (strstarts (argv[i], "imp="))
131117
flux_imp_path = argv[i]+4;
132-
else if (streq (argv[i], "no-imp-helper"))
133-
use_imp_helper = false;
134118
}
135119
}
136120

137121
flux_log (h, LOG_DEBUG, "using default shell path %s", default_job_shell);
138122
if (flux_imp_path) {
139123
flux_log (h,
140124
LOG_DEBUG,
141-
"using imp path %s (%s helper)",
142-
flux_imp_path,
143-
use_imp_helper ? "with" : "without");
125+
"using imp path %s (with helper)",
126+
flux_imp_path);
144127
}
145128
return 0;
146129
}

src/modules/job-exec/exec_config.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@ const char *config_get_cwd (struct jobinfo *job);
2121

2222
const char *config_get_imp_path (void);
2323

24-
bool config_use_imp_helper (void);
25-
2624
int config_init (flux_t *h, int argc, char **argv);
2725

2826
#endif /* !HAVE_JOB_EXEC_CONFIG_EXEC_H */

src/modules/job-exec/job-exec.c

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,6 @@ void jobinfo_decref (struct jobinfo *job)
147147
job->ctx = NULL;
148148
flux_msg_decref (job->req);
149149
job->req = NULL;
150-
free (job->J);
151150
resource_set_destroy (job->R);
152151
json_decref (job->jobspec);
153152
free (job->rootref);
@@ -892,13 +891,6 @@ static void jobinfo_start_continue (flux_future_t *f, void *arg)
892891
jobinfo_fatal_error (job, errno, "initializing critical ranks");
893892
goto done;
894893
}
895-
if (job->multiuser && !config_use_imp_helper ()) {
896-
const char *J = jobinfo_kvs_lookup_get (f, "J");
897-
if (!J || !(job->J = strdup (J))) {
898-
jobinfo_fatal_error (job, errno, "reading J: %s", error.text);
899-
goto done;
900-
}
901-
}
902894
if (jobinfo_load_implementation (job) < 0) {
903895
jobinfo_fatal_error (job, errno, "failed to initialize implementation");
904896
goto done;
@@ -1057,12 +1049,6 @@ static flux_future_t *jobinfo_start_init (struct jobinfo *job)
10571049
if (!(f_kvs = flux_jobid_kvs_lookup (h, job->id, 0, "R"))
10581050
|| flux_future_push (f, "R", f_kvs) < 0)
10591051
goto err;
1060-
if (job->multiuser
1061-
&& !config_use_imp_helper ()
1062-
&& (!(f_kvs = flux_jobid_kvs_lookup (h, job->id, 0, "J"))
1063-
|| flux_future_push (f, "J", f_kvs) < 0)) {
1064-
goto err;
1065-
}
10661052
if (job->reattach)
10671053
f_kvs = ns_get_rootref (h, job, 0);
10681054
else

src/modules/job-exec/job-exec.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,6 @@ struct jobinfo {
6868

6969
struct resource_set * R; /* Fetched and parsed resource set R */
7070
json_t * jobspec; /* Fetched jobspec */
71-
char * J; /* Signed jobspec */
7271

7372
struct idset * critical_ranks; /* critical shell ranks */
7473

src/modules/job-exec/sdexec.c

Lines changed: 9 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,14 @@ static struct sdexec *sdexec_create (flux_t *h,
210210
flux_log_error (job->h, "flux_cmd_argv_append");
211211
goto cleanup;
212212
}
213+
if (flux_cmd_setenvf (se->cmd,
214+
1,
215+
"FLUX_IMP_EXEC_HELPER",
216+
"flux imp-exec-helper %ju",
217+
(uintmax_t) job->id) < 0) {
218+
flux_log_error (job->h, "flux_cmd_setenvf");
219+
goto cleanup;
220+
}
213221
}
214222

215223
if (flux_cmd_argv_append (se->cmd, job_shell) < 0
@@ -357,26 +365,7 @@ static void state_cb (sdprocess_t *sdp, sdprocess_state_t state, void *arg)
357365
if (!se->job->reattach && !se->job->running)
358366
jobinfo_started (se->job);
359367

360-
if (state == SDPROCESS_ACTIVE) {
361-
/* Don't try to write J to stdin_fd of -1
362-
* This probably indicates we've reattached to this job
363-
*/
364-
if (se->job->multiuser && se->stdin_fds[0] >= 0) {
365-
char *input = NULL;
366-
json_t *o = json_pack ("{s:s}", "J", se->job->J);
367-
if (!o || !(input = json_dumps (o, JSON_COMPACT))) {
368-
jobinfo_fatal_error (se->job, errno, "Failed to get input to IMP");
369-
return;
370-
}
371-
if (write (se->stdin_fds[0], input, strlen (input)) < 0) {
372-
jobinfo_fatal_error (se->job, errno, "write");
373-
return;
374-
}
375-
close (se->stdin_fds[0]);
376-
se->stdin_fds[0] = -1;
377-
}
378-
}
379-
else if (state == SDPROCESS_EXITED
368+
if (state == SDPROCESS_EXITED
380369
&& !se->jobinfo_tasks_complete_called) {
381370

382371
/* Since we are calling jobinfo_tasks_complete(), the

0 commit comments

Comments
 (0)