Skip to content

Commit 166ec7a

Browse files
P2P feedback carries the remote topic hash and transfer-ID
1 parent 2c932de commit 166ec7a

File tree

3 files changed

+75
-63
lines changed

3 files changed

+75
-63
lines changed

libudpard/udpard.c

Lines changed: 61 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,17 @@ static void bytes_scattered_read(bytes_scattered_reader_t* const reader, const s
208208
}
209209
}
210210

211+
static size_t bytes_scattered_size(const udpard_bytes_scattered_t head)
212+
{
213+
size_t size = head.bytes.size;
214+
const udpard_bytes_scattered_t* current = head.next;
215+
while (current != NULL) {
216+
size += current->bytes.size;
217+
current = current->next;
218+
}
219+
return size;
220+
}
221+
211222
/// We require that the fragment tree does not contain fully-contained or equal-range fragments. This implies that no
212223
/// two fragments have the same offset, and that fragments ordered by offset also order by their ends.
213224
static int32_t cavl_compare_fragment_offset(const void* const user, const udpard_tree_t* const node)
@@ -579,6 +590,7 @@ typedef struct tx_transfer_t
579590
udpard_tree_t index_transfer; ///< Specific transfer lookup for ack management. Key: tx_transfer_key_t
580591
udpard_list_member_t queue[UDPARD_IFACE_COUNT_MAX]; ///< Listed when ready for transmission.
581592
udpard_list_member_t agewise; ///< Listed when created; oldest at the tail.
593+
udpard_tree_t index_transfer_ack; ///< Only for acks. Key: tx_transfer_key_t but referencing remote_*.
582594

583595
/// We always keep a pointer to the head, plus a cursor that scans the frames during transmission.
584596
/// Both are NULL if the payload is destroyed.
@@ -592,23 +604,21 @@ typedef struct tx_transfer_t
592604
udpard_us_t staged_until;
593605

594606
/// Constant transfer properties supplied by the client.
607+
/// The remote_* fields are identical to the local ones except in the case of P2P transfers, where
608+
/// they contain the values encoded in the P2P header. This is needed to find pending acks (to minimize duplicates),
609+
/// and to report the correct values via the feedback callback.
610+
/// By default, the remote_* fields equal the local ones.
595611
uint64_t topic_hash;
596612
uint64_t transfer_id;
613+
uint64_t remote_topic_hash;
614+
uint64_t remote_transfer_id;
597615
udpard_us_t deadline;
598616
bool reliable;
599617
udpard_prio_t priority;
600618
udpard_udpip_ep_t destination[UDPARD_IFACE_COUNT_MAX];
601619
void* user_transfer_reference;
602620

603621
void (*feedback)(udpard_tx_t*, udpard_tx_feedback_t);
604-
605-
/// These entities are specific to outgoing acks only. I considered extracting them into a polymorphic
606-
/// tx_transfer_ack_t subtype with a virtual destructor, but it adds a bit more complexity than I would like
607-
/// to tolerate for a gain of only a dozen bytes per transfer object.
608-
/// These are unused for non-ack transfers.
609-
udpard_tree_t index_transfer_remote; ///< Key: tx_transfer_key_t but referencing the remotes.
610-
uint64_t remote_topic_hash;
611-
uint64_t remote_transfer_id;
612622
} tx_transfer_t;
613623

