Skip to content

Commit ac81900

Browse files
committed
Add stats packet
1 parent 208dc41 commit ac81900

File tree

2 files changed

+239
-14
lines changed

2 files changed

+239
-14
lines changed

src/modules/ReplayModule.cpp

Lines changed: 196 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -10,21 +10,11 @@
1010
* - What should the tunable default values be?
1111
* - Scale replay rate based on modem settings
1212
* - Prioritise replay of packets requested by routers
13+
* - Cache replay stats packets normally
1314
* - Lots of testing (and likely a bunch of bugfixes)
1415
* - WARN | 23:30:46 4214 [Router] Replay: Advertisement sequence went backwards from server=0x056191db seq=36, last_seq=48
1516
* - Back off repeated replay requests?
16-
* - Implement a periodic stats packet that includes:
17-
* - Number of adverts sent
18-
* - Number of replays sent
19-
* - Number of replay requests received
20-
* - Number of replays requested
21-
* - Number of adverts received
22-
* - For each server we are tracking:
23-
* - Age of last advert
24-
* - Number of adverts received
25-
* - Number of packets requested from this server
26-
* - Number of packets requested by this server
27-
* - router flag
17+
* - Frequent reboots since implementing stats
2818
*/
2919

3020
ReplayModule *replayModule = NULL;
@@ -198,6 +188,10 @@ void ReplayModule::adopt(meshtastic_MeshPacket *p)
198188
if (!entry)
199189
return; // Already cached
200190

191+
metrics.packets_rebroadcast++;
192+
if (p->priority >= REPLAY_CHUTIL_PRIORITY)
193+
metrics.packets_rebroadcast_prio++;
194+
201195
LOG_DEBUG("Replay: Adopting packet from=0x%08x id=0x%08x priority=%u packets=%u cached=%u cache_bytes=%u", p->from, p->id,
202196
p->priority, buffer.getLength(), buffer.getNumCached(), buffer.getNumCached() * sizeof(meshtastic_MeshPacket));
203197
unsigned int idx = buffer.getHeadCursor() & REPLAY_BUFFER_MASK;
@@ -382,6 +376,10 @@ void ReplayModule::advertise(bool aggregate, unsigned int from_sequence, ReplayM
382376
packets_since_advert -= packets;
383377
service->sendToMesh(p);
384378

379+
metrics.adverts_sent++;
380+
if (aggregate)
381+
metrics.adverts_sent_agg++;
382+
385383
if (again) {
386384
advertise();
387385
}
@@ -420,6 +418,7 @@ void ReplayModule::advertiseExpired()
420418

421419
service->sendToMesh(p);
422420
last_expired_millis = millis();
421+
metrics.adverts_sent_expired++;
423422
want_replay_expired = false;
424423
}
425424

@@ -489,6 +488,9 @@ void ReplayModule::replay()
489488
} else {
490489
to_send->last_replay_millis = millis();
491490
to_send->replay_count++;
491+
metrics.packets_replayed++;
492+
if (to_send->p->priority >= REPLAY_CHUTIL_PRIORITY)
493+
metrics.packets_replayed_prio++;
492494
want_replay.reset(to_send_idx);
493495
}
494496
} else {
@@ -580,6 +582,9 @@ void ReplayModule::requestReplay(ReplayServerInfo *server)
580582
*map |= (1 << i);
581583
*payload |= (1 << j);
582584
server->replays_requested++;
585+
metrics.packets_requested++;
586+
if (server->priority.test(idx))
587+
metrics.packets_requested_prio++;
583588
}
584589
}
585590
if (*map & (1 << i))
@@ -590,6 +595,7 @@ void ReplayModule::requestReplay(ReplayServerInfo *server)
590595
LOG_INFO("Replay: Requesting %u missing packets server=0x%08x prio=%u ranges=%u size=%u", request.count(), server->id,
591596
wire.header.priority, (uint16_t)*map, p->decoded.payload.size);
592597
service->sendToMesh(p);
598+
getStats(server->id)->requests_to++;
593599
}
594600

