Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
195 changes: 184 additions & 11 deletions src/broker/broker.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
#include "src/common/libutil/basename.h"
#include "src/common/librouter/subhash.h"
#include "ccan/array_size/array_size.h"
#include "src/common/libccan/ccan/base64/base64.h"
#include "ccan/str/str.h"
#include "ccan/ptrint/ptrint.h"
#ifndef HAVE_STRLCPY
Expand Down Expand Up @@ -1476,6 +1477,162 @@ static void service_remove_cb (flux_t *h,
flux_log_error (h, "service_remove: flux_respond_error");
}

/* Distribute an event message to broker and broker module subscribers.
*/
static void mcast_event_locally_new (struct broker *ctx, flux_msg_t **msg)
{
if (modhash_event_mcast (ctx->modhash, *msg) < 0)
flux_log_error (ctx->h, "event mcast failed to broker modules");

const char *topic = NULL;
(void)flux_msg_get_topic (*msg, &topic);
if (subhash_topic_match (ctx->sub, topic)) {
if (flux_send_new (ctx->h_internal, msg, 0) < 0)
flux_log_error (ctx->h, "send failed on internal broker handle");
}

flux_msg_decref (*msg);
*msg = NULL;
}

/* Distribute an event message to broker subscribers, broker module
* subscribers, and overlay peers. Avoid unnecessary message copies.
* Call this on rank 0 only.
*/
static void mcast_event_globally_new (struct broker *ctx, flux_msg_t **msg)
{
if (modhash_event_mcast (ctx->modhash, *msg) < 0)
flux_log_error (ctx->h, "mcast failed to broker modules");

const char *topic = NULL;
(void)flux_msg_get_topic (*msg, &topic);
if (subhash_topic_match (ctx->sub, topic)) {
if (ctx->size > 1) {
if (flux_send (ctx->h_internal, *msg, 0) < 0)
flux_log_error (ctx->h, "mcast failed to internal handle");
}
else {
if (flux_send_new (ctx->h_internal, msg, 0) < 0)
flux_log_error (ctx->h, "mcast failed to internal handle");
}
}

if (ctx->size > 1) {
if (flux_send_new (ctx->h_overlay, msg, 0) < 0)
flux_log_error (ctx->h, "could not forward event to overlay");
}

flux_msg_decref (*msg);
*msg = NULL;
}

/* Assign a sequence number to an event message and distribute globally.
* The event sequence starts with 1, leaving 0 to mean "unpublished"
* when appearing in a message, although that is not required by RFC 3
* nor used in broker routing decisions.
* Call this on rank 0 only.
*/
static int publish_event_new (struct broker *ctx, flux_msg_t **msg, int *seq)
{
static int event_seq = 1;

if (flux_msg_set_seq (*msg, event_seq) < 0)
return -1;
mcast_event_globally_new (ctx, msg);
if (seq)
*seq = event_seq;
event_seq++;
return 0;
}

static flux_msg_t *encode_event (const char *topic,
int flags,
struct flux_msg_cred cred,
const char *src)
{
flux_msg_t *msg;
char *dst = NULL;

if (!(msg = flux_msg_create (FLUX_MSGTYPE_EVENT)))
goto error;
if (flux_msg_set_topic (msg, topic) < 0)
goto error;
if (flux_msg_set_cred (msg, cred) < 0)
goto error;
if ((flags & FLUX_MSGFLAG_PRIVATE)) {
if (flux_msg_set_private (msg) < 0)
goto error;
}
if (src) { // optional payload
int srclen = strlen (src);
size_t dstbuflen = base64_decoded_length (srclen);
ssize_t dstlen;

if (!(dst = malloc (dstbuflen)))
goto error;
if ((dstlen = base64_decode (dst, dstbuflen, src, srclen)) < 0) {
errno = EPROTO;
goto error;
}
if (flux_msg_set_payload (msg, dst, dstlen) < 0) {
if (errno == EINVAL)
errno = EPROTO;
goto error;
}
}
free (dst);
return msg;
error:
ERRNO_SAFE_WRAP (free, dst);
flux_msg_destroy (msg);
return NULL;
}

