@@ -798,6 +798,26 @@ static size_t rx_fragment_tree_update_covered_prefix(udpard_tree_t* const root,
798798 return out ;
799799}
800800
801+ /// If NULL, the payload ownership could not be transferred due to OOM. The caller still owns the payload.
802+ static udpard_fragment_t * rx_fragment_new (const udpard_mem_resource_t memory ,
803+ const udpard_mem_deleter_t payload_deleter ,
804+ const rx_frame_base_t frame )
805+ {
806+ udpard_fragment_t * const mew = mem_alloc (memory , sizeof (udpard_fragment_t ));
807+ if (mew != NULL ) {
808+ mem_zero (sizeof (* mew ), mew );
809+ mew -> index_offset = (udpard_tree_t ){ NULL , { NULL , NULL }, 0 };
810+ mew -> next = NULL ;
811+ mew -> view .data = frame .payload .data ;
812+ mew -> view .size = frame .payload .size ;
813+ mew -> origin .data = frame .origin .data ;
814+ mew -> origin .size = frame .origin .size ;
815+ mew -> offset = frame .offset ;
816+ mew -> payload_deleter = payload_deleter ;
817+ }
818+ return mew ;
819+ }
820+
801821typedef enum
802822{
803823 rx_fragment_tree_rejected , ///< The newly received fragment was not needed for the tree and was freed.
@@ -884,19 +904,11 @@ static rx_fragment_tree_update_result_t rx_fragment_tree_update(udpard_tree_t**
884904 }
885905
886906 // Ensure we can allocate the fragment header for the new frame before pruning the tree to avoid data loss.
887- udpard_fragment_t * const mew = mem_alloc (fragment_memory , sizeof ( udpard_fragment_t ) );
907+ udpard_fragment_t * const mew = rx_fragment_new (fragment_memory , payload_deleter , frame );
888908 if (mew == NULL ) {
889909 mem_free_payload (payload_deleter , frame .origin );
890910 return rx_fragment_tree_oom ; // Cannot allocate fragment header. Maybe we will succeed later.
891911 }
892- mem_zero (sizeof (* mew ), mew );
893- mew -> next = NULL ;
894- mew -> view .data = frame .payload .data ;
895- mew -> view .size = frame .payload .size ;
896- mew -> origin .data = frame .origin .data ;
897- mew -> origin .size = frame .origin .size ;
898- mew -> offset = frame .offset ;
899- mew -> payload_deleter = payload_deleter ;
900912
901913 // The addition of a new fragment that joins adjacent fragments together into a larger contiguous block may
902914 // render smaller fragments crossing its boundaries redundant.
@@ -1648,6 +1660,76 @@ void udpard_rx_port_free(udpard_rx_t* const rx, udpard_rx_port_t* const port)
16481660 }
16491661}
16501662
1663+ /// Takes ownership of the frame payload.
1664+ static void rx_port_accept_stateful (udpard_rx_t * const rx ,
1665+ udpard_rx_port_t * const port ,
1666+ const udpard_us_t timestamp ,
1667+ const udpard_udpip_ep_t source_ep ,
1668+ const rx_frame_t frame ,
1669+ const udpard_mem_deleter_t payload_deleter ,
1670+ const uint_fast8_t redundant_iface_index )
1671+ {
1672+ rx_session_factory_args_t fac_args = {
1673+ .owner = port ,
1674+ .sessions_by_animation = & rx -> list_session_by_animation ,
1675+ .remote_uid = frame .meta .sender_uid ,
1676+ .now = timestamp ,
1677+ };
1678+ rx_session_t * const ses = // Will find an existing one or create a new one.
1679+ (rx_session_t * )cavl2_find_or_insert (& port -> index_session_by_remote_uid ,
1680+ & frame .meta .sender_uid ,
1681+ & cavl_compare_rx_session_by_remote_uid ,
1682+ & fac_args ,
1683+ & cavl_factory_rx_session_by_remote_uid );
1684+ if (ses != NULL ) {
1685+ rx_session_update (ses , rx , timestamp , source_ep , frame , payload_deleter , redundant_iface_index );
1686+ } else {
1687+ mem_free_payload (payload_deleter , frame .base .origin );
1688+ ++ rx -> errors_oom ;
1689+ }
1690+ }
1691+
1692+ /// Takes ownership of the frame payload.
1693+ static void rx_port_accept_stateless (udpard_rx_t * const rx ,
1694+ udpard_rx_port_t * const port ,
1695+ const udpard_us_t timestamp ,
1696+ const udpard_udpip_ep_t source_ep ,
1697+ const rx_frame_t frame ,
1698+ const udpard_mem_deleter_t payload_deleter ,
1699+ const uint_fast8_t redundant_iface_index )
1700+ {
1701+ const size_t required_size = smaller (port -> extent , frame .meta .transfer_payload_size );
1702+ const bool full_transfer = (frame .base .offset == 0 ) && (frame .base .payload .size >= required_size );
1703+ if (full_transfer ) {
1704+ // The fragment allocation is only needed to uphold the callback protocol.
1705+ // Maybe we could do something about it in the future to avoid this allocation.
1706+ udpard_fragment_t * const frag = rx_fragment_new (port -> memory .fragment , payload_deleter , frame .base );
1707+ if (frag != NULL ) {
1708+ udpard_remote_t remote = { .uid = frame .meta .sender_uid };
1709+ remote .endpoints [redundant_iface_index ] = source_ep ;
1710+ // The CRC is validated by the frame parser for the first frame of any transfer. It is certainly correct.
1711+ UDPARD_ASSERT (frame .base .crc == crc_full (frame .base .payload .size , frame .base .payload .data ));
1712+ const udpard_rx_transfer_t transfer = {
1713+ .timestamp = timestamp ,
1714+ .priority = frame .meta .priority ,
1715+ .transfer_id = frame .meta .transfer_id ,
1716+ .remote = remote ,
1717+ .payload_size_stored = required_size ,
1718+ .payload_size_wire = frame .meta .transfer_payload_size ,
1719+ .payload_head = frag ,
1720+ .payload_root = frag ,
1721+ };
1722+ rx -> on_message (rx , port , transfer );
1723+ } else {
1724+ mem_free_payload (payload_deleter , frame .base .origin );
1725+ ++ rx -> errors_oom ;
1726+ }
1727+ } else {
1728+ mem_free_payload (payload_deleter , frame .base .origin );
1729+ ++ rx -> errors_transfer_malformed ; // The stateless mode expects only single-frame transfers.
1730+ }
1731+ }
1732+
16511733bool udpard_rx_port_push (udpard_rx_t * const rx ,
16521734 udpard_rx_port_t * const port ,
16531735 const udpard_us_t timestamp ,
@@ -1661,7 +1743,6 @@ bool udpard_rx_port_push(udpard_rx_t* const rx,
16611743 (redundant_iface_index < UDPARD_NETWORK_INTERFACE_COUNT_MAX ) && (!port -> invoked );
16621744 if (ok ) {
16631745 port -> invoked = true;
1664-
16651746 // Parse and validate the frame.
16661747 udpard_bytes_mut_t payload = { 0 };
16671748 rx_frame_t frame = { 0 };
@@ -1671,41 +1752,22 @@ bool udpard_rx_port_push(udpard_rx_t* const rx,
16711752 header_deserialize (datagram_payload , & frame .meta , & frame_index , & offset_32 , & frame .base .crc , & payload );
16721753 frame .base .offset = (size_t )offset_32 ;
16731754 (void )frame_index ; // currently not used by this reassembler implementation.
1674-
1755+ UDPARD_ASSERT ((frame .base .origin .data == datagram_payload .data ) &&
1756+ (frame .base .origin .size == datagram_payload .size ));
16751757 // Process the frame.
16761758 if (frame_valid ) {
16771759 if (frame .meta .topic_hash == port -> topic_hash ) {
1678- if (port -> reordering_window != UDPARD_REORDERING_WINDOW_STATELESS ) {
1679- // The normal reassembly mode, either ORDERED or UNORDERED. Requires state per remote sender.
1680- rx_session_factory_args_t fac_args = {
1681- .owner = port ,
1682- .sessions_by_animation = & rx -> list_session_by_animation ,
1683- .remote_uid = frame .meta .sender_uid ,
1684- .now = timestamp ,
1685- };
1686- rx_session_t * const ses = // Will find an existing one or create a new one.
1687- (rx_session_t * )cavl2_find_or_insert (& port -> index_session_by_remote_uid ,
1688- & frame .meta .sender_uid ,
1689- & cavl_compare_rx_session_by_remote_uid ,
1690- & fac_args ,
1691- & cavl_factory_rx_session_by_remote_uid );
1692- if (ses != NULL ) {
1693- rx_session_update (ses , rx , timestamp , source_ep , frame , payload_deleter , redundant_iface_index );
1694- } else {
1695- mem_free_payload (payload_deleter , datagram_payload );
1696- ++ rx -> errors_oom ;
1697- }
1698- } else {
1699- (void )NULL ; // TODO FIXME
1700- }
1760+ const bool stateful = (port -> reordering_window != UDPARD_REORDERING_WINDOW_STATELESS );
1761+ (stateful ? rx_port_accept_stateful : rx_port_accept_stateless )(
1762+ rx , port , timestamp , source_ep , frame , payload_deleter , redundant_iface_index );
17011763 } else { // Collisions are discovered early so that we don't attempt to allocate sessions for them.
1702- mem_free_payload (payload_deleter , datagram_payload );
1764+ mem_free_payload (payload_deleter , frame . base . origin );
17031765 udpard_remote_t remote = { .uid = frame .meta .sender_uid };
17041766 remote .endpoints [redundant_iface_index ] = source_ep ;
17051767 rx -> on_collision (rx , port , remote );
17061768 }
17071769 } else {
1708- mem_free_payload (payload_deleter , datagram_payload );
1770+ mem_free_payload (payload_deleter , frame . base . origin );
17091771 ++ rx -> errors_frame_malformed ;
17101772 }
17111773 port -> invoked = false;
0 commit comments