Skip to content

Commit abd95c1

Browse files
authored
Merge pull request #5186 from grondo/imp-exec-helper
job-exec: provide IMP exec helper when possible and use stdin/out for shell exec protocol
2 parents 94bf05d + a6a8a31 commit abd95c1

File tree

17 files changed

+102
-65
lines changed

17 files changed

+102
-65
lines changed

src/cmd/Makefile.am

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,8 @@ dist_fluxcmd_SCRIPTS = \
112112
flux-pstree.py \
113113
flux-pgrep.py \
114114
flux-queue.py \
115-
flux-cancel.py
115+
flux-cancel.py \
116+
flux-imp-exec-helper
116117

117118
fluxcmd_PROGRAMS = \
118119
flux-terminus \

src/cmd/flux-imp-exec-helper

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
#!/bin/sh
2+
##############################################################
3+
# Copyright 2023 Lawrence Livermore National Security, LLC
4+
# (c.f. AUTHORS, NOTICE.LLNS, COPYING)
5+
#
6+
# This file is part of the Flux resource manager framework.
7+
# For details, see https://github.com/flux-framework.
8+
#
9+
# SPDX-License-Identifier: LGPL-3.0
10+
##############################################################
11+
#
12+
# Helper for flux-imp exec functionality.
13+
# Emit input to IMP for jobid on stdout given jobid in $1
14+
#
15+
JOBID=${1:-$FLUX_JOB_ID}
16+
if test -z "$JOBID"; then
17+
echo "flux-imp-exec-helper: Unable to determine jobid" >&2
18+
exit 1
19+
fi
20+
printf '{"J": "%s"}' $(flux job info --orig $JOBID J)

src/common/libflux/Makefile.am

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,7 @@ installed_conf_cppflags = \
3434
-DINSTALLED_UPMI_PLUGINPATH=\"$(fluxlibdir)/upmi/plugins\" \
3535
-DINSTALLED_CMDHELP_PATTERN=\"${datadir}/flux/help.d/*.json\" \
3636
-DINSTALLED_NO_DOCS_PATH=\"${datadir}/flux/.nodocs\" \
37-
-DINSTALLED_RUNDIR=\"${runstatedir}/flux\" \
38-
-DINSTALLED_BINDIR=\"$(fluxcmddir)\"
37+
-DINSTALLED_RUNDIR=\"${runstatedir}/flux\"
3938

4039
intree_conf_cppflags = \
4140
-DINTREE_MODULE_PATH=\"$(abs_top_builddir)/src/modules/.libs\" \
@@ -55,8 +54,7 @@ intree_conf_cppflags = \
5554
-DINTREE_JOBTAP_PLUGINPATH=\"$(abs_top_builddir)/src/modules/job-manager/plugins/.libs\" \
5655
-DINTREE_UPMI_PLUGINPATH=\"$(abs_top_builddir)/src/common/libpmi/plugins/.libs\" \
5756
-DINTREE_CMDHELP_PATTERN=\"${abs_top_builddir}/etc/flux/help.d/*.json\" \
58-
-DINTREE_NO_DOCS_PATH=\"${abs_top_builddir}/etc/flux/.nodocs\" \
59-
-DINTREE_BINDIR=\"${abs_top_builddir}/src/cmd\"
57+
-DINTREE_NO_DOCS_PATH=\"${abs_top_builddir}/etc/flux/.nodocs\"
6058

6159

6260
fluxcoreinclude_HEADERS = \

src/common/libflux/conf.c

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,6 @@ static struct builtin builtin_tab[] = {
6666
{ "upmi_pluginpath",INSTALLED_UPMI_PLUGINPATH, INTREE_UPMI_PLUGINPATH },
6767
{ "no_docs_path", INSTALLED_NO_DOCS_PATH, INTREE_NO_DOCS_PATH },
6868
{ "rundir", INSTALLED_RUNDIR, NULL },
69-
{ "bindir", INSTALLED_BINDIR, INTREE_BINDIR },
7069
{ NULL, NULL, NULL },
7170
};
7271

