include/uapi/linux/bpf.h (4 additions, 0 deletions)
@@ -1430,6 +1430,9 @@ enum {

/* Do not translate kernel bpf_arena pointers to user pointers */
BPF_F_NO_USER_CONV = (1U << 18),

/* BPF ringbuf works in overwrite mode */
BPF_F_OVERWRITE = (1U << 19),
};

/* Flags for BPF_PROG_QUERY. */
@@ -6215,6 +6218,7 @@ enum {
BPF_RB_RING_SIZE = 1,
BPF_RB_CONS_POS = 2,
BPF_RB_PROD_POS = 3,
BPF_RB_OVER_POS = 4,
};

/* BPF ring buffer constants */
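With the new flag in the uapi header, userspace can request overwrite mode at map creation time. A minimal sketch using libbpf's `bpf_map_create()`; the map name and ring size are illustrative, and the kernel must carry this patch for the flag to be accepted:

```c
#include <bpf/bpf.h>
#include <linux/bpf.h>

/* Sketch: create a BPF ring buffer in overwrite mode.
 * Assumes headers that define BPF_F_OVERWRITE (added by this patch). */
static int create_overwrite_ringbuf(void)
{
	LIBBPF_OPTS(bpf_map_create_opts, opts,
		    .map_flags = BPF_F_OVERWRITE);

	/* max_entries is the ring size in bytes; ringbuf_map_alloc()
	 * requires it to be a power of two and page-aligned. */
	return bpf_map_create(BPF_MAP_TYPE_RINGBUF, "rb_ow",
			      0 /* key_size */, 0 /* value_size */,
			      256 * 1024, &opts);
}
```

Note that `ringbuf_map_alloc()` below rejects `BPF_F_OVERWRITE` for `BPF_MAP_TYPE_USER_RINGBUF`, so the flag only applies to kernel-producer ring buffers.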
kernel/bpf/ringbuf.c (133 additions, 26 deletions)
@@ -13,7 +13,7 @@
#include <linux/btf_ids.h>
#include <asm/rqspinlock.h>

#define RINGBUF_CREATE_FLAG_MASK (BPF_F_NUMA_NODE)
#define RINGBUF_CREATE_FLAG_MASK (BPF_F_NUMA_NODE | BPF_F_OVERWRITE)

/* non-mmap()'able part of bpf_ringbuf (everything up to consumer page) */
#define RINGBUF_PGOFF \
@@ -27,7 +27,8 @@
struct bpf_ringbuf {
wait_queue_head_t waitq;
struct irq_work work;
u64 mask;
u64 mask:48;
u64 overwrite_mode:1;
struct page **pages;
int nr_pages;
rqspinlock_t spinlock ____cacheline_aligned_in_smp;
@@ -72,6 +73,7 @@ struct bpf_ringbuf {
*/
unsigned long consumer_pos __aligned(PAGE_SIZE);
unsigned long producer_pos __aligned(PAGE_SIZE);
unsigned long overwrite_pos; /* next record to be overwritten (overwrite mode) */
unsigned long pending_pos;
char data[] __aligned(PAGE_SIZE);
};
@@ -166,7 +168,8 @@ static void bpf_ringbuf_notify(struct irq_work *work)
* considering that the maximum value of data_sz is (4GB - 1), there
* will be no overflow, so just note the size limit in the comments.
*/
static struct bpf_ringbuf *bpf_ringbuf_alloc(size_t data_sz, int numa_node)
static struct bpf_ringbuf *bpf_ringbuf_alloc(size_t data_sz, int numa_node,
int overwrite_mode)
{
struct bpf_ringbuf *rb;

@@ -183,17 +186,25 @@ static struct bpf_ringbuf *bpf_ringbuf_alloc(size_t data_sz, int numa_node)
rb->consumer_pos = 0;
rb->producer_pos = 0;
rb->pending_pos = 0;
rb->overwrite_mode = overwrite_mode;

return rb;
}

static struct bpf_map *ringbuf_map_alloc(union bpf_attr *attr)
{
int overwrite_mode = 0;
struct bpf_ringbuf_map *rb_map;

if (attr->map_flags & ~RINGBUF_CREATE_FLAG_MASK)
return ERR_PTR(-EINVAL);

if (attr->map_flags & BPF_F_OVERWRITE) {
if (attr->map_type == BPF_MAP_TYPE_USER_RINGBUF)
return ERR_PTR(-EINVAL);
overwrite_mode = 1;
}

if (attr->key_size || attr->value_size ||
!is_power_of_2(attr->max_entries) ||
!PAGE_ALIGNED(attr->max_entries))
@@ -205,7 +216,8 @@ static struct bpf_map *ringbuf_map_alloc(union bpf_attr *attr)

bpf_map_init_from_attr(&rb_map->map, attr);

rb_map->rb = bpf_ringbuf_alloc(attr->max_entries, rb_map->map.numa_node);
rb_map->rb = bpf_ringbuf_alloc(attr->max_entries, rb_map->map.numa_node,
overwrite_mode);
if (!rb_map->rb) {
bpf_map_area_free(rb_map);
return ERR_PTR(-ENOMEM);
@@ -295,11 +307,16 @@ static int ringbuf_map_mmap_user(struct bpf_map *map, struct vm_area_struct *vma

static unsigned long ringbuf_avail_data_sz(struct bpf_ringbuf *rb)
{
unsigned long cons_pos, prod_pos;
unsigned long cons_pos, prod_pos, over_pos;

cons_pos = smp_load_acquire(&rb->consumer_pos);
prod_pos = smp_load_acquire(&rb->producer_pos);
return prod_pos - cons_pos;

if (likely(!rb->overwrite_mode))
return prod_pos - cons_pos;

over_pos = READ_ONCE(rb->overwrite_pos);
return min(prod_pos - max(cons_pos, over_pos), rb->mask + 1);
}
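The `min()` clamp matters once the producer laps a stalled consumer: `prod_pos - cons_pos` can then exceed the ring size even though at most one ring's worth of data is intact. A userspace mirror of the computation above, with illustrative positions:

```c
#include <stdio.h>

#define MIN(a, b) ((a) < (b) ? (a) : (b))
#define MAX(a, b) ((a) > (b) ? (a) : (b))

/* mirror of ringbuf_avail_data_sz() in overwrite mode */
static unsigned long avail(unsigned long prod, unsigned long cons,
			   unsigned long over, unsigned long size)
{
	return MIN(prod - MAX(cons, over), size);
}

int main(void)
{
	/* consumer lapped: only the last ring's worth is readable */
	printf("%lu\n", avail(10000, 0, 5904, 4096));    /* 4096 */
	/* consumer ahead of overwrite_pos: plain prod - cons */
	printf("%lu\n", avail(10000, 9000, 5904, 4096)); /* 1000 */
	return 0;
}
```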

static u32 ringbuf_total_data_sz(const struct bpf_ringbuf *rb)
@@ -402,11 +419,43 @@ bpf_ringbuf_restore_from_rec(struct bpf_ringbuf_hdr *hdr)
return (void*)((addr & PAGE_MASK) - off);
}


static bool bpf_ringbuf_has_space(const struct bpf_ringbuf *rb,
unsigned long new_prod_pos,
unsigned long cons_pos,
unsigned long pend_pos)
{
/* no space if the span from the oldest not-yet-committed record
 * to the newest record exceeds (ringbuf_size - 1)
 */
if (new_prod_pos - pend_pos > rb->mask)
return false;

/* ok, we have space in overwrite mode */
if (unlikely(rb->overwrite_mode))
return true;

/* no space if the producer position advances more than
 * (ringbuf_size - 1) ahead of the consumer position when not in
 * overwrite mode
 */
if (new_prod_pos - cons_pos > rb->mask)
return false;

return true;
}

static u32 ringbuf_round_up_hdr_len(u32 hdr_len)
{
hdr_len &= ~BPF_RINGBUF_DISCARD_BIT;
return round_up(hdr_len + BPF_RINGBUF_HDR_SZ, 8);
}
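This helper centralizes the record-size arithmetic used by both the `pending_pos` scan and the new overwrite loop: mask off the DISCARD bit (BUSY must already be clear), add the 8-byte record header, and round up to 8. A hedged userspace rendition with two worked values:

```c
#include <stdint.h>

#define HDR_SZ      8U          /* BPF_RINGBUF_HDR_SZ */
#define DISCARD_BIT (1U << 30)  /* BPF_RINGBUF_DISCARD_BIT */
#define ROUND_UP(x, a) (((x) + (a) - 1) & ~((a) - 1))

/* userspace mirror of ringbuf_round_up_hdr_len() */
static uint32_t rec_len(uint32_t hdr_len)
{
	hdr_len &= ~DISCARD_BIT;
	return ROUND_UP(hdr_len + HDR_SZ, 8U);
}
/* rec_len(12) == 24; rec_len(1 | DISCARD_BIT) == 16 */
```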

static void *__bpf_ringbuf_reserve(struct bpf_ringbuf *rb, u64 size)
{
unsigned long cons_pos, prod_pos, new_prod_pos, pend_pos, flags;
unsigned long flags;
struct bpf_ringbuf_hdr *hdr;
u32 len, pg_off, tmp_size, hdr_len;
u32 len, pg_off, hdr_len;
unsigned long cons_pos, prod_pos, new_prod_pos, pend_pos, over_pos;

if (unlikely(size > RINGBUF_MAX_RECORD_SZ))
return NULL;
@@ -429,24 +478,39 @@ static void *__bpf_ringbuf_reserve(struct bpf_ringbuf *rb, u64 size)
hdr_len = READ_ONCE(hdr->len);
if (hdr_len & BPF_RINGBUF_BUSY_BIT)
break;
tmp_size = hdr_len & ~BPF_RINGBUF_DISCARD_BIT;
tmp_size = round_up(tmp_size + BPF_RINGBUF_HDR_SZ, 8);
pend_pos += tmp_size;
pend_pos += ringbuf_round_up_hdr_len(hdr_len);
}
rb->pending_pos = pend_pos;

/* check for out of ringbuf space:
* - by ensuring producer position doesn't advance more than
* (ringbuf_size - 1) ahead
* - by ensuring oldest not yet committed record until newest
* record does not span more than (ringbuf_size - 1)
*/
if (new_prod_pos - cons_pos > rb->mask ||
new_prod_pos - pend_pos > rb->mask) {
if (!bpf_ringbuf_has_space(rb, new_prod_pos, cons_pos, pend_pos)) {
raw_res_spin_unlock_irqrestore(&rb->spinlock, flags);
return NULL;
}

/* In overwrite mode, move overwrite_pos to the next record to be
* overwritten if the ring buffer is full
*/
if (unlikely(rb->overwrite_mode)) {
over_pos = rb->overwrite_pos;
while (new_prod_pos - over_pos > rb->mask) {
hdr = (void *)rb->data + (over_pos & rb->mask);
hdr_len = READ_ONCE(hdr->len);
/* since pending_pos is the first record with the BUSY
 * bit set and overwrite_pos never advances past
 * pending_pos, there is no need to check the BUSY bit here.
 */
over_pos += ringbuf_round_up_hdr_len(hdr_len);
}
/* smp_store_release(&rb->producer_pos, new_prod_pos) at
 * the end of the function ensures that when the consumer sees
 * the updated rb->producer_pos, it also sees the updated
 * rb->overwrite_pos, so when the consumer reads overwrite_pos
 * after smp_load_acquire(&rb->producer_pos), the overwrite_pos
 * will always be valid.
 */
WRITE_ONCE(rb->overwrite_pos, over_pos);
}

hdr = (void *)rb->data + (prod_pos & rb->mask);
pg_off = bpf_ringbuf_rec_pg_off(rb, hdr);
hdr->len = size | BPF_RINGBUF_BUSY_BIT;
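The patch only covers the producer side; a consumer that gets lapped has to resynchronize on its own. One plausible strategy, assuming the consumer reads `overwrite_pos` (via `BPF_RB_OVER_POS` or the mmap'ed pages) only after an acquire-load of `producer_pos`, as the comment above requires:

```c
/* Hedged consumer-side sketch, not part of this patch: skip past
 * records that the producer has already reclaimed for overwriting. */
static unsigned long resync_cons_pos(unsigned long cons_pos,
				     unsigned long over_pos)
{
	/* everything before overwrite_pos may already be clobbered;
	 * the oldest intact record starts at overwrite_pos */
	return cons_pos < over_pos ? over_pos : cons_pos;
}
```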
@@ -479,7 +543,50 @@ const struct bpf_func_proto bpf_ringbuf_reserve_proto = {
.arg3_type = ARG_ANYTHING,
};

static void bpf_ringbuf_commit(void *sample, u64 flags, bool discard)
static __always_inline
bool ringbuf_should_wakeup(const struct bpf_ringbuf *rb,
unsigned long rec_pos,
unsigned long cons_pos,
u32 len, u64 flags)
{
unsigned long rec_end;

if (flags & BPF_RB_FORCE_WAKEUP)
return true;

if (flags & BPF_RB_NO_WAKEUP)
return false;

/* for non-overwrite mode, if the consumer has caught up and is
 * waiting for our record, notify it about new data availability
 */
if (likely(!rb->overwrite_mode))
return cons_pos == rec_pos;

/* for overwrite mode, to give the consumer a chance to catch up
 * before its data is overwritten, wake it up roughly once every
 * half ring of produced data.
 */
rec_end = rec_pos + ringbuf_round_up_hdr_len(len);

cons_pos &= (rb->mask >> 1);
rec_pos &= (rb->mask >> 1);
rec_end &= (rb->mask >> 1);

if (cons_pos == rec_pos)
return true;

if (rec_pos < cons_pos && cons_pos < rec_end)
return true;

if (rec_end < rec_pos && (cons_pos > rec_pos || cons_pos < rec_end))
return true;

return false;
}
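The half-ring test folds all positions into `(mask >> 1)` and asks whether the committed record's span crosses the consumer's folded position, including the case where the span wraps around the fold boundary. A userspace mirror with two illustrative cases (ring size 4096, so the fold mask is 2047):

```c
#include <stdbool.h>

/* mirror of the overwrite-mode branch of ringbuf_should_wakeup();
 * half plays the role of (rb->mask >> 1) */
static bool crosses_consumer(unsigned long rec_pos, unsigned long rec_end,
			     unsigned long cons_pos, unsigned long half)
{
	cons_pos &= half;
	rec_pos &= half;
	rec_end &= half;
	if (cons_pos == rec_pos)
		return true;
	if (rec_pos < cons_pos && cons_pos < rec_end)
		return true;
	/* record span wrapped past the fold boundary */
	return rec_end < rec_pos && (cons_pos > rec_pos || cons_pos < rec_end);
}
/* crosses_consumer(100, 164, 150, 2047)    -> true (consumer inside span)
 * crosses_consumer(2000, 2100, 2030, 2047) -> true (wraps: 2100 & 2047 == 52) */
```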

static __always_inline
void bpf_ringbuf_commit(void *sample, u64 flags, bool discard)
{
unsigned long rec_pos, cons_pos;
struct bpf_ringbuf_hdr *hdr;
@@ -495,15 +602,10 @@ static void bpf_ringbuf_commit(void *sample, u64 flags, bool discard)
/* update record header with correct final size prefix */
xchg(&hdr->len, new_len);

/* if consumer caught up and is waiting for our record, notify about
* new data availability
*/
rec_pos = (void *)hdr - (void *)rb->data;
cons_pos = smp_load_acquire(&rb->consumer_pos) & rb->mask;

if (flags & BPF_RB_FORCE_WAKEUP)
irq_work_queue(&rb->work);
else if (cons_pos == rec_pos && !(flags & BPF_RB_NO_WAKEUP))
if (ringbuf_should_wakeup(rb, rec_pos, cons_pos, new_len, flags))
irq_work_queue(&rb->work);
}

@@ -576,6 +678,8 @@ BPF_CALL_2(bpf_ringbuf_query, struct bpf_map *, map, u64, flags)
return smp_load_acquire(&rb->consumer_pos);
case BPF_RB_PROD_POS:
return smp_load_acquire(&rb->producer_pos);
case BPF_RB_OVER_POS:
return READ_ONCE(rb->overwrite_pos);
default:
return 0;
}
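With `BPF_RB_OVER_POS` wired into `bpf_ringbuf_query()`, a BPF program can observe how far overwriting has progressed. A hedged BPF-side sketch; the section name and map definition are illustrative:

```c
#include <linux/bpf.h>
#include <bpf/bpf_helpers.h>

struct {
	__uint(type, BPF_MAP_TYPE_RINGBUF);
	__uint(max_entries, 256 * 1024);
	__uint(map_flags, BPF_F_OVERWRITE);   /* needs this patch's uapi */
} events SEC(".maps");

SEC("tracepoint/syscalls/sys_enter_write")
int probe(void *ctx)
{
	__u64 over = bpf_ringbuf_query(&events, BPF_RB_OVER_POS);
	__u64 prod = bpf_ringbuf_query(&events, BPF_RB_PROD_POS);

	/* prod - over bounds how many bytes a consumer can still read */
	bpf_printk("readable window <= %llu bytes", prod - over);
	return 0;
}

char LICENSE[] SEC("license") = "GPL";
```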
@@ -749,6 +853,9 @@ BPF_CALL_4(bpf_user_ringbuf_drain, struct bpf_map *, map,

rb = container_of(map, struct bpf_ringbuf_map, map)->rb;

if (unlikely(rb->overwrite_mode))
return -EOPNOTSUPP;

/* If another consumer is already consuming a sample, wait for them to finish. */
if (!atomic_try_cmpxchg(&rb->busy, &busy, 1))
return -EBUSY;
tools/include/uapi/linux/bpf.h (4 additions, 0 deletions)
@@ -1430,6 +1430,9 @@ enum {

/* Do not translate kernel bpf_arena pointers to user pointers */
BPF_F_NO_USER_CONV = (1U << 18),

/* BPF ringbuf works in overwrite mode */
BPF_F_OVERWRITE = (1U << 19),
};

/* Flags for BPF_PROG_QUERY. */
Expand Down Expand Up @@ -6215,6 +6218,7 @@ enum {
BPF_RB_RING_SIZE = 1,
BPF_RB_CONS_POS = 2,
BPF_RB_PROD_POS = 3,
BPF_RB_OVER_POS = 4,
};

/* BPF ring buffer constants */