diff --git a/src/broker/broker.c b/src/broker/broker.c index fe7a592a3fff..706701f93ca3 100644 --- a/src/broker/broker.c +++ b/src/broker/broker.c @@ -56,6 +56,7 @@ #include "src/common/libutil/basename.h" #include "src/common/librouter/subhash.h" #include "ccan/array_size/array_size.h" +#include "src/common/libccan/ccan/base64/base64.h" #include "ccan/str/str.h" #include "ccan/ptrint/ptrint.h" #ifndef HAVE_STRLCPY @@ -1476,6 +1477,162 @@ static void service_remove_cb (flux_t *h, flux_log_error (h, "service_remove: flux_respond_error"); } +/* Distribute an event message to broker and broker module subscribers. + */ +static void mcast_event_locally_new (struct broker *ctx, flux_msg_t **msg) +{ + if (modhash_event_mcast (ctx->modhash, *msg) < 0) + flux_log_error (ctx->h, "event mcast failed to broker modules"); + + const char *topic = NULL; + (void)flux_msg_get_topic (*msg, &topic); + if (subhash_topic_match (ctx->sub, topic)) { + if (flux_send_new (ctx->h_internal, msg, 0) < 0) + flux_log_error (ctx->h, "send failed on internal broker handle"); + } + + flux_msg_decref (*msg); + *msg = NULL; +} + +/* Distribute an event message to broker subscribers, broker module + * subscribers, and overlay peers. Avoid unnecessary message copies. + * Call this on rank 0 only. + */ +static void mcast_event_globally_new (struct broker *ctx, flux_msg_t **msg) +{ + if (modhash_event_mcast (ctx->modhash, *msg) < 0) + flux_log_error (ctx->h, "mcast failed to broker modules"); + + const char *topic = NULL; + (void)flux_msg_get_topic (*msg, &topic); + if (subhash_topic_match (ctx->sub, topic)) { + if (ctx->size > 1) { + if (flux_send (ctx->h_internal, *msg, 0) < 0) + flux_log_error (ctx->h, "mcast failed to internal handle"); + } + else { + if (flux_send_new (ctx->h_internal, msg, 0) < 0) + flux_log_error (ctx->h, "mcast failed to internal handle"); + } + } + + if (ctx->size > 1) { + if (flux_send_new (ctx->h_overlay, msg, 0) < 0) + flux_log_error (ctx->h, "could not forward event to overlay"); + } + + flux_msg_decref (*msg); + *msg = NULL; +} + +/* Assign a sequence number to an event message and distribute globally. + * The event sequence starts with 1, leaving 0 to mean "unpublished" + * when appearing in a message, although that is not required by RFC 3 + * nor used in broker routing decisions. + * Call this on rank 0 only. + */ +static int publish_event_new (struct broker *ctx, flux_msg_t **msg, int *seq) +{ + static int event_seq = 1; + + if (flux_msg_set_seq (*msg, event_seq) < 0) + return -1; + mcast_event_globally_new (ctx, msg); + if (seq) + *seq = event_seq; + event_seq++; + return 0; +} + +static flux_msg_t *encode_event (const char *topic, + int flags, + struct flux_msg_cred cred, + const char *src) +{ + flux_msg_t *msg; + char *dst = NULL; + + if (!(msg = flux_msg_create (FLUX_MSGTYPE_EVENT))) + goto error; + if (flux_msg_set_topic (msg, topic) < 0) + goto error; + if (flux_msg_set_cred (msg, cred) < 0) + goto error; + if ((flags & FLUX_MSGFLAG_PRIVATE)) { + if (flux_msg_set_private (msg) < 0) + goto error; + } + if (src) { // optional payload + int srclen = strlen (src); + size_t dstbuflen = base64_decoded_length (srclen); + ssize_t dstlen; + + if (!(dst = malloc (dstbuflen))) + goto error; + if ((dstlen = base64_decode (dst, dstbuflen, src, srclen)) < 0) { + errno = EPROTO; + goto error; + } + if (flux_msg_set_payload (msg, dst, dstlen) < 0) { + if (errno == EINVAL) + errno = EPROTO; + goto error; + } + } + free (dst); + return msg; +error: + ERRNO_SAFE_WRAP (free, dst); + flux_msg_destroy (msg); + return NULL; +} + +static void event_publish_cb (flux_t *h, + flux_msg_handler_t *mh, + const flux_msg_t *msg, + void *arg) +{ + struct broker *ctx = arg; + const char *topic; + const char *payload = NULL; // optional + int flags; + struct flux_msg_cred cred; + flux_msg_t *event; + const char *errmsg = NULL; + int seq; + + if (flux_request_unpack (msg, + NULL, + "{s:s s:i s?s}", + "topic", &topic, + "flags", &flags, + "payload", &payload) < 0) + goto error; + if (ctx->rank > 0) { + errno = EPROTO; + errmsg = "this service is only available on rank 0"; + goto error; + } + if ((flags & ~(FLUX_MSGFLAG_PRIVATE)) != 0) { + errno = EPROTO; + goto error; + } + if (flux_msg_get_cred (msg, &cred) < 0) + goto error; + if (!(event = encode_event (topic, flags, cred, payload)) + || publish_event_new (ctx, &event, &seq) < 0) { + flux_msg_decref (event); + goto error; + } + if (flux_respond_pack (h, msg, "{s:i}", "seq", seq) < 0) + flux_log_error (h, "%s: flux_respond", __FUNCTION__); + return; +error: + if (flux_respond_error (h, msg, errno, errmsg) < 0) + flux_log_error (h, "%s: flux_respond_error", __FUNCTION__); +} + static void event_subscribe_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, @@ -1575,6 +1732,12 @@ static const struct flux_msg_handler_spec htab[] = { service_remove_cb, FLUX_ROLE_USER, }, + { + FLUX_MSGTYPE_REQUEST, + "event.publish", + event_publish_cb, + FLUX_ROLE_USER, + }, { FLUX_MSGTYPE_REQUEST, "event.subscribe", @@ -1649,8 +1812,6 @@ static flux_msg_handler_t **broker_add_services (broker_ctx_t *ctx, **/ /* Handle messages on interthread://overlay - * N.B. even when there is only one node, event messages are processed - * by the overlay. */ static void overlay_cb (flux_reactor_t *r, flux_watcher_t *w, @@ -1680,14 +1841,16 @@ static void overlay_cb (flux_reactor_t *r, goto drop; break; case FLUX_MSGTYPE_EVENT: - if (modhash_event_mcast (ctx->modhash, msg) < 0) - flux_log_error (ctx->h, - "mcast failed to broker modules"); - if (subhash_topic_match (ctx->sub, topic) - && flux_send_new (ctx->h_internal, &msg, 0) < 0) { - flux_log_error (ctx->h, - "send failed on internal broker handle"); + /* The overlay sends the broker only unpublished events on rank 0. + * It only sends published events on other ranks. + */ + if (ctx->rank == 0) { + if (publish_event_new (ctx, &msg, NULL) < 0) + goto drop; } + else + mcast_event_locally_new (ctx, &msg); + break; default: break; } @@ -1704,6 +1867,7 @@ static void overlay_cb (flux_reactor_t *r, flux_msg_typestr (type), topic); } + flux_msg_decref (msg); } static bool signal_is_deadly (int signum) @@ -1891,8 +2055,17 @@ static void h_internal_watcher (flux_reactor_t *r, goto error; break; case FLUX_MSGTYPE_EVENT: - if (flux_send_new (ctx->h_overlay, &msg, 0) < 0) - goto error; + /* Events sent on ctx->h are assumed to be unpublished, + * so they must either be published or forwarded upstream. + */ + if (ctx->rank == 0) { + if (publish_event_new (ctx, &msg, NULL) < 0) + goto error; + } + else { + if (flux_send_new (ctx->h_overlay, &msg, 0) < 0) + goto error; + } break; default: goto error; diff --git a/src/broker/modhash.c b/src/broker/modhash.c index fffbf3894b25..b7ce0889cc20 100644 --- a/src/broker/modhash.c +++ b/src/broker/modhash.c @@ -227,7 +227,7 @@ static void module_cb (module_t *p, void *arg) broker_request_sendmsg_new (ctx, &msg); break; case FLUX_MSGTYPE_EVENT: - if (flux_send_new (ctx->h_overlay, &msg, 0) < 0) { + if (flux_send_new (ctx->h, &msg, 0) < 0) { flux_log_error (ctx->h, "%s(%s): send to overlay: %s", __FUNCTION__, diff --git a/src/broker/overlay.c b/src/broker/overlay.c index f93e0ee550ad..0e5fa425ce90 100644 --- a/src/broker/overlay.c +++ b/src/broker/overlay.c @@ -36,7 +36,6 @@ #include "src/common/libutil/errprintf.h" #include "src/common/librouter/rpc_track.h" #include "src/common/libflux/message_route.h" // for msg_route_sendto() -#include "src/common/libccan/ccan/base64/base64.h" #include "ccan/str/str.h" #ifndef HAVE_STRLCPY #include "src/common/libmissing/strlcpy.h" @@ -159,7 +158,7 @@ struct overlay { struct topology *topo; uint32_t size; uint32_t rank; - int event_seq; // assign on rank 0, track on rank > 0 + int event_seq; // used for sequence verification char uuid[UUID_STR_LEN]; int version; int zmqdebug; @@ -202,9 +201,9 @@ static int overlay_control_parent (struct overlay *ov, int status); static void overlay_health_respond_all (struct overlay *ov); static struct child *child_lookup_byrank (struct overlay *ov, uint32_t rank); -static int overlay_publish_new (struct overlay *ov, flux_msg_t **msg, int *seq); static int overlay_goodbye_parent (struct overlay *overlay, flux_error_t *errp); static int overlay_get_child_online_peer_count (struct overlay *ov); +static void overlay_event_checkseq (struct overlay *ov, const flux_msg_t *msg); /* Convenience iterator for ov->children */ @@ -656,13 +655,15 @@ static void channel_cb (flux_reactor_t *r, } break; case FLUX_MSGTYPE_EVENT: - if (ov->rank == 0) { // publish - if (overlay_publish_new (ov, &msg, NULL) < 0) { - flux_log_error (ov->h, "error publishing event"); - goto done; - } + /* On rank 0, the broker sends events to the overlay for downstream + * distribution. On other ranks, the broker sends unpublished + * events for upstream publication. + */ + if (ov->rank == 0) { + overlay_event_checkseq (ov, msg); + overlay_mcast_child (ov, msg); } - else { // forward upstream + else { flux_msg_route_enable (msg); if (overlay_sendmsg_parent (ov, msg) < 0) { flux_log_error (ov->h, "error forwarding event upstream"); @@ -909,10 +910,7 @@ static int overlay_mcast_send (const flux_msg_t *msg, void *arg) return overlay_sendmsg_child (ov, msg); } -/* Forward an event message to downstream peers. This may be a new event - * published on rank 0 via overlay_publish_new() or an event received from - * the upstream (parent) overlay peer. In either case, this propagates - * the event to the next TBON level, where propagation continues. +/* Forward an event message to downstream peers. */ static void overlay_mcast_child (struct overlay *ov, flux_msg_t *msg) { @@ -946,41 +944,6 @@ static void overlay_mcast_child (struct overlay *ov, flux_msg_t *msg) } } -/* Publish an event message on rank 0 originating from: - * - a downstream (child) overlay peer - received on child_cb() - * - the local broker on behalf of a module or user - * - the overlay.publish RPC (used when the publisher needs the seq number) - * - * N.B. the _new suffix in the function name is meant to indicate that - * the reference to *msg is stolen by the function on success. - */ -static int overlay_publish_new (struct overlay *ov, - flux_msg_t **msg, - int *seqp) -{ - int seq; - - if (ov->rank != 0) { - errno = EINVAL; - return -1; - } - /* The event sequence starts with 1. - * This allows 0 to indicate "sequence not set". - */ - seq = ++ov->event_seq; - if (flux_msg_set_seq (*msg, seq) < 0) { - ov->event_seq--; - return -1; - } - overlay_mcast_child (ov, *msg); - if (seqp) - *seqp = seq; - (void)flux_send_new (ov->h_channel, msg, 0); - flux_msg_decref (*msg); - *msg = NULL; - return 0; -} - static void logdrop (struct overlay *ov, const char *where, const flux_msg_t *msg, @@ -1123,13 +1086,17 @@ static void child_cb (flux_reactor_t *r, rpc_track_update (child->tracker, msg); break; case FLUX_MSGTYPE_EVENT: - if (ov->rank == 0) // publish - (void)overlay_publish_new (ov, &msg, NULL); - else { // forward upstream + /* An event message traveling upstream will always be unpublished. + * Forward upstream, or on rank 0, to local broker for publication. + */ + if (ov->rank > 0) { flux_msg_route_enable (msg); overlay_sendmsg_parent (ov, msg); + goto done; } - goto done; + flux_msg_route_disable (msg); + // fall through to forward message to broker + break; } trace_overlay_msg (ov->h, "rx", child->rank, ov->trace_requests, msg); if (flux_send_new (ov->h_channel, &msg, 0) < 0) @@ -1165,10 +1132,9 @@ static void parent_disconnect (struct overlay *ov) } } -/* Sanity check an event message that has been received from the upstream - * (parent) overlay peer. +/* Sanity check that event messages are properly sequenced. */ -static void parent_event_checkseq (struct overlay *ov, const flux_msg_t *msg) +static void overlay_event_checkseq (struct overlay *ov, const flux_msg_t *msg) { uint32_t seq; @@ -1239,10 +1205,10 @@ static void parent_cb (flux_reactor_t *r, * An event type message should not have routing enabled * under normal circumstances, so turn it off here. */ - parent_event_checkseq (ov, msg); + overlay_event_checkseq (ov, msg); overlay_mcast_child (ov, msg); flux_msg_route_disable (msg); - // fall through and let local broker receive this message + // fall through and let local broker distribute locally break; case FLUX_MSGTYPE_CONTROL: { int ctrl_type, reason; @@ -2167,94 +2133,6 @@ static void overlay_trace_cb (flux_t *h, flux_log_error (h, "error responding to overlay.trace"); } -static flux_msg_t *encode_event (const char *topic, - int flags, - struct flux_msg_cred cred, - const char *src) -{ - flux_msg_t *msg; - char *dst = NULL; - - if (!(msg = flux_msg_create (FLUX_MSGTYPE_EVENT))) - goto error; - if (flux_msg_set_topic (msg, topic) < 0) - goto error; - if (flux_msg_set_cred (msg, cred) < 0) - goto error; - if ((flags & FLUX_MSGFLAG_PRIVATE)) { - if (flux_msg_set_private (msg) < 0) - goto error; - } - if (src) { // optional payload - int srclen = strlen (src); - size_t dstbuflen = base64_decoded_length (srclen); - ssize_t dstlen; - - if (!(dst = malloc (dstbuflen))) - goto error; - if ((dstlen = base64_decode (dst, dstbuflen, src, srclen)) < 0) { - errno = EPROTO; - goto error; - } - if (flux_msg_set_payload (msg, dst, dstlen) < 0) { - if (errno == EINVAL) - errno = EPROTO; - goto error; - } - } - free (dst); - return msg; -error: - ERRNO_SAFE_WRAP (free, dst); - flux_msg_destroy (msg); - return NULL; -} - -static void overlay_publish_cb (flux_t *h, - flux_msg_handler_t *mh, - const flux_msg_t *msg, - void *arg) -{ - struct overlay *ov = arg; - const char *topic; - const char *payload = NULL; // optional - int flags; - struct flux_msg_cred cred; - flux_msg_t *event; - const char *errmsg = NULL; - int seq; - - if (flux_request_unpack (msg, - NULL, - "{s:s s:i s?s}", - "topic", &topic, - "flags", &flags, - "payload", &payload) < 0) - goto error; - if (ov->rank > 0) { - errno = EPROTO; - errmsg = "this service is only available on rank 0"; - goto error; - } - if ((flags & ~(FLUX_MSGFLAG_PRIVATE)) != 0) { - errno = EPROTO; - goto error; - } - if (flux_msg_get_cred (msg, &cred) < 0) - goto error; - if (!(event = encode_event (topic, flags, cred, payload)) - || overlay_publish_new (ov, &event, &seq) < 0) { - flux_msg_decref (event); - goto error; - } - if (flux_respond_pack (h, msg, "{s:i}", "seq", seq) < 0) - flux_log_error (h, "%s: flux_respond", __FUNCTION__); - return; -error: - if (flux_respond_error (h, msg, errno, errmsg) < 0) - flux_log_error (h, "%s: flux_respond_error", __FUNCTION__); -} - int overlay_cert_load (struct overlay *ov, const char *path, flux_error_t *errp) @@ -2740,12 +2618,6 @@ void overlay_destroy (struct overlay *ov) } static const struct flux_msg_handler_spec htab[] = { - { - FLUX_MSGTYPE_REQUEST, - "overlay.publish", - overlay_publish_cb, - FLUX_ROLE_USER, - }, { FLUX_MSGTYPE_REQUEST, "overlay.trace", diff --git a/src/broker/test/overlay.c b/src/broker/test/overlay.c index cfe4bd5f8291..21cf4d68ad52 100644 --- a/src/broker/test/overlay.c +++ b/src/broker/test/overlay.c @@ -174,7 +174,6 @@ void single (flux_t *h) char *s; struct idset *critical_ranks; const char *topic; - uint32_t seq; ok (overlay_set_topology (ctx->ov, ctx->topo) == 0, "%s: overlay_set_topology size=1 rank=0 works", ctx->name); @@ -198,56 +197,6 @@ void single (flux_t *h) check_attr (ctx, "tbon.maxlevel", "0"); check_attr (ctx, "tbon.descendants", "0"); - /* Event - * Overlay re-publishes non-sequenced message, so we get it - * back with a sequence number. - */ - if (!(msg = flux_event_encode ("foo_event", NULL))) - BAIL_OUT ("flux_event_encode failed"); - ok (flux_send (ctx->h_channel, msg, 0) == 0, - "%s: flux_send event works", ctx->name); - flux_msg_decref (msg); - - ok (flux_reactor_run (r, FLUX_REACTOR_ONCE) >= 0, - "flux_reactor_run ONCE"); - - msg = flux_recv (ctx->h_channel, FLUX_MATCH_EVENT, FLUX_O_NONBLOCK); - ok (flux_msg_get_topic (msg, &topic) == 0 && streq (topic, "foo_event"), - "%s: overlay published our message", ctx->name); - ok (flux_msg_get_seq (msg, &seq) == 0 && seq == 1, - "%s: event sequence = 1", ctx->name); - flux_msg_decref (msg); - - /* Event publish request - */ - if (!(msg = flux_request_encode ("overlay.publish", NULL)) - || flux_msg_pack (msg, - "{s:s s:i}", - "topic", "smurf", - "flags", FLUX_MSGFLAG_PRIVATE) < 0) - BAIL_OUT ("flux_request_encode failed"); - ok (flux_send (ctx->h, msg, 0) == 0, - "%s: flux_send event works", ctx->name); - flux_msg_decref (msg); - - ok (flux_reactor_run (r, FLUX_REACTOR_ONCE) >= 0, - "flux_reactor_run ONCE"); - - msg = flux_recv (ctx->h, FLUX_MATCH_RESPONSE, FLUX_O_NONBLOCK); - ok (flux_msg_get_topic (msg, &topic) == 0 - && streq (topic, "overlay.publish"), - "%s overlay responded to publish request", ctx->name); - flux_msg_decref (msg); - - msg = flux_recv (ctx->h_channel, FLUX_MATCH_EVENT, FLUX_O_NONBLOCK); - ok (flux_msg_get_topic (msg, &topic) == 0 && streq (topic, "smurf"), - "%s: event message is received", ctx->name); - ok (flux_msg_get_seq (msg, &seq) == 0 && seq == 2, - "%s: event sequence is 2", ctx->name); - ok (flux_msg_is_private (msg), - "%s: privacy flag is set", ctx->name); - flux_msg_decref (msg); - /* Response * Will try child but there isn't one, so message is dropped. */ @@ -493,12 +442,6 @@ void trio (flux_t *h) "%s: flux_msg_is_local returns false for event from child", ctx[0]->name); - rmsg = recvmsg_timeout (ctx[1], 5); - ok (rmsg != NULL, - "%s: event was received by overlay", ctx[1]->name); - ok (flux_msg_get_topic (rmsg, &topic) == 0 && streq (topic, "eeek"), - "%s: received message has expected topic", ctx[1]->name); - /* Response 0->1 */ if (!(msg = flux_response_encode ("moop", NULL))) @@ -531,12 +474,6 @@ void trio (flux_t *h) ok (flux_msg_get_topic (rmsg, &topic) == 0 && streq (topic, "eeeb"), "%s: received message has expected topic", ctx[1]->name); - rmsg = recvmsg_timeout (ctx[0], 5); - ok (rmsg != NULL, - "%s: event was received by overlay", ctx[0]->name); - ok (flux_msg_get_topic (rmsg, &topic) == 0 && streq (topic, "eeeb"), - "%s: received message has expected topic", ctx[0]->name); - /* Cover some error code in overlay_bind() where the ZAP handler * fails to initialize because its endpoint is already bound. */ diff --git a/src/common/libflux/event.c b/src/common/libflux/event.c index d353525aa93e..f56d187da1ed 100644 --- a/src/common/libflux/event.c +++ b/src/common/libflux/event.c @@ -282,7 +282,7 @@ static flux_future_t *wrap_event_rpc (flux_t *h, return NULL; } if (!(f = flux_rpc_pack (h, - "overlay.publish", + "event.publish", 0, 0, "{s:s s:i s:s}", @@ -298,7 +298,7 @@ static flux_future_t *wrap_event_rpc (flux_t *h, } else { if (!(f = flux_rpc_pack (h, - "overlay.publish", + "event.publish", 0, 0, "{s:s s:i}", diff --git a/t/lua/t0003-events.t b/t/lua/t0003-events.t index 1eb9398ed5b3..b83190037c6a 100755 --- a/t/lua/t0003-events.t +++ b/t/lua/t0003-events.t @@ -55,36 +55,36 @@ type_ok (msg, 'table', "recv_event: got msg as a table") is_deeply (msg, {}, "recv_event: got empty payload as expected") --- poke at overlay.publish service +-- poke at event.publish service -- good request, no payload local request = { topic = "foo", flags = 0 } -local response, err = f:rpc ("overlay.publish", request); -is (err, nil, "overlay.publish: works without payload") +local response, err = f:rpc ("event.publish", request); +is (err, nil, "event.publish: works without payload") -- good request, with raw payload local request = { topic = "foo", flags = 0, payload = "aGVsbG8gd29ybGQ=" } -local response, err = f:rpc ("overlay.publish", request); -is (err, nil, "overlay.publish: works with payload") +local response, err = f:rpc ("event.publish", request); +is (err, nil, "event.publish: works with payload") -- good request, with JSON "{}\0" local request = { topic = "foo", flags = 0, payload = "e30A" } -local response, err = f:rpc ("overlay.publish", request); -is (err, nil, "overlay.publish: works with json payload") +local response, err = f:rpc ("event.publish", request); +is (err, nil, "event.publish: works with json payload") -- flags missing from request local request = { topic = "foo" } -local response, err = f:rpc ("overlay.publish", request); -is (err, "Protocol error", "overlay.publish: no flags, fails with EPROTO") +local response, err = f:rpc ("event.publish", request); +is (err, "Protocol error", "event.publish: no flags, fails with EPROTO") -- mangled base64 payload local request = { topic = "foo", flags = 0, payload = "aGVsbG8gd29ybGQ%" } -local response, err = f:rpc ("overlay.publish", request); -is (err, "Protocol error", "overlay.publish: bad base64, fails with EPROTO") +local response, err = f:rpc ("event.publish", request); +is (err, "Protocol error", "event.publish: bad base64, fails with EPROTO") -- good request, mangled JSON payload "{\0" local request = { topic = "foo", flags = 4, payload = "ewA=" } -local response, err = f:rpc ("overlay.publish", request); -is (err, "Protocol error", "overlay.publish: bad json payload, fails with EPROTO") +local response, err = f:rpc ("event.publish", request); +is (err, "Protocol error", "event.publish: bad json payload, fails with EPROTO") done_testing () diff --git a/t/t0004-event.t b/t/t0004-event.t index b0fa2bd897f7..bd438ff0d9d4 100755 --- a/t/t0004-event.t +++ b/t/t0004-event.t @@ -87,8 +87,8 @@ test_expect_success 'publish private event with no payload (synchronous,loopback run_timeout 5 flux event pub -p -s -l foo.bar ' -test_expect_success 'overlay.publish request with empty payload fails with EPROTO(71)' ' - ${RPC} overlay.publish 71