|
| 1 | +/// This software is distributed under the terms of the MIT License. |
| 2 | +/// Copyright (C) OpenCyphal Development Team <opencyphal.org> |
| 3 | +/// Copyright Amazon.com Inc. or its affiliates. |
| 4 | +/// SPDX-License-Identifier: MIT |
| 5 | + |
| 6 | +// ReSharper disable CppPassValueParameterByConstReference |
| 7 | + |
| 8 | +#include <udpard.h> |
| 9 | +#include "helpers.h" |
| 10 | +#include <unity.h> |
| 11 | +#include <algorithm> |
| 12 | +#include <array> |
| 13 | +#include <unordered_map> |
| 14 | +#include <vector> |
| 15 | + |
| 16 | +namespace { |
| 17 | + |
| 18 | +struct TransferKey |
| 19 | +{ |
| 20 | + uint64_t transfer_id; |
| 21 | + size_t port_index; |
| 22 | + bool operator==(const TransferKey& other) const |
| 23 | + { |
| 24 | + return (transfer_id == other.transfer_id) && (port_index == other.port_index); |
| 25 | + } |
| 26 | +}; |
| 27 | + |
| 28 | +struct TransferKeyHash |
| 29 | +{ |
| 30 | + size_t operator()(const TransferKey& key) const |
| 31 | + { |
| 32 | + return (std::hash<uint64_t>{}(key.transfer_id) << 1U) ^ std::hash<size_t>{}(key.port_index); |
| 33 | + } |
| 34 | +}; |
| 35 | + |
| 36 | +struct ExpectedPayload |
| 37 | +{ |
| 38 | + std::vector<uint8_t> payload; |
| 39 | + size_t payload_size_wire; |
| 40 | +}; |
| 41 | + |
| 42 | +struct Context |
| 43 | +{ |
| 44 | + std::unordered_map<TransferKey, ExpectedPayload, TransferKeyHash> expected; |
| 45 | + size_t received = 0; |
| 46 | + size_t collisions = 0; |
| 47 | + size_t ack_mandates = 0; |
| 48 | + std::array<udpard_rx_port_t*, UDPARD_NETWORK_INTERFACE_COUNT_MAX> ports = {}; |
| 49 | + size_t port_count = 0; |
| 50 | + uint64_t remote_uid = 0; |
| 51 | + std::array<udpard_udpip_ep_t, UDPARD_NETWORK_INTERFACE_COUNT_MAX> remote_endpoints = {}; |
| 52 | +}; |
| 53 | + |
| 54 | +struct Arrival |
| 55 | +{ |
| 56 | + udpard_bytes_mut_t datagram; |
| 57 | + uint_fast8_t iface_index; |
| 58 | +}; |
| 59 | + |
| 60 | +size_t random_range(const size_t min, const size_t max) |
| 61 | +{ |
| 62 | + const size_t span = max - min + 1U; |
| 63 | + return min + (static_cast<size_t>(rand()) % span); |
| 64 | +} |
| 65 | + |
| 66 | +void fill_random(std::vector<uint8_t>& data) |
| 67 | +{ |
| 68 | + for (auto& byte : data) { |
| 69 | + byte = static_cast<uint8_t>(random_range(0, UINT8_MAX)); |
| 70 | + } |
| 71 | +} |
| 72 | + |
| 73 | +void shuffle_frames(std::vector<Arrival>& frames) |
| 74 | +{ |
| 75 | + for (size_t i = frames.size(); i > 1; i--) { |
| 76 | + const size_t j = random_range(0, i - 1); |
| 77 | + std::swap(frames[i - 1U], frames[j]); |
| 78 | + } |
| 79 | +} |
| 80 | + |
| 81 | +void on_message(udpard_rx_t* const rx, udpard_rx_port_t* const port, const udpard_rx_transfer_t transfer) |
| 82 | +{ |
| 83 | + auto* ctx = static_cast<Context*>(rx->user); |
| 84 | + size_t port_index = ctx->port_count; |
| 85 | + for (size_t i = 0; i < ctx->port_count; i++) { |
| 86 | + if (ctx->ports[i] == port) { |
| 87 | + port_index = i; |
| 88 | + break; |
| 89 | + } |
| 90 | + } |
| 91 | + TEST_ASSERT(port_index < ctx->port_count); |
| 92 | + const TransferKey key{ .transfer_id = transfer.transfer_id, .port_index = port_index }; |
| 93 | + const auto it = ctx->expected.find(key); |
| 94 | + TEST_ASSERT(it != ctx->expected.end()); |
| 95 | + std::vector<uint8_t> assembled(transfer.payload_size_stored); |
| 96 | + const size_t gathered = udpard_fragment_gather( |
| 97 | + transfer.payload, transfer.payload_size_stored, (transfer.payload_size_stored > 0U) ? assembled.data() : nullptr); |
| 98 | + TEST_ASSERT_EQUAL_size_t(transfer.payload_size_stored, gathered); |
| 99 | + TEST_ASSERT_EQUAL_size_t(it->second.payload.size(), transfer.payload_size_stored); |
| 100 | + TEST_ASSERT_EQUAL_size_t(it->second.payload_size_wire, transfer.payload_size_wire); |
| 101 | + if (transfer.payload_size_stored > 0U) { |
| 102 | + TEST_ASSERT_EQUAL_MEMORY(it->second.payload.data(), assembled.data(), transfer.payload_size_stored); |
| 103 | + } |
| 104 | + TEST_ASSERT_EQUAL_UINT64(ctx->remote_uid, transfer.remote.uid); |
| 105 | + for (size_t i = 0; i < UDPARD_NETWORK_INTERFACE_COUNT_MAX; i++) { |
| 106 | + if ((transfer.remote.endpoints[i].ip != 0U) || (transfer.remote.endpoints[i].port != 0U)) { |
| 107 | + TEST_ASSERT_EQUAL_UINT32(ctx->remote_endpoints[i].ip, transfer.remote.endpoints[i].ip); |
| 108 | + TEST_ASSERT_EQUAL_UINT16(ctx->remote_endpoints[i].port, transfer.remote.endpoints[i].port); |
| 109 | + } |
| 110 | + } |
| 111 | + udpard_fragment_free_all(transfer.payload, port->memory.fragment); |
| 112 | + ctx->expected.erase(it); |
| 113 | + ctx->received++; |
| 114 | +} |
| 115 | + |
| 116 | +void on_collision(udpard_rx_t* const rx, udpard_rx_port_t* const port, const udpard_remote_t remote) |
| 117 | +{ |
| 118 | + auto* ctx = static_cast<Context*>(rx->user); |
| 119 | + (void)port; |
| 120 | + (void)remote; |
| 121 | + ctx->collisions++; |
| 122 | +} |
| 123 | + |
| 124 | +void on_ack_mandate(udpard_rx_t* const rx, udpard_rx_port_t* const port, const udpard_rx_ack_mandate_t mandate) |
| 125 | +{ |
| 126 | + auto* ctx = static_cast<Context*>(rx->user); |
| 127 | + (void)port; |
| 128 | + (void)mandate; |
| 129 | + ctx->ack_mandates++; |
| 130 | +} |
| 131 | + |
| 132 | +void test_udpard_tx_rx_end_to_end() |
| 133 | +{ |
| 134 | + seed_prng(); |
| 135 | + instrumented_allocator_t tx_alloc_frag{}; |
| 136 | + instrumented_allocator_new(&tx_alloc_frag); |
| 137 | + instrumented_allocator_t tx_alloc_payload{}; |
| 138 | + instrumented_allocator_new(&tx_alloc_payload); |
| 139 | + const udpard_tx_mem_resources_t tx_mem{ .fragment = instrumented_allocator_make_resource(&tx_alloc_frag), |
| 140 | + .payload = instrumented_allocator_make_resource(&tx_alloc_payload) }; |
| 141 | + udpard_tx_t tx; |
| 142 | + TEST_ASSERT_TRUE(udpard_tx_new(&tx, 0x0A0B0C0D0E0F1011ULL, 256, tx_mem)); |
| 143 | + instrumented_allocator_t rx_alloc_frag{}; |
| 144 | + instrumented_allocator_new(&rx_alloc_frag); |
| 145 | + instrumented_allocator_t rx_alloc_session{}; |
| 146 | + instrumented_allocator_new(&rx_alloc_session); |
| 147 | + const udpard_rx_mem_resources_t rx_mem{ .session = instrumented_allocator_make_resource(&rx_alloc_session), |
| 148 | + .fragment = instrumented_allocator_make_resource(&rx_alloc_frag) }; |
| 149 | + udpard_rx_t rx; |
| 150 | + TEST_ASSERT_TRUE(udpard_rx_new(&rx, &on_message, &on_collision, &on_ack_mandate)); |
| 151 | + const udpard_mem_deleter_t tx_payload_deleter = instrumented_allocator_make_deleter(&tx_alloc_payload); |
| 152 | + std::array<udpard_rx_port_t, 3> ports{}; |
| 153 | + constexpr std::array<uint64_t, 3> topic_hashes{ 0x123456789ABCDEF0ULL, |
| 154 | + 0x0FEDCBA987654321ULL, |
| 155 | + 0x00ACE00ACE00ACEULL }; |
| 156 | + constexpr std::array<uint32_t, 3> subject_ids{ 10U, 20U, 30U }; |
| 157 | + constexpr std::array<udpard_us_t, 3> reorder_windows{ 2000, UDPARD_RX_REORDERING_WINDOW_UNORDERED, 5000 }; |
| 158 | + std::array<uint_fast8_t, 3> iface_indices{ 0U, 1U, 2U }; |
| 159 | + for (size_t i = 0; i < ports.size(); i++) { |
| 160 | + TEST_ASSERT_TRUE(udpard_rx_port_new(&ports[i], topic_hashes[i], 12000, reorder_windows[i], rx_mem)); |
| 161 | + } |
| 162 | + Context ctx{}; |
| 163 | + ctx.port_count = ports.size(); |
| 164 | + ctx.remote_uid = tx.local_uid; |
| 165 | + for (size_t i = 0; i < ports.size(); i++) { |
| 166 | + ctx.ports[i] = &ports[i]; |
| 167 | + ctx.remote_endpoints[i] = { .ip = static_cast<uint32_t>(0x0A000001U + i), |
| 168 | + .port = static_cast<uint16_t>(7400U + i) }; |
| 169 | + } |
| 170 | + rx.user = &ctx; |
| 171 | + std::array<uint64_t, 3> transfer_ids{ static_cast<uint64_t>(rand()), |
| 172 | + static_cast<uint64_t>(rand()), |
| 173 | + static_cast<uint64_t>(rand()) }; |
| 174 | + udpard_us_t now = 0; |
| 175 | + for (size_t transfer_index = 0; transfer_index < 1000; transfer_index++) { |
| 176 | + const size_t port_index = random_range(0, ports.size() - 1U); |
| 177 | + const uint64_t transfer_id = transfer_ids[port_index]++; |
| 178 | + const size_t payload_size = random_range(0, 10000); |
| 179 | + std::vector<uint8_t> payload(payload_size); |
| 180 | + fill_random(payload); |
| 181 | + const udpard_bytes_t payload_view{ .size = payload.size(), .data = payload.data() }; |
| 182 | + const auto priority = static_cast<udpard_prio_t>(random_range(0, UDPARD_PRIORITY_MAX)); |
| 183 | + const udpard_udpip_ep_t dest = udpard_make_subject_endpoint(subject_ids[port_index]); |
| 184 | + const TransferKey key{ .transfer_id = transfer_id, .port_index = port_index }; |
| 185 | + const bool inserted = |
| 186 | + ctx.expected.emplace(key, ExpectedPayload{ .payload = payload, .payload_size_wire = payload.size() }).second; |
| 187 | + TEST_ASSERT_TRUE(inserted); |
| 188 | + std::array<size_t, 3> mtu_values{}; |
| 189 | + for (size_t i = 0; i < 3; i++) { |
| 190 | + const size_t candidate = random_range(UDPARD_MTU_MIN, 2000U); |
| 191 | + bool unique = true; |
| 192 | + for (size_t j = 0; j < i; j++) { |
| 193 | + if (mtu_values[j] == candidate) { |
| 194 | + unique = false; |
| 195 | + break; |
| 196 | + } |
| 197 | + } |
| 198 | + if (!unique) { |
| 199 | + i--; |
| 200 | + continue; |
| 201 | + } |
| 202 | + mtu_values[i] = candidate; |
| 203 | + } |
| 204 | + const udpard_us_t deadline = now + 1000000; |
| 205 | + for (size_t iface = 0; iface < 3; iface++) { |
| 206 | + tx.mtu = mtu_values[iface]; |
| 207 | + TEST_ASSERT_GREATER_THAN_UINT32(0U, |
| 208 | + udpard_tx_push(&tx, |
| 209 | + now, |
| 210 | + deadline, |
| 211 | + priority, |
| 212 | + topic_hashes[port_index], |
| 213 | + dest, |
| 214 | + transfer_id, |
| 215 | + payload_view, |
| 216 | + false, |
| 217 | + &iface_indices[iface])); |
| 218 | + } |
| 219 | + std::vector<Arrival> frames; |
| 220 | + frames.reserve(tx.queue_size); |
| 221 | + while (true) { |
| 222 | + udpard_tx_item_t* item = udpard_tx_peek(&tx, now); |
| 223 | + if (item == nullptr) { |
| 224 | + break; |
| 225 | + } |
| 226 | + udpard_tx_pop(&tx, item); |
| 227 | + auto* iface_ptr = static_cast<uint_fast8_t*>(item->user_transfer_reference); |
| 228 | + const uint_fast8_t iface_index = (iface_ptr != nullptr) ? *iface_ptr : 0U; |
| 229 | + frames.push_back({ .datagram = item->datagram_payload, .iface_index = iface_index }); |
| 230 | + item->datagram_payload.data = nullptr; |
| 231 | + item->datagram_payload.size = 0; |
| 232 | + udpard_tx_free(tx.memory, item); |
| 233 | + } |
| 234 | + shuffle_frames(frames); |
| 235 | + for (const auto& [datagram, iface_index] : frames) { |
| 236 | + TEST_ASSERT_TRUE(udpard_rx_port_push(&rx, |
| 237 | + &ports[port_index], |
| 238 | + now, |
| 239 | + ctx.remote_endpoints[iface_index], |
| 240 | + datagram, |
| 241 | + tx_payload_deleter, |
| 242 | + iface_index)); |
| 243 | + now += 1; |
| 244 | + } |
| 245 | + udpard_rx_poll(&rx, now); |
| 246 | + TEST_ASSERT_EQUAL_size_t(0, tx.queue_size); |
| 247 | + } |
| 248 | + udpard_rx_poll(&rx, now + 1000000); |
| 249 | + TEST_ASSERT_TRUE(ctx.expected.empty()); |
| 250 | + TEST_ASSERT_EQUAL_size_t(1000, ctx.received); |
| 251 | + TEST_ASSERT_EQUAL_size_t(0, ctx.collisions); |
| 252 | + TEST_ASSERT_EQUAL_size_t(0, ctx.ack_mandates); |
| 253 | + for (auto& port : ports) { |
| 254 | + udpard_rx_port_free(&rx, &port); |
| 255 | + } |
| 256 | + TEST_ASSERT_EQUAL_size_t(0, rx_alloc_frag.allocated_fragments); |
| 257 | + TEST_ASSERT_EQUAL_size_t(0, rx_alloc_session.allocated_fragments); |
| 258 | + TEST_ASSERT_EQUAL_size_t(0, tx_alloc_frag.allocated_fragments); |
| 259 | + TEST_ASSERT_EQUAL_size_t(0, tx_alloc_payload.allocated_fragments); |
| 260 | + instrumented_allocator_reset(&rx_alloc_frag); |
| 261 | + instrumented_allocator_reset(&rx_alloc_session); |
| 262 | + instrumented_allocator_reset(&tx_alloc_frag); |
| 263 | + instrumented_allocator_reset(&tx_alloc_payload); |
| 264 | +} |
| 265 | + |
| 266 | +} // namespace |
| 267 | + |
| 268 | +extern "C" void setUp() {} |
| 269 | + |
| 270 | +extern "C" void tearDown() {} |
| 271 | + |
| 272 | +int main() |
| 273 | +{ |
| 274 | + UNITY_BEGIN(); |
| 275 | + RUN_TEST(test_udpard_tx_rx_end_to_end); |
| 276 | + return UNITY_END(); |
| 277 | +} |
0 commit comments