@@ -13,7 +13,7 @@
 #include <linux/btf_ids.h>
 #include <asm/rqspinlock.h>
 
-#define RINGBUF_CREATE_FLAG_MASK (BPF_F_NUMA_NODE)
+#define RINGBUF_CREATE_FLAG_MASK (BPF_F_NUMA_NODE | BPF_F_RB_OVERWRITE)
 
 /* non-mmap()'able part of bpf_ringbuf (everything up to consumer page) */
 #define RINGBUF_PGOFF \
@@ -30,6 +30,7 @@ struct bpf_ringbuf {
 	u64 mask;
 	struct page **pages;
 	int nr_pages;
+	bool overwrite_mode;
 	rqspinlock_t spinlock ____cacheline_aligned_in_smp;
 	/* For user-space producer ring buffers, an atomic_t busy bit is used
 	 * to synchronize access to the ring buffers in the kernel, rather than
@@ -73,6 +74,7 @@ struct bpf_ringbuf {
 	unsigned long consumer_pos __aligned(PAGE_SIZE);
 	unsigned long producer_pos __aligned(PAGE_SIZE);
 	unsigned long pending_pos;
+	unsigned long overwrite_pos; /* position after the last overwritten record */
 	char data[] __aligned(PAGE_SIZE);
 };
 
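All ring buffer positions (consumer_pos, producer_pos, pending_pos, and the new overwrite_pos) are logical byte offsets that only grow and never wrap; they are folded into the data area by masking with rb->mask (buffer size minus one, hence the power-of-two size requirement). A minimal sketch of the indexing, with a made-up position value:

/* Sketch only: how a logical position maps to a slot in data[].
 * 0x1008 is a hypothetical position; rb->mask == buffer_size - 1.
 */
unsigned long pos = 0x1008;
struct bpf_ringbuf_hdr *hdr = (void *)rb->data + (pos & rb->mask);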
@@ -166,7 +168,7 @@ static void bpf_ringbuf_notify(struct irq_work *work)
  * considering that the maximum value of data_sz is (4GB - 1), there
  * will be no overflow, so just note the size limit in the comments.
  */
-static struct bpf_ringbuf *bpf_ringbuf_alloc(size_t data_sz, int numa_node)
+static struct bpf_ringbuf *bpf_ringbuf_alloc(size_t data_sz, int numa_node, bool overwrite_mode)
 {
 	struct bpf_ringbuf *rb;
 
@@ -183,17 +185,25 @@ static struct bpf_ringbuf *bpf_ringbuf_alloc(size_t data_sz, int numa_node)
 	rb->consumer_pos = 0;
 	rb->producer_pos = 0;
 	rb->pending_pos = 0;
+	rb->overwrite_mode = overwrite_mode;
 
 	return rb;
 }
 
 static struct bpf_map *ringbuf_map_alloc(union bpf_attr *attr)
 {
+	bool overwrite_mode = false;
 	struct bpf_ringbuf_map *rb_map;
 
 	if (attr->map_flags & ~RINGBUF_CREATE_FLAG_MASK)
 		return ERR_PTR(-EINVAL);
 
+	if (attr->map_flags & BPF_F_RB_OVERWRITE) {
+		if (attr->map_type != BPF_MAP_TYPE_RINGBUF)
+			return ERR_PTR(-EINVAL);
+		overwrite_mode = true;
+	}
+
 	if (attr->key_size || attr->value_size ||
 	    !is_power_of_2(attr->max_entries) ||
 	    !PAGE_ALIGNED(attr->max_entries))
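From userspace, the new mode is requested via map_flags at map creation time. A minimal sketch using libbpf's bpf_map_create(), assuming BPF_F_RB_OVERWRITE is exported by the UAPI headers from this series:

#include <bpf/bpf.h>

/* Sketch: create an overwrite-mode ring buffer. As the checks above
 * enforce, max_entries must be a page-aligned power of two, and
 * key_size/value_size must be zero. Setting the flag on any map type
 * other than BPF_MAP_TYPE_RINGBUF fails with -EINVAL.
 */
static int create_overwrite_ringbuf(void)
{
	LIBBPF_OPTS(bpf_map_create_opts, opts,
		    .map_flags = BPF_F_RB_OVERWRITE);

	return bpf_map_create(BPF_MAP_TYPE_RINGBUF, "ovw_rb",
			      0, 0, 4 * 4096, &opts);
}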
@@ -205,7 +215,7 @@ static struct bpf_map *ringbuf_map_alloc(union bpf_attr *attr)
 
 	bpf_map_init_from_attr(&rb_map->map, attr);
 
-	rb_map->rb = bpf_ringbuf_alloc(attr->max_entries, rb_map->map.numa_node);
+	rb_map->rb = bpf_ringbuf_alloc(attr->max_entries, rb_map->map.numa_node, overwrite_mode);
 	if (!rb_map->rb) {
 		bpf_map_area_free(rb_map);
 		return ERR_PTR(-ENOMEM);
@@ -293,13 +303,26 @@ static int ringbuf_map_mmap_user(struct bpf_map *map, struct vm_area_struct *vma
 	return remap_vmalloc_range(vma, rb_map->rb, vma->vm_pgoff + RINGBUF_PGOFF);
 }
 
+/*
+ * Return an estimate of the available data in the ring buffer.
+ * Note: the returned value can exceed the actual ring buffer size, because
+ * this function is not synchronized with the producer: the producer acquires
+ * the ring buffer's spinlock, but this function does not.
+ */
 static unsigned long ringbuf_avail_data_sz(struct bpf_ringbuf *rb)
 {
-	unsigned long cons_pos, prod_pos;
+	unsigned long cons_pos, prod_pos, over_pos;
 
 	cons_pos = smp_load_acquire(&rb->consumer_pos);
-	prod_pos = smp_load_acquire(&rb->producer_pos);
-	return prod_pos - cons_pos;
+
+	if (unlikely(rb->overwrite_mode)) {
+		over_pos = smp_load_acquire(&rb->overwrite_pos);
+		prod_pos = smp_load_acquire(&rb->producer_pos);
+		return prod_pos - max(cons_pos, over_pos);
+	} else {
+		prod_pos = smp_load_acquire(&rb->producer_pos);
+		return prod_pos - cons_pos;
+	}
 }
 
 static u32 ringbuf_total_data_sz(const struct bpf_ringbuf *rb)
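The max(cons_pos, over_pos) term accounts for a lagging consumer whose position still points at records that have already been overwritten. A worked example with hypothetical positions in a 4 KiB (0x1000-byte) buffer:

/* Hypothetical snapshot of an overwrite-mode buffer, mask == 0xfff:
 *   cons_pos = 0x0100   consumer stalled long ago
 *   over_pos = 0x1040   oldest surviving record starts here
 *   prod_pos = 0x1840
 * Available data = prod_pos - max(cons_pos, over_pos)
 *                = 0x1840 - 0x1040 = 0x800 bytes.
 * Using prod_pos - cons_pos (0x1740) would exceed the buffer size.
 */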
@@ -402,11 +425,43 @@ bpf_ringbuf_restore_from_rec(struct bpf_ringbuf_hdr *hdr)
 	return (void *)((addr & PAGE_MASK) - off);
 }
 
+static bool bpf_ringbuf_has_space(const struct bpf_ringbuf *rb,
+				  unsigned long new_prod_pos,
+				  unsigned long cons_pos,
+				  unsigned long pend_pos)
+{
+	/*
+	 * No space if the span from the oldest not-yet-committed record
+	 * to the newest record exceeds (ringbuf_size - 1).
+	 */
+	if (new_prod_pos - pend_pos > rb->mask)
+		return false;
+
+	/* Otherwise, in overwrite mode there is always space */
+	if (unlikely(rb->overwrite_mode))
+		return true;
+
+	/*
+	 * No space if the producer position would advance more than
+	 * (ringbuf_size - 1) ahead of the consumer position when not
+	 * in overwrite mode.
+	 */
+	if (new_prod_pos - cons_pos > rb->mask)
+		return false;
+
+	return true;
+}
+
+static u32 bpf_ringbuf_round_up_hdr_len(u32 hdr_len)
+{
+	hdr_len &= ~BPF_RINGBUF_DISCARD_BIT;
+	return round_up(hdr_len + BPF_RINGBUF_HDR_SZ, 8);
+}
+
 static void *__bpf_ringbuf_reserve(struct bpf_ringbuf *rb, u64 size)
 {
-	unsigned long cons_pos, prod_pos, new_prod_pos, pend_pos, flags;
+	unsigned long cons_pos, prod_pos, new_prod_pos, pend_pos, over_pos, flags;
 	struct bpf_ringbuf_hdr *hdr;
-	u32 len, pg_off, tmp_size, hdr_len;
+	u32 len, pg_off, hdr_len;
 
 	if (unlikely(size > RINGBUF_MAX_RECORD_SZ))
 		return NULL;
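Both the pending-position scan and the overwrite advance in the next hunk step from record header to record header using the committed length, so the helper factored out above is the single place where record stride is computed. A worked example with a hypothetical header value:

/* Hypothetical discarded record with a 12-byte payload:
 *   hdr_len = 12 | BPF_RINGBUF_DISCARD_BIT
 * bpf_ringbuf_round_up_hdr_len() masks off the discard bit, adds the
 * 8-byte record header, and rounds up to an 8-byte boundary:
 *   round_up(12 + BPF_RINGBUF_HDR_SZ, 8) == 24
 * so the next record starts 24 bytes further on.
 */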
@@ -429,24 +484,43 @@ static void *__bpf_ringbuf_reserve(struct bpf_ringbuf *rb, u64 size)
 		hdr_len = READ_ONCE(hdr->len);
 		if (hdr_len & BPF_RINGBUF_BUSY_BIT)
 			break;
-		tmp_size = hdr_len & ~BPF_RINGBUF_DISCARD_BIT;
-		tmp_size = round_up(tmp_size + BPF_RINGBUF_HDR_SZ, 8);
-		pend_pos += tmp_size;
+		pend_pos += bpf_ringbuf_round_up_hdr_len(hdr_len);
 	}
 	rb->pending_pos = pend_pos;
 
-	/* check for out of ringbuf space:
-	 * - by ensuring producer position doesn't advance more than
-	 *   (ringbuf_size - 1) ahead
-	 * - by ensuring oldest not yet committed record until newest
-	 *   record does not span more than (ringbuf_size - 1)
-	 */
-	if (new_prod_pos - cons_pos > rb->mask ||
-	    new_prod_pos - pend_pos > rb->mask) {
+	if (!bpf_ringbuf_has_space(rb, new_prod_pos, cons_pos, pend_pos)) {
 		raw_res_spin_unlock_irqrestore(&rb->spinlock, flags);
 		return NULL;
 	}
 
+	/*
+	 * In overwrite mode, advance overwrite_pos when the ring buffer is
+	 * full. The key points are to stay on record boundaries and to
+	 * consume enough records to fit the new one.
+	 */
+	if (unlikely(rb->overwrite_mode)) {
+		over_pos = rb->overwrite_pos;
+		while (new_prod_pos - over_pos > rb->mask) {
+			hdr = (void *)rb->data + (over_pos & rb->mask);
+			hdr_len = READ_ONCE(hdr->len);
+			/*
+			 * The bpf_ringbuf_has_space() check above ensures we
+			 * won't step over a record currently being worked on
+			 * by another producer.
+			 */
+			over_pos += bpf_ringbuf_round_up_hdr_len(hdr_len);
+		}
+		/*
+		 * The smp_store_release(&rb->producer_pos, new_prod_pos) at
+		 * the end of this function ensures that once the consumer
+		 * sees the updated rb->producer_pos, it also sees the updated
+		 * rb->overwrite_pos: a read of overwrite_pos performed after
+		 * smp_load_acquire(&rb->producer_pos) always returns a valid
+		 * value.
+		 */
+		WRITE_ONCE(rb->overwrite_pos, over_pos);
+	}
+
 	hdr = (void *)rb->data + (prod_pos & rb->mask);
 	pg_off = bpf_ringbuf_rec_pg_off(rb, hdr);
 	hdr->len = size | BPF_RINGBUF_BUSY_BIT;
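On the BPF program side the reservation API is unchanged; in overwrite mode bpf_ringbuf_reserve() simply keeps succeeding when the consumer stalls, discarding the oldest committed records instead. A minimal producer sketch, where the map definition, event layout, and tracepoint are illustrative assumptions:

#include <vmlinux.h>
#include <bpf/bpf_helpers.h>

struct event {
	__u64 ts;
};

struct {
	__uint(type, BPF_MAP_TYPE_RINGBUF);
	__uint(max_entries, 4 * 4096);
	__uint(map_flags, BPF_F_RB_OVERWRITE); /* flag added by this series */
} events SEC(".maps");

SEC("tracepoint/syscalls/sys_enter_write")
int log_event(void *ctx)
{
	struct event *e;

	/* In overwrite mode this fails only when busy (uncommitted)
	 * records span the whole buffer, not when the consumer lags.
	 */
	e = bpf_ringbuf_reserve(&events, sizeof(*e), 0);
	if (!e)
		return 0;
	e->ts = bpf_ktime_get_ns();
	bpf_ringbuf_submit(e, 0);
	return 0;
}

char LICENSE[] SEC("license") = "GPL";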
@@ -576,6 +650,8 @@ BPF_CALL_2(bpf_ringbuf_query, struct bpf_map *, map, u64, flags)
 		return smp_load_acquire(&rb->consumer_pos);
 	case BPF_RB_PROD_POS:
 		return smp_load_acquire(&rb->producer_pos);
+	case BPF_RB_OVERWRITE_POS:
+		return smp_load_acquire(&rb->overwrite_pos);
 	default:
 		return 0;
 	}
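A BPF program can observe how far overwriting has progressed through the existing query helper; BPF_RB_OVERWRITE_POS is the new selector added above. A sketch, reusing the hypothetical events map from the previous example:

static __always_inline void report_loss(void)
{
	__u64 over = bpf_ringbuf_query(&events, BPF_RB_OVERWRITE_POS);
	__u64 cons = bpf_ringbuf_query(&events, BPF_RB_CONS_POS);

	/* overwrite_pos ahead of consumer_pos means records were lost */
	if (over > cons)
		bpf_printk("consumer lost %llu bytes", over - cons);
}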