Commit 944a6a2

Revert "Refactor RTSP and Snapshot Modules for Improved Buffer Handling and Zero-Copy Support"
This reverts commit a734e96.
1 parent 116f06e · commit 944a6a2

File tree

17 files changed: +841 additions, -1435 deletions


src/buffer_pool.c

Lines changed: 36 additions & 259 deletions
@@ -11,8 +11,6 @@
 #include <errno.h>
 #include <unistd.h>
 #include <sys/time.h>
-#include <sys/socket.h>
-#include <netinet/in.h>

 #define WORKER_STATS_INC(field) \
     do \
@@ -83,9 +81,9 @@ static buffer_pool_segment_t *buffer_pool_segment_create(size_t buffer_size, siz
     {
         buffer_ref_t *ref = &segment->refs[i];
         ref->data = segment->buffers + (i * buffer_size);
+        ref->size = buffer_size;
+        ref->refcount = 0;
         ref->segment = segment;
-        ref->type = BUFFER_TYPE_MEMORY;
-        ref->parent_ref = NULL;
         ref->free_next = pool->free_list;
         pool->free_list = ref;
     }
@@ -209,22 +207,11 @@ void buffer_ref_put(buffer_ref_t *ref)
     ref->refcount--;
     if (ref->refcount <= 0)
     {
-        if (ref->type == BUFFER_TYPE_MEMORY_SLICE)
-        {
-            buffer_ref_t *parent = ref->parent_ref;
-            if (parent)
-            {
-                buffer_ref_put(parent);
-            }
-            free(ref);
-            return;
-        }
-
         if (ref->type == BUFFER_TYPE_FILE)
         {
-            if (ref->fd >= 0)
+            if (ref->file_fd >= 0)
            {
-                close(ref->fd);
+                close(ref->file_fd);
            }
            free(ref);
            return;
@@ -245,76 +232,34 @@ void buffer_ref_put(buffer_ref_t *ref)
     }
 }

-/**
- * Allocate one or more buffers from the pool (with partial allocation support)
- *
- * @param pool The buffer pool to allocate from
- * @param num_buffers Desired number of buffers to allocate
- * @param allocated [out] Optional pointer to receive actual number of buffers allocated
- * @return Head of linked list of allocated buffers (linked via process_next/send_next/free_next union)
- *         Returns NULL if no buffers available at all
- *
- * Note: This function supports partial allocation. If fewer than num_buffers are available,
- * it will allocate as many as possible (minimum 1). Check *allocated for actual count.
- * Buffers are pre-linked together using the union field.
- */
-buffer_ref_t *buffer_pool_alloc_from(buffer_pool_t *pool, size_t num_buffers, size_t *allocated)
+buffer_ref_t *buffer_pool_alloc_from(buffer_pool_t *pool, size_t size)
 {
-    if (!pool || num_buffers == 0)
-    {
-        if (allocated)
-            *allocated = 0;
+    if (!pool || size > pool->buffer_size)
         return NULL;
-    }

-    /* Try to expand if we don't have enough buffers */
-    if (pool->num_free < num_buffers)
+    if (!pool->free_list)
     {
-        if (pool->num_free == 0)
+        if (pool == &zerocopy_state.pool)
         {
-            /* No buffers at all - must expand */
-            if (pool == &zerocopy_state.pool)
-            {
-                WORKER_STATS_INC(pool_exhaustions);
-            }
-            else if (pool == &zerocopy_state.control_pool)
-            {
-                WORKER_STATS_INC(control_pool_exhaustions);
-            }
-
-            if (buffer_pool_expand(pool) < 0)
-            {
-                logger(LOG_DEBUG, "%s: Cannot allocate any buffers (pool exhausted, max: %zu)",
-                       buffer_pool_name(pool), pool->max_buffers);
-                if (allocated)
-                    *allocated = 0;
-                return NULL;
-            }
-
-            /* After expansion, we should have at least some buffers */
-            if (pool->num_free == 0)
-            {
-                logger(LOG_ERROR, "%s: Expansion succeeded but still no free buffers",
-                       buffer_pool_name(pool));
-                if (allocated)
-                    *allocated = 0;
-                return NULL;
-            }
+            WORKER_STATS_INC(pool_exhaustions);
+        }
+        else if (pool == &zerocopy_state.control_pool)
+        {
+            WORKER_STATS_INC(control_pool_exhaustions);
         }

-        /* We have some buffers but not enough - try to expand to meet demand */
-        size_t needed = num_buffers - pool->num_free;
-        size_t expansions_needed = (needed + pool->expand_size - 1) / pool->expand_size;
+        if (buffer_pool_expand(pool) < 0)
+        {
+            logger(LOG_DEBUG, "%s: Exhausted and cannot expand (total: %zu, max: %zu)",
+                   buffer_pool_name(pool), pool->num_buffers, pool->max_buffers);
+            return NULL;
+        }

-        for (size_t i = 0; i < expansions_needed && pool->num_free < num_buffers; i++)
+        if (!pool->free_list)
         {
-            if (buffer_pool_expand(pool) < 0)
-            {
-                /* Expansion failed, but we can still use what we have */
-                logger(LOG_DEBUG, "%s: Partial allocation - requested %zu, have %zu",
-                       buffer_pool_name(pool), num_buffers, pool->num_free);
-                break;
-            }
+            logger(LOG_ERROR, "%s: Expansion succeeded but free_list is still empty",
+                   buffer_pool_name(pool));
+            return NULL;
         }
     }
     else if (pool->num_free <= pool->low_watermark && pool->num_buffers < pool->max_buffers)
@@ -329,89 +274,32 @@ buffer_ref_t *buffer_pool_alloc_from(buffer_pool_t *pool, size_t num_buffers, si
         }
     }

-    /* Allocate as many buffers as available (up to num_buffers) */
-    size_t to_allocate = (pool->num_free < num_buffers) ? pool->num_free : num_buffers;
+    buffer_ref_t *ref = pool->free_list;
+    pool->free_list = ref->free_next;
+    pool->num_free--;

-    if (to_allocate == 0)
+    if (ref->segment)
     {
-        if (allocated)
-            *allocated = 0;
-        return NULL;
+        ref->segment->num_free--;
     }

-    /* Allocate buffers by taking the first to_allocate from free_list
-     * Since free_next and process_next are union, the list is already linked! */
-    buffer_ref_t *head = pool->free_list;
-    buffer_ref_t *tail = head;
-
-    /* Traverse to find the tail and initialize each buffer */
-    for (size_t i = 0; i < to_allocate; i++)
-    {
-        /* Initialize buffer */
-        tail->data_offset = 0;
-        tail->data_len = 0;
-        tail->refcount = 1;
-        tail->type = BUFFER_TYPE_MEMORY;
-        tail->parent_ref = NULL;
-
-        if (tail->segment)
-        {
-            tail->segment->num_free--;
-        }
-
-        /* Move to next, but remember current tail for cutting */
-        if (i < to_allocate - 1)
-        {
-            tail = tail->process_next; /* Using union: free_next == process_next */
-        }
-    }
-
-    /* Cut the list: update pool's free_list to point after our allocated chunk */
-    pool->free_list = tail->process_next; /* The rest of free list */
-    tail->process_next = NULL;            /* Terminate our allocated list */
-
-    pool->num_free -= to_allocate;
+    ref->refcount = 1;
+    ref->size = size;
+    ref->send_next = NULL;

     buffer_pool_update_stats(pool);

-    if (allocated)
-        *allocated = to_allocate;
-
-    return head;
+    return ref;
 }

-buffer_ref_t *buffer_pool_alloc(void)
+buffer_ref_t *buffer_pool_alloc(size_t size)
 {
-    return buffer_pool_alloc_from(&zerocopy_state.pool, 1, NULL);
+    return buffer_pool_alloc_from(&zerocopy_state.pool, size);
 }

-buffer_ref_t *buffer_ref_create_slice(buffer_ref_t *source, size_t offset, size_t length)
+buffer_ref_t *buffer_pool_alloc_control(size_t size)
 {
-    if (!source)
-        return NULL;
-
-    size_t available = 0;
-    if (source->data_len >= (size_t)source->data_offset)
-        available = source->data_len - (size_t)source->data_offset;
-
-    if (offset > available || length > available - offset)
-        return NULL;
-
-    buffer_ref_t *slice = calloc(1, sizeof(buffer_ref_t));
-    if (!slice)
-        return NULL;
-
-    buffer_ref_get(source);
-
-    slice->type = BUFFER_TYPE_MEMORY_SLICE;
-    slice->data = source->data;
-    slice->data_offset = source->data_offset + (off_t)offset;
-    slice->data_len = length;
-    slice->segment = source->segment;
-    slice->parent_ref = source;
-    slice->refcount = 1;
-
-    return slice;
+    return buffer_pool_alloc_from(&zerocopy_state.control_pool, size);
 }

 static void buffer_pool_try_shrink_pool(buffer_pool_t *pool, size_t min_buffers)
@@ -524,114 +412,3 @@ void buffer_pool_try_shrink(void)
     buffer_pool_try_shrink_pool(&zerocopy_state.pool, BUFFER_POOL_INITIAL_SIZE);
     buffer_pool_try_shrink_pool(&zerocopy_state.control_pool, CONTROL_POOL_INITIAL_SIZE);
 }
-
-/*
- * Batch receive packets from a socket into a linked list
- * Uses recvmmsg() to receive multiple packets in a single system call
- */
-buffer_ref_t *buffer_pool_batch_recv(int sock, int save_peer_info, const char *drain_label,
-                                     int *packets_received, int *packets_dropped)
-{
-    int dropped = 0;
-
-    /* Pre-allocate buffers - supports partial allocation if pool is low */
-    size_t buf_count = 0;
-    buffer_ref_t *bufs_head = buffer_pool_alloc_from(&zerocopy_state.pool,
-                                                     MAX_RECV_PACKETS_PER_BATCH,
-                                                     &buf_count);
-
-    if (!bufs_head || buf_count == 0)
-    {
-        /* No buffers available - drain socket to avoid blocking sender */
-        logger(LOG_DEBUG, "%s: No buffers available, draining socket", drain_label);
-        uint8_t dummy[BUFFER_POOL_BUFFER_SIZE];
-        while (1)
-        {
-            ssize_t drained = recv(sock, dummy, sizeof(dummy), MSG_DONTWAIT);
-            if (drained < 0)
-                break;
-            dropped++;
-        }
-
-        if (packets_received)
-            *packets_received = 0;
-        if (packets_dropped)
-            *packets_dropped = dropped;
-        return NULL;
-    }
-
-    /* Build array of buffer pointers from linked list for recvmmsg */
-    buffer_ref_t *bufs[MAX_RECV_PACKETS_PER_BATCH];
-    struct mmsghdr msgs[MAX_RECV_PACKETS_PER_BATCH];
-    struct iovec iovecs[MAX_RECV_PACKETS_PER_BATCH];
-
-    memset(msgs, 0, sizeof(msgs));
-
-    /* Convert linked list to array and setup mmsghdr structures */
-    buffer_ref_t *cur = bufs_head;
-
-    for (size_t i = 0; i < buf_count && cur; i++)
-    {
-        bufs[i] = cur;
-
-        /* Setup iovec */
-        iovecs[i].iov_base = cur->data;
-        iovecs[i].iov_len = cur->segment->parent->buffer_size;
-
-        /* Setup mmsghdr */
-        msgs[i].msg_hdr.msg_iov = &iovecs[i];
-        msgs[i].msg_hdr.msg_iovlen = 1;
-
-        if (save_peer_info)
-        {
-            /* Direct write to buffer's recv_info - no memcpy needed later */
-            msgs[i].msg_hdr.msg_name = &cur->recv_info.peer_addr;
-            msgs[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_in);
-        }
-
-        cur = cur->process_next;
-    }
-
-    /* Receive multiple messages in ONE system call */
-    struct timespec timeout = {0, 0}; /* Non-blocking */
-    int received = recvmmsg(sock, msgs, buf_count, MSG_DONTWAIT, &timeout);
-    buffer_ref_t *result = NULL;
-
-    if (received < 0)
-    {
-        if (errno != EAGAIN)
-        {
-            logger(LOG_DEBUG, "%s: recvmmsg failed: %s", drain_label, strerror(errno));
-        }
-        received = 0; /* Treat error as 0 packets received */
-    }
-    else if (received > 0)
-    {
-        /* Update data length for received packets */
-        for (int i = 0; i < received; i++)
-        {
-            bufs[i]->data_len = msgs[i].msg_len;
-        }
-        result = bufs_head;
-    }
-
-    /* Free unused buffers (allocated but not received) */
-    if ((size_t)received < buf_count)
-    {
-        buffer_ref_t *unused = (received > 0) ? bufs[received] : bufs_head;
-        while (unused)
-        {
-            buffer_ref_t *next = unused->process_next;
-            buffer_ref_put(unused);
-            unused = next;
-        }
-    }
-
-    /* Write output parameters once at the end */
-    if (packets_received)
-        *packets_received = received;
-    if (packets_dropped)
-        *packets_dropped = dropped;
-
-    return result;
-}
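Note on the restored allocation path: after this revert, buffer_pool_alloc_from() is back to a plain free-list pop — take the head of pool->free_list, decrement the free counters, and hand the caller one buffer with refcount = 1 and the requested size recorded. The stand-alone sketch below only illustrates that free-list/refcount pattern; pool_t, buf_t, pool_alloc, and pool_put are simplified stand-in names rather than the project's buffer_pool_t / buffer_ref_t API, and the expansion, watermark, and stats logic is omitted.

/* Minimal free-list + refcount sketch. Types, names, and sizes are
 * illustrative stand-ins, not the project's real buffer pool code. */
#include <stdio.h>
#include <stddef.h>

typedef struct buf {
    struct buf *free_next;  /* intrusive free-list link */
    int         refcount;
    size_t      size;       /* bytes requested by the caller */
    char        data[1500]; /* fixed-capacity payload slot */
} buf_t;

typedef struct {
    buf_t *free_list;   /* head of the free list */
    size_t buffer_size; /* capacity of each buffer */
    size_t num_free;
} pool_t;

/* Pop one buffer off the free list, like the restored allocator does. */
static buf_t *pool_alloc(pool_t *pool, size_t size)
{
    if (!pool || size > pool->buffer_size || !pool->free_list)
        return NULL; /* the real code would try to expand the pool here */

    buf_t *b = pool->free_list;
    pool->free_list = b->free_next;
    pool->num_free--;

    b->refcount = 1;
    b->size = size;
    return b;
}

/* Drop a reference; return the buffer to the free list when it hits zero. */
static void pool_put(pool_t *pool, buf_t *b)
{
    if (!b || --b->refcount > 0)
        return;
    b->free_next = pool->free_list;
    pool->free_list = b;
    pool->num_free++;
}

int main(void)
{
    enum { N = 4 };
    static buf_t bufs[N];
    pool_t pool = { .free_list = NULL, .buffer_size = sizeof(bufs[0].data), .num_free = 0 };

    /* Build the initial free list by pushing each buffer onto the head. */
    for (int i = 0; i < N; i++) {
        bufs[i].free_next = pool.free_list;
        pool.free_list = &bufs[i];
        pool.num_free++;
    }

    buf_t *b = pool_alloc(&pool, 1316);
    printf("allocated=%p free=%zu\n", (void *)b, pool.num_free);
    pool_put(&pool, b);
    printf("after put free=%zu\n", pool.num_free);
    return 0;
}

The intrusive free_next link is what lets both allocation and release run in O(1) without touching any buffer other than the one being handed out or returned.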

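Note on the removed batch-receive helper: the deleted buffer_pool_batch_recv() used recvmmsg() to pull up to MAX_RECV_PACKETS_PER_BATCH datagrams from a socket in one system call, with one iovec/mmsghdr per pre-allocated buffer. The following self-contained Linux sketch shows just that recvmmsg() batching pattern on a loopback UDP socket; it uses plain stack buffers instead of the project's buffer pool, and the names and sizes are illustrative only.

/* Stand-alone recvmmsg() demo (Linux): queue a few datagrams to a loopback
 * UDP socket, then drain them with a single recvmmsg() call. Illustrates
 * only the batching pattern the removed helper used. */
#define _GNU_SOURCE
#include <stdio.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/uio.h>

#define BATCH 8

int main(void)
{
    int sock = socket(AF_INET, SOCK_DGRAM, 0);
    if (sock < 0)
        return 1;

    struct sockaddr_in addr = { .sin_family = AF_INET,
                                .sin_addr.s_addr = htonl(INADDR_LOOPBACK),
                                .sin_port = 0 };
    bind(sock, (struct sockaddr *)&addr, sizeof(addr));

    socklen_t len = sizeof(addr);
    getsockname(sock, (struct sockaddr *)&addr, &len); /* learn the bound port */

    /* Queue three datagrams to ourselves. */
    for (int i = 0; i < 3; i++)
        sendto(sock, "pkt", 3, 0, (struct sockaddr *)&addr, sizeof(addr));

    /* One iovec + mmsghdr per slot, like the removed helper set up per buffer. */
    char bufs[BATCH][1500];
    struct iovec iov[BATCH];
    struct mmsghdr msgs[BATCH];
    memset(msgs, 0, sizeof(msgs));
    for (int i = 0; i < BATCH; i++) {
        iov[i].iov_base = bufs[i];
        iov[i].iov_len = sizeof(bufs[i]);
        msgs[i].msg_hdr.msg_iov = &iov[i];
        msgs[i].msg_hdr.msg_iovlen = 1;
    }

    /* Receive every pending datagram in ONE system call. */
    struct timespec timeout = { 0, 0 };
    int received = recvmmsg(sock, msgs, BATCH, MSG_DONTWAIT, &timeout);
    if (received < 0)
        perror("recvmmsg");
    for (int i = 0; i < received; i++)
        printf("packet %d: %u bytes\n", i, msgs[i].msg_len);

    close(sock);
    return 0;
}

Compared with one recv() call per packet, the single recvmmsg() call amortizes the syscall cost across the whole batch, which is the rationale given in the removed helper's comment.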