Skip to content

Commit 262cd10

Browse files
complete the tx pipeline
1 parent c8a7fec commit 262cd10

File tree

3 files changed

+160
-66
lines changed

3 files changed

+160
-66
lines changed

lib/cavl/cavl2.h

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,17 @@ static inline CAVL2_T* cavl2_find(CAVL2_T* root, const void* const user_comparat
138138
/// The removed node will have all of its pointers set to NULL.
139139
static inline void cavl2_remove(CAVL2_T** const root, CAVL2_T* const node);
140140

141+
/// Replace the specified node with another node without rebalancing.
142+
/// This is useful when you want to replace a node with an equivalent one (same key ordering).
143+
/// The new node takes over the position (parent, children, balance factor) of the old node.
144+
/// The old node will have all of its pointers set to NULL.
145+
/// The new node must not already be in the tree; if it is, the behavior is undefined.
146+
/// The new node's fields (up, lr, bf) will be overwritten to match the old node's position in the tree.
147+
/// The complexity is O(1).
148+
/// The function has no effect if any of the pointers are NULL.
149+
/// If the old node is not in the tree, the behavior is undefined.
150+
static inline void cavl2_replace(CAVL2_T** const root, CAVL2_T* const old_node, CAVL2_T* const new_node);
151+
141152
/// True iff the node is in the tree. The complexity is O(1).
142153
/// Returns false if the node is NULL.
143154
/// Assumes that the node pointers are NULL when it is not inserted (this is ensured by the removal function).
@@ -434,6 +445,37 @@ static inline void cavl2_remove(CAVL2_T** const root, CAVL2_T* const node)
434445
}
435446
}
436447

448+
static inline void cavl2_replace(CAVL2_T** const root, CAVL2_T* const old_node, CAVL2_T* const new_node)
449+
{
450+
if ((root != NULL) && (old_node != NULL) && (new_node != NULL)) {
451+
CAVL2_ASSERT(*root != NULL); // Otherwise, old_node would have to be NULL.
452+
CAVL2_ASSERT((old_node->up != NULL) || (old_node == *root)); // old_node must be in the tree.
453+
CAVL2_ASSERT((new_node->up == NULL) && (new_node->lr[0] == NULL) && (new_node->lr[1] == NULL));
454+
// Copy the structural data from the old node to the new node.
455+
new_node->up = old_node->up;
456+
new_node->lr[0] = old_node->lr[0];
457+
new_node->lr[1] = old_node->lr[1];
458+
new_node->bf = old_node->bf;
459+
// Update the parent to point to the new node.
460+
if (old_node->up != NULL) {
461+
old_node->up->lr[old_node->up->lr[1] == old_node] = new_node;
462+
} else {
463+
*root = new_node;
464+
}
465+
// Update the children to point to the new parent.
466+
if (old_node->lr[0] != NULL) {
467+
old_node->lr[0]->up = new_node;
468+
}
469+
if (old_node->lr[1] != NULL) {
470+
old_node->lr[1]->up = new_node;
471+
}
472+
// Invalidate the old node's pointers to indicate it is no longer in the tree.
473+
old_node->up = NULL;
474+
old_node->lr[0] = NULL;
475+
old_node->lr[1] = NULL;
476+
}
477+
}
478+
437479
#ifdef __cplusplus
438480
}
439481
#endif

libudpard/udpard.c

Lines changed: 108 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
// --------------------------------------------- COMMONS ---------------------------------------------
3131

3232
#define CAVL2_T udpard_tree_t
33-
#define CAVL2_RELATION int64_t
33+
#define CAVL2_RELATION int32_t
3434
#define CAVL2_ASSERT(x) UDPARD_ASSERT(x) // NOSONAR
3535
#include "cavl2.h"
3636

@@ -284,15 +284,18 @@ static bool tx_validate_mem_resources(const udpard_tx_mem_resources_t memory)
284284
}
285285

