Skip to content

Commit 15cae8d

Browse files
committed
Better handling of stale servers
1 parent 0fcc882 commit 15cae8d

File tree

2 files changed

+48
-16
lines changed

2 files changed

+48
-16
lines changed

src/modules/ReplayModule.cpp

Lines changed: 47 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -504,6 +504,11 @@ void ReplayModule::requestReplay(ReplayServerInfo *server)
504504
request &= server->priority;
505505
if (!request.any())
506506
return; // Nothing to request
507+
if (server->last_advert_millis + REPLAY_SERVER_STALE_SECS * 1000 < millis()) {
508+
LOG_DEBUG("Replay: Cancelling requests for missing packets from stale server=0x%08x", server->id);
509+
invalidateServer(server);
510+
return;
511+
}
507512
unsigned long request_millis = millis() + REPLAY_REQUEST_TIMEOUT_SECS * 1000;
508513
ReplayRequestInfo *requests[REPLAY_BUFFER_SIZE] = {};
509514
for (int i = 0; i < REPLAY_BUFFER_SIZE; i++) {
@@ -731,21 +736,21 @@ void ReplayModule::handleAdvertisement(const meshtastic_MeshPacket *p)
731736
for (unsigned int i = 0; i < REPLAY_TRACK_SERVERS; i++) {
732737
if (servers[i].id == p->from) {
733738
server = &servers[i];
739+
if (server->last_advert_millis + REPLAY_SERVER_STALE_SECS * 1000 < millis()) {
740+
LOG_INFO("Replay: Stale server 0x%08x has become active again after %u seconds", server->id,
741+
(millis() - server->last_advert_millis) / 1000);
742+
invalidateServer(server);
743+
}
734744
break;
735745
}
736746
}
737747
server->last_advert_millis = millis();
738748
server->flag_priority = wire->header.priority;
739749
server->flag_router = wire->header.router;
740750

741-
if (wire->header.boot) {
751+
if (wire->header.boot)
742752
// The server has rebooted, so reset its availability state
743-
server->available.reset();
744-
server->priority.reset();
745-
server->missing.reset();
746-
server->last_sequence = 0;
747-
server->missing_sequence = 0;
748-
}
753+
invalidateServer(server);
749754

750755
switch (wire->header.type) {
751756
case REPLAY_ADVERT_TYPE_AVAILABLE:
@@ -852,15 +857,22 @@ void ReplayModule::handleAvailabilityAdvertisement(ReplayWire *wire, unsigned ch
852857
}
853858
if (this_sequence <= server->last_sequence - 15)
854859
this_sequence += REPLAY_SEQUENCE_MASK + 1; // This is a forward wrap, not a reference to an old sequence
855-
if (this_sequence < server->last_sequence && !wire->header.aggregate) {
856-
// If the sequence number went backwards, then we have likely missed many intervening
857-
// adverts and should reset our tracking state & start with a blank slate. Do not ask
858-
// for missing adverts, because we have missed way too much for this to be sensible.
859-
LOG_WARN("Replay: Advertisement sequence went backwards from server=0x%08x seq=%u, last_seq=%u", server->id,
860-
this_sequence, server->last_sequence);
861-
server->available.reset();
862-
server->priority.reset();
863-
server->missing.reset();
860+
if (!wire->header.aggregate &&
861+
((this_sequence < server->last_sequence) ||
862+
(server->max_sequence > server->last_sequence && server->max_sequence - server->last_sequence > 15))) {
863+
if (this_sequence < server->last_sequence)
864+
// If the sequence number went backwards, then we have likely missed many intervening
865+
// adverts and should reset our tracking state & start with a blank slate. Do not ask
866+
// for missing adverts, because we have missed way too much for this to be sensible.
867+
LOG_WARN("Replay: Advertisement sequence went backwards from server=0x%08x seq=%u, last_seq=%u", server->id,
868+
this_sequence, server->last_sequence);
869+
else if (server->max_sequence - server->last_sequence > 15)
870+
// If we have missed so many adverts that we are this far behind, we are probably never
871+
// going to catch up via aggregates, so reset our tracking state & start with a blank slate.
872+
LOG_WARN("Replay: Too many missed adverts from server=0x%08x seq=%u, last_seq=%u, max_seq=%u", server->id,
873+
this_sequence, server->last_sequence, server->max_sequence);
874+
875+
invalidateServer(server);
864876
server->last_sequence = REPLAY_SEQUENCE_MASK + 1 + wire->header.sequence;
865877
this_sequence = (server->last_sequence & ~REPLAY_SEQUENCE_MASK) | wire->header.sequence;
866878
server->max_sequence = this_sequence;
@@ -1040,6 +1052,25 @@ meshtastic_MeshPacket *ReplayModule::queuePop()
10401052
return p;
10411053
}
10421054

1055+
/**
1056+
* Invalidate a server record's state and prepare it for reuse
1057+
*/
1058+
void ReplayModule::invalidateServer(ReplayServerInfo *server, bool stats)
1059+
{
1060+
server->last_sequence = 0;
1061+
server->max_sequence = 0;
1062+
server->missing_sequence = 0;
1063+
server->available.reset();
1064+
server->priority.reset();
1065+
server->missing.reset();
1066+
1067+
if (stats) {
1068+
server->adverts_received = 0;
1069+
server->replays_requested = 0;
1070+
server->last_advert_millis = 0;
1071+
}
1072+
}
1073+
10431074
/**
10441075
* Handle thread notifications
10451076
*/

src/modules/ReplayModule.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,7 @@ class ReplayModule : public SinglePortModule, private concurrency::NotifiedWorke
212212
ReplayServerInfo *server);
213213
ReplayRequestInfo *requestInfo(ReplayHash hash);
214214
bool queuePush(ReplayCursor idx);
215+
void invalidateServer(ReplayServerInfo *server, bool stats = false);
215216
void onNotify(uint32_t notification);
216217
};
217218

0 commit comments

Comments
 (0)