Skip to content

Commit 7a64e8c

Browse files
authored
Merge pull request #5202 from grondo/broker-signals
broker: foward nonfatal signals to all running jobs
2 parents 5948b01 + 550f87b commit 7a64e8c

File tree

4 files changed

+118
-5
lines changed

4 files changed

+118
-5
lines changed

src/broker/broker.c

Lines changed: 71 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
#include "src/common/libfluxutil/method.h"
4646
#include "ccan/array_size/array_size.h"
4747
#include "ccan/str/str.h"
48+
#include "ccan/ptrint/ptrint.h"
4849

4950
#include "module.h"
5051
#include "brokercfg.h"
@@ -1308,7 +1309,8 @@ static void broker_destroy_sigwatcher (void *data)
13081309

13091310
static int broker_handle_signals (broker_ctx_t *ctx)
13101311
{
1311-
int i, sigs[] = { SIGHUP, SIGINT, SIGQUIT, SIGTERM, SIGALRM };
1312+
int i, sigs[] = { SIGHUP, SIGINT, SIGQUIT, SIGTERM,
1313+
SIGALRM, SIGUSR1, SIGUSR2 };
13121314
int blocked[] = { SIGPIPE };
13131315
flux_watcher_t *w;
13141316

@@ -1965,13 +1967,80 @@ static void module_status_cb (module_t *p, int prev_status, void *arg)
19651967
}
19661968
}
19671969

1970+
static bool signal_is_deadly (int signum)
1971+
{
1972+
int deadly_sigs[] = { SIGHUP, SIGINT, SIGQUIT, SIGTERM, SIGALRM };
1973+
for (int i = 0; i < ARRAY_SIZE (deadly_sigs); i++) {
1974+
if (signum == deadly_sigs[i])
1975+
return true;
1976+
}
1977+
return false;
1978+
}
1979+
1980+
static void killall_cb (flux_future_t *f, void *arg)
1981+
{
1982+
broker_ctx_t *ctx = arg;
1983+
int count = 0;
1984+
if (flux_rpc_get_unpack (f, "{s:i}", "count", &count) < 0) {
1985+
flux_log_error (ctx->h,
1986+
"job-manager.killall: %s",
1987+
future_strerror (f, errno));
1988+
}
1989+
flux_future_destroy (f);
1990+
if (count) {
1991+
flux_log (ctx->h,
1992+
LOG_INFO,
1993+
"forwarded signal %d to %d jobs",
1994+
(int) ptr2int (flux_future_aux_get (f, "signal")),
1995+
count);
1996+
}
1997+
}
1998+
1999+
static int killall_jobs (broker_ctx_t *ctx, int signum)
2000+
{
2001+
flux_future_t *f = NULL;
2002+
if (!(f = flux_rpc_pack (ctx->h,
2003+
"job-manager.killall",
2004+
FLUX_NODEID_ANY,
2005+
0,
2006+
"{s:b s:i s:i}",
2007+
"dry_run", 0,
2008+
"userid", FLUX_USERID_UNKNOWN,
2009+
"signum", signum))
2010+
|| flux_future_then (f, -1., killall_cb, ctx) < 0) {
2011+
flux_future_destroy (f);
2012+
return -1;
2013+
}
2014+
if (flux_future_aux_set (f, "signum", int2ptr (signum), NULL) < 0)
2015+
flux_log_error (ctx->h, "killall: future_aux_set");
2016+
return 0;
2017+
}
2018+
19682019
static void signal_cb (flux_reactor_t *r, flux_watcher_t *w,
1969-
int revents, void *arg)
2020+
int revents, void *arg)
19702021
{
19712022
broker_ctx_t *ctx = arg;
19722023
int signum = flux_signal_watcher_get_signum (w);
19732024

19742025
flux_log (ctx->h, LOG_INFO, "signal %d", signum);
2026+
2027+
if (ctx->rank == 0 && !signal_is_deadly (signum)) {
2028+
/* Attempt to forward non-deadly signals to jobs. If that fails,
2029+
* then fall through to state_machine_kill() so the signal is
2030+
* delivered somewhere.
2031+
*/
2032+
if (killall_jobs (ctx, signum) == 0)
2033+
return;
2034+
/*
2035+
* Note: flux_rpc(3) in the rank 0 broker to the job manager module
2036+
* is expected to fail immediately if the job-manager module is not
2037+
* loaded due to the broker internal flux_t handle implementation.
2038+
*/
2039+
flux_log (ctx->h,
2040+
LOG_INFO,
2041+
"killall failed, delivering signal %d locally instead",
2042+
signum);
2043+
}
19752044
state_machine_kill (ctx->state_machine, signum);
19762045
}
19772046

src/broker/runat.c

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -465,9 +465,9 @@ int runat_push_shell_command (struct runat *r,
465465

466466
/* For shell commands run the target cmdline in a separate process
467467
* group so that any processes spawned by the shell will be signaled
468-
* in runat_abort(). This is probably unnecessary for single commands,
469-
* and does not work for an interactive shell (seems to disable access
470-
* to the pty), so we set the flag only here for now.
468+
* in runat_abort(). This does not work for an interactive shell
469+
* (seems to disable access to the pty), so this flag is not set in
470+
* runat_push_shell().
471471
*/
472472
cmd->flags |= FLUX_SUBPROCESS_FLAGS_SETPGRP;
473473

@@ -522,6 +522,13 @@ int runat_push_command (struct runat *r,
522522
}
523523
if (!(cmd = runat_command_create (environ, flags)))
524524
return -1;
525+
526+
/* Run the target cmdline in a separate process group so that any
527+
* processes spawned by the new process are also signaled in
528+
* runat_abort().
529+
*/
530+
cmd->flags |= FLUX_SUBPROCESS_FLAGS_SETPGRP;
531+
525532
if (runat_command_set_argz (cmd, argz, argz_len) < 0)
526533
goto error;
527534
if (runat_command_modenv (cmd, env_blocklist, r->local_uri) < 0)

t/Makefile.am

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -344,6 +344,7 @@ dist_check_SCRIPTS = \
344344
issues/t4711-job-list-purge-inactive.sh \
345345
issues/t4771-flux-start-bash.sh \
346346
issues/t4852-t_submit-legacy.sh \
347+
issues/t5105-signal-propagation.sh \
347348
python/__init__.py \
348349
python/subflux.py \
349350
python/tap \
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
#!/bin/sh
2+
# Run an 3 level nested instance, with a test job at the final level
3+
4+
waitfile=$SHARNESS_TEST_SRCDIR/scripts/waitfile.lua
5+
6+
cat <<EOF >test.py
7+
import signal
8+
import time
9+
import flux
10+
import os
11+
12+
h = flux.Flux()
13+
level = h.attr_get("instance-level")
14+
jobid = os.getenv("FLUX_JOB_ID")
15+
signal.signal(
16+
signal.SIGUSR1,
17+
lambda x, y: print(f"job {jobid} in level {level} got SIGUSR1", flush=True),
18+
)
19+
open("ready", 'a').close()
20+
signal.pause()
21+
EOF
22+
23+
id=$(flux submit --output=log flux start \
24+
flux run flux start \
25+
flux run flux python ./test.py)
26+
27+
$waitfile -t 30 -v ready
28+
29+
flux job kill -s SIGUSR1 $id
30+
31+
$waitfile -t 30 -v -p "got SIGUSR1" log
32+
33+
flux job status --json -v $id
34+
35+
# vi: ts=4 sw=4 expandtab
36+

0 commit comments

Comments
 (0)