@@ -45,13 +45,13 @@ static inline buffer_ref_t *connection_alloc_output_buffer(connection_t *c)
4545
4646 if (c -> buffer_class == CONNECTION_BUFFER_CONTROL )
4747 {
48- buf_ref = buffer_pool_alloc_control (BUFFER_POOL_BUFFER_SIZE );
48+ buf_ref = buffer_pool_alloc_control ();
4949 if (!buf_ref )
50- buf_ref = buffer_pool_alloc (BUFFER_POOL_BUFFER_SIZE );
50+ buf_ref = buffer_pool_alloc ();
5151 }
5252 else
5353 {
54- buf_ref = buffer_pool_alloc (BUFFER_POOL_BUFFER_SIZE );
54+ buf_ref = buffer_pool_alloc ();
5555 }
5656
5757 return buf_ref ;
@@ -63,8 +63,8 @@ static size_t connection_compute_limit_bytes(buffer_pool_t *pool, size_t fair_by
6363
6464 if (pool -> max_buffers > 0 )
6565 {
66- size_t global_cap = pool -> max_buffers * pool -> buffer_size ;
67- size_t reserve = CONN_QUEUE_MIN_BUFFERS * pool -> buffer_size ;
66+ size_t global_cap = pool -> max_buffers * BUFFER_POOL_BUFFER_SIZE ;
67+ size_t reserve = CONN_QUEUE_MIN_BUFFERS * BUFFER_POOL_BUFFER_SIZE ;
6868 if (global_cap > reserve )
6969 {
7070 size_t hard_cap = global_cap - reserve ;
@@ -111,8 +111,8 @@ static size_t connection_calculate_queue_limit(connection_t *c, int64_t now_ms)
111111 if (pool -> num_free < pool -> low_watermark / 2 || utilization >= CONN_QUEUE_DRAIN_UTIL_THRESHOLD )
112112 burst_factor = CONN_QUEUE_BURST_FACTOR_DRAIN ;
113113
114- size_t fair_bytes = share_buffers * pool -> buffer_size ;
115- double queue_mem_bytes = (double )c -> zc_queue .num_queued * (double )pool -> buffer_size ;
114+ size_t fair_bytes = share_buffers * BUFFER_POOL_BUFFER_SIZE ;
115+ double queue_mem_bytes = (double )c -> zc_queue .num_queued * (double )BUFFER_POOL_BUFFER_SIZE ;
116116
117117 if (c -> queue_avg_bytes <= 0.0 )
118118 c -> queue_avg_bytes = queue_mem_bytes ;
@@ -169,32 +169,14 @@ static inline void connection_record_drop(connection_t *c, size_t len)
169169 c -> backpressure_events ++ ;
170170}
171171
172- static void connection_maybe_report_queue (connection_t * c , int64_t now_ms , int force )
172+ static void connection_report_queue (connection_t * c )
173173{
174174 if (c -> status_index < 0 )
175175 return ;
176176
177177 size_t queue_buffers = c -> zc_queue .num_queued ;
178178 size_t queue_bytes = c -> zc_queue .num_queued * BUFFER_POOL_BUFFER_SIZE ;
179179
180- int need_report = 0 ;
181-
182- if (c -> last_queue_report_ts == 0 )
183- {
184- need_report = 1 ;
185- }
186- else if (now_ms - c -> last_queue_report_ts >= CONNECTION_QUEUE_REPORT_INTERVAL_MS )
187- {
188- need_report = 1 ;
189- }
190- else if (force && queue_bytes == 0 && queue_buffers == 0 )
191- {
192- need_report = 1 ;
193- }
194-
195- if (!need_report )
196- return ;
197-
198180 status_update_client_queue (c -> status_index ,
199181 queue_bytes ,
200182 queue_buffers ,
@@ -205,10 +187,6 @@ static void connection_maybe_report_queue(connection_t *c, int64_t now_ms, int f
205187 c -> dropped_bytes ,
206188 c -> backpressure_events ,
207189 c -> slow_active );
208-
209- c -> last_queue_report_ts = now_ms ;
210- c -> last_reported_queue_bytes = queue_bytes ;
211- c -> last_reported_drops = c -> dropped_packets ;
212190}
213191int connection_set_nonblocking (int fd )
214192{
@@ -270,9 +248,6 @@ connection_t *connection_create(int fd, int epfd,
270248 c -> queue_buffers_highwater = 0 ;
271249 c -> dropped_packets = 0 ;
272250 c -> dropped_bytes = 0 ;
273- c -> last_reported_queue_bytes = 0 ;
274- c -> last_reported_drops = 0 ;
275- c -> last_queue_report_ts = 0 ;
276251 c -> backpressure_events = 0 ;
277252 c -> stream_registered = 0 ;
278253 c -> queue_avg_bytes = 0.0 ;
@@ -384,9 +359,10 @@ int connection_queue_output(connection_t *c, const uint8_t *data, size_t len)
384359
385360 /* Copy data into the buffer */
386361 memcpy (buf_ref -> data , src , chunk_size );
362+ buf_ref -> data_size = chunk_size ;
387363
388364 /* Queue this buffer for zero-copy send */
389- if (connection_queue_zerocopy (c , buf_ref , 0 , chunk_size ) < 0 )
365+ if (connection_queue_zerocopy (c , buf_ref ) < 0 )
390366 {
391367 /* Queue full - release the buffer and fail */
392368 buffer_ref_put (buf_ref );
@@ -419,11 +395,9 @@ connection_write_status_t connection_handle_write(connection_t *c)
419395 if (!c )
420396 return CONNECTION_WRITE_IDLE ;
421397
422- int64_t now_ms = get_time_ms ();
423-
424398 if (!c -> zc_queue .head )
425399 {
426- connection_maybe_report_queue ( c , now_ms , 0 );
400+ connection_report_queue ( c );
427401 if (c -> state == CONN_CLOSING && !c -> zc_queue .pending_head )
428402 return CONNECTION_WRITE_CLOSED ;
429403 return CONNECTION_WRITE_IDLE ;
@@ -435,24 +409,24 @@ connection_write_status_t connection_handle_write(connection_t *c)
435409 if (ret < 0 && ret != -2 )
436410 {
437411 c -> state = CONN_CLOSING ;
438- connection_maybe_report_queue ( c , now_ms , 1 );
412+ connection_report_queue ( c );
439413 return CONNECTION_WRITE_CLOSED ;
440414 }
441415
442416 if (ret == -2 )
443417 {
444- connection_maybe_report_queue ( c , now_ms , 0 );
418+ connection_report_queue ( c );
445419 return CONNECTION_WRITE_BLOCKED ;
446420 }
447421
448422 if (c -> zc_queue .head )
449423 {
450- connection_maybe_report_queue ( c , now_ms , 0 );
424+ connection_report_queue ( c );
451425 return CONNECTION_WRITE_PENDING ;
452426 }
453427
454428 connection_epoll_update_events (c -> epfd , c -> fd , EPOLLIN | EPOLLRDHUP | EPOLLHUP | EPOLLERR );
455- connection_maybe_report_queue ( c , now_ms , 1 );
429+ connection_report_queue ( c );
456430
457431 if (c -> state == CONN_CLOSING && !c -> zc_queue .pending_head )
458432 return CONNECTION_WRITE_CLOSED ;
@@ -796,34 +770,34 @@ int connection_route_and_start(connection_t *c)
796770 }
797771}
798772
799- int connection_queue_zerocopy (connection_t * c , buffer_ref_t * buf_ref , size_t offset , size_t len )
773+ int connection_queue_zerocopy (connection_t * c , buffer_ref_t * buf_ref )
800774{
801- if (!c || !buf_ref || len == 0 )
775+ if (!c || !buf_ref || buf_ref -> data_size == 0 )
802776 return 0 ;
803777
804778 int64_t now_ms = get_time_ms ();
805779 size_t limit_bytes = connection_calculate_queue_limit (c , now_ms );
806780 size_t queued_bytes = c -> zc_queue .num_queued * BUFFER_POOL_BUFFER_SIZE ;
807- size_t projected_bytes = queued_bytes + len ;
781+ size_t projected_bytes = queued_bytes + buf_ref -> data_size ;
808782
809783 c -> queue_limit_bytes = limit_bytes ;
810784
811785 if (projected_bytes > limit_bytes )
812786 {
813- connection_record_drop (c , len );
787+ connection_record_drop (c , buf_ref -> data_size );
814788
815789 if (c -> backpressure_events == 1 || (c -> backpressure_events % 200 ) == 0 )
816790 {
817791 logger (LOG_DEBUG , "Backpressure: dropping %zu bytes for client fd=%d (queued=%zu limit=%zu drops=%llu)" ,
818- len , c -> fd , queued_bytes , limit_bytes , (unsigned long long )c -> dropped_packets );
792+ buf_ref -> data_size , c -> fd , queued_bytes , limit_bytes , (unsigned long long )c -> dropped_packets );
819793 }
820794
821- connection_maybe_report_queue ( c , now_ms , 1 );
795+ connection_report_queue ( c );
822796 return -1 ;
823797 }
824798
825799 /* Add to zero-copy queue with offset information */
826- int ret = zerocopy_queue_add (& c -> zc_queue , buf_ref , offset , len );
800+ int ret = zerocopy_queue_add (& c -> zc_queue , buf_ref );
827801 if (ret < 0 )
828802 return -1 ; /* Queue full */
829803
@@ -833,17 +807,14 @@ int connection_queue_zerocopy(connection_t *c, buffer_ref_t *buf_ref, size_t off
833807 if (c -> zc_queue .num_queued > c -> queue_buffers_highwater )
834808 c -> queue_buffers_highwater = c -> zc_queue .num_queued ;
835809
836- connection_maybe_report_queue ( c , now_ms , 0 );
810+ connection_report_queue ( c );
837811
838812 /* Batching optimization: Only enable EPOLLOUT when flush threshold is reached
839- * This accumulates small RTP packets (200-1400 bytes) before sending:
840- * - Flush when accumulated >= 10KB (ZEROCOPY_BATCH_BYTES)
841- * - Flush when timeout >= 5ms (ZEROCOPY_BATCH_TIMEOUT_US)
842813 * Benefits:
843814 * - Reduces sendmsg() syscall overhead (fewer calls)
844815 * - Reduces MSG_ZEROCOPY optmem consumption (fewer operations)
845816 * - Better batching with iovec (up to 64 packets per sendmsg)
846- * - Lower latency impact (5ms is acceptable for streaming)
817+ * - Lower latency impact (100ms is acceptable for streaming)
847818 */
848819 if (zerocopy_should_flush (& c -> zc_queue ))
849820 {
0 commit comments