Skip to content

Commit 82534b8

Browse files
Xu Kuohai authored and Kernel Patches Daemon committed
libbpf: ringbuf: Add overwrite ring buffer process
In overwrite mode, the producer does not wait for the consumer, so the consumer is responsible for handling conflicts. An optimistic method is used to resolve the conflicts: the consumer first reads consumer_pos, producer_pos and overwrite_pos, then calculates a read window and copies data in the window from the ring buffer. After copying, it checks the positions to decide whether the data in the copy window have been overwritten by the producer. If so, it discards the copy and tries again. Once successful, the consumer processes the events in the copy. Signed-off-by: Xu Kuohai <[email protected]>
1 parent 6d3df25 commit 82534b8

File tree

1 file changed

+102
-1
lines changed

1 file changed

+102
-1
lines changed

tools/lib/bpf/ringbuf.c

Lines changed: 102 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,13 @@ struct ring {
2727
ring_buffer_sample_fn sample_cb;
2828
void *ctx;
2929
void *data;
30+
void *read_buffer;
3031
unsigned long *consumer_pos;
3132
unsigned long *producer_pos;
33+
unsigned long *overwrite_pos;
3234
unsigned long mask;
3335
int map_fd;
36+
bool overwrite_mode;
3437
};
3538

3639
struct ring_buffer {
@@ -69,6 +72,9 @@ static void ringbuf_free_ring(struct ring_buffer *rb, struct ring *r)
6972
r->producer_pos = NULL;
7073
}
7174

75+
if (r->read_buffer)
76+
free(r->read_buffer);
77+
7278
free(r);
7379
}
7480

@@ -119,6 +125,14 @@ int ring_buffer__add(struct ring_buffer *rb, int map_fd,
119125
r->sample_cb = sample_cb;
120126
r->ctx = ctx;
121127
r->mask = info.max_entries - 1;
128+
r->overwrite_mode = info.map_flags & BPF_F_OVERWRITE;
129+
if (unlikely(r->overwrite_mode)) {
130+
r->read_buffer = malloc(info.max_entries);
131+
if (!r->read_buffer) {
132+
err = -ENOMEM;
133+
goto err_out;
134+
}
135+
}
122136

123137
/* Map writable consumer page */
124138
tmp = mmap(NULL, rb->page_size, PROT_READ | PROT_WRITE, MAP_SHARED, map_fd, 0);
@@ -148,6 +162,7 @@ int ring_buffer__add(struct ring_buffer *rb, int map_fd,
148162
goto err_out;
149163
}
150164
r->producer_pos = tmp;
165+
r->overwrite_pos = r->producer_pos + 1; /* overwrite_pos is next to producer_pos */
151166
r->data = tmp + rb->page_size;
152167

153168
e = &rb->events[rb->ring_cnt];
@@ -232,7 +247,7 @@ static inline int roundup_len(__u32 len)
232247
return (len + 7) / 8 * 8;
233248
}
234249

235-
static int64_t ringbuf_process_ring(struct ring *r, size_t n)
250+
static int64_t ringbuf_process_normal_ring(struct ring *r, size_t n)
236251
{
237252
int *len_ptr, len, err;
238253
/* 64-bit to avoid overflow in case of extreme application behavior */
@@ -278,6 +293,92 @@ static int64_t ringbuf_process_ring(struct ring *r, size_t n)
278293
return cnt;
279294
}
280295

296+
/* Consume up to @n records from an overwrite-mode ring buffer.
 *
 * In overwrite mode the producer does not wait for the consumer, so the
 * consumer resolves conflicts optimistically: it reads consumer_pos,
 * producer_pos and overwrite_pos, computes a read window, copies that
 * window out of the ring into r->read_buffer, then re-reads producer_pos
 * to detect whether the producer overwrote the window during the copy.
 * If so, it discards the copy and retries; otherwise it processes the
 * events from the private copy.
 *
 * Returns the number of records consumed (may stop early at @n or on a
 * busy record), or a negative error code propagated from sample_cb.
 */
static int64_t ringbuf_process_overwrite_ring(struct ring *r, size_t n)
{
	int err;
	uint32_t *len_ptr, len;
	/* 64-bit to avoid overflow in case of extreme application behavior */
	int64_t cnt = 0;
	size_t size, offset;
	unsigned long cons_pos, prod_pos, over_pos, tmp_pos;
	bool got_new_data;
	void *sample;
	bool copied;

	size = r->mask + 1;

	cons_pos = smp_load_acquire(r->consumer_pos);
	do {
		got_new_data = false;

		/* grab a copy of data */
		prod_pos = smp_load_acquire(r->producer_pos);
		do {
			over_pos = READ_ONCE(*r->overwrite_pos);
			/* prod_pos may be outdated now */
			if (over_pos < prod_pos) {
				/* start past anything already overwritten */
				tmp_pos = max(cons_pos, over_pos);
				/* smp_load_acquire(r->producer_pos) before
				 * READ_ONCE(*r->overwrite_pos) ensures that
				 * over_pos + r->mask < prod_pos never occurs,
				 * so size is never larger than r->mask
				 */
				size = prod_pos - tmp_pos;
				if (!size)
					goto done;
				/* NOTE(review): assumes the data area is
				 * double-mmapped so a window that wraps the
				 * ring end is still contiguous — confirm
				 * against the mapping setup.
				 */
				memcpy(r->read_buffer,
				       r->data + (tmp_pos & r->mask), size);
				copied = true;
			} else {
				copied = false;
			}
			prod_pos = smp_load_acquire(r->producer_pos);
			/* retry if data is overwritten by producer */
		} while (!copied || prod_pos - tmp_pos > r->mask);

		cons_pos = tmp_pos;

		/* walk the records in the private copy */
		for (offset = 0; offset < size; offset += roundup_len(len)) {
			len_ptr = r->read_buffer + (offset & r->mask);
			len = *len_ptr;

			/* record not yet committed by the producer */
			if (len & BPF_RINGBUF_BUSY_BIT)
				goto done;

			got_new_data = true;
			cons_pos += roundup_len(len);

			if ((len & BPF_RINGBUF_DISCARD_BIT) == 0) {
				sample = (void *)len_ptr + BPF_RINGBUF_HDR_SZ;
				err = r->sample_cb(r->ctx, sample, len);
				if (err < 0) {
					/* update consumer pos and bail out */
					smp_store_release(r->consumer_pos,
							  cons_pos);
					return err;
				}
				cnt++;
			}

			if (cnt >= n)
				goto done;
		}
	} while (got_new_data);

done:
	smp_store_release(r->consumer_pos, cons_pos);
	return cnt;
}
373+
374+
/* Consume up to @n records from @r, dispatching to the overwrite-mode
 * path when the ring was created with BPF_F_OVERWRITE, and to the normal
 * (producer-waits-for-consumer) path otherwise.
 */
static int64_t ringbuf_process_ring(struct ring *r, size_t n)
{
	if (unlikely(r->overwrite_mode))
		return ringbuf_process_overwrite_ring(r, n);

	return ringbuf_process_normal_ring(r, n);
}
381+
281382
/* Consume available ring buffer(s) data without event polling, up to n
282383
* records.
283384
*

0 commit comments

Comments
 (0)