Skip to content

Commit d87899b

Browse files
committed
broker: add overlay.trace streaming RPC
Problem: there is no way to observe the messages the broker is sending and receiving on the overlay network. Add the overlay.trace streaming RPC, which can send a stream of message summaries to a tool.
1 parent d2ae7ec commit d87899b

File tree

1 file changed

+141
-3
lines changed

1 file changed

+141
-3
lines changed

src/broker/overlay.c

Lines changed: 141 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,7 @@ struct overlay {
191191
void *recv_arg;
192192

193193
struct flux_msglist *health_requests;
194+
struct flux_msglist *trace_requests;
194195
};
195196

196197
static void overlay_mcast_child (struct overlay *ov, flux_msg_t *msg);
@@ -203,6 +204,10 @@ static int overlay_control_parent (struct overlay *ov,
203204
int status);
204205
static void overlay_health_respond_all (struct overlay *ov);
205206
static struct child *child_lookup_byrank (struct overlay *ov, uint32_t rank);
207+
static void message_trace (struct overlay *ov,
208+
const char *prefix,
209+
int rank,
210+
const flux_msg_t *msg);
206211

207212
/* Convenience iterator for ov->children
208213
*/
@@ -517,8 +522,11 @@ static int overlay_sendmsg_parent (struct overlay *ov, const flux_msg_t *msg)
517522
goto done;
518523
}
519524
rc = zmqutil_msg_send (ov->parent.zsock, msg);
520-
if (rc == 0)
525+
if (rc == 0) {
521526
ov->parent.lastsent = flux_reactor_now (ov->reactor);
527+
if (flux_msglist_count (ov->trace_requests) > 0)
528+
message_trace (ov, "tx", ov->parent.rank, msg);
529+
}
522530
done:
523531
return rc;
524532
}
@@ -811,6 +819,22 @@ static int overlay_sendmsg_child (struct overlay *ov, const flux_msg_t *msg)
811819
}
812820
errno = saved_errno;
813821
}
822+
if (rc == 0 && flux_msglist_count (ov->trace_requests) > 0) {
823+
const char *uuid;
824+
struct child *child = NULL;
825+
int rank = -1;
826+
int type = 0;
827+
828+
(void)flux_msg_get_type (msg, &type);
829+
830+
// N.B. events are traced in overlay_mcast_child()
831+
if (type != FLUX_MSGTYPE_EVENT) {
832+
if ((uuid = flux_msg_route_last (msg))
833+
&& (child = child_lookup_online (ov, uuid)))
834+
rank = child->rank;
835+
message_trace (ov, "tx", rank, msg);
836+
}
837+
}
814838
done:
815839
return rc;
816840
}
@@ -831,6 +855,7 @@ static int overlay_mcast_child_one (struct overlay *ov,
831855
static void overlay_mcast_child (struct overlay *ov, flux_msg_t *msg)
832856
{
833857
struct child *child;
858+
int count = 0;
834859

835860
flux_msg_route_enable (msg);
836861

@@ -843,8 +868,12 @@ static void overlay_mcast_child (struct overlay *ov, flux_msg_t *msg)
843868
(unsigned long)child->rank);
844869
}
845870
}
871+
else
872+
count++;
846873
}
847874
}
875+
if (count > 0 && flux_msglist_count (ov->trace_requests) > 0)
876+
message_trace (ov, "tx", -1, msg);
848877
}
849878

850879
static void logdrop (struct overlay *ov,
@@ -891,6 +920,67 @@ static int clear_msg_role (flux_msg_t *msg, uint32_t role)
891920
return 0;
892921
}
893922