286286
/// Frames with identical weight are processed in the FIFO order.
287-
/// Frames with higher weight compare smaller (i.e., put on the left side of the tree).
288-
static int64_t tx_cavl_compare_prio(const void* const user, const udpard_tree_t* const node)
287+
static int32_t tx_cavl_compare_prio(const void* const user, const udpard_tree_t* const node)
289288
{
290-
return ((int)*(const udpard_prio_t*)user) - (int)(CAVL2_TO_OWNER(node, udpard_tx_item_t, index_prio)->priority);
289+
return (((int)*(const udpard_prio_t*)user) >= (int)CAVL2_TO_OWNER(node, udpard_tx_item_t, index_prio)->priority)
290+
? +1
291+
: -1;
291292
}
292293

293-
static int64_t tx_cavl_compare_deadline(const void* const user, const udpard_tree_t* const node)
294+
static int32_t tx_cavl_compare_deadline(const void* const user, const udpard_tree_t* const node)
294295
{
295-
return (*(const udpard_microsecond_t*)user) - (CAVL2_TO_OWNER(node, udpard_tx_item_t, index_deadline)->deadline);
296+
return ((*(const udpard_microsecond_t*)user) >= CAVL2_TO_OWNER(node, udpard_tx_item_t, index_deadline)->deadline)
297+
? +1
298+
: -1;
296299
}
297300

