Skip to content

Commit 7766d9b

Browse files
authored
Merge pull request #6811 from garlick/mcast_lightweight
broker: avoid malloc in event distribution critical path
2 parents 060d6ae + 078ba6a commit 7766d9b

File tree

3 files changed

+50
-12
lines changed

3 files changed

+50
-12
lines changed

src/broker/overlay.c

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
#include "src/common/libutil/monotime.h"
3737
#include "src/common/libutil/errprintf.h"
3838
#include "src/common/librouter/rpc_track.h"
39+
#include "src/common/libflux/message_route.h" // for msg_route_sendto()
3940
#include "ccan/str/str.h"
4041

4142
#include "overlay.h"
@@ -851,17 +852,11 @@ static int overlay_sendmsg_child (struct overlay *ov, const flux_msg_t *msg)
851852
return rc;
852853
}
853854

854-
/* Push child->uuid onto the message, then pop it off again after sending.
855-
*/
856-
static int overlay_mcast_child_one (struct overlay *ov,
857-
flux_msg_t *msg,
858-
struct child *child)
855+
// callback for msg_route_sendto()
856+
static int overlay_mcast_send (const flux_msg_t *msg, void *arg)
859857
{
860-
if (flux_msg_route_push (msg, child->uuid) < 0)
861-
return -1;
862-
int rc = overlay_sendmsg_child (ov, msg);
863-
(void)flux_msg_route_delete_last (msg);
864-
return rc;
858+
struct overlay *ov = arg;
859+
return overlay_sendmsg_child (ov, msg);
865860
}
866861

867862
static void overlay_mcast_child (struct overlay *ov, flux_msg_t *msg)
@@ -873,7 +868,10 @@ static void overlay_mcast_child (struct overlay *ov, flux_msg_t *msg)
873868

874869
foreach_overlay_child (ov, child) {
875870
if (subtree_is_online (child->status)) {
876-
if (overlay_mcast_child_one (ov, msg, child) < 0) {
871+
if (msg_route_sendto (msg,
872+
child->uuid,
873+
overlay_mcast_send,
874+
ov) < 0) {
877875
if (errno != EHOSTUNREACH) {
878876
flux_log_error (ov->h,
879877
"mcast error to child rank %lu",

src/common/libflux/message_route.c

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ static struct route_id *route_id_create (const char *id, unsigned int id_len)
3636
struct route_id *r;
3737
if (!(r = calloc (1, sizeof (*r) + id_len + 1)))
3838
return NULL;
39+
r->id = (char *)(r + 1);
3940
if (id && id_len) {
4041
memcpy (r->id, id, id_len);
4142
list_node_init (&(r->route_id_node));
@@ -93,6 +94,36 @@ int msg_route_delete_last (flux_msg_t *msg)
9394
return 0;
9495
}
9596

97+
/* This function was added to streamline a critical path in the broker that
98+
* iteratively sends the same message to multiple peers. It is equivalent to
99+
* flux_msg_route_push (msg, id, strlen (id))
100+
* cb (msg, arg)
101+
* flux_msg_route_delete_last (msg)
102+
* but borrows the copy of id rather than copying it each time.
103+
*/
104+
int msg_route_sendto (const flux_msg_t *cmsg,
105+
const char *id,
106+
msg_route_send_f cb,
107+
void *arg)
108+
{
109+
flux_msg_t *msg = (flux_msg_t *)cmsg; // drop const since net effect is nil
110+
struct route_id r;
111+
int rc;
112+
113+
list_node_init (&r.route_id_node);
114+
r.id = (char *)id;
115+
116+
list_add (&msg->routes, &r.route_id_node);
117+
msg->routes_len++;
118+
119+
rc = cb (msg, arg);
120+
121+
list_pop (&msg->routes, struct route_id, route_id_node);
122+
msg->routes_len--;
123+
124+
return rc;
125+
}
126+
96127
/*
97128
* vi:tabstop=4 shiftwidth=4 expandtab
98129
*/

src/common/libflux/message_route.h

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,11 @@
1111
#ifndef _FLUX_CORE_MESSAGE_ROUTE_H
1212
#define _FLUX_CORE_MESSAGE_ROUTE_H
1313

14+
#include "ccan/list/list.h"
15+
1416
struct route_id {
1517
struct list_node route_id_node;
16-
char id[0]; /* variable length id stored at end of struct */
18+
char *id; /* variable length id stored at end of struct */
1719
};
1820

1921
int msg_route_push (flux_msg_t *msg,
@@ -28,6 +30,13 @@ void msg_route_clear (flux_msg_t *msg);
2830

2931
int msg_route_delete_last (flux_msg_t *msg);
3032

33+
typedef int (*msg_route_send_f)(const flux_msg_t *msg, void *arg);
34+
35+
int msg_route_sendto (const flux_msg_t *msg,
36+
const char *id,
37+
msg_route_send_f cb,
38+
void *arg);
39+
3140
#endif /* !_FLUX_CORE_MESSAGE_ROUTE_H */
3241

3342
/*

0 commit comments

Comments
 (0)