From aace27d8123c39da959818673e0f30ae66ed1d3d Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Tue, 2 Dec 2025 09:00:04 -0800 Subject: [PATCH] broker: assign event seqs in the broker not overlay Problem: flux-framework/flux-core#7109 moved all event publishing from the broker to the overlay subsystem so that the peer multicast portion could eventually run in separate thread. Unfortunately, moving all event publication machinery to overlay creates an unnecessary requirement that overlay be running on a size=1 instance, and will complicate 'flux module reload overlay', should that be useful. Leave the resource intensive event mcast code in overlay but move the rest of it back into the broker. TL;DR Recall there are two ways that event messages are routed per RFC 3: 1) A bare event message is forwarded upstream on the TBON and published when it arrives on rank 0. 2) An event message is base64 encoded and encapsulated in a request message that is sent to rank 0, where it is published. Publication occurs only on rank 0 and consists of assigning a monotonically increasing sequence number, distributing the message to local broker and module subscribers, and sending the event message to the overlay via the interthread message channel for further distribution. The overlay now routes events as follows: - If received from the local broker on the interthread channel: On rank 0, messages are mcast to all children. On rank > 0, messages are forwarded to the parent for publication. - If received from an overlay child: On rank 0, messages are forwarded to the local broker for publication. On rank > 0, message are forwarded to the parent for publication - If received from the overlay parent (rank > 0), messages are mcast to all children AND sent to the local broker on the interthread channel for distribution to local broker and module subscribers. Update the overlay unit test that were exercising full event publication. Update some event sharness tests that used the other RPC name. --- src/broker/broker.c | 195 ++++++++++++++++++++++++++++++++++--- src/broker/modhash.c | 2 +- src/broker/overlay.c | 174 +++++---------------------------- src/broker/test/overlay.c | 63 ------------ src/common/libflux/event.c | 4 +- t/lua/t0003-events.t | 26 ++--- t/t0004-event.t | 4 +- 7 files changed, 225 insertions(+), 243 deletions(-) diff --git a/src/broker/broker.c b/src/broker/broker.c index fe7a592a3fff..706701f93ca3 100644 --- a/src/broker/broker.c +++ b/src/broker/broker.c @@ -56,6 +56,7 @@ #include "src/common/libutil/basename.h" #include "src/common/librouter/subhash.h" #include "ccan/array_size/array_size.h" +#include "src/common/libccan/ccan/base64/base64.h" #include "ccan/str/str.h" #include "ccan/ptrint/ptrint.h" #ifndef HAVE_STRLCPY @@ -1476,6 +1477,162 @@ static void service_remove_cb (flux_t *h, flux_log_error (h, "service_remove: flux_respond_error"); } +/* Distribute an event message to broker and broker module subscribers. + */ +static void mcast_event_locally_new (struct broker *ctx, flux_msg_t **msg) +{ + if (modhash_event_mcast (ctx->modhash, *msg) < 0) + flux_log_error (ctx->h, "event mcast failed to broker modules"); + + const char *topic = NULL; + (void)flux_msg_get_topic (*msg, &topic); + if (subhash_topic_match (ctx->sub, topic)) { + if (flux_send_new (ctx->h_internal, msg, 0) < 0) + flux_log_error (ctx->h, "send failed on internal broker handle"); + } + + flux_msg_decref (*msg); + *msg = NULL; +} + +/* Distribute an event message to broker subscribers, broker module + * subscribers, and overlay peers. Avoid unnecessary message copies. + * Call this on rank 0 only. + */ +static void mcast_event_globally_new (struct broker *ctx, flux_msg_t **msg) +{ + if (modhash_event_mcast (ctx->modhash, *msg) < 0) + flux_log_error (ctx->h, "mcast failed to broker modules"); + + const char *topic = NULL; + (void)flux_msg_get_topic (*msg, &topic); + if (subhash_topic_match (ctx->sub, topic)) { + if (ctx->size > 1) { + if (flux_send (ctx->h_internal, *msg, 0) < 0) + flux_log_error (ctx->h, "mcast failed to internal handle"); + } + else { + if (flux_send_new (ctx->h_internal, msg, 0) < 0) + flux_log_error (ctx->h, "mcast failed to internal handle"); + } + } + + if (ctx->size > 1) { + if (flux_send_new (ctx->h_overlay, msg, 0) < 0) + flux_log_error (ctx->h, "could not forward event to overlay"); + } + + flux_msg_decref (*msg); + *msg = NULL; +} + +/* Assign a sequence number to an event message and distribute globally. + * The event sequence starts with 1, leaving 0 to mean "unpublished" + * when appearing in a message, although that is not required by RFC 3 + * nor used in broker routing decisions. + * Call this on rank 0 only. + */ +static int publish_event_new (struct broker *ctx, flux_msg_t **msg, int *seq) +{ + static int event_seq = 1; + + if (flux_msg_set_seq (*msg, event_seq) < 0) + return -1; + mcast_event_globally_new (ctx, msg); + if (seq) + *seq = event_seq; + event_seq++; + return 0; +} + +static flux_msg_t *encode_event (const char *topic, + int flags, + struct flux_msg_cred cred, + const char *src) +{ + flux_msg_t *msg; + char *dst = NULL; + + if (!(msg = flux_msg_create (FLUX_MSGTYPE_EVENT))) + goto error; + if (flux_msg_set_topic (msg, topic) < 0) + goto error; + if (flux_msg_set_cred (msg, cred) < 0) + goto error; + if ((flags & FLUX_MSGFLAG_PRIVATE)) { + if (flux_msg_set_private (msg) < 0) + goto error; + } + if (src) { // optional payload + int srclen = strlen (src); + size_t dstbuflen = base64_decoded_length (srclen); + ssize_t dstlen; + + if (!(dst = malloc (dstbuflen))) + goto error; + if ((dstlen = base64_decode (dst, dstbuflen, src, srclen)) < 0) { + errno = EPROTO; + goto error; + } + if (flux_msg_set_payload (msg, dst, dstlen) < 0) { + if (errno == EINVAL) + errno = EPROTO; + goto error; + } + } + free (dst); + return msg; +error: + ERRNO_SAFE_WRAP (free, dst); + flux_msg_destroy (msg); + return NULL; +} + +static void event_publish_cb (flux_t *h, + flux_msg_handler_t *mh, + const flux_msg_t *msg, + void *arg) +{ + struct broker *ctx = arg; + const char *topic; + const char *payload = NULL; // optional + int flags; + struct flux_msg_cred cred; + flux_msg_t *event; + const char *errmsg = NULL; + int seq; + + if (flux_request_unpack (msg, + NULL, + "{s:s s:i s?s}", + "topic", &topic, + "flags", &flags, + "payload", &payload) < 0) + goto error; + if (ctx->rank > 0) { + errno = EPROTO; + errmsg = "this service is only available on rank 0"; + goto error; + } + if ((flags & ~(FLUX_MSGFLAG_PRIVATE)) != 0) { + errno = EPROTO; + goto error; + } + if (flux_msg_get_cred (msg, &cred) < 0) + goto error; + if (!(event = encode_event (topic, flags, cred, payload)) + || publish_event_new (ctx, &event, &seq) < 0) { + flux_msg_decref (event); + goto error; + } + if (flux_respond_pack (h, msg, "{s:i}", "seq", seq) < 0) + flux_log_error (h, "%s: flux_respond", __FUNCTION__); + return; +error: + if (flux_respond_error (h, msg, errno, errmsg) < 0) + flux_log_error (h, "%s: flux_respond_error", __FUNCTION__); +} + static void event_subscribe_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, @@ -1575,6 +1732,12 @@ static const struct flux_msg_handler_spec htab[] = { service_remove_cb, FLUX_ROLE_USER, }, + { + FLUX_MSGTYPE_REQUEST, + "event.publish", + event_publish_cb, + FLUX_ROLE_USER, + }, { FLUX_MSGTYPE_REQUEST, "event.subscribe", @@ -1649,8 +1812,6 @@ static flux_msg_handler_t **broker_add_services (broker_ctx_t *ctx, **/ /* Handle messages on interthread://overlay - * N.B. even when there is only one node, event messages are processed - * by the overlay. */ static void overlay_cb (flux_reactor_t *r, flux_watcher_t *w, @@ -1680,14 +1841,16 @@ static void overlay_cb (flux_reactor_t *r, goto drop; break; case FLUX_MSGTYPE_EVENT: - if (modhash_event_mcast (ctx->modhash, msg) < 0) - flux_log_error (ctx->h, - "mcast failed to broker modules"); - if (subhash_topic_match (ctx->sub, topic) - && flux_send_new (ctx->h_internal, &msg, 0) < 0) { - flux_log_error (ctx->h, - "send failed on internal broker handle"); + /* The overlay sends the broker only unpublished events on rank 0. + * It only sends published events on other ranks. + */ + if (ctx->rank == 0) { + if (publish_event_new (ctx, &msg, NULL) < 0) + goto drop; } + else + mcast_event_locally_new (ctx, &msg); + break; default: break; } @@ -1704,6 +1867,7 @@ static void overlay_cb (flux_reactor_t *r, flux_msg_typestr (type), topic); } + flux_msg_decref (msg); } static bool signal_is_deadly (int signum) @@ -1891,8 +2055,17 @@ static void h_internal_watcher (flux_reactor_t *r, goto error; break; case FLUX_MSGTYPE_EVENT: - if (flux_send_new (ctx->h_overlay, &msg, 0) < 0) - goto error; + /* Events sent on ctx->h are assumed to be unpublished, + * so they must either be published or forwarded upstream. + */ + if (ctx->rank == 0) { + if (publish_event_new (ctx, &msg, NULL) < 0) + goto error; + } + else { + if (flux_send_new (ctx->h_overlay, &msg, 0) < 0) + goto error; + } break; default: goto error; diff --git a/src/broker/modhash.c b/src/broker/modhash.c index fffbf3894b25..b7ce0889cc20 100644 --- a/src/broker/modhash.c +++ b/src/broker/modhash.c @@ -227,7 +227,7 @@ static void module_cb (module_t *p, void *arg) broker_request_sendmsg_new (ctx, &msg); break; case FLUX_MSGTYPE_EVENT: - if (flux_send_new (ctx->h_overlay, &msg, 0) < 0) { + if (flux_send_new (ctx->h, &msg, 0) < 0) { flux_log_error (ctx->h, "%s(%s): send to overlay: %s", __FUNCTION__, diff --git a/src/broker/overlay.c b/src/broker/overlay.c index f93e0ee550ad..0e5fa425ce90 100644 --- a/src/broker/overlay.c +++ b/src/broker/overlay.c @@ -36,7 +36,6 @@ #include "src/common/libutil/errprintf.h" #include "src/common/librouter/rpc_track.h" #include "src/common/libflux/message_route.h" // for msg_route_sendto() -#include "src/common/libccan/ccan/base64/base64.h" #include "ccan/str/str.h" #ifndef HAVE_STRLCPY #include "src/common/libmissing/strlcpy.h" @@ -159,7 +158,7 @@ struct overlay { struct topology *topo; uint32_t size; uint32_t rank; - int event_seq; // assign on rank 0, track on rank > 0 + int event_seq; // used for sequence verification char uuid[UUID_STR_LEN]; int version; int zmqdebug; @@ -202,9 +201,9 @@ static int overlay_control_parent (struct overlay *ov, int status); static void overlay_health_respond_all (struct overlay *ov); static struct child *child_lookup_byrank (struct overlay *ov, uint32_t rank); -static int overlay_publish_new (struct overlay *ov, flux_msg_t **msg, int *seq); static int overlay_goodbye_parent (struct overlay *overlay, flux_error_t *errp); static int overlay_get_child_online_peer_count (struct overlay *ov); +static void overlay_event_checkseq (struct overlay *ov, const flux_msg_t *msg); /* Convenience iterator for ov->children */ @@ -656,13 +655,15 @@ static void channel_cb (flux_reactor_t *r, } break; case FLUX_MSGTYPE_EVENT: - if (ov->rank == 0) { // publish - if (overlay_publish_new (ov, &msg, NULL) < 0) { - flux_log_error (ov->h, "error publishing event"); - goto done; - } + /* On rank 0, the broker sends events to the overlay for downstream + * distribution. On other ranks, the broker sends unpublished + * events for upstream publication. + */ + if (ov->rank == 0) { + overlay_event_checkseq (ov, msg); + overlay_mcast_child (ov, msg); } - else { // forward upstream + else { flux_msg_route_enable (msg); if (overlay_sendmsg_parent (ov, msg) < 0) { flux_log_error (ov->h, "error forwarding event upstream"); @@ -909,10 +910,7 @@ static int overlay_mcast_send (const flux_msg_t *msg, void *arg) return overlay_sendmsg_child (ov, msg); } -/* Forward an event message to downstream peers. This may be a new event - * published on rank 0 via overlay_publish_new() or an event received from - * the upstream (parent) overlay peer. In either case, this propagates - * the event to the next TBON level, where propagation continues. +/* Forward an event message to downstream peers. */ static void overlay_mcast_child (struct overlay *ov, flux_msg_t *msg) { @@ -946,41 +944,6 @@ static void overlay_mcast_child (struct overlay *ov, flux_msg_t *msg) } } -/* Publish an event message on rank 0 originating from: - * - a downstream (child) overlay peer - received on child_cb() - * - the local broker on behalf of a module or user - * - the overlay.publish RPC (used when the publisher needs the seq number) - * - * N.B. the _new suffix in the function name is meant to indicate that - * the reference to *msg is stolen by the function on success. - */ -static int overlay_publish_new (struct overlay *ov, - flux_msg_t **msg, - int *seqp) -{ - int seq; - - if (ov->rank != 0) { - errno = EINVAL; - return -1; - } - /* The event sequence starts with 1. - * This allows 0 to indicate "sequence not set". - */ - seq = ++ov->event_seq; - if (flux_msg_set_seq (*msg, seq) < 0) { - ov->event_seq--; - return -1; - } - overlay_mcast_child (ov, *msg); - if (seqp) - *seqp = seq; - (void)flux_send_new (ov->h_channel, msg, 0); - flux_msg_decref (*msg); - *msg = NULL; - return 0; -} - static void logdrop (struct overlay *ov, const char *where, const flux_msg_t *msg, @@ -1123,13 +1086,17 @@ static void child_cb (flux_reactor_t *r, rpc_track_update (child->tracker, msg); break; case FLUX_MSGTYPE_EVENT: - if (ov->rank == 0) // publish - (void)overlay_publish_new (ov, &msg, NULL); - else { // forward upstream + /* An event message traveling upstream will always be unpublished. + * Forward upstream, or on rank 0, to local broker for publication. + */ + if (ov->rank > 0) { flux_msg_route_enable (msg); overlay_sendmsg_parent (ov, msg); + goto done; } - goto done; + flux_msg_route_disable (msg); + // fall through to forward message to broker + break; } trace_overlay_msg (ov->h, "rx", child->rank, ov->trace_requests, msg); if (flux_send_new (ov->h_channel, &msg, 0) < 0) @@ -1165,10 +1132,9 @@ static void parent_disconnect (struct overlay *ov) } } -/* Sanity check an event message that has been received from the upstream - * (parent) overlay peer. +/* Sanity check that event messages are properly sequenced. */ -static void parent_event_checkseq (struct overlay *ov, const flux_msg_t *msg) +static void overlay_event_checkseq (struct overlay *ov, const flux_msg_t *msg) { uint32_t seq; @@ -1239,10 +1205,10 @@ static void parent_cb (flux_reactor_t *r, * An event type message should not have routing enabled * under normal circumstances, so turn it off here. */ - parent_event_checkseq (ov, msg); + overlay_event_checkseq (ov, msg); overlay_mcast_child (ov, msg); flux_msg_route_disable (msg); - // fall through and let local broker receive this message + // fall through and let local broker distribute locally break; case FLUX_MSGTYPE_CONTROL: { int ctrl_type, reason; @@ -2167,94 +2133,6 @@ static void overlay_trace_cb (flux_t *h, flux_log_error (h, "error responding to overlay.trace"); } -static flux_msg_t *encode_event (const char *topic, - int flags, - struct flux_msg_cred cred, - const char *src) -{ - flux_msg_t *msg; - char *dst = NULL; - - if (!(msg = flux_msg_create (FLUX_MSGTYPE_EVENT))) - goto error; - if (flux_msg_set_topic (msg, topic) < 0) - goto error; - if (flux_msg_set_cred (msg, cred) < 0) - goto error; - if ((flags & FLUX_MSGFLAG_PRIVATE)) { - if (flux_msg_set_private (msg) < 0) - goto error; - } - if (src) { // optional payload - int srclen = strlen (src); - size_t dstbuflen = base64_decoded_length (srclen); - ssize_t dstlen; - - if (!(dst = malloc (dstbuflen))) - goto error; - if ((dstlen = base64_decode (dst, dstbuflen, src, srclen)) < 0) { - errno = EPROTO; - goto error; - } - if (flux_msg_set_payload (msg, dst, dstlen) < 0) { - if (errno == EINVAL) - errno = EPROTO; - goto error; - } - } - free (dst); - return msg; -error: - ERRNO_SAFE_WRAP (free, dst); - flux_msg_destroy (msg); - return NULL; -} - -static void overlay_publish_cb (flux_t *h, - flux_msg_handler_t *mh, - const flux_msg_t *msg, - void *arg) -{ - struct overlay *ov = arg; - const char *topic; - const char *payload = NULL; // optional - int flags; - struct flux_msg_cred cred; - flux_msg_t *event; - const char *errmsg = NULL; - int seq; - - if (flux_request_unpack (msg, - NULL, - "{s:s s:i s?s}", - "topic", &topic, - "flags", &flags, - "payload", &payload) < 0) - goto error; - if (ov->rank > 0) { - errno = EPROTO; - errmsg = "this service is only available on rank 0"; - goto error; - } - if ((flags & ~(FLUX_MSGFLAG_PRIVATE)) != 0) { - errno = EPROTO; - goto error; - } - if (flux_msg_get_cred (msg, &cred) < 0) - goto error; - if (!(event = encode_event (topic, flags, cred, payload)) - || overlay_publish_new (ov, &event, &seq) < 0) { - flux_msg_decref (event); - goto error; - } - if (flux_respond_pack (h, msg, "{s:i}", "seq", seq) < 0) - flux_log_error (h, "%s: flux_respond", __FUNCTION__); - return; -error: - if (flux_respond_error (h, msg, errno, errmsg) < 0) - flux_log_error (h, "%s: flux_respond_error", __FUNCTION__); -} - int overlay_cert_load (struct overlay *ov, const char *path, flux_error_t *errp) @@ -2740,12 +2618,6 @@ void overlay_destroy (struct overlay *ov) } static const struct flux_msg_handler_spec htab[] = { - { - FLUX_MSGTYPE_REQUEST, - "overlay.publish", - overlay_publish_cb, - FLUX_ROLE_USER, - }, { FLUX_MSGTYPE_REQUEST, "overlay.trace", diff --git a/src/broker/test/overlay.c b/src/broker/test/overlay.c index cfe4bd5f8291..21cf4d68ad52 100644 --- a/src/broker/test/overlay.c +++ b/src/broker/test/overlay.c @@ -174,7 +174,6 @@ void single (flux_t *h) char *s; struct idset *critical_ranks; const char *topic; - uint32_t seq; ok (overlay_set_topology (ctx->ov, ctx->topo) == 0, "%s: overlay_set_topology size=1 rank=0 works", ctx->name); @@ -198,56 +197,6 @@ void single (flux_t *h) check_attr (ctx, "tbon.maxlevel", "0"); check_attr (ctx, "tbon.descendants", "0"); - /* Event - * Overlay re-publishes non-sequenced message, so we get it - * back with a sequence number. - */ - if (!(msg = flux_event_encode ("foo_event", NULL))) - BAIL_OUT ("flux_event_encode failed"); - ok (flux_send (ctx->h_channel, msg, 0) == 0, - "%s: flux_send event works", ctx->name); - flux_msg_decref (msg); - - ok (flux_reactor_run (r, FLUX_REACTOR_ONCE) >= 0, - "flux_reactor_run ONCE"); - - msg = flux_recv (ctx->h_channel, FLUX_MATCH_EVENT, FLUX_O_NONBLOCK); - ok (flux_msg_get_topic (msg, &topic) == 0 && streq (topic, "foo_event"), - "%s: overlay published our message", ctx->name); - ok (flux_msg_get_seq (msg, &seq) == 0 && seq == 1, - "%s: event sequence = 1", ctx->name); - flux_msg_decref (msg); - - /* Event publish request - */ - if (!(msg = flux_request_encode ("overlay.publish", NULL)) - || flux_msg_pack (msg, - "{s:s s:i}", - "topic", "smurf", - "flags", FLUX_MSGFLAG_PRIVATE) < 0) - BAIL_OUT ("flux_request_encode failed"); - ok (flux_send (ctx->h, msg, 0) == 0, - "%s: flux_send event works", ctx->name); - flux_msg_decref (msg); - - ok (flux_reactor_run (r, FLUX_REACTOR_ONCE) >= 0, - "flux_reactor_run ONCE"); - - msg = flux_recv (ctx->h, FLUX_MATCH_RESPONSE, FLUX_O_NONBLOCK); - ok (flux_msg_get_topic (msg, &topic) == 0 - && streq (topic, "overlay.publish"), - "%s overlay responded to publish request", ctx->name); - flux_msg_decref (msg); - - msg = flux_recv (ctx->h_channel, FLUX_MATCH_EVENT, FLUX_O_NONBLOCK); - ok (flux_msg_get_topic (msg, &topic) == 0 && streq (topic, "smurf"), - "%s: event message is received", ctx->name); - ok (flux_msg_get_seq (msg, &seq) == 0 && seq == 2, - "%s: event sequence is 2", ctx->name); - ok (flux_msg_is_private (msg), - "%s: privacy flag is set", ctx->name); - flux_msg_decref (msg); - /* Response * Will try child but there isn't one, so message is dropped. */ @@ -493,12 +442,6 @@ void trio (flux_t *h) "%s: flux_msg_is_local returns false for event from child", ctx[0]->name); - rmsg = recvmsg_timeout (ctx[1], 5); - ok (rmsg != NULL, - "%s: event was received by overlay", ctx[1]->name); - ok (flux_msg_get_topic (rmsg, &topic) == 0 && streq (topic, "eeek"), - "%s: received message has expected topic", ctx[1]->name); - /* Response 0->1 */ if (!(msg = flux_response_encode ("moop", NULL))) @@ -531,12 +474,6 @@ void trio (flux_t *h) ok (flux_msg_get_topic (rmsg, &topic) == 0 && streq (topic, "eeeb"), "%s: received message has expected topic", ctx[1]->name); - rmsg = recvmsg_timeout (ctx[0], 5); - ok (rmsg != NULL, - "%s: event was received by overlay", ctx[0]->name); - ok (flux_msg_get_topic (rmsg, &topic) == 0 && streq (topic, "eeeb"), - "%s: received message has expected topic", ctx[0]->name); - /* Cover some error code in overlay_bind() where the ZAP handler * fails to initialize because its endpoint is already bound. */ diff --git a/src/common/libflux/event.c b/src/common/libflux/event.c index d353525aa93e..f56d187da1ed 100644 --- a/src/common/libflux/event.c +++ b/src/common/libflux/event.c @@ -282,7 +282,7 @@ static flux_future_t *wrap_event_rpc (flux_t *h, return NULL; } if (!(f = flux_rpc_pack (h, - "overlay.publish", + "event.publish", 0, 0, "{s:s s:i s:s}", @@ -298,7 +298,7 @@ static flux_future_t *wrap_event_rpc (flux_t *h, } else { if (!(f = flux_rpc_pack (h, - "overlay.publish", + "event.publish", 0, 0, "{s:s s:i}", diff --git a/t/lua/t0003-events.t b/t/lua/t0003-events.t index 1eb9398ed5b3..b83190037c6a 100755 --- a/t/lua/t0003-events.t +++ b/t/lua/t0003-events.t @@ -55,36 +55,36 @@ type_ok (msg, 'table', "recv_event: got msg as a table") is_deeply (msg, {}, "recv_event: got empty payload as expected") --- poke at overlay.publish service +-- poke at event.publish service -- good request, no payload local request = { topic = "foo", flags = 0 } -local response, err = f:rpc ("overlay.publish", request); -is (err, nil, "overlay.publish: works without payload") +local response, err = f:rpc ("event.publish", request); +is (err, nil, "event.publish: works without payload") -- good request, with raw payload local request = { topic = "foo", flags = 0, payload = "aGVsbG8gd29ybGQ=" } -local response, err = f:rpc ("overlay.publish", request); -is (err, nil, "overlay.publish: works with payload") +local response, err = f:rpc ("event.publish", request); +is (err, nil, "event.publish: works with payload") -- good request, with JSON "{}\0" local request = { topic = "foo", flags = 0, payload = "e30A" } -local response, err = f:rpc ("overlay.publish", request); -is (err, nil, "overlay.publish: works with json payload") +local response, err = f:rpc ("event.publish", request); +is (err, nil, "event.publish: works with json payload") -- flags missing from request local request = { topic = "foo" } -local response, err = f:rpc ("overlay.publish", request); -is (err, "Protocol error", "overlay.publish: no flags, fails with EPROTO") +local response, err = f:rpc ("event.publish", request); +is (err, "Protocol error", "event.publish: no flags, fails with EPROTO") -- mangled base64 payload local request = { topic = "foo", flags = 0, payload = "aGVsbG8gd29ybGQ%" } -local response, err = f:rpc ("overlay.publish", request); -is (err, "Protocol error", "overlay.publish: bad base64, fails with EPROTO") +local response, err = f:rpc ("event.publish", request); +is (err, "Protocol error", "event.publish: bad base64, fails with EPROTO") -- good request, mangled JSON payload "{\0" local request = { topic = "foo", flags = 4, payload = "ewA=" } -local response, err = f:rpc ("overlay.publish", request); -is (err, "Protocol error", "overlay.publish: bad json payload, fails with EPROTO") +local response, err = f:rpc ("event.publish", request); +is (err, "Protocol error", "event.publish: bad json payload, fails with EPROTO") done_testing () diff --git a/t/t0004-event.t b/t/t0004-event.t index b0fa2bd897f7..bd438ff0d9d4 100755 --- a/t/t0004-event.t +++ b/t/t0004-event.t @@ -87,8 +87,8 @@ test_expect_success 'publish private event with no payload (synchronous,loopback run_timeout 5 flux event pub -p -s -l foo.bar ' -test_expect_success 'overlay.publish request with empty payload fails with EPROTO(71)' ' - ${RPC} overlay.publish 71