Skip to content

Commit 785888c

Browse files
committed
ring-buffer: Have rb_iter_head_event() handle concurrent writer
Give the ring_buffer_iter structure a place to store an event, so that it cannot be overwritten by a writer, and load it via rb_iter_head_event() in such a way that it returns NULL and resets the iter to the start of the current page if a writer updated the page. Link: http://lkml.kernel.org/r/[email protected] Signed-off-by: Steven Rostedt (VMware) <[email protected]>
1 parent 28e3fc5 commit 785888c

File tree

1 file changed

+75
-31
lines changed

1 file changed

+75
-31
lines changed

kernel/trace/ring_buffer.c

Lines changed: 75 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -503,11 +503,13 @@ struct trace_buffer {
503503
struct ring_buffer_iter {
504504
struct ring_buffer_per_cpu *cpu_buffer;
505505
unsigned long head;
506+
unsigned long next_event;
506507
struct buffer_page *head_page;
507508
struct buffer_page *cache_reader_page;
508509
unsigned long cache_read;
509510
u64 read_stamp;
510511
u64 page_stamp;
512+
struct ring_buffer_event *event;
511513
};
512514

513515
/**
@@ -1914,15 +1916,59 @@ rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer)
19141916
cpu_buffer->reader_page->read);
19151917
}
19161918

1917-
static __always_inline struct ring_buffer_event *
1918-
rb_iter_head_event(struct ring_buffer_iter *iter)
1919+
static __always_inline unsigned rb_page_commit(struct buffer_page *bpage)
19191920
{
1920-
return __rb_page_index(iter->head_page, iter->head);
1921+
return local_read(&bpage->page->commit);
19211922
}
19221923

1923-
static __always_inline unsigned rb_page_commit(struct buffer_page *bpage)
1924+
static struct ring_buffer_event *
1925+
rb_iter_head_event(struct ring_buffer_iter *iter)
19241926
{
1925-
return local_read(&bpage->page->commit);
1927+
struct ring_buffer_event *event;
1928+
struct buffer_page *iter_head_page = iter->head_page;
1929+
unsigned long commit;
1930+
unsigned length;
1931+
1932+
/*
1933+
* When the writer goes across pages, it issues a cmpxchg which
1934+
* is a mb(), which will synchronize with the rmb here.
1935+
* (see rb_tail_page_update() and __rb_reserve_next())
1936+
*/
1937+
commit = rb_page_commit(iter_head_page);
1938+
smp_rmb();
1939+
event = __rb_page_index(iter_head_page, iter->head);
1940+
length = rb_event_length(event);
1941+
1942+
/*
1943+
* READ_ONCE() doesn't work on functions and we don't want the
1944+
* compiler doing any crazy optimizations with length.
1945+
*/
1946+
barrier();
1947+
1948+
if ((iter->head + length) > commit || length > BUF_MAX_DATA_SIZE)
1949+
/* Writer corrupted the read? */
1950+
goto reset;
1951+
1952+
memcpy(iter->event, event, length);
1953+
/*
1954+
* If the page stamp is still the same after this rmb() then the
1955+
* event was safely copied without the writer entering the page.
1956+
*/
1957+
smp_rmb();
1958+
1959+
/* Make sure the page didn't change since we read this */
1960+
if (iter->page_stamp != iter_head_page->page->time_stamp ||
1961+
commit > rb_page_commit(iter_head_page))
1962+
goto reset;
1963+
1964+
iter->next_event = iter->head + length;
1965+
return iter->event;
1966+
reset:
1967+
/* Reset to the beginning */
1968+
iter->page_stamp = iter->read_stamp = iter->head_page->page->time_stamp;
1969+
iter->head = 0;
1970+
iter->next_event = 0;
1971+
return NULL;
19261972
}
19271973

19281974
/* Size is determined by what has been committed */
@@ -1962,6 +2008,7 @@ static void rb_inc_iter(struct ring_buffer_iter *iter)
19622008

19632009
iter->page_stamp = iter->read_stamp = iter->head_page->page->time_stamp;
19642010
iter->head = 0;
2011+
iter->next_event = 0;
19652012
}
19662013

19672014
/*
@@ -3548,6 +3595,7 @@ static void rb_iter_reset(struct ring_buffer_iter *iter)
35483595
/* Iterator usage is expected to have record disabled */
35493596
iter->head_page = cpu_buffer->reader_page;
35503597
iter->head = cpu_buffer->reader_page->read;
3598+
iter->next_event = iter->head;
35513599

