Skip to content

Commit 0fbc62c

Browse files
committed
flux-module: add trace subcommand
Problem: there is no tool for observing the messages that a broker module sends and receives. Add 'flux module trace'.
1 parent ff0cfd1 commit 0fbc62c

File tree

1 file changed

+348
-0
lines changed

1 file changed

+348
-0
lines changed

src/cmd/flux-module.c

Lines changed: 348 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,10 @@
1111
#if HAVE_CONFIG_H
1212
#include "config.h"
1313
#endif
14+
#include <time.h>
15+
#include <math.h>
1416
#include <stdio.h>
17+
#include <signal.h>
1518
#include <getopt.h>
1619
#include <flux/core.h>
1720
#include <flux/optparse.h>
@@ -31,7 +34,10 @@
3134
#include "src/common/libutil/log.h"
3235
#include "src/common/libutil/oom.h"
3336
#include "src/common/libutil/jpath.h"
37+
#include "src/common/libutil/ansi_color.h"
38+
#include "src/common/libutil/parse_size.h"
3439
#include "ccan/str/str.h"
40+
#include "ccan/array_size/array_size.h"
3541

3642
const int max_idle = 99;
3743

@@ -41,6 +47,7 @@ int cmd_load (optparse_t *p, int argc, char **argv);
4147
int cmd_reload (optparse_t *p, int argc, char **argv);
4248
int cmd_stats (optparse_t *p, int argc, char **argv);
4349
int cmd_debug (optparse_t *p, int argc, char **argv);
50+
int cmd_trace (optparse_t *p, int argc, char **argv);
4451

