@@ -3256,6 +3256,171 @@ static void test_port(void)
32563256 TEST_ASSERT_EQUAL_size_t (0 , alloc_payload .allocated_fragments );
32573257}
32583258
3259+ /// Starts a few transfers on multiple ports, lets them expire, and ensures cleanup in udpard_rx_poll().
3260+ static void test_port_timeouts (void )
3261+ {
3262+ instrumented_allocator_t alloc_frag = { 0 };
3263+ instrumented_allocator_new (& alloc_frag );
3264+ const udpard_mem_resource_t mem_frag = instrumented_allocator_make_resource (& alloc_frag );
3265+
3266+ instrumented_allocator_t alloc_session = { 0 };
3267+ instrumented_allocator_new (& alloc_session );
3268+ const udpard_mem_resource_t mem_session = instrumented_allocator_make_resource (& alloc_session );
3269+
3270+ instrumented_allocator_t alloc_payload = { 0 };
3271+ instrumented_allocator_new (& alloc_payload );
3272+ const udpard_mem_resource_t mem_payload = instrumented_allocator_make_resource (& alloc_payload );
3273+ const udpard_mem_deleter_t del_payload = instrumented_allocator_make_deleter (& alloc_payload );
3274+ const udpard_rx_memory_resources_t rx_mem = { .fragment = mem_frag , .session = mem_session };
3275+
3276+ udpard_rx_t rx ;
3277+ callback_result_t cb_result = { 0 };
3278+ TEST_ASSERT (udpard_rx_new (& rx , 0x6EC164169C3088B4ULL , rx_mem , & on_message , & on_collision , & on_ack_mandate ));
3279+ rx .user = & cb_result ;
3280+
3281+ udpard_rx_port_t port_a ;
3282+ udpard_rx_port_t port_b ;
3283+ const uint64_t topic_hash_a = 0x1111111111111111ULL ;
3284+ const uint64_t topic_hash_b = 0x2222222222222222ULL ;
3285+ TEST_ASSERT (udpard_rx_port_new (& port_a , topic_hash_a , 1000 , 20000 , rx_mem ));
3286+ TEST_ASSERT (udpard_rx_port_new (& port_b , topic_hash_b , 1000 , 20000 , rx_mem ));
3287+
3288+ udpard_us_t now = 1000 ;
3289+
3290+ // Remote A: start transfer 10 (incomplete) and 11 (complete) so 11 arms the reordering timer.
3291+ {
3292+ meta_t meta = { .priority = udpard_prio_nominal ,
3293+ .flag_ack = false,
3294+ .transfer_payload_size = 10 ,
3295+ .transfer_id = 10 ,
3296+ .sender_uid = 0xAAAAULL ,
3297+ .topic_hash = topic_hash_a };
3298+ const rx_frame_t frame = make_frame (meta , mem_payload , "ABCDEFGHIJ" , 0 , 5 );
3299+ byte_t dgram [HEADER_SIZE_BYTES + 5 ];
3300+ header_serialize (dgram , meta , 0 , 0 , frame .base .crc );
3301+ const byte_t payload_head [5 ] = { 'A' , 'B' , 'C' , 'D' , 'E' };
3302+ memcpy (dgram + HEADER_SIZE_BYTES , payload_head , sizeof (payload_head ));
3303+ mem_free (mem_payload , frame .base .origin .size , frame .base .origin .data );
3304+ void * push_payload = mem_payload .alloc (mem_payload .user , sizeof (dgram ));
3305+ memcpy (push_payload , dgram , sizeof (dgram ));
3306+ TEST_ASSERT (udpard_rx_port_push (& rx ,
3307+ & port_a ,
3308+ now ,
3309+ (udpard_udpip_ep_t ){ .ip = 0x0A000001 , .port = 0x1234 },
3310+ (udpard_bytes_mut_t ){ .data = push_payload , .size = sizeof (dgram ) },
3311+ del_payload ,
3312+ 0 ));
3313+ meta .transfer_payload_size = 4 ;
3314+ meta .transfer_id = 11 ;
3315+ const rx_frame_t done_frame = make_frame (meta , mem_payload , "DONE" , 0 , 4 );
3316+ byte_t done_dgram [HEADER_SIZE_BYTES + 4 ];
3317+ header_serialize (done_dgram , meta , 0 , 0 , done_frame .base .crc );
3318+ const byte_t done_payload [4 ] = { 'D' , 'O' , 'N' , 'E' };
3319+ memcpy (done_dgram + HEADER_SIZE_BYTES , done_payload , sizeof (done_payload ));
3320+ mem_free (mem_payload , done_frame .base .origin .size , done_frame .base .origin .data );
3321+ void * push_done = mem_payload .alloc (mem_payload .user , sizeof (done_dgram ));
3322+ memcpy (push_done , done_dgram , sizeof (done_dgram ));
3323+ now += 1000 ;
3324+ TEST_ASSERT (udpard_rx_port_push (& rx ,
3325+ & port_a ,
3326+ now ,
3327+ (udpard_udpip_ep_t ){ .ip = 0x0A000001 , .port = 0x1234 },
3328+ (udpard_bytes_mut_t ){ .data = push_done , .size = sizeof (done_dgram ) },
3329+ del_payload ,
3330+ 0 ));
3331+ }
3332+
3333+ // Remote B mirrors the same pattern to populate the reordering deadline tree with another entry.
3334+ {
3335+ meta_t meta = { .priority = udpard_prio_nominal ,
3336+ .flag_ack = false,
3337+ .transfer_payload_size = 6 ,
3338+ .transfer_id = 20 ,
3339+ .sender_uid = 0xBBBBULL ,
3340+ .topic_hash = topic_hash_b };
3341+ const rx_frame_t frame = make_frame (meta , mem_payload , "QRSTUV" , 0 , 3 );
3342+ byte_t dgram [HEADER_SIZE_BYTES + 3 ];
3343+ header_serialize (dgram , meta , 0 , 0 , frame .base .crc );
3344+ const byte_t payload_head [3 ] = { 'Q' , 'R' , 'S' };
3345+ memcpy (dgram + HEADER_SIZE_BYTES , payload_head , sizeof (payload_head ));
3346+ mem_free (mem_payload , frame .base .origin .size , frame .base .origin .data );
3347+ void * push_payload = mem_payload .alloc (mem_payload .user , sizeof (dgram ));
3348+ memcpy (push_payload , dgram , sizeof (dgram ));
3349+ now += 1000 ;
3350+ TEST_ASSERT (udpard_rx_port_push (& rx ,
3351+ & port_b ,
3352+ now ,
3353+ (udpard_udpip_ep_t ){ .ip = 0x0B000001 , .port = 0x5678 },
3354+ (udpard_bytes_mut_t ){ .data = push_payload , .size = sizeof (dgram ) },
3355+ del_payload ,
3356+ 0 ));
3357+ meta .transfer_payload_size = 5 ;
3358+ meta .transfer_id = 21 ;
3359+ const rx_frame_t done_frame = make_frame (meta , mem_payload , "READY" , 0 , 5 );
3360+ byte_t done_dgram [HEADER_SIZE_BYTES + 5 ];
3361+ header_serialize (done_dgram , meta , 0 , 0 , done_frame .base .crc );
3362+ const byte_t done_payload [5 ] = { 'R' , 'E' , 'A' , 'D' , 'Y' };
3363+ memcpy (done_dgram + HEADER_SIZE_BYTES , done_payload , sizeof (done_payload ));
3364+ mem_free (mem_payload , done_frame .base .origin .size , done_frame .base .origin .data );
3365+ void * push_done = mem_payload .alloc (mem_payload .user , sizeof (done_dgram ));
3366+ memcpy (push_done , done_dgram , sizeof (done_dgram ));
3367+ now += 1000 ;
3368+ TEST_ASSERT (udpard_rx_port_push (& rx ,
3369+ & port_b ,
3370+ now ,
3371+ (udpard_udpip_ep_t ){ .ip = 0x0B000001 , .port = 0x5678 },
3372+ (udpard_bytes_mut_t ){ .data = push_done , .size = sizeof (done_dgram ) },
3373+ del_payload ,
3374+ 0 ));
3375+ }
3376+
3377+ TEST_ASSERT_EQUAL (0 , cb_result .message .count );
3378+
3379+ // Advance past the session lifetime so the busy slots will be reset on the next arrival.
3380+ now += SESSION_LIFETIME + 5000 ;
3381+ {
3382+ meta_t meta = { .priority = udpard_prio_nominal ,
3383+ .flag_ack = false,
3384+ .transfer_payload_size = 3 ,
3385+ .transfer_id = 30 ,
3386+ .sender_uid = 0xAAAAULL ,
3387+ .topic_hash = topic_hash_a };
3388+ const rx_frame_t frame = make_frame (meta , mem_payload , "NEW" , 0 , 3 );
3389+ byte_t dgram [HEADER_SIZE_BYTES + 3 ];
3390+ header_serialize (dgram , meta , 0 , 0 , frame .base .crc );
3391+ const byte_t payload_head [3 ] = { 'N' , 'E' , 'W' };
3392+ memcpy (dgram + HEADER_SIZE_BYTES , payload_head , sizeof (payload_head ));
3393+ mem_free (mem_payload , frame .base .origin .size , frame .base .origin .data );
3394+ void * push_payload = mem_payload .alloc (mem_payload .user , sizeof (dgram ));
3395+ memcpy (push_payload , dgram , sizeof (dgram ));
3396+ TEST_ASSERT (udpard_rx_port_push (& rx ,
3397+ & port_a ,
3398+ now ,
3399+ (udpard_udpip_ep_t ){ .ip = 0x0A000001 , .port = 0x1234 },
3400+ (udpard_bytes_mut_t ){ .data = push_payload , .size = sizeof (dgram ) },
3401+ del_payload ,
3402+ 0 ));
3403+ }
3404+
3405+ // The late arrival should have ejected the earlier completed transfers.
3406+ TEST_ASSERT (cb_result .message .count >= 1 );
3407+ for (size_t i = 0 ; i < cb_result .message .count ; i ++ ) {
3408+ udpard_fragment_free_all (cb_result .message .history [i ].payload_head , mem_frag );
3409+ }
3410+ cb_result .message .count = 0 ;
3411+
3412+ // Let both sessions expire and be retired from poll.
3413+ udpard_rx_poll (& rx , now );
3414+ now += SESSION_LIFETIME + 1000 ;
3415+ udpard_rx_poll (& rx , now );
3416+
3417+ udpard_rx_free (& rx );
3418+
3419+ TEST_ASSERT_EQUAL_size_t (0 , alloc_frag .allocated_fragments );
3420+ TEST_ASSERT_EQUAL_size_t (0 , alloc_session .allocated_fragments );
3421+ TEST_ASSERT_EQUAL_size_t (0 , alloc_payload .allocated_fragments );
3422+ }
3423+
32593424void setUp (void ) {}
32603425
32613426void tearDown (void ) {}
@@ -3278,6 +3443,7 @@ int main(void)
32783443 RUN_TEST (test_session_unordered );
32793444
32803445 RUN_TEST (test_port );
3446+ RUN_TEST (test_port_timeouts );
32813447
32823448 return UNITY_END ();
32833449}
0 commit comments