Skip to content

Commit 678782a

Browse files
wip the tx pipeline is entirely broken
1 parent 0a26318 commit 678782a

File tree

2 files changed

+203
-201
lines changed

2 files changed

+203
-201
lines changed

libudpard/udpard.c

Lines changed: 135 additions & 123 deletions
Original file line numberDiff line numberDiff line change
@@ -469,12 +469,20 @@ static void* unbias_ptr(const void* const ptr, const size_t offset)
469469
// --------------------------------------------- TX PIPELINE ---------------------------------------------
470470
// ---------------------------------------------------------------------------------------------------------------------
471471

472-
typedef struct
472+
typedef struct tx_item_t
473473
{
474-
udpard_tx_item_t* head;
475-
udpard_tx_item_t* tail;
476-
size_t count;
477-
} tx_chain_t;
474+
udpard_tree_t index_order;
475+
udpard_tree_t index_deadline;
476+
477+
struct tx_item_t* head; ///< Points to the frame where offset=0. Points to itself if this is the first frame.
478+
struct tx_item_t* next; ///< Next frame in this transfer ordered by offset; NULL if last.
479+
480+
udpard_us_t deadline;
481+
udpard_prio_t priority;
482+
udpard_udpip_ep_t destination;
483+
udpard_bytes_mut_t datagram_payload;
484+
void* user_transfer_reference;
485+
} tx_item_t;
478486

