Skip to content

Commit a598ee8

Browse files
per-port callbacks
1 parent b92c423 commit a598ee8

File tree

5 files changed

+125
-120
lines changed

5 files changed

+125
-120
lines changed

libudpard/udpard.c

Lines changed: 35 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ typedef unsigned char byte_t; ///< For compatibility with platforms where byte s
7171

7272
#define UDP_PORT 9382U
7373
#define IPv4_MCAST_PREFIX 0xEF000000UL
74+
static_assert((UDPARD_IPv4_SUBJECT_ID_MAX & (UDPARD_IPv4_SUBJECT_ID_MAX + 1)) == 0,
75+
"UDPARD_IPv4_SUBJECT_ID_MAX must be one less than a power of 2");
7476

7577
static size_t smaller(const size_t a, const size_t b) { return (a < b) ? a : b; }
7678
static size_t larger(const size_t a, const size_t b) { return (a > b) ? a : b; }
@@ -129,8 +131,6 @@ bool udpard_is_valid_endpoint(const udpard_udpip_ep_t ep)
129131

130132
udpard_udpip_ep_t udpard_make_subject_endpoint(const uint32_t subject_id)
131133
{
132-
static_assert((UDPARD_IPv4_SUBJECT_ID_MAX & (UDPARD_IPv4_SUBJECT_ID_MAX + 1)) == 0,
133-
"UDPARD_IPv4_SUBJECT_ID_MAX must be one less than a power of 2");
134134
return (udpard_udpip_ep_t){ .ip = IPv4_MCAST_PREFIX | (subject_id & UDPARD_IPv4_SUBJECT_ID_MAX), .port = UDP_PORT };
135135
}
136136

@@ -1091,7 +1091,7 @@ typedef struct
10911091
udpard_tree_t index_remote_uid; ///< Must be the first member.
10921092
udpard_remote_t remote; ///< Most recent discovered reverse path for P2P to the sender.
10931093

1094-
udpard_rx_port_t* owner;
1094+
udpard_rx_port_t* port;
10951095

10961096
/// Sessions interned for the reordering window closure.
10971097
udpard_tree_t index_reordering_window;
@@ -1160,8 +1160,7 @@ static void rx_session_on_ack_mandate(const rx_session_t* const self,
11601160
.remote = self->remote, .priority = priority, .transfer_id = transfer_id, .payload_head = payload_head
11611161
};
11621162
UDPARD_ASSERT(payload_head.data != NULL || payload_head.size == 0U);
1163-
UDPARD_ASSERT(rx->on_ack_mandate != NULL);
1164-
rx->on_ack_mandate(rx, self->owner, mandate);
1163+
self->port->vtable->on_ack_mandate(rx, self->port, mandate);
11651164
}
11661165

11671166
static int32_t cavl_compare_rx_session_by_remote_uid(const void* const user, const udpard_tree_t* const node)
@@ -1203,7 +1202,7 @@ static udpard_tree_t* cavl_factory_rx_session_by_remote_uid(void* const user)
12031202
rx_slot_reset(&out->slots[i], args->owner->memory.fragment);
12041203
}
12051204
out->remote.uid = args->remote_uid;
1206-
out->owner = args->owner;
1205+
out->port = args->owner;
12071206
out->last_animated_ts = args->now;
12081207
out->history_current = 0;
12091208
out->initialized = false;
@@ -1218,14 +1217,14 @@ static void rx_session_free(rx_session_t* const self,
12181217
udpard_tree_t** const sessions_by_reordering)
12191218
{
12201219
for (size_t i = 0; i < RX_SLOT_COUNT; i++) {
1221-
rx_slot_reset(&self->slots[i], self->owner->memory.fragment);
1220+
rx_slot_reset(&self->slots[i], self->port->memory.fragment);
12221221
}
1223-
cavl2_remove(&self->owner->index_session_by_remote_uid, &self->index_remote_uid);
1222+
cavl2_remove(&self->port->index_session_by_remote_uid, &self->index_remote_uid);
12241223
if (cavl2_is_inserted(*sessions_by_reordering, &self->index_reordering_window)) {
12251224
cavl2_remove(sessions_by_reordering, &self->index_reordering_window);
12261225
}
12271226
delist(sessions_by_animation, &self->list_by_animation);
1228-
mem_free(self->owner->memory.session, sizeof(rx_session_t), self);
1227+
mem_free(self->port->memory.session, sizeof(rx_session_t), self);
12291228
}
12301229

