Skip to content

Commit a2b0c6d

Browse files
rx basics
1 parent dccdc8b commit a2b0c6d

File tree

2 files changed

+100
-6
lines changed

2 files changed

+100
-6
lines changed

libudpard/udpard.c

Lines changed: 86 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,15 +36,17 @@
3636

3737
typedef unsigned char byte_t; ///< For compatibility with platforms where byte size is not 8 bits.
3838

39-
#define RX_SLOT_COUNT (UDPARD_PRIORITY_MAX + 1U)
40-
#define BIG_BANG INT64_MIN
39+
#define BIG_BANG INT64_MIN
4140

4241
#define KILO 1000L
4342
#define MEGA 1000000LL
4443

4544
/// Sessions will be garbage-collected after being idle for this long, along with unfinished transfers, if any.
4645
#define SESSION_LIFETIME (60 * MEGA)
4746

47+
/// The maximum number of incoming transfers that can be in the state of incomplete reassembly simultaneously.
48+
#define RX_SLOT_COUNT (UDPARD_PRIORITY_MAX + 1U)
49+
4850
#define UDP_PORT 9382U
4951
#define IPv4_MCAST_PREFIX 0xEF000000UL
5052
#define IPv4_MCAST_SUFFIX_MASK 0x007FFFFFUL
@@ -574,3 +576,85 @@ void udpard_tx_free(const udpard_tx_mem_resources_t memory, udpard_tx_item_t* co
574576
}
575577

576578
// --------------------------------------------- RX PIPELINE ---------------------------------------------
579+
580+
/// All but the transfer metadata: fields that change from frame to frame within the same transfer.
581+
typedef struct
582+
{
583+
uint32_t index;
584+
uint32_t offset; ///< Offset of this fragment's payload within the full transfer payload.
585+
bool end_of_transfer;
586+
udpard_bytes_t payload; ///< Also contains the transfer CRC (but not the header CRC).
587+
udpard_bytes_mut_t origin; ///< The entirety of the free-able buffer passed from the application.
588+
} rx_frame_base_t;
589+
590+
/// Full frame state.
591+
typedef struct
592+
{
593+
rx_frame_base_t base;
594+
meta_t meta;
595+
} rx_frame_t;
596+
597+
static bool rx_validate_memory_resources(const udpard_rx_memory_resources_t memory)
598+
{
599+
return (memory.session.alloc != NULL) && (memory.session.free != NULL) && //
600+
(memory.fragment.alloc != NULL) && (memory.fragment.free != NULL);
601+
}
602+
603+
/// This is designed to be convertible to/from UdpardFragment, so that the application could be
604+
/// given a linked list of these objects represented as a list of UdpardFragment.
605+
typedef struct rx_fragment_t
606+
{
607+
udpard_fragment_t base;
608+
udpard_tree_t tree;
609+
} rx_fragment_t;
610+
611+
/// Internally, the RX pipeline is arranged as follows:
612+
///
613+
/// - There is one port per subscription or an RPC-service listener. Within the port, there are N sessions,
614+
/// one session per remote node emitting transfers on this port (i.e., on this subject, or sending
615+
/// request/response of this service). Sessions are constructed dynamically in memory provided by
616+
/// UdpardMemoryResource.
617+
///
618+
/// - Per session, there are UDPARD_NETWORK_INTERFACE_COUNT_MAX interface states to support interface redundancy.
619+
///
620+
/// - Per interface, there are RX_SLOT_COUNT slots; a slot keeps the state of a transfer in the process of being
621+
/// reassembled which includes its payload fragments.
622+
///
623+
/// Port -> Session -> Interface -> Slot -> Fragments.
624+
///
625+
/// Consider the following examples, where A,B,C denote distinct multi-frame transfers:
626+
///
627+
/// A0 A1 A2 B0 B1 B2 -- two transfers without OOO frames; both accepted
628+
/// A2 A0 A1 B0 B2 B1 -- two transfers with OOO frames; both accepted
629+
/// A0 A1 B0 A2 B1 B2 -- two transfers with interleaved frames; both accepted (this is why we need 2 slots)
630+
/// B1 A2 A0 C0 B0 A1 C1 -- if we have only 2 slots: B evicted by C; A and C accepted, B dropped
631+
/// B0 A0 A1 C0 B1 A2 C1 -- ditto
632+
/// A0 A1 C0 B0 A2 C1 B1 -- if we have only 2 slots: A evicted by B; B and C accepted, A dropped
633+
///
634+
/// TODO: Early truncation is really easy to add since every frame carries the payload offset from the origin.
635+
typedef struct
636+
{
637+
udpard_microsecond_t ts; ///< Timestamp of the earliest frame; TIMESTAMP_UNSET upon restart.
638+
uint64_t transfer_id; ///< When first constructed, this shall be set to UINT64_MAX (unreachable value).
639+
uint32_t max_index; ///< Maximum observed frame index in this transfer (so far); zero upon restart.
640+
uint32_t eot_index; ///< Frame index where the EOT flag was observed; FRAME_INDEX_UNSET upon restart.
641+
uint32_t accepted_frames; ///< Number of frames accepted so far.
642+
size_t payload_size;
643+
udpard_tree_t* fragments;
644+
} rx_slot_t;
645+
646+
typedef struct
647+
{
648+
udpard_microsecond_t ts_usec; ///< The timestamp of the last valid transfer to arrive on this interface.
649+
rx_slot_t slots[RX_SLOT_COUNT];
650+
} rx_iface_t;
651+
652+
/// Keep in mind that we have a dedicated session object per remote node per port; this means that the states
653+
/// kept here are specific per remote node, as it should be.
654+
typedef struct
655+
{
656+
udpard_tree_t index_remote_uid;
657+
uint64_t remote_uid;
658+
// TODO: a structure for transfer-ID storage and completeness marking.
659+
rx_iface_t ifaces[UDPARD_NETWORK_INTERFACE_COUNT_MAX];
660+
} rx_session_t;

