Skip to content

Commit 3b1ad4c

Browse files
authored
Merge pull request #6809 from garlick/publisher_doc
broker: document/clean up event publishing code
2 parents 5f55d3c + 7173236 commit 3b1ad4c

File tree

2 files changed

+56
-14
lines changed

2 files changed

+56
-14
lines changed

src/broker/broker.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1575,7 +1575,7 @@ static int handle_event (broker_ctx_t *ctx, const flux_msg_t *msg)
15751575
const char *topic;
15761576

15771577
if (flux_msg_get_seq (msg, &seq) < 0
1578-
|| flux_msg_get_topic (msg, &topic) < 0) {
1578+
|| flux_msg_get_topic (msg, &topic) < 0) {
15791579
flux_log (ctx->h, LOG_ERR, "dropping malformed event");
15801580
return -1;
15811581
}

src/broker/publisher.c

Lines changed: 55 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,28 @@
88
* SPDX-License-Identifier: LGPL-3.0
99
\************************************************************/
1010

11-
/* publisher.c - event publishing service on rank 0 */
11+
/* publisher.c - event publishing service on rank 0
12+
*
13+
* There are two methods for publishing an event:
14+
*
15+
* 1) Call one of the following API functions:
16+
* flux_event_publish(3)
17+
* flux_event_publish_pack(3)
18+
* flux_event_publish_raw(3)
19+
* These send an RPC to rank 0 (handled below). The RPC response contains
20+
* the published event's sequence number. The event message payload, if any,
21+
* must be base64-encoded to be included in the RPC payload.
22+
*
23+
* 2) Call one of the following functions
24+
* flux_event_encode(3)
25+
* flux_event_pack(3)
26+
* flux_event_encode_raw(3)
27+
* then
28+
* flux_send(3)
29+
* flux_send_new(3)
30+
* This puts the RFC 3 event message right on the wire without base64 encoding.
31+
* However, it is "fire and forget" from the sender's perspective.
32+
*/
1233

1334
#if HAVE_CONFIG_H
1435
#include "config.h"
@@ -34,9 +55,11 @@ struct publisher {
3455
void *arg;
3556
};
3657

37-
static flux_msg_t *encode_event (const char *topic, int flags,
58+
static flux_msg_t *encode_event (const char *topic,
59+
int flags,
3860
struct flux_msg_cred cred,
39-
uint32_t seq, const char *src)
61+
uint32_t seq,
62+
const char *src)
4063
{
4164
flux_msg_t *msg;
4265
char *dst = NULL;
@@ -81,7 +104,10 @@ static flux_msg_t *encode_event (const char *topic, int flags,
81104
return NULL;
82105
}
83106

84-
/* Broadcast event using all senders.
107+
/* Call the callback registered with publisher_create() to publish 'msg',
108+
* an RFC 3 event message. The broker registers its handle_event() function
109+
* here, which sends the message to downstream overlay peers, local modules,
110+
* and in-broker subscribers.
85111
* Log failure, but don't abort the event at this point.
86112
*/
87113
static void send_event (struct publisher *pub, const flux_msg_t *msg)
@@ -90,6 +116,8 @@ static void send_event (struct publisher *pub, const flux_msg_t *msg)
90116
flux_log_error (pub->ctx->h, "error publishing event message");
91117
}
92118

119+
/* Publish a message from a "publish request" (method 1 above).
120+
*/
93121
static void publish_cb (flux_t *h,
94122
flux_msg_handler_t *mh,
95123
const flux_msg_t *msg,
@@ -103,10 +131,12 @@ static void publish_cb (flux_t *h,
103131
flux_msg_t *event = NULL;
104132
const char *errmsg = NULL;
105133

106-
if (flux_request_unpack (msg, NULL, "{s:s s:i s?s}",
107-
"topic", &topic,
108-
"flags", &flags,
109-
"payload", &payload) < 0)
134+
if (flux_request_unpack (msg,
135+
NULL,
136+
"{s:s s:i s?s}",
137+
"topic", &topic,
138+
"flags", &flags,
139+
"payload", &payload) < 0)
110140
goto error;
111141
if (pub->ctx->rank > 0) {
112142
errno = EPROTO;
@@ -134,6 +164,15 @@ static void publish_cb (flux_t *h,
134164
flux_msg_destroy (event);
135165
}
136166

167+
/* Publish a raw RFC 3 event message (method 2 above).
168+
* This is a bit confusing: this function is called from broker.c, then
169+
* it calls a callback registered from broker.c. In short:
170+
*
171+
* broker_event_sendmsg_new() => publisher_send() =>
172+
* send_event() => handle_event()
173+
*
174+
* The goal was to assign event sequence numbers in one place only.
175+
*/
137176
int publisher_send (struct publisher *pub, const flux_msg_t *msg)
138177
{
139178
flux_msg_t *cpy;
@@ -152,8 +191,10 @@ int publisher_send (struct publisher *pub, const flux_msg_t *msg)
152191
return -1;
153192
}
154193

155-
static void subscribe_cb (flux_t *h, flux_msg_handler_t *mh,
156-
const flux_msg_t *msg, void *arg)
194+
static void subscribe_cb (flux_t *h,
195+
flux_msg_handler_t *mh,
196+
const flux_msg_t *msg,
197+
void *arg)
157198
{
158199
struct publisher *pub = arg;
159200
const char *uuid;
@@ -181,8 +222,10 @@ static void subscribe_cb (flux_t *h, flux_msg_handler_t *mh,
181222
flux_log_error (h, "error responding to subscribe request");
182223
}
183224

184-
static void unsubscribe_cb (flux_t *h, flux_msg_handler_t *mh,
185-
const flux_msg_t *msg, void *arg)
225+
static void unsubscribe_cb (flux_t *h,
226+
flux_msg_handler_t *mh,
227+
const flux_msg_t *msg,
228+
void *arg)
186229
{
187230
struct publisher *pub = arg;
188231
const char *uuid;
@@ -245,7 +288,6 @@ struct publisher *publisher_create (struct broker *ctx,
245288
return pub;
246289
}
247290

248-
249291
/*
250292
* vi:tabstop=4 shiftwidth=4 expandtab
251293
*/

0 commit comments

Comments
 (0)