@@ -2650,6 +2650,214 @@ static void test_session_ordered(void)
26502650 instrumented_allocator_reset (& alloc_payload );
26512651}
26522652
2653+ static void test_session_unordered (void )
2654+ {
2655+ // Initialize the memory resources.
2656+ instrumented_allocator_t alloc_frag = { 0 };
2657+ instrumented_allocator_new (& alloc_frag );
2658+ const udpard_mem_resource_t mem_frag = instrumented_allocator_make_resource (& alloc_frag );
2659+
2660+ instrumented_allocator_t alloc_session = { 0 };
2661+ instrumented_allocator_new (& alloc_session );
2662+ const udpard_mem_resource_t mem_session = instrumented_allocator_make_resource (& alloc_session );
2663+
2664+ instrumented_allocator_t alloc_payload = { 0 };
2665+ instrumented_allocator_new (& alloc_payload );
2666+ const udpard_mem_resource_t mem_payload = instrumented_allocator_make_resource (& alloc_payload );
2667+ const udpard_mem_deleter_t del_payload = instrumented_allocator_make_deleter (& alloc_payload );
2668+
2669+ const udpard_rx_memory_resources_t rx_mem = { .fragment = mem_frag , .session = mem_session };
2670+
2671+ // Initialize the shared RX instance.
2672+ const uint64_t local_uid = 0xC3C8E4974254E1F5ULL ;
2673+ udpard_rx_t rx ;
2674+ TEST_ASSERT (udpard_rx_new (& rx , local_uid , rx_mem , & on_message , & on_collision , & on_ack_mandate ));
2675+ callback_result_t cb_result = { 0 };
2676+ rx .user = & cb_result ;
2677+ TEST_ASSERT_EQUAL (local_uid , rx .p2p_port .topic_hash );
2678+ TEST_ASSERT_EQUAL (UDPARD_REORDERING_WINDOW_UNORDERED , rx .p2p_port .reordering_window );
2679+
2680+ // Construct the session instance using the p2p port.
2681+ udpard_us_t now = 0 ;
2682+ const uint64_t remote_uid = 0xA1B2C3D4E5F60718ULL ;
2683+ rx .p2p_port .invoked = true; // simulate being invoked
2684+ rx_session_t * const ses = rx_session_new (& rx .p2p_port , & rx .list_session_by_animation , remote_uid , now );
2685+
2686+ // Verify construction outcome.
2687+ TEST_ASSERT_NOT_NULL (ses );
2688+ TEST_ASSERT_EQUAL_PTR (rx .list_session_by_animation .head , & ses -> list_by_animation );
2689+ TEST_ASSERT_EQUAL_PTR (rx .p2p_port .index_session_by_remote_uid , & ses -> index_remote_uid );
2690+ TEST_ASSERT_EQUAL (1 , alloc_session .allocated_fragments );
2691+
2692+ // Feed a valid single-frame transfer and ensure immediate ejection (no reordering delay).
2693+ meta_t meta = { .priority = udpard_prio_high ,
2694+ .flag_ack = true,
2695+ .transfer_payload_size = 5 ,
2696+ .transfer_id = 100 ,
2697+ .sender_uid = remote_uid ,
2698+ .topic_hash = local_uid }; // P2P uses UID as the topic hash
2699+ now += 1000 ;
2700+ rx_session_update (ses ,
2701+ & rx ,
2702+ now ,
2703+ (udpard_udpip_ep_t ){ .ip = 0x0A000001 , .port = 0x1234 },
2704+ make_frame (meta , mem_payload , "hello" , 0 , 5 ),
2705+ del_payload ,
2706+ 0 );
2707+
2708+ // Transfer is ejected immediately in UNORDERED mode.
2709+ TEST_ASSERT_EQUAL (1 , cb_result .message .count );
2710+ TEST_ASSERT_EQUAL_PTR (& rx , cb_result .rx );
2711+ TEST_ASSERT_NULL (cb_result .sub ); // p2p transfers have NULL subscription
2712+ TEST_ASSERT_EQUAL (1000 , cb_result .message .history [0 ].timestamp );
2713+ TEST_ASSERT_EQUAL (udpard_prio_high , cb_result .message .history [0 ].priority );
2714+ TEST_ASSERT_EQUAL (100 , cb_result .message .history [0 ].transfer_id );
2715+ TEST_ASSERT (transfer_payload_verify (& cb_result .message .history [0 ], 5 , "hello" , 5 ));
2716+
2717+ // ACK mandate should be generated.
2718+ TEST_ASSERT_EQUAL (1 , cb_result .ack_mandate .count );
2719+ TEST_ASSERT_EQUAL (100 , cb_result .ack_mandate .am .transfer_id );
2720+ TEST_ASSERT_EQUAL_size_t (5 , cb_result .ack_mandate .am .payload_head .size );
2721+ TEST_ASSERT_EQUAL_MEMORY ("hello" , cb_result .ack_mandate .am .payload_head .data , 5 );
2722+
2723+ // Free the transfer payload.
2724+ udpard_fragment_free_all (cb_result .message .history [0 ].payload_head , mem_frag );
2725+ TEST_ASSERT_EQUAL (0 , alloc_frag .allocated_fragments );
2726+ TEST_ASSERT_EQUAL (0 , alloc_payload .allocated_fragments );
2727+
2728+ // Feed out-of-order transfers: 103, then 102. Both should be ejected immediately in UNORDERED mode.
2729+ meta .transfer_id = 103 ;
2730+ meta .transfer_payload_size = 6 ;
2731+ meta .priority = udpard_prio_low ;
2732+ now += 1000 ;
2733+ rx_session_update (ses ,
2734+ & rx ,
2735+ now ,
2736+ (udpard_udpip_ep_t ){ .ip = 0x0A000001 , .port = 0x1234 },
2737+ make_frame (meta , mem_payload , "tid103" , 0 , 6 ),
2738+ del_payload ,
2739+ 0 );
2740+ TEST_ASSERT_EQUAL (2 , cb_result .message .count );
2741+ TEST_ASSERT_EQUAL (103 , cb_result .message .history [0 ].transfer_id );
2742+ TEST_ASSERT (transfer_payload_verify (& cb_result .message .history [0 ], 6 , "tid103" , 6 ));
2743+ udpard_fragment_free_all (cb_result .message .history [0 ].payload_head , mem_frag );
2744+
2745+ meta .transfer_id = 102 ;
2746+ meta .priority = udpard_prio_nominal ;
2747+ now += 1000 ;
2748+ rx_session_update (ses ,
2749+ & rx ,
2750+ now ,
2751+ (udpard_udpip_ep_t ){ .ip = 0x0A000001 , .port = 0x1234 },
2752+ make_frame (meta , mem_payload , "tid102" , 0 , 6 ),
2753+ del_payload ,
2754+ 0 );
2755+ // In UNORDERED mode, 102 is accepted even though it's "late" (arrives after 103).
2756+ TEST_ASSERT_EQUAL (3 , cb_result .message .count );
2757+ TEST_ASSERT_EQUAL (102 , cb_result .message .history [0 ].transfer_id );
2758+ TEST_ASSERT (transfer_payload_verify (& cb_result .message .history [0 ], 6 , "tid102" , 6 ));
2759+ udpard_fragment_free_all (cb_result .message .history [0 ].payload_head , mem_frag );
2760+
2761+ // Verify that duplicates are still rejected.
2762+ meta .transfer_id = 103 ; // repeat of a received transfer
2763+ now += 1000 ;
2764+ rx_session_update (ses ,
2765+ & rx ,
2766+ now ,
2767+ (udpard_udpip_ep_t ){ .ip = 0x0A000001 , .port = 0x1234 },
2768+ make_frame (meta , mem_payload , "dup103" , 0 , 6 ),
2769+ del_payload ,
2770+ 0 );
2771+ TEST_ASSERT_EQUAL (3 , cb_result .message .count ); // no new message
2772+ TEST_ASSERT_EQUAL (0 , alloc_payload .allocated_fragments ); // payload was freed
2773+
2774+ // Repeat duplicate should still trigger ACK if requested on first frame.
2775+ TEST_ASSERT_EQUAL (4 , cb_result .ack_mandate .count ); // ACK generated for duplicate
2776+ TEST_ASSERT_EQUAL (103 , cb_result .ack_mandate .am .transfer_id );
2777+
2778+ // Test multi-frame transfer in UNORDERED mode.
2779+ meta .transfer_id = 200 ;
2780+ meta .transfer_payload_size = 10 ;
2781+ meta .priority = udpard_prio_fast ;
2782+ meta .flag_ack = true;
2783+ now += 1000 ;
2784+ const udpard_us_t ts_200 = now ;
2785+ // Send second frame first.
2786+ rx_session_update (ses ,
2787+ & rx ,
2788+ now ,
2789+ (udpard_udpip_ep_t ){ .ip = 0x0A000002 , .port = 0x5678 },
2790+ make_frame (meta , mem_payload , "0123456789" , 5 , 5 ),
2791+ del_payload ,
2792+ 1 );
2793+ TEST_ASSERT_EQUAL (3 , cb_result .message .count ); // not complete yet
2794+ TEST_ASSERT_EQUAL (1 , alloc_frag .allocated_fragments );
2795+ TEST_ASSERT_EQUAL (1 , alloc_payload .allocated_fragments );
2796+
2797+ // Send first frame to complete the transfer.
2798+ now += 500 ;
2799+ rx_session_update (ses ,
2800+ & rx ,
2801+ now ,
2802+ (udpard_udpip_ep_t ){ .ip = 0x0A000001 , .port = 0x1234 },
2803+ make_frame (meta , mem_payload , "0123456789" , 0 , 5 ),
2804+ del_payload ,
2805+ 0 );
2806+ // Transfer is completed and ejected immediately.
2807+ TEST_ASSERT_EQUAL (4 , cb_result .message .count );
2808+ TEST_ASSERT_EQUAL (ts_200 , cb_result .message .history [0 ].timestamp ); // earliest frame timestamp
2809+ TEST_ASSERT_EQUAL (udpard_prio_fast , cb_result .message .history [0 ].priority );
2810+ TEST_ASSERT_EQUAL (200 , cb_result .message .history [0 ].transfer_id );
2811+ TEST_ASSERT (transfer_payload_verify (& cb_result .message .history [0 ], 10 , "0123456789" , 10 ));
2812+ // Return path discovered from both interfaces.
2813+ TEST_ASSERT_EQUAL (0x0A000001 , cb_result .message .history [0 ].remote .endpoints [0 ].ip );
2814+ TEST_ASSERT_EQUAL (0x0A000002 , cb_result .message .history [0 ].remote .endpoints [1 ].ip );
2815+ TEST_ASSERT_EQUAL (0x1234 , cb_result .message .history [0 ].remote .endpoints [0 ].port );
2816+ TEST_ASSERT_EQUAL (0x5678 , cb_result .message .history [0 ].remote .endpoints [1 ].port );
2817+ udpard_fragment_free_all (cb_result .message .history [0 ].payload_head , mem_frag );
2818+
2819+ // ACK mandate generated upon completion.
2820+ TEST_ASSERT_EQUAL (5 , cb_result .ack_mandate .count );
2821+ TEST_ASSERT_EQUAL (200 , cb_result .ack_mandate .am .transfer_id );
2822+
2823+ // Verify that polling doesn't affect UNORDERED mode (no reordering window processing).
2824+ TEST_ASSERT_EQUAL (0 , alloc_frag .allocated_fragments );
2825+ TEST_ASSERT_EQUAL (0 , alloc_payload .allocated_fragments );
2826+ rx .p2p_port .invoked = false;
2827+ udpard_rx_poll (& rx , now + 1000000 ); // advance time significantly
2828+ TEST_ASSERT_EQUAL (4 , cb_result .message .count ); // no change
2829+ rx .p2p_port .invoked = true;
2830+
2831+ // Test that transfer-ID window works correctly in UNORDERED mode.
2832+ // Transfers far outside the window (very old) should still be rejected as duplicates if within the window,
2833+ // but truly old ones outside the window are treated as new (since they wrapped around).
2834+ // The head is now at 200 (most recently ejected). Sending 200 again should be rejected as duplicate.
2835+ meta .transfer_id = 200 ;
2836+ meta .transfer_payload_size = 5 ;
2837+ now += 1000 ;
2838+ rx_session_update (ses ,
2839+ & rx ,
2840+ now ,
2841+ (udpard_udpip_ep_t ){ .ip = 0x0A000001 , .port = 0x1234 },
2842+ make_frame (meta , mem_payload , "dup00" , 0 , 5 ),
2843+ del_payload ,
2844+ 0 );
2845+ TEST_ASSERT_EQUAL (4 , cb_result .message .count ); // duplicate rejected, count unchanged
2846+ TEST_ASSERT_EQUAL (0 , alloc_payload .allocated_fragments ); // payload was freed
2847+
2848+ // Verify session cleanup on timeout.
2849+ now += SESSION_LIFETIME ;
2850+ rx .p2p_port .invoked = false;
2851+ udpard_rx_poll (& rx , now );
2852+ TEST_ASSERT_EQUAL (0 , alloc_frag .allocated_fragments );
2853+ TEST_ASSERT_EQUAL (0 , alloc_session .allocated_fragments );
2854+ TEST_ASSERT_EQUAL (0 , alloc_payload .allocated_fragments );
2855+
2856+ instrumented_allocator_reset (& alloc_frag );
2857+ instrumented_allocator_reset (& alloc_session );
2858+ instrumented_allocator_reset (& alloc_payload );
2859+ }
2860+
26532861void setUp (void ) {}
26542862
26552863void tearDown (void ) {}
@@ -2669,6 +2877,7 @@ int main(void)
26692877 RUN_TEST (test_rx_slot_update );
26702878
26712879 RUN_TEST (test_session_ordered );
2880+ RUN_TEST (test_session_unordered );
26722881
26732882 return UNITY_END ();
26742883}
0 commit comments