@@ -588,8 +588,8 @@ typedef struct tx_transfer_t
 
     /// Mutable transmission state. All other fields, except for the index handles, are immutable.
     tx_frame_t* cursor[UDPARD_IFACE_COUNT_MAX];
-    uint_fast8_t epoch;        ///< Does not overflow due to exponential backoff; e.g. 1us with epoch=48 => 9 years.
-    udpard_us_t  staged_until; ///< If staged_until>=deadline, this is the last attempt; frames can be freed on the go.
+    uint_fast8_t epoch; ///< Does not overflow due to exponential backoff; e.g. 1us with epoch=48 => 9 years.
+    udpard_us_t  staged_until;
 
     /// Constant transfer properties supplied by the client.
     uint64_t topic_hash;
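A quick check of the overflow claim in the epoch comment: with a 1 us baseline, the backoff interval at epoch 48 alone is 2^48 us ≈ 2.81×10^14 us ≈ 2.81×10^8 s ≈ 8.9 years, so the cumulative wait exceeds any realistic deadline long before a uint_fast8_t counter could wrap.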
@@ -789,9 +789,35 @@ static tx_frame_t* tx_spool(udpard_tx_t* const tx,
 }
 
 /// Derives the ack timeout for an outgoing transfer.
-static udpard_us_t tx_ack_timeout(const udpard_us_t baseline, const udpard_prio_t prio, const uint_fast8_t attempts)
+static udpard_us_t tx_ack_timeout(const udpard_us_t baseline, const udpard_prio_t prio, const size_t attempts)
 {
-    return baseline * (1L << smaller((size_t)prio + attempts, 62)); // NOLINT(*-signed-bitwise)
+    UDPARD_ASSERT(baseline > 0);
+    UDPARD_ASSERT(prio <= UDPARD_PRIORITY_MAX);
+    return baseline * (1LL << smaller((size_t)prio + attempts, 62)); // NOLINT(*-signed-bitwise)
+}
+
+/// Updates the next attempt time and inserts the transfer into the staged index, unless the next scheduled
+/// transmission time is too close to the deadline, in which case no further attempts will be made.
+/// When invoking this for the first time, staged_until must be set to the time of the first attempt (usually now).
+/// One can deduce whether further attempts are planned by checking if the transfer is in the staged index.
+///
+/// The idea is that retransmitting the transfer too close to the deadline is pointless, because
+/// the ack may arrive just after the deadline and the transfer would be considered failed anyway.
+/// The solution is to add a small margin before the deadline. The margin is derived using a simple heuristic,
+/// which is subject to review and improvement later on (this is not an API-visible trait).
+static void tx_stage_if(udpard_tx_t* const tx, tx_transfer_t* const tr)
+{
+    UDPARD_ASSERT(!cavl2_is_inserted(tx->index_staged, &tr->index_staged));
+    const uint_fast8_t epoch   = tr->epoch++;
+    const udpard_us_t  timeout = tx_ack_timeout(tx->ack_baseline_timeout, tr->priority, epoch);
+    tr->staged_until += timeout;
+    if ((tr->deadline - timeout) >= tr->staged_until) {
+        (void)cavl2_find_or_insert(&tx->index_staged, //
+                                   &tr->staged_until,
+                                   tx_cavl_compare_staged,
+                                   &tr->index_staged,
+                                   cavl2_trivial_factory);
+    }
 }
 
 /// A transfer can use the same fragments between two interfaces if
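To make the scheduling arithmetic above concrete, here is a standalone sketch that mirrors tx_ack_timeout() and the tx_stage_if() deadline cut-off outside the library. The us_t typedef, the local smaller() helper, and the chosen baseline, priority, and deadline values are all illustrative stand-ins, not part of libudpard.

    #include <stdint.h>
    #include <stdio.h>

    typedef int64_t us_t; // illustrative stand-in for udpard_us_t

    static size_t smaller(const size_t a, const size_t b) { return (a < b) ? a : b; }

    // Mirrors tx_ack_timeout(): exponential backoff scaled by priority plus attempt count.
    static us_t ack_timeout(const us_t baseline, const size_t prio, const size_t attempts)
    {
        return baseline * (us_t)(1LL << smaller(prio + attempts, 62));
    }

    int main(void)
    {
        const us_t   baseline     = 1000;   // hypothetical 1 ms ack baseline timeout
        const size_t prio         = 4;      // hypothetical priority level
        const us_t   deadline     = 500000; // hypothetical 500 ms transfer deadline
        us_t         staged_until = 0;      // time of the first attempt ("now"), per the tx_stage_if() docs
        for (size_t epoch = 0;; epoch++) {
            const us_t timeout = ack_timeout(baseline, prio, epoch);
            staged_until += timeout;
            if ((deadline - timeout) < staged_until) { // the tx_stage_if() margin heuristic, inverted
                printf("epoch %zu: next attempt at %lld us is too close to deadline %lld us; last attempt\n",
                       epoch, (long long)staged_until, (long long)deadline);
                break;
            }
            printf("epoch %zu: retry scheduled at %lld us (timeout %lld us)\n",
                   epoch, (long long)staged_until, (long long)timeout);
        }
        return 0;
    }

With these values the sketch schedules retries at 16, 48, 112, and 240 ms and then stops, because the epoch-4 wait of 256 ms would push the next attempt past the point where an ack could still return before the 500 ms deadline. Note that the numeric priority value feeds the exponent directly, so the timeout doubles per priority level as well as per attempt.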
@@ -865,15 +891,14 @@ static uint32_t tx_push(udpard_tx_t* const tx,
     }
     mem_zero(sizeof(*tr), tr);
     tr->epoch                   = 0;
+    tr->staged_until            = now;
     tr->topic_hash              = meta.topic_hash;
     tr->transfer_id             = meta.transfer_id;
     tr->deadline                = deadline;
     tr->reliable                = meta.flag_ack;
     tr->priority                = meta.priority;
     tr->user_transfer_reference = user_transfer_reference;
     tr->feedback                = feedback;
-    tr->staged_until =
-        meta.flag_ack ? (now + tx_ack_timeout(tx->ack_baseline_timeout, tr->priority, tr->epoch)) : HEAT_DEATH;
     for (size_t i = 0; i < UDPARD_IFACE_COUNT_MAX; i++) {
         tr->destination[i] = endpoint[i];
         tr->head[i]        = tr->cursor[i] = NULL;
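Note that staged_until is now initialized to the time of the first attempt for every transfer, as tx_stage_if() requires; the first backoff interval is applied inside that helper. Unreliable transfers simply never enter the staged index (see the next hunk), which removes the need for the former HEAT_DEATH sentinel.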
@@ -927,10 +952,9 @@ static uint32_t tx_push(udpard_tx_t* const tx,
             enlist_head(&tx->queue[i][tr->priority], &tr->queue[i]);
         }
     }
-    // If retransmissions are possible, add to the staged index so that it is re-enqueued later unless acknowledged.
-    if (tr->deadline > tr->staged_until) {
-        (void)cavl2_find_or_insert(
-            &tx->index_staged, &tr->staged_until, tx_cavl_compare_staged, &tr->index_staged, cavl2_trivial_factory);
+    // Add to the staged index so that it is repeatedly re-enqueued later until acknowledged or expired.
+    if (meta.flag_ack) {
+        tx_stage_if(tx, tr);
     }
     // Add to the deadline index for expiration management.
     (void)cavl2_find_or_insert(
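Only transfers with flag_ack set are staged for retransmission; best-effort transfers are transmitted once and are cleaned up through the deadline index alone, which every transfer enters regardless of reliability.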
@@ -1172,17 +1196,9 @@ static void tx_promote_staged_transfers(udpard_tx_t* const self, const udpard_us
     tx_transfer_t* const tr = CAVL2_TO_OWNER(cavl2_min(self->index_staged), tx_transfer_t, index_staged);
     if ((tr != NULL) && (now >= tr->staged_until)) {
         UDPARD_ASSERT(tr->cursor != NULL); // cannot stage without payload, doesn't make sense
-        // Reinsert into the staged index at the new position, when the next attempt is due.
-        // Do not insert if this is the last attempt -- no point doing that since it will not be transmitted again.
+        // Reinsert into the staged index at the new position, when the next attempt is due (if any).
         cavl2_remove(&self->index_staged, &tr->index_staged);
-        tr->staged_until += tx_ack_timeout(self->ack_baseline_timeout, tr->priority, ++(tr->epoch));
-        if (tr->deadline > tr->staged_until) {
-            (void)cavl2_find_or_insert(&self->index_staged,
-                                       &tr->staged_until,
-                                       tx_cavl_compare_staged,
-                                       &tr->index_staged,
-                                       cavl2_trivial_factory);
-        }
+        tx_stage_if(self, tr);
         // Enqueue for transmission unless it's been there since the last attempt (stalled interface?)
         for (size_t i = 0; i < UDPARD_IFACE_COUNT_MAX; i++) {
             UDPARD_ASSERT(tr->cursor[i] == tr->head[i]);
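As the tx_stage_if() comment states, whether another attempt is planned can be deduced from staged-index membership: after the call above, cavl2_is_inserted(self->index_staged, &tr->index_staged) distinguishes a scheduled retry from the final transmission, which is when (per the removed field comment) the frames can be freed on the go.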