@@ -3002,6 +3002,78 @@ static void test_rx_session_unordered_reject_old(void)
30023002 instrumented_allocator_reset (& alloc_payload );
30033003}
30043004
3005+ /// UNORDERED mode should drop duplicates while accepting earlier arrivals regardless of ordering.
3006+ static void test_rx_session_unordered_duplicates (void )
3007+ {
3008+ instrumented_allocator_t alloc_frag = { 0 };
3009+ instrumented_allocator_new (& alloc_frag );
3010+ const udpard_mem_resource_t mem_frag = instrumented_allocator_make_resource (& alloc_frag );
3011+ instrumented_allocator_t alloc_session = { 0 };
3012+ instrumented_allocator_new (& alloc_session );
3013+ const udpard_mem_resource_t mem_session = instrumented_allocator_make_resource (& alloc_session );
3014+ instrumented_allocator_t alloc_payload = { 0 };
3015+ instrumented_allocator_new (& alloc_payload );
3016+ const udpard_mem_resource_t mem_payload = instrumented_allocator_make_resource (& alloc_payload );
3017+ const udpard_mem_deleter_t del_payload = instrumented_allocator_make_deleter (& alloc_payload );
3018+ const udpard_rx_mem_resources_t rx_mem = { .fragment = mem_frag , .session = mem_session };
3019+ udpard_rx_t rx ;
3020+ callback_result_t cb_result = { 0 };
3021+ TEST_ASSERT (udpard_rx_new (& rx , & on_message , & on_collision , & on_ack_mandate ));
3022+ rx .user = & cb_result ;
3023+ udpard_rx_port_t port ;
3024+ const uint64_t topic_hash = 0x1111222233334444ULL ;
3025+ TEST_ASSERT (udpard_rx_port_new (& port , topic_hash , SIZE_MAX , UDPARD_RX_REORDERING_WINDOW_UNORDERED , rx_mem ));
3026+ const uint64_t remote_uid = 0xAABBCCDDEEFF0011ULL ;
3027+ rx_session_factory_args_t fac_args = {
3028+ .owner = & port ,
3029+ .sessions_by_animation = & rx .list_session_by_animation ,
3030+ .remote_uid = remote_uid ,
3031+ .now = 0 ,
3032+ };
3033+ rx_session_t * const ses = (rx_session_t * )cavl2_find_or_insert (& port .index_session_by_remote_uid ,
3034+ & remote_uid ,
3035+ & cavl_compare_rx_session_by_remote_uid ,
3036+ & fac_args ,
3037+ & cavl_factory_rx_session_by_remote_uid );
3038+ TEST_ASSERT_NOT_NULL (ses );
3039+ // Feed a mix of fresh transfers followed by duplicates; only the first four should be accepted.
3040+ meta_t meta = { .priority = udpard_prio_fast ,
3041+ .flag_ack = false,
3042+ .transfer_payload_size = 4 ,
3043+ .transfer_id = 1100 ,
3044+ .sender_uid = remote_uid ,
3045+ .topic_hash = topic_hash };
3046+ udpard_us_t now = 0 ;
3047+ const uint64_t tids [] = { 1100 , 1000 , 4000 , 4100 , 1000 , 1100 };
3048+ for (size_t i = 0 ; i < sizeof (tids ) / sizeof (tids [0 ]); i ++ ) {
3049+ meta .transfer_id = tids [i ];
3050+ char payload [4 ] = { (char )('A' + (int )(i % 26 )), (char )('a' + (int )(i % 26 )), 'X' , '\0' };
3051+ now += 100 ;
3052+ rx_session_update (ses ,
3053+ & rx ,
3054+ now ,
3055+ (udpard_udpip_ep_t ){ .ip = 0x0A000001 , .port = 0x1234 },
3056+ make_frame (meta , mem_payload , payload , 0 , 4 ),
3057+ del_payload ,
3058+ 0 );
3059+ }
3060+ TEST_ASSERT_EQUAL (4 , cb_result .message .count );
3061+ TEST_ASSERT_EQUAL (1100 , cb_result .message .history [3 ].transfer_id );
3062+ TEST_ASSERT_EQUAL (1000 , cb_result .message .history [2 ].transfer_id );
3063+ TEST_ASSERT_EQUAL (4000 , cb_result .message .history [1 ].transfer_id );
3064+ TEST_ASSERT_EQUAL (4100 , cb_result .message .history [0 ].transfer_id );
3065+ for (size_t i = 0 ; i < cb_result .message .count ; i ++ ) {
3066+ udpard_fragment_free_all (cb_result .message .history [i ].payload , mem_frag );
3067+ }
3068+ TEST_ASSERT_EQUAL_size_t (0 , alloc_frag .allocated_fragments );
3069+ TEST_ASSERT_EQUAL_size_t (0 , alloc_payload .allocated_fragments );
3070+ udpard_rx_port_free (& rx , & port );
3071+ TEST_ASSERT_EQUAL_size_t (0 , alloc_session .allocated_fragments );
3072+ instrumented_allocator_reset (& alloc_frag );
3073+ instrumented_allocator_reset (& alloc_session );
3074+ instrumented_allocator_reset (& alloc_payload );
3075+ }
3076+
30053077/// Send transfers 1, 3, 10000, 2 in the ORDERED mode; ensure 2 is rejected because it's late after 3.
30063078static void test_rx_session_ordered_reject_stale_after_jump (void )
30073079{
@@ -3123,6 +3195,77 @@ static void test_rx_session_ordered_reject_stale_after_jump(void)
31233195 instrumented_allocator_reset (& alloc_payload );
31243196}
31253197
3198+ /// ORDERED mode with zero reordering delay should accept only strictly increasing IDs.
3199+ static void test_rx_session_ordered_zero_reordering_window (void )
3200+ {
3201+ instrumented_allocator_t alloc_frag = { 0 };
3202+ instrumented_allocator_new (& alloc_frag );
3203+ const udpard_mem_resource_t mem_frag = instrumented_allocator_make_resource (& alloc_frag );
3204+ instrumented_allocator_t alloc_session = { 0 };
3205+ instrumented_allocator_new (& alloc_session );
3206+ const udpard_mem_resource_t mem_session = instrumented_allocator_make_resource (& alloc_session );
3207+ instrumented_allocator_t alloc_payload = { 0 };
3208+ instrumented_allocator_new (& alloc_payload );
3209+ const udpard_mem_resource_t mem_payload = instrumented_allocator_make_resource (& alloc_payload );
3210+ const udpard_mem_deleter_t del_payload = instrumented_allocator_make_deleter (& alloc_payload );
3211+ const udpard_rx_mem_resources_t rx_mem = { .fragment = mem_frag , .session = mem_session };
3212+ udpard_rx_t rx ;
3213+ callback_result_t cb_result = { 0 };
3214+ TEST_ASSERT (udpard_rx_new (& rx , & on_message , & on_collision , & on_ack_mandate ));
3215+ rx .user = & cb_result ;
3216+ udpard_rx_port_t port ;
3217+ const uint64_t topic_hash = 0x9999888877776666ULL ;
3218+ TEST_ASSERT (udpard_rx_port_new (& port , topic_hash , SIZE_MAX , 0 , rx_mem ));
3219+ const uint64_t remote_uid = 0x0A0B0C0D0E0F1011ULL ;
3220+ rx_session_factory_args_t fac_args = {
3221+ .owner = & port ,
3222+ .sessions_by_animation = & rx .list_session_by_animation ,
3223+ .remote_uid = remote_uid ,
3224+ .now = 0 ,
3225+ };
3226+ rx_session_t * const ses = (rx_session_t * )cavl2_find_or_insert (& port .index_session_by_remote_uid ,
3227+ & remote_uid ,
3228+ & cavl_compare_rx_session_by_remote_uid ,
3229+ & fac_args ,
3230+ & cavl_factory_rx_session_by_remote_uid );
3231+ TEST_ASSERT_NOT_NULL (ses );
3232+ // Zero reordering window: out-of-order IDs are rejected, so only 120, 140, 1120 are accepted.
3233+ meta_t meta = { .priority = udpard_prio_nominal ,
3234+ .flag_ack = false,
3235+ .transfer_payload_size = 3 ,
3236+ .transfer_id = 120 ,
3237+ .sender_uid = remote_uid ,
3238+ .topic_hash = topic_hash };
3239+ udpard_us_t now = 0 ;
3240+ const uint64_t tids [] = { 120 , 110 , 140 , 1120 , 130 };
3241+ for (size_t i = 0 ; i < sizeof (tids ) / sizeof (tids [0 ]); i ++ ) {
3242+ meta .transfer_id = tids [i ];
3243+ char payload [3 ] = { (char )('k' + (int )i ), (char )('K' + (int )i ), '\0' };
3244+ now += 50 ;
3245+ rx_session_update (ses ,
3246+ & rx ,
3247+ now ,
3248+ (udpard_udpip_ep_t ){ .ip = 0x0A000002 , .port = 0x2222 },
3249+ make_frame (meta , mem_payload , payload , 0 , 3 ),
3250+ del_payload ,
3251+ 0 );
3252+ }
3253+ TEST_ASSERT_EQUAL (3 , cb_result .message .count );
3254+ TEST_ASSERT_EQUAL (1120 , cb_result .message .history [0 ].transfer_id );
3255+ TEST_ASSERT_EQUAL (140 , cb_result .message .history [1 ].transfer_id );
3256+ TEST_ASSERT_EQUAL (120 , cb_result .message .history [2 ].transfer_id );
3257+ for (size_t i = 0 ; i < cb_result .message .count ; i ++ ) {
3258+ udpard_fragment_free_all (cb_result .message .history [i ].payload , mem_frag );
3259+ }
3260+ TEST_ASSERT_EQUAL_size_t (0 , alloc_frag .allocated_fragments );
3261+ TEST_ASSERT_EQUAL_size_t (0 , alloc_payload .allocated_fragments );
3262+ udpard_rx_port_free (& rx , & port );
3263+ TEST_ASSERT_EQUAL_size_t (0 , alloc_session .allocated_fragments );
3264+ instrumented_allocator_reset (& alloc_frag );
3265+ instrumented_allocator_reset (& alloc_session );
3266+ instrumented_allocator_reset (& alloc_payload );
3267+ }
3268+
31263269// --------------------------------------------- RX PORT ---------------------------------------------
31273270
31283271/// Exercises udpard_rx_port_push() across ORDERED and STATELESS ports, covering single- and multi-frame transfers.
@@ -3867,6 +4010,8 @@ int main(void)
38674010 RUN_TEST (test_rx_session_unordered );
38684011 RUN_TEST (test_rx_session_unordered_reject_old );
38694012 RUN_TEST (test_rx_session_ordered_reject_stale_after_jump );
4013+ RUN_TEST (test_rx_session_unordered_duplicates );
4014+ RUN_TEST (test_rx_session_ordered_zero_reordering_window );
38704015
38714016 RUN_TEST (test_rx_port );
38724017 RUN_TEST (test_rx_port_timeouts );
0 commit comments