Skip to content

Commit 2e3fef0

Browse files
reception WIP -- data structures
1 parent a2b0c6d commit 2e3fef0

File tree

2 files changed

+99
-50
lines changed

2 files changed

+99
-50
lines changed

libudpard/udpard.c

Lines changed: 84 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,52 @@ static bool header_deserialize(const udpard_bytes_mut_t dgram_payload,
270270
return ok;
271271
}
272272

273+
// --------------------------------------------- LIST CONTAINER ---------------------------------------------
274+
275+
/// No effect if not in the list.
276+
static void delist(udpard_list_t* const list, udpard_list_member_t* const member)
277+
{
278+
if (member->next != NULL) {
279+
member->next->prev = member->prev;
280+
}
281+
if (member->prev != NULL) {
282+
member->prev->next = member->next;
283+
}
284+
if (list->head == member) {
285+
list->head = member->next;
286+
}
287+
if (list->tail == member) {
288+
list->tail = member->prev;
289+
}
290+
member->next = NULL;
291+
member->prev = NULL;
292+
assert((list->head != NULL) == (list->tail != NULL));
293+
}
294+
295+
/// If the item is already in the list, it will be delisted first. Can be used for moving to the front.
296+
static void enlist_head(udpard_list_t* const list, udpard_list_member_t* const member)
297+
{
298+
delist(list, member);
299+
assert((member->next == NULL) && (member->prev == NULL));
300+
assert((list->head != NULL) == (list->tail != NULL));
301+
member->next = list->head;
302+
if (list->head != NULL) {
303+
list->head->prev = member;
304+
}
305+
list->head = member;
306+
if (list->tail == NULL) {
307+
list->tail = member;
308+
}
309+
assert((list->head != NULL) && (list->tail != NULL));
310+
}
311+
312+
#define LIST_MEMBER(ptr, owner_type, owner_field) ((owner_type*)unbias_ptr((ptr), offsetof(owner_type, owner_field)))
313+
static void* unbias_ptr(const void* const ptr, const size_t offset)
314+
{
315+
return (ptr == NULL) ? NULL : (void*)((char*)ptr - offset);
316+
}
317+
#define LIST_TAIL(list, owner_type, owner_field) LIST_MEMBER((list).tail, owner_type, owner_field)
318+
273319
// --------------------------------------------- TX PIPELINE ---------------------------------------------
274320

275321
typedef struct
@@ -600,61 +646,59 @@ static bool rx_validate_memory_resources(const udpard_rx_memory_resources_t memo
600646
(memory.fragment.alloc != NULL) && (memory.fragment.free != NULL);
601647
}
602648

603-
/// This is designed to be convertible to/from UdpardFragment, so that the application could be
604-
/// given a linked list of these objects represented as a list of UdpardFragment.
649+
/// This is designed to be convertible to/from udpard_fragment_t, so that the application could be
650+
/// given a linked list of these objects represented as a list of udpard_fragment_t.
605651
typedef struct rx_fragment_t
606652
{
607653
udpard_fragment_t base;
608654
udpard_tree_t tree;
609655
} rx_fragment_t;
610656

611-
/// Internally, the RX pipeline is arranged as follows:
612-
///
613-
/// - There is one port per subscription or an RPC-service listener. Within the port, there are N sessions,
614-
/// one session per remote node emitting transfers on this port (i.e., on this subject, or sending
615-
/// request/response of this service). Sessions are constructed dynamically in memory provided by
616-
/// UdpardMemoryResource.
617-
///
618-
/// - Per session, there are UDPARD_NETWORK_INTERFACE_COUNT_MAX interface states to support interface redundancy.
619-
///
620-
/// - Per interface, there are RX_SLOT_COUNT slots; a slot keeps the state of a transfer in the process of being
621-
/// reassembled which includes its payload fragments.
622-
///
623-
/// Port -> Session -> Interface -> Slot -> Fragments.
624-
///
625-
/// Consider the following examples, where A,B,C denote distinct multi-frame transfers:
626-
///
627-
/// A0 A1 A2 B0 B1 B2 -- two transfers without OOO frames; both accepted
628-
/// A2 A0 A1 B0 B2 B1 -- two transfers with OOO frames; both accepted
629-
/// A0 A1 B0 A2 B1 B2 -- two transfers with interleaved frames; both accepted (this is why we need 2 slots)
630-
/// B1 A2 A0 C0 B0 A1 C1 -- if we have only 2 slots: B evicted by C; A and C accepted, B dropped
631-
/// B0 A0 A1 C0 B1 A2 C1 -- ditto
632-
/// A0 A1 C0 B0 A2 C1 B1 -- if we have only 2 slots: A evicted by B; B and C accepted, A dropped
633-
///
634-
/// TODO: Early truncation is really easy to add since every frame carries the payload offset from the origin.
635657
typedef struct
636658
{
637-
udpard_microsecond_t ts; ///< Timestamp of the earliest frame; TIMESTAMP_UNSET upon restart.
638-
uint64_t transfer_id; ///< When first constructed, this shall be set to UINT64_MAX (unreachable value).
639-
uint32_t max_index; ///< Maximum observed frame index in this transfer (so far); zero upon restart.
640-
uint32_t eot_index; ///< Frame index where the EOT flag was observed; FRAME_INDEX_UNSET upon restart.
641-
uint32_t accepted_frames; ///< Number of frames accepted so far.
642-
size_t payload_size;
643-
udpard_tree_t* fragments;
644-
} rx_slot_t;
659+
uint32_t max_index; ///< Maximum observed frame index in this transfer (so far); zero upon restart.
660+
uint32_t eot_index; ///< Discovered EOT frame index.
661+
bool eot_discovered;
662+
uint32_t accepted_frames; ///< Number of frames accepted so far.
663+
size_t payload_size_stored; ///< Grows with each accepted fragment (out of order).
664+
uint32_t crc; ///< Out-of-order CRC reconstruction state.
665+
udpard_tree_t* fragments;
666+
} rx_slot_iface_t;
667+
668+
typedef enum
669+
{
670+
rx_slot_idle = 0,
671+
rx_slot_busy = 1,
672+
rx_slot_done = 2,
673+
} rx_slot_state_t;
645674

