Skip to content

Commit 300c7a1

Browse files
committed
libsubprocess: avoid using child watchers
Problem: child watchers are only used by libsubprocess, and probably do not need to be in the public API since libsubprocess is provided for managing child processes. In addition, we are considering porting Flux to libuv and libuv does not offer similar functionality. Register a signal watcher for SIGCHLD within libsubprocess that persists as long as there are subprocesses to monitor, and calls waitpid(2) to consume all subprocess state changes. Add a hash by pid and allow subprocess objects to register a callback to receive these changes for a given pid. Have all subprocess users create reactors without FLUX_REACTOR_SIGCHLD, as the default ev_loop registers a SIGCHLD watcher that conflicts with this one. Add EVFLAG_SIGNALFD to the non-default loop, as that flag was used with reactors created with FLUX_REACTOR_SIGCHLD, and appears to be be required to avoid sharness tests hanging randomly.
1 parent da7a958 commit 300c7a1

File tree

22 files changed

+414
-76
lines changed

22 files changed

+414
-76
lines changed

src/broker/broker.c

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -248,16 +248,15 @@ int main (int argc, char *argv[])
248248
* ctx.h is used conventionally within the broker for RPCs, message
249249
* handlers, etc. ctx.h_internal belongs to the broker's routing logic
250250
* and is accessed using flux_send() and flux_recv() only. Both handles
251-
* share a reactor, which is created with FLUX_REACTOR_SIGCHLD in order
252-
* to support libsubprocess.
251+
* share a reactor.
253252
*
254253
* N.B. since both handles are in the same thread, synchronous RPCs on
255254
* ctx.h will deadlock. The main broker reactor must run in order
256255
* to move messages from the interthread queue to the routing logic.
257256
* Careful with flux_attr_get(), which hides a synchronous RPC if the
258257
* requested value is not cached.
259258
*/
260-
if (!(ctx.reactor = flux_reactor_create (FLUX_REACTOR_SIGCHLD))
259+
if (!(ctx.reactor = flux_reactor_create (0))
261260
|| !(ctx.h = flux_open ("interthread://broker", 0))
262261
|| flux_set_reactor (ctx.h, ctx.reactor) < 0
263262
|| !(ctx.h_internal = flux_open ("interthread://broker", 0))

src/broker/test/runat.c

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -350,7 +350,6 @@ void badinput (flux_t *h)
350350
int main (int argc, char *argv[])
351351
{
352352
flux_t *h;
353-
flux_reactor_t *r;
354353

355354
plan (NO_PLAN);
356355

@@ -360,12 +359,8 @@ int main (int argc, char *argv[])
360359

361360
if (!(logs = zlist_new ()))
362361
BAIL_OUT ("zlist_new failed");
363-
if (!(r = flux_reactor_create (FLUX_REACTOR_SIGCHLD)))
364-
BAIL_OUT ("flux_reactor_create failed");
365362
if (!(h = flux_open ("loop://", 0)))
366363
BAIL_OUT ("could not create loop handle");
367-
if (flux_set_reactor (h, r) < 0)
368-
BAIL_OUT ("flux_set_reactor failed");
369364
if (flux_attr_set_cacheonly (h, "rank", "0") < 0)
370365
BAIL_OUT ("flux_attr_set_cacheonly rank failed");
371366
flux_log_set_redirect (h, diag_logger, NULL);
@@ -374,7 +369,6 @@ int main (int argc, char *argv[])
374369
basic (h);
375370
badinput (h);
376371

377-
flux_reactor_destroy (r);
378372
flux_close (h);
379373

380374
clear_list (logs);

src/cmd/builtin/proxy.c

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -385,10 +385,8 @@ static int cmd_proxy (optparse_t *p, int ac, char *av[])
385385
free (uri);
386386
flux_log_set_appname (ctx.h, "proxy");
387387
ctx.proxy_user = getuid ();
388-
if (!(r = flux_reactor_create (FLUX_REACTOR_SIGCHLD)))
389-
log_err_exit ("flux_reactor_create");
390-
if (flux_set_reactor (ctx.h, r) < 0)
391-
log_err_exit ("flux_set_reactor");
388+
if (!(r = flux_get_reactor (ctx.h)))
389+
log_err_exit ("flux_get_reactor");
392390

393391
/* Register handler for loss of broker connection if --reconnect
394392
*/

src/cmd/flux-start.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1074,7 +1074,7 @@ int start_session (const char *cmd_argz,
10741074
if (signal (SIGTTOU, SIG_IGN) == SIG_ERR)
10751075
log_err_exit ("signal");
10761076
}
1077-
if (!(ctx.reactor = flux_reactor_create (FLUX_REACTOR_SIGCHLD)))
1077+
if (!(ctx.reactor = flux_reactor_create (0)))
10781078
log_err_exit ("flux_reactor_create");
10791079
if (!(ctx.timer = flux_timer_watcher_create (ctx.reactor,
10801080
ctx.exit_timeout,

src/cmd/flux-terminus.c

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -293,23 +293,16 @@ static int run_service (const char *service, int fd)
293293
struct flux_terminus_server *ts;
294294
flux_future_t *f;
295295
int rc = -1;
296-
flux_reactor_t *r;
297296

298297
if (fdwalk (terminus_server_closefd, &fd) < 0) {
299298
log_err ("fdwalk");
300299
goto err;
301300
}
302301

303-
r = flux_reactor_create (FLUX_REACTOR_SIGCHLD);
304-
if (!r) {
305-
log_err ("flux_reactor_create");
306-
goto err;
307-
}
308302
if (!(h = flux_open (NULL, 0))) {
309303
log_err ("flux_open");
310304
goto err;
311305
}
312-
flux_set_reactor (h, r);
313306
if (!(f = flux_service_register (h, service))
314307
|| flux_future_get (f, NULL) < 0) {
315308
log_err ("flux_service_register (%s)", service);
@@ -330,11 +323,10 @@ static int run_service (const char *service, int fd)
330323

331324
/* Set up to exit when the last session exits
332325
*/
333-
flux_terminus_server_notify_empty (ts, empty_cb, r);
326+
flux_terminus_server_notify_empty (ts, empty_cb, flux_get_reactor (h));
334327

335328
rc = flux_reactor_run (flux_get_reactor (h), 0);
336329
flux_terminus_server_destroy (ts);
337-
flux_reactor_destroy (r);
338330
flux_close (h);
339331
return rc;
340332
err:

src/common/libflux/reactor.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ flux_reactor_t *flux_reactor_create (int flags)
7272
if ((flags & FLUX_REACTOR_SIGCHLD))
7373
r->loop = ev_default_loop (EVFLAG_SIGNALFD);
7474
else
75-
r->loop = ev_loop_new (EVFLAG_NOSIGMASK);
75+
r->loop = ev_loop_new (EVFLAG_NOSIGMASK | EVFLAG_SIGNALFD);
7676
if (!r->loop) {
7777
errno = ENOMEM;
7878
flux_reactor_destroy (r);

src/common/libsubprocess/Makefile.am

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ noinst_LTLIBRARIES = \
2020
libsubprocess_la_SOURCES = \
2121
command.c \
2222
command_private.h \
23+
sigchld.c \
24+
sigchld.h \
2325
local.c \
2426
local.h \
2527
fork.c \
@@ -48,6 +50,7 @@ fluxcoreinclude_HEADERS = \
4850
subprocess.h
4951

5052
TESTS = \
53+
test_sigchld.t \
5154
test_command.t \
5255
test_subprocess.t \
5356
test_stdio.t \
@@ -102,6 +105,11 @@ bulk_exec_LDADD = \
102105

103106
bulk_exec_LDFLAGS = $(test_ldflags)
104107

108+
test_sigchld_t_SOURCES = test/sigchld.c
109+
test_sigchld_t_CPPFLAGS = $(test_cppflags)
110+
test_sigchld_t_LDADD = $(test_ldadd)
111+
test_sigchld_t_LDFLAGS = $(test_ldflags)
112+
105113
test_command_t_SOURCES = test/command.c
106114
test_command_t_CPPFLAGS = $(test_cppflags)
107115
test_command_t_LDADD = $(test_ldadd)

src/common/libsubprocess/local.c

Lines changed: 6 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
#include "fork.h"
3434
#include "posix_spawn.h"
3535
#include "util.h"
36+
#include "sigchld.h"
3637

3738
static void local_channel_flush (struct subprocess_channel *c)
3839
{
@@ -449,20 +450,9 @@ static void close_child_fds (flux_subprocess_t *p)
449450
}
450451
}
451452

452-
static void child_watch_cb (flux_reactor_t *r,
453-
flux_watcher_t *w,
454-
int revents,
455-
void *arg)
453+
static void sigchld_cb (pid_t pid, int status, void *arg)
456454
{
457455
flux_subprocess_t *p = arg;
458-
int status;
459-
460-
if ((status = flux_child_watcher_get_rstatus (w)) < 0) {
461-
llog_error (p,
462-
"flux_child_watcher_get_rstatus: %s",
463-
strerror (errno));
464-
return;
465-
}
466456

467457
p->status = status;
468458

@@ -480,9 +470,8 @@ static void child_watch_cb (flux_reactor_t *r,
480470
state_change_start (p);
481471
}
482472

483-
/* Child watcher no longer needed, pid now invalid */
484-
if (p->child_w)
485-
flux_watcher_stop (p->child_w);
473+
/* callback no longer needed, pid now invalid */
474+
sigchld_unregister (pid);
486475
}
487476

488477
if (p->state == FLUX_SUBPROCESS_EXITED)
@@ -502,16 +491,10 @@ static int start_local_watchers (flux_subprocess_t *p)
502491
{
503492
struct subprocess_channel *c;
504493

505-
/* no-op if reactor is !FLUX_REACTOR_SIGCHLD */
506-
if (!(p->child_w = flux_child_watcher_create (p->reactor,
507-
p->pid,
508-
true,
509-
child_watch_cb,
510-
p))) {
511-
llog_debug (p, "flux_child_watcher_create: %s", strerror (errno));
494+
if (sigchld_register (p->reactor, p->pid, sigchld_cb, p) < 0) {
495+
llog_debug (p, "sigchld_register: %s", strerror (errno));
512496
return -1;
513497
}
514-
flux_watcher_start (p->child_w);
515498

516499
c = zhash_first (p->channels);
517500
while (c) {

src/common/libsubprocess/server.c

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
#include "server.h"
3131
#include "client.h"
3232
#include "util.h"
33+
#include "sigchld.h"
3334

3435
extern char **environ;
3536

@@ -55,6 +56,7 @@ struct subprocess_server {
5556
// The shutdown future is created when user calls shutdown,
5657
// and fulfilled once subprocesses list becomes empty.
5758
flux_future_t *shutdown;
59+
bool has_sigchld_ctx;
5860
};
5961

6062
static void server_kill (flux_subprocess_t *p, int signum);
@@ -709,6 +711,8 @@ void subprocess_server_destroy (subprocess_server_t *s)
709711
flux_future_destroy (s->shutdown);
710712
free (s->service_name);
711713
free (s->local_uri);
714+
if (s->has_sigchld_ctx)
715+
sigchld_finalize ();
712716
free (s);
713717
errno = saved_errno;
714718
}
@@ -750,6 +754,11 @@ subprocess_server_t *subprocess_server_create (flux_t *h,
750754
&s->handlers) < 0)
751755
goto error;
752756

757+
/* Avoid unnecessary on-demand create/destroy of SIGCHLD handler + hash.
758+
*/
759+
if (sigchld_initialize (flux_get_reactor (h)) < 0)
760+
goto error;
761+
s->has_sigchld_ctx = true;
753762
return s;
754763

755764
error:

src/common/libsubprocess/server.h

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,10 @@ typedef int (*subprocess_server_auth_f) (const flux_msg_t *msg,
1919
void *arg,
2020
flux_error_t *error);
2121

22-
/* Create a subprocess server. The handle 'h' must contain a reactor
23-
* created with the FLUX_REACTOR_SIGCHLD flag. Note that there can be
24-
* only one reactor per process with this flag set. Also, it may be wise
25-
* to block SIGPIPE to avoid termination when writing to stdin of a subprocess
26-
* that has terminated.
22+
/* Create a subprocess server.
23+
* This sets up a signal watcher for SIGCHLD. Make sure SIGCHLD cannot be
24+
* delivered to other threads. Also, it may be wise to block SIGPIPE to
25+
* avoid termination when writing to stdin of a subprocess that has terminated.
2726
*/
2827
subprocess_server_t *subprocess_server_create (flux_t *h,
2928
const char *service_name,

0 commit comments

Comments
 (0)