@@ -27,10 +27,13 @@ struct ring {
 	ring_buffer_sample_fn sample_cb;
 	void *ctx;
 	void *data;
+	void *read_buffer;
 	unsigned long *consumer_pos;
 	unsigned long *producer_pos;
+	unsigned long *overwrite_pos;
 	unsigned long mask;
 	int map_fd;
+	bool overwrite_mode;
 };
 
 struct ring_buffer {
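The three fields added to struct ring are consumer-side state for overwrite mode: read_buffer is a heap-allocated bounce buffer the consumer snapshots ring data into, overwrite_pos points at the producer-maintained position of the oldest data not yet overwritten, and overwrite_mode selects the processing path. As elsewhere in ringbuf.c, positions grow without bound and are folded into the data area through mask, which assumes max_entries is a power of two; a minimal standalone illustration (not from the patch):

/* Illustration only: mapping a monotonically growing logical position
 * into a byte offset inside the ring's data area.
 */
#include <stdio.h>

int main(void)
{
	unsigned long max_entries = 4096;	/* ring data size in bytes */
	unsigned long mask = max_entries - 1;	/* valid because size is a power of two */
	unsigned long pos = 5000;		/* logical position past one wrap */

	printf("offset = %lu\n", pos & mask);	/* prints 904 */
	return 0;
}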
@@ -69,6 +72,9 @@ static void ringbuf_free_ring(struct ring_buffer *rb, struct ring *r)
 		r->producer_pos = NULL;
 	}
 
+	if (r->read_buffer)
+		free(r->read_buffer);
+
 	free(r);
 }
 
@@ -119,6 +125,14 @@ int ring_buffer__add(struct ring_buffer *rb, int map_fd,
 	r->sample_cb = sample_cb;
 	r->ctx = ctx;
 	r->mask = info.max_entries - 1;
+	r->overwrite_mode = info.map_flags & BPF_F_OVERWRITE;
+	if (unlikely(r->overwrite_mode)) {
+		r->read_buffer = malloc(info.max_entries);
+		if (!r->read_buffer) {
+			err = -ENOMEM;
+			goto err_out;
+		}
+	}
 
 	/* Map writable consumer page */
 	tmp = mmap(NULL, rb->page_size, PROT_READ | PROT_WRITE, MAP_SHARED, map_fd, 0);
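Overwrite mode is detected once, from the map flags the kernel reports, and the bounce buffer is sized to the full data area (info.max_entries bytes) so any snapshot the consumer takes is guaranteed to fit. A sketch of the same flag check done by a caller against an existing map fd, assuming BPF_F_OVERWRITE is the UAPI flag this series introduces (bpf_obj_get_info_by_fd() is existing libbpf API):

/* Sketch: probe an existing ringbuf map fd for overwrite mode, mirroring
 * what ring_buffer__add() does internally. BPF_F_OVERWRITE is assumed
 * from this patch set.
 */
#include <stdbool.h>
#include <string.h>
#include <bpf/bpf.h>

static int ringbuf_is_overwrite(int map_fd, bool *overwrite)
{
	struct bpf_map_info info;
	__u32 len = sizeof(info);
	int err;

	memset(&info, 0, sizeof(info));
	err = bpf_obj_get_info_by_fd(map_fd, &info, &len);
	if (err)
		return err;

	*overwrite = info.map_flags & BPF_F_OVERWRITE;
	return 0;
}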
@@ -148,6 +162,7 @@ int ring_buffer__add(struct ring_buffer *rb, int map_fd,
 		goto err_out;
 	}
 	r->producer_pos = tmp;
+	r->overwrite_pos = r->producer_pos + 1; /* overwrite_pos is next to producer_pos */
 	r->data = tmp + rb->page_size;
 
 	e = &rb->events[rb->ring_cnt];
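Since r->producer_pos has type unsigned long *, the + 1 advances by sizeof(unsigned long) bytes: the consumer assumes the kernel publishes the overwrite position in the word immediately following the producer position in the shared metadata page. An illustrative view of that assumed layout (not a UAPI struct):

/* Hypothetical layout of the producer metadata page under this series;
 * only producer_pos is part of today's layout, overwrite_pos is what the
 * patch adds next to it.
 */
struct producer_page_layout {
	unsigned long producer_pos;	/* advanced by the kernel on commit */
	unsigned long overwrite_pos;	/* oldest byte not yet overwritten */
	/* remainder of the page is unused/reserved */
};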
@@ -232,7 +247,7 @@ static inline int roundup_len(__u32 len)
 	return (len + 7) / 8 * 8;
 }
 
-static int64_t ringbuf_process_ring(struct ring *r, size_t n)
+static int64_t ringbuf_process_normal_ring(struct ring *r, size_t n)
 {
 	int *len_ptr, len, err;
 	/* 64-bit to avoid overflow in case of extreme application behavior */
@@ -278,6 +293,91 @@ static int64_t ringbuf_process_ring(struct ring *r, size_t n)
 	return cnt;
 }
 
+static int64_t ringbuf_process_overwrite_ring(struct ring *r, size_t n)
+{
+	int err;
+	uint32_t *len_ptr, len;
+	/* 64-bit to avoid overflow in case of extreme application behavior */
+	int64_t cnt = 0;
+	size_t size, offset;
+	unsigned long cons_pos, prod_pos, over_pos, tmp_pos;
+	bool got_new_data;
+	void *sample;
+	bool copied;
+
+	size = r->mask + 1;
+
+	cons_pos = smp_load_acquire(r->consumer_pos);
+	do {
+		got_new_data = false;
+
+		/* grab a copy of data */
+		prod_pos = smp_load_acquire(r->producer_pos);
+		do {
+			over_pos = READ_ONCE(*r->overwrite_pos);
+			/* prod_pos may be outdated now */
+			if (over_pos < prod_pos) {
+				tmp_pos = max(cons_pos, over_pos);
+				/* smp_load_acquire(r->producer_pos) before
+				 * READ_ONCE(*r->overwrite_pos) ensures that
+				 * over_pos + r->mask < prod_pos never occurs,
+				 * so size is never larger than r->mask
+				 */
+				size = prod_pos - tmp_pos;
+				if (!size)
+					goto done;
+				memcpy(r->read_buffer,
+				       r->data + (tmp_pos & r->mask), size);
+				copied = true;
+			} else {
+				copied = false;
+			}
+			prod_pos = smp_load_acquire(r->producer_pos);
+			/* retry if data is overwritten by producer */
+		} while (!copied || prod_pos - tmp_pos > r->mask);
+
+		cons_pos = tmp_pos;
+
+		for (offset = 0; offset < size; offset += roundup_len(len)) {
+			len_ptr = r->read_buffer + (offset & r->mask);
+			len = *len_ptr;
+
+			if (len & BPF_RINGBUF_BUSY_BIT)
+				goto done;
+
+			got_new_data = true;
+			cons_pos += roundup_len(len);
+
+			if ((len & BPF_RINGBUF_DISCARD_BIT) == 0) {
+				sample = (void *)len_ptr + BPF_RINGBUF_HDR_SZ;
+				err = r->sample_cb(r->ctx, sample, len);
+				if (err < 0) {
+					/* update consumer pos and bail out */
+					smp_store_release(r->consumer_pos,
+							  cons_pos);
+					return err;
+				}
+				cnt++;
+			}
+
+			if (cnt >= n)
+				goto done;
+		}
+	} while (got_new_data);
+
+done:
+	smp_store_release(r->consumer_pos, cons_pos);
+	return cnt;
+}
+
+static int64_t ringbuf_process_ring(struct ring *r, size_t n)
+{
+	if (likely(!r->overwrite_mode))
+		return ringbuf_process_normal_ring(r, n);
+	else
+		return ringbuf_process_overwrite_ring(r, n);
+}
+
 /* Consume available ring buffer(s) data without event polling, up to n
 * records.
 *
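Taken together, overwrite mode is meant to be transparent to users of the public API; only map creation changes. A usage sketch, assuming the BPF_F_OVERWRITE flag from this series and otherwise using only existing libbpf calls:

/* Usage sketch: create an overwrite-mode ringbuf and drain it through the
 * unchanged public API. BPF_F_OVERWRITE is assumed from this patch set.
 */
#include <stdio.h>
#include <bpf/bpf.h>
#include <bpf/libbpf.h>

static int handle_event(void *ctx, void *data, size_t size)
{
	printf("sample of %zu bytes\n", size);
	return 0;
}

int main(void)
{
	LIBBPF_OPTS(bpf_map_create_opts, opts, .map_flags = BPF_F_OVERWRITE);
	struct ring_buffer *rb;
	int map_fd, err;

	/* ringbuf maps take no key/value; max_entries is the data size,
	 * a power-of-two multiple of the page size
	 */
	map_fd = bpf_map_create(BPF_MAP_TYPE_RINGBUF, "rb", 0, 0, 4096 * 4, &opts);
	if (map_fd < 0)
		return 1;

	rb = ring_buffer__new(map_fd, handle_event, NULL, NULL);
	if (!rb)
		return 1;

	/* the producer side (a BPF program) reserves/commits as usual; this
	 * consumer call dispatches to ringbuf_process_overwrite_ring()
	 */
	err = ring_buffer__consume(rb);

	ring_buffer__free(rb);
	return err < 0;
}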