4552
static struct optparse_option list_opts[] = {
4653
{ .name = "long", .key = 'l', .has_arg = 0,
@@ -105,6 +112,28 @@ static struct optparse_option debug_opts[] = {
105112
OPTPARSE_TABLE_END,
106113
};
107114

115+
static struct optparse_option trace_opts[] = {
116+
{ .name = "topic", .key = 'T', .has_arg = 1,
117+
.arginfo = "GLOB",
118+
.usage = "Filter output by message topic glob",
119+
},
120+
{ .name = "type", .key = 't', .has_arg = 1,
121+
.flags = OPTPARSE_OPT_AUTOSPLIT,
122+
.arginfo = "TYPE,...",
123+
.usage = "Filter output by message type "
124+
"(request, response, event, control)",
125+
},
126+
{ .name = "color", .key = 'L', .has_arg = 2, .arginfo = "WHEN",
127+
.flags = OPTPARSE_OPT_SHORTOPT_OPTIONAL_ARG,
128+
.usage = "Colorize output when supported; WHEN can be 'always' "
129+
"(default if omitted), 'never', or 'auto' (default)." },
130+
{ .name = "human", .key = 'H', .has_arg = 0,
131+
.usage = "Human readable output", },
132+
{ .name = "delta", .key = 'd', .has_arg = 0,
133+
.usage = "With --human, show timestamp delta between messages", },
134+
OPTPARSE_TABLE_END,
135+
};
136+
108137
static struct optparse_subcommand subcommands[] = {
109138
{ "list",
110139
"[OPTIONS]",
@@ -155,6 +184,13 @@ static struct optparse_subcommand subcommands[] = {
155184
0,
156185
debug_opts,
157186
},
187+
{ "trace",
188+
"[OPTIONS] module [module...]",
189+
"Trace module messages",
190+
cmd_trace,
191+
0,
192+
trace_opts,
193+
},
158194
OPTPARSE_SUBCMD_END
159195
};
160196

@@ -691,6 +727,318 @@ int cmd_debug (optparse_t *p, int argc, char **argv)
691727
return (0);
692728
}
693729

730+
struct typemap {
731+
const char *s;
732+
int type;
733+
};
734+
735+
static struct typemap typemap[] = {
736+
{ ">", FLUX_MSGTYPE_REQUEST },
737+
{ "<", FLUX_MSGTYPE_RESPONSE},
738+
{ "e", FLUX_MSGTYPE_EVENT},
739+
{ "c", FLUX_MSGTYPE_CONTROL},
740+
};
741+
742+
static const char *type2str (int type)
743+
{
744+
int i;
745+
746+
for (i = 0; i < ARRAY_SIZE (typemap); i++)
747+
if ((type & typemap[i].type))
748+
return typemap[i].s;
749+
return "?";
750+
}
751+
752+
enum {
753+
TRACE_COLOR_EVENT = FLUX_MSGTYPE_EVENT,
754+
TRACE_COLOR_REQUEST = FLUX_MSGTYPE_REQUEST,
755+
TRACE_COLOR_RESPONSE = FLUX_MSGTYPE_RESPONSE,
756+
TRACE_COLOR_CONTROL = FLUX_MSGTYPE_CONTROL,
757+
TRACE_COLOR_TIME = 0x10,
758+
TRACE_COLOR_TIMEBREAK = 0x11,
759+
};
760+
761+
static const char *trace_colors[] = {
762+
[TRACE_COLOR_EVENT] = ANSI_COLOR_YELLOW,
763+
[TRACE_COLOR_REQUEST] = ANSI_COLOR_BOLD_BLUE,
764+
[TRACE_COLOR_RESPONSE] = ANSI_COLOR_CYAN,
765+
[TRACE_COLOR_CONTROL] = ANSI_COLOR_BOLD,
766+
[TRACE_COLOR_TIME] = ANSI_COLOR_GREEN,
767+
[TRACE_COLOR_TIMEBREAK] = ANSI_COLOR_BOLD ANSI_COLOR_GREEN
768+
};
769+
770+
static const char *months[] = {
771+
"Jan",
772+
"Feb",
773+
"Mar",
774+
"Apr",
775+
"May",
776+
"Jun",
777+
"Jul",
778+
"Aug",
779+
"Sep",
780+
"Oct",
781+
"Nov",
782+
"Dec",
783+
NULL
784+
};
785+
786+
struct trace_ctx {
787+
int color;
788+
json_t *names;
789+
int max_name_len;
790+
struct flux_match match;
791+
int delta;
792+
double last_sec;
793+
double last_timestamp;
794+
};
795+
796+
static const char *trace_color (struct trace_ctx *ctx, int type)
797+
{
798+
if (ctx->color)
799+
return trace_colors[type];
800+
return "";
801+
}
802+
803+
static const char *trace_color_reset (struct trace_ctx *ctx)
804+
{
805+
if (ctx->color)
806+
return ANSI_COLOR_RESET;
807+
return "";
808+
}
809+
810+
static void trace_print_human_timestamp (struct trace_ctx *ctx,
811+
double timestamp)
812+
{
813+
814+
if ((time_t)(timestamp/60) == (time_t)(ctx->last_timestamp/60)) {
815+
/* Within same minute, print offset in sec */
816+
printf ("%s[%+11.6f]%s ",
817+
trace_color (ctx, TRACE_COLOR_TIME),
818+
timestamp - ctx->last_timestamp
819+
+ fmod (ctx->last_timestamp, 60.),
820+
trace_color_reset (ctx));
821+
}
822+
else {
823+
struct tm tm;
824+
time_t t = timestamp;
825+
826+
localtime_r (&t, &tm);
827+
828+
/* New minute, print datetime */
829+
printf ("%s[%s%02d %02d:%02d]%s ",
830+
trace_color (ctx, TRACE_COLOR_TIMEBREAK),
831+
months [tm.tm_mon],
832+
tm.tm_mday,
833+
tm.tm_hour,
834+
tm.tm_min,
835+
trace_color_reset (ctx));
836+
ctx->last_timestamp = timestamp;
837+
}
838+
}
839+
840+
static void trace_print_human (struct trace_ctx *ctx,
841+
double timestamp,
842+
int message_type,
843+
const char *s)
844+
{
845+
trace_print_human_timestamp (ctx, timestamp);
846+
printf (" %s%s%s\n",
847+
trace_color (ctx, message_type),
848+
s,
849+
trace_color_reset (ctx));
850+
}
851+
852+
static void trace_print_timestamp (struct trace_ctx *ctx, double timestamp)
853+
{
854+
time_t t = timestamp;
855+
struct tm tm;
856+
char buf[64] = "";
857+
858+
localtime_r (&t, &tm);
859+
860+
strftime (buf, sizeof (buf), "%Y-%m-%dT%T", &tm);
861+
printf ("%s%s.%.3d%s",
862+
trace_color (ctx, TRACE_COLOR_TIME),
863+
buf,
864+
(int)(1e3 * (timestamp - t)),
865+
trace_color_reset (ctx));
866+
}
867+
868+
869+
static void trace_print (struct trace_ctx *ctx,
870+
double timestamp,
871+
int message_type,
872+
const char *s)
873+
{
874+
trace_print_timestamp (ctx, timestamp);
875+
printf (" %s%s%s\n",
876+
trace_color (ctx, message_type),
877+
s,
878+
trace_color_reset (ctx));
879+
}
880+
881+
static int trace_use_color (optparse_t *p)
882+
{
883+
const char *when;
884+
int color;
885+
886+
if (!(when = optparse_get_str (p, "color", "auto")))
887+
when = "always";
888+
if (streq (when, "always"))
889+
color = 1;
890+
else if (streq (when, "never"))
891+
color = 0;
892+
else if (streq (when, "auto"))
893+
color = isatty (STDOUT_FILENO) ? 1 : 0;
894+
else
895+
log_msg_exit ("Invalid argument to --color: '%s'", when);
896+
return color;
897+
}
898+
899+
static int parse_name_list (json_t **result, int ac, char **av)
900+
{
901+
json_t *a;
902+
int maxlen = 0;
903+
904+
if (!(a = json_array ()))
905+
goto nomem;
906+
for (int i = 0; i < ac; i++) {
907+
json_t *o;
908+
if (!(o = json_string (av[i]))
909+
|| json_array_append_new (a, o) < 0) {
910+
json_decref (o);
911+
goto nomem;
912+
}
913+
size_t len = strlen (av[i]);
914+
if (maxlen < len)
915+
maxlen = len;
916+
}
917+
*result = a;
918+
return maxlen;
919+
nomem:
920+
json_decref (a);
921+
errno = ENOMEM;
922+
return -1;
923+
}
924+
925+
static void trace_ctx_init (struct trace_ctx *ctx,
926+
optparse_t *p,
927+
int ac,
928+
char *av[])
929+
{
930+
int n = optparse_option_index (p);
931+
932+
memset (ctx, 0, sizeof (*ctx));
933+
934+
if (n == ac) {
935+
optparse_print_usage (p);
936+
exit (1);
937+
}
938+
ctx->max_name_len = parse_name_list (&ctx->names, ac - n, av + n);
939+
if (ctx->max_name_len < 0)
940+
log_err_exit ("could not parse module name list");
941+
if (optparse_hasopt (p, "delta")) {
942+
if (!optparse_hasopt (p, "human"))
943+
log_msg_exit ("--delta can only be used with --human");
944+
ctx->delta = 1;
945+
}
946+
ctx->match = FLUX_MATCH_ANY;
947+
ctx->match.topic_glob = optparse_get_str (p, "topic", "*");
948+
ctx->color = trace_use_color (p); // borrowed from status subcommand
949+
if (optparse_hasopt (p, "type")) {
950+
const char *arg;
951+
952+
optparse_getopt_iterator_reset (p, "type");
953+
ctx->match.typemask = 0;
954+
while ((arg = optparse_getopt_next (p, "type"))) {
955+
if (streq (arg, "request"))
956+
ctx->match.typemask |= FLUX_MSGTYPE_REQUEST;
957+
else if (streq (arg, "response"))
958+
ctx->match.typemask |= FLUX_MSGTYPE_RESPONSE;
959+
else if (streq (arg, "event"))
960+
ctx->match.typemask |= FLUX_MSGTYPE_EVENT;
961+
else if (streq (arg, "control"))
962+
ctx->match.typemask |= FLUX_MSGTYPE_CONTROL;
963+
else
964+
log_msg_exit ("valid types: request, response, event, control");
965+
}
966+
}
967+
}
968+
969+
static void sighandler (int arg)
970+
{
971+
exit (0);
972+
}
973+
974+
int cmd_trace (optparse_t *p, int ac, char *av[])
975+
{
976+
flux_t *h;
977+
flux_future_t *f;
978+
struct trace_ctx ctx;
979+
980+
trace_ctx_init (&ctx, p, ac, av);
981+
982+
if (!(h = flux_open (NULL, 0)))
983+
log_err_exit ("flux_open");
984+
985+
if (!(f = flux_rpc_pack (h,
986+
"module.trace",
987+
FLUX_NODEID_ANY,
988+
FLUX_RPC_STREAMING,
989+
"{s:i s:s s:O}",
990+
"typemask", ctx.match.typemask,
991+
"topic_glob", ctx.match.topic_glob,
992+
"names", ctx.names)))
993+
log_err_exit ("error sending module.trace request");
994+
995+
signal (SIGINT, sighandler);
996+
signal (SIGTERM, sighandler);
997+
998+
do {
999+
double timestamp;
1000+
const char *prefix;
1001+
const char *name;
1002+
int type;
1003+
const char *topic;
1004+
int payload_size;
1005+
char buf[160];
1006+
1007+
if (flux_rpc_get_unpack (f,
1008+
"{s:F s:s s:s s:i s:s s:i}",
1009+
"timestamp", &timestamp,
1010+
"prefix", &prefix,
1011+
"name", &name,
1012+
"type", &type,
1013+
"topic", &topic,
1014+
"payload_size", &payload_size) < 0)
1015+
log_msg_exit ("%s", future_strerror (f, errno));
1016+
1017+
snprintf (buf,
1018+
sizeof (buf),
1019+
"%*s %s %s %s [%s]",
1020+
ctx.max_name_len,
1021+
name,
1022+
prefix,
1023+
type2str (type),
1024+
topic,
1025+
encode_size (payload_size));
1026+
1027+
if (optparse_hasopt (p, "human"))
1028+
trace_print_human (&ctx, timestamp, type, buf);
1029+
else
1030+
trace_print (&ctx, timestamp, type, buf);
1031+
fflush (stdout);
1032+
1033+
flux_future_reset (f);
1034+
} while (1);
1035+
1036+
json_decref (ctx.names);
1037+
flux_future_destroy (f);
1038+
flux_close (h);
1039+
1040+
}
1041+
6941042
/*
6951043
* vi: ts=4 sw=4 expandtab
6961044
*/

0 commit comments

Comments
 (0)