src/modules/job-exec/exec.c

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ static void start_cb (struct bulk_exec *exec, void *arg)
8686
/* This is going to be really slow. However, it should at least
8787
* work for now. We wait for all imp's to start, then send input
8888
*/
89-
if (job->multiuser) {
89+
if (job->multiuser && !config_use_imp_helper ()) {
9090
char *input = NULL;
9191
json_t *o = json_pack ("{s:s}", "J", job->J);
9292
if (!o || !(input = json_dumps (o, JSON_COMPACT))) {
@@ -117,13 +117,13 @@ static void complete_cb (struct bulk_exec *exec, void *arg)
117117
static int exec_barrier_enter (struct bulk_exec *exec)
118118
{
119119
struct exec_ctx *ctx = bulk_exec_aux_get (exec, "ctx");
120+
const char *stream = config_use_imp_helper () ?
121+
"stdin" :
122+
"FLUX_EXEC_PROTOCOL_FD";
120123
if (!ctx)
121124
return -1;
122125
if (++ctx->barrier_enter_count == bulk_exec_total (exec)) {
123-
if (bulk_exec_write (exec,
124-
"FLUX_EXEC_PROTOCOL_FD",
125-
"exit=0\n",
126-
7) < 0)
126+
if (bulk_exec_write (exec, stream, "exit=0\n", 7) < 0)
127127
return -1;
128128
ctx->barrier_enter_count = 0;
129129
ctx->barrier_completion_count++;
@@ -135,10 +135,7 @@ static int exec_barrier_enter (struct bulk_exec *exec)
135135
* case where a shell exits while a barrier is already in progress
136136
* is handled in exit_cb().
137137
*/
138-
if (bulk_exec_write (exec,
139-
"FLUX_EXEC_PROTOCOL_FD",
140-
"exit=1\n",
141-
7) < 0)
138+
if (bulk_exec_write (exec, stream, "exit=1\n", 7) < 0)
142139
return -1;
143140
}
144141
return 0;
@@ -153,7 +150,8 @@ static void output_cb (struct bulk_exec *exec, flux_subprocess_t *p,
153150
struct jobinfo *job = arg;
154151
const char *cmd = flux_cmd_arg (flux_subprocess_get_cmd (p), 0);
155152

156-
if (streq (stream, "FLUX_EXEC_PROTOCOL_FD")) {
153+
if (streq (stream, "FLUX_EXEC_PROTOCOL_FD")
154+
|| (config_use_imp_helper () && streq (stream, "stdout"))) {
157155
if (streq (data, "enter\n")
158156
&& exec_barrier_enter (exec) < 0) {
159157
jobinfo_fatal_error (job,
@@ -293,10 +291,10 @@ static void exit_cb (struct bulk_exec *exec,
293291
*/
294292
if (ctx->barrier_completion_count == 0
295293
|| ctx->barrier_enter_count > 0) {
296-
if (bulk_exec_write (exec,
297-
"FLUX_EXEC_PROTOCOL_FD",
298-
"exit=1\n",
299-
7) < 0)
294+
const char *stream = config_use_imp_helper () ?
295+
"stdin" :
296+
"FLUX_EXEC_PROTOCOL_FD";
297+
if (bulk_exec_write (exec, stream, "exit=1\n", 7) < 0)
300298
jobinfo_fatal_error (job, 0,
301299
"failed to terminate barrier: %s",
302300
strerror (errno));
@@ -351,6 +349,15 @@ static int exec_init (struct jobinfo *job)
351349
goto err;
352350
}
353351
if (job->multiuser) {
352+
if (config_use_imp_helper ()
353+
&& flux_cmd_setenvf (cmd,
354+
1,
355+
"FLUX_IMP_EXEC_HELPER",
356+
"flux imp-exec-helper %ju",
357+
(uintmax_t) job->id) < 0) {
358+
flux_log_error (job->h, "exec_init: flux_cmd_setenvf");
359+
goto err;
360+
}
354361
if (flux_cmd_argv_append (cmd, config_get_imp_path ()) < 0
355362
|| flux_cmd_argv_append (cmd, "exec") < 0) {
356363
flux_log_error (job->h, "exec_init: flux_cmd_argv_append");
@@ -366,7 +373,7 @@ static int exec_init (struct jobinfo *job)
366373
/* If more than one shell is involved in this job, set up a channel
367374
* for exec system based barrier:
368375
*/
369-
if (idset_count (ranks) > 1) {
376+
if (idset_count (ranks) > 1 && !config_use_imp_helper ()) {
370377
if (flux_cmd_add_channel (cmd, "FLUX_EXEC_PROTOCOL_FD") < 0
371378
|| flux_cmd_setopt (cmd,
372379
"FLUX_EXEC_PROTOCOL_FD_LINE_BUFFER",

src/modules/job-exec/exec_config.c

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,17 @@
1717
#include <jansson.h>
1818
#include <unistd.h>
1919

20+
#if HAVE_FLUX_SECURITY
21+
#include <flux/security/version.h>
22+
#endif
23+
2024
#include "exec_config.h"
2125
#include "ccan/str/str.h"
2226

2327
static const char *default_cwd = "/tmp";
2428
static const char *default_job_shell = NULL;
2529
static const char *flux_imp_path = NULL;
30+
static bool use_imp_helper = false;
2631

2732
static const char *jobspec_get_job_shell (json_t *jobspec)
2833
{
@@ -69,6 +74,11 @@ const char *config_get_imp_path (void)
6974
return flux_imp_path;
7075
}
7176

77+
bool config_use_imp_helper (void)
78+
{
79+
return use_imp_helper;
80+
}
81+
7282
/* Initialize common configurations for use by job-exec exec modules.
7383
*/
7484
int config_init (flux_t *h, int argc, char **argv)
@@ -104,19 +114,34 @@ int config_init (flux_t *h, int argc, char **argv)
104114
return -1;
105115
}
106116

117+
#if HAVE_FLUX_SECURITY
118+
/* Use IMP helper by default for flux-security >= 0.9.0
119+
*/
120+
if (FLUX_SECURITY_VERSION_MAJOR >= 0
121+
&& FLUX_SECURITY_VERSION_MINOR >= 9)
122+
use_imp_helper = true;
123+
#endif /* HAVE_FLUX_SECURITY */
124+
107125
if (argv && argc) {
108126
/* Finally, override values on cmdline */
109127
for (int i = 0; i < argc; i++) {
110128
if (strstarts (argv[i], "job-shell="))
111129
default_job_shell = argv[i]+10;
112130
else if (strstarts (argv[i], "imp="))
113131
flux_imp_path = argv[i]+4;
132+
else if (streq (argv[i], "no-imp-helper"))
133+
use_imp_helper = false;
114134
}
115135
}
116136

117137
flux_log (h, LOG_DEBUG, "using default shell path %s", default_job_shell);
118-
if (flux_imp_path)
119-
flux_log (h, LOG_DEBUG, "using imp path %s", flux_imp_path);
138+
if (flux_imp_path) {
139+
flux_log (h,
140+
LOG_DEBUG,
141+
"using imp path %s (%s helper)",
142+
flux_imp_path,
143+
use_imp_helper ? "with" : "without");
144+
}
120145
return 0;
121146
}
122147

src/modules/job-exec/exec_config.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ const char *config_get_cwd (struct jobinfo *job);
2121

2222
const char *config_get_imp_path (void);
2323

24+
bool config_use_imp_helper (void);
25+
2426
int config_init (flux_t *h, int argc, char **argv);
2527

2628
#endif /* !HAVE_JOB_EXEC_CONFIG_EXEC_H */

src/modules/job-exec/job-exec.c

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@
101101

102102
#include "job-exec.h"
103103
#include "checkpoint.h"
104+
#include "exec_config.h"
104105

105106
static double kill_timeout=5.0;
106107

@@ -890,7 +891,7 @@ static void jobinfo_start_continue (flux_future_t *f, void *arg)
890891
jobinfo_fatal_error (job, errno, "initializing critical ranks");
891892
goto done;
892893
}
893-
if (job->multiuser) {
894+
if (job->multiuser && !config_use_imp_helper ()) {
894895
const char *J = jobinfo_kvs_lookup_get (f, "J");
895896
if (!J || !(job->J = strdup (J))) {
896897
jobinfo_fatal_error (job, errno, "reading J: %s", error.text);
@@ -1056,6 +1057,7 @@ static flux_future_t *jobinfo_start_init (struct jobinfo *job)
10561057
|| flux_future_push (f, "R", f_kvs) < 0)
10571058
goto err;
10581059
if (job->multiuser
1060+
&& !config_use_imp_helper ()
10591061
&& (!(f_kvs = flux_jobid_kvs_lookup (h, job->id, 0, "J"))
10601062
|| flux_future_push (f, "J", f_kvs) < 0)) {
10611063
goto err;

src/modules/job-exec/sdexec.c

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -86,11 +86,6 @@ struct sdexec
8686
bool jobinfo_tasks_complete_called;
8787
};
8888

89-
static int sdexec_config (flux_t *h, int argc, char **argv)
90-
{
91-
return config_init (h, argc, argv);
92-
}
93-
9489
static void sdexec_destroy (void *data)
9590
{
9691
struct sdexec *se = data;
@@ -788,7 +783,6 @@ static int sdexec_cancel (struct jobinfo *job)
788783

789784
struct exec_implementation sdexec = {
790785
.name = "sdexec",
791-
.config = sdexec_config,
792786
.init = sdexec_init,
793787
.exit = sdexec_exit,
794788
.start = sdexec_start,

src/shell/internal.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ struct flux_shell {
2727
flux_jobid_t jobid;
2828
int broker_rank;
2929
char hostname [MAXHOSTNAMELEN + 1];
30-
int protocol_fd;
30+
int protocol_fd[2];
3131

3232
optparse_t *p;
3333
flux_t *h;

0 commit comments

Comments
 (0)