libudpard/udpard.h

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,10 @@ typedef struct udpard_fragment_t
236236
/// The application can use this pointer to free the outer buffer after the payload has been consumed.
237237
udpard_bytes_mut_t origin;
238238

239+
/// Zero-based index and byte offset of this fragment view from the beginning of the transfer payload.
240+
size_t offset;
241+
uint32_t index;
242+
239243
/// When the fragment is no longer needed, this deleter shall be used to free the origin buffer.
240244
/// We provide a dedicated deleter per fragment to allow NIC drivers to manage the memory directly,
241245
/// which allows DMA access to the fragment data without copying.
@@ -561,17 +565,23 @@ typedef struct udpard_rx_transfer_t
561565
/// The total size of the payload available to the application, in bytes, is provided for convenience;
562566
/// it is the sum of the sizes of all its fragments. For example, if the sender emitted a transfer of 2000
563567
/// bytes split into two frames, 1408 bytes in the first frame and 592 bytes in the second frame,
564-
/// then the payload_size will be 2000 and the payload buffer will contain two fragments of 1408 and 592 bytes.
565-
/// The transfer CRC is not included here. If the received payload exceeds the configured extent,
566-
/// the excess payload will be discarded and the payload_size will be set to the extent.
568+
/// then the payload_size_stored will be 2000 and the payload buffer will contain two fragments of 1408 and
569+
/// 592 bytes. The transfer CRC is not included here. If the received payload exceeds the configured extent,
570+
/// the excess payload will be discarded and the payload_size_stored will be set to the extent.
567571
///
568572
/// The application is given ownership of the payload buffer, so it is required to free it after use;
569573
/// this requires freeing both the handles and the payload buffers they point to.
570574
/// Beware that different memory resources may have been used to allocate the handles and the payload buffers;
571575
/// the application is responsible for freeing them using the correct memory resource.
572576
///
573577
/// If the payload is empty, the corresponding buffer pointers may be NULL.
574-
size_t payload_size;
578+
size_t payload_size_stored;
579+
580+
/// The original size of the transfer payload before extent-based truncation, in bytes.
581+
/// This value is provided for informational purposes only; the application should not attempt to access
582+
/// the excess payload as it has already been discarded. Cannot be less than payload_size_stored.
583+
size_t payload_size_wire;
584+
575585
udpard_fragment_t payload;
576586
} udpard_rx_transfer_t;
577587

0 commit comments

Comments
 (0)