35523600
iter->cache_reader_page = iter->head_page;
35533601
iter->cache_read = cpu_buffer->read;
@@ -3625,7 +3673,7 @@ int ring_buffer_iter_empty(struct ring_buffer_iter *iter)
36253673
return 0;
36263674

36273675
/* Still racy, as it may return a false positive, but that's OK */
3628-
return ((iter->head_page == commit_page && iter->head == commit) ||
3676+
return ((iter->head_page == commit_page && iter->head >= commit) ||
36293677
(iter->head_page == reader && commit_page == head_page &&
36303678
head_page->read == commit &&
36313679
iter->head == rb_page_commit(cpu_buffer->reader_page)));
@@ -3853,43 +3901,30 @@ static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer)
38533901
static void rb_advance_iter(struct ring_buffer_iter *iter)
38543902
{
38553903
struct ring_buffer_per_cpu *cpu_buffer;
3856-
struct ring_buffer_event *event;
3857-
unsigned length;
38583904

38593905
cpu_buffer = iter->cpu_buffer;
38603906

3907+
/* If head == next_event then we need to jump to the next event */
3908+
if (iter->head == iter->next_event) {
3909+
/* If the event gets overwritten again, there's nothing to do */
3910+
if (rb_iter_head_event(iter) == NULL)
3911+
return;
3912+
}
3913+
3914+
iter->head = iter->next_event;
3915+
38613916
/*
38623917
* Check if we are at the end of the buffer.
38633918
*/
3864-
if (iter->head >= rb_page_size(iter->head_page)) {
3919+
if (iter->next_event >= rb_page_size(iter->head_page)) {
38653920
/* discarded commits can make the page empty */
38663921
if (iter->head_page == cpu_buffer->commit_page)
38673922
return;
38683923
rb_inc_iter(iter);
38693924
return;
38703925
}
38713926

3872-
event = rb_iter_head_event(iter);
3873-
3874-
length = rb_event_length(event);
3875-
3876-
/*
3877-
* This should not be called to advance the header if we are
3878-
* at the tail of the buffer.
3879-
*/
3880-
if (RB_WARN_ON(cpu_buffer,
3881-
(iter->head_page == cpu_buffer->commit_page) &&
3882-
(iter->head + length > rb_commit_index(cpu_buffer))))
3883-
return;
3884-
3885-
rb_update_iter_read_stamp(iter, event);
3886-
3887-
iter->head += length;
3888-
3889-
/* check for end of page padding */
3890-
if ((iter->head >= rb_page_size(iter->head_page)) &&
3891-
(iter->head_page != cpu_buffer->commit_page))
3892-
rb_inc_iter(iter);
3927+
rb_update_iter_read_stamp(iter, iter->event);
38933928
}
38943929

38953930
static int rb_lost_events(struct ring_buffer_per_cpu *cpu_buffer)
@@ -4017,6 +4052,8 @@ rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
40174052
}
40184053

40194054
event = rb_iter_head_event(iter);
4055+
if (!event)
4056+
goto again;
40204057

40214058
switch (event->type_len) {
40224059
case RINGBUF_TYPE_PADDING:
@@ -4233,10 +4270,16 @@ ring_buffer_read_prepare(struct trace_buffer *buffer, int cpu, gfp_t flags)
42334270
if (!cpumask_test_cpu(cpu, buffer->cpumask))
42344271
return NULL;
42354272

4236-
iter = kmalloc(sizeof(*iter), flags);
4273+
iter = kzalloc(sizeof(*iter), flags);
42374274
if (!iter)
42384275
return NULL;
42394276

4277+
iter->event = kmalloc(BUF_MAX_DATA_SIZE, flags);
4278+
if (!iter->event) {
4279+
kfree(iter);
4280+
return NULL;
4281+
}
4282+
42404283
cpu_buffer = buffer->buffers[cpu];
42414284

42424285
iter->cpu_buffer = cpu_buffer;
@@ -4317,6 +4360,7 @@ ring_buffer_read_finish(struct ring_buffer_iter *iter)
43174360

43184361
atomic_dec(&cpu_buffer->record_disabled);
43194362
atomic_dec(&cpu_buffer->buffer->resize_disabled);
4363+
kfree(iter->event);
43204364
kfree(iter);
43214365
}
43224366
EXPORT_SYMBOL_GPL(ring_buffer_read_finish);

0 commit comments

Comments
 (0)