diff --git a/configure.ac b/configure.ac index 12ff51c8adf0..61a37e909f58 100644 --- a/configure.ac +++ b/configure.ac @@ -465,22 +465,36 @@ fi ## RRA_WITH_SYSTEMD_UNITDIR +AC_PKGCONFIG ## -# libev +# Which event loop implementation? ## -AC_ARG_WITH([external-libev], - AS_HELP_STRING([--with-external-libev], [Use external libev])) -AS_IF([test "x$with_external_libev" = "xyes"], [ +AC_ARG_WITH([libuv], + AS_HELP_STRING([--with-libuv], [Use libuv for event loop (experimental)])) +AC_ARG_WITH([libev], + AS_HELP_STRING([--with-libev], [Use non-vendored libev for event loop])) +AS_IF([test "x$with_libuv" = "xyes" -a "x$with_libev" = "xyes"], [ + AC_MSG_ERROR([--with-libuv conflicts with --with-libev]) +]) +AS_IF([test "x$with_libuv" = "xyes"], [ + PKG_CHECK_MODULES([LIBUV], [libuv], [], []) + AC_DEFINE([HAVE_LIBUV],[1],[Event loop is libuv]) +]) +AS_IF([test "x$with_libev" = "xyes"], [ AC_SEARCH_LIBS([ev_run], [ev], [], [ - AC_MSG_ERROR([--with-external-libev requested but external libev not found]) + AC_MSG_ERROR([--with-libev requested but external libev not found]) ]) -],[ + AC_DEFINE([HAVE_LIBEV],[1],[Event loop is libev]) +]) +AS_IF([test "x$with_libev" != "xyes" -a "x$with_libuv" != "xyes"], [ + with_internal_libev=yes m4_include([src/common/libev/libev.m4]) + AC_DEFINE([HAVE_LIBEV],[1],[Event loop is libev]) + AC_DEFINE([HAVE_LIBEV_INTERNAL],[1],[Use vendored libev]) ]) -AM_CONDITIONAL([INTERNAL_LIBEV],[test "x$with_external_libev" != "xyes"]) - -AC_PKGCONFIG +AM_CONDITIONAL([INTERNAL_LIBEV],[test "x$with_internal_libev" = "xyes"]) +AM_CONDITIONAL([LIBUV], [test "x$with_libuv" = "xyes"]) ## # Project directories diff --git a/scripts/configure-macos.sh b/scripts/configure-macos.sh index 411e1fe90186..860591871b94 100755 --- a/scripts/configure-macos.sh +++ b/scripts/configure-macos.sh @@ -27,4 +27,4 @@ source macos-venv/bin/activate ./autogen.sh CPPFLAGS="$CPPFLAGS" LDFLAGS=$LDFLAGS PKG_CONFIG_PATH=$PKG_CONFIG_PATH \ - ./configure --with-external-libev + ./configure --with-libev diff --git a/scripts/install-deps-deb.sh b/scripts/install-deps-deb.sh index 63c13fd4e9ad..4ed100f72129 100755 --- a/scripts/install-deps-deb.sh +++ b/scripts/install-deps-deb.sh @@ -8,6 +8,7 @@ apt install \ pkg-config \ libc6-dev \ libzmq3-dev \ + libuv1-dev \ uuid-dev \ libjansson-dev \ liblz4-dev \ diff --git a/src/common/Makefile.am b/src/common/Makefile.am index 9fe7827818fa..dc1705df6fa4 100644 --- a/src/common/Makefile.am +++ b/src/common/Makefile.am @@ -62,6 +62,7 @@ libflux_internal_la_LIBADD = \ $(LIBPTHREAD) \ $(LIBDL) \ $(LIBRT) \ + $(LIBUV_LIBS) \ $(FLUX_SECURITY_LIBS) if INTERNAL_LIBEV libflux_internal_la_LIBADD += $(builddir)/libev/libev.la diff --git a/src/common/libflux/Makefile.am b/src/common/libflux/Makefile.am index 781c34de60e8..f9f07c7575ce 100644 --- a/src/common/libflux/Makefile.am +++ b/src/common/libflux/Makefile.am @@ -17,11 +17,8 @@ AM_CPPFLAGS = \ -DLUADIR=\"$(luadir)\" \ -DLUAEXECDIR=\"$(luaexecdir)\" \ $(JANSSON_CFLAGS) \ - $(LIBUUID_CFLAGS) - -if INTERNAL_LIBEV -AM_CPPFLAGS += -I$(top_srcdir)/src/common/libev -endif + $(LIBUUID_CFLAGS) \ + $(LIBUV_CFLAGS) fluxcoreinclude_HEADERS = \ flux.h \ @@ -69,7 +66,6 @@ libflux_la_SOURCES = \ reactor_private.h \ watcher.c \ watcher_private.h \ - watcher_wrap.c \ hwatcher.c \ msg_handler.c \ message.c \ @@ -102,6 +98,12 @@ libflux_la_SOURCES = \ fripp.h \ fripp.c +if LIBUV +libflux_la_SOURCES += watcher_uv.c +else +libflux_la_SOURCES += watcher_ev.c +endif + libflux_la_LDFLAGS = \ $(AM_LDFLAGS) @@ -141,6 +143,7 @@ test_ldadd = \ $(top_builddir)/src/common/libtap/libtap.la \ $(LIBUUID_LIBS) \ $(JANSSON_LIBS) \ + $(LIBUV_LIBS) \ $(LIBPTHREAD) \ $(LIBDL) diff --git a/src/common/libflux/reactor.c b/src/common/libflux/reactor.c index 562087a3f63a..39b469e7ea2e 100644 --- a/src/common/libflux/reactor.c +++ b/src/common/libflux/reactor.c @@ -15,31 +15,36 @@ #include #include #include -#include +#if HAVE_LIBUV +# include +#elif HAVE_LIBEV_INTERNAL +# include "src/common/libev/ev.h" +#else +# include +#endif #include #include "reactor_private.h" struct flux_reactor { +#if HAVE_LIBUV + uv_loop_t loop; +#else struct ev_loop *loop; +#endif int usecount; unsigned int errflag:1; }; -static int valid_flags (int flags, int valid) -{ - if ((flags & ~valid)) { - errno = EINVAL; - return -1; - } - return 0; -} - void flux_reactor_decref (flux_reactor_t *r) { if (r && --r->usecount == 0) { int saved_errno = errno; +#if HAVE_LIBUV + (void)uv_loop_close (&r->loop); // could return -EBUSY +#else ev_loop_destroy (r->loop); +#endif free (r); errno = saved_errno; } @@ -66,63 +71,114 @@ flux_reactor_t *flux_reactor_create (int flags) } if (!(r = calloc (1, sizeof (*r)))) return NULL; - r->loop = ev_loop_new (EVFLAG_NOSIGMASK | EVFLAG_SIGNALFD); - if (!r->loop) { +#if HAVE_LIBUV + int uverr; + if ((uverr = uv_loop_init (&r->loop)) < 0) { + free (r); + errno = -uverr; + return NULL; + } +#else + if (!(r->loop = ev_loop_new (EVFLAG_NOSIGMASK | EVFLAG_SIGNALFD))) { + free (r); errno = ENOMEM; - flux_reactor_destroy (r); return NULL; } ev_set_userdata (r->loop, r); +#endif r->usecount = 1; return r; } int flux_reactor_run (flux_reactor_t *r, int flags) { - int ev_flags = 0; int count; + int rflags; - if (valid_flags (flags, FLUX_REACTOR_NOWAIT | FLUX_REACTOR_ONCE) < 0) - return -1; - if (flags & FLUX_REACTOR_NOWAIT) - ev_flags |= EVRUN_NOWAIT; - if (flags & FLUX_REACTOR_ONCE) - ev_flags |= EVRUN_ONCE; r->errflag = 0; - count = ev_run (r->loop, ev_flags); +#if HAVE_LIBUV + if (flags == FLUX_REACTOR_NOWAIT) + rflags = UV_RUN_NOWAIT; + else if (flags == FLUX_REACTOR_ONCE) + rflags = UV_RUN_ONCE; + else if (flags == 0) + rflags = UV_RUN_DEFAULT; + else + goto error; + count = uv_run (&r->loop, rflags); +#else + if (flags == FLUX_REACTOR_NOWAIT) + rflags = EVRUN_NOWAIT; + else if (flags == FLUX_REACTOR_ONCE) + rflags = EVRUN_ONCE; + else if (flags == 0) + rflags = 0; + else + goto error; + count = ev_run (r->loop, rflags); +#endif return (r->errflag ? -1 : count); +error: + errno = EINVAL; + return -1; } double flux_reactor_time (void) { +#if HAVE_LIBUV + return 1E-9 * uv_hrtime(); +#else return ev_time (); +#endif } double flux_reactor_now (flux_reactor_t *r) { +#if HAVE_LIBUV + return 1E-3 * uv_now (&r->loop); +#else return ev_now (r->loop); +#endif } void flux_reactor_now_update (flux_reactor_t *r) { +#if HAVE_LIBUV + return uv_update_time (&r->loop); +#else return ev_now_update (r->loop); +#endif } void flux_reactor_stop (flux_reactor_t *r) { r->errflag = 0; +#if HAVE_LIBUV + uv_stop (&r->loop); +#else ev_break (r->loop, EVBREAK_ALL); +#endif } void flux_reactor_stop_error (flux_reactor_t *r) { r->errflag = 1; +#if HAVE_LIBUV + uv_stop (&r->loop); +#else ev_break (r->loop, EVBREAK_ALL); +#endif } void *reactor_get_loop (flux_reactor_t *r) { - return r ? r->loop : NULL; + if (!r) + return NULL; +#if HAVE_LIBUV + return &r->loop; +#else + return r->loop; +#endif } /* diff --git a/src/common/libflux/test/reactor.c b/src/common/libflux/test/reactor.c index d518d9b54d21..e5e08596702a 100644 --- a/src/common/libflux/test/reactor.c +++ b/src/common/libflux/test/reactor.c @@ -25,6 +25,12 @@ #include "src/common/libtap/tap.h" #include "ccan/array_size/array_size.h" +// XXX let the unit test compile during libuv integration +#if HAVE_LIBEV +#define HAVE_PERIODIC_WATCHER 1 +#define HAVE_CHECK_PRIORITY 1 +#endif + void watcher_is (flux_watcher_t *w, bool exp_active, bool exp_referenced, @@ -210,7 +216,7 @@ static void oneshot (flux_reactor_t *r, static void test_timer (flux_reactor_t *reactor) { flux_watcher_t *w; - double elapsed, t0, t[] = { 0.001, 0.010, 0.050, 0.100, 0.200 }; + double elapsed, t0, t[] = { 0.005, 0.010, 0.050, 0.100, 0.200 }; int i, rc; /* in case this test runs a while after last reactor run. @@ -260,7 +266,7 @@ static void test_timer (flux_reactor_t *reactor) elapsed = flux_reactor_now (reactor) - t0; ok (repeat_countdown == 0, "timer: repeat timer ran 10x and stopped itself"); - ok (elapsed >= 0.001*10, + ok (elapsed >= 0.001*10 - 0.001, // see libuv note below "timer: elapsed time is >= 10*1ms (%.3fs)", elapsed); flux_watcher_stop (w); flux_watcher_destroy (w); @@ -275,12 +281,15 @@ static void test_timer (flux_reactor_t *reactor) oneshot_runs = 0; rc = flux_reactor_run (reactor, 0); elapsed = flux_reactor_now (reactor) - t0; - ok (rc == 0 && oneshot_runs == 1 && elapsed >= t[i], - "timer: reactor ran %.3fs oneshot at >= time (%.3fs)", t[i], elapsed); + // libuv timer rez is 1ms so allow event to fire up to 1ms early + ok (rc == 0 && oneshot_runs == 1 && elapsed >= t[i] - 0.001, + "timer: reactor ran %.3fs oneshot punctually", t[i]); + diag ("elapsed time was %.3fs", elapsed); } flux_watcher_destroy (w); } +#if HAVE_PERIODIC_WATCHER /* A reactor callback that immediately stops reactor without error */ static bool do_stop_callback_ran = false; @@ -407,6 +416,8 @@ static void test_periodic (flux_reactor_t *reactor) } +#endif + static int idle_count = 0; static void idle_cb (flux_reactor_t *r, flux_watcher_t *w, @@ -748,6 +759,7 @@ static void test_unref (flux_reactor_t *r) count = 0; ok (flux_reactor_run (r, 0) == 0 && count == 1, "flux_reactor_run with one unref watcher returned after 1 iteration"); + diag ("count=%d", count); flux_watcher_stop (w); // calls ev_ref() flux_watcher_ref (w); @@ -811,6 +823,7 @@ static void test_reactor_flags (flux_reactor_t *r) "flux_reactor_create flags=0xffff fails with EINVAL"); } +#if HAVE_CHECK_PRIORITY static char cblist[6] = {0}; static int cblist_index = 0; static flux_watcher_t *priority_prep = NULL; @@ -883,6 +896,7 @@ static void test_priority (flux_reactor_t *r) flux_watcher_destroy (priority_prep); flux_watcher_destroy (priority_idle); } +#endif int main (int argc, char *argv[]) { @@ -902,7 +916,9 @@ int main (int argc, char *argv[]) "flux_watcher_is_active (NULL) returns false"); test_timer (reactor); +#if HAVE_PERIODIC_WATCHER test_periodic (reactor); +#endif test_fd (reactor); test_idle (reactor); test_prepcheck (reactor); @@ -911,7 +927,9 @@ int main (int argc, char *argv[]) test_handle (reactor); test_unref (reactor); test_reactor_flags (reactor); +#if HAVE_CHECK_PRIORITY test_priority (reactor); +#endif flux_reactor_destroy (reactor); diff --git a/src/common/libflux/watcher.c b/src/common/libflux/watcher.c index d24e3dd6a804..2d360183ee5c 100644 --- a/src/common/libflux/watcher.c +++ b/src/common/libflux/watcher.c @@ -143,12 +143,14 @@ bool flux_watcher_is_referenced (flux_watcher_t *w) void flux_watcher_destroy (flux_watcher_t *w) { if (w) { + int saved_errno = errno; if (w->ops->stop) w->ops->stop (w); if (w->ops->destroy) w->ops->destroy (w); flux_reactor_decref (w->r); free (w); + errno = saved_errno; } } diff --git a/src/common/libflux/watcher_wrap.c b/src/common/libflux/watcher_ev.c similarity index 99% rename from src/common/libflux/watcher_wrap.c rename to src/common/libflux/watcher_ev.c index 756f37ec0cfe..06b8c35c27e5 100644 --- a/src/common/libflux/watcher_wrap.c +++ b/src/common/libflux/watcher_ev.c @@ -8,12 +8,16 @@ * SPDX-License-Identifier: LGPL-3.0 \************************************************************/ -/* watcher_wrap.c - wrapped libev watchers */ +/* watcher_ev.c - wrapped libev watchers */ #if HAVE_CONFIG_H #include "config.h" #endif -#include +#if HAVE_LIBEV_INTERNAL +# include "src/common/libev/ev.h" +#else +# include +#endif #include #include "reactor_private.h" diff --git a/src/common/libflux/watcher_uv.c b/src/common/libflux/watcher_uv.c new file mode 100644 index 000000000000..18b6306eb962 --- /dev/null +++ b/src/common/libflux/watcher_uv.c @@ -0,0 +1,595 @@ +/************************************************************\ + * Copyright 2025 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +/* watcher_uv.c - wrapped libuv watchers + * + * Notes on transitioning from libev: + * - handle destruction is asynchronous, see libuv_close_cb() below + * - timer requests are limited to millisecond precision + * - watcher priorities cannot be changed + * - no periodic watchers + * + * See also: flux-framework/flux-core#6492 + */ + +#if HAVE_CONFIG_H +#include "config.h" +#endif +#include +#include + +#include "src/common/libutil/errno_safe.h" + +#include "reactor_private.h" +#include "watcher_private.h" + +static inline int events_to_libuv (int events) +{ + int e = 0; + if (events & FLUX_POLLIN) + e |= UV_READABLE; + if (events & FLUX_POLLOUT) + e |= UV_WRITABLE; + if (events & FLUX_POLLERR) + e |= UV_DISCONNECT; + return e; +} + +static inline int libuv_to_events (int events) +{ + int e = 0; + if (events & UV_READABLE) + e |= FLUX_POLLIN; + if (events & UV_WRITABLE) + e |= FLUX_POLLOUT; + if (events & UV_DISCONNECT) + e |= FLUX_POLLERR; + return e; +} + +/* A libuv handle cannot be directly destroyed. ops->destroy() calls uv_close(), + * registering libuv_close_cb(), which calls free(). If the reactor is destroyed + * before the callback can run, handle memory is leaked. + */ +static void libuv_close_cb (uv_handle_t *uvh) +{ + free (uvh); +} + +/* Generic callbacks that assume the first member of 'struct TYPE_watcher' below + * is a uv_TYPE_t pointer that inherits from uv_handle_t. + */ +struct libuv_watcher { + uv_handle_t *uvh; +}; + +static void libuv_watcher_ref (flux_watcher_t *w) +{ + struct libuv_watcher *uvw = watcher_get_data (w); + uv_ref (uvw->uvh); +} + +static void libuv_watcher_unref (flux_watcher_t *w) +{ + struct libuv_watcher *uvw = watcher_get_data (w); + uv_unref (uvw->uvh); +} + +static bool libuv_watcher_is_active (flux_watcher_t *w) +{ + struct libuv_watcher *uvw = watcher_get_data (w); + return uv_is_active (uvw->uvh); +} + +static void libuv_watcher_destroy (flux_watcher_t *w) +{ + struct libuv_watcher *uvw = watcher_get_data (w); + uv_close (uvw->uvh, libuv_close_cb); +} + +/* file descriptors + */ + +struct fd_watcher { + uv_poll_t *uvh; + int revents; +}; + +static void fd_watcher_cb (uv_poll_t *uvh, int status, int events) +{ + struct flux_watcher *w = uv_handle_get_data ((uv_handle_t *)uvh); + watcher_call (w, status < 0 ? FLUX_POLLERR : libuv_to_events (events)); +} + +static void fd_watcher_start (flux_watcher_t *w) +{ + struct fd_watcher *fdw = watcher_get_data (w); + uv_poll_start (fdw->uvh, fdw->revents, fd_watcher_cb); +} + +static void fd_watcher_stop (flux_watcher_t *w) +{ + struct fd_watcher *fdw = watcher_get_data (w); + uv_poll_stop (fdw->uvh); +} + +static struct flux_watcher_ops fd_watcher_ops = { + .start = fd_watcher_start, + .stop = fd_watcher_stop, + .ref = libuv_watcher_ref, + .unref = libuv_watcher_unref, + .is_active = libuv_watcher_is_active, + .destroy = libuv_watcher_destroy, +}; + +flux_watcher_t *flux_fd_watcher_create (flux_reactor_t *r, + int fd, + int events, + flux_watcher_f cb, + void *arg) +{ + struct fd_watcher *fdw; + flux_watcher_t *w; + + if (!(w = watcher_create (r, sizeof (*fdw), &fd_watcher_ops, cb, arg))) + return NULL; + fdw = watcher_get_data (w); + fdw->revents = events_to_libuv (events); // for uv_poll_start () + if (!(fdw->uvh = calloc (1, sizeof (*fdw->uvh)))) + goto error; + uv_poll_init (reactor_get_loop (r), fdw->uvh, fd); + uv_handle_set_data ((uv_handle_t *)fdw->uvh, w); // for fd_watcher_cb () + return w; +error: + flux_watcher_destroy (w); + return NULL; +} + +int flux_fd_watcher_get_fd (flux_watcher_t *w) +{ + if (watcher_get_ops (w) != &fd_watcher_ops) { + errno = EINVAL; + return -1; + } + struct fd_watcher *fdw = watcher_get_data (w); + int uverr; + int fd; + if ((uverr = uv_fileno ((uv_handle_t *)fdw->uvh, &fd)) < 0) { + errno = -uverr; + return -1; + } + return fd; +} + +/* Timer + */ + +struct timer_watcher { + uv_timer_t *uvh; + uint64_t timeout; + uint64_t repeat; +}; + +static void timer_watcher_cb (uv_timer_t *uvh) +{ + struct flux_watcher *w = uv_handle_get_data ((uv_handle_t *)uvh); + watcher_call (w, 0); +} + +static void timer_watcher_start (flux_watcher_t *w) +{ + struct timer_watcher *tmw = watcher_get_data (w); + uv_timer_start (tmw->uvh, timer_watcher_cb, tmw->timeout, tmw->repeat); +} + +static void timer_watcher_stop (flux_watcher_t *w) +{ + struct timer_watcher *tmw = watcher_get_data (w); + uv_timer_stop (tmw->uvh); +} + +static struct flux_watcher_ops timer_watcher_ops = { + .start = timer_watcher_start, + .stop = timer_watcher_stop, + .ref = libuv_watcher_ref, + .unref = libuv_watcher_unref, + .is_active = libuv_watcher_is_active, + .destroy = libuv_watcher_destroy, +}; + +flux_watcher_t *flux_timer_watcher_create (flux_reactor_t *r, + double after, + double repeat, + flux_watcher_f cb, + void *arg) +{ + struct timer_watcher *tmw; + flux_watcher_t *w; + if (after < 0 || repeat < 0) { + errno = EINVAL; + return NULL; + } + if (!(w = watcher_create (r, sizeof (*tmw), &timer_watcher_ops, cb, arg))) + return NULL; + tmw = watcher_get_data (w); + if (!(tmw->uvh = calloc (1, sizeof (*tmw->uvh)))) + goto error; + uv_timer_init (reactor_get_loop (r), tmw->uvh); + uv_handle_set_data ((uv_handle_t *)(tmw->uvh), w); // for tmwatcher_cb() + tmw->timeout = 1000ULL * after; + tmw->repeat = 1000ULL * repeat; + return w; +error: + flux_watcher_destroy (w); + return NULL; +} + +void flux_timer_watcher_reset (flux_watcher_t *w, double after, double repeat) +{ + if (watcher_get_ops (w) != &timer_watcher_ops) + return; + struct timer_watcher *tmw = watcher_get_data (w); + tmw->timeout = 1000ULL * after; + tmw->repeat = 1000ULL * repeat; +} + +void flux_timer_watcher_again (flux_watcher_t *w) +{ + if (watcher_get_ops (w) != &timer_watcher_ops) + return; + struct timer_watcher *tmw = watcher_get_data (w); + /* in future.c::then_context_set_timeout() we assume that 'again' can be + * run on a timer that hasn't been started. That was apparently allowed + * by libev, but is not allowed by libev + */ + if (uv_timer_again (tmw->uvh) == UV_EINVAL) { + if (tmw->repeat > 0) + flux_watcher_start (w); + } +} + +double flux_watcher_next_wakeup (flux_watcher_t *w) +{ + if (watcher_get_ops (w) == &timer_watcher_ops) { + struct timer_watcher *tmw = watcher_get_data (w); + flux_reactor_t *r = watcher_get_reactor (w); + return flux_reactor_now (r) + (1E-3 * uv_timer_get_due_in (tmw->uvh)); + } + errno = EINVAL; + return -1; +} + +/* Prepare + */ + +struct prepare_watcher { + uv_prepare_t *uvh; +}; + +static void prepare_watcher_cb (uv_prepare_t *uvh) +{ + struct flux_watcher *w = uv_handle_get_data ((uv_handle_t *)uvh); + watcher_call (w, 0); +} + +static void prepare_watcher_start (flux_watcher_t *w) +{ + struct prepare_watcher *pw = watcher_get_data (w); + uv_prepare_start (pw->uvh, prepare_watcher_cb); +} + +static void prepare_watcher_stop (flux_watcher_t *w) +{ + struct prepare_watcher *pw = watcher_get_data (w); + uv_prepare_stop (pw->uvh); +} + +static struct flux_watcher_ops prepare_watcher_ops = { + .start = prepare_watcher_start, + .stop = prepare_watcher_stop, + .ref = libuv_watcher_ref, + .unref = libuv_watcher_unref, + .is_active = libuv_watcher_is_active, + .destroy = libuv_watcher_destroy, +}; + +flux_watcher_t *flux_prepare_watcher_create (flux_reactor_t *r, + flux_watcher_f cb, + void *arg) +{ + struct prepare_watcher *pw; + flux_watcher_t *w; + + if (!(w = watcher_create (r, sizeof (*pw), &prepare_watcher_ops, cb, arg))) + return NULL; + pw = watcher_get_data (w); + if (!(pw->uvh = calloc (1, sizeof (*pw->uvh)))) + goto error; + uv_prepare_init (reactor_get_loop (r), pw->uvh); + uv_handle_set_data ((uv_handle_t *)pw->uvh, w); // for prepare_watcher_cb () + return w; +error: + flux_watcher_destroy (w); + return NULL; +} + +/* Check + */ + +struct check_watcher { + uv_check_t *uvh; +}; + +static void check_watcher_cb (uv_check_t *uvh) +{ + struct flux_watcher *w = uv_handle_get_data ((uv_handle_t *)uvh); + watcher_call (w, 0); +} + +static void check_watcher_start (flux_watcher_t *w) +{ + struct check_watcher *cw = watcher_get_data (w); + uv_check_start (cw->uvh, check_watcher_cb); +} + +static void check_watcher_stop (flux_watcher_t *w) +{ + struct check_watcher *cw = watcher_get_data (w); + uv_check_stop (cw->uvh); +} + +static struct flux_watcher_ops check_watcher_ops = { + .start = check_watcher_start, + .stop = check_watcher_stop, + .ref = libuv_watcher_ref, + .unref = libuv_watcher_unref, + .is_active = libuv_watcher_is_active, + .destroy = libuv_watcher_destroy, +}; + +flux_watcher_t *flux_check_watcher_create (flux_reactor_t *r, + flux_watcher_f cb, + void *arg) +{ + struct check_watcher *cw; + flux_watcher_t *w; + + if (!(w = watcher_create (r, sizeof (*cw), &check_watcher_ops, cb, arg))) + return NULL; + cw = watcher_get_data (w); + if (!(cw->uvh = calloc (1, sizeof (*cw->uvh)))) + goto error; + uv_check_init (reactor_get_loop (r), cw->uvh); + uv_handle_set_data ((uv_handle_t *)cw->uvh, w); // for check_watcher_cb () + return w; +error: + flux_watcher_destroy (w); + return NULL; +} + +/* Idle + */ + +struct idle_watcher { + uv_idle_t *uvh; +}; + +static void idle_watcher_cb (uv_idle_t *uvh) +{ + struct flux_watcher *w = uv_handle_get_data ((uv_handle_t *)uvh); + watcher_call (w, 0); +} + +static void idle_watcher_start (flux_watcher_t *w) +{ + struct idle_watcher *iw = watcher_get_data (w); + uv_idle_start (iw->uvh, idle_watcher_cb); +} + +static void idle_watcher_stop (flux_watcher_t *w) +{ + struct idle_watcher *iw = watcher_get_data (w); + uv_idle_stop (iw->uvh); +} + +static struct flux_watcher_ops idle_watcher_ops = { + .start = idle_watcher_start, + .stop = idle_watcher_stop, + .ref = libuv_watcher_ref, + .unref = libuv_watcher_unref, + .is_active = libuv_watcher_is_active, + .destroy = libuv_watcher_destroy, +}; + +flux_watcher_t *flux_idle_watcher_create (flux_reactor_t *r, + flux_watcher_f cb, + void *arg) +{ + struct idle_watcher *iw; + flux_watcher_t *w; + + if (!(w = watcher_create (r, sizeof (*iw), &idle_watcher_ops, cb, arg))) + return NULL; + iw = watcher_get_data (w); + if (!(iw->uvh = calloc (1, sizeof (*iw->uvh)))) + goto error; + uv_idle_init (reactor_get_loop (r), iw->uvh); + uv_handle_set_data ((uv_handle_t *)iw->uvh, w); // for idle_watcher_cb () + return w; +error: + flux_watcher_destroy (w); + return NULL; +} + +/* Signal + */ + +struct signal_watcher { + uv_signal_t *uvh; + int signum; +}; + +static void signal_watcher_cb (uv_signal_t *uvh, int signum) +{ + struct flux_watcher *w = uv_handle_get_data ((uv_handle_t *)uvh); + watcher_call (w, 0); +} + +static void signal_watcher_start (flux_watcher_t *w) +{ + struct signal_watcher *sw = watcher_get_data (w); + uv_signal_start (sw->uvh, signal_watcher_cb, sw->signum); +} + +static void signal_watcher_stop (flux_watcher_t *w) +{ + struct signal_watcher *sw = watcher_get_data (w); + uv_signal_stop (sw->uvh); +} + +static struct flux_watcher_ops signal_watcher_ops = { + .start = signal_watcher_start, + .stop = signal_watcher_stop, + .ref = libuv_watcher_ref, + .unref = libuv_watcher_unref, + .is_active = libuv_watcher_is_active, + .destroy = libuv_watcher_destroy, +}; + +flux_watcher_t *flux_signal_watcher_create (flux_reactor_t *r, + int signum, + flux_watcher_f cb, + void *arg) +{ + flux_watcher_t *w; + struct signal_watcher *sw; + + if (!(w = watcher_create (r, sizeof (*sw), &signal_watcher_ops, cb, arg))) + return NULL; + sw = watcher_get_data (w); + if (!(sw->uvh = calloc (1, sizeof (*sw->uvh)))) + goto error; + sw->signum = signum; // for sigwatcher_start() + uv_signal_init (reactor_get_loop (r), sw->uvh); + uv_handle_set_data ((uv_handle_t *)sw->uvh, w); // for sigwatcher_cb() + return w; +error: + flux_watcher_destroy (w); + return NULL; +} + +int flux_signal_watcher_get_signum (flux_watcher_t *w) +{ + if (watcher_get_ops (w) != &signal_watcher_ops) { + errno = EINVAL; + return -1; + } + struct signal_watcher *sw = watcher_get_data (w); + return sw->signum; +} + +/* Stat + */ + +struct stat_watcher { + uv_fs_event_t *uvh; + char *path; + struct stat prev; + struct stat stat; +}; + +static void stat_watcher_cb (uv_fs_event_t *uvh, + const char *filename, + int events, + int status) +{ + struct flux_watcher *w = uv_handle_get_data ((uv_handle_t *)uvh); + struct stat_watcher *sw = watcher_get_data (w); + sw->prev = sw->stat; + if (stat (sw->path, &sw->stat) < 0) + sw->stat.st_nlink = 0; + watcher_call (w, 0); +} + +static void stat_watcher_start (flux_watcher_t *w) +{ + struct stat_watcher *sw = watcher_get_data (w); + uv_fs_event_start (sw->uvh, + stat_watcher_cb, + sw->path, + UV_FS_EVENT_WATCH_ENTRY); +} + +static void stat_watcher_stop (flux_watcher_t *w) +{ + struct stat_watcher *sw = watcher_get_data (w); + uv_fs_event_stop (sw->uvh); +} + +static void stat_watcher_destroy (flux_watcher_t *w) +{ + struct stat_watcher *sw = watcher_get_data (w); + uv_close ((uv_handle_t *)sw->uvh, libuv_close_cb); + ERRNO_SAFE_WRAP (free, sw->path); +} + +static struct flux_watcher_ops stat_watcher_ops = { + .start = stat_watcher_start, + .stop = stat_watcher_stop, + .ref = libuv_watcher_ref, + .unref = libuv_watcher_unref, + .is_active = libuv_watcher_is_active, + .destroy = stat_watcher_destroy, +}; + +flux_watcher_t *flux_stat_watcher_create (flux_reactor_t *r, + const char *path, + double interval, + flux_watcher_f cb, + void *arg) +{ + flux_watcher_t *w; + struct stat_watcher *sw; + + if (!(w = watcher_create (r, sizeof (*sw), &stat_watcher_ops, cb, arg))) + return NULL; + sw = watcher_get_data (w); + if (!(sw->uvh = calloc (1, sizeof (*sw->uvh)))) + goto error; + uv_fs_event_init (reactor_get_loop (r), sw->uvh); + if (stat (path, &sw->stat) < 0) + sw->stat.st_nlink = 0; + sw->prev = sw->stat; + if (!(sw->path = strdup (path))) + goto error; + uv_handle_set_data ((uv_handle_t *)sw->uvh, w); + return w; +error: + flux_watcher_destroy (w); + return NULL; +} + +int flux_stat_watcher_get_rstat (flux_watcher_t *w, + struct stat *stat, + struct stat *prev) +{ + if (watcher_get_ops (w) != &stat_watcher_ops) { + errno = EINVAL; + return -1; + } + struct stat_watcher *sw = watcher_get_data (w); + if (stat) + *stat = sw->stat; + if (prev) + *prev = sw->prev; + return 0; +} + +// vi:ts=4 sw=4 expandtab