Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/mesh/FloodingRouter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ bool FloodingRouter::roleAllowsCancelingDupe(const meshtastic_MeshPacket *p)

void FloodingRouter::perhapsCancelDupe(const meshtastic_MeshPacket *p)
{
if (p->is_replay_cached) {
// This is a replayed packet, so we have already transmitted it before, and any further retransmissions
// are explicitly requested by a replay client and therefore should not be cancelled or delayed.
return;
}
if (p->transport_mechanism == meshtastic_MeshPacket_TransportMechanism_TRANSPORT_LORA && roleAllowsCancelingDupe(p)) {
// cancel rebroadcast of this message *if* there was already one, unless we're a router/repeater!
// But only LoRa packets should be able to trigger this.
Expand Down
46 changes: 46 additions & 0 deletions src/mesh/MemoryPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,29 @@ template <class T> class MemoryDynamic : public Allocator<T>
}
};

/**
* A version of MemoryDynamic that plays nicely with the replay cache
*/
template <class T> class MemoryDynamicReplayAware : public MemoryDynamic<T>
{
public:
virtual void release(T *p) override
{
if (p->is_replay_cached)
// Don't free packets that are in the replay cache
return;
MemoryDynamic<T>::release(p);
}

T *allocCopy(const T &src, TickType_t maxWait = portMAX_DELAY)
{
T *p = MemoryDynamic<T>::allocCopy(src, maxWait);
if (p)
p->is_replay_cached = false;
return p;
}
};

/**
* A static memory pool that uses a fixed buffer instead of heap allocation
*/
Expand Down Expand Up @@ -141,6 +164,14 @@ template <class T, int MaxSize> class MemoryPool : public Allocator<T>
}
}

T *allocCopy(const T &src, TickType_t maxWait = portMAX_DELAY)
{
T *p = MemoryPool<T, MaxSize>::allocCopy(src, maxWait);
if (p)
p->is_replay_cached = false;
return p;
}

protected:
// Alloc some storage from our static pool
virtual T *alloc(TickType_t maxWait) override
Expand All @@ -159,3 +190,18 @@ template <class T, int MaxSize> class MemoryPool : public Allocator<T>
return nullptr;
}
};

