4 changes: 4 additions & 0 deletions include/uapi/linux/bpf.h
@@ -1430,6 +1430,9 @@ enum {

/* Do not translate kernel bpf_arena pointers to user pointers */
BPF_F_NO_USER_CONV = (1U << 18),

/* Enable BPF ringbuf overwrite mode */
BPF_F_RB_OVERWRITE = (1U << 19),
};

/* Flags for BPF_PROG_QUERY. */
@@ -6231,6 +6234,7 @@ enum {
BPF_RB_RING_SIZE = 1,
BPF_RB_CONS_POS = 2,
BPF_RB_PROD_POS = 3,
BPF_RB_OVERWRITE_POS = 4,
};

/* BPF ring buffer constants */
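A minimal usage sketch (illustrative, not part of this patch): with the flag above exported in the UAPI header, user space can request overwrite mode at map creation time through libbpf's bpf_map_create(). The helper name, map name, and ring size below are assumptions.

#include <bpf/bpf.h>
#include <linux/bpf.h>

/* Sketch: create a 64 KiB ring buffer in overwrite mode. The kernel
 * rejects BPF_F_RB_OVERWRITE for BPF_MAP_TYPE_USER_RINGBUF (see
 * ringbuf_map_alloc() in kernel/bpf/ringbuf.c below).
 */
static int create_overwrite_ringbuf(void)
{
	LIBBPF_OPTS(bpf_map_create_opts, opts,
		    .map_flags = BPF_F_RB_OVERWRITE);

	/* key_size/value_size must be 0; max_entries is the ring size
	 * in bytes: a power-of-2, page-aligned value.
	 */
	return bpf_map_create(BPF_MAP_TYPE_RINGBUF, "rb_overwrite",
			      0, 0, 64 * 1024, &opts);
}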
109 changes: 90 additions & 19 deletions kernel/bpf/ringbuf.c
@@ -13,7 +13,7 @@
#include <linux/btf_ids.h>
#include <asm/rqspinlock.h>

-#define RINGBUF_CREATE_FLAG_MASK (BPF_F_NUMA_NODE)
#define RINGBUF_CREATE_FLAG_MASK (BPF_F_NUMA_NODE | BPF_F_RB_OVERWRITE)

/* non-mmap()'able part of bpf_ringbuf (everything up to consumer page) */
#define RINGBUF_PGOFF \
@@ -30,6 +30,7 @@ struct bpf_ringbuf {
u64 mask;
struct page **pages;
int nr_pages;
bool overwrite_mode;
rqspinlock_t spinlock ____cacheline_aligned_in_smp;
/* For user-space producer ring buffers, an atomic_t busy bit is used
* to synchronize access to the ring buffers in the kernel, rather than
@@ -72,6 +73,8 @@
*/
unsigned long consumer_pos __aligned(PAGE_SIZE);
unsigned long producer_pos __aligned(PAGE_SIZE);
/* points to the record right after the last overwritten one */
unsigned long overwrite_pos;
unsigned long pending_pos;
char data[] __aligned(PAGE_SIZE);
};
@@ -166,7 +169,7 @@ static void bpf_ringbuf_notify(struct irq_work *work)
* considering that the maximum value of data_sz is (4GB - 1), there
* will be no overflow, so just note the size limit in the comments.
*/
-static struct bpf_ringbuf *bpf_ringbuf_alloc(size_t data_sz, int numa_node)
static struct bpf_ringbuf *bpf_ringbuf_alloc(size_t data_sz, int numa_node, bool overwrite_mode)
{
struct bpf_ringbuf *rb;

@@ -183,17 +186,25 @@ static struct bpf_ringbuf *bpf_ringbuf_alloc(size_t data_sz, int numa_node)
rb->consumer_pos = 0;
rb->producer_pos = 0;
rb->pending_pos = 0;
rb->overwrite_mode = overwrite_mode;

return rb;
}

static struct bpf_map *ringbuf_map_alloc(union bpf_attr *attr)
{
bool overwrite_mode = false;
struct bpf_ringbuf_map *rb_map;

if (attr->map_flags & ~RINGBUF_CREATE_FLAG_MASK)
return ERR_PTR(-EINVAL);

if (attr->map_flags & BPF_F_RB_OVERWRITE) {
if (attr->map_type == BPF_MAP_TYPE_USER_RINGBUF)
return ERR_PTR(-EINVAL);
overwrite_mode = true;
}

if (attr->key_size || attr->value_size ||
!is_power_of_2(attr->max_entries) ||
!PAGE_ALIGNED(attr->max_entries))
@@ -205,7 +216,7 @@ static struct bpf_map *ringbuf_map_alloc(union bpf_attr *attr)

bpf_map_init_from_attr(&rb_map->map, attr);

-rb_map->rb = bpf_ringbuf_alloc(attr->max_entries, rb_map->map.numa_node);
rb_map->rb = bpf_ringbuf_alloc(attr->max_entries, rb_map->map.numa_node, overwrite_mode);
if (!rb_map->rb) {
bpf_map_area_free(rb_map);
return ERR_PTR(-ENOMEM);
@@ -293,13 +304,25 @@ static int ringbuf_map_mmap_user(struct bpf_map *map, struct vm_area_struct *vma
return remap_vmalloc_range(vma, rb_map->rb, vma->vm_pgoff + RINGBUF_PGOFF);
}

/* Return an estimate of the available data in the ring buffer.
* Note: the returned value can exceed the actual ring buffer size because the
* function is not synchronized with the producer. The producer acquires the
* ring buffer's spinlock, but this function does not.
*/
static unsigned long ringbuf_avail_data_sz(struct bpf_ringbuf *rb)
{
-unsigned long cons_pos, prod_pos;
unsigned long cons_pos, prod_pos, over_pos;

cons_pos = smp_load_acquire(&rb->consumer_pos);
-prod_pos = smp_load_acquire(&rb->producer_pos);
-return prod_pos - cons_pos;

if (unlikely(rb->overwrite_mode)) {
over_pos = smp_load_acquire(&rb->overwrite_pos);
prod_pos = smp_load_acquire(&rb->producer_pos);
return prod_pos - max(cons_pos, over_pos);
} else {
prod_pos = smp_load_acquire(&rb->producer_pos);
return prod_pos - cons_pos;
}
}
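A worked example of the new math (numbers illustrative): in overwrite mode with a 16 KiB ring, cons_pos = 0 because the consumer never ran, overwrite_pos = 12288, and prod_pos = 16384, the old formula would report 16384 bytes available even though everything in [0, 12288) has been overwritten; the new formula reports prod_pos - max(cons_pos, over_pos) = 16384 - 12288 = 4096 bytes.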

static u32 ringbuf_total_data_sz(const struct bpf_ringbuf *rb)
@@ -402,11 +425,41 @@ bpf_ringbuf_restore_from_rec(struct bpf_ringbuf_hdr *hdr)
return (void*)((addr & PAGE_MASK) - off);
}

static bool bpf_ringbuf_has_space(const struct bpf_ringbuf *rb,
unsigned long new_prod_pos,
unsigned long cons_pos,
unsigned long pend_pos)
{
/* no space if the span from the oldest not-yet-committed record
* to the newest record exceeds (ringbuf_size - 1).
*/
if (new_prod_pos - pend_pos > rb->mask)
return false;

/* in overwrite mode that is the only constraint, so we have space */
if (unlikely(rb->overwrite_mode))
return true;

/* no space if the producer position would advance more than
* (ringbuf_size - 1) ahead of the consumer position when not in
* overwrite mode.
*/
if (new_prod_pos - cons_pos > rb->mask)
return false;

return true;
}

static u32 bpf_ringbuf_round_up_hdr_len(u32 hdr_len)
{
hdr_len &= ~BPF_RINGBUF_DISCARD_BIT;
return round_up(hdr_len + BPF_RINGBUF_HDR_SZ, 8);
}

static void *__bpf_ringbuf_reserve(struct bpf_ringbuf *rb, u64 size)
{
-unsigned long cons_pos, prod_pos, new_prod_pos, pend_pos, flags;
unsigned long cons_pos, prod_pos, new_prod_pos, pend_pos, over_pos, flags;
struct bpf_ringbuf_hdr *hdr;
-u32 len, pg_off, tmp_size, hdr_len;
u32 len, pg_off, hdr_len;

if (unlikely(size > RINGBUF_MAX_RECORD_SZ))
return NULL;
@@ -429,24 +482,40 @@ static void *__bpf_ringbuf_reserve(struct bpf_ringbuf *rb, u64 size)
hdr_len = READ_ONCE(hdr->len);
if (hdr_len & BPF_RINGBUF_BUSY_BIT)
break;
-tmp_size = hdr_len & ~BPF_RINGBUF_DISCARD_BIT;
-tmp_size = round_up(tmp_size + BPF_RINGBUF_HDR_SZ, 8);
-pend_pos += tmp_size;
pend_pos += bpf_ringbuf_round_up_hdr_len(hdr_len);
}
rb->pending_pos = pend_pos;

-/* check for out of ringbuf space:
-* - by ensuring producer position doesn't advance more than
-* (ringbuf_size - 1) ahead
-* - by ensuring oldest not yet committed record until newest
-* record does not span more than (ringbuf_size - 1)
-*/
-if (new_prod_pos - cons_pos > rb->mask ||
-new_prod_pos - pend_pos > rb->mask) {
if (!bpf_ringbuf_has_space(rb, new_prod_pos, cons_pos, pend_pos)) {
raw_res_spin_unlock_irqrestore(&rb->spinlock, flags);
return NULL;
}

/* In overwrite mode, advance overwrite_pos when the ring buffer is full.
* The key points are to stay on record boundaries and consume enough records
* to fit the new one.
*/
if (unlikely(rb->overwrite_mode)) {
over_pos = rb->overwrite_pos;
while (new_prod_pos - over_pos > rb->mask) {
hdr = (void *)rb->data + (over_pos & rb->mask);
hdr_len = READ_ONCE(hdr->len);
/* The bpf_ringbuf_has_space() check above ensures we won't
* step over a record currently being worked on by another
* producer.
*/
over_pos += bpf_ringbuf_round_up_hdr_len(hdr_len);
}
/* smp_store_release(&rb->producer_pos, new_prod_pos) at
* the end of the function ensures that once the consumer
* sees the updated rb->producer_pos, it also sees the
* updated rb->overwrite_pos: reading overwrite_pos after
* smp_load_acquire(&rb->producer_pos) always yields a
* valid position.
*/
WRITE_ONCE(rb->overwrite_pos, over_pos);
}

hdr = (void *)rb->data + (prod_pos & rb->mask);
pg_off = bpf_ringbuf_rec_pg_off(rb, hdr);
hdr->len = size | BPF_RINGBUF_BUSY_BIT;
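A worked example of the advance loop (numbers illustrative): take a 4096-byte ring (mask = 4095) holding four committed records of 1024 bytes each (1016-byte payload plus the 8-byte header, already 8-byte aligned), with overwrite_pos = 0 and prod_pos = 4096. Reserving another 1016-byte record gives new_prod_pos = 5120. Since 5120 - 0 > 4095, the loop consumes the record at offset 0 and advances over_pos to 1024; 5120 - 1024 > 4095 still holds, so it consumes a second record and stops at over_pos = 2048, logically discarding the two oldest records to make room.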
@@ -576,6 +645,8 @@ BPF_CALL_2(bpf_ringbuf_query, struct bpf_map *, map, u64, flags)
return smp_load_acquire(&rb->consumer_pos);
case BPF_RB_PROD_POS:
return smp_load_acquire(&rb->producer_pos);
case BPF_RB_OVERWRITE_POS:
return smp_load_acquire(&rb->overwrite_pos);
default:
return 0;
}
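A BPF-side sketch of the new query constant (illustrative, not part of this patch: the map definition, section name, and program body are assumptions; bpf_ringbuf_query() and the BPF_RB_* constants are the ones extended above, and the build assumes UAPI headers that already carry BPF_F_RB_OVERWRITE):

#include <linux/bpf.h>
#include <bpf/bpf_helpers.h>

struct {
	__uint(type, BPF_MAP_TYPE_RINGBUF);
	__uint(max_entries, 64 * 1024);
	__uint(map_flags, BPF_F_RB_OVERWRITE);
} rb SEC(".maps");

SEC("tracepoint/syscalls/sys_enter_getpid")
int rb_stats(void *ctx)
{
	__u64 cons = bpf_ringbuf_query(&rb, BPF_RB_CONS_POS);
	__u64 over = bpf_ringbuf_query(&rb, BPF_RB_OVERWRITE_POS);
	__u64 prod = bpf_ringbuf_query(&rb, BPF_RB_PROD_POS);

	/* Mirrors ringbuf_avail_data_sz(): readable bytes start at
	 * whichever of cons/over is further along. The three reads
	 * are not atomic as a group, so this is only an estimate.
	 */
	bpf_printk("ringbuf avail: %llu", prod - (cons > over ? cons : over));
	return 0;
}

char LICENSE[] SEC("license") = "GPL";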
4 changes: 4 additions & 0 deletions tools/include/uapi/linux/bpf.h
@@ -1430,6 +1430,9 @@ enum {

/* Do not translate kernel bpf_arena pointers to user pointers */
BPF_F_NO_USER_CONV = (1U << 18),

/* Enable BPF ringbuf overwrite mode */
BPF_F_RB_OVERWRITE = (1U << 19),
};

/* Flags for BPF_PROG_QUERY. */
Expand Down Expand Up @@ -6231,6 +6234,7 @@ enum {
BPF_RB_RING_SIZE = 1,
BPF_RB_CONS_POS = 2,
BPF_RB_PROD_POS = 3,
BPF_RB_OVERWRITE_POS = 4,
};

/* BPF ring buffer constants */
3 changes: 2 additions & 1 deletion tools/testing/selftests/bpf/Makefile
@@ -499,7 +499,8 @@ LINKED_SKELS := test_static_linked.skel.h linked_funcs.skel.h \

LSKELS := fexit_sleep.c trace_printk.c trace_vprintk.c map_ptr_kern.c \
core_kern.c core_kern_overflow.c test_ringbuf.c \
-test_ringbuf_n.c test_ringbuf_map_key.c test_ringbuf_write.c
test_ringbuf_n.c test_ringbuf_map_key.c test_ringbuf_write.c \
test_ringbuf_overwrite.c

LSKELS_SIGNED := fentry_test.c fexit_test.c atomics.c

66 changes: 60 additions & 6 deletions tools/testing/selftests/bpf/benchs/bench_ringbufs.c
@@ -19,6 +19,8 @@ static struct {
int ringbuf_sz; /* per-ringbuf, in bytes */
bool ringbuf_use_output; /* use slower output API */
int perfbuf_sz; /* per-CPU size, in pages */
bool overwrite; /* use overwrite mode (BPF_F_RB_OVERWRITE) */
bool bench_producer; /* measure producer throughput, no consumer */
} args = {
.back2back = false,
.batch_cnt = 500,
@@ -27,6 +29,8 @@
.ringbuf_sz = 512 * 1024,
.ringbuf_use_output = false,
.perfbuf_sz = 128,
.overwrite = false,
.bench_producer = false,
};

enum {
@@ -35,6 +39,8 @@
ARG_RB_BATCH_CNT = 2002,
ARG_RB_SAMPLED = 2003,
ARG_RB_SAMPLE_RATE = 2004,
ARG_RB_OVERWRITE = 2005,
ARG_RB_BENCH_PRODUCER = 2006,
};

static const struct argp_option opts[] = {
@@ -43,6 +49,8 @@
{ "rb-batch-cnt", ARG_RB_BATCH_CNT, "CNT", 0, "Set BPF-side record batch count"},
{ "rb-sampled", ARG_RB_SAMPLED, NULL, 0, "Notification sampling"},
{ "rb-sample-rate", ARG_RB_SAMPLE_RATE, "RATE", 0, "Notification sample rate"},
{ "rb-overwrite", ARG_RB_OVERWRITE, NULL, 0, "Overwrite mode"},
{ "rb-bench-producer", ARG_RB_BENCH_PRODUCER, NULL, 0, "Benchmark producer"},
{},
};

@@ -72,6 +80,12 @@ static error_t parse_arg(int key, char *arg, struct argp_state *state)
argp_usage(state);
}
break;
case ARG_RB_OVERWRITE:
args.overwrite = true;
break;
case ARG_RB_BENCH_PRODUCER:
args.bench_producer = true;
break;
default:
return ARGP_ERR_UNKNOWN;
}
@@ -95,8 +109,33 @@

static void bufs_validate(void)
{
-if (env.consumer_cnt != 1) {
-fprintf(stderr, "rb-libbpf benchmark needs one consumer!\n");
if (args.bench_producer && strcmp(env.bench_name, "rb-libbpf")) {
fprintf(stderr, "--rb-bench-producer only works with rb-libbpf!\n");
exit(1);
}

if (args.overwrite && !args.bench_producer) {
fprintf(stderr, "overwrite mode only works with --rb-bench-producer for now!\n");
exit(1);
}

if (args.bench_producer && env.consumer_cnt != 0) {
fprintf(stderr, "no consumer is needed for --rb-bench-producer!\n");
exit(1);
}

if (args.bench_producer && args.back2back) {
fprintf(stderr, "back-to-back mode makes no sense for --rb-bench-producer!\n");
exit(1);
}

if (args.bench_producer && args.sampled) {
fprintf(stderr, "sampling mode makes no sense for --rb-bench-producer!\n");
exit(1);
}

if (!args.bench_producer && env.consumer_cnt != 1) {
fprintf(stderr, "benchmarks without --rb-bench-producer require exactly one consumer!\n");
exit(1);
}

@@ -128,12 +167,17 @@ static void ringbuf_libbpf_measure(struct bench_res *res)
{
struct ringbuf_libbpf_ctx *ctx = &ringbuf_libbpf_ctx;

-res->hits = atomic_swap(&buf_hits.value, 0);
if (args.bench_producer)
res->hits = atomic_swap(&ctx->skel->bss->hits, 0);
else
res->hits = atomic_swap(&buf_hits.value, 0);
res->drops = atomic_swap(&ctx->skel->bss->dropped, 0);
}

static struct ringbuf_bench *ringbuf_setup_skeleton(void)
{
__u32 flags;
struct bpf_map *ringbuf;
struct ringbuf_bench *skel;

setup_libbpf();
Expand All @@ -146,12 +190,19 @@ static struct ringbuf_bench *ringbuf_setup_skeleton(void)

skel->rodata->batch_cnt = args.batch_cnt;
skel->rodata->use_output = args.ringbuf_use_output ? 1 : 0;
skel->rodata->bench_producer = args.bench_producer;

if (args.sampled)
/* record data + header take 16 bytes */
skel->rodata->wakeup_data_size = args.sample_rate * 16;

-bpf_map__set_max_entries(skel->maps.ringbuf, args.ringbuf_sz);
ringbuf = skel->maps.ringbuf;
if (args.overwrite) {
flags = bpf_map__map_flags(ringbuf) | BPF_F_RB_OVERWRITE;
bpf_map__set_map_flags(ringbuf, flags);
}

bpf_map__set_max_entries(ringbuf, args.ringbuf_sz);

if (ringbuf_bench__load(skel)) {
fprintf(stderr, "failed to load skeleton\n");
@@ -171,10 +222,13 @@ static void ringbuf_libbpf_setup(void)
{
struct ringbuf_libbpf_ctx *ctx = &ringbuf_libbpf_ctx;
struct bpf_link *link;
int map_fd;

ctx->skel = ringbuf_setup_skeleton();
-ctx->ringbuf = ring_buffer__new(bpf_map__fd(ctx->skel->maps.ringbuf),
-buf_process_sample, NULL, NULL);

map_fd = bpf_map__fd(ctx->skel->maps.ringbuf);
ctx->ringbuf = ring_buffer__new(map_fd, buf_process_sample,
NULL, NULL);
if (!ctx->ringbuf) {
fprintf(stderr, "failed to create ringbuf\n");
exit(1);
4 changes: 4 additions & 0 deletions tools/testing/selftests/bpf/benchs/run_bench_ringbufs.sh
@@ -49,3 +49,7 @@ for b in 1 2 3 4 8 12 16 20 24 28 32 36 40 44 48 52; do
summarize "rb-libbpf nr_prod $b" "$($RUN_RB_BENCH -p$b --rb-batch-cnt 50 rb-libbpf)"
done

header "Ringbuf, multi-producer contention in overwrite mode, no consumer"
for b in 1 2 3 4 8 12 16 20 24 28 32 36 40 44 48 52; do
summarize "rb-prod nr_prod $b" "$($RUN_BENCH -p$b --rb-batch-cnt 50 --rb-overwrite --rb-bench-producer rb-libbpf)"
done