@@ -116,6 +116,8 @@ static int32_t cavl_compare_fragment_end(const void* const user, const udpard_tr
116116 return 0 ; // clang-format on
117117}
118118
119+ static bool validate_ep (const udpard_udpip_ep_t ep ) { return (ep .port != 0 ) && (ep .ip != 0 ) && (ep .ip != UINT32_MAX ); }
120+
119121udpard_udpip_ep_t udpard_make_subject_endpoint (const uint32_t subject_id )
120122{
121123 return (udpard_udpip_ep_t ){ .ip = IPv4_MCAST_PREFIX | (subject_id & IPv4_MCAST_SUFFIX_MASK ), .port = UDP_PORT };
@@ -1647,3 +1649,50 @@ void udpard_rx_port_free(udpard_rx_t* const rx, udpard_rx_port_t* const port)
16471649 }
16481650 }
16491651}
1652+
1653+ bool udpard_rx_port_push (udpard_rx_t * const rx ,
1654+ udpard_rx_port_t * const port ,
1655+ const udpard_us_t timestamp ,
1656+ const udpard_udpip_ep_t source_ep ,
1657+ const udpard_bytes_mut_t datagram_payload ,
1658+ const udpard_mem_deleter_t payload_deleter ,
1659+ const uint_fast8_t redundant_iface_index )
1660+ {
1661+ const bool ok = (rx != NULL ) && (port != NULL ) && (timestamp >= 0 ) && validate_ep (source_ep ) &&
1662+ (datagram_payload .data != NULL ) && (payload_deleter .free != NULL ) &&
1663+ (redundant_iface_index < UDPARD_NETWORK_INTERFACE_COUNT_MAX ) && (!port -> invoked );
1664+ if (ok ) {
1665+ port -> invoked = true;
1666+
1667+ // Parse and validate the frame.
1668+ udpard_bytes_mut_t payload = { 0 };
1669+ rx_frame_t frame = { 0 };
1670+ uint32_t frame_index = 0 ;
1671+ uint32_t offset_32 = 0 ;
1672+ const bool frame_valid =
1673+ header_deserialize (datagram_payload , & frame .meta , & frame_index , & offset_32 , & frame .base .crc , & payload );
1674+ frame .base .offset = (size_t )offset_32 ;
1675+ (void )frame_index ; // currently not used by this reassembler implementation.
1676+
1677+ // Process the frame.
1678+ if (frame_valid ) {
1679+ if (frame .meta .topic_hash == port -> topic_hash ) {
1680+ if (port -> reordering_window != UDPARD_REORDERING_WINDOW_STATELESS ) {
1681+ (void )NULL ; // TODO FIXME
1682+ } else {
1683+ (void )NULL ; // TODO FIXME
1684+ }
1685+ } else { // Collisions are discovered early so that we don't attempt to allocate sessions for them.
1686+ mem_free_payload (payload_deleter , datagram_payload );
1687+ udpard_remote_t remote = { .uid = frame .meta .sender_uid , .endpoints = { 0 } };
1688+ remote .endpoints [redundant_iface_index ] = source_ep ;
1689+ rx -> on_collision (rx , port , remote );
1690+ }
1691+ } else {
1692+ mem_free_payload (payload_deleter , datagram_payload );
1693+ ++ rx -> errors_frame_malformed ;
1694+ }
1695+ port -> invoked = false;
1696+ }
1697+ return ok ;
1698+ }
0 commit comments