646675
typedef struct
647676
{
648-
udpard_microsecond_t ts_usec; ///< The timestamp of the last valid transfer to arrive on this interface.
649-
rx_slot_t slots[RX_SLOT_COUNT];
650-
} rx_iface_t;
677+
rx_slot_state_t state;
678+
udpard_microsecond_t ts_discovery; ///< The timestamp of the first frame received for this transfer.
679+
udpard_microsecond_t ts_completion; ///< The timestamp of the final accepted frame for this transfer.
680+
uint64_t transfer_id;
681+
rx_slot_iface_t iface[UDPARD_NETWORK_INTERFACE_COUNT_MAX];
682+
} rx_slot_t;
651683

652684
/// Keep in mind that we have a dedicated session object per remote node per port; this means that the states
653685
/// kept here are specific per remote node, as it should be.
654686
typedef struct
655687
{
656-
udpard_tree_t index_remote_uid;
657-
uint64_t remote_uid;
688+
udpard_tree_t index_remote_uid;
689+
udpard_remote_t remote; ///< Most recent discovered reverse path for P2P to the sender.
690+
691+
udpard_rx_subscription_t* owner;
692+
693+
/// Sessions interned for the reordering window closure.
694+
udpard_tree_t index_reordering_window;
695+
udpard_microsecond_t reordering_window_deadline;
696+
697+
/// LRU last animated list for automatic retirement of stale sessions.
698+
udpard_list_member_t list_by_animation;
699+
udpard_microsecond_t last_animated_ts;
700+
658701
// TODO: a structure for transfer-ID storage and completeness marking.
659-
rx_iface_t ifaces[UDPARD_NETWORK_INTERFACE_COUNT_MAX];
702+
703+
rx_slot_t slots[RX_SLOT_COUNT];
660704
} rx_session_t;

libudpard/udpard.h

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ extern "C"
131131

132132
#define UDPARD_PRIORITY_MAX 7U
133133

134-
/// The library supports at most this many redundant network interfaces per Cyphal node.
134+
/// The library supports at most this many local redundant network interfaces.
135135
#define UDPARD_NETWORK_INTERFACE_COUNT_MAX 3U
136136

137137
typedef int64_t udpard_microsecond_t;
@@ -155,6 +155,17 @@ typedef struct udpard_tree_t
155155
int_fast8_t bf;
156156
} udpard_tree_t;
157157

158+
typedef struct udpard_list_member_t
159+
{
160+
struct udpard_list_member_t* next;
161+
struct udpard_list_member_t* prev;
162+
} udpard_list_member_t;
163+
typedef struct udpard_list_t
164+
{
165+
udpard_list_member_t* head; ///< NULL if list empty
166+
udpard_list_member_t* tail; ///< NULL if list empty
167+
} udpard_list_t;
168+
158169
typedef struct udpard_bytes_mut_t
159170
{
160171
size_t size;
@@ -596,14 +607,11 @@ typedef struct udpard_rx_ack_mandate_t
596607

597608
struct udpard_rx_t;
598609

599-
/// A new message is received on a topic.
610+
/// A new message is received from a topic, or a P2P message is received.
611+
/// The subscription is NULL for P2P transfers.
600612
/// The handler takes ownership of the payload; it must free it after use.
601613
typedef void* (*udpard_rx_on_message_t)(struct udpard_rx_t*, udpard_rx_subscription_t*, udpard_rx_transfer_t);
602614

603-
/// A new peer-to-peer transfer is received.
604-
/// The handler takes ownership of the payload; it must free it after use.
605-
typedef void* (*udpard_rx_on_p2p_t)(struct udpard_rx_t*, udpard_rx_transfer_t*);
606-
607615
/// A topic hash collision is detected on a topic.
608616
typedef void* (*udpard_rx_on_collision_t)(struct udpard_rx_t*, udpard_rx_subscription_t*);
609617

@@ -615,12 +623,10 @@ typedef struct udpard_rx_t
615623
{
616624
udpard_rx_port_t p2p_port; ///< A single port used for accepting all P2P transfers.
617625

618-
udpard_tree_t* index_session_by_expiration; ///< Soonest on the left.
626+
udpard_list_t list_session_by_animation; ///< Oldest at the tail.
619627
udpard_tree_t* index_session_by_reordering; ///< Earliest reordering window closure on the left.
620-
udpard_tree_t* index_remote_by_uid; ///< For P2P remote node return path endpoint discovery.
621628

622629
udpard_rx_on_message_t on_message;
623-
udpard_rx_on_p2p_t on_p2p;
624630
udpard_rx_on_collision_t on_collision;
625631
udpard_rx_on_ack_mandate_t on_ack_mandate;
626632

@@ -634,7 +640,6 @@ bool udpard_rx_new(udpard_rx_t* const self,
634640
const uint64_t local_uid,
635641
const udpard_rx_memory_resources_t memory,
636642
const udpard_rx_on_message_t on_message,
637-
const udpard_rx_on_p2p_t on_p2p,
638643
const udpard_rx_on_collision_t on_collision,
639644
const udpard_rx_on_ack_mandate_t on_ack_mandate);
640645

0 commit comments

Comments
 (0)