923+
static void message_trace (struct overlay *ov,
924+
const char *prefix,
925+
int rank,
926+
const flux_msg_t *msg)
927+
{
928+
const flux_msg_t *req;
929+
double now = flux_reactor_now (ov->reactor);
930+
int type = 0;
931+
char buf[64];
932+
const char *topic = NULL;
933+
int payload_size = 0;
934+
935+
(void)flux_msg_get_type (msg, &type);
936+
if (type == FLUX_MSGTYPE_CONTROL) {
937+
int ctype;
938+
int cstatus;
939+
if (flux_control_decode (msg, &ctype, &cstatus) == 0) {
940+
snprintf (buf,
941+
sizeof (buf),
942+
"%s %d",
943+
ctype == CONTROL_HEARTBEAT ? "heartbeat" :
944+
ctype == CONTROL_STATUS ? "status" :
945+
ctype == CONTROL_DISCONNECT ? "disconnect" : "unknown",
946+
cstatus);
947+
topic = buf;
948+
}
949+
}
950+
else {
951+
(void)flux_msg_get_topic (msg, &topic);
952+
(void)flux_msg_get_payload (msg, NULL, &payload_size);
953+
}
954+
955+
req = flux_msglist_first (ov->trace_requests);
956+
while (req) {
957+
struct flux_match match = FLUX_MATCH_ANY;
958+
int nodeid;
959+
if (flux_request_unpack (req,
960+
NULL,
961+
"{s:i s:s s:i}",
962+
"typemask", &match.typemask,
963+
"topic_glob", &match.topic_glob,
964+
"nodeid", &nodeid) < 0
965+
|| (nodeid != FLUX_NODEID_ANY && nodeid != rank)
966+
|| !flux_msg_cmp (msg, match))
967+
goto next;
968+
969+
if (flux_respond_pack (ov->h,
970+
req,
971+
"{s:f s:s s:i s:i s:s s:i}",
972+
"timestamp", now,
973+
"prefix", prefix,
974+
"rank", rank,
975+
"type", type,
976+
"topic", topic ? topic : "NO-TOPIC",
977+
"payload_size", payload_size) < 0)
978+
flux_log_error (ov->h, "error responding to overlay.trace");
979+
next:
980+
req = flux_msglist_next (ov->trace_requests);
981+
}
982+
}
983+
894984
/* Handle a message received from TBON child (downstream).
895985
*/
896986
static void child_cb (flux_reactor_t *r,
@@ -929,6 +1019,7 @@ static void child_cb (flux_reactor_t *r,
9291019
&& flux_msg_get_topic (msg, &topic) == 0
9301020
&& streq (topic, "overlay.hello")
9311021
&& !ov->shutdown_in_progress) {
1022+
message_trace (ov, "rx", -1, msg);
9321023
hello_request_handler (ov, msg);
9331024
}
9341025
/* Or one of the following cases occurred that requires (or at least
@@ -956,8 +1047,10 @@ static void child_cb (flux_reactor_t *r,
9561047
case FLUX_MSGTYPE_CONTROL: {
9571048
int type, status;
9581049
if (flux_control_decode (msg, &type, &status) == 0
959-
&& type == CONTROL_STATUS)
1050+
&& type == CONTROL_STATUS) {
1051+
message_trace (ov, "rx", child->rank, msg);
9601052
overlay_child_status_update (ov, child, status, NULL);
1053+
}
9611054
goto done;
9621055
}
9631056
case FLUX_MSGTYPE_REQUEST:
@@ -975,6 +1068,8 @@ static void child_cb (flux_reactor_t *r,
9751068
case FLUX_MSGTYPE_EVENT:
9761069
break;
9771070
}
1071+
if (flux_msglist_count (ov->trace_requests) > 0)
1072+
message_trace (ov, "rx", child->rank, msg);
9781073
if (ov->recv_cb (&msg, OVERLAY_DOWNSTREAM, ov->recv_arg) < 0)
9791074
goto done;
9801075
return;
@@ -1064,6 +1159,9 @@ static void parent_cb (flux_reactor_t *r,
10641159
default:
10651160
break;
10661161
}
1162+
if (flux_msglist_count (ov->trace_requests) > 0) {
1163+
message_trace (ov, "rx", ov->parent.rank, msg);
1164+
}
10671165
if (ov->recv_cb (&msg, OVERLAY_UPSTREAM, ov->recv_arg) < 0)
10681166
goto done;
10691167
return;
@@ -1731,6 +1829,10 @@ static void disconnect_cb (flux_t *h,
17311829
flux_log_error (h, "error handling overlay.disconnect");
17321830
if (count > 0)
17331831
flux_log (h, LOG_DEBUG, "overlay: goodbye to %d health clients", count);
1832+
if ((count = flux_msglist_disconnect (ov->trace_requests, msg)) < 0)
1833+
flux_log_error (h, "error handling overlay.disconnect");
1834+
if (count > 0)
1835+
flux_log (h, LOG_DEBUG, "overlay: goodbye to %d trace clients", count);
17341836
}
17351837

17361838
const char *overlay_get_subtree_status (struct overlay *ov, int rank)
@@ -1851,6 +1953,34 @@ static void overlay_disconnect_subtree_cb (flux_t *h,
18511953
flux_log_error (h, "error responding to overlay.disconnect-subtree");
18521954
}
18531955

1956+
static void overlay_trace_cb (flux_t *h,
1957+
flux_msg_handler_t *mh,
1958+
const flux_msg_t *msg,
1959+
void *arg)
1960+
{
1961+
struct overlay *ov = arg;
1962+
struct flux_match match = FLUX_MATCH_ANY;
1963+
int nodeid;
1964+
1965+
if (flux_request_unpack (msg,
1966+
NULL,
1967+
"{s:i s:s s:i}",
1968+
"typemask", &match.typemask,
1969+
"topic_glob", &match.topic_glob,
1970+
"nodeid", &nodeid) < 0)
1971+
goto error;
1972+
if (!flux_msg_is_streaming (msg)) {
1973+
errno = EPROTO;
1974+
goto error;
1975+
}
1976+
if (flux_msglist_append (ov->trace_requests, msg) < 0)
1977+
goto error;
1978+
return;
1979+
error:
1980+
if (flux_respond_error (h, msg, errno, NULL) < 0)
1981+
flux_log_error (h, "error responding to overlay.trace");
1982+
}
1983+
18541984
int overlay_cert_load (struct overlay *ov, const char *path)
18551985
{
18561986
struct stat sb;
@@ -2231,6 +2361,7 @@ void overlay_destroy (struct overlay *ov)
22312361
free (mon);
22322362
zlist_destroy (&ov->monitor_callbacks);
22332363
}
2364+
flux_msglist_destroy (ov->trace_requests);
22342365
topology_decref (ov->topo);
22352366
if (!ov->zctx_external)
22362367
zmq_ctx_term (ov->zctx);
@@ -2240,6 +2371,12 @@ void overlay_destroy (struct overlay *ov)
22402371
}
22412372

22422373
static const struct flux_msg_handler_spec htab[] = {
2374+
{
2375+
FLUX_MSGTYPE_REQUEST,
2376+
"overlay.trace",
2377+
overlay_trace_cb,
2378+
0,
2379+
},
22432380
{
22442381
FLUX_MSGTYPE_REQUEST,
22452382
"overlay.stats-get",
@@ -2358,7 +2495,8 @@ struct overlay *overlay_create (flux_t *h,
23582495
goto error;
23592496
if (!(ov->cert = cert_create ()))
23602497
goto nomem;
2361-
if (!(ov->health_requests = flux_msglist_create ()))
2498+
if (!(ov->health_requests = flux_msglist_create ())
2499+
|| !(ov->trace_requests = flux_msglist_create ()))
23622500
goto error;
23632501
if (!(ov->parent.f_goodbye = flux_future_create (NULL, NULL)))
23642502
goto error;

0 commit comments

Comments
 (0)