Skip to content

Commit 2480c38

Browse files
advance
1 parent f6ba55a commit 2480c38

File tree

3 files changed

+87
-87
lines changed

3 files changed

+87
-87
lines changed

libudpard/udpard.c

Lines changed: 43 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1143,22 +1143,18 @@ static void rx_session_on_ack_mandate(const rx_session_t* const self,
11431143
const udpard_bytes_t payload_head)
11441144
{
11451145
UDPARD_ASSERT(self->owner->invoked);
1146-
udpard_rx_subscription_t* const subscription =
1147-
(self->owner == &rx->p2p_port) ? NULL : (udpard_rx_subscription_t*)self->owner;
11481146
const udpard_rx_ack_mandate_t mandate = {
11491147
.remote = self->remote, .priority = priority, .transfer_id = transfer_id, .payload_head = payload_head
11501148
};
11511149
UDPARD_ASSERT(payload_head.data != NULL || payload_head.size == 0U);
11521150
UDPARD_ASSERT(rx->on_ack_mandate != NULL);
1153-
rx->on_ack_mandate(rx, subscription, mandate);
1151+
rx->on_ack_mandate(rx, self->owner, mandate);
11541152
}
11551153

11561154
/// The payload ownership is transferred to the application.
11571155
static void rx_session_on_message(const rx_session_t* const self, udpard_rx_t* const rx, rx_slot_t* const slot)
11581156
{
11591157
UDPARD_ASSERT(self->owner->invoked);
1160-
udpard_rx_subscription_t* const subscription =
1161-
(self->owner == &rx->p2p_port) ? NULL : (udpard_rx_subscription_t*)self->owner;
11621158
const udpard_rx_transfer_t transfer = {
11631159
.timestamp = slot->ts_min,
11641160
.priority = slot->priority,
@@ -1171,7 +1167,7 @@ static void rx_session_on_message(const rx_session_t* const self, udpard_rx_t* c
11711167
};
11721168
slot->fragments = NULL; // Transfer ownership to the application.
11731169
UDPARD_ASSERT(rx->on_message != NULL);
1174-
rx->on_message(rx, subscription, transfer);
1170+
rx->on_message(rx, self->owner, transfer);
11751171
}
11761172

11771173
static int32_t cavl_compare_rx_session_remote_uid(const void* const user, const udpard_tree_t* const node)
@@ -1224,9 +1220,9 @@ static rx_session_t* rx_session_new(udpard_rx_port_t* const owner,
12241220
}
12251221

12261222
/// Removes the instance from all indexes and frees all associated memory.
1227-
static void rx_session_del(rx_session_t* const self,
1228-
udpard_list_t* const sessions_by_animation,
1229-
udpard_tree_t** const sessions_by_reordering)
1223+
static void rx_session_free(rx_session_t* const self,
1224+
udpard_list_t* const sessions_by_animation,
1225+
udpard_tree_t** const sessions_by_reordering)
12301226
{
12311227
for (size_t i = 0; i < RX_SLOT_COUNT; i++) {
12321228
rx_slot_reset(&self->slots[i], self->owner->memory.fragment);
@@ -1523,10 +1519,6 @@ static void rx_session_update(rx_session_t* const self,
15231519
(ordered ? rx_session_update_ordered : rx_session_update_unordered)(self, rx, ts, frame, payload_deleter);
15241520
}
15251521

1526-
// --------------------------------------------- RX PORT ---------------------------------------------
1527-
1528-
// TODO
1529-
15301522
// --------------------------------------------- RX PUBLIC API ---------------------------------------------
15311523

15321524
static bool rx_validate_memory_resources(const udpard_rx_memory_resources_t memory)
@@ -1542,18 +1534,10 @@ bool udpard_rx_new(udpard_rx_t* const self,
15421534
const udpard_rx_on_collision_t on_collision,
15431535
const udpard_rx_on_ack_mandate_t on_ack_mandate)
15441536
{
1545-
const bool ok = (self != NULL) && (local_uid > 0) && rx_validate_memory_resources(p2p_port_memory) &&
1546-
(on_message != NULL) && (on_collision != NULL) && (on_ack_mandate != NULL);
1537+
bool ok = (self != NULL) && (local_uid > 0) && rx_validate_memory_resources(p2p_port_memory) &&
1538+
(on_message != NULL) && (on_collision != NULL) && (on_ack_mandate != NULL);
15471539
if (ok) {
15481540
mem_zero(sizeof(*self), self);
1549-
self->p2p_port = (udpard_rx_port_t){
1550-
.topic_hash = local_uid,
1551-
.extent = SIZE_MAX,
1552-
.reordering_window = UDPARD_REORDERING_WINDOW_UNORDERED,
1553-
.memory = p2p_port_memory,
1554-
.index_session_by_remote_uid = NULL,
1555-
.invoked = false,
1556-
};
15571541
self->list_session_by_animation = (udpard_list_t){ NULL, NULL };
15581542
self->index_session_by_reordering = NULL;
15591543
self->on_message = on_message;
@@ -1563,6 +1547,8 @@ bool udpard_rx_new(udpard_rx_t* const self,
15631547
self->errors_frame_malformed = 0;
15641548
self->errors_transfer_malformed = 0;
15651549
self->user = NULL;
1550+
ok =
1551+
udpard_rx_port_new(&self->p2p_port, local_uid, SIZE_MAX, UDPARD_REORDERING_WINDOW_UNORDERED, p2p_port_memory);
15661552
}
15671553
return ok;
15681554
}
@@ -1576,7 +1562,7 @@ void udpard_rx_poll(udpard_rx_t* const self, const udpard_us_t now)
15761562
{
15771563
rx_session_t* const ses = LIST_TAIL(self->list_session_by_animation, rx_session_t, list_by_animation);
15781564
if ((ses != NULL) && (now >= (ses->last_animated_ts + SESSION_LIFETIME))) {
1579-
rx_session_del(ses, &self->list_session_by_animation, &self->index_session_by_reordering);
1565+
rx_session_free(ses, &self->list_session_by_animation, &self->index_session_by_reordering);
15801566
}
15811567
}
15821568
// Process reordering window timeouts.
@@ -1591,3 +1577,36 @@ void udpard_rx_poll(udpard_rx_t* const self, const udpard_us_t now)
15911577
}
15921578
self->p2p_port.invoked = false;
15931579
}
1580+
1581+
bool udpard_rx_port_new(udpard_rx_port_t* const self,
1582+
const uint64_t topic_hash,
1583+
const size_t extent,
1584+
const udpard_us_t reordering_window,
1585+
const udpard_rx_memory_resources_t memory)
1586+
{
1587+
const bool win_ok = (reordering_window >= 0) || //
1588+
(reordering_window == UDPARD_REORDERING_WINDOW_UNORDERED) ||
1589+
(reordering_window == UDPARD_REORDERING_WINDOW_STATELESS);
1590+
const bool ok = (self != NULL) && rx_validate_memory_resources(memory) && win_ok;
1591+
if (ok) {
1592+
mem_zero(sizeof(*self), self);
1593+
self->topic_hash = topic_hash;
1594+
self->extent = extent;
1595+
self->reordering_window = reordering_window;
1596+
self->memory = memory;
1597+
self->index_session_by_remote_uid = NULL;
1598+
self->invoked = false;
1599+
}
1600+
return ok;
1601+
}
1602+
1603+
void udpard_rx_port_free(udpard_rx_t* const rx, udpard_rx_port_t* const port)
1604+
{
1605+
if ((rx != NULL) && (port != NULL)) {
1606+
while (port->index_session_by_remote_uid != NULL) {
1607+
rx_session_free((rx_session_t*)(void*)port->index_session_by_remote_uid,
1608+
&rx->list_session_by_animation,
1609+
&rx->index_session_by_reordering);
1610+
}
1611+
}
1612+
}

libudpard/udpard.h

Lines changed: 34 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -568,7 +568,8 @@ typedef struct udpard_rx_memory_resources_t
568568
/// This type represents an open input port, such as a subscription to a topic.
569569
typedef struct udpard_rx_port_t
570570
{
571-
uint64_t topic_hash; ///< Mismatch will be filtered out.
571+
/// Mismatch will be filtered out and the collision notification callback invoked.
572+
uint64_t topic_hash;
572573

573574
/// Transfer payloads exceeding this extent may be truncated.
574575
/// The total size of the received payload may still exceed this extent setting by some small margin.
@@ -619,17 +620,6 @@ typedef struct udpard_rx_port_t
619620
bool invoked;
620621
} udpard_rx_port_t;
621622

622-
typedef struct udpard_rx_subscription_t
623-
{
624-
udpard_rx_port_t base; ///< Always the first member to ensure pointer equivalence.
625-
626-
uint32_t subject_id;
627-
628-
/// The IP multicast group address and the UDP port number where UDP/IP datagrams matching this Cyphal
629-
/// subject will be sent by the publishers (remote nodes). READ-ONLY
630-
udpard_udpip_ep_t mcast_ep;
631-
} udpard_rx_subscription_t;
632-
633623
/// Represents a received Cyphal transfer.
634624
/// The payload is owned by this instance, so the application must free it after use; see udpardRxTransferFree.
635625
typedef struct udpard_rx_transfer_t
@@ -675,7 +665,6 @@ typedef struct udpard_rx_ack_mandate_t
675665
udpard_prio_t priority;
676666
uint64_t transfer_id;
677667
udpard_remote_t remote;
678-
679668
/// View of the first <=MTU bytes of the transfer payload that is being confirmed.
680669
/// Valid until return from the callback.
681670
udpard_bytes_t payload_head;
@@ -684,16 +673,17 @@ typedef struct udpard_rx_ack_mandate_t
684673
struct udpard_rx_t;
685674

686675
/// A new message is received from a topic, or a P2P message is received.
687-
/// The subscription is NULL for P2P transfers.
688676
/// The handler takes ownership of the payload; it must free it after use.
689-
typedef void (*udpard_rx_on_message_t)(struct udpard_rx_t*, udpard_rx_subscription_t*, udpard_rx_transfer_t);
677+
/// For P2P transfers, the p2p_port of udpard_rx_t is passed as the port argument.
678+
typedef void (*udpard_rx_on_message_t)(struct udpard_rx_t*, udpard_rx_port_t*, udpard_rx_transfer_t);
690679

691680
/// A topic hash collision is detected on a topic.
692-
typedef void (*udpard_rx_on_collision_t)(struct udpard_rx_t*, udpard_rx_subscription_t*, udpard_remote_t);
681+
/// For P2P transfers, the p2p_port of udpard_rx_t is passed as the port argument.
682+
typedef void (*udpard_rx_on_collision_t)(struct udpard_rx_t*, udpard_rx_port_t*, udpard_remote_t);
693683

694684
/// The application is required to send an acknowledgment back to the sender.
695-
/// The subscription is NULL for P2P transfers.
696-
typedef void (*udpard_rx_on_ack_mandate_t)(struct udpard_rx_t*, udpard_rx_subscription_t*, udpard_rx_ack_mandate_t);
685+
/// For P2P transfers, the p2p_port of udpard_rx_t is passed as the port argument.
686+
typedef void (*udpard_rx_on_ack_mandate_t)(struct udpard_rx_t*, udpard_rx_port_t*, udpard_rx_ack_mandate_t);
697687

698688
typedef struct udpard_rx_t
699689
{
@@ -715,7 +705,10 @@ typedef struct udpard_rx_t
715705

716706
/// The extent of the P2P port is set to SIZE_MAX by default (no truncation at all).
717707
/// The application can alter it via udpard_rx_t::p2p_port.extent at any moment if needed; it takes effect immediately
718-
/// but may in some cases cause in-progress transfers to be lost if increased updated mid-transfer.
708+
/// but may in some cases cause in-progress transfers to be lost if increased mid-transfer.
709+
///
710+
/// To free a udpard_rx_t instance, the application must simply free all its ports using udpard_rx_port_free().
711+
/// The RX instance will be safe to discard afterward.
719712
///
720713
/// True on success, false if any of the arguments are invalid.
721714
bool udpard_rx_new(udpard_rx_t* const self,
@@ -727,7 +720,7 @@ bool udpard_rx_new(udpard_rx_t* const self,
727720

728721
/// Must be invoked at least every few milliseconds (more often is fine) to purge timed-out sessions and eject
729722
/// received transfers when the reordering window expires. If this is invoked simultaneously with rx subscription
730-
/// reception, then this function should be invoked after the reception handling.
723+
/// reception, then this function should ideally be invoked after the reception handling.
731724
/// The time complexity is logarithmic in the number of living sessions.
732725
void udpard_rx_poll(udpard_rx_t* const self, const udpard_us_t now);
733726

@@ -752,14 +745,17 @@ void udpard_rx_poll(udpard_rx_t* const self, const udpard_us_t now);
752745
///
753746
/// The return value is true on success, false if any of the arguments are invalid.
754747
/// The time complexity is constant. This function does not invoke the dynamic memory manager.
755-
bool udpard_rx_subscription_new(udpard_rx_subscription_t* const self,
756-
const uint32_t subject_id,
757-
const uint64_t topic_hash,
758-
const size_t extent,
759-
udpard_us_t reordering_window,
760-
const udpard_rx_memory_resources_t memory);
761-
762-
void udpard_rx_subscription_free(udpard_rx_subscription_t* const self);
748+
bool udpard_rx_port_new(udpard_rx_port_t* const self,
749+
const uint64_t topic_hash,
750+
const size_t extent,
751+
const udpard_us_t reordering_window,
752+
const udpard_rx_memory_resources_t memory);
753+
754+
/// Returns all memory allocated for the sessions, slots, fragments, etc of the given port.
755+
/// Does not free the port itself and does not alter the RX instance aside from unlinking the port from it.
756+
/// It is safe to invoke this at any time, but the port instance shall not be used again unless re-initialized.
757+
/// The function has no effect if any of the arguments are NULL.
758+
void udpard_rx_port_free(udpard_rx_t* const rx, udpard_rx_port_t* const port);
763759

764760
/// The timestamp value indicates the arrival time of the datagram. Often, naive software timestamping is adequate
765761
/// for these purposes, but some applications may require a greater accuracy (e.g., for time synchronization).
@@ -771,37 +767,22 @@ void udpard_rx_subscription_free(udpard_rx_subscription_t* const self);
771767
/// any of the arguments are invalid; the function returns false in that case and the caller must clean up.
772768
///
773769
/// The function invokes the dynamic memory manager in the following cases only (refer to udpard_rx_port_t):
774-
///
775-
/// 1. A new session state instance is allocated when a new session is initiated.
776-
///
777-
/// 2. A new transfer fragment handle is allocated when a new transfer fragment is accepted.
778-
///
779-
/// 3. Allocated objects may occasionally be deallocated to clean up stale transfers and sessions when publishers
780-
/// disappear. This behavior does not increase the worst case execution time and does not improve the worst
781-
/// case memory consumption, so a deterministic application need not consider this behavior in its resource
782-
/// analysis. This behavior is implemented for the benefit of applications where rigorous characterization is
783-
/// unnecessary.
770+
/// 1. A new session state instance is allocated when a new session is initiated.
771+
/// 2. A new transfer fragment handle is allocated when a new transfer fragment is accepted.
772+
/// 3. Allocated objects may occasionally be deallocated to clean up stale transfers and sessions.
784773
///
785774
/// The time complexity is O(log n + log k) where n is the number of remote notes publishing on this subject (topic),
786775
/// and k is the number of fragments retained in memory for the corresponding in-progress transfer.
787776
/// No data copying takes place.
788777
///
789778
/// Returns true on successful processing, false if any of the arguments are invalid.
790-
bool udpard_rx_subscription_receive(udpard_rx_t* const rx,
791-
udpard_rx_subscription_t* const sub,
792-
const udpard_us_t timestamp,
793-
const udpard_udpip_ep_t source_endpoint,
794-
const udpard_bytes_mut_t datagram_payload,
795-
const udpard_mem_deleter_t payload_deleter,
796-
const uint_fast8_t redundant_iface_index);
797-
798-
/// Like the above but for P2P unicast transfers exchanged between specific nodes.
799-
bool udpard_rx_p2p_receive(udpard_rx_t* const rx,
800-
const udpard_us_t timestamp,
801-
const udpard_udpip_ep_t source_endpoint,
802-
const udpard_bytes_mut_t datagram_payload,
803-
const udpard_mem_deleter_t payload_deleter,
804-
const uint_fast8_t redundant_iface_index);
779+
bool udpard_rx_port_push(udpard_rx_t* const rx,
780+
udpard_rx_port_t* const port,
781+
const udpard_us_t timestamp,
782+
const udpard_udpip_ep_t source_ep,
783+
const udpard_bytes_mut_t datagram_payload,
784+
const udpard_mem_deleter_t payload_deleter,
785+
const uint_fast8_t redundant_iface_index);
805786

806787
#ifdef __cplusplus
807788
}

tests/src/test_intrusive_rx.c

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1973,8 +1973,8 @@ static void test_rx_slot_update(void)
19731973

19741974
typedef struct
19751975
{
1976-
udpard_rx_t* rx;
1977-
udpard_rx_subscription_t* sub;
1976+
udpard_rx_t* rx;
1977+
udpard_rx_port_t* port;
19781978
struct
19791979
{
19801980
/// The most recently received transfer is at index #0; older transfers follow.
@@ -1998,39 +1998,39 @@ typedef struct
19981998
} ack_mandate;
19991999
} callback_result_t;
20002000

2001-
static void on_message(udpard_rx_t* const rx, udpard_rx_subscription_t* const sub, const udpard_rx_transfer_t transfer)
2001+
static void on_message(udpard_rx_t* const rx, udpard_rx_port_t* const port, const udpard_rx_transfer_t transfer)
20022002
{
20032003
printf("on_message: ts=%lld transfer_id=%llu payload_size_stored=%zu\n",
20042004
(long long)transfer.timestamp,
20052005
(unsigned long long)transfer.transfer_id,
20062006
transfer.payload_size_stored);
20072007
callback_result_t* const cb_result = (callback_result_t* const)rx->user;
20082008
cb_result->rx = rx;
2009-
cb_result->sub = sub;
2009+
cb_result->port = port;
20102010
for (size_t i = RX_SLOT_COUNT - 1; i > 0; i--) {
20112011
cb_result->message.history[i] = cb_result->message.history[i - 1];
20122012
}
20132013
cb_result->message.history[0] = transfer;
20142014
cb_result->message.count++;
20152015
}
20162016

2017-
static void on_collision(udpard_rx_t* const rx, udpard_rx_subscription_t* const sub, const udpard_remote_t remote)
2017+
static void on_collision(udpard_rx_t* const rx, udpard_rx_port_t* const port, const udpard_remote_t remote)
20182018
{
20192019
callback_result_t* const cb_result = (callback_result_t* const)rx->user;
20202020
cb_result->rx = rx;
2021-
cb_result->sub = sub;
2021+
cb_result->port = port;
20222022
cb_result->collision.remote = remote;
20232023
cb_result->collision.count++;
20242024
}
20252025

2026-
static void on_ack_mandate(udpard_rx_t* const rx, udpard_rx_subscription_t* const sub, const udpard_rx_ack_mandate_t am)
2026+
static void on_ack_mandate(udpard_rx_t* const rx, udpard_rx_port_t* const port, const udpard_rx_ack_mandate_t am)
20272027
{
20282028
printf("on_ack_mandate: transfer_id=%llu payload_head_size=%zu\n",
20292029
(unsigned long long)am.transfer_id,
20302030
am.payload_head.size);
20312031
callback_result_t* const cb_result = (callback_result_t* const)rx->user;
20322032
cb_result->rx = rx;
2033-
cb_result->sub = sub;
2033+
cb_result->port = port;
20342034
cb_result->ack_mandate.am = am;
20352035
cb_result->ack_mandate.count++;
20362036
// Copy the payload head to our storage.
@@ -2112,7 +2112,7 @@ static void test_session_ordered(void)
21122112
// Check the results and free the transfer.
21132113
TEST_ASSERT_EQUAL(1, cb_result.message.count);
21142114
TEST_ASSERT_EQUAL_PTR(&rx, cb_result.rx);
2115-
TEST_ASSERT_EQUAL_PTR(&port, cb_result.sub);
2115+
TEST_ASSERT_EQUAL_PTR(&port, cb_result.port);
21162116
TEST_ASSERT_EQUAL(1000, cb_result.message.history[0].timestamp);
21172117
TEST_ASSERT_EQUAL(udpard_prio_high, cb_result.message.history[0].priority);
21182118
TEST_ASSERT_EQUAL(42, cb_result.message.history[0].transfer_id);
@@ -2708,7 +2708,7 @@ static void test_session_unordered(void)
27082708
// Transfer is ejected immediately in UNORDERED mode.
27092709
TEST_ASSERT_EQUAL(1, cb_result.message.count);
27102710
TEST_ASSERT_EQUAL_PTR(&rx, cb_result.rx);
2711-
TEST_ASSERT_NULL(cb_result.sub); // p2p transfers have NULL subscription
2711+
TEST_ASSERT_EQUAL_PTR(&rx.p2p_port, cb_result.port);
27122712
TEST_ASSERT_EQUAL(1000, cb_result.message.history[0].timestamp);
27132713
TEST_ASSERT_EQUAL(udpard_prio_high, cb_result.message.history[0].priority);
27142714
TEST_ASSERT_EQUAL(100, cb_result.message.history[0].transfer_id);

0 commit comments

Comments
 (0)