@@ -1198,7 +1198,7 @@ static void rx_session_on_message(const rx_session_t* const self, udpard_rx_t* c
11981198 rx -> on_message (rx , self -> owner , transfer );
11991199}
12001200
1201- static int32_t cavl_compare_rx_session_remote_uid (const void * const user , const udpard_tree_t * const node )
1201+ static int32_t cavl_compare_rx_session_by_remote_uid (const void * const user , const udpard_tree_t * const node )
12021202{
12031203 const uint64_t uid_a = * (const uint64_t * )user ;
12041204 const uint64_t uid_b = ((const rx_session_t * )(const void * )node )-> remote .uid ; // clang-format off
@@ -1207,20 +1207,25 @@ static int32_t cavl_compare_rx_session_remote_uid(const void* const user, const
12071207 return 0 ; // clang-format on
12081208}
12091209
1210- static int32_t cavl_compare_rx_session_reordering_deadline (const void * const user , const udpard_tree_t * const node )
1210+ static int32_t cavl_compare_rx_session_by_reordering_deadline (const void * const user , const udpard_tree_t * const node )
12111211{
12121212 const udpard_us_t dl_a = * (const udpard_us_t * )user ;
12131213 const udpard_us_t dl_b = CAVL2_TO_OWNER (node , rx_session_t , index_reordering_window )-> reordering_window_deadline ;
12141214 return (dl_a >= dl_b ) ? +1 : -1 ;
12151215}
12161216
1217- /// Fully initializes a new session instance, making it ready to accept frames out of the box. NULL if OOM.
1218- static rx_session_t * rx_session_new (udpard_rx_port_t * const owner ,
1219- udpard_list_t * const sessions_by_animation ,
1220- const uint64_t remote_uid ,
1221- const udpard_us_t now )
1217+ typedef struct
1218+ {
1219+ udpard_rx_port_t * owner ;
1220+ udpard_list_t * sessions_by_animation ;
1221+ uint64_t remote_uid ;
1222+ udpard_us_t now ;
1223+ } rx_session_factory_args_t ;
1224+
1225+ static udpard_tree_t * cavl_factory_rx_session_by_remote_uid (void * const user )
12221226{
1223- rx_session_t * out = mem_alloc (owner -> memory .session , sizeof (rx_session_t ));
1227+ const rx_session_factory_args_t * const args = (const rx_session_factory_args_t * )user ;
1228+ rx_session_t * const out = mem_alloc (args -> owner -> memory .session , sizeof (rx_session_t ));
12241229 if (out != NULL ) {
12251230 mem_zero (sizeof (* out ), out );
12261231 out -> index_remote_uid = (udpard_tree_t ){ NULL , { NULL , NULL }, 0 };
@@ -1229,22 +1234,15 @@ static rx_session_t* rx_session_new(udpard_rx_port_t* const owner,
12291234 out -> list_by_animation = (udpard_list_member_t ){ NULL , NULL };
12301235 for (size_t i = 0 ; i < RX_SLOT_COUNT ; i ++ ) {
12311236 out -> slots [i ].fragments = NULL ;
1232- rx_slot_reset (& out -> slots [i ], owner -> memory .fragment );
1237+ rx_slot_reset (& out -> slots [i ], args -> owner -> memory .fragment );
12331238 }
1234- out -> remote .uid = remote_uid ;
1235- out -> owner = owner ;
1236- out -> last_animated_ts = now ;
1237- const udpard_tree_t * res = cavl2_find_or_insert (& owner -> index_session_by_remote_uid ,
1238- & out -> remote .uid ,
1239- & cavl_compare_rx_session_remote_uid ,
1240- & out -> index_remote_uid ,
1241- & cavl2_trivial_factory );
1242- UDPARD_ASSERT (res == & out -> index_remote_uid );
1243- (void )res ;
1244- out -> initialized = false;
1245- enlist_head (sessions_by_animation , & out -> list_by_animation );
1239+ out -> remote .uid = args -> remote_uid ;
1240+ out -> owner = args -> owner ;
1241+ out -> last_animated_ts = args -> now ;
1242+ out -> initialized = false;
1243+ enlist_head (args -> sessions_by_animation , & out -> list_by_animation );
12461244 }
1247- return out ;
1245+ return ( udpard_tree_t * ) out ;
12481246}
12491247
12501248/// Removes the instance from all indexes and frees all associated memory.
@@ -1309,7 +1307,7 @@ static void rx_session_ordered_scan_slots(rx_session_t* const self,
13091307 self -> reordering_window_deadline = slot -> ts_min + self -> owner -> reordering_window ;
13101308 const udpard_tree_t * res = cavl2_find_or_insert (& rx -> index_session_by_reordering ,
13111309 & self -> reordering_window_deadline ,
1312- & cavl_compare_rx_session_reordering_deadline ,
1310+ & cavl_compare_rx_session_by_reordering_deadline ,
13131311 & self -> index_reordering_window ,
13141312 & cavl2_trivial_factory );
13151313 UDPARD_ASSERT (res == & self -> index_reordering_window );
@@ -1678,13 +1676,31 @@ bool udpard_rx_port_push(udpard_rx_t* const rx,
16781676 if (frame_valid ) {
16791677 if (frame .meta .topic_hash == port -> topic_hash ) {
16801678 if (port -> reordering_window != UDPARD_REORDERING_WINDOW_STATELESS ) {
1681- (void )NULL ; // TODO FIXME
1679+ // The normal reassembly mode, either ORDERED or UNORDERED. Requires state per remote sender.
1680+ rx_session_factory_args_t fac_args = {
1681+ .owner = port ,
1682+ .sessions_by_animation = & rx -> list_session_by_animation ,
1683+ .remote_uid = frame .meta .sender_uid ,
1684+ .now = timestamp ,
1685+ };
1686+ rx_session_t * const ses = // Will find an existing one or create a new one.
1687+ (rx_session_t * )cavl2_find_or_insert (& port -> index_session_by_remote_uid ,
1688+ & frame .meta .sender_uid ,
1689+ & cavl_compare_rx_session_by_remote_uid ,
1690+ & fac_args ,
1691+ & cavl_factory_rx_session_by_remote_uid );
1692+ if (ses != NULL ) {
1693+ rx_session_update (ses , rx , timestamp , source_ep , frame , payload_deleter , redundant_iface_index );
1694+ } else {
1695+ mem_free_payload (payload_deleter , datagram_payload );
1696+ ++ rx -> errors_oom ;
1697+ }
16821698 } else {
16831699 (void )NULL ; // TODO FIXME
16841700 }
16851701 } else { // Collisions are discovered early so that we don't attempt to allocate sessions for them.
16861702 mem_free_payload (payload_deleter , datagram_payload );
1687- udpard_remote_t remote = { .uid = frame .meta .sender_uid , . endpoints = { 0 } };
1703+ udpard_remote_t remote = { .uid = frame .meta .sender_uid };
16881704 remote .endpoints [redundant_iface_index ] = source_ep ;
16891705 rx -> on_collision (rx , port , remote );
16901706 }
0 commit comments