Skip to content

Commit f1295e5

Browse files
mostly implement ack transmission but with a few missing todos; the tests are not yet updated
1 parent 03129f6 commit f1295e5

File tree

2 files changed

+240
-67
lines changed

2 files changed

+240
-67
lines changed

libudpard/udpard.c

Lines changed: 169 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,6 @@
4141

4242
typedef unsigned char byte_t; ///< For compatibility with platforms where byte size is not 8 bits.
4343

44-
#define BIG_BANG INT64_MIN
45-
#define HEAT_DEATH INT64_MAX
46-
47-
#define KILO 1000LL
48-
#define MEGA 1000000LL
49-
5044
/// Sessions will be garbage-collected after being idle for this long, along with unfinished transfers, if any.
5145
/// Pending slots within a live session will also be reset after this timeout to avoid storing stale data indefinitely.
5246
#define SESSION_LIFETIME (60 * MEGA)
@@ -63,7 +57,7 @@ typedef unsigned char byte_t; ///< For compatibility with platforms where byte s
6357
/// were found to offer no advantage except in the perfect scenario of non-restarting senders, and an increased
6458
/// implementation complexity (more branches, more lines of code), so they were replaced with a simple list.
6559
/// The list works equally well given a non-contiguous transfer-ID stream, unlike the bitmask, thus more robust.
66-
#define RX_TRANSFER_HISTORY_COUNT 16U
60+
#define RX_TRANSFER_HISTORY_COUNT 32U
6761

6862
/// In the ORDERED reassembly mode, with the most recently received transfer-ID N, the library will reject
6963
/// transfers with transfer-ID less than or equal to N-ORDERING_WINDOW (modulo 2^64) as late.
@@ -74,6 +68,18 @@ typedef unsigned char byte_t; ///< For compatibility with platforms where byte s
7468
static_assert((UDPARD_IPv4_SUBJECT_ID_MAX & (UDPARD_IPv4_SUBJECT_ID_MAX + 1)) == 0,
7569
"UDPARD_IPv4_SUBJECT_ID_MAX must be one less than a power of 2");
7670

