@@ -62,6 +62,7 @@ typedef unsigned char byte_t;
6262#define TAIL_TOGGLE 32U
6363
6464#define FOREACH_IFACE (i ) for (size_t i = 0; (i) < CANARD_IFACE_COUNT; (i)++)
65+ #define FOREACH_PRIO (i ) for (size_t i = 0; (i) < CANARD_PRIO_COUNT; (i)++)
6566
6667#if CANARD_IFACE_COUNT <= 2
6768#define IFACE_INDEX_BIT_LENGTH 1U
@@ -1179,11 +1180,6 @@ static byte_t rx_parse(const uint32_t can_id,
11791180// This could be made configurable but it is not a tuning-sensitive parameter.
11801181#define RX_SESSION_TIMEOUT (30 * MEGA)
11811182
1182- // The maximum preemption depth is the number of priority levels minus one. We need one extra slot for the preempted
1183- // transfer itself. The worst case scenario is when we have a transfer at the lowest priority level preempted
1184- // by a transfer at the priority one higher, and so on up to the maximum.
1185- #define FOREACH_SLOT (i ) for (size_t i = 0; (i) < CANARD_PRIO_COUNT; (i)++)
1186-
11871183// Reassembly state at a specific priority level.
11881184// Maintaining separate state per priority level allows preemption of higher-priority transfers without loss.
11891185// Interface affinity is required because frames duplicated across redundant interfaces may arrive with a significant
@@ -1233,6 +1229,10 @@ static void rx_slot_advance(rx_slot_t* const slot, const size_t extent, const ca
12331229 slot -> expected_toggle ^= 1U ;
12341230}
12351231
1232+ // This value is unreachable even if seqno is incremented by a full transfer-ID period on every frame at
1233+ // 5k frames/s; at that rate, it would take about 56 years to wrap around.
1234+ #define RX_SEQNO_MAX ((UINT64_C(1) << 48U) - 1U)
1235+
12361236// A compact representation is needed because we need to store an array of these in dynamic memory.
12371237typedef struct
12381238{
@@ -1298,7 +1298,7 @@ typedef struct
12981298{
12991299 canard_tree_t index ;
13001300 canard_listed_t list_animation ; // On update, session moved to the tail; oldest pushed to the head.
1301- canard_us_t last_admitted_start_ts ;
1301+ canard_us_t last_admission_ts ;
13021302 rx_slot_t * slots [CANARD_PRIO_COUNT ]; // Indexed by priority level to allow preemption.
13031303 canard_subscription_t * owner ;
13041304 rx_seqno_packed_t seqno_frontier [CANARD_PRIO_COUNT ];
@@ -1326,21 +1326,21 @@ static canard_tree_t* rx_session_factory(void* const user)
13261326 if (ses == NULL ) {
13271327 return NULL ;
13281328 }
1329- FOREACH_SLOT (i ) {
1329+ FOREACH_PRIO (i ) {
13301330 ses -> slots [i ] = NULL ;
13311331 }
1332- ses -> last_admitted_start_ts = BIG_BANG ;
1333- ses -> owner = ctx -> owner ;
1334- ses -> iface_index = ctx -> iface_index ;
1335- ses -> node_id = ctx -> node_id ;
1332+ ses -> last_admission_ts = BIG_BANG ;
1333+ ses -> owner = ctx -> owner ;
1334+ ses -> iface_index = ctx -> iface_index ;
1335+ ses -> node_id = ctx -> node_id ;
13361336 enlist_tail (& ctx -> owner -> owner -> rx .list_session_by_animation , & ses -> list_animation );
13371337 return & ses -> index ;
13381338}
13391339
13401340static void rx_session_destroy (rx_session_t * const ses )
13411341{
13421342 canard_subscription_t * const sub = ses -> owner ;
1343- FOREACH_SLOT (i ) {
1343+ FOREACH_PRIO (i ) {
13441344 rx_slot_destroy (sub , ses -> slots [i ]);
13451345 }
13461346 CANARD_ASSERT (cavl2_is_inserted (sub -> sessions , & ses -> index ));
@@ -1351,17 +1351,16 @@ static void rx_session_destroy(rx_session_t* const ses)
13511351}
13521352
13531353// Checks the state and purges stale slots to reclaim memory early. Returns the number of in-progress slots remaining.
1354- static size_t rx_session_scan (rx_session_t * const ses , const canard_us_t now )
1354+ static size_t rx_session_cleanup (rx_session_t * const ses , const canard_us_t now )
13551355{
13561356 const canard_us_t deadline = now - later (RX_SESSION_TIMEOUT , ses -> owner -> transfer_id_timeout );
13571357 size_t n_slots = 0 ;
1358- FOREACH_SLOT (i ) {
1358+ FOREACH_PRIO (i ) {
13591359 const rx_slot_t * const slot = ses -> slots [i ];
13601360 if (slot == NULL ) {
13611361 continue ;
13621362 }
1363- CANARD_ASSERT (slot -> start_ts >= 0 );
1364- CANARD_ASSERT (ses -> last_admitted_start_ts >= slot -> start_ts );
1363+ CANARD_ASSERT ((0 <= slot -> start_ts ) && (slot -> start_ts <= ses -> last_admission_ts ));
13651364 if (slot -> start_ts < deadline ) { // Too old, destroy even if in progress -- unlikely to complete anyway.
13661365 rx_slot_destroy (ses -> owner , ses -> slots [i ]);
13671366 ses -> slots [i ] = NULL ;
@@ -1372,53 +1371,71 @@ static size_t rx_session_scan(rx_session_t* const ses, const canard_us_t now)
13721371 return n_slots ;
13731372}
13741373
1374+ // Maximum seqno seen from the given highest priority level (numerically lowest, inclusive) and down.
1375+ static uint64_t rx_session_seqno_frontier (const rx_session_t * const ses , const canard_prio_t highest_priority )
1376+ {
1377+ uint64_t seqno = 0 ;
1378+ for (size_t i = (size_t )highest_priority ; i < CANARD_PRIO_COUNT ; i ++ ) {
1379+ seqno = max_u64 (seqno , rx_seqno_unpack (ses -> seqno_frontier [i ]));
1380+ }
1381+ return seqno ;
1382+ }
1383+
13751384static void rx_session_record_admission (rx_session_t * const ses ,
13761385 const canard_prio_t priority ,
13771386 const uint64_t seqno ,
13781387 const canard_us_t ts ,
13791388 const byte_t iface_index )
13801389{
1390+ // Seqno per priority cannot go back. When we reset it on a transfer-ID timeout, we simply bump the seqno state by
1391+ // a multiple of transfer-ID overflow periods to ensure monotonicity. Earlier I tried dumb direct approaches
1392+ // where we erase the seqno states to zero on tid-timeout, or introduce epoch counters, or per-seqno timestamp,
1393+ // but this solution is much simpler while achieving the same goal: leave older seqnos behind, start a new epoch.
1394+ CANARD_ASSERT (seqno > rx_seqno_unpack (ses -> seqno_frontier [priority ]));
13811395 ses -> seqno_frontier [priority ] = rx_seqno_pack (seqno ); // nothing older than this at this & higher prio from now on
1382- ses -> last_admitted_start_ts = ts ;
1396+ ses -> last_admission_ts = ts ;
13831397 ses -> iface_index = iface_index ;
13841398}
13851399
1386- // Maximum seqno seen from the given highest priority level (numerically lowest, inclusive) and down.
1387- static uint64_t rx_session_seqno_frontier (const rx_session_t * const ses , const canard_prio_t highest_priority )
1388- {
1389- uint64_t seqno = 0 ;
1390- for (size_t i = (size_t )highest_priority ; i < CANARD_PRIO_COUNT ; i ++ ) {
1391- seqno = max_u64 (seqno , rx_seqno_unpack (ses -> seqno_frontier [i ]));
1392- }
1393- return seqno ;
1394- }
1400+ #define RX_SESSION_ADMISSION_REJECTED (UINT64_MAX)
1401+ #define RX_SESSION_ADMISSION_CONTINUATION (UINT64_MAX - 1)
13951402
1396- // Frame admittance state machine update . A complex piece, redesigned after v4 to support priority preemption.
1403+ // Frame admittance solver . A complex piece, redesigned after v4 to support priority preemption.
13971404// Key ideas: 1. Separate reassembly state per priority level. 2. TID is linearized into seqno.
13981405// Once we admit a transfer at some priority with a certain seqno, we know that any older seqno at this or higher
13991406// priority would be stale, since only higher priority transfers can preempt lower priority ones.
1400- static bool rx_session_should_admit (const rx_session_t * const ses ,
1401- const canard_us_t ts ,
1402- const canard_prio_t priority ,
1403- const bool start ,
1404- const bool toggle ,
1405- const uint64_t seqno ,
1406- const byte_t iface_index )
1407+ // On a transfer-ID timeout the seqno is bumped by a whole number of transfer-ID overflow periods to ensure that
1408+ // it becomes the new frontier matching the current transfer-ID while maintaining monotonicity.
1409+ static uint64_t rx_session_solve_admission (const rx_session_t * const ses ,
1410+ const canard_us_t ts ,
1411+ const canard_prio_t priority ,
1412+ const bool start ,
1413+ const bool toggle ,
1414+ const byte_t transfer_id ,
1415+ const byte_t iface_index )
14071416{
14081417 // Continuation frames cannot create new state so their handling is simpler.
14091418 // They are only accepted if there is a slot with an exact match of all transfer parameters.
14101419 // We ignore the transfer-ID timeout to avoid breaking transfers that are preempted for a long time,
14111420 // and especially to allow reassembly of multi-frame transfers even when the transfer-ID timeout is zero.
14121421 if (!start ) {
14131422 const rx_slot_t * const slot = ses -> slots [priority ];
1414- return (slot != NULL ) && (slot -> transfer_id == (seqno & CANARD_TRANSFER_ID_MAX )) &&
1415- (slot -> iface_index == iface_index ) && (slot -> expected_toggle == toggle );
1423+ const bool admit = (slot != NULL ) && (slot -> transfer_id == transfer_id ) && (slot -> iface_index == iface_index ) &&
1424+ (slot -> expected_toggle == toggle );
1425+ return admit ? RX_SESSION_ADMISSION_CONTINUATION : RX_SESSION_ADMISSION_REJECTED ;
14161426 }
1427+
14171428 // This is a start frame, but before we allocate new state for it, we must ensure that it is of the correct version.
14181429 const bool start_toggle = kind_is_v1 (ses -> owner -> kind ) ? 1 : 0 ;
14191430 if (toggle != start_toggle ) {
1420- return false ; // Wrong protocol version.
1431+ return RX_SESSION_ADMISSION_REJECTED ; // Wrong protocol version.
14211432 }
1433+
1434+ // It is best to postpone seqno derivation until the last moment because it is costly.
1435+ // Life would have been so much easier if we could just use normal non-wrapping IDs like we have in Cyphal/UDP!
1436+ const uint64_t frontier_global = rx_session_seqno_frontier (ses , canard_prio_exceptional );
1437+ const uint64_t seqno = rx_seqno_linearize (frontier_global , transfer_id );
1438+
14221439 // Duplicate start frames do not require special treatment because a duplicate frame can only follow the original
14231440 // without any frames belonging to the same transfer in between (see the assumptions). If we get a duplicate start,
14241441 // with a nonzero TID timeout it will be rejected as not-new; even if the timeout is zero, accepting the duplicate
@@ -1441,10 +1458,28 @@ static bool rx_session_should_admit(const rx_session_t* const ses,
14411458 // survives) we will still reject a new transfer arriving from a different interface if it happened to roll the
14421459 // same transfer-ID timeout. This is not an issue because we would still accept new transfers on the same iface,
14431460 // and after the RX_SESSION_TIMEOUT the session is destroyed and all new transfers will be accepted unconditionally.
1444- const bool seqno_new = seqno > rx_session_seqno_frontier (ses , priority );
1445- const bool iface_match = ses -> iface_index == iface_index ;
1446- const bool timed_out = ts > (ses -> last_admitted_start_ts + ses -> owner -> transfer_id_timeout );
1447- return (seqno_new && iface_match ) || (iface_match && timed_out ) || (timed_out && seqno_new );
1461+ const uint64_t frontier_priority = rx_session_seqno_frontier (ses , priority );
1462+ const bool seqno_new = seqno > frontier_priority ;
1463+ const bool iface_match = ses -> iface_index == iface_index ;
1464+ const bool timed_out = ts > (ses -> last_admission_ts + ses -> owner -> transfer_id_timeout );
1465+ const bool admit = (seqno_new && iface_match ) || (iface_match && timed_out ) || (timed_out && seqno_new );
1466+ if (!admit ) {
1467+ return RX_SESSION_ADMISSION_REJECTED ;
1468+ }
1469+
1470+ // It is vital that seqno is monotonically increasing even across timeouts; otherwise, following a transfer-ID
1471+ // timeout, further arrivals on higher priority levels may be rejected if their seqnos are greater.
1472+ // Instead of sweeping seqnos across all priority levels, we ensure monotonicity without breaking transfer-ID
1473+ // matching by bumping the seqno by the minimal required number of overflow periods.
1474+ uint64_t admitted_seqno = seqno ;
1475+ if (!seqno_new ) {
1476+ static const uint64_t mod = CANARD_TRANSFER_ID_MODULO ;
1477+ CANARD_ASSERT (frontier_priority >= admitted_seqno );
1478+ const uint64_t periods = ((frontier_priority - admitted_seqno ) + mod ) / mod ;
1479+ admitted_seqno += periods * mod ;
1480+ }
1481+ CANARD_ASSERT (admitted_seqno > frontier_priority );
1482+ return admitted_seqno ;
14481483}
14491484
14501485// Returns false on OOM, no other failure modes.
@@ -1473,26 +1508,33 @@ static bool rx_session_update(canard_subscription_t* const sub,
14731508 }
14741509
14751510 // Decide admit or drop.
1476- const uint64_t seqno =
1477- rx_seqno_linearize ( rx_session_seqno_frontier ( ses , canard_prio_exceptional ) , frame -> transfer_id );
1478- if (! rx_session_should_admit ( ses , ts , frame -> priority , frame -> start , frame -> toggle , seqno , iface_index ) ) {
1511+ const uint64_t seqno = rx_session_solve_admission (
1512+ ses , ts , frame -> priority , frame -> start , frame -> toggle , frame -> transfer_id , iface_index );
1513+ if (seqno == RX_SESSION_ADMISSION_REJECTED ) {
14791514 return true; // Rejection is not a failure.
14801515 }
14811516
14821517 // The frame must be accepted. If this is the start of a new transfer, we must update state.
1483- enlist_tail (& sub -> owner -> rx .list_session_by_animation , & ses -> list_animation );
1484- if (frame -> start ) {
1518+ if (seqno != RX_SESSION_ADMISSION_CONTINUATION ) {
1519+ CANARD_ASSERT (seqno <= RX_SEQNO_MAX );
1520+ CANARD_ASSERT (frame -> start );
1521+ // Animate only when a new transfer is started to manage load. Correctness-wise there is not much difference.
1522+ enlist_tail (& sub -> owner -> rx .list_session_by_animation , & ses -> list_animation );
1523+ // Destroy the old slot if it exists (if we're discarding a stale transfer).
14851524 if (ses -> slots [frame -> priority ] != NULL ) {
14861525 rx_slot_destroy (sub , ses -> slots [frame -> priority ]);
14871526 ses -> slots [frame -> priority ] = NULL ;
14881527 }
1489- if (!frame -> end ) { // more frames to follow, must store in-progress state
1528+ // If there are more frames to follow, we must store in-progress state for reassembly.
1529+ if (!frame -> end ) {
1530+ (void )rx_session_cleanup (ses , ts ); // Cleanup before allocating a new slot; don't do too often, is costly.
14901531 ses -> slots [frame -> priority ] = rx_slot_new (sub , ts , frame -> transfer_id , iface_index );
14911532 if (ses -> slots [frame -> priority ] == NULL ) {
14921533 sub -> owner -> err .oom ++ ;
14931534 return false;
14941535 }
14951536 }
1537+ // Register the new state only after we have a confirmation that we have memory to store the frame.
14961538 rx_session_record_admission (ses , frame -> priority , seqno , ts , iface_index );
14971539 }
14981540
@@ -1562,8 +1604,8 @@ void canard_poll(canard_t* const self, const uint_least8_t tx_ready_iface_bitmap
15621604 // transfer-ID timeout among all subscriptions, but this is a reasonable tradeoff for the reduced complexity.
15631605 rx_session_t * const ses = LIST_HEAD (self -> rx .list_session_by_animation , rx_session_t , list_animation );
15641606 if (ses != NULL ) {
1565- const size_t in_progress_slots = rx_session_scan (ses , now );
1566- if ((in_progress_slots == 0 ) && (ses -> last_admitted_start_ts < (now - ses -> owner -> transfer_id_timeout ))) {
1607+ const size_t in_progress_slots = rx_session_cleanup (ses , now );
1608+ if ((in_progress_slots == 0 ) && (ses -> last_admission_ts < (now - ses -> owner -> transfer_id_timeout ))) {
15671609 rx_session_destroy (ses );
15681610 }
15691611 }
0 commit comments