@@ -2999,6 +2999,127 @@ static void test_rx_session_history(void)
29992999 instrumented_allocator_reset (& alloc_payload );
30003000}
30013001
3002+ // Send transfers 1, 3, 10000, 2 in the ORDERED mode; ensure 2 is rejected because it's late after 3.
3003+ static void test_rx_session_ordered_reject_stale_after_jump (void )
3004+ {
3005+ instrumented_allocator_t alloc_frag = { 0 };
3006+ instrumented_allocator_new (& alloc_frag );
3007+ const udpard_mem_resource_t mem_frag = instrumented_allocator_make_resource (& alloc_frag );
3008+ instrumented_allocator_t alloc_session = { 0 };
3009+ instrumented_allocator_new (& alloc_session );
3010+ const udpard_mem_resource_t mem_session = instrumented_allocator_make_resource (& alloc_session );
3011+ instrumented_allocator_t alloc_payload = { 0 };
3012+ instrumented_allocator_new (& alloc_payload );
3013+ const udpard_mem_resource_t mem_payload = instrumented_allocator_make_resource (& alloc_payload );
3014+ const udpard_mem_deleter_t del_payload = instrumented_allocator_make_deleter (& alloc_payload );
3015+ const udpard_rx_mem_resources_t rx_mem = { .fragment = mem_frag , .session = mem_session };
3016+ udpard_rx_t rx ;
3017+ TEST_ASSERT (udpard_rx_new (& rx , & on_message , & on_collision , & on_ack_mandate ));
3018+ callback_result_t cb_result = { 0 };
3019+ rx .user = & cb_result ;
3020+ udpard_rx_port_t port ;
3021+ const uint64_t topic_hash = 0x123456789ABCDEF0ULL ;
3022+ TEST_ASSERT (udpard_rx_port_new (& port , topic_hash , 1000 , 1000 , rx_mem ));
3023+ const uint64_t remote_uid = 0xDEADBEEFDEADBEEFULL ;
3024+ rx_session_factory_args_t fac_args = {
3025+ .owner = & port ,
3026+ .sessions_by_animation = & rx .list_session_by_animation ,
3027+ .remote_uid = remote_uid ,
3028+ .now = 0 ,
3029+ };
3030+ rx_session_t * const ses = (rx_session_t * )cavl2_find_or_insert (& port .index_session_by_remote_uid ,
3031+ & remote_uid ,
3032+ & cavl_compare_rx_session_by_remote_uid ,
3033+ & fac_args ,
3034+ & cavl_factory_rx_session_by_remote_uid );
3035+ TEST_ASSERT_NOT_NULL (ses );
3036+
3037+ // Send transfer #1.
3038+ udpard_us_t now = 0 ;
3039+ meta_t meta = { .priority = udpard_prio_nominal ,
3040+ .flag_ack = true,
3041+ .transfer_payload_size = 1 ,
3042+ .transfer_id = 1 ,
3043+ .sender_uid = remote_uid ,
3044+ .topic_hash = topic_hash };
3045+ now += 100 ;
3046+ rx_session_update (ses ,
3047+ & rx ,
3048+ now ,
3049+ (udpard_udpip_ep_t ){ .ip = 0x0A000001 , .port = 0x1111 },
3050+ make_frame (meta , mem_payload , "a" , 0 , 1 ),
3051+ del_payload ,
3052+ 0 );
3053+ TEST_ASSERT_EQUAL (1 , cb_result .message .count );
3054+ TEST_ASSERT_EQUAL (1 , cb_result .ack_mandate .count );
3055+
3056+ // Send transfer #3. Transfer #2 is missing, so this one is interned.
3057+ meta .transfer_id = 3 ;
3058+ now += 100 ;
3059+ rx_session_update (ses ,
3060+ & rx ,
3061+ now ,
3062+ (udpard_udpip_ep_t ){ .ip = 0x0A000001 , .port = 0x1111 },
3063+ make_frame (meta , mem_payload , "b" , 0 , 1 ),
3064+ del_payload ,
3065+ 0 );
3066+ TEST_ASSERT_EQUAL (1 , cb_result .message .count );
3067+ TEST_ASSERT_EQUAL (2 , cb_result .ack_mandate .count ); // all acked
3068+
3069+ // Send transfer #10000. The head is still at #1, so #10000 is interned as well.
3070+ meta .transfer_id = 10000 ;
3071+ meta .transfer_payload_size = 1 ;
3072+ meta .flag_ack = true;
3073+ now += 10 ;
3074+ rx_session_update (ses ,
3075+ & rx ,
3076+ now ,
3077+ (udpard_udpip_ep_t ){ .ip = 0x0A000001 , .port = 0x1111 },
3078+ make_frame (meta , mem_payload , "c" , 0 , 1 ),
3079+ del_payload ,
3080+ 0 );
3081+ TEST_ASSERT_EQUAL (1 , cb_result .message .count ); // 3 is still interned, 10000 interned too (but acked).
3082+ TEST_ASSERT_EQUAL (3 , cb_result .ack_mandate .count ); // all acked
3083+
3084+ // Some time has passed and the reordering window is now closed. All transfers ejected.
3085+ now += port .reordering_window + 100 ;
3086+ udpard_rx_poll (& rx , now );
3087+ TEST_ASSERT_EQUAL (3 , cb_result .message .count ); // 1, 3, 10000 have been ejected.
3088+ TEST_ASSERT_EQUAL (3 , cb_result .ack_mandate .count );
3089+
3090+ // Send transfer #2. It is stale and must be rejected.
3091+ meta .transfer_id = 2 ;
3092+ meta .flag_ack = true;
3093+ now += 10 ;
3094+ rx_session_update (ses ,
3095+ & rx ,
3096+ now ,
3097+ (udpard_udpip_ep_t ){ .ip = 0x0A000001 , .port = 0x1111 },
3098+ make_frame (meta , mem_payload , "d" , 0 , 1 ),
3099+ del_payload ,
3100+ 0 );
3101+ TEST_ASSERT_EQUAL (3 , cb_result .message .count ); // transfer 2 not ejected!
3102+ TEST_ASSERT_EQUAL (3 , cb_result .ack_mandate .count ); // transfer 2 must have been rejected!
3103+
3104+ // Make sure it's not ejected later.
3105+ now += port .reordering_window + 100 ;
3106+ udpard_rx_poll (& rx , now );
3107+ TEST_ASSERT_EQUAL (3 , cb_result .message .count );
3108+ TEST_ASSERT_EQUAL (3 , cb_result .ack_mandate .count );
3109+
3110+ // Clean up.
3111+ for (size_t i = 0 ; i < cb_result .message .count ; i ++ ) {
3112+ udpard_fragment_free_all (cb_result .message .history [i ].payload , mem_frag );
3113+ }
3114+ udpard_rx_port_free (& rx , & port );
3115+ TEST_ASSERT_EQUAL_size_t (0 , alloc_frag .allocated_fragments );
3116+ TEST_ASSERT_EQUAL_size_t (0 , alloc_payload .allocated_fragments );
3117+ TEST_ASSERT_EQUAL_size_t (0 , alloc_session .allocated_fragments );
3118+ instrumented_allocator_reset (& alloc_frag );
3119+ instrumented_allocator_reset (& alloc_session );
3120+ instrumented_allocator_reset (& alloc_payload );
3121+ }
3122+
30023123// --------------------------------------------- RX PORT ---------------------------------------------
30033124
30043125/// Exercises udpard_rx_port_push() across ORDERED and STATELESS ports, covering single- and multi-frame transfers.
@@ -3742,6 +3863,7 @@ int main(void)
37423863 RUN_TEST (test_rx_session_ordered );
37433864 RUN_TEST (test_rx_session_unordered );
37443865 RUN_TEST (test_rx_session_history );
3866+ RUN_TEST (test_rx_session_ordered_reject_stale_after_jump );
37453867
37463868 RUN_TEST (test_rx_port );
37473869 RUN_TEST (test_rx_port_timeouts );
0 commit comments