/**
* A version of MemoryPool that plays nicely with the replay cache
*/
template <class T, int MaxSize> class MemoryPoolReplayAware : public MemoryPool<T, MaxSize>
{
public:
virtual void release(T *p) override
{
if (p->is_replay_cached)
// Don't free packets that are in the replay cache
return;
MemoryPool<T, MaxSize>::release(p);
}
};
6 changes: 6 additions & 0 deletions src/mesh/RadioInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
#include "Router.h"
#include "configuration.h"
#include "main.h"
#if !MESHTASTIC_EXCLUDE_REPLAY
#include "modules/ReplayModule.h"
#endif
#include "sleep.h"
#include <assert.h>
#include <pb_decode.h>
Expand Down Expand Up @@ -355,6 +358,9 @@ void printPacket(const char *prefix, const meshtastic_MeshPacket *p)
std::string out =
DEBUG_PORT.mt_sprintf("%s (id=0x%08x fr=0x%08x to=0x%08x, transport = %u, WantAck=%d, HopLim=%d Ch=0x%x", prefix, p->id,
p->from, p->to, p->transport_mechanism, p->want_ack, p->hop_limit, p->channel);
#if !MESHTASTIC_EXCLUDE_REPLAY
out += DEBUG_PORT.mt_sprintf(" hash=0x%04x", REPLAY_HASH(p->from, p->id));
#endif
if (p->which_payload_variant == meshtastic_MeshPacket_decoded_tag) {
auto &s = p->decoded;

Expand Down
67 changes: 61 additions & 6 deletions src/mesh/RadioLibInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@
#include <pb_decode.h>
#include <pb_encode.h>

#if !MESHTASTIC_EXCLUDE_REPLAY
#include "modules/ReplayModule.h"
#endif

#if ARCH_PORTDUINO
#include "PortduinoGlue.h"
#include "meshUtils.h"
Expand Down Expand Up @@ -260,11 +264,11 @@ void RadioLibInterface::onNotify(uint32_t notification)

// If we are not currently in receive mode, then restart the random delay (this can happen if the main thread
// has placed the unit into standby) FIXME, how will this work if the chipset is in sleep mode?
if (!txQueue.empty()) {
if (getNextTXPacket()) {
if (!canSendImmediately()) {
setTransmitDelay(); // currently Rx/Tx-ing: reset random delay
} else {
meshtastic_MeshPacket *txp = txQueue.getFront();
meshtastic_MeshPacket *txp = getNextTXPacket();
assert(txp);
long delay_remaining = txp->tx_after ? txp->tx_after - millis() : 0;
if (delay_remaining > 0) {
Expand All @@ -277,7 +281,7 @@ void RadioLibInterface::onNotify(uint32_t notification)
} else {
// Send any outgoing packets we have ready as fast as possible to keep the time between channel scan and
// actual transmission as short as possible
txp = txQueue.dequeue();
txp = getNextTXPacket(true);
assert(txp);
bool sent = startSend(txp);
if (sent) {
Expand All @@ -286,6 +290,9 @@ void RadioLibInterface::onNotify(uint32_t notification)
airTime->logAirtime(TX_LOG, xmitMsec);
}
LOG_DEBUG("%d packets remain in the TX queue", txQueue.getMaxLen() - txQueue.getFree());
#if !MESHTASTIC_EXCLUDE_REPLAY
LOG_DEBUG("%u packets remain in the replay queue", replayModule->queueLength());
#endif
}
}
}
Expand All @@ -302,6 +309,10 @@ void RadioLibInterface::setTransmitDelay()
{
meshtastic_MeshPacket *p = txQueue.getFront();
if (!p) {
#if !MESHTASTIC_EXCLUDE_REPLAY
if (replayModule->queuePeek())
notify(TRANSMIT_DELAY_COMPLETED, true);
#endif
return; // noop if there's nothing in the queue
}

Expand All @@ -313,7 +324,13 @@ void RadioLibInterface::setTransmitDelay()
unsigned long add_delay = p->rx_rssi ? getTxDelayMsecWeighted(p) : getTxDelayMsec();
unsigned long now = millis();
p->tx_after = min(max(p->tx_after + add_delay, now + add_delay), now + 2 * getTxDelayMsecWeightedWorst(p->rx_snr));
notifyLater(p->tx_after - now, TRANSMIT_DELAY_COMPLETED, false);
#if !MESHTASTIC_EXCLUDE_REPLAY
// If the head of the queue is delayed, but there are replay packets waiting, notify TX immediately
if (replayModule->queuePeek())
notify(TRANSMIT_DELAY_COMPLETED, true);
else
#endif
notifyLater(p->tx_after - now, TRANSMIT_DELAY_COMPLETED, false);
} else if (p->rx_snr == 0 && p->rx_rssi == 0) {
/* We assume if rx_snr = 0 and rx_rssi = 0, the packet was generated locally.
* This assumption is valid because of the offset generated by the radio to account for the noise
Expand Down Expand Up @@ -380,8 +397,14 @@ void RadioLibInterface::completeSending()

if (p) {
txGood++;
if (!isFromUs(p))
if (!isFromUs(p)) {
txRelay++;
#if !MESHTASTIC_EXCLUDE_REPLAY
replayModule->adopt(p); // If we relayed it, then we might be asked to replay it later
} else {
replayModule->remember(p); // If we sent it, remember it so we don't ask for someone else to replay it
#endif
}
printPacket("Completed sending", p);

// We are done sending that packet, release it
Expand Down Expand Up @@ -471,9 +494,18 @@ void RadioLibInterface::handleReceiveInterrupt()
memcpy(mp->encrypted.bytes, radioBuffer.payload, payloadLen);
mp->encrypted.size = payloadLen;

airTime->logAirtime(RX_LOG, xmitMsec);

printPacket("Lora RX", mp);

airTime->logAirtime(RX_LOG, xmitMsec);
#if !MESHTASTIC_EXCLUDE_REPLAY
if (REPLAY_FAKE_PACKET_LOSS_PERCENT && (rand() % 100 < REPLAY_FAKE_PACKET_LOSS_PERCENT)) {
packetPool.release(mp);
return; // Fake some packet loss to test replay functionality
}

replayModule->remember(mp);
#endif

deliverToReceiver(mp);
}
Expand Down Expand Up @@ -531,4 +563,27 @@ bool RadioLibInterface::startSend(meshtastic_MeshPacket *txp)

return res == RADIOLIB_ERR_NONE;
}
}

meshtastic_MeshPacket *RadioLibInterface::getNextTXPacket(bool dequeue)
{
meshtastic_MeshPacket *p = txQueue.getFront();
if (p && p->tx_after <= millis()) {
if (dequeue) {
p = txQueue.dequeue();
}
return p;
} else {
#if !MESHTASTIC_EXCLUDE_REPLAY
// If there's nothing ready to send in the main TX queue, see if there's a replay packet pending
p = replayModule->queuePeek();
if (p) {
if (dequeue) {
replayModule->queuePop();
}
return p;
}
#endif
return NULL;
}
}
2 changes: 2 additions & 0 deletions src/mesh/RadioLibInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,8 @@ class RadioLibInterface : public RadioInterface, protected concurrency::Notified

meshtastic_QueueStatus getQueueStatus();

meshtastic_MeshPacket *getNextTXPacket(bool dequeue = false);

protected:
uint32_t activeReceiveStart = 0;

Expand Down
6 changes: 3 additions & 3 deletions src/mesh/Router.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,15 @@
(MAX_RX_TOPHONE + MAX_RX_FROMRADIO + 2 * MAX_TX_QUEUE + \
2) // max number of packets which can be in flight (either queued from reception or queued for sending)

static MemoryDynamic<meshtastic_MeshPacket> dynamicPool;
static MemoryDynamicReplayAware<meshtastic_MeshPacket> dynamicPool;
Allocator<meshtastic_MeshPacket> &packetPool = dynamicPool;
#else
// Embedded targets use static memory pools with compile-time constants
#define MAX_PACKETS_STATIC \
(MAX_RX_TOPHONE + MAX_RX_FROMRADIO + 2 * MAX_TX_QUEUE + \
2) // max number of packets which can be in flight (either queued from reception or queued for sending)

static MemoryPool<meshtastic_MeshPacket, MAX_PACKETS_STATIC> staticPool;
static MemoryPoolReplayAware<meshtastic_MeshPacket, MAX_PACKETS_STATIC> staticPool;
Allocator<meshtastic_MeshPacket> &packetPool = staticPool;
#endif

Expand Down Expand Up @@ -692,7 +692,7 @@ void Router::handleReceived(meshtastic_MeshPacket *p, RxSource src)
meshtastic_PortNum_POSITION_APP, meshtastic_PortNum_NODEINFO_APP, meshtastic_PortNum_ROUTING_APP,
meshtastic_PortNum_TELEMETRY_APP, meshtastic_PortNum_ADMIN_APP, meshtastic_PortNum_ALERT_APP,
meshtastic_PortNum_KEY_VERIFICATION_APP, meshtastic_PortNum_WAYPOINT_APP,
meshtastic_PortNum_STORE_FORWARD_APP, meshtastic_PortNum_TRACEROUTE_APP)) {
meshtastic_PortNum_STORE_FORWARD_APP, meshtastic_PortNum_TRACEROUTE_APP, meshtastic_PortNum_REPLAY_APP)) {
LOG_DEBUG("Ignore packet on non-standard portnum for CORE_PORTNUMS_ONLY");
cancelSending(p->from, p->id);
skipHandle = true;
Expand Down
6 changes: 6 additions & 0 deletions src/modules/Modules.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@
#if !MESHTASTIC_EXCLUDE_POWERSTRESS
#include "modules/PowerStressModule.h"
#endif
#if !MESHTASTIC_EXCLUDE_REPLAY
#include "modules/ReplayModule.h"
#endif
#include "modules/RoutingModule.h"
#include "modules/TextMessageModule.h"
#if !MESHTASTIC_EXCLUDE_TRACEROUTE
Expand Down Expand Up @@ -171,6 +174,9 @@ void setupModules()
#endif
#if !MESHTASTIC_EXCLUDE_POWERSTRESS
new PowerStressModule();
#endif
#if !MESHTASTIC_EXCLUDE_REPLAY
replayModule = new ReplayModule();
#endif
// Example: Put your module here
// new ReplyModule();
Expand Down
Loading
Loading