Skip to content

Commit e5dfb3b

Browse files
more tests
1 parent 2e47d18 commit e5dfb3b

File tree

2 files changed

+245
-0
lines changed

2 files changed

+245
-0
lines changed

tests/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ gen_test_matrix(test_intrusive_tx "src/test_intrusive_tx.c")
9090
gen_test_matrix(test_intrusive_rx "src/test_intrusive_rx.c")
9191
gen_test_matrix(test_fragment "src/test_fragment.cpp;${library_dir}/udpard.c")
9292
gen_test_matrix(test_e2e_random "src/test_e2e_random.cpp;${library_dir}/udpard.c")
93+
gen_test_matrix(test_e2e_edge "src/test_e2e_edge.cpp;${library_dir}/udpard.c")
9394

9495
# Coverage targets. Usage:
9596
# cmake -DENABLE_COVERAGE=ON ..

tests/src/test_e2e_edge.cpp

Lines changed: 244 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,244 @@
1+
/// This software is distributed under the terms of the MIT License.
2+
/// Copyright (C) OpenCyphal Development Team <opencyphal.org>
3+
/// Copyright Amazon.com Inc. or its affiliates.
4+
/// SPDX-License-Identifier: MIT
5+
6+
// ReSharper disable CppPassValueParameterByConstReference
7+
8+
#include <udpard.h>
9+
#include "helpers.h"
10+
#include <unity.h>
11+
#include <array>
12+
#include <vector>
13+
14+
namespace {
15+
16+
void on_message(udpard_rx_t* rx, udpard_rx_port_t* port, udpard_rx_transfer_t transfer);
17+
void on_collision(udpard_rx_t* rx, udpard_rx_port_t* port, udpard_remote_t remote);
18+
void on_ack_mandate(udpard_rx_t* rx, udpard_rx_port_t* port, udpard_rx_ack_mandate_t am);
19+
20+
struct Context
21+
{
22+
std::vector<uint64_t> ids;
23+
size_t collisions = 0;
24+
size_t ack_mandates = 0;
25+
uint64_t expected_uid = 0;
26+
udpard_udpip_ep_t source = {};
27+
};
28+
29+
struct Fixture
30+
{
31+
instrumented_allocator_t tx_alloc_frag{};
32+
instrumented_allocator_t tx_alloc_payload{};
33+
instrumented_allocator_t rx_alloc_frag{};
34+
instrumented_allocator_t rx_alloc_session{};
35+
udpard_tx_t tx{};
36+
udpard_rx_t rx{};
37+
udpard_rx_port_t port{};
38+
udpard_mem_deleter_t tx_payload_deleter{};
39+
Context ctx{};
40+
udpard_udpip_ep_t dest{};
41+
udpard_udpip_ep_t source{};
42+
uint64_t topic_hash{ 0x90AB12CD34EF5678ULL };
43+
44+
Fixture(const Fixture&) = delete;
45+
Fixture& operator=(const Fixture&) = delete;
46+
Fixture(Fixture&&) = delete;
47+
Fixture& operator=(Fixture&&) = delete;
48+
49+
explicit Fixture(const udpard_us_t reordering_window)
50+
{
51+
instrumented_allocator_new(&tx_alloc_frag);
52+
instrumented_allocator_new(&tx_alloc_payload);
53+
instrumented_allocator_new(&rx_alloc_frag);
54+
instrumented_allocator_new(&rx_alloc_session);
55+
const udpard_tx_mem_resources_t tx_mem{ .fragment = instrumented_allocator_make_resource(&tx_alloc_frag),
56+
.payload = instrumented_allocator_make_resource(&tx_alloc_payload) };
57+
const udpard_rx_mem_resources_t rx_mem{ .session = instrumented_allocator_make_resource(&rx_alloc_session),
58+
.fragment = instrumented_allocator_make_resource(&rx_alloc_frag) };
59+
tx_payload_deleter = instrumented_allocator_make_deleter(&tx_alloc_payload);
60+
source = { .ip = 0x0A000001U, .port = 7501U };
61+
dest = udpard_make_subject_endpoint(222U);
62+
63+
TEST_ASSERT_TRUE(udpard_tx_new(&tx, 0x0A0B0C0D0E0F1011ULL, 16, tx_mem));
64+
TEST_ASSERT_TRUE(udpard_rx_new(&rx, &on_message, &on_collision, &on_ack_mandate));
65+
ctx.expected_uid = tx.local_uid;
66+
ctx.source = source;
67+
rx.user = &ctx;
68+
TEST_ASSERT_TRUE(udpard_rx_port_new(&port, topic_hash, 1024, reordering_window, rx_mem));
69+
}
70+
71+
~Fixture()
72+
{
73+
udpard_rx_port_free(&rx, &port);
74+
TEST_ASSERT_EQUAL_size_t(0, rx_alloc_frag.allocated_fragments);
75+
TEST_ASSERT_EQUAL_size_t(0, rx_alloc_session.allocated_fragments);
76+
TEST_ASSERT_EQUAL_size_t(0, tx_alloc_frag.allocated_fragments);
77+
TEST_ASSERT_EQUAL_size_t(0, tx_alloc_payload.allocated_fragments);
78+
instrumented_allocator_reset(&rx_alloc_frag);
79+
instrumented_allocator_reset(&rx_alloc_session);
80+
instrumented_allocator_reset(&tx_alloc_frag);
81+
instrumented_allocator_reset(&tx_alloc_payload);
82+
}
83+
84+
void push_single(const udpard_us_t ts, const uint64_t transfer_id)
85+
{
86+
std::array<uint8_t, 8> payload_buf{};
87+
for (size_t i = 0; i < payload_buf.size(); i++) {
88+
payload_buf[i] = static_cast<uint8_t>(transfer_id >> (i * 8U));
89+
}
90+
const udpard_bytes_t payload{ .size = payload_buf.size(), .data = payload_buf.data() };
91+
const udpard_us_t deadline = ts + 1000000;
92+
const uint_fast8_t iface_index = 0;
93+
TEST_ASSERT_GREATER_THAN_UINT32(
94+
0U,
95+
udpard_tx_push(&tx, ts, deadline, udpard_prio_slow, topic_hash, dest, transfer_id, payload, false, nullptr));
96+
udpard_tx_item_t* const item = udpard_tx_peek(&tx, ts);
97+
TEST_ASSERT_NOT_NULL(item);
98+
udpard_tx_pop(&tx, item);
99+
TEST_ASSERT_TRUE(
100+
udpard_rx_port_push(&rx, &port, ts, source, item->datagram_payload, tx_payload_deleter, iface_index));
101+
item->datagram_payload.data = nullptr;
102+
item->datagram_payload.size = 0;
103+
udpard_tx_free(tx.memory, item);
104+
}
105+
};
106+
107+
/// Callbacks keep the payload memory under control.
108+
void on_message(udpard_rx_t* const rx, udpard_rx_port_t* const port, const udpard_rx_transfer_t transfer)
109+
{
110+
auto* const ctx = static_cast<Context*>(rx->user);
111+
ctx->ids.push_back(transfer.transfer_id);
112+
TEST_ASSERT_EQUAL_UINT64(ctx->expected_uid, transfer.remote.uid);
113+
TEST_ASSERT_EQUAL_UINT32(ctx->source.ip, transfer.remote.endpoints[0].ip);
114+
TEST_ASSERT_EQUAL_UINT16(ctx->source.port, transfer.remote.endpoints[0].port);
115+
udpard_fragment_free_all(transfer.payload, port->memory.fragment);
116+
}
117+
118+
void on_collision(udpard_rx_t* const rx, udpard_rx_port_t* const /*port*/, const udpard_remote_t /*remote*/)
119+
{
120+
auto* const ctx = static_cast<Context*>(rx->user);
121+
ctx->collisions++;
122+
}
123+
124+
void on_ack_mandate(udpard_rx_t* const rx, udpard_rx_port_t* const /*port*/, const udpard_rx_ack_mandate_t /*am*/)
125+
{
126+
auto* const ctx = static_cast<Context*>(rx->user);
127+
ctx->ack_mandates++;
128+
}
129+
130+
/// UNORDERED mode should drop duplicates while keeping arrival order.
131+
void test_udpard_rx_unordered_duplicates()
132+
{
133+
Fixture fix{ UDPARD_RX_REORDERING_WINDOW_UNORDERED };
134+
udpard_us_t now = 0;
135+
136+
const std::array<uint64_t, 6> ids{ 100, 200, 10100, 10200, 200, 100 };
137+
for (const auto id : ids) {
138+
fix.push_single(now, id);
139+
udpard_rx_poll(&fix.rx, now);
140+
now++;
141+
}
142+
udpard_rx_poll(&fix.rx, now + 100);
143+
144+
const std::array<uint64_t, 4> expected{ 100, 200, 10100, 10200 };
145+
TEST_ASSERT_EQUAL_size_t(expected.size(), fix.ctx.ids.size());
146+
for (size_t i = 0; i < expected.size(); i++) {
147+
TEST_ASSERT_EQUAL_UINT64(expected[i], fix.ctx.ids[i]);
148+
}
149+
TEST_ASSERT_EQUAL_size_t(0, fix.ctx.collisions);
150+
TEST_ASSERT_EQUAL_size_t(0, fix.ctx.ack_mandates);
151+
}
152+
153+
/// ORDERED mode waits for the window, then rejects late arrivals.
154+
void test_udpard_rx_ordered_out_of_order()
155+
{
156+
Fixture fix{ 50 };
157+
udpard_us_t now = 0;
158+
159+
// First batch builds the ordered baseline.
160+
fix.push_single(now, 100);
161+
udpard_rx_poll(&fix.rx, now);
162+
fix.push_single(++now, 300);
163+
udpard_rx_poll(&fix.rx, now);
164+
fix.push_single(++now, 200);
165+
udpard_rx_poll(&fix.rx, now);
166+
167+
// Let the reordering window close for the early transfers.
168+
now = 60;
169+
udpard_rx_poll(&fix.rx, now);
170+
171+
// Queue far-future IDs while keeping the head at 300.
172+
fix.push_single(now + 1, 10100);
173+
udpard_rx_poll(&fix.rx, now + 1);
174+
fix.push_single(now + 2, 10200);
175+
udpard_rx_poll(&fix.rx, now + 2);
176+
177+
// Late arrivals inside the window shall be dropped.
178+
fix.push_single(now + 3, 250);
179+
udpard_rx_poll(&fix.rx, now + 3);
180+
fix.push_single(now + 4, 150);
181+
udpard_rx_poll(&fix.rx, now + 4);
182+
183+
// Allow the window to expire so the remaining interned transfers eject.
184+
udpard_rx_poll(&fix.rx, now + 70);
185+
186+
const std::array<uint64_t, 5> expected{ 100, 200, 300, 10100, 10200 };
187+
TEST_ASSERT_EQUAL_size_t(expected.size(), fix.ctx.ids.size());
188+
for (size_t i = 0; i < expected.size(); i++) {
189+
TEST_ASSERT_EQUAL_UINT64(expected[i], fix.ctx.ids[i]);
190+
}
191+
TEST_ASSERT_EQUAL_size_t(0, fix.ctx.collisions);
192+
TEST_ASSERT_EQUAL_size_t(0, fix.ctx.ack_mandates);
193+
}
194+
195+
/// ORDERED mode after head advance should reject late IDs arriving after window expiry.
196+
void test_udpard_rx_ordered_head_advanced_late()
197+
{
198+
Fixture fix{ 50 };
199+
udpard_us_t now = 0;
200+
201+
fix.push_single(now, 100);
202+
udpard_rx_poll(&fix.rx, now);
203+
fix.push_single(++now, 300);
204+
udpard_rx_poll(&fix.rx, now);
205+
fix.push_single(++now, 200);
206+
udpard_rx_poll(&fix.rx, now);
207+
now = 60;
208+
udpard_rx_poll(&fix.rx, now); // head -> 300
209+
210+
fix.push_single(++now, 420);
211+
udpard_rx_poll(&fix.rx, now);
212+
fix.push_single(++now, 450);
213+
udpard_rx_poll(&fix.rx, now);
214+
now = 120;
215+
udpard_rx_poll(&fix.rx, now); // head -> 450
216+
217+
fix.push_single(++now, 320);
218+
udpard_rx_poll(&fix.rx, now);
219+
fix.push_single(++now, 310);
220+
udpard_rx_poll(&fix.rx, now);
221+
222+
const std::array<uint64_t, 5> expected{ 100, 200, 300, 420, 450 };
223+
TEST_ASSERT_EQUAL_size_t(expected.size(), fix.ctx.ids.size());
224+
for (size_t i = 0; i < expected.size(); i++) {
225+
TEST_ASSERT_EQUAL_UINT64(expected[i], fix.ctx.ids[i]);
226+
}
227+
TEST_ASSERT_EQUAL_size_t(0, fix.ctx.collisions);
228+
TEST_ASSERT_EQUAL_size_t(0, fix.ctx.ack_mandates);
229+
}
230+
231+
} // namespace
232+
233+
extern "C" void setUp() {}
234+
235+
extern "C" void tearDown() {}
236+
237+
int main()
238+
{
239+
UNITY_BEGIN();
240+
RUN_TEST(test_udpard_rx_unordered_duplicates);
241+
RUN_TEST(test_udpard_rx_ordered_out_of_order);
242+
RUN_TEST(test_udpard_rx_ordered_head_advanced_late);
243+
return UNITY_END();
244+
}

0 commit comments

Comments
 (0)