
Commit 6728b45

Xu Kuohai authored and Kernel Patches Daemon committed
bpf: Add overwrite mode for bpf ring buffer
When the bpf ring buffer is full, new events cannot be recorded until the consumer consumes some events to free space. This may cause critical events to be discarded, such as in fault diagnosis, where recent events are more critical than older ones. So add an overwrite mode for the bpf ring buffer. In this mode, the new event overwrites the oldest event when the buffer is full.

The scheme is as follows:

1. producer_pos tracks the next position to write new data. When there is enough free space, the producer simply moves producer_pos forward to make space for the new event.

2. To avoid waiting for the consumer to free space when the buffer is full, a new variable overwrite_pos is introduced for the producer. overwrite_pos tracks the next event to be overwritten (the oldest committed event) in the buffer. The producer moves it forward to discard the oldest events when the buffer is full.

3. pending_pos tracks the oldest event still being committed. The producer ensures producer_pos never passes pending_pos when making space for new events, so multiple producers never write to the same position at the same time.

4. The producer wakes up the consumer every half a round ahead to give it a chance to retrieve data. However, for an overwrite-mode ring buffer, users typically only care about the ring buffer snapshot taken before a fault occurs. In that case, the producer should commit data with the BPF_RB_NO_WAKEUP flag to avoid unnecessary wakeups.

To make it clear, here are some example diagrams.

1. Let's say we have a ring buffer of size 4096. At first, {producer,overwrite,pending,consumer}_pos are all set to 0.

0       512     1024    1536    2048    2560    3072    3584    4096
+---------------------------------------------------------------+
|                                                               |
|                                                               |
+---------------------------------------------------------------+
^
|
producer_pos  = 0
overwrite_pos = 0
pending_pos   = 0
consumer_pos  = 0

2. Reserve event A, size 512. There is enough free space, so A is allocated at offset 0 and producer_pos is moved to 512, the end of A. Since A is not yet submitted, its BUSY bit is set.

0       512     1024    1536    2048    2560    3072    3584    4096
+---------------------------------------------------------------+
|   A   |                                                       |
| [BUSY]|                                                       |
+---------------------------------------------------------------+
^       ^
|       |
|       producer_pos = 512
|
overwrite_pos = 0
pending_pos   = 0
consumer_pos  = 0

3. Reserve event B, size 1024. B is allocated at offset 512 with the BUSY bit set, and producer_pos is moved to the end of B.

0       512     1024    1536    2048    2560    3072    3584    4096
+---------------------------------------------------------------+
|   A   |       B       |                                       |
| [BUSY]|     [BUSY]    |                                       |
+---------------------------------------------------------------+
^                       ^
|                       |
|                       producer_pos = 1536
|
overwrite_pos = 0
pending_pos   = 0
consumer_pos  = 0

4. Reserve event C, size 2048. C is allocated at offset 1536 and producer_pos becomes 3584.

0       512     1024    1536    2048    2560    3072    3584    4096
+---------------------------------------------------------------+
|   A   |       B       |               C               |       |
| [BUSY]|     [BUSY]    |             [BUSY]            |       |
+---------------------------------------------------------------+
^                                                       ^
|                                                       |
|                                                       producer_pos = 3584
|
overwrite_pos = 0
pending_pos   = 0
consumer_pos  = 0

5. Submit event A. The BUSY bit of A is cleared. B becomes the oldest event still being written, so pending_pos is moved to 512, the start of B.

0       512     1024    1536    2048    2560    3072    3584    4096
+---------------------------------------------------------------+
|   A   |       B       |               C               |       |
|       |     [BUSY]    |             [BUSY]            |       |
+---------------------------------------------------------------+
^       ^                                               ^
|       |                                               |
|       |                                               producer_pos = 3584
|       |
|       pending_pos = 512
|
overwrite_pos = 0
consumer_pos  = 0

6. Submit event B. The BUSY bit of B is cleared, and pending_pos is moved to the start of C, which is now the oldest event still being written.

0       512     1024    1536    2048    2560    3072    3584    4096
+---------------------------------------------------------------+
|   A   |       B       |               C               |       |
|       |               |             [BUSY]            |       |
+---------------------------------------------------------------+
^                       ^                               ^
|                       |                               |
|                       |                               producer_pos = 3584
|                       |
|                       pending_pos = 1536
|
overwrite_pos = 0
consumer_pos  = 0

7. Reserve event D, size 1536 (3 * 512). There are 2048 bytes not under writing between producer_pos and pending_pos, so D is allocated at offset 3584 and producer_pos is moved from 3584 to 5120. Since event D overwrites all of event A and the beginning 512 bytes of event B, overwrite_pos is moved to the start of event C, the oldest event that is not overwritten.

0       512     1024    1536    2048    2560    3072    3584    4096
+---------------------------------------------------------------+
|     D End     |       |               C               |D Begin|
|     [BUSY]    |       |             [BUSY]            | [BUSY]|
+---------------------------------------------------------------+
^               ^       ^
|               |       |
|               |       pending_pos   = 1536
|               |       overwrite_pos = 1536
|               |
|               producer_pos = 5120
|
consumer_pos = 0

8. Reserve event E, size 1024. Although there are 512 bytes not under writing between producer_pos and pending_pos, E cannot be reserved, as it would overwrite the first 512 bytes of event C, which is still being written.

9. Submit events C and D. pending_pos is moved to the end of D.

0       512     1024    1536    2048    2560    3072    3584    4096
+---------------------------------------------------------------+
|     D End     |       |               C               |D Begin|
|               |       |                               |       |
+---------------------------------------------------------------+
^               ^       ^
|               |       |
|               |       overwrite_pos = 1536
|               |
|               producer_pos = 5120
|               pending_pos  = 5120
|
consumer_pos = 0

The performance data for overwrite mode will be provided in a follow-up patch that adds overwrite-mode benchmarks. A sample of performance data for non-overwrite mode on an x86_64 and an arm64 CPU, before and after this patch, is shown below. As we can see, no obvious performance regression occurs.
- x86_64 (AMD EPYC 9654)

Before:
Ringbuf, multi-producer contention
==================================
rb-libbpf nr_prod 1  13.218 ± 0.039M/s (drops 0.000 ± 0.000M/s)
rb-libbpf nr_prod 2  15.684 ± 0.015M/s (drops 0.000 ± 0.000M/s)
rb-libbpf nr_prod 3  7.771 ± 0.002M/s (drops 0.000 ± 0.000M/s)
rb-libbpf nr_prod 4  6.281 ± 0.001M/s (drops 0.000 ± 0.000M/s)
rb-libbpf nr_prod 8  2.842 ± 0.003M/s (drops 0.000 ± 0.000M/s)
rb-libbpf nr_prod 12 2.001 ± 0.004M/s (drops 0.000 ± 0.000M/s)
rb-libbpf nr_prod 16 1.833 ± 0.003M/s (drops 0.000 ± 0.000M/s)
rb-libbpf nr_prod 20 1.508 ± 0.003M/s (drops 0.000 ± 0.000M/s)
rb-libbpf nr_prod 24 1.421 ± 0.002M/s (drops 0.000 ± 0.000M/s)
rb-libbpf nr_prod 28 1.309 ± 0.001M/s (drops 0.000 ± 0.000M/s)
rb-libbpf nr_prod 32 1.265 ± 0.003M/s (drops 0.000 ± 0.000M/s)
rb-libbpf nr_prod 36 1.198 ± 0.002M/s (drops 0.000 ± 0.000M/s)
rb-libbpf nr_prod 40 1.174 ± 0.001M/s (drops 0.000 ± 0.000M/s)
rb-libbpf nr_prod 44 1.113 ± 0.003M/s (drops 0.000 ± 0.000M/s)
rb-libbpf nr_prod 48 1.097 ± 0.002M/s (drops 0.000 ± 0.000M/s)
rb-libbpf nr_prod 52 1.070 ± 0.002M/s (drops 0.000 ± 0.000M/s)

After:
Ringbuf, multi-producer contention
==================================
rb-libbpf nr_prod 1  13.751 ± 0.673M/s (drops 0.000 ± 0.000M/s)
rb-libbpf nr_prod 2  15.592 ± 0.008M/s (drops 0.000 ± 0.000M/s)
rb-libbpf nr_prod 3  7.776 ± 0.002M/s (drops 0.000 ± 0.000M/s)
rb-libbpf nr_prod 4  6.463 ± 0.002M/s (drops 0.000 ± 0.000M/s)
rb-libbpf nr_prod 8  2.883 ± 0.003M/s (drops 0.000 ± 0.000M/s)
rb-libbpf nr_prod 12 2.017 ± 0.003M/s (drops 0.000 ± 0.000M/s)
rb-libbpf nr_prod 16 1.816 ± 0.004M/s (drops 0.000 ± 0.000M/s)
rb-libbpf nr_prod 20 1.512 ± 0.003M/s (drops 0.000 ± 0.000M/s)
rb-libbpf nr_prod 24 1.396 ± 0.002M/s (drops 0.000 ± 0.000M/s)
rb-libbpf nr_prod 28 1.303 ± 0.002M/s (drops 0.000 ± 0.000M/s)
rb-libbpf nr_prod 32 1.267 ± 0.002M/s (drops 0.000 ± 0.000M/s)
rb-libbpf nr_prod 36 1.210 ± 0.002M/s (drops 0.000 ± 0.000M/s)
rb-libbpf nr_prod 40 1.181 ± 0.002M/s (drops 0.000 ± 0.000M/s)
rb-libbpf nr_prod 44 1.136 ± 0.002M/s (drops 0.000 ± 0.000M/s)
rb-libbpf nr_prod 48 1.090 ± 0.001M/s (drops 0.000 ± 0.000M/s)
rb-libbpf nr_prod 52 1.091 ± 0.002M/s (drops 0.000 ± 0.000M/s)

- arm64 (HiSilicon Kunpeng 920)

Before:
Ringbuf, multi-producer contention
==================================
rb-libbpf nr_prod 1  11.602 ± 0.423M/s (drops 0.000 ± 0.000M/s)
rb-libbpf nr_prod 2  9.599 ± 0.007M/s (drops 0.000 ± 0.000M/s)
rb-libbpf nr_prod 3  6.669 ± 0.008M/s (drops 0.000 ± 0.000M/s)
rb-libbpf nr_prod 4  4.806 ± 0.002M/s (drops 0.000 ± 0.000M/s)
rb-libbpf nr_prod 8  3.856 ± 0.002M/s (drops 0.000 ± 0.000M/s)
rb-libbpf nr_prod 12 3.368 ± 0.003M/s (drops 0.000 ± 0.000M/s)
rb-libbpf nr_prod 16 3.210 ± 0.007M/s (drops 0.000 ± 0.000M/s)
rb-libbpf nr_prod 20 3.003 ± 0.007M/s (drops 0.000 ± 0.000M/s)
rb-libbpf nr_prod 24 2.944 ± 0.007M/s (drops 0.000 ± 0.000M/s)
rb-libbpf nr_prod 28 2.863 ± 0.008M/s (drops 0.000 ± 0.000M/s)
rb-libbpf nr_prod 32 2.819 ± 0.007M/s (drops 0.000 ± 0.000M/s)
rb-libbpf nr_prod 36 2.887 ± 0.008M/s (drops 0.000 ± 0.000M/s)
rb-libbpf nr_prod 40 2.837 ± 0.008M/s (drops 0.000 ± 0.000M/s)
rb-libbpf nr_prod 44 2.787 ± 0.012M/s (drops 0.000 ± 0.000M/s)
rb-libbpf nr_prod 48 2.738 ± 0.010M/s (drops 0.000 ± 0.000M/s)
rb-libbpf nr_prod 52 2.700 ± 0.007M/s (drops 0.000 ± 0.000M/s)

After:
Ringbuf, multi-producer contention
==================================
rb-libbpf nr_prod 1  11.614 ± 0.268M/s (drops 0.000 ± 0.000M/s)
rb-libbpf nr_prod 2  9.917 ± 0.007M/s (drops 0.000 ± 0.000M/s)
rb-libbpf nr_prod 3  6.920 ± 0.008M/s (drops 0.000 ± 0.000M/s)
rb-libbpf nr_prod 4  4.803 ± 0.002M/s (drops 0.000 ± 0.000M/s)
rb-libbpf nr_prod 8  3.898 ± 0.002M/s (drops 0.000 ± 0.000M/s)
rb-libbpf nr_prod 12 3.426 ± 0.008M/s (drops 0.000 ± 0.000M/s)
rb-libbpf nr_prod 16 3.320 ± 0.008M/s (drops 0.000 ± 0.000M/s)
rb-libbpf nr_prod 20 3.029 ± 0.013M/s (drops 0.000 ± 0.000M/s)
rb-libbpf nr_prod 24 3.068 ± 0.012M/s (drops 0.000 ± 0.000M/s)
rb-libbpf nr_prod 28 2.890 ± 0.009M/s (drops 0.000 ± 0.000M/s)
rb-libbpf nr_prod 32 2.950 ± 0.012M/s (drops 0.000 ± 0.000M/s)
rb-libbpf nr_prod 36 2.812 ± 0.006M/s (drops 0.000 ± 0.000M/s)
rb-libbpf nr_prod 40 2.834 ± 0.009M/s (drops 0.000 ± 0.000M/s)
rb-libbpf nr_prod 44 2.803 ± 0.010M/s (drops 0.000 ± 0.000M/s)
rb-libbpf nr_prod 48 2.766 ± 0.010M/s (drops 0.000 ± 0.000M/s)
rb-libbpf nr_prod 52 2.754 ± 0.009M/s (drops 0.000 ± 0.000M/s)

Signed-off-by: Xu Kuohai <[email protected]>
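To illustrate the intended usage, here is a minimal BPF-side sketch of a flight-recorder style ring buffer. The struct event, the map name "events", the sizes, and the attach point are made up for illustration; BPF_F_OVERWRITE and BPF_RB_NO_WAKEUP are the flags discussed above.

/* Hypothetical example: an overwrite-mode ring buffer used as a flight
 * recorder; struct event, the map name and the attach point are made up.
 */
#include <linux/types.h>
#include <linux/bpf.h>
#include <bpf/bpf_helpers.h>

struct event {
        __u64 ts;
        __u32 pid;
};

struct {
        __uint(type, BPF_MAP_TYPE_RINGBUF);
        __uint(max_entries, 4096);          /* power-of-2, page-aligned size */
        __uint(map_flags, BPF_F_OVERWRITE); /* new events overwrite the oldest */
} events SEC(".maps");

SEC("tracepoint/sched/sched_switch")
int record_event(void *ctx)
{
        struct event *e;

        e = bpf_ringbuf_reserve(&events, sizeof(*e), 0);
        if (!e)
                return 0;

        e->ts = bpf_ktime_get_ns();
        e->pid = bpf_get_current_pid_tgid() >> 32;

        /* flight-recorder style: skip wakeups, the consumer only reads the
         * snapshot after a fault is detected
         */
        bpf_ringbuf_submit(e, BPF_RB_NO_WAKEUP);
        return 0;
}

char LICENSE[] SEC("license") = "GPL";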
1 parent ba991b7 commit 6728b45

3 files changed (+141, -26 lines)

include/uapi/linux/bpf.h

Lines changed: 4 additions & 0 deletions

@@ -1430,6 +1430,9 @@ enum {
 
 	/* Do not translate kernel bpf_arena pointers to user pointers */
 	BPF_F_NO_USER_CONV = (1U << 18),
+
+	/* bpf ringbuf works in overwrite mode? */
+	BPF_F_OVERWRITE = (1U << 19),
 };
 
 /* Flags for BPF_PROG_QUERY. */
@@ -6215,6 +6218,7 @@ enum {
 	BPF_RB_RING_SIZE = 1,
 	BPF_RB_CONS_POS = 2,
 	BPF_RB_PROD_POS = 3,
+	BPF_RB_OVER_POS = 4,
 };
 
 /* BPF ring buffer constants */
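As a hedged illustration of the new BPF_RB_OVER_POS constant, a BPF program could read the overwrite position with the existing bpf_ringbuf_query() helper. The snippet below reuses the hypothetical "events" map from the earlier sketch, and the attach point is chosen only for illustration.

SEC("kprobe/do_nanosleep")      /* attach point is illustrative */
int query_positions(void *ctx)
{
        __u64 cons, prod, over;

        cons = bpf_ringbuf_query(&events, BPF_RB_CONS_POS);
        prod = bpf_ringbuf_query(&events, BPF_RB_PROD_POS);
        over = bpf_ringbuf_query(&events, BPF_RB_OVER_POS); /* new constant */

        /* in overwrite mode, readable data spans [max(cons, over), prod) */
        bpf_printk("cons=%llu prod=%llu over=%llu", cons, prod, over);
        return 0;
}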

kernel/bpf/ringbuf.c

Lines changed: 133 additions & 26 deletions
@@ -13,7 +13,7 @@
 #include <linux/btf_ids.h>
 #include <asm/rqspinlock.h>
 
-#define RINGBUF_CREATE_FLAG_MASK (BPF_F_NUMA_NODE)
+#define RINGBUF_CREATE_FLAG_MASK (BPF_F_NUMA_NODE | BPF_F_OVERWRITE)
 
 /* non-mmap()'able part of bpf_ringbuf (everything up to consumer page) */
 #define RINGBUF_PGOFF \
@@ -27,7 +27,8 @@
 struct bpf_ringbuf {
 	wait_queue_head_t waitq;
 	struct irq_work work;
-	u64 mask;
+	u64 mask:48;
+	u64 overwrite_mode:1;
 	struct page **pages;
 	int nr_pages;
 	rqspinlock_t spinlock ____cacheline_aligned_in_smp;
@@ -72,6 +73,7 @@ struct bpf_ringbuf {
 	 */
 	unsigned long consumer_pos __aligned(PAGE_SIZE);
 	unsigned long producer_pos __aligned(PAGE_SIZE);
+	unsigned long overwrite_pos; /* to be overwritten in overwrite mode */
 	unsigned long pending_pos;
 	char data[] __aligned(PAGE_SIZE);
 };
@@ -166,7 +168,8 @@ static void bpf_ringbuf_notify(struct irq_work *work)
  * considering that the maximum value of data_sz is (4GB - 1), there
  * will be no overflow, so just note the size limit in the comments.
  */
-static struct bpf_ringbuf *bpf_ringbuf_alloc(size_t data_sz, int numa_node)
+static struct bpf_ringbuf *bpf_ringbuf_alloc(size_t data_sz, int numa_node,
+					     int overwrite_mode)
 {
 	struct bpf_ringbuf *rb;
 
@@ -183,17 +186,25 @@ static struct bpf_ringbuf *bpf_ringbuf_alloc(size_t data_sz, int numa_node)
 	rb->consumer_pos = 0;
 	rb->producer_pos = 0;
 	rb->pending_pos = 0;
+	rb->overwrite_mode = overwrite_mode;
 
 	return rb;
 }
 
 static struct bpf_map *ringbuf_map_alloc(union bpf_attr *attr)
 {
+	int overwrite_mode = 0;
 	struct bpf_ringbuf_map *rb_map;
 
 	if (attr->map_flags & ~RINGBUF_CREATE_FLAG_MASK)
 		return ERR_PTR(-EINVAL);
 
+	if (attr->map_flags & BPF_F_OVERWRITE) {
+		if (attr->map_type == BPF_MAP_TYPE_USER_RINGBUF)
+			return ERR_PTR(-EINVAL);
+		overwrite_mode = 1;
+	}
+
 	if (attr->key_size || attr->value_size ||
 	    !is_power_of_2(attr->max_entries) ||
 	    !PAGE_ALIGNED(attr->max_entries))
@@ -205,7 +216,8 @@ static struct bpf_map *ringbuf_map_alloc(union bpf_attr *attr)
 
 	bpf_map_init_from_attr(&rb_map->map, attr);
 
-	rb_map->rb = bpf_ringbuf_alloc(attr->max_entries, rb_map->map.numa_node);
+	rb_map->rb = bpf_ringbuf_alloc(attr->max_entries, rb_map->map.numa_node,
+				       overwrite_mode);
 	if (!rb_map->rb) {
 		bpf_map_area_free(rb_map);
 		return ERR_PTR(-ENOMEM);
@@ -295,11 +307,16 @@ static int ringbuf_map_mmap_user(struct bpf_map *map, struct vm_area_struct *vma
 
 static unsigned long ringbuf_avail_data_sz(struct bpf_ringbuf *rb)
 {
-	unsigned long cons_pos, prod_pos;
+	unsigned long cons_pos, prod_pos, over_pos;
 
 	cons_pos = smp_load_acquire(&rb->consumer_pos);
 	prod_pos = smp_load_acquire(&rb->producer_pos);
-	return prod_pos - cons_pos;
+
+	if (likely(!rb->overwrite_mode))
+		return prod_pos - cons_pos;
+
+	over_pos = READ_ONCE(rb->overwrite_pos);
+	return min(prod_pos - max(cons_pos, over_pos), rb->mask + 1);
 }
 
 static u32 ringbuf_total_data_sz(const struct bpf_ringbuf *rb)
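To make the overwrite-mode branch of ringbuf_avail_data_sz() above concrete, the following small user-space sketch (not kernel code) mirrors the arithmetic and plugs in the numbers from step 7 of the commit message (producer_pos = 5120, overwrite_pos = 1536, consumer_pos = 0, buffer size 4096):

#include <stdio.h>

#define RB_MASK 4095UL	/* ring buffer size 4096 */

static unsigned long max_ul(unsigned long a, unsigned long b) { return a > b ? a : b; }
static unsigned long min_ul(unsigned long a, unsigned long b) { return a < b ? a : b; }

/* mirrors the overwrite-mode return value of ringbuf_avail_data_sz() */
static unsigned long avail_data_sz(unsigned long prod, unsigned long cons, unsigned long over)
{
        return min_ul(prod - max_ul(cons, over), RB_MASK + 1);
}

int main(void)
{
        /* step 7 of the example: only C (2048 bytes) and D (1536 bytes) are
         * still readable, so 3584 bytes are available
         */
        printf("%lu\n", avail_data_sz(5120, 0, 1536));	/* prints 3584 */
        return 0;
}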
@@ -402,11 +419,43 @@ bpf_ringbuf_restore_from_rec(struct bpf_ringbuf_hdr *hdr)
 	return (void*)((addr & PAGE_MASK) - off);
 }
 
+
+static bool bpf_ringbuf_has_space(const struct bpf_ringbuf *rb,
+				  unsigned long new_prod_pos,
+				  unsigned long cons_pos,
+				  unsigned long pend_pos)
+{
+	/* no space if oldest not yet committed record until the newest
+	 * record span more than (ringbuf_size - 1)
+	 */
+	if (new_prod_pos - pend_pos > rb->mask)
+		return false;
+
+	/* ok, we have space in ovewrite mode */
+	if (unlikely(rb->overwrite_mode))
+		return true;
+
+	/* no space if producer position advances more than (ringbuf_size - 1)
+	 * ahead than consumer position when not in overwrite mode
+	 */
+	if (new_prod_pos - cons_pos > rb->mask)
+		return false;
+
+	return true;
+}
+
+static u32 ringbuf_round_up_hdr_len(u32 hdr_len)
+{
+	hdr_len &= ~BPF_RINGBUF_DISCARD_BIT;
+	return round_up(hdr_len + BPF_RINGBUF_HDR_SZ, 8);
+}
+
 static void *__bpf_ringbuf_reserve(struct bpf_ringbuf *rb, u64 size)
 {
-	unsigned long cons_pos, prod_pos, new_prod_pos, pend_pos, flags;
+	unsigned long flags;
 	struct bpf_ringbuf_hdr *hdr;
-	u32 len, pg_off, tmp_size, hdr_len;
+	u32 len, pg_off, hdr_len;
+	unsigned long cons_pos, prod_pos, new_prod_pos, pend_pos, over_pos;
 
 	if (unlikely(size > RINGBUF_MAX_RECORD_SZ))
 		return NULL;
@@ -429,24 +478,39 @@ static void *__bpf_ringbuf_reserve(struct bpf_ringbuf *rb, u64 size)
 		hdr_len = READ_ONCE(hdr->len);
 		if (hdr_len & BPF_RINGBUF_BUSY_BIT)
 			break;
-		tmp_size = hdr_len & ~BPF_RINGBUF_DISCARD_BIT;
-		tmp_size = round_up(tmp_size + BPF_RINGBUF_HDR_SZ, 8);
-		pend_pos += tmp_size;
+		pend_pos += ringbuf_round_up_hdr_len(hdr_len);
 	}
 	rb->pending_pos = pend_pos;
 
-	/* check for out of ringbuf space:
-	 * - by ensuring producer position doesn't advance more than
-	 *   (ringbuf_size - 1) ahead
-	 * - by ensuring oldest not yet committed record until newest
-	 *   record does not span more than (ringbuf_size - 1)
-	 */
-	if (new_prod_pos - cons_pos > rb->mask ||
-	    new_prod_pos - pend_pos > rb->mask) {
+	if (!bpf_ringbuf_has_space(rb, new_prod_pos, cons_pos, pend_pos)) {
 		raw_res_spin_unlock_irqrestore(&rb->spinlock, flags);
 		return NULL;
 	}
 
+	/* In overwrite mode, move overwrite_pos to the next record to be
+	 * overwritten if the ring buffer is full
+	 */
+	if (unlikely(rb->overwrite_mode)) {
+		over_pos = rb->overwrite_pos;
+		while (new_prod_pos - over_pos > rb->mask) {
+			hdr = (void *)rb->data + (over_pos & rb->mask);
+			hdr_len = READ_ONCE(hdr->len);
+			/* since pending_pos is the first record with BUSY
+			 * bit set and overwrite_pos is never bigger than
+			 * pending_pos, no need to check BUSY bit here.
+			 */
+			over_pos += ringbuf_round_up_hdr_len(hdr_len);
+		}
+		/* smp_store_release(&rb->producer_pos, new_prod_pos) at
+		 * the end of the function ensures that when consumer sees
+		 * the updated rb->producer_pos, it always sees the updated
+		 * rb->overwrite_pos, so when consumer reads overwrite_pos
+		 * after smp_load_acquire(r->producer_pos), the overwrite_pos
+		 * will always be valid.
+		 */
+		WRITE_ONCE(rb->overwrite_pos, over_pos);
+	}
+
 	hdr = (void *)rb->data + (prod_pos & rb->mask);
 	pg_off = bpf_ringbuf_rec_pg_off(rb, hdr);
 	hdr->len = size | BPF_RINGBUF_BUSY_BIT;
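The overwrite_pos walk above can be checked against step 7 of the commit message with a tiny user-space sketch of the same loop; the record lengths are taken from the example diagrams and treated as already header-rounded:

#include <stdio.h>

#define RB_MASK 4095UL	/* ring buffer size 4096 */

int main(void)
{
        /* records A, B, C start at 0, 512 and 1536 with (rounded) lengths
         * 512, 1024 and 2048, as in the example diagrams
         */
        unsigned long rec_len[] = { 512, 1024, 2048 };
        unsigned long new_prod_pos = 5120;	/* after reserving D (1536 bytes) */
        unsigned long over_pos = 0;
        int i = 0;

        /* mirrors the while loop in __bpf_ringbuf_reserve(): skip whole
         * records until new_prod_pos is at most one buffer size ahead
         */
        while (new_prod_pos - over_pos > RB_MASK)
                over_pos += rec_len[i++];

        printf("overwrite_pos = %lu\n", over_pos);	/* prints 1536, the start of C */
        return 0;
}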
@@ -479,7 +543,50 @@ const struct bpf_func_proto bpf_ringbuf_reserve_proto = {
 	.arg3_type	= ARG_ANYTHING,
 };
 
-static void bpf_ringbuf_commit(void *sample, u64 flags, bool discard)
+static __always_inline
+bool ringbuf_should_wakeup(const struct bpf_ringbuf *rb,
+			   unsigned long rec_pos,
+			   unsigned long cons_pos,
+			   u32 len, u64 flags)
+{
+	unsigned long rec_end;
+
+	if (flags & BPF_RB_FORCE_WAKEUP)
+		return true;
+
+	if (flags & BPF_RB_NO_WAKEUP)
+		return false;
+
+	/* for non-overwrite mode, if consumer caught up and is waiting for
+	 * our record, notify about new data availability
+	 */
+	if (likely(!rb->overwrite_mode))
+		return cons_pos == rec_pos;
+
+	/* for overwrite mode, to give the consumer a chance to catch up
+	 * before being overwritten, wake up consumer every half a round
+	 * ahead.
+	 */
+	rec_end = rec_pos + ringbuf_round_up_hdr_len(len);
+
+	cons_pos &= (rb->mask >> 1);
+	rec_pos &= (rb->mask >> 1);
+	rec_end &= (rb->mask >> 1);
+
+	if (cons_pos == rec_pos)
+		return true;
+
+	if (rec_pos < cons_pos && cons_pos < rec_end)
+		return true;
+
+	if (rec_end < rec_pos && (cons_pos > rec_pos || cons_pos < rec_end))
+		return true;
+
+	return false;
+}
+
+static __always_inline
+void bpf_ringbuf_commit(void *sample, u64 flags, bool discard)
 {
 	unsigned long rec_pos, cons_pos;
 	struct bpf_ringbuf_hdr *hdr;
@@ -495,15 +602,10 @@ static void bpf_ringbuf_commit(void *sample, u64 flags, bool discard)
 	/* update record header with correct final size prefix */
 	xchg(&hdr->len, new_len);
 
-	/* if consumer caught up and is waiting for our record, notify about
-	 * new data availability
-	 */
 	rec_pos = (void *)hdr - (void *)rb->data;
 	cons_pos = smp_load_acquire(&rb->consumer_pos) & rb->mask;
 
-	if (flags & BPF_RB_FORCE_WAKEUP)
-		irq_work_queue(&rb->work);
-	else if (cons_pos == rec_pos && !(flags & BPF_RB_NO_WAKEUP))
+	if (ringbuf_should_wakeup(rb, rec_pos, cons_pos, new_len, flags))
 		irq_work_queue(&rb->work);
 }
 
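The half-round wakeup heuristic in ringbuf_should_wakeup() can be modelled in user space as follows; the buffer size and positions are made-up numbers, and the function only mirrors the overwrite-mode branch above:

#include <stdbool.h>
#include <stdio.h>

#define RB_SIZE   4096UL
#define HALF_MASK ((RB_SIZE - 1) >> 1)	/* 2047: positions compared per half round */

static bool should_wakeup(unsigned long rec_pos, unsigned long rec_end,
                          unsigned long cons_pos)
{
        cons_pos &= HALF_MASK;
        rec_pos  &= HALF_MASK;
        rec_end  &= HALF_MASK;

        if (cons_pos == rec_pos)
                return true;
        if (rec_pos < cons_pos && cons_pos < rec_end)
                return true;
        if (rec_end < rec_pos && (cons_pos > rec_pos || cons_pos < rec_end))
                return true;
        return false;
}

int main(void)
{
        /* consumer stuck at 300; a record spanning 2248..2376 maps to
         * 200..328 within the half round and crosses the consumer's phase
         */
        printf("%d\n", should_wakeup(2248, 2376, 300));	/* 1: wake up */
        /* a record spanning 512..640 does not cross phase 300 */
        printf("%d\n", should_wakeup(512, 640, 300));	/* 0: no wakeup */
        return 0;
}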

@@ -576,6 +678,8 @@ BPF_CALL_2(bpf_ringbuf_query, struct bpf_map *, map, u64, flags)
 		return smp_load_acquire(&rb->consumer_pos);
 	case BPF_RB_PROD_POS:
 		return smp_load_acquire(&rb->producer_pos);
+	case BPF_RB_OVER_POS:
+		return READ_ONCE(rb->overwrite_pos);
 	default:
 		return 0;
 	}
@@ -749,6 +853,9 @@ BPF_CALL_4(bpf_user_ringbuf_drain, struct bpf_map *, map,
 
 	rb = container_of(map, struct bpf_ringbuf_map, map)->rb;
 
+	if (unlikely(rb->overwrite_mode))
+		return -EOPNOTSUPP;
+
 	/* If another consumer is already consuming a sample, wait for them to finish. */
 	if (!atomic_try_cmpxchg(&rb->busy, &busy, 1))
 		return -EBUSY;

tools/include/uapi/linux/bpf.h

Lines changed: 4 additions & 0 deletions

@@ -1430,6 +1430,9 @@ enum {
 
 	/* Do not translate kernel bpf_arena pointers to user pointers */
 	BPF_F_NO_USER_CONV = (1U << 18),
+
+	/* bpf ringbuf works in overwrite mode? */
+	BPF_F_OVERWRITE = (1U << 19),
 };
 
 /* Flags for BPF_PROG_QUERY. */
@@ -6215,6 +6218,7 @@ enum {
 	BPF_RB_RING_SIZE = 1,
 	BPF_RB_CONS_POS = 2,
 	BPF_RB_PROD_POS = 3,
+	BPF_RB_OVER_POS = 4,
 };
 
 /* BPF ring buffer constants */