71+
#define P2P_KIND_RESPONSE 0U
72+
#define P2P_KIND_ACK 1U
73+
74+
#define BIG_BANG INT64_MIN
75+
#define HEAT_DEATH INT64_MAX
76+
77+
#define KILO 1000LL
78+
#define MEGA 1000000LL
79+
80+
/// Pending ack transfers expire after this long if not transmitted.
81+
#define ACK_TX_DEADLINE MEGA
82+
7783
static size_t smaller(const size_t a, const size_t b) { return (a < b) ? a : b; }
7884
static size_t larger(const size_t a, const size_t b) { return (a > b) ? a : b; }
7985
static int64_t min_i64(const int64_t a, const int64_t b) { return (a < b) ? a : b; }
@@ -479,7 +485,7 @@ static bool tx_validate_mem_resources(const udpard_tx_mem_resources_t memory)
479485
/// Frames with identical weight are processed in the FIFO order.
480486
static int32_t tx_cavl_compare_prio(const void* const user, const udpard_tree_t* const node)
481487
{
482-
return (((int)*(const udpard_prio_t*)user) >= (int)CAVL2_TO_OWNER(node, udpard_tx_item_t, index_prio)->priority)
488+
return (((int)*(const udpard_prio_t*)user) >= (int)CAVL2_TO_OWNER(node, udpard_tx_item_t, index_order)->priority)
483489
? +1
484490
: -1;
485491
}
@@ -498,7 +504,7 @@ static udpard_tx_item_t* tx_item_new(const udpard_tx_mem_resources_t memory,
498504
{
499505
udpard_tx_item_t* out = mem_alloc(memory.fragment, sizeof(udpard_tx_item_t));
500506
if (out != NULL) {
501-
out->index_prio = (udpard_tree_t){ 0 };
507+
out->index_order = (udpard_tree_t){ 0 };
502508
out->index_deadline = (udpard_tree_t){ 0 };
503509
UDPARD_ASSERT(priority <= UDPARD_PRIORITY_MAX);
504510
out->priority = priority;
@@ -582,8 +588,8 @@ static uint32_t tx_push(udpard_tx_t* const tx,
582588
udpard_tx_item_t* const head = chain.head;
583589
UDPARD_ASSERT(frame_count == chain.count);
584590
const udpard_tree_t* res = cavl2_find_or_insert(
585-
&tx->index_prio, &head->priority, &tx_cavl_compare_prio, &head->index_prio, &cavl2_trivial_factory);
586-
UDPARD_ASSERT(res == &head->index_prio);
591+
&tx->index_order, &head->priority, &tx_cavl_compare_prio, &head->index_order, &cavl2_trivial_factory);
592+
UDPARD_ASSERT(res == &head->index_order);
587593
(void)res;
588594
res = cavl2_find_or_insert(&tx->index_deadline,
589595
&head->deadline,
@@ -619,7 +625,7 @@ static uint64_t tx_purge_expired(udpard_tx_t* const self, const udpard_us_t now)
619625
udpard_tree_t* const next = cavl2_next_greater(p); // Get next before removing current node from tree.
620626
// Remove from both indices.
621627
cavl2_remove(&self->index_deadline, &item->index_deadline);
622-
cavl2_remove(&self->index_prio, &item->index_prio);
628+
cavl2_remove(&self->index_order, &item->index_order);
623629
// Free the entire transfer chain.
624630
udpard_tx_item_t* current = item;
625631
while (current != NULL) {
@@ -647,7 +653,7 @@ bool udpard_tx_new(udpard_tx_t* const self,
647653
self->mtu = UDPARD_MTU_DEFAULT;
648654
self->memory = memory;
649655
self->queue_size = 0;
650-
self->index_prio = NULL;
656+
self->index_order = NULL;
651657
self->index_deadline = NULL;
652658
}
653659
return ok;
@@ -688,7 +694,7 @@ udpard_tx_item_t* udpard_tx_peek(udpard_tx_t* const self, const udpard_us_t now)
688694
udpard_tx_item_t* out = NULL;
689695
if (self != NULL) {
690696
self->errors_expiration += tx_purge_expired(self, now);
691-
out = CAVL2_TO_OWNER(cavl2_min(self->index_prio), udpard_tx_item_t, index_prio);
697+
out = CAVL2_TO_OWNER(cavl2_min(self->index_order), udpard_tx_item_t, index_order);
692698
}
693699
return out;
694700
}
@@ -697,10 +703,10 @@ void udpard_tx_pop(udpard_tx_t* const self, udpard_tx_item_t* const item)
697703
{
698704
if ((self != NULL) && (item != NULL)) {
699705
if (item->next_in_transfer == NULL) {
700-
cavl2_remove(&self->index_prio, &item->index_prio);
706+
cavl2_remove(&self->index_order, &item->index_order);
701707
cavl2_remove(&self->index_deadline, &item->index_deadline);
702708
} else { // constant-time update, super quick, just relink a few pointers!
703-
cavl2_replace(&self->index_prio, &item->index_prio, &item->next_in_transfer->index_prio);
709+
cavl2_replace(&self->index_order, &item->index_order, &item->next_in_transfer->index_order);
704710
cavl2_replace(&self->index_deadline, &item->index_deadline, &item->next_in_transfer->index_deadline);
705711
}
706712
self->queue_size--;
@@ -710,8 +716,8 @@ void udpard_tx_pop(udpard_tx_t* const self, udpard_tx_item_t* const item)
710716
void udpard_tx_free(const udpard_tx_mem_resources_t memory, udpard_tx_item_t* const item)
711717
{
712718
if (item != NULL) {
713-
UDPARD_ASSERT((item->index_prio.lr[0] == NULL) && (item->index_prio.up == NULL) &&
714-
(item->index_prio.lr[1] == NULL));
719+
UDPARD_ASSERT((item->index_order.lr[0] == NULL) && (item->index_order.up == NULL) &&
720+
(item->index_order.lr[1] == NULL));
715721
UDPARD_ASSERT((item->index_deadline.lr[0] == NULL) && (item->index_deadline.up == NULL) &&
716722
(item->index_deadline.lr[1] == NULL));
717723
if (item->datagram_payload.data != NULL) {
@@ -721,6 +727,62 @@ void udpard_tx_free(const udpard_tx_mem_resources_t memory, udpard_tx_item_t* co
721727
}
722728
}
723729

730+
/// Handle an ACK received from a remote node.
731+
/// This is where we acknowledge pending transmissions.
732+
static void tx_receive_ack(udpard_rx_t* const rx,
733+
const uint64_t topic_hash,
734+
const uint64_t transfer_id,
735+
const udpard_remote_t remote)
736+
{
737+
(void)rx;
738+
(void)topic_hash;
739+
(void)transfer_id;
740+
(void)remote;
741+
// TODO: implement
742+
}
743+
744+
/// Generate an ack transfer for the specified remote transfer.
745+
static void tx_send_ack(udpard_rx_t* const rx,
746+
const udpard_us_t now,
747+
const udpard_prio_t priority,
748+
const uint64_t topic_hash,
749+
const uint64_t transfer_id,
750+
const udpard_remote_t remote)
751+
{
752+
// Compose the ack transfer payload. It simply contains the topic hash and the ID of the acked transfer.
753+
byte_t header[UDPARD_P2P_HEADER_BYTES];
754+
byte_t* ptr = header;
755+
*ptr++ = P2P_KIND_ACK;
756+
ptr += 7U; // Reserved bytes.
757+
ptr = serialize_u64(ptr, topic_hash);
758+
ptr = serialize_u64(ptr, transfer_id);
759+
UDPARD_ASSERT((ptr - header) == UDPARD_P2P_HEADER_BYTES);
760+
const udpard_bytes_t payload = { .size = UDPARD_P2P_HEADER_BYTES, .data = header };
761+
762+
// Enqueue the ack transfer.
763+
const uint64_t p2p_transfer_id = rx->p2p_transfer_id++;
764+
for (uint_fast8_t i = 0; i < UDPARD_NETWORK_INTERFACE_COUNT_MAX; i++) {
765+
udpard_tx_t* const tx = rx->tx[i];
766+
if ((tx != NULL) && udpard_is_valid_endpoint(remote.endpoints[i])) {
767+
// TODO: scan the transmission queue for already pending acks; abort if one is already there.
768+
const uint32_t count = udpard_tx_push(tx,
769+
now,
770+
now + ACK_TX_DEADLINE,
771+
priority,
772+
remote.uid, // this is a P2P transfer
773+
remote.endpoints[i],
774+
p2p_transfer_id,
775+
payload,
776+
false,
777+
NULL);
778+
UDPARD_ASSERT(count <= 1);
779+
if (count != 1) { // ack is always a single-frame transfer, so we get either 0 or 1
780+
rx->errors_ack_tx[i]++;
781+
}
782+
}
783+
}
784+
}
785+
724786
// ---------------------------------------------------------------------------------------------------------------------
725787
// --------------------------------------------- RX PIPELINE ---------------------------------------------
726788
// ---------------------------------------------------------------------------------------------------------------------
@@ -1084,6 +1146,9 @@ static void rx_slot_update(rx_slot_t* const slot,
10841146

10851147
// --------------------------------------------- SESSION & PORT ---------------------------------------------
10861148

1149+
/// The number of times `from` must be incremented (modulo 2^64) to reach `to`.
1150+
static uint64_t rx_transfer_id_forward_distance(const uint64_t from, const uint64_t to) { return to - from; }
1151+
10871152
/// Keep in mind that we have a dedicated session object per remote node per port; this means that the states
10881153
/// kept here are specific per remote node, as it should be.
10891154
typedef struct rx_session_t
@@ -1126,9 +1191,6 @@ typedef struct udpard_rx_port_vtable_private_t
11261191
void (*update_session)(rx_session_t*, udpard_rx_t*, udpard_us_t, rx_frame_t*, udpard_mem_deleter_t);
11271192
} udpard_rx_port_vtable_private_t;
11281193

1129-
/// The number of times `from` must be incremented (modulo 2^64) to reach `to`.
1130-
static uint64_t rx_transfer_id_forward_distance(const uint64_t from, const uint64_t to) { return to - from; }
1131-
11321194
/// True iff the given transfer-ID was recently ejected.
11331195
static bool rx_session_is_transfer_ejected(const rx_session_t* const self, const uint64_t transfer_id)
11341196
{
@@ -1163,21 +1225,6 @@ static bool rx_session_is_transfer_interned(const rx_session_t* const self, cons
11631225
return false;
11641226
}
11651227

1166-
static void rx_session_on_ack_mandate(const rx_session_t* const self,
1167-
udpard_rx_t* const rx,
1168-
const udpard_prio_t priority,
1169-
const uint64_t transfer_id,
1170-
const udpard_bytes_t payload_head)
1171-
{
1172-
UDPARD_ASSERT(rx_session_is_transfer_ejected(self, transfer_id) ||
1173-
rx_session_is_transfer_interned(self, transfer_id));
1174-
const udpard_rx_ack_mandate_t mandate = {
1175-
.remote = self->remote, .priority = priority, .transfer_id = transfer_id, .payload_head = payload_head
1176-
};
1177-
UDPARD_ASSERT(payload_head.data != NULL || payload_head.size == 0U);
1178-
self->port->vtable->on_ack_mandate(rx, self->port, mandate);
1179-
}
1180-
11811228
static int32_t cavl_compare_rx_session_by_remote_uid(const void* const user, const udpard_tree_t* const node)
11821229
{
11831230
const uint64_t uid_a = *(const uint64_t*)user;
@@ -1456,8 +1503,8 @@ static void rx_session_update_ordered(rx_session_t* const self,
14561503
if (slot->state == rx_slot_done) {
14571504
UDPARD_ASSERT(rx_session_is_transfer_interned(self, slot->transfer_id));
14581505
if (frame->meta.flag_ack) {
1459-
rx_session_on_ack_mandate(
1460-
self, rx, slot->priority, slot->transfer_id, ((udpard_fragment_t*)cavl2_min(slot->fragments))->view);
1506+
// Payload view: ((udpard_fragment_t*)cavl2_min(slot->fragments))->view
1507+
tx_send_ack(rx, ts, slot->priority, self->port->topic_hash, slot->transfer_id, self->remote);
14611508
}
14621509
rx_session_ordered_scan_slots(self, rx, ts, false);
14631510
}
@@ -1466,7 +1513,8 @@ static void rx_session_update_ordered(rx_session_t* const self,
14661513
// meaning that the sender will not get a confirmation if the retransmitted transfer is too old.
14671514
// We assume that RX_TRANSFER_HISTORY_COUNT is enough to cover all sensible use cases.
14681515
if ((is_interned || is_ejected) && frame->meta.flag_ack && (frame->base.offset == 0U)) {
1469-
rx_session_on_ack_mandate(self, rx, frame->meta.priority, frame->meta.transfer_id, frame->base.payload);
1516+
// Payload view: frame->base.payload
1517+
tx_send_ack(rx, ts, frame->meta.priority, self->port->topic_hash, frame->meta.transfer_id, self->remote);
14701518
}
14711519
mem_free_payload(payload_deleter, frame->base.origin);
14721520
}
@@ -1496,16 +1544,15 @@ static void rx_session_update_unordered(rx_session_t* const self,
14961544
&rx->errors_oom,
14971545
&rx->errors_transfer_malformed);
14981546
if (slot->state == rx_slot_done) {
1499-
if (frame->meta.flag_ack) {
1500-
rx_session_on_ack_mandate(
1501-
self, rx, slot->priority, slot->transfer_id, ((udpard_fragment_t*)cavl2_min(slot->fragments))->view);
1547+
if (frame->meta.flag_ack) { // Payload view: ((udpard_fragment_t*)cavl2_min(slot->fragments))->view
1548+
tx_send_ack(rx, ts, slot->priority, self->port->topic_hash, slot->transfer_id, self->remote);
15021549
}
15031550
rx_session_eject(self, rx, slot);
15041551
}
1505-
} else { // retransmit ACK if needed
1506-
if (frame->meta.flag_ack && (frame->base.offset == 0U)) {
1552+
} else { // retransmit ACK if needed
1553+
if (frame->meta.flag_ack && (frame->base.offset == 0U)) { // Payload view: frame->base.payload
15071554
UDPARD_ASSERT(rx_session_is_transfer_ejected(self, frame->meta.transfer_id));
1508-
rx_session_on_ack_mandate(self, rx, frame->meta.priority, frame->meta.transfer_id, frame->base.payload);
1555+
tx_send_ack(rx, ts, frame->meta.priority, self->port->topic_hash, frame->meta.transfer_id, self->remote);
15091556
}
15101557
mem_free_payload(payload_deleter, frame->base.origin);
15111558
}
@@ -1593,7 +1640,9 @@ static bool rx_validate_mem_resources(const udpard_rx_mem_resources_t memory)
15931640
(memory.fragment.alloc != NULL) && (memory.fragment.free != NULL);
15941641
}
15951642

1596-
void udpard_rx_new(udpard_rx_t* const self)
1643+
void udpard_rx_new(udpard_rx_t* const self,
1644+
udpard_tx_t* const tx[UDPARD_NETWORK_INTERFACE_COUNT_MAX],
1645+
const uint64_t p2p_transfer_id_initial)
15971646
{
15981647
UDPARD_ASSERT(self != NULL);
15991648
mem_zero(sizeof(*self), self);
@@ -1602,7 +1651,11 @@ void udpard_rx_new(udpard_rx_t* const self)
16021651
self->errors_oom = 0;
16031652
self->errors_frame_malformed = 0;
16041653
self->errors_transfer_malformed = 0;
1605-
self->user = NULL;
1654+
for (size_t i = 0; i < UDPARD_NETWORK_INTERFACE_COUNT_MAX; i++) {
1655+
self->tx[i] = tx[i];
1656+
}
1657+
self->p2p_transfer_id = p2p_transfer_id_initial;
1658+
self->user = NULL;
16061659
}
16071660

16081661
void udpard_rx_poll(udpard_rx_t* const self, const udpard_us_t now)
@@ -1638,7 +1691,7 @@ bool udpard_rx_port_new(udpard_rx_port_t* const self,
16381691
(reordering_window == UDPARD_RX_REORDERING_WINDOW_UNORDERED) ||
16391692
(reordering_window == UDPARD_RX_REORDERING_WINDOW_STATELESS);
16401693
const bool ok = (self != NULL) && rx_validate_mem_resources(memory) && win_ok && (vtable != NULL) &&
1641-
(vtable->on_message != NULL) && (vtable->on_ack_mandate != NULL) && (vtable->on_collision != NULL);
1694+
(vtable->on_message != NULL) && (vtable->on_collision != NULL);
16421695
if (ok) {
16431696
mem_zero(sizeof(*self), self);
16441697
self->topic_hash = topic_hash;
@@ -1659,6 +1712,74 @@ bool udpard_rx_port_new(udpard_rx_port_t* const self,
16591712
return ok;
16601713
}
16611714

1715+
/// A thin proxy that reads the P2P header and dispatches the message to the appropriate handler.
1716+
static void rx_p2p_on_message(udpard_rx_t* const rx, udpard_rx_port_t* const port, const udpard_rx_transfer_t transfer)
1717+
{
1718+
udpard_rx_port_p2p_t* const self = (udpard_rx_port_p2p_t*)port;
1719+
1720+
// Read the header.
1721+
udpard_fragment_t* const frag0 = udpard_fragment_seek(transfer.payload, 0);
1722+
if (frag0->view.size < UDPARD_P2P_HEADER_BYTES) {
1723+
++rx->errors_transfer_malformed;
1724+
udpard_fragment_free_all(transfer.payload, port->memory.fragment);
1725+
return; // Bad transfer -- fragmented header. We can still handle it but it's a protocol violation.
1726+
}
1727+
1728+
// Parse the P2P header.
1729+
const byte_t* ptr = (const byte_t*)frag0->view.data;
1730+
const byte_t kind = *ptr++;
1731+
ptr += 7U; // reserved
1732+
uint64_t topic_hash = 0;
1733+
uint64_t transfer_id = 0;
1734+
ptr = deserialize_u64(ptr, &topic_hash);
1735+
ptr = deserialize_u64(ptr, &transfer_id);
1736+
UDPARD_ASSERT((ptr == (UDPARD_P2P_HEADER_BYTES + (byte_t*)frag0->view.data)));
1737+
1738+
// Remove the header from the view.
1739+
frag0->view.size -= UDPARD_P2P_HEADER_BYTES;
1740+
frag0->view.data = UDPARD_P2P_HEADER_BYTES + (byte_t*)(frag0->view.data);
1741+
1742+
// Process the data depending on the kind.
1743+
if (kind == P2P_KIND_ACK) {
1744+
tx_receive_ack(rx, topic_hash, transfer_id, transfer.remote);
1745+
} else if (kind == P2P_KIND_RESPONSE) {
1746+
const udpard_rx_transfer_p2p_t tr = { .base = transfer, .topic_hash = topic_hash };
1747+
self->vtable->on_message(rx, self, tr);
1748+
} else {
1749+
(void)0; // Malformed, ignored.
1750+
}
1751+
}
1752+
1753+
static void rx_p2p_on_collision(udpard_rx_t* const rx, udpard_rx_port_t* const port, const udpard_remote_t remote)
1754+
{
1755+
(void)rx;
1756+
(void)port;
1757+
(void)remote;
1758+
// A hash collision on a P2P port simply means that someone sent a transfer to the wrong unicast endpoint.
1759+
// This could happen if nodes swapped UDP/IP endpoints live, or if there are multiple nodes sharing the
1760+
// same UDP endpoint (same socket). Simply ignore it as there is nothing to do.
1761+
}
1762+
1763+
bool udpard_rx_port_new_p2p(udpard_rx_port_p2p_t* const self,
1764+
const uint64_t local_uid,
1765+
const size_t extent,
1766+
const udpard_rx_mem_resources_t memory,
1767+
const udpard_rx_port_p2p_vtable_t* const vtable)
1768+
{
1769+
static const udpard_rx_port_vtable_t proxy = { .on_message = rx_p2p_on_message,
1770+
.on_collision = rx_p2p_on_collision };
1771+
if ((self != NULL) && (vtable != NULL) && (vtable->on_message != NULL)) {
1772+
self->vtable = vtable;
1773+
return udpard_rx_port_new((udpard_rx_port_t*)&self, //
1774+
local_uid,
1775+
extent + UDPARD_P2P_HEADER_BYTES,
1776+
UDPARD_RX_REORDERING_WINDOW_UNORDERED,
1777+
memory,
1778+
&proxy);
1779+
}
1780+
return false;
1781+
}
1782+
16621783
void udpard_rx_port_free(udpard_rx_t* const rx, udpard_rx_port_t* const port)
16631784
{
16641785
if ((rx != NULL) && (port != NULL)) {

0 commit comments

Comments
 (0)