595601
/**
@@ -658,6 +664,13 @@ void ReplayModule::handleRequest(const meshtastic_MeshPacket *p)
658664
client->bucket = REPLAY_CLIENT_BURST;
659665
client->last_request_millis = millis();
660666

667+
ReplayStats *stats = getStats(p->from);
668+
stats->requests_from++;
669+
if (wire->header.router)
670+
stats->is_router = true;
671+
if (wire->header.priority)
672+
stats->priority = true;
673+
661674
switch (wire->header.type) {
662675
case REPLAY_REQUEST_TYPE_ADVERTISEMENT: {
663676
if (payload_words < 2) {
@@ -671,6 +684,7 @@ void ReplayModule::handleRequest(const meshtastic_MeshPacket *p)
671684
}
672685
LOG_INFO("Replay: Advertisement request from=0x%08x seq=%u missing=%u", p->from, wire->header.sequence, missing);
673686
advertise(true, wire->header.sequence, missing);
687+
stats->replays_for++;
674688
} break;
675689
case REPLAY_REQUEST_TYPE_PACKETS: {
676690
if (payload_words < 3 || payload_words < 1 /*header*/ + 1 /*map*/ + __builtin_popcount(payload[1]) /*ranges*/) {
@@ -693,6 +707,7 @@ void ReplayModule::handleRequest(const meshtastic_MeshPacket *p)
693707
continue; // Don't replay packets that are already in our TX queue
694708
if (!wire->header.priority || (entry->p && entry->p->priority >= REPLAY_CHUTIL_PRIORITY)) {
695709
want_replay.set(idx);
710+
stats->replays_for++;
696711
requested++;
697712
client->bucket--;
698713
LOG_INFO("Replay: Request for %s packet hash=0x%04x client=0x%08x", entry->p ? "cached" : "expired",
@@ -704,8 +719,10 @@ void ReplayModule::handleRequest(const meshtastic_MeshPacket *p)
704719
}
705720
range++;
706721
}
707-
if (!client->bucket)
722+
if (!client->bucket) {
708723
LOG_WARN("Replay: Client 0x%08x is being rate limited", client->id);
724+
stats->throttled = true;
725+
}
709726
replay_from = buffer.getHeadCursor();
710727
LOG_INFO("Replay: Pending replay of %u packets, requested=%u, want_expired=%u", want_replay.count(), requested,
711728
want_replay_expired);
@@ -768,6 +785,18 @@ void ReplayModule::handleAdvertisement(const meshtastic_MeshPacket *p)
768785
handleExpiredAdvertisement(wire, (unsigned char *)payload,
769786
((unsigned char *)p->decoded.payload.bytes) + p->decoded.payload.size, server);
770787
break;
788+
case REPLAY_ADVERT_TYPE_STATISTICS: {
789+
meshtastic_ReplayStats rs = {};
790+
bool success = pb_decode_from_bytes((unsigned char *)payload, p->decoded.payload.size - sizeof(ReplayWire),
791+
meshtastic_ReplayStats_fields, &rs);
792+
if (!success)
793+
LOG_WARN("Replay: Failed to decode invalid stats advertisement from=0x%08x", p->from);
794+
else {
795+
LOG_INFO("Replay: Received stats summary from=0x%08x", p->from);
796+
printStats(&rs);
797+
}
798+
}
799+
return; // Stats packets aren't a normal advertisement, so don't set up tracking for this node
771800
default:
772801
LOG_WARN("Replay: Unknown advertisement type %u", wire->header.type);
773802
return;
@@ -809,6 +838,13 @@ void ReplayModule::handleAdvertisement(const meshtastic_MeshPacket *p)
809838
void ReplayModule::handleAvailabilityAdvertisement(ReplayWire *wire, unsigned char *data, unsigned char *data_end,
810839
ReplayServerInfo *server)
811840
{
841+
ReplayStats *stats = getStats(server->id);
842+
stats->adverts_from++;
843+
if (wire->header.router)
844+
stats->is_router = true;
845+
if (wire->header.priority)
846+
stats->priority = true;
847+
812848
int payload_words = (data_end - data) / sizeof(uint16_t);
813849
if (payload_words < 2 || payload_words < 1 /*map*/ + __builtin_popcount(((uint16_t *)data)[0]) * 2 /*ranges*/) {
814850
LOG_WARN("Replay: Availability advert payload too small");
@@ -839,6 +875,8 @@ void ReplayModule::handleAvailabilityAdvertisement(ReplayWire *wire, unsigned ch
839875
if (!isKnown(server->packets[idx])) {
840876
LOG_WARN("Replay: Discovered missing packet hash=0x%04x via=0x%08x", server->packets[idx], server->id);
841877
server->missing.set(idx);
878+
server->packets_missed++;
879+
stats->missed_from++;
842880
} else {
843881
LOG_DEBUG("Replay: Discovered known packet hash=0x%04x via=0x%08x", server->packets[idx], server->id);
844882
server->missing.reset(idx);
@@ -889,6 +927,9 @@ void ReplayModule::handleAvailabilityAdvertisement(ReplayWire *wire, unsigned ch
889927
if (seq >= this_sequence)
890928
break;
891929
server->missing_sequence |= (1 << i);
930+
server->packets_missed++;
931+
stats->missed_from++;
932+
metrics.packets_requested++;
892933
LOG_WARN("Replay: Noticed missing advertisement seq=%u from server=0x%08x", seq, server->id);
893934
}
894935
while (server->last_sequence < server->max_sequence && !(server->missing_sequence & 3)) {
@@ -930,8 +971,10 @@ void ReplayModule::handleAvailabilityAdvertisement(ReplayWire *wire, unsigned ch
930971
uint8_t *throttled = (uint8_t *)payload;
931972
uint8_t me = nodeDB->getNodeNum() & 0x000F;
932973
while (throttled <= data_end) {
933-
if (*throttled++ == me)
974+
if (*throttled++ == me) {
975+
stats->throttled_from++;
934976
return; // We are being throttled by the server, so don't ask for anything
977+
}
935978
}
936979
}
937980

@@ -958,6 +1001,13 @@ void ReplayModule::handleAvailabilityAdvertisement(ReplayWire *wire, unsigned ch
9581001
void ReplayModule::handleExpiredAdvertisement(ReplayWire *wire, unsigned char *data, unsigned char *data_end,
9591002
ReplayServerInfo *server)
9601003
{
1004+
ReplayStats *stats = getStats(server->id);
1005+
stats->expired_from++;
1006+
if (wire->header.router)
1007+
stats->is_router = true;
1008+
if (wire->header.priority)
1009+
stats->priority = true;
1010+
9611011
unsigned int expired = 0;
9621012
uint16_t *payload = (uint16_t *)data;
9631013
ReplayMap map = *payload++;
@@ -1074,6 +1124,132 @@ void ReplayModule::invalidateServer(ReplayServerInfo *server, bool stats)
10741124
}
10751125
}
10761126

1127+
/**
1128+
* Get the current stats object for a node
1129+
*/
1130+
ReplayStats *ReplayModule::getStats(NodeNum id)
1131+
{
1132+
for (unsigned int i = 0; i < REPLAY_STATS_SIZE; i++) {
1133+
if (servers[i].id == id)
1134+
return &stats[i];
1135+
}
1136+
ReplayStats *s = &stats[stats_next++ & REPLAY_STATS_MASK];
1137+
*s = {};
1138+
s->id = id;
1139+
return s;
1140+
}
1141+
1142+
/**
1143+
* Reset stats for all nodes
1144+
*/
1145+
void ReplayModule::resetStats()
1146+
{
1147+
metrics = {};
1148+
stats_next = 0;
1149+
memset(stats, 0, sizeof(stats));
1150+
metrics.window_start_millis = millis();
1151+
}
1152+
1153+
/**
1154+
* Broadcast a stats packet to the mesh
1155+
*/
1156+
void ReplayModule::sendStats()
1157+
{
1158+
if (!metrics.adverts_sent && !IS_ONE_OF(config.device.role, meshtastic_Config_DeviceConfig_Role_ROUTER,
1159+
meshtastic_Config_DeviceConfig_Role_ROUTER_LATE)) {
1160+
LOG_DEBUG("Replay: Skipping stats broadcast because no adverts sent and not a router");
1161+
resetStats();
1162+
return;
1163+
}
1164+
1165+
ReplayWire wire = {};
1166+
wire.header.type = REPLAY_ADVERT_TYPE_STATISTICS;
1167+
wire.header.priority = airTime->channelUtilizationPercent() >= REPLAY_CHUTIL_THRESHOLD_PCT;
1168+
wire.header.router = IS_ONE_OF(config.device.role, meshtastic_Config_DeviceConfig_Role_ROUTER,
1169+
meshtastic_Config_DeviceConfig_Role_ROUTER_LATE);
1170+
1171+
meshtastic_ReplayStats rs = {};
1172+
rs.window_length_secs = (millis() - metrics.window_start_millis) / 1000;
1173+
rs.current_size = buffer.getLength();
1174+
rs.current_cached = buffer.getNumCached();
1175+
rs.adverts_sent = metrics.adverts_sent;
1176+
rs.expired_sent = metrics.adverts_sent_expired;
1177+
rs.requests_sent_packets = metrics.packets_requested;
1178+
rs.requests_sent_packets_prio = metrics.packets_requested_prio;
1179+
rs.packets_replayed = metrics.packets_replayed;
1180+
rs.packets_replayed_prio = metrics.packets_replayed_prio;
1181+
rs.packets_rebroadcast = metrics.packets_rebroadcast;
1182+
rs.packets_rebroadcast_prio = metrics.packets_rebroadcast_prio;
1183+
1184+
for (unsigned int i = 0; i < REPLAY_STATS_SIZE; i++) {
1185+
ReplayStats *s = &stats[i];
1186+
rs.expired_received += s->expired_from;
1187+
rs.requests_sent += s->requests_to;
1188+
rs.packets_missed += s->missed_from;
1189+
1190+
if (s->adverts_from) {
1191+
rs.unique_advertisers++;
1192+
rs.adverts_received += s->adverts_from;
1193+
}
1194+
if (s->requests_from) {
1195+
rs.requests_received += s->requests_from;
1196+
rs.unique_requestors++;
1197+
}
1198+
if (s->throttled)
1199+
rs.throttled_requestors++;
1200+
}
1201+
1202+
for (unsigned int i = 0; i < REPLAY_TRACK_SERVERS; i++) {
1203+
ReplayServerInfo *server = &servers[i];
1204+
if (!server->is_tracked)
1205+
continue;
1206+
meshtastic_ReplayServerStats *ss = &rs.servers[rs.servers_count++];
1207+
ss->id = server->id;
1208+
ss->adverts_received = server->adverts_received;
1209+
ss->requests_sent = server->replays_requested;
1210+
ss->packets_missed = server->packets_missed;
1211+
ss->last_advert_secs = (millis() - server->last_advert_millis) / 1000;
1212+
ss->is_router = server->flag_router;
1213+
ss->priority = server->flag_priority;
1214+
}
1215+
1216+
meshtastic_MeshPacket *p = allocDataPacket();
1217+
assert(p);
1218+
unsigned char *pos = p->decoded.payload.bytes;
1219+
p->to = NODENUM_BROADCAST;
1220+
p->priority = meshtastic_MeshPacket_Priority_DEFAULT;
1221+
memcpy(pos, &wire.header.bitfield, sizeof(wire.header.bitfield));
1222+
pos += sizeof(wire.header.bitfield);
1223+
pos += pb_encode_to_bytes(pos, sizeof(p->decoded.payload.bytes) - (pos - p->decoded.payload.bytes),
1224+
meshtastic_ReplayStats_fields, &rs);
1225+
p->decoded.payload.size = pos - p->decoded.payload.bytes;
1226+
1227+
LOG_INFO("Replay: Broadcasting statistics to mesh");
1228+
printStats(&rs);
1229+
1230+
service->sendToMesh(p);
1231+
resetStats();
1232+
last_stats_millis = millis();
1233+
}
1234+
1235+
/**
 * Write a stats summary to the log at INFO level
 *
 * Emits the window length, buffer occupancy, advertisement / request / replay / rebroadcast
 * counters, then one line for each of the `servers_count` entries in `rs->servers`.
 *
 * @param rs Stats summary to print (either built locally or decoded from a received packet)
 */
void ReplayModule::printStats(meshtastic_ReplayStats *rs)
{
    LOG_INFO("Replay statistics (last %u seconds):", rs->window_length_secs);
    LOG_INFO(" Buffer: size=%u cached=%u", rs->current_size, rs->current_cached);
    LOG_INFO(" Advertisements: sent=%u expired=%u received=%u advertisers=%u missed_packets=%u",
             rs->adverts_sent, rs->expired_sent, rs->adverts_received, rs->unique_advertisers, rs->packets_missed);
    LOG_INFO(" Requests: sent=%u packets=%u prio=%u received=%u requestors=%u throttled=%u",
             rs->requests_sent, rs->requests_sent_packets, rs->requests_sent_packets_prio, rs->requests_received,
             rs->unique_requestors, rs->throttled_requestors);
    LOG_INFO(" Replays: packets=%u prio=%u", rs->packets_replayed, rs->packets_replayed_prio);
    LOG_INFO(" Rebroadcasts: packets=%u prio=%u", rs->packets_rebroadcast, rs->packets_rebroadcast_prio);

    // Per-server breakdown
    meshtastic_ReplayServerStats *const last = rs->servers + rs->servers_count;
    for (meshtastic_ReplayServerStats *entry = rs->servers; entry != last; entry++) {
        LOG_INFO(" Server 0x%08x: adverts=%u requests=%u missed=%u last_advert=%us router=%u prio=%u", entry->id,
                 entry->adverts_received, entry->requests_sent, entry->packets_missed, entry->last_advert_secs,
                 entry->is_router, entry->priority);
    }
}
1252+
10771253
/**
10781254
* Handle thread notifications
10791255
*/
@@ -1092,10 +1268,16 @@ void ReplayModule::onNotify(uint32_t notification)
10921268
if (packets_since_advert > REPLAY_FLUSH_PACKETS || deadline <= now)
10931269
advertise();
10941270

1271+
if (last_stats_millis + REPLAY_STATS_INTERVAL_SECS * 1000 <= now)
1272+
sendStats();
1273+
10951274
if (replay_from >= buffer.getTailCursor() && replay_from) {
10961275
// We still have packets pending replay
10971276
notifyLater(REPLAY_SPACING_MS, REPLAY_NOTIFY_REPLAY, true);
10981277
} else if (deadline > now) {
1278+
if (last_stats_millis + REPLAY_STATS_INTERVAL_SECS * 1000 < deadline)
1279+
deadline = last_stats_millis + REPLAY_STATS_INTERVAL_SECS * 1000;
1280+
10991281
// Sleep until the next advert deadline
11001282
LOG_DEBUG("Sleep to deadline %ld", deadline - now);
11011283
notifyLater(deadline - now, REPLAY_NOTIFY_INTERVAL, false);

0 commit comments

Comments
 (0)