1212# include <config.h>
1313#endif
1414#include <unistd.h>
15+ #include <time.h>
16+ #include <math.h>
1517#include <jansson.h>
1618#include <flux/core.h>
1719
2325#include "src/common/librlist/rlist.h"
2426#include "src/common/libutil/errprintf.h"
2527#include "src/common/libutil/ansi_color.h"
28+ #include "src/common/libutil/parse_size.h"
2629#include "src/common/libczmqcontainers/czmq_containers.h"
2730#include "ccan/str/str.h"
31+ #include "ccan/array_size/array_size.h"
2832
2933#include "builtin.h"
3034
@@ -85,6 +89,27 @@ static struct optparse_option disconnect_opts[] = {
8589 OPTPARSE_TABLE_END
8690};
8791
92+ static struct optparse_option trace_opts [] = {
93+ { .name = "rank" , .key = 'r' , .has_arg = 1 , .arginfo = "NODEID" ,
94+ .usage = "Filter output by peer rank" ,
95+ },
96+ { .name = "type" , .key = 't' , .has_arg = 1 ,
97+ .flags = OPTPARSE_OPT_AUTOSPLIT ,
98+ .arginfo = "TYPE,..." ,
99+ .usage = "Filter output by message type "
100+ "(request, response, event, control)" ,
101+ },
102+ { .name = "color" , .key = 'L' , .has_arg = 2 , .arginfo = "WHEN" ,
103+ .flags = OPTPARSE_OPT_SHORTOPT_OPTIONAL_ARG ,
104+ .usage = "Colorize output when supported; WHEN can be 'always' "
105+ "(default if omitted), 'never', or 'auto' (default)." },
106+ { .name = "human" , .key = 'H' , .has_arg = 0 ,
107+ .usage = "Human readable output" , },
108+ { .name = "delta" , .key = 'd' , .has_arg = 0 ,
109+ .usage = "With --human, show timestamp delta between messages" , },
110+ OPTPARSE_TABLE_END ,
111+ };
112+
88113struct status {
89114 flux_t * h ;
90115 int verbose ;
@@ -115,6 +140,15 @@ struct status_node {
115140 flux_error_t error ;
116141};
117142
143+ struct trace_ctx {
144+ int color ;
145+ int nodeid ;
146+ struct flux_match match ;
147+ int delta ;
148+ double last_sec ;
149+ double last_timestamp ;
150+ };
151+
118152static json_t * overlay_topology ;
119153static struct hostlist * overlay_hostmap ;
120154
@@ -669,6 +703,7 @@ static bool validate_wait (const char *wait)
669703 return true;
670704}
671705
706+ // N.B. used by both status and trace subcommands
672707static int status_use_color (optparse_t * p )
673708{
674709 const char * when ;
@@ -1065,6 +1100,254 @@ static int subcmd_disconnect (optparse_t *p, int ac, char *av[])
10651100
10661101 return 0 ;
10671102}
1103+
1104+ struct typemap {
1105+ const char * s ;
1106+ int type ;
1107+ };
1108+
1109+ static struct typemap typemap [] = {
1110+ { ">" , FLUX_MSGTYPE_REQUEST },
1111+ { "<" , FLUX_MSGTYPE_RESPONSE },
1112+ { "e" , FLUX_MSGTYPE_EVENT },
1113+ { "c" , FLUX_MSGTYPE_CONTROL },
1114+ };
1115+
1116+ static const char * type2str (int type )
1117+ {
1118+ int i ;
1119+
1120+ for (i = 0 ; i < ARRAY_SIZE (typemap ); i ++ )
1121+ if ((type & typemap [i ].type ))
1122+ return typemap [i ].s ;
1123+ return "?" ;
1124+ }
1125+
1126+ enum {
1127+ TRACE_COLOR_EVENT = FLUX_MSGTYPE_EVENT ,
1128+ TRACE_COLOR_REQUEST = FLUX_MSGTYPE_REQUEST ,
1129+ TRACE_COLOR_RESPONSE = FLUX_MSGTYPE_RESPONSE ,
1130+ TRACE_COLOR_CONTROL = FLUX_MSGTYPE_CONTROL ,
1131+ TRACE_COLOR_TIME = 0x10 ,
1132+ TRACE_COLOR_TIMEBREAK = 0x11 ,
1133+ };
1134+
1135+ static const char * trace_colors [] = {
1136+ [TRACE_COLOR_EVENT ] = ANSI_COLOR_YELLOW ,
1137+ [TRACE_COLOR_REQUEST ] = ANSI_COLOR_BOLD_BLUE ,
1138+ [TRACE_COLOR_RESPONSE ] = ANSI_COLOR_CYAN ,
1139+ [TRACE_COLOR_CONTROL ] = ANSI_COLOR_BOLD ,
1140+ [TRACE_COLOR_TIME ] = ANSI_COLOR_GREEN ,
1141+ [TRACE_COLOR_TIMEBREAK ] = ANSI_COLOR_BOLD ANSI_COLOR_GREEN
1142+ };
1143+
1144+ static const char * months [] = {
1145+ "Jan" ,
1146+ "Feb" ,
1147+ "Mar" ,
1148+ "Apr" ,
1149+ "May" ,
1150+ "Jun" ,
1151+ "Jul" ,
1152+ "Aug" ,
1153+ "Sep" ,
1154+ "Oct" ,
1155+ "Nov" ,
1156+ "Dec" ,
1157+ NULL
1158+ };
1159+
1160+ static const char * trace_color (struct trace_ctx * ctx , int type )
1161+ {
1162+ if (ctx -> color )
1163+ return trace_colors [type ];
1164+ return "" ;
1165+ }
1166+
1167+ static const char * trace_color_reset (struct trace_ctx * ctx )
1168+ {
1169+ if (ctx -> color )
1170+ return ANSI_COLOR_RESET ;
1171+ return "" ;
1172+ }
1173+
1174+
1175+ static void trace_print_human_timestamp (struct trace_ctx * ctx ,
1176+ double timestamp )
1177+ {
1178+
1179+ if ((time_t )(timestamp /60 ) == (time_t )(ctx -> last_timestamp /60 )) {
1180+ /* Within same minute, print offset in sec */
1181+ printf ("%s[%+11.6f]%s " ,
1182+ trace_color (ctx , TRACE_COLOR_TIME ),
1183+ timestamp - ctx -> last_timestamp
1184+ + fmod (ctx -> last_timestamp , 60. ),
1185+ trace_color_reset (ctx ));
1186+ }
1187+ else {
1188+ struct tm tm ;
1189+ time_t t = timestamp ;
1190+
1191+ localtime_r (& t , & tm );
1192+
1193+ /* New minute, print datetime */
1194+ printf ("%s[%s%02d %02d:%02d]%s " ,
1195+ trace_color (ctx , TRACE_COLOR_TIMEBREAK ),
1196+ months [tm .tm_mon ],
1197+ tm .tm_mday ,
1198+ tm .tm_hour ,
1199+ tm .tm_min ,
1200+ trace_color_reset (ctx ));
1201+ ctx -> last_timestamp = timestamp ;
1202+ }
1203+ }
1204+
1205+ static void trace_print_human (struct trace_ctx * ctx ,
1206+ double timestamp ,
1207+ int message_type ,
1208+ const char * s )
1209+ {
1210+ trace_print_human_timestamp (ctx , timestamp );
1211+ printf (" %s%s%s\n" ,
1212+ trace_color (ctx , message_type ),
1213+ s ,
1214+ trace_color_reset (ctx ));
1215+ }
1216+
1217+ static void trace_print_timestamp (struct trace_ctx * ctx , double timestamp )
1218+ {
1219+ time_t t = timestamp ;
1220+ struct tm tm ;
1221+ char buf [64 ] = "" ;
1222+
1223+ localtime_r (& t , & tm );
1224+
1225+ strftime (buf , sizeof (buf ), "%Y-%m-%dT%T" , & tm );
1226+ printf ("%s%s.%.3d%s" ,
1227+ trace_color (ctx , TRACE_COLOR_TIME ),
1228+ buf ,
1229+ (int )(1e3 * (timestamp - t )),
1230+ trace_color_reset (ctx ));
1231+ }
1232+
1233+ static void trace_print (struct trace_ctx * ctx ,
1234+ double timestamp ,
1235+ int message_type ,
1236+ const char * s )
1237+ {
1238+ trace_print_timestamp (ctx , timestamp );
1239+ printf (" %s%s%s\n" ,
1240+ trace_color (ctx , message_type ),
1241+ s ,
1242+ trace_color_reset (ctx ));
1243+ }
1244+
1245+ static void trace_ctx_init (struct trace_ctx * ctx ,
1246+ optparse_t * p ,
1247+ int ac ,
1248+ char * av [])
1249+ {
1250+ int n = optparse_option_index (p );
1251+
1252+ memset (ctx , 0 , sizeof (* ctx ));
1253+
1254+ if (n < ac - 1 ) {
1255+ optparse_print_usage (p );
1256+ exit (1 );
1257+ }
1258+ if (optparse_hasopt (p , "delta" )) {
1259+ if (!optparse_hasopt (p , "human" ))
1260+ log_msg_exit ("--delta can only be used with --human" );
1261+ ctx -> delta = 1 ;
1262+ }
1263+ ctx -> nodeid = optparse_get_int (p , "rank" , FLUX_NODEID_ANY );
1264+ ctx -> match = FLUX_MATCH_ANY ;
1265+ if (n < ac )
1266+ ctx -> match .topic_glob = av [n ++ ];
1267+ else
1268+ ctx -> match .topic_glob = "*" ;
1269+ ctx -> color = status_use_color (p ); // borrowed from status subcommand
1270+ if (optparse_hasopt (p , "type" )) {
1271+ const char * arg ;
1272+
1273+ optparse_getopt_iterator_reset (p , "type" );
1274+ ctx -> match .typemask = 0 ;
1275+ while ((arg = optparse_getopt_next (p , "type" ))) {
1276+ if (streq (arg , "request" ))
1277+ ctx -> match .typemask |= FLUX_MSGTYPE_REQUEST ;
1278+ else if (streq (arg , "response" ))
1279+ ctx -> match .typemask |= FLUX_MSGTYPE_RESPONSE ;
1280+ else if (streq (arg , "event" ))
1281+ ctx -> match .typemask |= FLUX_MSGTYPE_EVENT ;
1282+ else if (streq (arg , "control" ))
1283+ ctx -> match .typemask |= FLUX_MSGTYPE_CONTROL ;
1284+ else
1285+ log_msg_exit ("valid types: request, response, event, control" );
1286+ }
1287+ }
1288+ }
1289+
1290+ static int subcmd_trace (optparse_t * p , int ac , char * av [])
1291+ {
1292+ flux_t * h = builtin_get_flux_handle (p );
1293+ flux_future_t * f ;
1294+ struct trace_ctx ctx ;
1295+
1296+ trace_ctx_init (& ctx , p , ac , av );
1297+
1298+ if (!(f = flux_rpc_pack (h ,
1299+ "overlay.trace" ,
1300+ FLUX_NODEID_ANY ,
1301+ FLUX_RPC_STREAMING ,
1302+ "{s:i s:s s:i}" ,
1303+ "typemask" , ctx .match .typemask ,
1304+ "topic_glob" , ctx .match .topic_glob ,
1305+ "nodeid" , ctx .nodeid )))
1306+ log_err_exit ("error sending overlay.trace request" );
1307+ do {
1308+ double timestamp ;
1309+ const char * prefix ;
1310+ int rank ;
1311+ int type ;
1312+ const char * topic ;
1313+ int payload_size ;
1314+ char buf [160 ];
1315+
1316+ if (flux_rpc_get_unpack (f ,
1317+ "{s:F s:s s:i s:i s:s s:i}" ,
1318+ "timestamp" , & timestamp ,
1319+ "prefix" , & prefix ,
1320+ "rank" , & rank ,
1321+ "type" , & type ,
1322+ "topic" , & topic ,
1323+ "payload_size" , & payload_size ) < 0 )
1324+ log_err_exit ("%s" , future_strerror (f , errno ));
1325+
1326+ char rankstr [16 ];
1327+ if (rank < 0 )
1328+ snprintf (rankstr , sizeof (rankstr ), "*" );
1329+ else
1330+ snprintf (rankstr , sizeof (rankstr ), "%d" , rank );
1331+
1332+ snprintf (buf ,
1333+ sizeof (buf ),
1334+ "%s %s %s %s [%s]" ,
1335+ prefix ,
1336+ rankstr ,
1337+ type2str (type ),
1338+ topic ,
1339+ encode_size (payload_size ));
1340+
1341+ if (optparse_hasopt (p , "human" ))
1342+ trace_print_human (& ctx , timestamp , type , buf );
1343+ else
1344+ trace_print (& ctx , timestamp , type , buf );
1345+ fflush (stdout );
1346+
1347+ flux_future_reset (f );
1348+ } while (1 );
1349+ }
1350+
10681351int cmd_overlay (optparse_t * p , int argc , char * argv [])
10691352{
10701353 log_init ("flux-overlay" );
@@ -1078,6 +1361,13 @@ int cmd_overlay (optparse_t *p, int argc, char *argv[])
10781361}
10791362
10801363static struct optparse_subcommand overlay_subcmds [] = {
1364+ { "trace" ,
1365+ "[OPTIONS] [topic-glob]" ,
1366+ "Trace messages received on overlay network" ,
1367+ subcmd_trace ,
1368+ 0 ,
1369+ trace_opts ,
1370+ },
10811371 { "errors" ,
10821372 "[OPTIONS]" ,
10831373 "Summarize overlay errors" ,
0 commit comments