Skip to content

Commit c8a7fec

Browse files
tx pipeline core
1 parent 94d3aa1 commit c8a7fec

File tree

3 files changed

+339
-60
lines changed

3 files changed

+339
-60
lines changed

libudpard/udpard.c

Lines changed: 296 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -155,17 +155,13 @@ static uint32_t crc_compute(const size_t size, const void* const data)
155155

156156
typedef struct
157157
{
158-
uint_fast8_t version;
159158
udpard_prio_t priority;
160-
bool flag_eot;
161159
bool flag_ack;
162-
uint32_t frame_index;
163-
uint32_t frame_payload_offset;
164160
uint32_t transfer_payload_size;
165161
uint64_t transfer_id;
166162
uint64_t sender_uid;
167163
uint64_t topic_hash;
168-
} header_t;
164+
} meta_t;
169165

170166
static byte_t* serialize_u32(byte_t* ptr, const uint32_t value)
171167
{
@@ -205,59 +201,323 @@ static const byte_t* deserialize_u64(const byte_t* ptr, uint64_t* const out_valu
205201
return ptr;
206202
}
207203

208-
static void header_serialize(byte_t* const buffer, const header_t hdr)
204+
static byte_t* header_serialize(byte_t* const buffer,
205+
const meta_t meta,
206+
const bool flag_eot,
207+
const uint32_t frame_index,
208+
const uint32_t frame_payload_offset)
209209
{
210210
byte_t* ptr = buffer;
211211
byte_t flags = 0;
212-
if (hdr.flag_eot) {
212+
if (flag_eot) {
213213
flags |= HEADER_FLAG_EOT;
214214
}
215-
if (hdr.flag_ack) {
215+
if (meta.flag_ack) {
216216
flags |= HEADER_FLAG_ACK;
217217
}
218-
*ptr++ = (byte_t)(HEADER_VERSION | (hdr.priority << 5U));
218+
*ptr++ = (byte_t)(HEADER_VERSION | (meta.priority << 5U));
219219
*ptr++ = flags;
220220
*ptr++ = 0;
221221
*ptr++ = 0;
222-
ptr = serialize_u32(ptr, hdr.frame_index);
223-
ptr = serialize_u32(ptr, hdr.frame_payload_offset);
224-
ptr = serialize_u32(ptr, hdr.transfer_payload_size);
225-
ptr = serialize_u64(ptr, hdr.transfer_id);
226-
ptr = serialize_u64(ptr, hdr.sender_uid);
227-
ptr = serialize_u64(ptr, hdr.topic_hash);
222+
ptr = serialize_u32(ptr, frame_index);
223+
ptr = serialize_u32(ptr, frame_payload_offset);
224+
ptr = serialize_u32(ptr, meta.transfer_payload_size);
225+
ptr = serialize_u64(ptr, meta.transfer_id);
226+
ptr = serialize_u64(ptr, meta.sender_uid);
227+
ptr = serialize_u64(ptr, meta.topic_hash);
228228
ptr = serialize_u32(ptr, 0);
229229
ptr = serialize_u32(ptr, crc_compute(HEADER_SIZE_BYTES - CRC_SIZE_BYTES, buffer));
230230
UDPARD_ASSERT((size_t)(ptr - buffer) == HEADER_SIZE_BYTES);
231+
return ptr;
231232
}
232233

233234
static bool header_deserialize(const udpard_bytes_mut_t dgram_payload,
234-
header_t* const out_hdr,
235+
meta_t* const out_meta,
236+
bool* const flag_eot,
237+
uint32_t* const frame_index,
238+
uint32_t* const frame_payload_offset,
235239
udpard_bytes_mut_t* const out_payload)
236240
{
237241
UDPARD_ASSERT(out_payload != NULL);
238-
const bool ok = (dgram_payload.size >= HEADER_SIZE_BYTES) && (dgram_payload.data != NULL) && //
239-
(crc_compute(HEADER_SIZE_BYTES, dgram_payload.data) == CRC_RESIDUE_AFTER_OUTPUT_XOR);
242+
bool ok = (dgram_payload.size >= HEADER_SIZE_BYTES) && (dgram_payload.data != NULL) && //
243+
(crc_compute(HEADER_SIZE_BYTES, dgram_payload.data) == CRC_RESIDUE_AFTER_OUTPUT_XOR);
240244
if (ok) {
241-
const byte_t* ptr = dgram_payload.data;
242-
const byte_t vp = *ptr++;
243-
out_hdr->version = vp & 0x1FU;
244-
out_hdr->priority = (udpard_prio_t)((byte_t)(vp >> 5U) & 0x07U);
245-
const byte_t flags = *ptr++;
246-
out_hdr->flag_eot = (flags & HEADER_FLAG_EOT) != 0U;
247-
out_hdr->flag_ack = (flags & HEADER_FLAG_ACK) != 0U;
248-
ptr += 2U;
249-
ptr = deserialize_u32(ptr, &out_hdr->frame_index);
250-
ptr = deserialize_u32(ptr, &out_hdr->frame_payload_offset);
251-
ptr = deserialize_u32(ptr, &out_hdr->transfer_payload_size);
252-
ptr = deserialize_u64(ptr, &out_hdr->transfer_id);
253-
ptr = deserialize_u64(ptr, &out_hdr->sender_uid);
254-
ptr = deserialize_u64(ptr, &out_hdr->topic_hash);
255-
(void)ptr;
256-
// Set up the output payload view.
257-
out_payload->size = dgram_payload.size - HEADER_SIZE_BYTES;
258-
out_payload->data = (byte_t*)dgram_payload.data + HEADER_SIZE_BYTES;
245+
const byte_t* ptr = dgram_payload.data;
246+
const byte_t head = *ptr++;
247+
const byte_t version = head & 0x1FU;
248+
if (version == HEADER_VERSION) {
249+
out_meta->priority = (udpard_prio_t)((byte_t)(head >> 5U) & 0x07U);
250+
const byte_t flags = *ptr++;
251+
*flag_eot = (flags & HEADER_FLAG_EOT) != 0U;
252+
out_meta->flag_ack = (flags & HEADER_FLAG_ACK) != 0U;
253+
ptr += 2U;
254+
ptr = deserialize_u32(ptr, frame_index);
255+
ptr = deserialize_u32(ptr, frame_payload_offset);
256+
ptr = deserialize_u32(ptr, &out_meta->transfer_payload_size);
257+
ptr = deserialize_u64(ptr, &out_meta->transfer_id);
258+
ptr = deserialize_u64(ptr, &out_meta->sender_uid);
259+
ptr = deserialize_u64(ptr, &out_meta->topic_hash);
260+
(void)ptr;
261+
// Set up the output payload view.
262+
out_payload->size = dgram_payload.size - HEADER_SIZE_BYTES;
263+
out_payload->data = (byte_t*)dgram_payload.data + HEADER_SIZE_BYTES;
264+
} else {
265+
ok = false;
266+
}
259267
}
260268
return ok;
261269
}
262270

263271
// --------------------------------------------- TX PIPELINE ---------------------------------------------
272+
273+
typedef struct
274+
{
275+
udpard_tx_item_t* head;
276+
udpard_tx_item_t* tail;
277+
size_t count;
278+
} tx_chain_t;
279+
280+
static bool tx_validate_mem_resources(const udpard_tx_mem_resources_t memory)
281+
{
282+
return (memory.fragment.alloc != NULL) && (memory.fragment.free != NULL) && //
283+
(memory.payload.alloc != NULL) && (memory.payload.free != NULL);
284+
}
285+
286+
/// Frames with identical weight are processed in the FIFO order.
287+
/// Frames with higher weight compare smaller (i.e., put on the left side of the tree).
288+
static int64_t tx_cavl_compare_prio(const void* const user, const udpard_tree_t* const node)
289+
{
290+
return ((int)*(const udpard_prio_t*)user) - (int)(CAVL2_TO_OWNER(node, udpard_tx_item_t, index_prio)->priority);
291+
}
292+
293+
static int64_t tx_cavl_compare_deadline(const void* const user, const udpard_tree_t* const node)
294+
{
295+
return (*(const udpard_microsecond_t*)user) - (CAVL2_TO_OWNER(node, udpard_tx_item_t, index_deadline)->deadline);
296+
}
297+
298+
static udpard_tx_item_t* tx_item_new(const udpard_tx_mem_resources_t memory,
299+
const udpard_microsecond_t deadline,
300+
const udpard_prio_t priority,
301+
const udpard_udpip_ep_t endpoint,
302+
const size_t datagram_payload_size,
303+
void* const user_transfer_reference)
304+
{
305+
udpard_tx_item_t* out = mem_alloc(memory.fragment, sizeof(udpard_tx_item_t));
306+
if (out != NULL) {
307+
out->index_prio = (udpard_tree_t){ 0 };
308+
out->index_deadline = (udpard_tree_t){ 0 };
309+
UDPARD_ASSERT(priority <= UDPARD_PRIORITY_MAX);
310+
out->priority = priority;
311+
out->next_in_transfer = NULL; // Last by default.
312+
out->deadline = deadline;
313+
out->destination = endpoint;
314+
out->user_transfer_reference = user_transfer_reference;
315+
void* const payload_data = mem_alloc(memory.payload, datagram_payload_size);
316+
if (NULL != payload_data) {
317+
out->datagram_payload.data = payload_data;
318+
out->datagram_payload.size = datagram_payload_size;
319+
} else {
320+
mem_free(memory.fragment, sizeof(udpard_tx_item_t), out);
321+
out = NULL;
322+
}
323+
}
324+
return out;
325+
}
326+
327+
/// Produces a chain of tx queue items for later insertion into the tx queue. The tail is NULL if OOM.
328+
/// The caller is responsible for freeing the memory allocated for the chain.
329+
static tx_chain_t tx_spool(const udpard_tx_mem_resources_t memory,
330+
const size_t mtu,
331+
const udpard_microsecond_t deadline_usec,
332+
const meta_t meta,
333+
const udpard_udpip_ep_t endpoint,
334+
const udpard_bytes_t payload,
335+
void* const user_transfer_reference)
336+
{
337+
UDPARD_ASSERT(mtu > 0);
338+
UDPARD_ASSERT((payload.data != NULL) || (payload.size == 0U));
339+
const size_t payload_size_with_crc = payload.size + CRC_SIZE_BYTES;
340+
byte_t crc_bytes[CRC_SIZE_BYTES];
341+
serialize_u32(crc_bytes, crc_compute(payload.size, payload.data));
342+
tx_chain_t out = { NULL, NULL, 0 };
343+
size_t offset = 0U;
344+
while (offset < payload_size_with_crc) {
345+
udpard_tx_item_t* const item = tx_item_new(memory,
346+
deadline_usec,
347+
meta.priority,
348+
endpoint,
349+
smaller(payload_size_with_crc - offset, mtu) + HEADER_SIZE_BYTES,
350+
user_transfer_reference);
351+
if (NULL == out.head) {
352+
out.head = item;
353+
} else {
354+
out.tail->next_in_transfer = item;
355+
}
356+
out.tail = item;
357+
if (NULL == out.tail) {
358+
break;
359+
}
360+
const bool last = (payload_size_with_crc - offset) <= mtu;
361+
byte_t* const dst_buffer = item->datagram_payload.data;
362+
byte_t* write_ptr = header_serialize(dst_buffer, meta, last, (uint32_t)out.count, (uint32_t)offset);
363+
if (offset < payload.size) {
364+
const size_t progress = smaller(payload.size - offset, mtu);
365+
// NOLINTNEXTLINE(clang-analyzer-security.insecureAPI.DeprecatedOrUnsafeBufferHandling)
366+
(void)memcpy(write_ptr, ((const byte_t*)payload.data) + offset, progress);
367+
offset += progress;
368+
write_ptr += progress;
369+
UDPARD_ASSERT(offset <= payload.size);
370+
UDPARD_ASSERT((!last) || (offset == payload.size));
371+
}
372+
if (offset >= payload.size) {
373+
const size_t crc_offset = offset - payload.size;
374+
UDPARD_ASSERT(crc_offset < CRC_SIZE_BYTES);
375+
const size_t available = item->datagram_payload.size - (size_t)(write_ptr - dst_buffer);
376+
UDPARD_ASSERT(available <= CRC_SIZE_BYTES);
377+
const size_t write_size = smaller(CRC_SIZE_BYTES - crc_offset, available);
378+
// NOLINTNEXTLINE(clang-analyzer-security.insecureAPI.DeprecatedOrUnsafeBufferHandling)
379+
(void)memcpy(write_ptr, &crc_bytes[crc_offset], write_size);
380+
offset += write_size;
381+
}
382+
out.count++;
383+
}
384+
UDPARD_ASSERT((offset == payload_size_with_crc) || (out.tail == NULL));
385+
return out;
386+
}
387+
388+
static uint32_t tx_push(udpard_tx_t* const tx,
389+
const udpard_microsecond_t deadline,
390+
const meta_t meta,
391+
const udpard_udpip_ep_t endpoint,
392+
const udpard_bytes_t payload,
393+
void* const user_transfer_reference)
394+
{
395+
UDPARD_ASSERT(tx != NULL);
396+
uint32_t out = 0; // The number of frames enqueued; zero on error (error counters incremented).
397+
const size_t mtu = larger(tx->mtu, UDPARD_MTU_MIN);
398+
const size_t frame_count = ((payload.size + CRC_SIZE_BYTES + mtu) - 1U) / mtu;
399+
if ((tx->queue_size + frame_count) > tx->queue_capacity) {
400+
tx->errors_capacity++;
401+
} else {
402+
const tx_chain_t chain = tx_spool(tx->memory, mtu, deadline, meta, endpoint, payload, user_transfer_reference);
403+
if (chain.tail != NULL) { // Insert every item into the tx indexes.
404+
UDPARD_ASSERT(frame_count == chain.count);
405+
udpard_tx_item_t* next = chain.head;
406+
do {
407+
const udpard_tree_t* res = cavl2_find_or_insert(&tx->index_prio, //
408+
&next->priority,
409+
&tx_cavl_compare_prio,
410+
&next->index_prio,
411+
&cavl2_trivial_factory);
412+
UDPARD_ASSERT(res == &next->index_prio);
413+
res = cavl2_find_or_insert(&tx->index_deadline, //
414+
&next->deadline,
415+
&tx_cavl_compare_deadline,
416+
&next->index_deadline,
417+
&cavl2_trivial_factory);
418+
UDPARD_ASSERT(res == &next->index_deadline);
419+
(void)res;
420+
next = next->next_in_transfer;
421+
} while (next != NULL);
422+
tx->queue_size += chain.count;
423+
UDPARD_ASSERT(tx->queue_size <= tx->queue_capacity);
424+
UDPARD_ASSERT((chain.count + 0ULL) <= INT32_MAX); // +0 is to suppress warning.
425+
out = (uint32_t)chain.count;
426+
} else { // The queue is large enough but we ran out of heap memory, so we have to unwind the chain.
427+
tx->errors_oom++;
428+
udpard_tx_item_t* head = chain.head;
429+
while (head != NULL) {
430+
udpard_tx_item_t* const next = head->next_in_transfer;
431+
udpard_tx_free(tx->memory, head);
432+
head = next;
433+
}
434+
}
435+
}
436+
return out;
437+
}
438+
439+
bool udpard_tx_new(udpard_tx_t* const self,
440+
const uint64_t local_uid,
441+
const size_t queue_capacity,
442+
const udpard_tx_mem_resources_t memory)
443+
{
444+
const bool ok = (NULL != self) && (local_uid != 0) && tx_validate_mem_resources(memory);
445+
if (ok) {
446+
mem_zero(sizeof(*self), self);
447+
self->local_uid = local_uid;
448+
self->queue_capacity = queue_capacity;
449+
self->mtu = UDPARD_MTU_DEFAULT;
450+
self->memory = memory;
451+
self->queue_size = 0;
452+
self->index_prio = NULL;
453+
self->index_deadline = NULL;
454+
}
455+
return ok;
456+
}
457+
458+
uint32_t udpard_tx_publish(udpard_tx_t* const self,
459+
const udpard_microsecond_t now,
460+
const udpard_microsecond_t deadline,
461+
const udpard_prio_t priority,
462+
const uint64_t topic_hash,
463+
const uint32_t subject_id,
464+
const uint64_t transfer_id,
465+
const udpard_bytes_t payload,
466+
void* const user_transfer_reference)
467+
{
468+
(void)self;
469+
(void)now;
470+
(void)deadline;
471+
(void)priority;
472+
(void)topic_hash;
473+
(void)subject_id;
474+
(void)transfer_id;
475+
(void)payload;
476+
(void)user_transfer_reference;
477+
(void)tx_push;
478+
return 0;
479+
}
480+
481+
uint32_t udpard_tx_p2p(udpard_tx_t* const self,
482+
const udpard_microsecond_t now,
483+
const udpard_microsecond_t deadline,
484+
const uint64_t remote_uid,
485+
const udpard_udpip_ep_t remote_ep,
486+
const udpard_prio_t priority,
487+
const uint64_t transfer_id,
488+
const udpard_bytes_t payload,
489+
void* const user_transfer_reference)
490+
{
491+
(void)self;
492+
(void)now;
493+
(void)deadline;
494+
(void)priority;
495+
(void)remote_uid;
496+
(void)remote_ep;
497+
(void)transfer_id;
498+
(void)payload;
499+
(void)user_transfer_reference;
500+
(void)tx_push;
501+
return 0;
502+
}
503+
504+
udpard_tx_item_t* udpard_tx_peek(const udpard_tx_t* const self, const udpard_microsecond_t now)
505+
{
506+
(void)self;
507+
(void)now;
508+
return NULL;
509+
}
510+
511+
void udpard_tx_pop(const udpard_tx_t* const self, udpard_tx_item_t* const item)
512+
{
513+
(void)self;
514+
(void)item;
515+
}
516+
517+
void udpard_tx_free(const udpard_tx_mem_resources_t memory, udpard_tx_item_t* const item)
518+
{
519+
(void)memory;
520+
(void)item;
521+
}
522+
523+
// --------------------------------------------- RX PIPELINE ---------------------------------------------

libudpard/udpard.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,9 @@ extern "C"
126126
/// To guarantee a single frame transfer, the maximum payload size shall be 4 bytes less to accommodate the CRC.
127127
#define UDPARD_MTU_DEFAULT_MAX_SINGLE_FRAME (UDPARD_MTU_DEFAULT - 4U)
128128

129+
/// MTU less than this is not supported and should not be used.
130+
#define UDPARD_MTU_MIN 460U
131+
129132
#define UDPARD_PRIORITY_MAX 7U
130133

131134
/// The library supports at most this many redundant network interfaces per Cyphal node.

0 commit comments

Comments
 (0)