Skip to content

Commit ecb5704

Browse files
refcounting
1 parent 98b94bd commit ecb5704

File tree

2 files changed

+51
-21
lines changed

2 files changed

+51
-21
lines changed

libudpard/udpard.c

Lines changed: 42 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -474,12 +474,23 @@ static bool header_deserialize(const udpard_bytes_mut_t dgram_payload,
474474
/// but it might create a bit more memory pressure on average.
475475
typedef struct tx_frame_t
476476
{
477+
size_t refcount; ///< Buffer destroyed when refcount reaches zero.
477478
struct tx_frame_t* next;
478479
byte_t data[];
479480
} tx_frame_t;
480481

481482
static size_t tx_frame_object_size(const size_t mtu) { return sizeof(tx_frame_t) + mtu + HEADER_SIZE_BYTES; }
482483

484+
static udpard_bytes_t tx_frame_view(const tx_frame_t* const frame, const size_t mtu)
485+
{
486+
return (udpard_bytes_t){ .size = mtu + HEADER_SIZE_BYTES, .data = frame->data };
487+
}
488+
489+
static tx_frame_t* tx_frame_from_view(const udpard_bytes_t view)
490+
{
491+
return (tx_frame_t*)unbias_ptr(view.data, offsetof(tx_frame_t, data));
492+
}
493+
483494
typedef struct
484495
{
485496
uint64_t topic_hash;
@@ -536,14 +547,14 @@ static bool tx_validate_mem_resources(const udpard_tx_mem_resources_t memory)
536547
(memory.payload.alloc != NULL) && (memory.payload.free != NULL);
537548
}
538549

539-
static void tx_transfer_free_payload(const udpard_tx_mem_resources_t mem, tx_transfer_t* const tr)
550+
static void tx_transfer_free_payload(udpard_tx_t* const tx, tx_transfer_t* const tr)
540551
{
541552
UDPARD_ASSERT(tr != NULL);
542553
tx_frame_t* frame = tr->head;
543554
while (frame != NULL) {
544555
tx_frame_t* const next = frame->next;
545556
const size_t mtu = (frame->next == NULL) ? tr->mtu_last : tr->mtu;
546-
mem_free(mem.payload, tx_frame_object_size(mtu), frame);
557+
udpard_tx_refcount_dec(tx, tx_frame_view(frame, mtu));
547558
frame = next;
548559
}
549560
tr->head = NULL;
@@ -553,7 +564,7 @@ static void tx_transfer_free_payload(const udpard_tx_mem_resources_t mem, tx_tra
553564
static void tx_transfer_free(udpard_tx_t* const tx, tx_transfer_t* const tr)
554565
{
555566
UDPARD_ASSERT(tr != NULL);
556-
tx_transfer_free_payload(tx->memory, tr);
567+
tx_transfer_free_payload(tx, tr);
557568
// Remove the transfer from all indexes.
558569
delist(&tx->queue[tr->priority], &tr->queue);
559570
if (cavl2_is_inserted(tx->index_staged, &tr->index_staged)) {
@@ -627,6 +638,7 @@ static tx_frame_t* tx_spool(const udpard_tx_mem_resources_t memory,
627638
break;
628639
}
629640
// Populate the frame contents.
641+
tail->refcount = 1;
630642
tail->next = NULL;
631643
const byte_t* const read_ptr = ((const byte_t*)payload.data) + offset;
632644
prefix_crc = crc_add(prefix_crc, progress, read_ptr);
@@ -735,7 +747,7 @@ static void tx_receive_ack(udpard_rx_t* const rx,
735747
.attempts = tr->attempts,
736748
.success = true,
737749
};
738-
tx_transfer_free_payload(tx->memory, tr); // do this early to release memory before callback
750+
tx_transfer_free_payload(tx, tr); // do this early to release memory before callback
739751
tr->feedback(tx, fb);
740752
tx_transfer_free(tx, tr);
741753
}
@@ -861,7 +873,7 @@ static void tx_purge_expired(udpard_tx_t* const self, const udpard_us_t now)
861873
.attempts = tr->attempts,
862874
.success = false,
863875
};
864-
tx_transfer_free_payload(self->memory, tr); // do this early to release memory before callback
876+
tx_transfer_free_payload(self, tr); // do this early to release memory before callback
865877
if (tr->feedback != NULL) {
866878
tr->feedback(self, fb);
867879
}
@@ -914,19 +926,14 @@ static void tx_eject_pending(udpard_tx_t* const self, const udpard_us_t now)
914926
tx_frame_t* const frame_next = frame->next;
915927
const bool last_attempt = tr->deadline <= tr->retry_at;
916928
const bool last_frame = frame_next == NULL; // if not last attempt we will have to rewind to head
917-
const size_t frame_size = last_frame ? tr->mtu_last : tr->mtu;
918-
// Transfer ownership to the application if no further attempts will be made to reduce queue/memory pressure.
919-
const udpard_bytes_mut_t frame_origin = { .size = last_attempt ? tx_frame_object_size(frame_size) : 0U,
920-
.data = last_attempt ? frame : NULL };
921929

922930
// Eject the frame.
923931
const udpard_tx_ejection_t ejection = {
924932
.now = now,
925933
.deadline = tr->deadline,
926934
.dscp = self->dscp_value_per_priority[tr->priority],
927935
.destination = tr->destination,
928-
.datagram_view = { .size = HEADER_SIZE_BYTES + frame_size, .data = frame->data },
929-
.datagram_origin = frame_origin,
936+
.datagram = tx_frame_view(frame, last_frame ? tr->mtu_last : tr->mtu),
930937
.user_transfer_reference = tr->user_transfer_reference,
931938
};
932939
if (!self->vtable->eject(self, ejection)) { // The easy case -- no progress was made at this time;
@@ -937,8 +944,10 @@ static void tx_eject_pending(udpard_tx_t* const self, const udpard_us_t now)
937944
if (last_attempt) { // no need to keep frames that we will no longer use; free early to reduce pressure
938945
UDPARD_ASSERT(tr->head == tr->cursor); // They go together on the last attempt.
939946
tr->head = frame_next;
940-
self->enqueued_frames_count--; // Ownership transferred to the application.
947+
udpard_tx_refcount_dec(self, ejection.datagram);
948+
self->enqueued_frames_count--;
941949
}
950+
tr->cursor = frame_next;
942951

943952
// Finalize the transmission if this was the last frame of the transfer.
944953
if (last_frame) {
@@ -954,8 +963,6 @@ static void tx_eject_pending(udpard_tx_t* const self, const udpard_us_t now)
954963
cavl2_find_or_insert(
955964
&self->index_staged, &tr->retry_at, tx_cavl_compare_staged, &tr->index_staged, cavl2_trivial_factory);
956965
}
957-
} else {
958-
tr->cursor = frame_next;
959966
}
960967
}
961968
}
@@ -969,6 +976,27 @@ void udpard_tx_poll(udpard_tx_t* const self, const udpard_us_t now)
969976
}
970977
}
971978

