 #include <linux/btf_ids.h>
 #include <asm/rqspinlock.h>
 
-#define RINGBUF_CREATE_FLAG_MASK (BPF_F_NUMA_NODE)
+#define RINGBUF_CREATE_FLAG_MASK (BPF_F_NUMA_NODE | BPF_F_OVERWRITE)
 
 /* non-mmap()'able part of bpf_ringbuf (everything up to consumer page) */
 #define RINGBUF_PGOFF \
@@ -27,7 +27,8 @@
 struct bpf_ringbuf {
 	wait_queue_head_t waitq;
 	struct irq_work work;
-	u64 mask;
+	u64 mask:48;
+	u64 overwrite_mode:1;
 	struct page **pages;
 	int nr_pages;
 	rqspinlock_t spinlock ____cacheline_aligned_in_smp;
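Narrowing mask to 48 bits lets overwrite_mode share the same 64-bit word, so the layout of the fields that follow is unchanged. Below is a minimal stand-alone sketch of that packing; the struct and field names are stand-ins for illustration only, not the real struct:

/* Illustrative only: mirrors the two bitfields added above. */
#include <assert.h>
#include <stdint.h>

struct ringbuf_word_demo {
	uint64_t mask:48;		/* ring size - 1; 48 bits far exceeds the (4GB - 1) limit */
	uint64_t overwrite_mode:1;	/* set when the map was created with BPF_F_OVERWRITE */
};

int main(void)
{
	/* both fields pack into a single 64-bit word (C11 static_assert) */
	static_assert(sizeof(struct ringbuf_word_demo) == sizeof(uint64_t),
		      "mask and overwrite_mode share one u64");
	return 0;
}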
@@ -72,6 +73,7 @@ struct bpf_ringbuf {
 	 */
 	unsigned long consumer_pos __aligned(PAGE_SIZE);
 	unsigned long producer_pos __aligned(PAGE_SIZE);
+	unsigned long overwrite_pos; /* next record to be overwritten (overwrite mode) */
 	unsigned long pending_pos;
 	char data[] __aligned(PAGE_SIZE);
 };
@@ -166,7 +168,8 @@ static void bpf_ringbuf_notify(struct irq_work *work)
  * considering that the maximum value of data_sz is (4GB - 1), there
  * will be no overflow, so just note the size limit in the comments.
  */
-static struct bpf_ringbuf *bpf_ringbuf_alloc(size_t data_sz, int numa_node)
+static struct bpf_ringbuf *bpf_ringbuf_alloc(size_t data_sz, int numa_node,
+					     int overwrite_mode)
 {
 	struct bpf_ringbuf *rb;
 
@@ -183,17 +186,25 @@ static struct bpf_ringbuf *bpf_ringbuf_alloc(size_t data_sz, int numa_node)
 	rb->consumer_pos = 0;
 	rb->producer_pos = 0;
 	rb->pending_pos = 0;
+	rb->overwrite_mode = overwrite_mode;
 
 	return rb;
 }
 
 static struct bpf_map *ringbuf_map_alloc(union bpf_attr *attr)
 {
+	int overwrite_mode = 0;
 	struct bpf_ringbuf_map *rb_map;
 
 	if (attr->map_flags & ~RINGBUF_CREATE_FLAG_MASK)
 		return ERR_PTR(-EINVAL);
 
+	if (attr->map_flags & BPF_F_OVERWRITE) {
+		if (attr->map_type == BPF_MAP_TYPE_USER_RINGBUF)
+			return ERR_PTR(-EINVAL);
+		overwrite_mode = 1;
+	}
+
 	if (attr->key_size || attr->value_size ||
 	    !is_power_of_2(attr->max_entries) ||
 	    !PAGE_ALIGNED(attr->max_entries))
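For context, here is a hypothetical user-space sketch of how such a map might be created with libbpf. It assumes BPF_F_OVERWRITE is exported through the uapi headers once this patch lands; the map name and size are arbitrary:

/* Hypothetical usage sketch, not part of the patch. */
#include <stdio.h>
#include <bpf/bpf.h>
#include <bpf/libbpf.h>

int main(void)
{
	/* BPF_F_OVERWRITE is assumed to come from the updated uapi headers */
	LIBBPF_OPTS(bpf_map_create_opts, opts, .map_flags = BPF_F_OVERWRITE);
	int fd;

	/* key_size and value_size must be 0; max_entries must be a
	 * page-aligned power of two, exactly as for a normal ring buffer.
	 */
	fd = bpf_map_create(BPF_MAP_TYPE_RINGBUF, "events", 0, 0, 256 * 1024, &opts);
	if (fd < 0) {
		perror("bpf_map_create");
		return 1;
	}

	/* The same flag on BPF_MAP_TYPE_USER_RINGBUF would fail with -EINVAL. */
	printf("overwrite-mode ringbuf created, fd=%d\n", fd);
	return 0;
}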
@@ -205,7 +216,8 @@ static struct bpf_map *ringbuf_map_alloc(union bpf_attr *attr)
 
 	bpf_map_init_from_attr(&rb_map->map, attr);
 
-	rb_map->rb = bpf_ringbuf_alloc(attr->max_entries, rb_map->map.numa_node);
+	rb_map->rb = bpf_ringbuf_alloc(attr->max_entries, rb_map->map.numa_node,
+				       overwrite_mode);
 	if (!rb_map->rb) {
 		bpf_map_area_free(rb_map);
 		return ERR_PTR(-ENOMEM);
@@ -295,11 +307,16 @@ static int ringbuf_map_mmap_user(struct bpf_map *map, struct vm_area_struct *vma
 
 static unsigned long ringbuf_avail_data_sz(struct bpf_ringbuf *rb)
 {
-	unsigned long cons_pos, prod_pos;
+	unsigned long cons_pos, prod_pos, over_pos;
 
 	cons_pos = smp_load_acquire(&rb->consumer_pos);
 	prod_pos = smp_load_acquire(&rb->producer_pos);
-	return prod_pos - cons_pos;
+
+	if (likely(!rb->overwrite_mode))
+		return prod_pos - cons_pos;
+
+	over_pos = READ_ONCE(rb->overwrite_pos);
+	return min(prod_pos - max(cons_pos, over_pos), rb->mask + 1);
 }
 
 static u32 ringbuf_total_data_sz(const struct bpf_ringbuf *rb)
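A stand-alone sketch of the overwrite-mode calculation in ringbuf_avail_data_sz() above, with a made-up 4 KiB ring and positions: data older than overwrite_pos has been logically overwritten, so the readable window starts at max(cons_pos, over_pos) and is capped at one full ring of data.

/* Illustrative only; ring size and positions are hypothetical. */
#include <stdio.h>

#define RB_SIZE 4096UL	/* hypothetical ring size */

static unsigned long min_ul(unsigned long a, unsigned long b) { return a < b ? a : b; }
static unsigned long max_ul(unsigned long a, unsigned long b) { return a > b ? a : b; }

static unsigned long avail_overwrite(unsigned long cons, unsigned long prod, unsigned long over)
{
	/* readable data starts at whichever of cons/over is further along */
	return min_ul(prod - max_ul(cons, over), RB_SIZE);
}

int main(void)
{
	/* consumer stalled at 1000, producer at 9000, oldest surviving record at 5200 */
	printf("%lu\n", avail_overwrite(1000, 9000, 5200));	/* 3800 bytes readable */
	/* consumer already past overwrite_pos: plain prod - cons applies */
	printf("%lu\n", avail_overwrite(6000, 9000, 5200));	/* 3000 bytes readable */
	return 0;
}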
@@ -402,11 +419,43 @@ bpf_ringbuf_restore_from_rec(struct bpf_ringbuf_hdr *hdr)
 	return (void *)((addr & PAGE_MASK) - off);
 }
 
+
+static bool bpf_ringbuf_has_space(const struct bpf_ringbuf *rb,
+				  unsigned long new_prod_pos,
+				  unsigned long cons_pos,
+				  unsigned long pend_pos)
+{
+	/* no space if the span from the oldest not-yet-committed record
+	 * to the newest record exceeds (ringbuf_size - 1)
+	 */
+	if (new_prod_pos - pend_pos > rb->mask)
+		return false;
+
+	/* ok, we have space in overwrite mode */
+	if (unlikely(rb->overwrite_mode))
+		return true;
+
+	/* no space if the producer position advances more than (ringbuf_size - 1)
+	 * ahead of the consumer position when not in overwrite mode
+	 */
+	if (new_prod_pos - cons_pos > rb->mask)
+		return false;
+
+	return true;
+}
+
+static u32 ringbuf_round_up_hdr_len(u32 hdr_len)
+{
+	hdr_len &= ~BPF_RINGBUF_DISCARD_BIT;
+	return round_up(hdr_len + BPF_RINGBUF_HDR_SZ, 8);
+}
+
 static void *__bpf_ringbuf_reserve(struct bpf_ringbuf *rb, u64 size)
 {
-	unsigned long cons_pos, prod_pos, new_prod_pos, pend_pos, flags;
+	unsigned long flags;
 	struct bpf_ringbuf_hdr *hdr;
-	u32 len, pg_off, tmp_size, hdr_len;
+	u32 len, pg_off, hdr_len;
+	unsigned long cons_pos, prod_pos, new_prod_pos, pend_pos, over_pos;
 
 	if (unlikely(size > RINGBUF_MAX_RECORD_SZ))
 		return NULL;
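The helpers above can be exercised in isolation. A stand-alone sketch of bpf_ringbuf_has_space() with a hypothetical 4 KiB ring and made-up positions: all positions are free-running counters, so the unsigned subtractions stay correct across wraparound, and overwrite mode is limited only by the span of uncommitted records.

/* Illustrative only; a simplified copy of the space check, not the kernel code. */
#include <stdbool.h>
#include <stdio.h>

#define RB_MASK (4096UL - 1)	/* hypothetical ring size - 1 */

static bool has_space(bool overwrite, unsigned long new_prod,
		      unsigned long cons, unsigned long pend)
{
	if (new_prod - pend > RB_MASK)		/* uncommitted records would span the whole ring */
		return false;
	if (overwrite)				/* overwrite mode ignores the consumer */
		return true;
	return new_prod - cons <= RB_MASK;	/* default mode also respects the consumer */
}

int main(void)
{
	/* consumer 5000 bytes behind: default mode refuses, overwrite mode accepts */
	printf("%d %d\n", has_space(false, 9000, 4000, 8000),
			  has_space(true, 9000, 4000, 8000));	/* prints: 0 1 */
	return 0;
}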
@@ -429,24 +478,39 @@ static void *__bpf_ringbuf_reserve(struct bpf_ringbuf *rb, u64 size)
 		hdr_len = READ_ONCE(hdr->len);
 		if (hdr_len & BPF_RINGBUF_BUSY_BIT)
 			break;
-		tmp_size = hdr_len & ~BPF_RINGBUF_DISCARD_BIT;
-		tmp_size = round_up(tmp_size + BPF_RINGBUF_HDR_SZ, 8);
-		pend_pos += tmp_size;
+		pend_pos += ringbuf_round_up_hdr_len(hdr_len);
 	}
 	rb->pending_pos = pend_pos;
 
-	/* check for out of ringbuf space:
-	 * - by ensuring producer position doesn't advance more than
-	 *   (ringbuf_size - 1) ahead
-	 * - by ensuring oldest not yet committed record until newest
-	 *   record does not span more than (ringbuf_size - 1)
-	 */
-	if (new_prod_pos - cons_pos > rb->mask ||
-	    new_prod_pos - pend_pos > rb->mask) {
+	if (!bpf_ringbuf_has_space(rb, new_prod_pos, cons_pos, pend_pos)) {
 		raw_res_spin_unlock_irqrestore(&rb->spinlock, flags);
 		return NULL;
 	}
 
+	/* In overwrite mode, if the ring buffer is full, advance
+	 * overwrite_pos to the next record to be overwritten
+	 */
+	if (unlikely(rb->overwrite_mode)) {
+		over_pos = rb->overwrite_pos;
+		while (new_prod_pos - over_pos > rb->mask) {
+			hdr = (void *)rb->data + (over_pos & rb->mask);
+			hdr_len = READ_ONCE(hdr->len);
+			/* since pending_pos points to the first record with the
+			 * BUSY bit set and overwrite_pos never exceeds pending_pos,
+			 * there is no need to check the BUSY bit here.
+			 */
+			over_pos += ringbuf_round_up_hdr_len(hdr_len);
+		}
+		/* smp_store_release(&rb->producer_pos, new_prod_pos) at
+		 * the end of this function ensures that when the consumer
+		 * sees the updated rb->producer_pos, it also sees the updated
+		 * rb->overwrite_pos, so a consumer that reads overwrite_pos
+		 * after smp_load_acquire(&rb->producer_pos) always observes
+		 * a valid overwrite_pos.
+		 */
+		WRITE_ONCE(rb->overwrite_pos, over_pos);
+	}
+
 	hdr = (void *)rb->data + (prod_pos & rb->mask);
 	pg_off = bpf_ringbuf_rec_pg_off(rb, hdr);
 	hdr->len = size | BPF_RINGBUF_BUSY_BIT;
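A stand-alone sketch of the overwrite_pos advance above, with made-up record sizes and positions for a 4 KiB ring: whole records are skipped, oldest first, until the new producer position is no more than one ring size ahead.

/* Illustrative only; record sizes are chosen so the loop stays inside the array. */
#include <stdio.h>

#define RB_MASK (4096UL - 1)	/* hypothetical ring size - 1 */

int main(void)
{
	/* header-rounded lengths of the records currently in the ring, oldest first */
	unsigned long rec_len[] = { 512, 1024, 2048, 512 };
	unsigned long over_pos = 8192;		/* oldest surviving record */
	unsigned long new_prod_pos = 12800;	/* position after the new reservation */
	int i = 0;

	while (new_prod_pos - over_pos > RB_MASK)
		over_pos += rec_len[i++];	/* drop exactly as many old records as needed */

	printf("overwrite_pos advanced to %lu, %d record(s) overwritten\n", over_pos, i);
	return 0;	/* prints: overwrite_pos advanced to 9728, 2 record(s) overwritten */
}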
@@ -479,7 +543,50 @@ const struct bpf_func_proto bpf_ringbuf_reserve_proto = {
 	.arg3_type	= ARG_ANYTHING,
 };
 
-static void bpf_ringbuf_commit(void *sample, u64 flags, bool discard)
+static __always_inline
+bool ringbuf_should_wakeup(const struct bpf_ringbuf *rb,
+			   unsigned long rec_pos,
+			   unsigned long cons_pos,
+			   u32 len, u64 flags)
+{
+	unsigned long rec_end;
+
+	if (flags & BPF_RB_FORCE_WAKEUP)
+		return true;
+
+	if (flags & BPF_RB_NO_WAKEUP)
+		return false;
+
+	/* for non-overwrite mode, if the consumer caught up and is waiting
+	 * for our record, notify it about new data availability
+	 */
+	if (likely(!rb->overwrite_mode))
+		return cons_pos == rec_pos;
+
+	/* for overwrite mode, to give the consumer a chance to catch up
+	 * before its data is overwritten, wake it up roughly once every
+	 * half ring of produced data.
+	 */
+	rec_end = rec_pos + ringbuf_round_up_hdr_len(len);
+
+	cons_pos &= (rb->mask >> 1);
+	rec_pos &= (rb->mask >> 1);
+	rec_end &= (rb->mask >> 1);
+
+	if (cons_pos == rec_pos)
+		return true;
+
+	if (rec_pos < cons_pos && cons_pos < rec_end)
+		return true;
+
+	if (rec_end < rec_pos && (cons_pos > rec_pos || cons_pos < rec_end))
+		return true;
+
+	return false;
+}
+
+static __always_inline
+void bpf_ringbuf_commit(void *sample, u64 flags, bool discard)
 {
 	unsigned long rec_pos, cons_pos;
 	struct bpf_ringbuf_hdr *hdr;
@@ -495,15 +602,10 @@ static void bpf_ringbuf_commit(void *sample, u64 flags, bool discard)
 	/* update record header with correct final size prefix */
 	xchg(&hdr->len, new_len);
 
-	/* if consumer caught up and is waiting for our record, notify about
-	 * new data availability
-	 */
 	rec_pos = (void *)hdr - (void *)rb->data;
 	cons_pos = smp_load_acquire(&rb->consumer_pos) & rb->mask;
 
-	if (flags & BPF_RB_FORCE_WAKEUP)
-		irq_work_queue(&rb->work);
-	else if (cons_pos == rec_pos && !(flags & BPF_RB_NO_WAKEUP))
+	if (ringbuf_should_wakeup(rb, rec_pos, cons_pos, new_len, flags))
 		irq_work_queue(&rb->work);
 }
 
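A stand-alone sketch of the overwrite-mode wakeup rule in ringbuf_should_wakeup() above, for a hypothetical 4 KiB ring: positions are folded into half the ring size, and the consumer is woken whenever the committed record lands on or crosses its folded position, which amounts to roughly one wakeup per half ring of produced data.

/* Illustrative only; a simplified copy of the folded-position check. */
#include <stdbool.h>
#include <stdio.h>

#define HALF_MASK (2048UL - 1)	/* (ring size - 1) >> 1 for a hypothetical 4 KiB ring */

static bool should_wakeup(unsigned long rec_pos, unsigned long rec_end, unsigned long cons_pos)
{
	cons_pos &= HALF_MASK;
	rec_pos &= HALF_MASK;
	rec_end &= HALF_MASK;

	if (cons_pos == rec_pos)
		return true;
	if (rec_pos < cons_pos && cons_pos < rec_end)	/* record spans the consumer */
		return true;
	if (rec_end < rec_pos &&			/* record wraps the half-ring boundary */
	    (cons_pos > rec_pos || cons_pos < rec_end))
		return true;
	return false;
}

int main(void)
{
	/* record [1000, 1100) does not touch the consumer at 512: no wakeup */
	printf("%d\n", should_wakeup(1000, 1100, 512));	/* 0 */
	/* record [500, 600) crosses the consumer at 512: wake it up */
	printf("%d\n", should_wakeup(500, 600, 512));	/* 1 */
	return 0;
}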
@@ -576,6 +678,8 @@ BPF_CALL_2(bpf_ringbuf_query, struct bpf_map *, map, u64, flags)
 		return smp_load_acquire(&rb->consumer_pos);
 	case BPF_RB_PROD_POS:
 		return smp_load_acquire(&rb->producer_pos);
+	case BPF_RB_OVER_POS:
+		return READ_ONCE(rb->overwrite_pos);
 	default:
 		return 0;
 	}
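On the BPF-program side, the new query flag could be used as in the hypothetical sketch below. BPF_RB_OVER_POS and BPF_F_OVERWRITE are assumed to be visible through vmlinux.h / the uapi headers once this patch lands; the map, section, and function names are arbitrary.

/* Hypothetical BPF-side sketch, not part of the patch. */
#include "vmlinux.h"
#include <bpf/bpf_helpers.h>

struct {
	__uint(type, BPF_MAP_TYPE_RINGBUF);
	__uint(max_entries, 256 * 1024);
	__uint(map_flags, BPF_F_OVERWRITE);	/* assumed uapi name from this patch */
} events SEC(".maps");

SEC("tracepoint/sched/sched_switch")
int log_switch(void *ctx)
{
	/* BPF_RB_OVER_POS is the new query flag added above (assumed uapi name) */
	__u64 over_pos = bpf_ringbuf_query(&events, BPF_RB_OVER_POS);
	__u64 *rec;

	rec = bpf_ringbuf_reserve(&events, sizeof(*rec), 0);
	if (!rec)
		return 0;
	*rec = over_pos;	/* stash the oldest surviving position for the consumer */
	bpf_ringbuf_submit(rec, 0);
	return 0;
}

char LICENSE[] SEC("license") = "GPL";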
@@ -749,6 +853,9 @@ BPF_CALL_4(bpf_user_ringbuf_drain, struct bpf_map *, map,
 
 	rb = container_of(map, struct bpf_ringbuf_map, map)->rb;
 
+	if (unlikely(rb->overwrite_mode))
+		return -EOPNOTSUPP;
+
 	/* If another consumer is already consuming a sample, wait for them to finish. */
 	if (!atomic_try_cmpxchg(&rb->busy, &busy, 1))
 		return -EBUSY;