12311230
/// The payload ownership is transferred to the application. The history log and the window will be updated.
@@ -1247,12 +1246,11 @@ static void rx_session_eject(rx_session_t* const self, udpard_rx_t* const rx, rx
12471246
.payload_size_wire = slot->total_size,
12481247
.payload = (udpard_fragment_t*)slot->fragments,
12491248
};
1250-
UDPARD_ASSERT(rx->on_message != NULL);
1251-
rx->on_message(rx, self->owner, transfer);
1249+
self->port->vtable->on_message(rx, self->port, transfer);
12521250

12531251
// Finally, reset the slot.
12541252
slot->fragments = NULL; // Transfer ownership to the application.
1255-
rx_slot_reset(slot, self->owner->memory.fragment);
1253+
rx_slot_reset(slot, self->port->memory.fragment);
12561254
}
12571255

12581256
/// In the ORDERED mode, checks which slots can be ejected or interned in the reordering window.
@@ -1291,13 +1289,13 @@ static void rx_session_ordered_scan_slots(rx_session_t* const self,
12911289
// The reordering window timeout implies that earlier transfers will be dropped if ORDERED mode is used.
12921290
const bool eject =
12931291
(slot != NULL) && ((slot->transfer_id == tid_expected) ||
1294-
(ts >= (slot->ts_min + self->owner->reordering_window)) || (force_one && (iter == 0)));
1292+
(ts >= (slot->ts_min + self->port->reordering_window)) || (force_one && (iter == 0)));
12951293
if (!eject) {
12961294
// The slot is done but cannot be ejected yet; arm the reordering window timer.
12971295
// There may be transfers with future (more distant) transfer-IDs with an earlier reordering window
12981296
// closure deadline, but we ignore them because the nearest transfer overrides the more distant ones.
12991297
if (slot != NULL) {
1300-
self->reordering_window_deadline = slot->ts_min + self->owner->reordering_window;
1298+
self->reordering_window_deadline = slot->ts_min + self->port->reordering_window;
13011299
const udpard_tree_t* res = cavl2_find_or_insert(&rx->index_session_by_reordering, //-------------
13021300
&self->reordering_window_deadline,
13031301
&cavl_compare_rx_session_by_reordering_deadline,
@@ -1319,7 +1317,7 @@ static void rx_session_ordered_scan_slots(rx_session_t* const self,
13191317
for (size_t i = 0; i < RX_SLOT_COUNT; i++) {
13201318
rx_slot_t* const slot = &self->slots[i];
13211319
if ((slot->state == rx_slot_busy) && rx_session_is_transfer_late_or_ejected(self, slot->transfer_id)) {
1322-
rx_slot_reset(slot, self->owner->memory.fragment);
1320+
rx_slot_reset(slot, self->port->memory.fragment);
13231321
}
13241322
}
13251323
}
@@ -1341,7 +1339,7 @@ static rx_slot_t* rx_session_get_slot(rx_session_t* const self,
13411339
// Use this opportunity to check for timed-out in-progress slots. This may free up a slot for the search below.
13421340
for (size_t i = 0; i < RX_SLOT_COUNT; i++) {
13431341
if ((self->slots[i].state == rx_slot_busy) && (ts >= (self->slots[i].ts_max + SESSION_LIFETIME))) {
1344-
rx_slot_reset(&self->slots[i], self->owner->memory.fragment);
1342+
rx_slot_reset(&self->slots[i], self->port->memory.fragment);
13451343
}
13461344
}
13471345
// This appears to be a new transfer, so we will need to allocate a new slot for it.
@@ -1364,7 +1362,7 @@ static rx_slot_t* rx_session_get_slot(rx_session_t* const self,
13641362
// If it's busy, it is probably just a stale transfer, so it's a no-brainer to evict it.
13651363
// If it's done, we have to force the reordering window to close early to free up a slot without transfer loss.
13661364
if (slot->state == rx_slot_busy) {
1367-
rx_slot_reset(slot, self->owner->memory.fragment); // Just a stale transfer, it's probably dead anyway.
1365+
rx_slot_reset(slot, self->port->memory.fragment); // Just a stale transfer, it's probably dead anyway.
13681366
} else {
13691367
UDPARD_ASSERT(slot->state == rx_slot_done);
13701368
// The oldest slot is DONE; we cannot just reset it, we must force an early ejection.
@@ -1402,10 +1400,10 @@ static void rx_session_update_ordered(rx_session_t* const self,
14021400
((slot->state == rx_slot_busy) && (slot->transfer_id == frame.meta.transfer_id)));
14031401
rx_slot_update(slot,
14041402
ts,
1405-
self->owner->memory.fragment,
1403+
self->port->memory.fragment,
14061404
payload_deleter,
14071405
frame,
1408-
self->owner->extent,
1406+
self->port->extent,
14091407
&rx->errors_oom,
14101408
&rx->errors_transfer_malformed);
14111409
if (slot->state == rx_slot_done) {
@@ -1434,7 +1432,7 @@ static void rx_session_update_unordered(rx_session_t* const self,
14341432
const rx_frame_t frame,
14351433
const udpard_mem_deleter_t payload_deleter)
14361434
{
1437-
UDPARD_ASSERT(self->owner->reordering_window < 0);
1435+
UDPARD_ASSERT(self->port->reordering_window < 0);
14381436
// We do not check interned transfers because in the UNORDERED mode they are never interned, always ejected ASAP.
14391437
// We don't care about the ordering, either; we just accept anything that looks new.
14401438
if (!rx_session_is_transfer_ejected(self, frame.meta.transfer_id)) {
@@ -1444,10 +1442,10 @@ static void rx_session_update_unordered(rx_session_t* const self,
14441442
((slot->state == rx_slot_busy) && (slot->transfer_id == frame.meta.transfer_id)));
14451443
rx_slot_update(slot,
14461444
ts,
1447-
self->owner->memory.fragment,
1445+
self->port->memory.fragment,
14481446
payload_deleter,
14491447
frame,
1450-
self->owner->extent,
1448+
self->port->extent,
14511449
&rx->errors_oom,
14521450
&rx->errors_transfer_malformed);
14531451
if (slot->state == rx_slot_done) {
@@ -1475,7 +1473,7 @@ static void rx_session_update(rx_session_t* const self,
14751473
const uint_fast8_t ifindex)
14761474
{
14771475
UDPARD_ASSERT(self->remote.uid == frame.meta.sender_uid);
1478-
UDPARD_ASSERT(frame.meta.topic_hash == self->owner->topic_hash); // must be checked by the caller beforehand
1476+
UDPARD_ASSERT(frame.meta.topic_hash == self->port->topic_hash); // must be checked by the caller beforehand
14791477

14801478
// Animate the session to prevent it from being retired.
14811479
enlist_head(&rx->list_session_by_animation, &self->list_by_animation);
@@ -1497,7 +1495,7 @@ static void rx_session_update(rx_session_t* const self,
14971495
}
14981496

14991497
// Accept the frame depending on the selected reassembly mode.
1500-
const bool ordered = self->owner->reordering_window >= 0;
1498+
const bool ordered = self->port->reordering_window >= 0;
15011499
(ordered ? rx_session_update_ordered : rx_session_update_unordered)(self, rx, ts, frame, payload_deleter);
15021500
}
15031501

@@ -1509,19 +1507,13 @@ static bool rx_validate_mem_resources(const udpard_rx_mem_resources_t memory)
15091507
(memory.fragment.alloc != NULL) && (memory.fragment.free != NULL);
15101508
}
15111509

1512-
bool udpard_rx_new(udpard_rx_t* const self,
1513-
const udpard_rx_on_message_t on_message,
1514-
const udpard_rx_on_collision_t on_collision,
1515-
const udpard_rx_on_ack_mandate_t on_ack_mandate)
1510+
bool udpard_rx_new(udpard_rx_t* const self)
15161511
{
1517-
const bool ok = (self != NULL) && (on_message != NULL) && (on_collision != NULL) && (on_ack_mandate != NULL);
1512+
const bool ok = (self != NULL);
15181513
if (ok) {
15191514
mem_zero(sizeof(*self), self);
15201515
self->list_session_by_animation = (udpard_list_t){ NULL, NULL };
15211516
self->index_session_by_reordering = NULL;
1522-
self->on_message = on_message;
1523-
self->on_collision = on_collision;
1524-
self->on_ack_mandate = on_ack_mandate;
15251517
self->errors_oom = 0;
15261518
self->errors_frame_malformed = 0;
15271519
self->errors_transfer_malformed = 0;
@@ -1552,23 +1544,26 @@ void udpard_rx_poll(udpard_rx_t* const self, const udpard_us_t now)
15521544
}
15531545
}
15541546

1555-
bool udpard_rx_port_new(udpard_rx_port_t* const self,
1556-
const uint64_t topic_hash,
1557-
const size_t extent,
1558-
const udpard_us_t reordering_window,
1559-
const udpard_rx_mem_resources_t memory)
1547+
bool udpard_rx_port_new(udpard_rx_port_t* const self,
1548+
const uint64_t topic_hash,
1549+
const size_t extent,
1550+
const udpard_us_t reordering_window,
1551+
const udpard_rx_mem_resources_t memory,
1552+
const udpard_rx_port_vtable_t* const vtable)
15601553
{
15611554
const bool win_ok = (reordering_window >= 0) || //
15621555
(reordering_window == UDPARD_RX_REORDERING_WINDOW_UNORDERED) ||
15631556
(reordering_window == UDPARD_RX_REORDERING_WINDOW_STATELESS);
1564-
const bool ok = (self != NULL) && rx_validate_mem_resources(memory) && win_ok;
1557+
const bool ok = (self != NULL) && rx_validate_mem_resources(memory) && win_ok && (vtable != NULL) &&
1558+
(vtable->on_message != NULL) && (vtable->on_ack_mandate != NULL) && (vtable->on_collision != NULL);
15651559
if (ok) {
15661560
mem_zero(sizeof(*self), self);
15671561
self->topic_hash = topic_hash;
15681562
self->extent = extent;
15691563
self->reordering_window = reordering_window;
15701564
self->memory = memory;
15711565
self->index_session_by_remote_uid = NULL;
1566+
self->vtable = vtable;
15721567
self->user = NULL;
15731568
}
15741569
return ok;
@@ -1643,7 +1638,7 @@ static void rx_port_accept_stateless(udpard_rx_t* const rx,
16431638
.payload_size_wire = frame.meta.transfer_payload_size,
16441639
.payload = frag,
16451640
};
1646-
rx->on_message(rx, port, transfer);
1641+
port->vtable->on_message(rx, port, transfer);
16471642
} else {
16481643
mem_free_payload(payload_deleter, frame.base.origin);
16491644
++rx->errors_oom;
@@ -1685,7 +1680,7 @@ bool udpard_rx_port_push(udpard_rx_t* const rx,
16851680
mem_free_payload(payload_deleter, frame.base.origin);
16861681
udpard_remote_t remote = { .uid = frame.meta.sender_uid };
16871682
remote.endpoints[redundant_iface_index] = source_ep;
1688-
rx->on_collision(rx, port, remote);
1683+
port->vtable->on_collision(rx, port, remote);
16891684
}
16901685
} else {
16911686
mem_free_payload(payload_deleter, frame.base.origin);

0 commit comments

Comments
 (0)