614624
static bool tx_validate_mem_resources(const udpard_tx_mem_resources_t memory)
@@ -639,8 +649,8 @@ static void tx_transfer_free_payload(tx_transfer_t* const tr)
639649
static void tx_transfer_retire(udpard_tx_t* const tx, tx_transfer_t* const tr, const bool success)
640650
{
641651
// Construct the feedback object first before the transfer is destroyed.
642-
const udpard_tx_feedback_t fb = { .topic_hash = tr->topic_hash,
643-
.transfer_id = tr->transfer_id,
652+
const udpard_tx_feedback_t fb = { .topic_hash = tr->remote_topic_hash,
653+
.transfer_id = tr->remote_transfer_id,
644654
.user_transfer_reference = tr->user_transfer_reference,
645655
.success = success };
646656
UDPARD_ASSERT(tr->reliable == (tr->feedback != NULL));
@@ -655,7 +665,7 @@ static void tx_transfer_retire(udpard_tx_t* const tx, tx_transfer_t* const tr, c
655665
(void)cavl2_remove_if(&tx->index_staged, &tr->index_staged);
656666
cavl2_remove(&tx->index_deadline, &tr->index_deadline);
657667
cavl2_remove(&tx->index_transfer, &tr->index_transfer);
658-
(void)cavl2_remove_if(&tx->index_transfer_remote, &tr->index_transfer_remote);
668+
(void)cavl2_remove_if(&tx->index_transfer_ack, &tr->index_transfer_ack);
659669

660670
// Free the memory. The payload memory may already be empty depending on where we were invoked from.
661671
tx_transfer_free_payload(tr);
@@ -712,7 +722,7 @@ static int32_t tx_cavl_compare_transfer(const void* const user, const udpard_tre
712722
static int32_t tx_cavl_compare_transfer_remote(const void* const user, const udpard_tree_t* const node)
713723
{
714724
const tx_transfer_key_t* const key = (const tx_transfer_key_t*)user;
715-
const tx_transfer_t* const tr = CAVL2_TO_OWNER(node, tx_transfer_t, index_transfer_remote); // clang-format off
725+
const tx_transfer_t* const tr = CAVL2_TO_OWNER(node, tx_transfer_t, index_transfer_ack); // clang-format off
716726
if (key->topic_hash < tr->remote_topic_hash) { return -1; }
717727
if (key->topic_hash > tr->remote_topic_hash) { return +1; }
718728
if (key->transfer_id < tr->remote_transfer_id) { return -1; }
@@ -894,6 +904,8 @@ static uint32_t tx_push(udpard_tx_t* const tx,
894904
tr->staged_until = now;
895905
tr->topic_hash = meta.topic_hash;
896906
tr->transfer_id = meta.transfer_id;
907+
tr->remote_topic_hash = meta.topic_hash;
908+
tr->remote_transfer_id = meta.transfer_id;
897909
tr->deadline = deadline;
898910
tr->reliable = meta.flag_ack;
899911
tr->priority = meta.priority;
@@ -1001,9 +1013,9 @@ static void tx_send_ack(udpard_rx_t* const rx,
10011013
// Check if an ack for this transfer is already enqueued.
10021014
const tx_transfer_key_t key = { .topic_hash = topic_hash, .transfer_id = transfer_id };
10031015
tx_transfer_t* const prior =
1004-
CAVL2_TO_OWNER(cavl2_find(tx->index_transfer_remote, &key, &tx_cavl_compare_transfer_remote),
1016+
CAVL2_TO_OWNER(cavl2_find(tx->index_transfer_ack, &key, &tx_cavl_compare_transfer_remote),
10051017
tx_transfer_t,
1006-
index_transfer_remote);
1018+
index_transfer_ack);
10071019
const uint32_t prior_ep_mask = (prior != NULL) ? valid_ep_mask(prior->destination) : 0U;
10081020
const uint32_t new_ep_mask = valid_ep_mask(remote.endpoints);
10091021
const bool new_better = (new_ep_mask & (~prior_ep_mask)) != 0U;
@@ -1052,10 +1064,10 @@ static void tx_send_ack(udpard_rx_t* const rx,
10521064
UDPARD_ASSERT(tr != NULL);
10531065
tr->remote_topic_hash = topic_hash;
10541066
tr->remote_transfer_id = transfer_id;
1055-
(void)cavl2_find_or_insert(&tx->index_transfer_remote,
1067+
(void)cavl2_find_or_insert(&tx->index_transfer_ack,
10561068
&key,
10571069
tx_cavl_compare_transfer_remote,
1058-
&tr->index_transfer_remote,
1070+
&tr->index_transfer_ack,
10591071
cavl2_trivial_factory);
10601072
} else {
10611073
rx->errors_ack_tx++;
@@ -1115,23 +1127,15 @@ uint32_t udpard_tx_push(udpard_tx_t* const self,
11151127
((payload.bytes.data != NULL) || (payload.bytes.size == 0U)) &&
11161128
(tx_transfer_find(self, topic_hash, transfer_id) == NULL);
11171129
if (ok) {
1118-
// Before attempting to enqueue a new transfer, we need to update the transmission scheduler.
1119-
// It may release some items from the tx queue, and it may also promote some staged transfers to the queue.
11201130
udpard_tx_poll(self, now, UDPARD_IFACE_MASK_ALL);
1121-
// Compute the total payload size.
1122-
size_t size = payload.bytes.size;
1123-
const udpard_bytes_scattered_t* current = payload.next;
1124-
while (current != NULL) {
1125-
size += current->bytes.size;
1126-
current = current->next;
1131+
const meta_t meta = {
1132+
.priority = priority,
1133+
.flag_ack = feedback != NULL,
1134+
.transfer_payload_size = (uint32_t)bytes_scattered_size(payload),
1135+
.transfer_id = transfer_id,
1136+
.sender_uid = self->local_uid,
1137+
.topic_hash = topic_hash,
11271138
};
1128-
// Enqueue the transfer.
1129-
const meta_t meta = { .priority = priority,
1130-
.flag_ack = feedback != NULL,
1131-
.transfer_payload_size = (uint32_t)size,
1132-
.transfer_id = transfer_id,
1133-
.sender_uid = self->local_uid,
1134-
.topic_hash = topic_hash };
11351139
out = tx_push(self, now, deadline, meta, remote_ep, payload, feedback, user_transfer_reference, NULL);
11361140
}
11371141
return out;
@@ -1148,9 +1152,13 @@ uint32_t udpard_tx_push_p2p(udpard_tx_t* const self,
11481152
void (*const feedback)(udpard_tx_t*, udpard_tx_feedback_t),
11491153
void* const user_transfer_reference)
11501154
{
1151-
uint32_t out = 0;
1152-
if (self != NULL) {
1153-
// Serialize the P2P header.
1155+
uint32_t out = 0;
1156+
const bool ok = (self != NULL) && (deadline >= now) && (now >= 0) && (self->local_uid != 0) &&
1157+
(valid_ep_mask(remote.endpoints) != 0) && (priority <= UDPARD_PRIORITY_MAX) &&
1158+
((payload.bytes.data != NULL) || (payload.bytes.size == 0U));
1159+
if (ok) {
1160+
udpard_tx_poll(self, now, UDPARD_IFACE_MASK_ALL);
1161+
// Serialize the P2P header and prepend it to the payload.
11541162
byte_t header[UDPARD_P2P_HEADER_BYTES];
11551163
byte_t* ptr = header;
11561164
*ptr++ = P2P_KIND_RESPONSE;
@@ -1159,20 +1167,25 @@ uint32_t udpard_tx_push_p2p(udpard_tx_t* const self,
11591167
ptr = serialize_u64(ptr, request_transfer_id);
11601168
UDPARD_ASSERT((ptr - header) == UDPARD_P2P_HEADER_BYTES);
11611169
(void)ptr;
1162-
// Construct the full P2P payload with the header prepended. No copying needed!
1163-
const udpard_bytes_scattered_t headed_payload = { .bytes = { .size = UDPARD_P2P_HEADER_BYTES, .data = header },
1164-
.next = &payload };
1165-
// Enqueue the transfer.
1166-
out = udpard_tx_push(self,
1167-
now,
1168-
deadline,
1169-
priority,
1170-
remote.uid,
1171-
remote.endpoints,
1172-
self->p2p_transfer_id++,
1173-
headed_payload,
1174-
feedback,
1175-
user_transfer_reference);
1170+
const udpard_bytes_scattered_t full_payload = { .bytes = { .size = UDPARD_P2P_HEADER_BYTES, .data = header },
1171+
.next = &payload };
1172+
// Enqueue the transfer, having propagated the scheduler state beforehand.
1173+
const meta_t meta = {
1174+
.priority = priority,
1175+
.flag_ack = feedback != NULL,
1176+
.transfer_payload_size = (uint32_t)bytes_scattered_size(full_payload),
1177+
.transfer_id = self->p2p_transfer_id++,
1178+
.sender_uid = self->local_uid,
1179+
.topic_hash = remote.uid,
1180+
};
1181+
tx_transfer_t* tr = NULL;
1182+
out =
1183+
tx_push(self, now, deadline, meta, remote.endpoints, full_payload, feedback, user_transfer_reference, &tr);
1184+
if (out > 0) {
1185+
UDPARD_ASSERT(tr != NULL);
1186+
tr->remote_topic_hash = request_topic_hash;
1187+
tr->remote_transfer_id = request_transfer_id;
1188+
}
11761189
}
11771190
return out;
11781191
}

libudpard/udpard.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -329,6 +329,7 @@ typedef struct udpard_tx_mem_resources_t
329329
} udpard_tx_mem_resources_t;
330330

331331
/// Outcome notification for a reliable transfer previously scheduled for transmission.
332+
/// For P2P transfers, the topic hash and the transfer-ID values specify which message this is a response to.
332333
typedef struct udpard_tx_feedback_t
333334
{
334335
uint64_t topic_hash;
@@ -427,7 +428,7 @@ struct udpard_tx_t
427428
udpard_tree_t* index_staged;
428429
udpard_tree_t* index_deadline;
429430
udpard_tree_t* index_transfer;
430-
udpard_tree_t* index_transfer_remote;
431+
udpard_tree_t* index_transfer_ack;
431432

432433
/// Opaque pointer for the application use only. Not accessed by the library.
433434
void* user;
@@ -500,7 +501,8 @@ uint32_t udpard_tx_push(udpard_tx_t* const self,
500501

501502
/// This is a specialization of the general push function for P2P transfers.
502503
/// It is used to send P2P responses to messages received from topics; the request_* values shall be taken from
503-
/// the message transfer that is being responded to.
504+
/// the message transfer that is being responded to. The topic_hash and the transfer_id fields of the feedback struct
505+
/// will be set to the request_topic_hash and request_transfer_id values, respectively.
504506
/// P2P transfers are a bit more complex because they carry some additional metadata that is automatically
505507
/// composed/parsed by the library transparently for the application.
506508
/// The size of the serialized payload will include UDPARD_P2P_HEADER_BYTES additional bytes for the P2P header.

tests/src/test_intrusive_tx.c

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -181,15 +181,12 @@ static void test_tx_validation_and_free(void)
181181
&tx.index_deadline, &tr->deadline, tx_cavl_compare_deadline, &tr->index_deadline, cavl2_trivial_factory);
182182
(void)cavl2_find_or_insert(
183183
&tx.index_transfer, &key, tx_cavl_compare_transfer, &tr->index_transfer, cavl2_trivial_factory);
184-
(void)cavl2_find_or_insert(&tx.index_transfer_remote,
185-
&key,
186-
tx_cavl_compare_transfer_remote,
187-
&tr->index_transfer_remote,
188-
cavl2_trivial_factory);
184+
(void)cavl2_find_or_insert(
185+
&tx.index_transfer_ack, &key, tx_cavl_compare_transfer_remote, &tr->index_transfer_ack, cavl2_trivial_factory);
189186
enlist_head(&tx.agewise, &tr->agewise);
190187
tx_transfer_retire(&tx, tr, true);
191188
TEST_ASSERT_NULL(tx.index_staged);
192-
TEST_ASSERT_NULL(tx.index_transfer_remote);
189+
TEST_ASSERT_NULL(tx.index_transfer_ack);
193190
instrumented_allocator_reset(&alloc_transfer);
194191
instrumented_allocator_reset(&alloc_payload);
195192
}
@@ -230,16 +227,16 @@ static void test_tx_comparators_and_feedback(void)
230227

231228
// Remote comparator mirrors the above.
232229
tx_transfer_key_t rkey = { .topic_hash = 2, .transfer_id = 1 };
233-
TEST_ASSERT_EQUAL(-1, tx_cavl_compare_transfer_remote(&rkey, &tr.index_transfer_remote));
230+
TEST_ASSERT_EQUAL(-1, tx_cavl_compare_transfer_remote(&rkey, &tr.index_transfer_ack));
234231
rkey.topic_hash = 5;
235-
TEST_ASSERT_EQUAL(1, tx_cavl_compare_transfer_remote(&rkey, &tr.index_transfer_remote));
232+
TEST_ASSERT_EQUAL(1, tx_cavl_compare_transfer_remote(&rkey, &tr.index_transfer_ack));
236233
rkey.topic_hash = tr.remote_topic_hash;
237234
rkey.transfer_id = 2;
238-
TEST_ASSERT_EQUAL(-1, tx_cavl_compare_transfer_remote(&rkey, &tr.index_transfer_remote));
235+
TEST_ASSERT_EQUAL(-1, tx_cavl_compare_transfer_remote(&rkey, &tr.index_transfer_ack));
239236
rkey.transfer_id = 6;
240-
TEST_ASSERT_EQUAL(1, tx_cavl_compare_transfer_remote(&rkey, &tr.index_transfer_remote));
237+
TEST_ASSERT_EQUAL(1, tx_cavl_compare_transfer_remote(&rkey, &tr.index_transfer_ack));
241238
rkey.transfer_id = tr.remote_transfer_id;
242-
TEST_ASSERT_EQUAL(0, tx_cavl_compare_transfer_remote(&rkey, &tr.index_transfer_remote));
239+
TEST_ASSERT_EQUAL(0, tx_cavl_compare_transfer_remote(&rkey, &tr.index_transfer_ack));
243240
}
244241

245242
static void test_tx_spool_and_queue_errors(void)
@@ -372,10 +369,10 @@ static void test_tx_ack_and_scheduler(void)
372369
prior.destination[0] = make_ep(3);
373370
prior.remote_topic_hash = 7;
374371
prior.remote_transfer_id = 8;
375-
cavl2_find_or_insert(&tx2.index_transfer_remote,
372+
cavl2_find_or_insert(&tx2.index_transfer_ack,
376373
&(tx_transfer_key_t){ .topic_hash = 7, .transfer_id = 8 },
377374
tx_cavl_compare_transfer_remote,
378-
&prior.index_transfer_remote,
375+
&prior.index_transfer_ack,
379376
cavl2_trivial_factory);
380377
rx.errors_ack_tx = 0;
381378
rx.tx = &tx2;

0 commit comments

Comments
 (0)