@@ -18,18 +18,18 @@ namespace {
1818struct TransferKey
1919{
2020 uint64_t transfer_id;
21- size_t port_index ;
21+ uint64_t topic_hash ;
2222 bool operator ==(const TransferKey& other) const
2323 {
24- return (transfer_id == other.transfer_id ) && (port_index == other.port_index );
24+ return (transfer_id == other.transfer_id ) && (topic_hash == other.topic_hash );
2525 }
2626};
2727
2828struct TransferKeyHash
2929{
3030 size_t operator ()(const TransferKey& key) const
3131 {
32- return (std::hash<uint64_t >{}(key.transfer_id ) << 1U ) ^ std::hash<size_t >{}(key.port_index );
32+ return (std::hash<uint64_t >{}(key.transfer_id ) << 1U ) ^ std::hash<uint64_t >{}(key.topic_hash );
3333 }
3434};
3535
@@ -45,8 +45,7 @@ struct Context
4545 size_t received = 0 ;
4646 size_t collisions = 0 ;
4747 size_t ack_mandates = 0 ;
48- std::array<udpard_rx_port_t *, UDPARD_NETWORK_INTERFACE_COUNT_MAX> ports = {};
49- size_t port_count = 0 ;
48+ size_t truncated = 0 ;
5049 uint64_t remote_uid = 0 ;
5150 std::array<udpard_udpip_ep_t , UDPARD_NETWORK_INTERFACE_COUNT_MAX> remote_endpoints = {};
5251};
@@ -80,34 +79,37 @@ void shuffle_frames(std::vector<Arrival>& frames)
8079
8180void on_message (udpard_rx_t * const rx, udpard_rx_port_t * const port, const udpard_rx_transfer_t transfer)
8281{
83- auto * ctx = static_cast <Context*>(rx->user );
84- size_t port_index = ctx->port_count ;
85- for (size_t i = 0 ; i < ctx->port_count ; i++) {
86- if (ctx->ports [i] == port) {
87- port_index = i;
88- break ;
89- }
90- }
91- TEST_ASSERT (port_index < ctx->port_count );
92- const TransferKey key{ .transfer_id = transfer.transfer_id , .port_index = port_index };
82+ auto * const ctx = static_cast <Context*>(rx->user );
83+
84+ // Match the incoming transfer against the expected table keyed by topic hash and transfer-ID.
85+ const TransferKey key{ .transfer_id = transfer.transfer_id , .topic_hash = port->topic_hash };
9386 const auto it = ctx->expected .find (key);
9487 TEST_ASSERT (it != ctx->expected .end ());
88+
89+ // Gather fragments into a contiguous buffer so we can compare the stored prefix (payload may be truncated).
9590 std::vector<uint8_t > assembled (transfer.payload_size_stored );
9691 const size_t gathered = udpard_fragment_gather (
9792 transfer.payload , transfer.payload_size_stored , (transfer.payload_size_stored > 0U ) ? assembled.data () : nullptr );
9893 TEST_ASSERT_EQUAL_size_t (transfer.payload_size_stored , gathered);
99- TEST_ASSERT_EQUAL_size_t ( it->second .payload .size (), transfer. payload_size_stored );
94+ TEST_ASSERT_TRUE (transfer. payload_size_stored <= it->second .payload .size ());
10095 TEST_ASSERT_EQUAL_size_t (it->second .payload_size_wire , transfer.payload_size_wire );
10196 if (transfer.payload_size_stored > 0U ) {
10297 TEST_ASSERT_EQUAL_MEMORY (it->second .payload .data (), assembled.data (), transfer.payload_size_stored );
10398 }
99+
100+ // Verify remote and the return path discovery.
104101 TEST_ASSERT_EQUAL_UINT64 (ctx->remote_uid , transfer.remote .uid );
105102 for (size_t i = 0 ; i < UDPARD_NETWORK_INTERFACE_COUNT_MAX; i++) {
106103 if ((transfer.remote .endpoints [i].ip != 0U ) || (transfer.remote .endpoints [i].port != 0U )) {
107104 TEST_ASSERT_EQUAL_UINT32 (ctx->remote_endpoints [i].ip , transfer.remote .endpoints [i].ip );
108105 TEST_ASSERT_EQUAL_UINT16 (ctx->remote_endpoints [i].port , transfer.remote .endpoints [i].port );
109106 }
110107 }
108+ if (transfer.payload_size_stored < transfer.payload_size_wire ) {
109+ ctx->truncated ++;
110+ }
111+
112+ // Clean up.
111113 udpard_fragment_free_all (transfer.payload , port->memory .fragment );
112114 ctx->expected .erase (it);
113115 ctx->received ++;
@@ -129,17 +131,23 @@ void on_ack_mandate(udpard_rx_t* const rx, udpard_rx_port_t* const port, const u
129131 ctx->ack_mandates ++;
130132}
131133
134+ // / Randomized end-to-end TX/RX covering fragmentation, reordering, and extent-driven truncation.
132135void test_udpard_tx_rx_end_to_end ()
133136{
134137 seed_prng ();
138+
139+ // TX allocator setup and pipeline initialization.
135140 instrumented_allocator_t tx_alloc_frag{};
136141 instrumented_allocator_new (&tx_alloc_frag);
137142 instrumented_allocator_t tx_alloc_payload{};
138143 instrumented_allocator_new (&tx_alloc_payload);
144+ const udpard_mem_deleter_t tx_payload_deleter = instrumented_allocator_make_deleter (&tx_alloc_payload);
139145 const udpard_tx_mem_resources_t tx_mem{ .fragment = instrumented_allocator_make_resource (&tx_alloc_frag),
140146 .payload = instrumented_allocator_make_resource (&tx_alloc_payload) };
141147 udpard_tx_t tx;
142148 TEST_ASSERT_TRUE (udpard_tx_new (&tx, 0x0A0B0C0D0E0F1011ULL , 256 , tx_mem));
149+
150+ // RX allocator setup and shared RX instance with callbacks.
143151 instrumented_allocator_t rx_alloc_frag{};
144152 instrumented_allocator_new (&rx_alloc_frag);
145153 instrumented_allocator_t rx_alloc_session{};
@@ -148,59 +156,62 @@ void test_udpard_tx_rx_end_to_end()
148156 .fragment = instrumented_allocator_make_resource (&rx_alloc_frag) };
149157 udpard_rx_t rx;
150158 TEST_ASSERT_TRUE (udpard_rx_new (&rx, &on_message, &on_collision, &on_ack_mandate));
151- const udpard_mem_deleter_t tx_payload_deleter = instrumented_allocator_make_deleter (&tx_alloc_payload);
152- std::array< udpard_rx_port_t , 3 > ports{};
159+
160+ // Test parameters.
153161 constexpr std::array<uint64_t , 3 > topic_hashes{ 0x123456789ABCDEF0ULL ,
154162 0x0FEDCBA987654321ULL ,
155163 0x00ACE00ACE00ACEULL };
156164 constexpr std::array<uint32_t , 3 > subject_ids{ 10U , 20U , 30U };
157165 constexpr std::array<udpard_us_t , 3 > reorder_windows{ 2000 , UDPARD_RX_REORDERING_WINDOW_UNORDERED, 5000 };
166+ constexpr std::array<size_t , 3 > extents{ 1000 , 5000 , SIZE_MAX };
158167 std::array<uint_fast8_t , 3 > iface_indices{ 0U , 1U , 2U };
168+
169+ // Configure ports with varied extents and reordering windows to cover truncation and different RX modes.
170+ std::array<udpard_rx_port_t , 3 > ports{};
159171 for (size_t i = 0 ; i < ports.size (); i++) {
160- TEST_ASSERT_TRUE (udpard_rx_port_new (&ports[i], topic_hashes[i], 12000 , reorder_windows[i], rx_mem));
172+ TEST_ASSERT_TRUE (udpard_rx_port_new (&ports[i], topic_hashes[i], extents[i] , reorder_windows[i], rx_mem));
161173 }
174+
175+ // Setup the context.
162176 Context ctx{};
163- ctx.port_count = ports.size ();
164177 ctx.remote_uid = tx.local_uid ;
165178 for (size_t i = 0 ; i < ports.size (); i++) {
166- ctx.ports [i] = &ports[i];
167179 ctx.remote_endpoints [i] = { .ip = static_cast <uint32_t >(0x0A000001U + i),
168180 .port = static_cast <uint16_t >(7400U + i) };
169181 }
170182 rx.user = &ctx;
183+
184+ // Main test loop: generate transfers, push into TX, drain and shuffle frames, push into RX.
171185 std::array<uint64_t , 3 > transfer_ids{ static_cast <uint64_t >(rand ()),
172186 static_cast <uint64_t >(rand ()),
173187 static_cast <uint64_t >(rand ()) };
174188 udpard_us_t now = 0 ;
175189 for (size_t transfer_index = 0 ; transfer_index < 1000 ; transfer_index++) {
190+ now += static_cast <udpard_us_t >(random_range (1000 , 5000 ));
191+
192+ // Pick a port, build a random payload, and remember what to expect on that topic.
176193 const size_t port_index = random_range (0 , ports.size () - 1U );
177194 const uint64_t transfer_id = transfer_ids[port_index]++;
178195 const size_t payload_size = random_range (0 , 10000 );
179196 std::vector<uint8_t > payload (payload_size);
180197 fill_random (payload);
198+
199+ // Each transfer is sent on all redundant interfaces with different MTUs to exercise fragmentation variety.
181200 const udpard_bytes_t payload_view{ .size = payload.size (), .data = payload.data () };
182201 const auto priority = static_cast <udpard_prio_t >(random_range (0 , UDPARD_PRIORITY_MAX));
183202 const udpard_udpip_ep_t dest = udpard_make_subject_endpoint (subject_ids[port_index]);
184- const TransferKey key{ .transfer_id = transfer_id, .port_index = port_index };
203+ const TransferKey key{ .transfer_id = transfer_id, .topic_hash = topic_hashes[ port_index] };
185204 const bool inserted =
186205 ctx.expected .emplace (key, ExpectedPayload{ .payload = payload, .payload_size_wire = payload.size () }).second ;
187206 TEST_ASSERT_TRUE (inserted);
207+
208+ // Generate MTUs per redundant interface.
188209 std::array<size_t , 3 > mtu_values{};
189- for (size_t i = 0 ; i < 3 ; i++) {
190- const size_t candidate = random_range (UDPARD_MTU_MIN, 2000U );
191- bool unique = true ;
192- for (size_t j = 0 ; j < i; j++) {
193- if (mtu_values[j] == candidate) {
194- unique = false ;
195- break ;
196- }
197- }
198- if (!unique) {
199- i--;
200- continue ;
201- }
202- mtu_values[i] = candidate;
210+ for (auto & x : mtu_values) {
211+ x = random_range (UDPARD_MTU_MIN, 3000U );
203212 }
213+
214+ // Enqueue one transfer per interface with the per-interface MTU applied.
204215 const udpard_us_t deadline = now + 1000000 ;
205216 for (size_t iface = 0 ; iface < 3 ; iface++) {
206217 tx.mtu = mtu_values[iface];
@@ -216,21 +227,20 @@ void test_udpard_tx_rx_end_to_end()
216227 false ,
217228 &iface_indices[iface]));
218229 }
230+
231+ // Drain TX queue into local frame list so we can shuffle before injecting into RX ports.
219232 std::vector<Arrival> frames;
220233 frames.reserve (tx.queue_size );
221- while (true ) {
222- udpard_tx_item_t * item = udpard_tx_peek (&tx, now);
223- if (item == nullptr ) {
224- break ;
225- }
234+ while (udpard_tx_item_t * const item = udpard_tx_peek (&tx, now)) {
226235 udpard_tx_pop (&tx, item);
227- auto * iface_ptr = static_cast <uint_fast8_t *>(item->user_transfer_reference );
228- const uint_fast8_t iface_index = (iface_ptr != nullptr ) ? *iface_ptr : 0U ;
229- frames.push_back ({ .datagram = item->datagram_payload , .iface_index = iface_index });
236+ frames.push_back ({ .datagram = item->datagram_payload ,
237+ .iface_index = *static_cast <uint_fast8_t *>(item->user_transfer_reference ) });
230238 item->datagram_payload .data = nullptr ;
231239 item->datagram_payload .size = 0 ;
232240 udpard_tx_free (tx.memory , item);
233241 }
242+
243+ // Shuffle and push frames into the RX pipeline, simulating out-of-order redundant arrival.
234244 shuffle_frames (frames);
235245 for (const auto & [datagram, iface_index] : frames) {
236246 TEST_ASSERT_TRUE (udpard_rx_port_push (&rx,
@@ -242,12 +252,17 @@ void test_udpard_tx_rx_end_to_end()
242252 iface_index));
243253 now += 1 ;
244254 }
255+
256+ // Let the RX pipeline purge timeouts and deliver ready transfers.
245257 udpard_rx_poll (&rx, now);
246258 TEST_ASSERT_EQUAL_size_t (0 , tx.queue_size );
247259 }
260+
261+ // Final poll/validation and cleanup.
248262 udpard_rx_poll (&rx, now + 1000000 );
249263 TEST_ASSERT_TRUE (ctx.expected .empty ());
250264 TEST_ASSERT_EQUAL_size_t (1000 , ctx.received );
265+ TEST_ASSERT_TRUE (ctx.truncated > 0 );
251266 TEST_ASSERT_EQUAL_size_t (0 , ctx.collisions );
252267 TEST_ASSERT_EQUAL_size_t (0 , ctx.ack_mandates );
253268 for (auto & port : ports) {
0 commit comments