@@ -530,21 +530,19 @@ typedef struct
 } tx_transfer_key_t;
 
 /// The transmission scheduler maintains several indexes for the transfers in the pipeline.
-/// All index operations are logarithmic in the number of scheduled transfers except for the pending queue,
-/// where the complexity is constant.
 ///
 /// The segregated priority queue only contains transfers that are ready for transmission.
-/// The staged index contains only transfers that will be ready for transmission later, ordered by readiness time.
-/// Transfers that will no longer be transmitted but are retained waiting for the ack are in neither of these.
-///
+/// The staged index contains transfers ordered by readiness time;
+/// transfers that will no longer be transmitted but are retained waiting for the ack are in neither of these.
 /// The deadline index contains ALL transfers, ordered by their deadlines, used for purging expired transfers.
 /// The transfer index contains ALL transfers, used for lookup by (topic_hash, transfer_id).
 typedef struct tx_transfer_t
 {
-    udpard_tree_t        index_staged;    ///< Soonest to be ready on the left. Key: retry_at
+    udpard_tree_t        index_staged;    ///< Soonest to be ready on the left. Key: staged_until
     udpard_tree_t        index_deadline;  ///< Soonest to expire on the left. Key: deadline
     udpard_tree_t        index_transfer;  ///< Specific transfer lookup for ack management. Key: tx_transfer_key_t
     udpard_list_member_t queue[UDPARD_IFACE_COUNT_MAX];  ///< Listed when ready for transmission.
+    udpard_list_member_t agewise;  ///< Listed when created; oldest at the tail.
 
     /// We always keep a pointer to the head, plus a cursor that scans the frames during transmission.
     /// Both are NULL if the payload is destroyed.
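(The index memberships implied by the doc comment above, summarized for reference; this restates the comment rather than adding new rules:)

    // lifecycle state                    queue[]  staged  deadline  transfer
    // ready for transmission              yes     maybe     yes       yes
    // waiting for the retry timer         no      yes       yes       yes
    // last attempt done, awaiting ack     no      no        yes       yes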
@@ -554,8 +552,8 @@ typedef struct tx_transfer_t
 
     /// Mutable transmission state. All other fields, except for the index handles, are immutable.
     tx_frame_t*  cursor[UDPARD_IFACE_COUNT_MAX];
-    uint_fast8_t attempts[UDPARD_IFACE_COUNT_MAX];  ///< Does not overflow due to exponential backoff.
-    udpard_us_t  retry_at;  ///< If retry_at >= deadline, this is the last attempt; frames can be freed as they go out.
+    uint_fast8_t epoch;         ///< Does not overflow due to exponential backoff.
+    udpard_us_t  staged_until;  ///< If staged_until >= deadline, this is the last attempt; frames can be freed as they leave.
 
     /// Constant transfer properties supplied by the client.
     uint64_t topic_hash;
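`tx_ack_timeout()` itself is outside this diff, so only its shape can be inferred here. A minimal sketch consistent with the overflow remark on `epoch`, assuming a plain doubling per epoch (the real function may also scale by priority):

    // Hypothetical sketch only -- the actual tx_ack_timeout() is defined elsewhere in the file.
    static udpard_us_t tx_ack_timeout_sketch(const udpard_us_t  baseline,
                                             const uint_fast8_t priority,
                                             const uint_fast8_t epoch)
    {
        (void)priority;  // the real function may shorten the timeout for urgent priorities
        return baseline * (((udpard_us_t)1U) << epoch);  // baseline, 2x, 4x, 8x, ...
    }

Under any such monotone growth, staged_until overtakes the deadline after a bounded number of attempts, which is what keeps epoch from overflowing.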
@@ -586,7 +584,7 @@ static void tx_transfer_free_payload(tx_transfer_t* const tr)
         const tx_frame_t* frame = tr->head[i];
         while (frame != NULL) {
             const tx_frame_t* const next = frame->next;
-            udpard_tx_refcount_dec(tx_frame_view(frame));
+            udpard_tx_refcount_dec(tx_frame_view(frame));  // TODO FIXME frame counting!
             frame = next;
         }
         tr->head[i] = NULL;
@@ -601,6 +599,7 @@ static void tx_transfer_free(udpard_tx_t* const tx, tx_transfer_t* const tr)
     for (uint_fast8_t i = 0; i < UDPARD_IFACE_COUNT_MAX; i++) {
         delist(&tx->queue[i][tr->priority], &tr->queue[i]);
     }
+    delist(&tx->agewise, &tr->agewise);
     if (cavl2_is_inserted(tx->index_staged, &tr->index_staged)) {
         cavl2_remove(&tx->index_staged, &tr->index_staged);
     }
@@ -610,35 +609,30 @@ static void tx_transfer_free(udpard_tx_t* const tx, tx_transfer_t* const tr)
 }
 
 /// When the queue is exhausted, finds a transfer to sacrifice using simple heuristics and returns it.
-/// The heuristics are subject to review and improvement.
 /// Will return NULL if there are no transfers worth sacrificing (no queue space can be reclaimed).
-static tx_transfer_t* tx_sacrifice(udpard_tx_t* const tx)
-{
-    uint16_t       max_attempts = 0;
-    tx_transfer_t* out          = NULL;
-    // Scanning from the earliest deadline, meaning we prefer to sacrifice transfers that are the soonest to expire.
-    for (tx_transfer_t* tr = CAVL2_TO_OWNER(cavl2_min(tx->index_deadline), tx_transfer_t, index_deadline); tr != NULL;
-         tr = CAVL2_TO_OWNER(cavl2_next_greater(&tr->index_deadline), tx_transfer_t, index_deadline)) {
-        uint16_t attempts = 0;
-        for (uint_fast8_t i = 0; i < UDPARD_IFACE_COUNT_MAX; i++) {
-            attempts += tr->attempts[i];
-        }
-        if ((attempts > 0) && !tr->reliable) {  // Prefer non-reliable transfers that have been transmitted once.
-            out = tr;
+/// We cannot simply stop accepting new transfers when the queue is full, because it may be caused by a single
+/// stalled interface holding back progress for all transfers.
+/// The heuristics are subject to review and improvement.
+static tx_transfer_t* tx_sacrifice(udpard_tx_t* const tx) { return LIST_TAIL(tx->agewise, tx_transfer_t, agewise); }
+
+static bool tx_ensure_queue_space(udpard_tx_t* const tx, const size_t total_frames_needed)
+{
+    if (total_frames_needed > tx->enqueued_frames_limit) {
+        return false;  // not gonna happen
+    }
+    while (total_frames_needed > (tx->enqueued_frames_limit - tx->enqueued_frames_count)) {
+        tx_transfer_t* const victim = tx_sacrifice(tx);
+        if (victim == NULL) {
             break;
         }
-        if (attempts > max_attempts) {
-            tr           = out;
-            max_attempts = attempts;
-        }
-        return out;
+        tx_transfer_free(tx, victim);
     }
-    return out;
+    return total_frames_needed <= (tx->enqueued_frames_limit - tx->enqueued_frames_count);
 }
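Per the TODO left in tx_push() below, this helper is not wired into the enqueue path yet; the intended call site would presumably look like the following (hypothetical sketch, not part of the commit):

    // Hypothetical use inside tx_push(), replacing the hard capacity check:
    if (!tx_ensure_queue_space(tx, n_frames)) {
        tx->errors_capacity++;  // nothing left to sacrifice; reject the new transfer
    } else {
        // ... allocate, spool, and index the transfer as before ...
    }

Evicting from the tail of agewise means the oldest transfer goes first -- a deliberately blunt policy, since (per the doc comment) a single stalled interface can pin arbitrarily old transfers that would otherwise block all new traffic.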
 
 static int32_t tx_cavl_compare_staged(const void* const user, const udpard_tree_t* const node)
 {
-    return ((*(const udpard_us_t*)user) >= CAVL2_TO_OWNER(node, tx_transfer_t, index_staged)->retry_at) ? +1 : -1;
+    return ((*(const udpard_us_t*)user) >= CAVL2_TO_OWNER(node, tx_transfer_t, index_staged)->staged_until) ? +1 : -1;
 }
 static int32_t tx_cavl_compare_deadline(const void* const user, const udpard_tree_t* const node)
 {
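Note that tx_cavl_compare_staged() never returns 0: an equal key compares as greater, so a transfer whose staged_until ties an existing node is inserted to its right. Equal readiness times therefore preserve insertion (FIFO) order, and cavl2_find_or_insert() always inserts a fresh node rather than finding an existing one:

    // Behavior sketch: with nodes staged at t=100 and t=200, inserting key t=100
    // compares +1 against the t=100 node (descend right) and -1 against t=200
    // (descend left), so the new node lands after its equal-keyed predecessor.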
@@ -664,14 +658,10 @@ static tx_transfer_t* tx_transfer_find(udpard_tx_t* const tx, const uint64_t top
 
 static udpard_tx_feedback_t tx_make_feedback(const tx_transfer_t* const tr, const bool success)
 {
-    udpard_tx_feedback_t fb = { .topic_hash              = tr->topic_hash,
-                                .transfer_id             = tr->transfer_id,
-                                .user_transfer_reference = tr->user_transfer_reference,
-                                .success                 = success };
-    for (uint_fast8_t i = 0; i < UDPARD_IFACE_COUNT_MAX; i++) {
-        fb.remote_ep[i] = tr->destination[i];
-        fb.attempts[i]  = tr->attempts[i];
-    }
+    const udpard_tx_feedback_t fb = { .topic_hash              = tr->topic_hash,
+                                      .transfer_id             = tr->transfer_id,
+                                      .user_transfer_reference = tr->user_transfer_reference,
+                                      .success                 = success };
     return fb;
 }
 
@@ -747,17 +737,14 @@ static uint32_t tx_push(udpard_tx_t* const tx,
     const size_t payload_size = payload.size;
     const size_t mtu          = larger(tx->mtu, UDPARD_MTU_MIN);
     const size_t n_frames     = larger(1, (payload_size + mtu - 1U) / mtu);
-    // TODO: tx_sacrifice()
+    // TODO: tx_sacrifice() and choose duplication --- find matching allocations in the existing queues to reuse.
     if ((tx->enqueued_frames_count + n_frames) > tx->enqueued_frames_limit) {
         tx->errors_capacity++;
     } else {
         tx_transfer_t* const tr = mem_alloc(tx->memory.transfer, sizeof(tx_transfer_t));
         if (tr != NULL) {
             mem_zero(sizeof(*tr), tr);
-            tr->attempts = 0;
-            tr->retry_at = meta.flag_ack  //
-                               ? (now + tx_ack_timeout(tx->ack_baseline_timeout, meta.priority, 0))
-                               : HEAT_DEATH;
+            tr->epoch       = 0;
             tr->topic_hash  = meta.topic_hash;
             tr->transfer_id = meta.transfer_id;
             tr->deadline    = deadline;
@@ -766,10 +753,22 @@ static uint32_t tx_push(udpard_tx_t* const tx,
             tr->destination             = endpoint;
             tr->user_transfer_reference = user_transfer_reference;
             tr->feedback                = feedback;
+            tr->staged_until =
+                meta.flag_ack ? (now + tx_ack_timeout(tx->ack_baseline_timeout, tr->priority, tr->epoch)) : HEAT_DEATH;
             tr->head = tr->cursor = tx_spool(tx->memory, mtu, meta, payload);
             if (tr->head != NULL) {
-                // Schedule the transfer for transmission.
-                enlist_head(&tx->queue[tr->priority], &tr->queue);
+                for (uint_fast8_t i = 0; i < UDPARD_IFACE_COUNT_MAX; i++) {
+                    if (udpard_is_valid_endpoint(tr->destination[i])) {
+                        enlist_head(&tx->queue[i][tr->priority], &tr->queue[i]);
+                    }
+                }
+                if (tr->deadline > tr->staged_until) {  // only if retransmissions are possible
+                    (void)cavl2_find_or_insert(&tx->index_staged,
+                                               &tr->staged_until,
+                                               tx_cavl_compare_staged,
+                                               &tr->index_staged,
+                                               cavl2_trivial_factory);
+                }
                 const tx_transfer_key_t key = { .topic_hash = tr->topic_hash, .transfer_id = tr->transfer_id };
                 (void)cavl2_find_or_insert(&tx->index_transfer,  //
                                            &key,
@@ -781,6 +780,7 @@ static uint32_t tx_push(udpard_tx_t* const tx,
                                            tx_cavl_compare_deadline,
                                            &tr->index_deadline,
                                            cavl2_trivial_factory);
+                enlist_head(&tx->agewise, &tr->agewise);
                 tx->enqueued_frames_count += n_frames;
                 UDPARD_ASSERT(tx->enqueued_frames_count <= tx->enqueued_frames_limit);
                 out = (uint32_t)n_frames;
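To make the initial scheduling arithmetic concrete (values invented for illustration, and assuming tx_ack_timeout(b, p, 0) == b):

    // flag_ack, now=0, baseline=100ms, deadline=1s   -> staged_until=100ms: staged, retries possible
    // flag_ack, now=0, baseline=100ms, deadline=50ms -> staged_until=100ms: NOT staged, single attempt
    // !flag_ack (best effort)                        -> staged_until=HEAT_DEATH: never staged

Best-effort transfers thus never enter the staged index; like everything else they are still tracked by the deadline, transfer, and agewise indexes until fully ejected or purged.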
@@ -942,14 +942,30 @@ static void tx_promote_staged(udpard_tx_t* const self, const udpard_us_t now)
 {
     while (true) {  // we can use next_greater instead of doing min search every time
         tx_transfer_t* const tr = CAVL2_TO_OWNER(cavl2_min(self->index_staged), tx_transfer_t, index_staged);
-        if ((tr != NULL) && (now >= tr->retry_at)) {
+        if ((tr != NULL) && (now >= tr->staged_until)) {
             UDPARD_ASSERT(tr->cursor != NULL);  // cannot stage without payload, doesn't make sense
-            // Update the state for the next retransmission.
-            tr->retry_at += tx_ack_timeout(self->ack_baseline_timeout, tr->priority, tr->attempts);
-            UDPARD_ASSERT(tr->cursor == tr->head);
-            // Remove from the staged index and add to the transmission queue.
-            enlist_head(&self->queue[tr->priority], &tr->queue);
+
+            // Reinsert into the staged index at the new position, when the next attempt is due.
+            // Do not insert if this is the last attempt -- no point doing that since it will not be transmitted again.
             cavl2_remove(&self->index_staged, &tr->index_staged);
+            tr->epoch++;
+            tr->staged_until += tx_ack_timeout(self->ack_baseline_timeout, tr->priority, tr->epoch);
+            if (tr->deadline > tr->staged_until) {
+                (void)cavl2_find_or_insert(&self->index_staged,
+                                           &tr->staged_until,
+                                           tx_cavl_compare_staged,
+                                           &tr->index_staged,
+                                           cavl2_trivial_factory);
+            }
+
+            // Enqueue for transmission unless it's been there since the last attempt (stalled interface?)
+            for (uint_fast8_t i = 0; i < UDPARD_IFACE_COUNT_MAX; i++) {
+                UDPARD_ASSERT(tr->cursor[i] == tr->head[i]);
+                if (udpard_is_valid_endpoint(tr->destination[i]) &&
+                    !is_listed(&self->queue[i][tr->priority], &tr->queue[i])) {
+                    enlist_head(&self->queue[i][tr->priority], &tr->queue[i]);
+                }
+            }
         } else {
             break;
         }
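The accumulated staged_until traces a growing series. Under the same doubling assumption as in the sketch above (the actual growth rate is whatever tx_ack_timeout() implements):

    // epoch 0: staged_until = t0 + b        (set in tx_push)
    // epoch 1: staged_until = t0 + b + 2b = t0 + 3b
    // epoch 2: staged_until = t0 + 3b + 4b = t0 + 7b
    // ...capped by the deadline check above, which is also what bounds epoch.

Note the is_listed() guard: if a previous attempt is still sitting in a stalled interface's queue, the transfer is not enlisted twice -- the stale entry simply serves as the new attempt.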
@@ -974,13 +990,11 @@ static void tx_eject_pending(udpard_tx_t* const self, const udpard_us_t now, con
         UDPARD_ASSERT(!cavl2_is_inserted(self->index_staged, &tr->index_staged));
         UDPARD_ASSERT(tr->cursor != NULL);  // cannot be pending without payload, doesn't make sense
 
-        // Compute the auxiliary states that will guide the ejection.
-        tx_frame_t* const frame        = tr->cursor[ifindex];
-        tx_frame_t* const frame_next   = frame->next;
-        const bool        last_attempt = tr->deadline <= tr->retry_at;
-        const bool        last_frame   = frame_next == NULL;  // if not last attempt we will have to rewind to head
-
         // Eject the frame.
+        const tx_frame_t* const frame        = tr->cursor[ifindex];
+        tx_frame_t* const       frame_next   = frame->next;
+        const bool              last_attempt = tr->deadline <= tr->staged_until;
+        const bool              last_frame   = frame_next == NULL;  // if not last attempt we will have to rewind to head.
         const udpard_tx_ejection_t ejection = {
             .now      = now,
             .deadline = tr->deadline,
@@ -1005,17 +1019,11 @@ static void tx_eject_pending(udpard_tx_t* const self, const udpard_us_t now, con
         // Finalize the transmission if this was the last frame of the transfer.
         if (last_frame) {
             tr->cursor[ifindex] = tr->head[ifindex];
-            tr->attempts[ifindex]++;
             delist(&self->queue[ifindex][tr->priority], &tr->queue[ifindex]);  // no longer pending for transmission
-            if (last_attempt) {
-                if (!tr->reliable) {  // Best-effort transfers are removed immediately.
-                    tx_transfer_free(self, tr);  // We can invoke the feedback callback here if needed.
-                }
-                // If this is the last attempt of a reliable transfer, it will wait for ack or expiration.
-            } else {  // Reinsert into the staged index for later retransmission if not acknowledged.
-                cavl2_find_or_insert(
-                    &self->index_staged, &tr->retry_at, tx_cavl_compare_staged, &tr->index_staged, cavl2_trivial_factory);
+            UDPARD_ASSERT(!last_attempt || (tr->head == NULL));  // the payload is no longer needed; check before any free
+            if (last_attempt && !tr->reliable) {  // Best-effort transfers are removed immediately, no ack to wait for.
+                tx_transfer_free(self, tr);  // We can invoke the feedback callback here if needed.
             }
         }
     }
 }