479487
static bool tx_validate_mem_resources(const udpard_tx_mem_resources_t memory)
480488
{
@@ -485,30 +493,30 @@ static bool tx_validate_mem_resources(const udpard_tx_mem_resources_t memory)
485493
/// Frames with identical weight are processed in the FIFO order.
486494
static int32_t tx_cavl_compare_prio(const void* const user, const udpard_tree_t* const node)
487495
{
488-
return (((int)*(const udpard_prio_t*)user) >= (int)CAVL2_TO_OWNER(node, udpard_tx_item_t, index_order)->priority)
489-
? +1
490-
: -1;
496+
return (((int)*(const udpard_prio_t*)user) >= (int)CAVL2_TO_OWNER(node, tx_item_t, index_order)->priority) ? +1
497+
: -1;
491498
}
492499

493500
static int32_t tx_cavl_compare_deadline(const void* const user, const udpard_tree_t* const node)
494501
{
495-
return ((*(const udpard_us_t*)user) >= CAVL2_TO_OWNER(node, udpard_tx_item_t, index_deadline)->deadline) ? +1 : -1;
502+
return ((*(const udpard_us_t*)user) >= CAVL2_TO_OWNER(node, tx_item_t, index_deadline)->deadline) ? +1 : -1;
496503
}
497504

498-
static udpard_tx_item_t* tx_item_new(const udpard_tx_mem_resources_t memory,
499-
const udpard_us_t deadline,
500-
const udpard_prio_t priority,
501-
const udpard_udpip_ep_t endpoint,
502-
const size_t datagram_payload_size,
503-
void* const user_transfer_reference)
505+
static tx_item_t* tx_item_new(const udpard_tx_mem_resources_t memory,
506+
const udpard_us_t deadline,
507+
const udpard_prio_t priority,
508+
const udpard_udpip_ep_t endpoint,
509+
const size_t datagram_payload_size,
510+
void* const user_transfer_reference)
504511
{
505-
udpard_tx_item_t* out = mem_alloc(memory.fragment, sizeof(udpard_tx_item_t));
512+
tx_item_t* out = mem_alloc(memory.fragment, sizeof(tx_item_t));
506513
if (out != NULL) {
507514
out->index_order = (udpard_tree_t){ 0 };
508515
out->index_deadline = (udpard_tree_t){ 0 };
509516
UDPARD_ASSERT(priority <= UDPARD_PRIORITY_MAX);
510517
out->priority = priority;
511-
out->next_in_transfer = NULL; // Last by default.
518+
out->head = out; // First by default.
519+
out->next = NULL; // Last by default.
512520
out->deadline = deadline;
513521
out->destination = endpoint;
514522
out->user_transfer_reference = user_transfer_reference;
@@ -517,13 +525,20 @@ static udpard_tx_item_t* tx_item_new(const udpard_tx_mem_resources_t memory,
517525
out->datagram_payload.data = payload_data;
518526
out->datagram_payload.size = datagram_payload_size;
519527
} else {
520-
mem_free(memory.fragment, sizeof(udpard_tx_item_t), out);
528+
mem_free(memory.fragment, sizeof(tx_item_t), out);
521529
out = NULL;
522530
}
523531
}
524532
return out;
525533
}
526534

535+
typedef struct
536+
{
537+
tx_item_t* head;
538+
tx_item_t* tail;
539+
size_t count;
540+
} tx_chain_t;
541+
527542
/// Produces a chain of tx queue items for later insertion into the tx queue. The tail is NULL if OOM.
528543
/// The caller is responsible for freeing the memory allocated for the chain.
529544
static tx_chain_t tx_spool(const udpard_tx_mem_resources_t memory,
@@ -540,22 +555,23 @@ static tx_chain_t tx_spool(const udpard_tx_mem_resources_t memory,
540555
tx_chain_t out = { NULL, NULL, 0 };
541556
size_t offset = 0U;
542557
do {
543-
const size_t progress = smaller(payload.size - offset, mtu);
544-
udpard_tx_item_t* const item = tx_item_new(memory, //
545-
deadline,
546-
meta.priority,
547-
endpoint,
548-
progress + HEADER_SIZE_BYTES,
549-
user_transfer_reference);
558+
const size_t progress = smaller(payload.size - offset, mtu);
559+
tx_item_t* const item = tx_item_new(memory, //
560+
deadline,
561+
meta.priority,
562+
endpoint,
563+
progress + HEADER_SIZE_BYTES,
564+
user_transfer_reference);
550565
if (NULL == out.head) {
551566
out.head = item;
552567
} else {
553-
out.tail->next_in_transfer = item;
568+
out.tail->next = item;
554569
}
555570
out.tail = item;
556571
if (NULL == out.tail) {
557572
break;
558573
}
574+
item->head = out.head; // All frames in a transfer have a pointer to the head.
559575
const byte_t* const read_ptr = ((const byte_t*)payload.data) + offset;
560576
prefix_crc = crc_add(prefix_crc, progress, read_ptr);
561577
byte_t* const write_ptr = header_serialize(
@@ -569,6 +585,13 @@ static tx_chain_t tx_spool(const udpard_tx_mem_resources_t memory,
569585
return out;
570586
}
571587

588+
/// Derives the ack timeout for an outgoing transfer using an empirical formula.
589+
/// The number of retries is initially zero when the transfer is sent for the first time.
590+
static udpard_us_t tx_ack_timeout(const udpard_us_t baseline, const udpard_prio_t prio, const uint_fast8_t retries)
591+
{
592+
return baseline * (1L << smaller((uint16_t)prio + retries, 15)); // NOLINT(*-signed-bitwise)
593+
}
594+
572595
static uint32_t tx_push(udpard_tx_t* const tx,
573596
const udpard_us_t deadline,
574597
const meta_t meta,
@@ -585,7 +608,7 @@ static uint32_t tx_push(udpard_tx_t* const tx,
585608
} else {
586609
const tx_chain_t chain = tx_spool(tx->memory, mtu, deadline, meta, endpoint, payload, user_transfer_reference);
587610
if (chain.tail != NULL) { // Insert the head into the tx index. Only the head, the rest is linked-listed.
588-
udpard_tx_item_t* const head = chain.head;
611+
tx_item_t* const head = chain.head;
589612
UDPARD_ASSERT(frame_count == chain.count);
590613
const udpard_tree_t* res = cavl2_find_or_insert(
591614
&tx->index_order, &head->priority, &tx_cavl_compare_prio, &head->index_order, &cavl2_trivial_factory);
@@ -603,10 +626,11 @@ static uint32_t tx_push(udpard_tx_t* const tx,
603626
out = (uint32_t)chain.count;
604627
} else { // The queue is large enough but we ran out of heap memory, so we have to unwind the chain.
605628
tx->errors_oom++;
606-
udpard_tx_item_t* head = chain.head;
629+
tx_item_t* head = chain.head;
607630
while (head != NULL) {
608-
udpard_tx_item_t* const next = head->next_in_transfer;
609-
udpard_tx_free(tx->memory, head);
631+
tx_item_t* const next = head->next;
632+
mem_free(tx->memory.payload, head->datagram_payload.size, head->datagram_payload.data);
633+
mem_free(tx->memory.fragment, sizeof(tx_item_t), head);
610634
head = next;
611635
}
612636
}
@@ -618,7 +642,7 @@ static uint64_t tx_purge_expired(udpard_tx_t* const self, const udpard_us_t now)
618642
{
619643
uint64_t count = 0;
620644
for (udpard_tree_t* p = cavl2_min(self->index_deadline); p != NULL;) {
621-
udpard_tx_item_t* const item = CAVL2_TO_OWNER(p, udpard_tx_item_t, index_deadline);
645+
tx_item_t* const item = CAVL2_TO_OWNER(p, tx_item_t, index_deadline);
622646
if (item->deadline >= now) {
623647
break;
624648
}
@@ -627,10 +651,11 @@ static uint64_t tx_purge_expired(udpard_tx_t* const self, const udpard_us_t now)
627651
cavl2_remove(&self->index_deadline, &item->index_deadline);
628652
cavl2_remove(&self->index_order, &item->index_order);
629653
// Free the entire transfer chain.
630-
udpard_tx_item_t* current = item;
654+
tx_item_t* current = item;
631655
while (current != NULL) {
632-
udpard_tx_item_t* const next_in_transfer = current->next_in_transfer;
633-
udpard_tx_free(self->memory, current);
656+
tx_item_t* const next_in_transfer = current->next;
657+
mem_free(self->memory.payload, current->datagram_payload.size, current->datagram_payload.data);
658+
mem_free(self->memory.fragment, sizeof(tx_item_t), current);
634659
current = next_in_transfer;
635660
count++;
636661
self->queue_size--;
@@ -640,95 +665,8 @@ static uint64_t tx_purge_expired(udpard_tx_t* const self, const udpard_us_t now)
640665
return count;
641666
}
642667

643-
bool udpard_tx_new(udpard_tx_t* const self,
644-
const uint64_t local_uid,
645-
const size_t queue_capacity,
646-
const udpard_tx_mem_resources_t memory)
647-
{
648-
const bool ok = (NULL != self) && (local_uid != 0) && tx_validate_mem_resources(memory);
649-
if (ok) {
650-
mem_zero(sizeof(*self), self);
651-
self->local_uid = local_uid;
652-
self->queue_capacity = queue_capacity;
653-
self->mtu = UDPARD_MTU_DEFAULT;
654-
self->memory = memory;
655-
self->queue_size = 0;
656-
self->index_order = NULL;
657-
self->index_deadline = NULL;
658-
}
659-
return ok;
660-
}
661-
662-
uint32_t udpard_tx_push(udpard_tx_t* const self,
663-
const udpard_us_t now,
664-
const udpard_us_t deadline,
665-
const udpard_prio_t priority,
666-
const uint64_t topic_hash,
667-
const udpard_udpip_ep_t remote_ep,
668-
const uint64_t transfer_id,
669-
const udpard_bytes_t payload,
670-
const bool ack_required,
671-
void* const user_transfer_reference)
672-
{
673-
uint32_t out = 0;
674-
const bool ok = (self != NULL) && (deadline >= now) && (self->local_uid != 0) &&
675-
udpard_is_valid_endpoint(remote_ep) && (priority <= UDPARD_PRIORITY_MAX) &&
676-
((payload.data != NULL) || (payload.size == 0U));
677-
if (ok) {
678-
self->errors_expiration += tx_purge_expired(self, now);
679-
const meta_t meta = {
680-
.priority = priority,
681-
.flag_ack = ack_required,
682-
.transfer_payload_size = (uint32_t)payload.size,
683-
.transfer_id = transfer_id,
684-
.sender_uid = self->local_uid,
685-
.topic_hash = topic_hash,
686-
};
687-
out = tx_push(self, deadline, meta, remote_ep, payload, user_transfer_reference);
688-
}
689-
return out;
690-
}
691-
692-
udpard_tx_item_t* udpard_tx_peek(udpard_tx_t* const self, const udpard_us_t now)
693-
{
694-
udpard_tx_item_t* out = NULL;
695-
if (self != NULL) {
696-
self->errors_expiration += tx_purge_expired(self, now);
697-
out = CAVL2_TO_OWNER(cavl2_min(self->index_order), udpard_tx_item_t, index_order);
698-
}
699-
return out;
700-
}
701-
702-
void udpard_tx_pop(udpard_tx_t* const self, udpard_tx_item_t* const item)
703-
{
704-
if ((self != NULL) && (item != NULL)) {
705-
if (item->next_in_transfer == NULL) {
706-
cavl2_remove(&self->index_order, &item->index_order);
707-
cavl2_remove(&self->index_deadline, &item->index_deadline);
708-
} else { // constant-time update, super quick, just relink a few pointers!
709-
cavl2_replace(&self->index_order, &item->index_order, &item->next_in_transfer->index_order);
710-
cavl2_replace(&self->index_deadline, &item->index_deadline, &item->next_in_transfer->index_deadline);
711-
}
712-
self->queue_size--;
713-
}
714-
}
715-
716-
void udpard_tx_free(const udpard_tx_mem_resources_t memory, udpard_tx_item_t* const item)
717-
{
718-
if (item != NULL) {
719-
UDPARD_ASSERT((item->index_order.lr[0] == NULL) && (item->index_order.up == NULL) &&
720-
(item->index_order.lr[1] == NULL));
721-
UDPARD_ASSERT((item->index_deadline.lr[0] == NULL) && (item->index_deadline.up == NULL) &&
722-
(item->index_deadline.lr[1] == NULL));
723-
if (item->datagram_payload.data != NULL) {
724-
mem_free(memory.payload, item->datagram_payload.size, item->datagram_payload.data);
725-
}
726-
mem_free(memory.fragment, sizeof(udpard_tx_item_t), item);
727-
}
728-
}
729-
730668
/// Handle an ACK received from a remote node.
731-
/// This is where we acknowledge pending transmissions.
669+
/// This is where we dequeue pending transmissions and invoke the feedback callback.
732670
static void tx_receive_ack(udpard_rx_t* const rx,
733671
const uint64_t topic_hash,
734672
const uint64_t transfer_id,
@@ -738,7 +676,7 @@ static void tx_receive_ack(udpard_rx_t* const rx,
738676
(void)topic_hash;
739677
(void)transfer_id;
740678
(void)remote;
741-
// TODO: implement
679+
// TODO: find the transfer in the TX queue by topic and transfer-ID and remove it; invoke the feedback callback.
742680
}
743681

744682
/// Generate an ack transfer for the specified remote transfer.
@@ -784,6 +722,80 @@ static void tx_send_ack(udpard_rx_t* const rx,
784722
}
785723
}
786724

725+
bool udpard_tx_new(udpard_tx_t* const self,
726+
const uint64_t local_uid,
727+
const size_t queue_capacity,
728+
const udpard_tx_mem_resources_t memory,
729+
const udpard_tx_vtable_t* const vtable)
730+
{
731+
const bool ok = (NULL != self) && (local_uid != 0) && tx_validate_mem_resources(memory) && (vtable != NULL) &&
732+
(vtable->eject != NULL) && (vtable->feedback != NULL);
733+
if (ok) {
734+
mem_zero(sizeof(*self), self);
735+
self->vtable = vtable;
736+
self->local_uid = local_uid;
737+
self->queue_capacity = queue_capacity;
738+
self->mtu = UDPARD_MTU_DEFAULT;
739+
self->ack_baseline_timeout = UDPARD_TX_ACK_BASELINE_TIMEOUT_DEFAULT_us;
740+
self->memory = memory;
741+
self->queue_size = 0;
742+
self->index_order = NULL;
743+
self->index_deadline = NULL;
744+
self->user = NULL;
745+
}
746+
return ok;
747+
}
748+
749+
uint32_t udpard_tx_push(udpard_tx_t* const self,
750+
const udpard_us_t now,
751+
const udpard_us_t deadline,
752+
const udpard_prio_t priority,
753+
const uint64_t topic_hash,
754+
const udpard_udpip_ep_t remote_ep,
755+
const uint64_t transfer_id,
756+
const udpard_bytes_t payload,
757+
const bool reliable,
758+
void* const user_transfer_reference)
759+
{
760+
uint32_t out = 0;
761+
const bool ok = (self != NULL) && (deadline >= now) && (self->local_uid != 0) &&
762+
udpard_is_valid_endpoint(remote_ep) && (priority <= UDPARD_PRIORITY_MAX) &&
763+
((payload.data != NULL) || (payload.size == 0U));
764+
if (ok) {
765+
self->errors_expiration += tx_purge_expired(self, now);
766+
const meta_t meta = {
767+
.priority = priority,
768+
.flag_ack = reliable,
769+
.transfer_payload_size = (uint32_t)payload.size,
770+
.transfer_id = transfer_id,
771+
.sender_uid = self->local_uid,
772+
.topic_hash = topic_hash,
773+
};
774+
out = tx_push(self, deadline, meta, remote_ep, payload, user_transfer_reference);
775+
}
776+
return out;
777+
}
778+
779+
void udpard_tx_poll(udpard_tx_t* const self, const udpard_us_t now)
780+
{
781+
if ((self != NULL) && (now >= 0)) {
782+
self->errors_expiration += tx_purge_expired(self, now);
783+
while (self->queue_size > 0) {
784+
// TODO fetch the next scheduled frame and invoke the eject callback
785+
break; // Remove this when implemented
786+
}
787+
}
788+
}
789+
790+
void udpard_tx_free(udpard_tx_t* const self)
791+
{
792+
if (self != NULL) {
793+
// TODO: do this for all items in the queue:
794+
// mem_free(memory.payload, item->datagram_payload.size, item->datagram_payload.data);
795+
// mem_free(memory.fragment, sizeof(tx_item_t), item);
796+
}
797+
}
798+
787799
// ---------------------------------------------------------------------------------------------------------------------
788800
// --------------------------------------------- RX PIPELINE ---------------------------------------------
789801
// ---------------------------------------------------------------------------------------------------------------------

0 commit comments

Comments
 (0)