@@ -684,6 +684,17 @@ static tx_transfer_t* tx_transfer_find(udpard_tx_t* const tx, const uint64_t top
684684 cavl2_find (tx -> index_transfer , & key , & tx_cavl_compare_transfer ), tx_transfer_t , index_transfer );
685685}
686686
687+ /// True iff listed in at least one interface queue.
688+ static bool tx_is_pending (const udpard_tx_t * const tx , const tx_transfer_t * const tr )
689+ {
690+ for (uint_fast8_t i = 0 ; i < UDPARD_IFACE_COUNT_MAX ; i ++ ) {
691+ if (is_listed (& tx -> queue [i ][tr -> priority ], & tr -> queue [i ])) {
692+ return true;
693+ }
694+ }
695+ return false;
696+ }
697+
687698static udpard_tx_feedback_t tx_make_feedback (const tx_transfer_t * const tr , const bool success )
688699{
689700 const udpard_tx_feedback_t fb = { .topic_hash = tr -> topic_hash ,
@@ -710,7 +721,7 @@ static tx_frame_t* tx_spool(udpard_tx_t* const tx,
710721 do {
711722 // Compute the size of the next frame, allocate it and link it up in the chain.
712723 const size_t progress = smaller (payload .size - offset , mtu );
713- tx_frame_t * const item = tx_frame_new (tx , memory , progress );
724+ tx_frame_t * const item = tx_frame_new (tx , memory , progress + HEADER_SIZE_BYTES );
714725 if (NULL == head ) {
715726 head = item ;
716727 } else {
@@ -861,6 +872,7 @@ static uint32_t tx_push(udpard_tx_t* const tx,
861872 }
862873 UDPARD_ASSERT ((tx -> enqueued_frames_count - enqueued_frames_before ) == n_frames );
863874 UDPARD_ASSERT (tx -> enqueued_frames_count <= tx -> enqueued_frames_limit );
875+ (void )enqueued_frames_before ;
864876
865877 // Enqueue for transmission immediately.
866878 for (uint_fast8_t i = 0 ; i < UDPARD_IFACE_COUNT_MAX ; i ++ ) {
@@ -887,7 +899,8 @@ static uint32_t tx_push(udpard_tx_t* const tx,
887899 if (out_transfer != NULL ) {
888900 * out_transfer = tr ;
889901 }
890- return n_frames ;
902+ UDPARD_ASSERT (n_frames <= UINT32_MAX );
903+ return (uint32_t )n_frames ;
891904}
892905
893906/// Handle an ACK received from a remote node.
@@ -926,7 +939,7 @@ static void tx_send_ack(udpard_rx_t* const rx,
926939 const uint32_t new_ep_mask = valid_ep_mask (remote .endpoints );
927940 const bool new_better = (new_ep_mask & (~prior_ep_mask )) != 0U ;
928941 if (!new_better ) {
929- return ; // Can we get a new ack? We have ack at home!
942+ return ; // Can we get an ack? We have ack at home!
930943 }
931944 if (prior != NULL ) {
932945 tx_transfer_free (tx , prior ); // avoid redundant acks for the same transfer
@@ -1108,8 +1121,10 @@ static void tx_eject_pending(udpard_tx_t* const self, const udpard_us_t now, con
11081121 while (true) {
11091122 // Find the highest-priority pending transfer.
11101123 tx_transfer_t * tr = NULL ;
1111- for (size_t prio = 0 ; prio < UDPARD_PRIORITY_COUNT ; prio ++ ) { // dear compiler, please unroll
1112- tx_transfer_t * const candidate = LIST_TAIL (self -> queue [ifindex ][prio ], tx_transfer_t , queue );
1124+ for (size_t prio = 0 ; prio < UDPARD_PRIORITY_COUNT ; prio ++ ) {
1125+ tx_transfer_t * const candidate = // This pointer arithmetic is ugly and perhaps should be improved
1126+ unbias_ptr (self -> queue [ifindex ][prio ].tail ,
1127+ offsetof(tx_transfer_t , queue ) + (sizeof (udpard_list_member_t ) * ifindex ));
11131128 if (candidate != NULL ) {
11141129 tr = candidate ;
11151130 break ;
@@ -1118,7 +1133,8 @@ static void tx_eject_pending(udpard_tx_t* const self, const udpard_us_t now, con
11181133 if (tr == NULL ) {
11191134 break ; // No pending transfers at the moment. Find something else to do.
11201135 }
1121- UDPARD_ASSERT (tr -> cursor != NULL ); // cannot be pending without payload, doesn't make sense
1136+ UDPARD_ASSERT (tr -> cursor [ifindex ] != NULL ); // cannot be pending without payload, doesn't make sense
1137+ UDPARD_ASSERT (tr -> priority < UDPARD_PRIORITY_COUNT );
11221138
11231139 // Eject the frame.
11241140 const tx_frame_t * const frame = tr -> cursor [ifindex ];
@@ -1128,6 +1144,7 @@ static void tx_eject_pending(udpard_tx_t* const self, const udpard_us_t now, con
11281144 const udpard_tx_ejection_t ejection = {
11291145 .now = now ,
11301146 .deadline = tr -> deadline ,
1147+ .iface_index = ifindex ,
11311148 .dscp = self -> dscp_value_per_priority [tr -> priority ],
11321149 .destination = tr -> destination [ifindex ],
11331150 .datagram = tx_frame_view (frame ),
@@ -1139,7 +1156,7 @@ static void tx_eject_pending(udpard_tx_t* const self, const udpard_us_t now, con
11391156
11401157 // Frame ejected successfully. Update the transfer state to get ready for the next frame.
11411158 if (last_attempt ) { // no need to keep frames that we will no longer use; free early to reduce pressure
1142- UDPARD_ASSERT (tr -> head == tr -> cursor ); // They go together on the last attempt.
1159+ UDPARD_ASSERT (tr -> head [ ifindex ] == tr -> cursor [ ifindex ]);
11431160 tr -> head [ifindex ] = frame_next ;
11441161 udpard_tx_refcount_dec (ejection .datagram );
11451162 }
@@ -1149,10 +1166,10 @@ static void tx_eject_pending(udpard_tx_t* const self, const udpard_us_t now, con
11491166 if (last_frame ) {
11501167 tr -> cursor [ifindex ] = tr -> head [ifindex ];
11511168 delist (& self -> queue [ifindex ][tr -> priority ], & tr -> queue [ifindex ]); // no longer pending for transmission
1152- if (last_attempt && !tr -> reliable ) { // Best-effort transfers are removed immediately, no ack to wait for.
1153- tx_transfer_free (self , tr ); // We can invoke the feedback callback here if needed.
1169+ UDPARD_ASSERT (!last_attempt || (tr -> head [ifindex ] == NULL )); // this iface is done with the payload
1170+ if (last_attempt && !tr -> reliable && !tx_is_pending (self , tr )) { // remove early once all ifaces are done
1171+ tx_transfer_free (self , tr );
11541172 }
1155- UDPARD_ASSERT (!last_attempt || (tr -> head == NULL )); // the payload is no longer needed
11561173 }
11571174 }
11581175}
0 commit comments