static void event_publish_cb (flux_t *h,
flux_msg_handler_t *mh,
const flux_msg_t *msg,
void *arg)
{
struct broker *ctx = arg;
const char *topic;
const char *payload = NULL; // optional
int flags;
struct flux_msg_cred cred;
flux_msg_t *event;
const char *errmsg = NULL;
int seq;

if (flux_request_unpack (msg,
NULL,
"{s:s s:i s?s}",
"topic", &topic,
"flags", &flags,
"payload", &payload) < 0)
goto error;
if (ctx->rank > 0) {
errno = EPROTO;
errmsg = "this service is only available on rank 0";
goto error;
}
if ((flags & ~(FLUX_MSGFLAG_PRIVATE)) != 0) {
errno = EPROTO;
goto error;
}
if (flux_msg_get_cred (msg, &cred) < 0)
goto error;
if (!(event = encode_event (topic, flags, cred, payload))
|| publish_event_new (ctx, &event, &seq) < 0) {
flux_msg_decref (event);
goto error;
}
if (flux_respond_pack (h, msg, "{s:i}", "seq", seq) < 0)
flux_log_error (h, "%s: flux_respond", __FUNCTION__);
return;
error:
if (flux_respond_error (h, msg, errno, errmsg) < 0)
flux_log_error (h, "%s: flux_respond_error", __FUNCTION__);
}

static void event_subscribe_cb (flux_t *h,
flux_msg_handler_t *mh,
const flux_msg_t *msg,
Expand Down Expand Up @@ -1575,6 +1732,12 @@ static const struct flux_msg_handler_spec htab[] = {
service_remove_cb,
FLUX_ROLE_USER,
},
{
FLUX_MSGTYPE_REQUEST,
"event.publish",
event_publish_cb,
FLUX_ROLE_USER,
},
{
FLUX_MSGTYPE_REQUEST,
"event.subscribe",
Expand Down Expand Up @@ -1649,8 +1812,6 @@ static flux_msg_handler_t **broker_add_services (broker_ctx_t *ctx,
**/

/* Handle messages on interthread://overlay
* N.B. even when there is only one node, event messages are processed
* by the overlay.
*/
static void overlay_cb (flux_reactor_t *r,
flux_watcher_t *w,
Expand Down Expand Up @@ -1680,14 +1841,16 @@ static void overlay_cb (flux_reactor_t *r,
goto drop;
break;
case FLUX_MSGTYPE_EVENT:
if (modhash_event_mcast (ctx->modhash, msg) < 0)
flux_log_error (ctx->h,
"mcast failed to broker modules");
if (subhash_topic_match (ctx->sub, topic)
&& flux_send_new (ctx->h_internal, &msg, 0) < 0) {
flux_log_error (ctx->h,
"send failed on internal broker handle");
/* The overlay sends the broker only unpublished events on rank 0.
* It only sends published events on other ranks.
*/
if (ctx->rank == 0) {
if (publish_event_new (ctx, &msg, NULL) < 0)
goto drop;
}
else
mcast_event_locally_new (ctx, &msg);
break;
default:
break;
}
Expand All @@ -1704,6 +1867,7 @@ static void overlay_cb (flux_reactor_t *r,
flux_msg_typestr (type),
topic);
}
flux_msg_decref (msg);
}

static bool signal_is_deadly (int signum)
Expand Down Expand Up @@ -1891,8 +2055,17 @@ static void h_internal_watcher (flux_reactor_t *r,
goto error;
break;
case FLUX_MSGTYPE_EVENT:
if (flux_send_new (ctx->h_overlay, &msg, 0) < 0)
goto error;
/* Events sent on ctx->h are assumed to be unpublished,
* so they must either be published or forwarded upstream.
*/
if (ctx->rank == 0) {
if (publish_event_new (ctx, &msg, NULL) < 0)
goto error;
}
else {
if (flux_send_new (ctx->h_overlay, &msg, 0) < 0)
goto error;
}
break;
default:
goto error;
Expand Down
2 changes: 1 addition & 1 deletion src/broker/modhash.c
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ static void module_cb (module_t *p, void *arg)
broker_request_sendmsg_new (ctx, &msg);
break;
case FLUX_MSGTYPE_EVENT:
if (flux_send_new (ctx->h_overlay, &msg, 0) < 0) {
if (flux_send_new (ctx->h, &msg, 0) < 0) {
flux_log_error (ctx->h,
"%s(%s): send to overlay: %s",
__FUNCTION__,
Expand Down
Loading
Loading