Skip to content

Commit 71fc0d5

Browse files
committed
libflux: add msg_route_sendto()
Problem: iteratively sending the same message to multiple peers, as occurs in the broker, is not as lightweight as it could be. Moreover, this code path has shown up in a hot spot in #6806. Add a private helper function to libflux that avoids a malloc at each iteration.
1 parent 060d6ae commit 71fc0d5

File tree

2 files changed

+41
-1
lines changed

2 files changed

+41
-1
lines changed

src/common/libflux/message_route.c

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ static struct route_id *route_id_create (const char *id, unsigned int id_len)
3636
struct route_id *r;
3737
if (!(r = calloc (1, sizeof (*r) + id_len + 1)))
3838
return NULL;
39+
r->id = (char *)(r + 1);
3940
if (id && id_len) {
4041
memcpy (r->id, id, id_len);
4142
list_node_init (&(r->route_id_node));
@@ -93,6 +94,36 @@ int msg_route_delete_last (flux_msg_t *msg)
9394
return 0;
9495
}
9596

97+
/* This function was added to streamline a critical path in the broker that
98+
* iteratively sends the same message to multiple peers. It is equivalent to
99+
* flux_msg_route_push (msg, id, strlen (id))
100+
* cb (msg, arg)
101+
* flux_msg_route_delete_last (msg)
102+
* but borrows the copy of id rather than copying it each time.
103+
*/
104+
int msg_route_sendto (const flux_msg_t *cmsg,
105+
const char *id,
106+
msg_route_send_f cb,
107+
void *arg)
108+
{
109+
flux_msg_t *msg = (flux_msg_t *)cmsg; // drop const since net effect is nil
110+
struct route_id r;
111+
int rc;
112+
113+
list_node_init (&r.route_id_node);
114+
r.id = (char *)id;
115+
116+
list_add (&msg->routes, &r.route_id_node);
117+
msg->routes_len++;
118+
119+
rc = cb (msg, arg);
120+
121+
list_pop (&msg->routes, struct route_id, route_id_node);
122+
msg->routes_len--;
123+
124+
return rc;
125+
}
126+
96127
/*
97128
* vi:tabstop=4 shiftwidth=4 expandtab
98129
*/

src/common/libflux/message_route.h

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,11 @@
1111
#ifndef _FLUX_CORE_MESSAGE_ROUTE_H
1212
#define _FLUX_CORE_MESSAGE_ROUTE_H
1313

14+
#include "ccan/list/list.h"
15+
1416
struct route_id {
1517
struct list_node route_id_node;
16-
char id[0]; /* variable length id stored at end of struct */
18+
char *id; /* variable length id stored at end of struct */
1719
};
1820

1921
int msg_route_push (flux_msg_t *msg,
@@ -28,6 +30,13 @@ void msg_route_clear (flux_msg_t *msg);
2830

2931
int msg_route_delete_last (flux_msg_t *msg);
3032

33+
typedef int (*msg_route_send_f)(const flux_msg_t *msg, void *arg);
34+
35+
int msg_route_sendto (const flux_msg_t *msg,
36+
const char *id,
37+
msg_route_send_f cb,
38+
void *arg);
39+
3140
#endif /* !_FLUX_CORE_MESSAGE_ROUTE_H */
3241

3342
/*

0 commit comments

Comments
 (0)