298301
static udpard_tx_item_t* tx_item_new(const udpard_tx_mem_resources_t memory,
@@ -328,7 +331,7 @@ static udpard_tx_item_t* tx_item_new(const udpard_tx_mem_resources_t memory,
328331
/// The caller is responsible for freeing the memory allocated for the chain.
329332
static tx_chain_t tx_spool(const udpard_tx_mem_resources_t memory,
330333
const size_t mtu,
331-
const udpard_microsecond_t deadline_usec,
334+
const udpard_microsecond_t deadline,
332335
const meta_t meta,
333336
const udpard_udpip_ep_t endpoint,
334337
const udpard_bytes_t payload,
@@ -343,7 +346,7 @@ static tx_chain_t tx_spool(const udpard_tx_mem_resources_t memory,
343346
size_t offset = 0U;
344347
while (offset < payload_size_with_crc) {
345348
udpard_tx_item_t* const item = tx_item_new(memory,
346-
deadline_usec,
349+
deadline,
347350
meta.priority,
348351
endpoint,
349352
smaller(payload_size_with_crc - offset, mtu) + HEADER_SIZE_BYTES,
@@ -400,28 +403,22 @@ static uint32_t tx_push(udpard_tx_t* const tx,
400403
tx->errors_capacity++;
401404
} else {
402405
const tx_chain_t chain = tx_spool(tx->memory, mtu, deadline, meta, endpoint, payload, user_transfer_reference);
403-
if (chain.tail != NULL) { // Insert every item into the tx indexes.
406+
if (chain.tail != NULL) { // Insert the head into the tx index. Only the head, the rest is linked-listed.
407+
udpard_tx_item_t* const head = chain.head;
404408
UDPARD_ASSERT(frame_count == chain.count);
405-
udpard_tx_item_t* next = chain.head;
406-
do {
407-
const udpard_tree_t* res = cavl2_find_or_insert(&tx->index_prio, //
408-
&next->priority,
409-
&tx_cavl_compare_prio,
410-
&next->index_prio,
411-
&cavl2_trivial_factory);
412-
UDPARD_ASSERT(res == &next->index_prio);
413-
res = cavl2_find_or_insert(&tx->index_deadline, //
414-
&next->deadline,
415-
&tx_cavl_compare_deadline,
416-
&next->index_deadline,
417-
&cavl2_trivial_factory);
418-
UDPARD_ASSERT(res == &next->index_deadline);
419-
(void)res;
420-
next = next->next_in_transfer;
421-
} while (next != NULL);
409+
const udpard_tree_t* res = cavl2_find_or_insert(
410+
&tx->index_prio, &head->priority, &tx_cavl_compare_prio, &head->index_prio, &cavl2_trivial_factory);
411+
UDPARD_ASSERT(res == &head->index_prio);
412+
(void)res;
413+
res = cavl2_find_or_insert(&tx->index_deadline,
414+
&head->deadline,
415+
&tx_cavl_compare_deadline,
416+
&head->index_deadline,
417+
&cavl2_trivial_factory);
418+
UDPARD_ASSERT(res == &head->index_deadline);
419+
(void)res;
422420
tx->queue_size += chain.count;
423421
UDPARD_ASSERT(tx->queue_size <= tx->queue_capacity);
424-
UDPARD_ASSERT((chain.count + 0ULL) <= INT32_MAX); // +0 is to suppress warning.
425422
out = (uint32_t)chain.count;
426423
} else { // The queue is large enough but we ran out of heap memory, so we have to unwind the chain.
427424
tx->errors_oom++;
@@ -436,6 +433,30 @@ static uint32_t tx_push(udpard_tx_t* const tx,
436433
return out;
437434
}
438435

436+
static uint64_t tx_purge_expired(udpard_tx_t* const self, const udpard_microsecond_t now)
437+
{
438+
uint64_t count = 0;
439+
for (udpard_tree_t* p = cavl2_min(self->index_deadline); p != NULL; p = cavl2_next_greater(p)) {
440+
udpard_tx_item_t* const item = CAVL2_TO_OWNER(p, udpard_tx_item_t, index_deadline);
441+
if (item->deadline >= now) {
442+
break;
443+
}
444+
// Remove from both indices.
445+
cavl2_remove(&self->index_deadline, &item->index_deadline);
446+
cavl2_remove(&self->index_prio, &item->index_prio);
447+
// Free the entire transfer chain.
448+
udpard_tx_item_t* current = item;
449+
while (current != NULL) {
450+
udpard_tx_item_t* const next = current->next_in_transfer;
451+
udpard_tx_free(self->memory, current);
452+
current = next;
453+
count++;
454+
self->queue_size--;
455+
}
456+
}
457+
return count;
458+
}
459+
439460
bool udpard_tx_new(udpard_tx_t* const self,
440461
const uint64_t local_uid,
441462
const size_t queue_capacity,
@@ -463,19 +484,24 @@ uint32_t udpard_tx_publish(udpard_tx_t* const self,
463484
const uint32_t subject_id,
464485
const uint64_t transfer_id,
465486
const udpard_bytes_t payload,
487+
const bool ack_required,
466488
void* const user_transfer_reference)
467489
{
468-
(void)self;
469-
(void)now;
470-
(void)deadline;
471-
(void)priority;
472-
(void)topic_hash;
473-
(void)subject_id;
474-
(void)transfer_id;
475-
(void)payload;
476-
(void)user_transfer_reference;
477-
(void)tx_push;
478-
return 0;
490+
uint32_t out = 0;
491+
if ((self != NULL) && (self->local_uid != 0) && (priority <= UDPARD_PRIORITY_MAX) &&
492+
((payload.data != NULL) || (payload.size == 0U))) {
493+
self->errors_expiration += tx_purge_expired(self, now);
494+
const meta_t meta = {
495+
.priority = priority,
496+
.flag_ack = ack_required,
497+
.transfer_payload_size = (uint32_t)payload.size,
498+
.transfer_id = transfer_id,
499+
.sender_uid = self->local_uid,
500+
.topic_hash = topic_hash,
501+
};
502+
out = tx_push(self, deadline, meta, make_topic_ep(subject_id), payload, user_transfer_reference);
503+
}
504+
return out;
479505
}
480506

481507
uint32_t udpard_tx_p2p(udpard_tx_t* const self,
@@ -486,38 +512,62 @@ uint32_t udpard_tx_p2p(udpard_tx_t* const self,
486512
const udpard_prio_t priority,
487513
const uint64_t transfer_id,
488514
const udpard_bytes_t payload,
515+
const bool ack_required,
489516
void* const user_transfer_reference)
490517
{
491-
(void)self;
492-
(void)now;
493-
(void)deadline;
494-
(void)priority;
495-
(void)remote_uid;
496-
(void)remote_ep;
497-
(void)transfer_id;
498-
(void)payload;
499-
(void)user_transfer_reference;
500-
(void)tx_push;
501-
return 0;
518+
uint32_t out = 0;
519+
if ((self != NULL) && (self->local_uid != 0) && (remote_uid != 0) && (remote_ep.ip != 0) && (remote_ep.port != 0) &&
520+
(priority <= UDPARD_PRIORITY_MAX) && ((payload.data != NULL) || (payload.size == 0U))) {
521+
self->errors_expiration += tx_purge_expired(self, now);
522+
const meta_t meta = {
523+
.priority = priority,
524+
.flag_ack = ack_required,
525+
.transfer_payload_size = (uint32_t)payload.size,
526+
.transfer_id = transfer_id,
527+
.sender_uid = self->local_uid,
528+
.topic_hash = remote_uid,
529+
};
530+
out = tx_push(self, deadline, meta, remote_ep, payload, user_transfer_reference);
531+
}
532+
return out;
502533
}
503534

504-
udpard_tx_item_t* udpard_tx_peek(const udpard_tx_t* const self, const udpard_microsecond_t now)
535+
udpard_tx_item_t* udpard_tx_peek(udpard_tx_t* const self, const udpard_microsecond_t now)
505536
{
506-
(void)self;
507-
(void)now;
508-
return NULL;
537+
udpard_tx_item_t* out = NULL;
538+
if (self != NULL) {
539+
self->errors_expiration += tx_purge_expired(self, now);
540+
out = CAVL2_TO_OWNER(cavl2_min(self->index_prio), udpard_tx_item_t, index_prio);
541+
}
542+
return out;
509543
}
510544

511-
void udpard_tx_pop(const udpard_tx_t* const self, udpard_tx_item_t* const item)
545+
void udpard_tx_pop(udpard_tx_t* const self, udpard_tx_item_t* const item)
512546
{
513-
(void)self;
514-
(void)item;
547+
if ((self != NULL) && (item != NULL)) {
548+
if (item->next_in_transfer == NULL) {
549+
cavl2_remove(&self->index_prio, &item->index_prio);
550+
cavl2_remove(&self->index_deadline, &item->index_deadline);
551+
} else { // constant-time update, super quick, just relink a few pointers!
552+
cavl2_replace(&self->index_prio, &item->index_prio, &item->next_in_transfer->index_prio);
553+
cavl2_replace(&self->index_deadline, &item->index_deadline, &item->next_in_transfer->index_deadline);
554+
}
555+
self->queue_size--;
556+
}
515557
}
516558

517559
void udpard_tx_free(const udpard_tx_mem_resources_t memory, udpard_tx_item_t* const item)
518560
{
519-
(void)memory;
520-
(void)item;
561+
if (item != NULL) {
562+
UDPARD_ASSERT((item->index_prio.lr[0] == NULL) && (item->index_prio.up == NULL) &&
563+
(item->index_prio.lr[1] == NULL));
564+
UDPARD_ASSERT((item->index_deadline.lr[0] == NULL) && (item->index_deadline.up == NULL) &&
565+
(item->index_deadline.lr[1] == NULL));
566+
if (item->datagram_payload.data != NULL) {
567+
mem_free(memory.payload, item->datagram_payload.size, item->datagram_payload.data);
568+
}
569+
mem_free(memory.fragment, sizeof(udpard_tx_item_t), item);
570+
}
521571
}
522572

523573
// --------------------------------------------- RX PIPELINE ---------------------------------------------

libudpard/udpard.h

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ extern "C"
126126
/// To guarantee a single frame transfer, the maximum payload size shall be 4 bytes less to accommodate the CRC.
127127
#define UDPARD_MTU_DEFAULT_MAX_SINGLE_FRAME (UDPARD_MTU_DEFAULT - 4U)
128128

129-
/// MTU less than this is not supported and should not be used.
129+
/// MTU less than this should not be used.
130130
#define UDPARD_MTU_MIN 460U
131131

132132
#define UDPARD_PRIORITY_MAX 7U
@@ -317,9 +317,9 @@ typedef struct udpard_tx_t
317317

318318
/// Error counters incremented automatically when the corresponding error condition occurs.
319319
/// These counters are never decremented by the library but they can be reset by the application if needed.
320-
uint64_t errors_oom; ///< A transfer could not be enqueued due to OOM.
321-
uint64_t errors_capacity; ///< A transfer could not be enqueued due to queue capacity limit.
322-
uint64_t errors_deadline; ///< A frame had to be dropped due to premature deadline expiration.
320+
uint64_t errors_oom; ///< A transfer could not be enqueued due to OOM.
321+
uint64_t errors_capacity; ///< A transfer could not be enqueued due to queue capacity limit.
322+
uint64_t errors_expiration; ///< A frame had to be dropped due to premature deadline expiration.
323323

324324
/// Internal use only, do not modify!
325325
udpard_tree_t* index_prio; ///< Most urgent on the left, then according to the insertion order.
@@ -414,7 +414,7 @@ bool udpard_tx_new(udpard_tx_t* const self,
414414
/// the last frame of the transfer; the TX queue `memory.payload` resource is used for this allocation.
415415
///
416416
/// The time complexity is O(p + log e), where p is the amount of payload in the transfer, and e is the number of
417-
/// frames already enqueued in the transmission queue.
417+
/// transfers (not frames) already enqueued in the transmission queue.
418418
uint32_t udpard_tx_publish(udpard_tx_t* const self,
419419
const udpard_microsecond_t now,
420420
const udpard_microsecond_t deadline,
@@ -423,6 +423,7 @@ uint32_t udpard_tx_publish(udpard_tx_t* const self,
423423
const uint32_t subject_id,
424424
const uint64_t transfer_id,
425425
const udpard_bytes_t payload,
426+
const bool ack_required,
426427
void* const user_transfer_reference);
427428

428429
/// Similar to udpard_tx_publish, but for P2P transfers between specific nodes.
@@ -436,20 +437,21 @@ uint32_t udpard_tx_p2p(udpard_tx_t* const self,
436437
const udpard_prio_t priority,
437438
const uint64_t transfer_id,
438439
const udpard_bytes_t payload,
440+
const bool ack_required,
439441
void* const user_transfer_reference);
440442

441443
/// Purges all timed out items from the transmission queue automatically; returns the next item to be transmitted,
442444
/// if there is any, otherwise NULL. The returned item is not removed from the queue; use udpard_tx_pop() to do that.
443445
/// The returned item (if any) is guaranteed to be non-expired (deadline>=now).
444-
udpard_tx_item_t* udpard_tx_peek(const udpard_tx_t* const self, const udpard_microsecond_t now);
446+
udpard_tx_item_t* udpard_tx_peek(udpard_tx_t* const self, const udpard_microsecond_t now);
445447

446448
/// Transfers the ownership of the specified item to the application. The item does not necessarily need to be the
447449
/// top one -- it is safe to dequeue any item. The item is dequeued but not invalidated; it is the responsibility of
448450
/// the application to deallocate its memory later.
449451
/// The memory SHALL NOT be deallocated UNTIL this function is invoked (use udpard_tx_free()).
450452
/// If any of the arguments are NULL, the function has no effect.
451-
/// The time complexity is logarithmic of the queue size. This function does not invoke the dynamic memory manager.
452-
void udpard_tx_pop(const udpard_tx_t* const self, udpard_tx_item_t* const item);
453+
/// This function does not invoke the dynamic memory manager.
454+
void udpard_tx_pop(udpard_tx_t* const self, udpard_tx_item_t* const item);
453455

454456
/// This is a simple helper that frees the memory allocated for the item and its payload.
455457
/// If the item argument is NULL, the function has no effect. The time complexity is constant.

0 commit comments

Comments
 (0)