979+
void udpard_tx_refcount_inc(udpard_tx_t* const self, const udpard_bytes_t datagram)
980+
{
981+
if ((self != NULL) && (datagram.data != NULL)) {
982+
tx_frame_t* const frame = tx_frame_from_view(datagram);
983+
UDPARD_ASSERT(frame->refcount > 0); // NOLINT(*ArrayBound)
984+
frame->refcount++;
985+
}
986+
}
987+
988+
void udpard_tx_refcount_dec(udpard_tx_t* const self, const udpard_bytes_t datagram)
989+
{
990+
if ((self != NULL) && (datagram.data != NULL)) {
991+
tx_frame_t* const frame = tx_frame_from_view(datagram);
992+
UDPARD_ASSERT(frame->refcount > 0); // NOLINT(*ArrayBound)
993+
frame->refcount--;
994+
if (frame->refcount == 0U) {
995+
mem_free(self->memory.payload, tx_frame_object_size(datagram.size), frame);
996+
}
997+
}
998+
}
999+
9721000
void udpard_tx_free(udpard_tx_t* const self)
9731001
{
9741002
if (self != NULL) {

libudpard/udpard.h

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -331,13 +331,10 @@ typedef struct udpard_tx_ejection_t
331331
uint_fast8_t dscp; ///< Set the DSCP field of the outgoing packet to this.
332332
udpard_udpip_ep_t destination; ///< Unicast or multicast UDP/IP endpoint.
333333

334-
/// If the ejection handler returns success, the application is responsible for freeing the datagram_origin.data
335-
/// using udpard_tx_t::memory.payload.free() at some point in the future (either within the callback or later),
336-
/// unless datagram_origin.data is NULL, in which case the library will retain the ownership.
337-
/// It may help to know that the view is a small fixed offset greater than the origin,
338-
/// so both may not have to be kept, depending on the implementation.
339-
udpard_bytes_t datagram_view; ///< Transmit this; do not free it.
340-
udpard_bytes_mut_t datagram_origin; ///< Free this unless NULL.
334+
/// If the datagram pointer is retained by the application, udpard_tx_refcount_inc() must be invoked on it.
335+
/// When no longer needed (e.g, upon transmission), udpard_tx_refcount_dec() must be invoked.
336+
/// Ref counting is needed because the library may need to retain the buffer for subsequent retransmissions.
337+
udpard_bytes_t datagram;
341338

342339
/// This is the same pointer that was passed to udpard_tx_push().
343340
void* user_transfer_reference;
@@ -483,6 +480,11 @@ uint32_t udpard_tx_push(udpard_tx_t* const self,
483480
/// The function may deallocate memory. The time complexity is logarithmic in the number of enqueued transfers.
484481
void udpard_tx_poll(udpard_tx_t* const self, const udpard_us_t now);
485482

483+
/// When a datagram is ejected and the application opts to keep it, these functions must be used to manage the
484+
/// datagram buffer lifetime. The datagram will be freed once the reference count reaches zero.
485+
void udpard_tx_refcount_inc(udpard_tx_t* const self, const udpard_bytes_t datagram);
486+
void udpard_tx_refcount_dec(udpard_tx_t* const self, const udpard_bytes_t datagram);
487+
486488
/// Drops all enqueued items; afterward, the instance is safe to discard. Callbacks will not be invoked.
487489
void udpard_tx_free(udpard_tx_t* const self);
488490

0 commit comments

Comments
 (0)