Skip to content

Commit ff0cfd1

Browse files
committed
broker: add module.trace
broker: add module.trace streaming RPC Problem: there is no way to observe the messages a broker module sending and receving. Add the module.trace streaming RPC which can send a stream of message summaries to a tool.
1 parent 6714d8c commit ff0cfd1

File tree

3 files changed

+158
-2
lines changed

3 files changed

+158
-2
lines changed

src/broker/modhash.c

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -454,6 +454,62 @@ static void debug_cb (flux_t *h,
454454
flux_log_error (h, "error responding to module.debug request");
455455
}
456456

457+
static void trace_cb (flux_t *h,
458+
flux_msg_handler_t *mh,
459+
const flux_msg_t *msg,
460+
void *arg)
461+
{
462+
broker_ctx_t *ctx = arg;
463+
struct flux_match match = FLUX_MATCH_ANY;
464+
json_t *names = NULL;
465+
size_t index;
466+
json_t *entry;
467+
const char *errmsg = NULL;
468+
flux_error_t error;
469+
zlist_t *l = NULL;
470+
module_t *p;
471+
472+
if (flux_request_unpack (msg,
473+
NULL,
474+
"{s:o s:i s:s}",
475+
"names", &names,
476+
"typemask", &match.typemask,
477+
"topic_glob", &match.topic_glob) < 0)
478+
goto error;
479+
if (!flux_msg_is_streaming (msg) || !json_is_array (names)) {
480+
errno = EPROTO;
481+
goto error;
482+
}
483+
/* Put modules in a list as the names are checked,
484+
*/
485+
if (!(l = zlist_new ()))
486+
goto nomem;
487+
json_array_foreach (names, index, entry) {
488+
const char *name = json_string_value (entry);
489+
if (!(p = modhash_lookup_byname (ctx->modhash, (name)))) {
490+
errprintf (&error, "%s module is not loaded", name);
491+
errmsg = error.text;
492+
errno = ENOENT;
493+
goto error;
494+
}
495+
if (zlist_append (l, p) < 0)
496+
goto nomem;
497+
}
498+
p = zlist_first (l);
499+
while (p) {
500+
(void)module_trace (p, msg);
501+
p = zlist_next (l);
502+
}
503+
zlist_destroy (&l);
504+
return;
505+
nomem:
506+
errno = ENOMEM;
507+
error:
508+
if (flux_respond_error (h, msg, errno, errmsg) < 0)
509+
flux_log_error (h, "error responding to module.trace");
510+
zlist_destroy (&l);
511+
}
512+
457513
static void status_cb (flux_t *h,
458514
flux_msg_handler_t *mh,
459515
const flux_msg_t *msg,
@@ -509,6 +565,14 @@ static void disconnect_cb (flux_t *h,
509565
const flux_msg_t *msg,
510566
void *arg)
511567
{
568+
broker_ctx_t *ctx = arg;
569+
module_t *p;
570+
571+
p = zhash_first (ctx->modhash->zh_byuuid);
572+
while (p) {
573+
module_trace_disconnect (p, msg);
574+
p = zhash_next (ctx->modhash->zh_byuuid);
575+
}
512576
}
513577

514578
static const struct flux_msg_handler_spec htab[] = {
@@ -542,6 +606,12 @@ static const struct flux_msg_handler_spec htab[] = {
542606
debug_cb,
543607
0,
544608
},
609+
{
610+
FLUX_MSGTYPE_REQUEST,
611+
"module.trace",
612+
trace_cb,
613+
0,
614+
},
545615
{
546616
FLUX_MSGTYPE_REQUEST,
547617
"module.disconnect",
@@ -662,6 +732,8 @@ module_t *modhash_lookup_byname (modhash_t *mh, const char *name)
662732
char *uuid;
663733
module_t *result = NULL;
664734

735+
if (!name)
736+
return NULL;
665737
if (!(uuids = zhash_keys (mh->zh_byuuid))) {
666738
errno = ENOMEM;
667739
return NULL;

src/broker/module.c

Lines changed: 83 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@
4545
#include "modservice.h"
4646

4747
struct broker_module {
48+
flux_t *h; /* ref to broker's internal flux_t handle */
49+
4850
flux_watcher_t *broker_w;
4951

5052
double lastseen;
@@ -78,6 +80,7 @@ struct broker_module {
7880

7981
struct flux_msglist *rmmod_requests;
8082
struct flux_msglist *insmod_requests;
83+
struct flux_msglist *trace_requests;
8184
struct flux_msglist *deferred_messages;
8285

8386
flux_t *h_module_end; /* module end of interthread_channel */
@@ -332,6 +335,7 @@ module_t *module_create (flux_t *h,
332335
p->main = mod_main;
333336
p->dso = dso;
334337
p->rank = rank;
338+
p->h = h;
335339
if (!(p->conf = flux_conf_copy (flux_get_conf (h))))
336340
goto cleanup;
337341
if (!(p->parent_uuid_str = strdup (parent_uuid)))
@@ -349,7 +353,8 @@ module_t *module_create (flux_t *h,
349353
}
350354
if (!(p->path = strdup (path))
351355
|| !(p->rmmod_requests = flux_msglist_create ())
352-
|| !(p->insmod_requests = flux_msglist_create ()))
356+
|| !(p->insmod_requests = flux_msglist_create ())
357+
|| !(p->trace_requests = flux_msglist_create ()))
353358
goto nomem;
354359
if (name) {
355360
if (!(p->name = strdup (name)))
@@ -437,9 +442,70 @@ int module_get_status (module_t *p)
437442
return p ? p->status : 0;
438443
}
439444

445+
static void message_trace (module_t *p,
446+
const char *prefix,
447+
const flux_msg_t *msg)
448+
{
449+
const flux_msg_t *req;
450+
double now = flux_reactor_now (flux_get_reactor (p->h));
451+
int type = 0;
452+
char buf[64];
453+
const char *topic = NULL;
454+
int payload_size = 0;
455+
456+
(void)flux_msg_get_type (msg, &type);
457+
if (type == FLUX_MSGTYPE_CONTROL) {
458+
int ctype;
459+
int cstatus;
460+
if (flux_control_decode (msg, &ctype, &cstatus) == 0)
461+
snprintf (buf,
462+
sizeof (buf),
463+
"%s %d",
464+
ctype == FLUX_MODSTATE_INIT ? "init" :
465+
ctype == FLUX_MODSTATE_RUNNING ? "running" :
466+
ctype == FLUX_MODSTATE_FINALIZING ? "finalizing" :
467+
ctype == FLUX_MODSTATE_EXITED ? "exited" : "unknown",
468+
cstatus);
469+
}
470+
else {
471+
(void)flux_msg_get_topic (msg, &topic);
472+
(void)flux_msg_get_payload (msg, NULL, &payload_size);
473+
if (topic && streq (topic, "module.trace"))
474+
return;
475+
}
476+
477+
req = flux_msglist_first (p->trace_requests);
478+
while (req) {
479+
struct flux_match match = FLUX_MATCH_ANY;
480+
if (flux_request_unpack (req,
481+
NULL,
482+
"{s:i s:s}",
483+
"typemask", &match.typemask,
484+
"topic_glob", &match.topic_glob) < 0
485+
|| !flux_msg_cmp (msg, match))
486+
goto next;
487+
if (flux_respond_pack (p->h,
488+
req,
489+
"{s:f s:s s:i s:s s:s s:i}",
490+
"timestamp", now,
491+
"prefix", prefix,
492+
"type", type,
493+
"name", p->name,
494+
"topic", topic ? topic : "NO-TOPIC",
495+
"payload_size", payload_size) < 0)
496+
flux_log_error (p->h, "error responding to module.trace");
497+
next:
498+
req = flux_msglist_next (p->trace_requests);
499+
}
500+
}
501+
440502
flux_msg_t *module_recvmsg (module_t *p)
441503
{
442-
return flux_recv (p->h_broker_end, FLUX_MATCH_ANY, FLUX_O_NONBLOCK);
504+
flux_msg_t *msg;
505+
msg = flux_recv (p->h_broker_end, FLUX_MATCH_ANY, FLUX_O_NONBLOCK);
506+
if (msg && flux_msglist_count (p->trace_requests) > 0)
507+
message_trace (p, "tx", msg);
508+
return msg;
443509
}
444510

445511
int module_sendmsg_new (module_t *p, flux_msg_t **msg)
@@ -468,6 +534,8 @@ int module_sendmsg_new (module_t *p, flux_msg_t **msg)
468534
*msg = NULL;
469535
return 0;
470536
}
537+
if (flux_msglist_count (p->trace_requests) > 0)
538+
message_trace (p, "rx", *msg);
471539
return flux_send_new (p->h_broker_end, msg, 0);
472540
}
473541

@@ -528,6 +596,7 @@ void module_destroy (module_t *p)
528596
flux_msglist_destroy (p->rmmod_requests);
529597
flux_msglist_destroy (p->insmod_requests);
530598
flux_msglist_destroy (p->deferred_messages);
599+
flux_msglist_destroy (p->trace_requests);
531600
subhash_destroy (p->sub);
532601
free (p);
533602
errno = saved_errno;
@@ -715,6 +784,18 @@ ssize_t module_get_recv_queue_count (module_t *p)
715784
return count;
716785
}
717786

787+
int module_trace (module_t *p, const flux_msg_t *msg)
788+
{
789+
if (flux_msglist_append (p->trace_requests, msg) < 0)
790+
return -1;
791+
return 0;
792+
}
793+
794+
void module_trace_disconnect (module_t *p, const flux_msg_t *msg)
795+
{
796+
(void)flux_msglist_disconnect (p->trace_requests, msg);
797+
}
798+
718799
/*
719800
* vi:tabstop=4 shiftwidth=4 expandtab
720801
*/

src/broker/module.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,9 @@ int module_event_cast (module_t *p, const flux_msg_t *msg);
9999
ssize_t module_get_send_queue_count (module_t *p);
100100
ssize_t module_get_recv_queue_count (module_t *p);
101101

102+
int module_trace (module_t *p, const flux_msg_t *msg);
103+
void module_trace_disconnect (module_t *p, const flux_msg_t *msg);
104+
102105
#endif /* !_BROKER_MODULE_H */
103106

104107
/*

0 commit comments

Comments
 (0)