@@ -1124,6 +1124,42 @@ typedef struct
11241124 rx_slot_t slots [RX_SLOT_COUNT ];
11251125} rx_session_t ;
11261126
1127+ static void rx_session_on_ack_mandate (const rx_session_t * const self ,
1128+ udpard_rx_t * const rx ,
1129+ const udpard_prio_t priority ,
1130+ const uint64_t transfer_id ,
1131+ const udpard_bytes_t payload_head )
1132+ {
1133+ udpard_rx_subscription_t * const subscription =
1134+ (self -> owner == & rx -> p2p_port ) ? NULL : (udpard_rx_subscription_t * )self -> owner ;
1135+ const udpard_rx_ack_mandate_t mandate = {
1136+ .remote = self -> remote , .priority = priority , .transfer_id = transfer_id , .payload_head = payload_head
1137+ };
1138+ UDPARD_ASSERT (payload_head .data != NULL || payload_head .size == 0U );
1139+ UDPARD_ASSERT (rx -> on_ack_mandate != NULL );
1140+ rx -> on_ack_mandate (rx , subscription , mandate );
1141+ }
1142+
1143+ /// The payload ownership is transferred to the application.
1144+ static void rx_session_on_message (const rx_session_t * const self , udpard_rx_t * const rx , rx_slot_t * const slot )
1145+ {
1146+ udpard_rx_subscription_t * const subscription =
1147+ (self -> owner == & rx -> p2p_port ) ? NULL : (udpard_rx_subscription_t * )self -> owner ;
1148+ const udpard_rx_transfer_t transfer = {
1149+ .timestamp = slot -> ts_min ,
1150+ .priority = slot -> priority ,
1151+ .transfer_id = slot -> transfer_id ,
1152+ .remote = self -> remote ,
1153+ .payload_size_stored = slot -> covered_prefix ,
1154+ .payload_size_wire = slot -> total_size ,
1155+ .payload_head = (udpard_fragment_t * )cavl2_min (slot -> fragments ),
1156+ .payload_root = (udpard_fragment_t * )slot -> fragments ,
1157+ };
1158+ slot -> fragments = NULL ; // Transfer ownership to the application.
1159+ UDPARD_ASSERT (rx -> on_message != NULL );
1160+ rx -> on_message (rx , subscription , transfer );
1161+ }
1162+
11271163static int32_t cavl_compare_rx_session_remote_uid (const void * const user , const udpard_tree_t * const node )
11281164{
11291165 const uint64_t uid_a = * (const uint64_t * )user ;
@@ -1244,21 +1280,6 @@ static void rx_session_ordered_scan_slots(rx_session_t* const self,
12441280 break ; // No more slots can be ejected at this time.
12451281 }
12461282
1247- // Invoke the reception callback, transferring the payload ownership to the application.
1248- UDPARD_ASSERT ((slot != NULL ) && (slot -> state == rx_slot_done ));
1249- udpard_rx_subscription_t * const subscription = (self -> owner == & rx -> p2p_port ) // P2P special case
1250- ? NULL
1251- : (udpard_rx_subscription_t * )self -> owner ;
1252- const udpard_rx_transfer_t transfer = { .timestamp = slot -> ts_min ,
1253- .priority = slot -> priority ,
1254- .transfer_id = slot -> transfer_id ,
1255- .remote = self -> remote ,
1256- .payload_size_stored = slot -> covered_prefix ,
1257- .payload_size_wire = slot -> total_size ,
1258- .payload_head = (udpard_fragment_t * )cavl2_min (slot -> fragments ),
1259- .payload_root = (udpard_fragment_t * )slot -> fragments };
1260- rx -> on_message (rx , subscription , transfer );
1261-
12621283 // Slide the transfer-ID window to prevent duplicates and out-of-order transfers.
12631284 // Mark the current transfer as received.
12641285 // We always pick the next transfer to eject with the nearest transfer-ID, which guarantees that the other
@@ -1267,8 +1288,10 @@ static void rx_session_ordered_scan_slots(rx_session_t* const self,
12671288 rx_transfer_id_window_slide (& self -> received , slot -> transfer_id );
12681289 rx_transfer_id_window_set (& self -> received , slot -> transfer_id );
12691290
1270- // Reset the slot, but don't free the payload because it's been moved to the application.
1271- slot -> fragments = NULL ;
1291+ // Invoke the reception callback, transferring the payload ownership to the application, then reset the slot.
1292+ UDPARD_ASSERT ((slot != NULL ) && (slot -> state == rx_slot_done ));
1293+ rx_session_on_message (self , rx , slot );
1294+ UDPARD_ASSERT (slot -> fragments == NULL ); // Payload ownership transferred to the application.
12721295 rx_slot_reset (slot , self -> owner -> memory .fragment );
12731296 }
12741297
@@ -1321,9 +1344,6 @@ static void rx_session_update(rx_session_t* const self,
13211344{
13221345 UDPARD_ASSERT (self -> remote .uid == frame .meta .sender_uid );
13231346 UDPARD_ASSERT (frame .meta .topic_hash == self -> owner -> topic_hash ); // must be checked by the caller beforehand
1324- udpard_rx_subscription_t * const subscription = (self -> owner == & rx -> p2p_port ) // P2P is a single special case port.
1325- ? NULL
1326- : (udpard_rx_subscription_t * )self -> owner ;
13271347
13281348 // Animate the session to prevent it from being retired.
13291349 enlist_head (& rx -> list_session_by_animation , & self -> list_by_animation );
@@ -1348,11 +1368,7 @@ static void rx_session_update(rx_session_t* const self,
13481368 // - We don't want to flood the network with duplicate ACKs for every fragment of a transfer.
13491369 // - The application may need to look at the head of the transfer to handle acks, which is in the first frame.
13501370 if ((status == rx_session_transfer_acknowledged ) && frame .meta .flag_ack && (frame .base .offset == 0U )) {
1351- const udpard_rx_ack_mandate_t mandate = { .remote = self -> remote ,
1352- .priority = frame .meta .priority ,
1353- .transfer_id = frame .meta .transfer_id ,
1354- .payload_head = frame .base .payload };
1355- rx -> on_ack_mandate (rx , subscription , mandate );
1371+ rx_session_on_ack_mandate (self , rx , frame .meta .priority , frame .meta .transfer_id , frame .base .payload );
13561372 }
13571373 // If the transfer is lost, we will never acknowledge it because we haven't received it,
13581374 // but some other subscriber might!
@@ -1421,16 +1437,11 @@ static void rx_session_update(rx_session_t* const self,
14211437 if (slot -> state == rx_slot_done ) {
14221438 UDPARD_ASSERT (rx_session_transfer_acknowledged == rx_session_check_transfer_status (self , slot -> transfer_id ));
14231439 if (frame .meta .flag_ack ) {
1424- const udpard_rx_ack_mandate_t mandate = {
1425- .remote = self -> remote ,
1426- .priority = slot -> priority ,
1427- .transfer_id = frame .meta .transfer_id ,
1428- .payload_head = ((udpard_fragment_t * )cavl2_min (slot -> fragments ))-> view ,
1429- };
1430- rx -> on_ack_mandate (rx , subscription , mandate );
1440+ rx_session_on_ack_mandate (
1441+ self , rx , slot -> priority , slot -> transfer_id , ((udpard_fragment_t * )cavl2_min (slot -> fragments ))-> view );
14311442 }
1432- // The final ejection procedure is a little complicated because we need to manage the reordering window
1433- // and possible obsolescence of other in-progress slots.
1443+ // In the ORDERED mode, the final ejection procedure is somewhat convoluted because we need to manage the
1444+ // reordering window and possible obsolescence of other in-progress slots.
14341445 rx_session_ordered_scan_slots (self , rx , ts , false);
14351446 }
14361447}
0 commit comments