
Commit 63bd30f

Merge tag 'trace-ring-buffer-v6.8-rc7-2' of git://git.kernel.org/pub/scm/linux/kernel/git/trace/linux-trace
Pull tracing updates from Steven Rostedt:

 - Do not update shortest_full in rb_watermark_hit() if the watermark is
   hit

   The shortest_full field was being updated regardless of whether the
   task was going to wait or not. If the watermark is hit, then the task
   is not going to wait, so do not update the shortest_full field (used
   by the waker).

 - Update the shortest_full field before setting the full_waiters_pending
   flag

   In the poll logic, the full_waiters_pending flag was being set before
   the shortest_full field was set. If the full_waiters_pending flag is
   set, writers will check the shortest_full field, which holds the
   smallest percentage of data the ring buffer must contain before the
   waiters are woken. If the percentage of the ring buffer that is
   filled is greater than shortest_full, the writer calls the irq_work
   to wake up the waiters. The problem was that the poll logic set the
   full_waiters_pending flag before updating shortest_full, which, while
   still zero, always triggers the writer to call the irq_work to wake
   up the waiters. The irq_work then resets shortest_full back to zero,
   as the woken waiters are supposed to reset it.

 - Reuse the optimized logic in rb_watermark_hit(), already used by
   ring_buffer_wait(), in the poll logic as well

 - Restructure ring_buffer_wait() to use wait_event_interruptible()

   The logic to wake up pending readers when the file descriptor is
   closed was racy. Restructure ring_buffer_wait() to allow callers to
   pass in conditions, besides the ring buffer having enough data in it,
   by using wait_event_interruptible().

 - Update the tracing_wait_on_pipe() to call ring_buffer_wait() with its
   own conditions to exit the wait loop

* tag 'trace-ring-buffer-v6.8-rc7-2' of git://git.kernel.org/pub/scm/linux/kernel/git/trace/linux-trace:
  tracing/ring-buffer: Fix wait_on_pipe() race
  ring-buffer: Use wait_event_interruptible() in ring_buffer_wait()
  ring-buffer: Reuse rb_watermark_hit() for the poll logic
  ring-buffer: Fix full_waiters_pending in poll
  ring-buffer: Do not set shortest_full when full target is hit
2 parents 0173275 + 2aa043a commit 63bd30f
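To make the restructured interface concrete, here is a minimal sketch (not part of this merge) of how an in-kernel caller might use the new ring_buffer_wait() condition callback; the my_* names are hypothetical. The real user added by this series is wait_pipe_cond() in kernel/trace/trace.c, shown in the diff further down. Passing cond == NULL keeps the old behavior of returning after the first wakeup (via rb_wait_once()).

#include <linux/compiler.h>
#include <linux/ring_buffer.h>

struct my_wait_data {
        bool stop;      /* set from another context to abort the wait */
};

/* Extra exit condition, evaluated by rb_wait_cond() inside
 * wait_event_interruptible() along with the watermark check. */
static bool my_cond(void *data)
{
        struct my_wait_data *wdata = data;

        return READ_ONCE(wdata->stop);
}

static int my_wait(struct trace_buffer *buffer, int cpu)
{
        struct my_wait_data wdata = { .stop = false };

        /* Sleep until the per-CPU buffer is 50% full, my_cond() returns
         * true, or the wait is interrupted by a signal. */
        return ring_buffer_wait(buffer, cpu, 50, my_cond, &wdata);
}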

4 files changed: +132 -78 lines

include/linux/ring_buffer.h

Lines changed: 3 additions & 1 deletion

@@ -98,7 +98,9 @@ __ring_buffer_alloc(unsigned long size, unsigned flags, struct lock_class_key *k
         __ring_buffer_alloc((size), (flags), &__key);  \
 })
 
-int ring_buffer_wait(struct trace_buffer *buffer, int cpu, int full);
+typedef bool (*ring_buffer_cond_fn)(void *data);
+int ring_buffer_wait(struct trace_buffer *buffer, int cpu, int full,
+                     ring_buffer_cond_fn cond, void *data);
 __poll_t ring_buffer_poll_wait(struct trace_buffer *buffer, int cpu,
                           struct file *filp, poll_table *poll_table, int full);
 void ring_buffer_wake_waiters(struct trace_buffer *buffer, int cpu);

include/linux/trace_events.h

Lines changed: 4 additions & 1 deletion

@@ -103,13 +103,16 @@ struct trace_iterator {
         unsigned int            temp_size;
         char                    *fmt;   /* modified format holder */
         unsigned int            fmt_size;
-        long                    wait_index;
+        atomic_t                wait_index;
 
         /* trace_seq for __print_flags() and __print_symbolic() etc. */
         struct trace_seq        tmp_seq;
 
         cpumask_var_t           started;
 
+        /* Set when the file is closed to prevent new waiters */
+        bool                    closed;
+
         /* it's true when current open file is snapshot */
         bool                    snapshot;
 
kernel/trace/ring_buffer.c

Lines changed: 94 additions & 64 deletions

@@ -834,51 +834,24 @@ static bool rb_watermark_hit(struct trace_buffer *buffer, int cpu, int full)
                 pagebusy = cpu_buffer->reader_page == cpu_buffer->commit_page;
                 ret = !pagebusy && full_hit(buffer, cpu, full);
 
-                if (!cpu_buffer->shortest_full ||
-                    cpu_buffer->shortest_full > full)
-                        cpu_buffer->shortest_full = full;
+                if (!ret && (!cpu_buffer->shortest_full ||
+                             cpu_buffer->shortest_full > full)) {
+                        cpu_buffer->shortest_full = full;
+                }
                 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
         }
         return ret;
 }
 
-/**
- * ring_buffer_wait - wait for input to the ring buffer
- * @buffer: buffer to wait on
- * @cpu: the cpu buffer to wait on
- * @full: wait until the percentage of pages are available, if @cpu != RING_BUFFER_ALL_CPUS
- *
- * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
- * as data is added to any of the @buffer's cpu buffers. Otherwise
- * it will wait for data to be added to a specific cpu buffer.
- */
-int ring_buffer_wait(struct trace_buffer *buffer, int cpu, int full)
+static inline bool
+rb_wait_cond(struct rb_irq_work *rbwork, struct trace_buffer *buffer,
+             int cpu, int full, ring_buffer_cond_fn cond, void *data)
 {
-        struct ring_buffer_per_cpu *cpu_buffer;
-        DEFINE_WAIT(wait);
-        struct rb_irq_work *work;
-        int ret = 0;
-
-        /*
-         * Depending on what the caller is waiting for, either any
-         * data in any cpu buffer, or a specific buffer, put the
-         * caller on the appropriate wait queue.
-         */
-        if (cpu == RING_BUFFER_ALL_CPUS) {
-                work = &buffer->irq_work;
-                /* Full only makes sense on per cpu reads */
-                full = 0;
-        } else {
-                if (!cpumask_test_cpu(cpu, buffer->cpumask))
-                        return -ENODEV;
-                cpu_buffer = buffer->buffers[cpu];
-                work = &cpu_buffer->irq_work;
-        }
+        if (rb_watermark_hit(buffer, cpu, full))
+                return true;
 
-        if (full)
-                prepare_to_wait(&work->full_waiters, &wait, TASK_INTERRUPTIBLE);
-        else
-                prepare_to_wait(&work->waiters, &wait, TASK_INTERRUPTIBLE);
+        if (cond(data))
+                return true;
 
         /*
          * The events can happen in critical sections where
@@ -901,27 +874,78 @@ int ring_buffer_wait(struct trace_buffer *buffer, int cpu, int full)
          * a task has been queued. It's OK for spurious wake ups.
          */
         if (full)
-                work->full_waiters_pending = true;
+                rbwork->full_waiters_pending = true;
         else
-                work->waiters_pending = true;
+                rbwork->waiters_pending = true;
 
-        if (rb_watermark_hit(buffer, cpu, full))
-                goto out;
+        return false;
+}
 
-        if (signal_pending(current)) {
-                ret = -EINTR;
-                goto out;
+/*
+ * The default wait condition for ring_buffer_wait() is to just to exit the
+ * wait loop the first time it is woken up.
+ */
+static bool rb_wait_once(void *data)
+{
+        long *once = data;
+
+        /* wait_event() actually calls this twice before scheduling*/
+        if (*once > 1)
+                return true;
+
+        (*once)++;
+        return false;
+}
+
+/**
+ * ring_buffer_wait - wait for input to the ring buffer
+ * @buffer: buffer to wait on
+ * @cpu: the cpu buffer to wait on
+ * @full: wait until the percentage of pages are available, if @cpu != RING_BUFFER_ALL_CPUS
+ * @cond: condition function to break out of wait (NULL to run once)
+ * @data: the data to pass to @cond.
+ *
+ * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
+ * as data is added to any of the @buffer's cpu buffers. Otherwise
+ * it will wait for data to be added to a specific cpu buffer.
+ */
+int ring_buffer_wait(struct trace_buffer *buffer, int cpu, int full,
+                     ring_buffer_cond_fn cond, void *data)
+{
+        struct ring_buffer_per_cpu *cpu_buffer;
+        struct wait_queue_head *waitq;
+        struct rb_irq_work *rbwork;
+        long once = 0;
+        int ret = 0;
+
+        if (!cond) {
+                cond = rb_wait_once;
+                data = &once;
+        }
+
+        /*
+         * Depending on what the caller is waiting for, either any
+         * data in any cpu buffer, or a specific buffer, put the
+         * caller on the appropriate wait queue.
+         */
+        if (cpu == RING_BUFFER_ALL_CPUS) {
+                rbwork = &buffer->irq_work;
+                /* Full only makes sense on per cpu reads */
+                full = 0;
+        } else {
+                if (!cpumask_test_cpu(cpu, buffer->cpumask))
+                        return -ENODEV;
+                cpu_buffer = buffer->buffers[cpu];
+                rbwork = &cpu_buffer->irq_work;
         }
 
-        schedule();
- out:
         if (full)
-                finish_wait(&work->full_waiters, &wait);
+                waitq = &rbwork->full_waiters;
         else
-                finish_wait(&work->waiters, &wait);
+                waitq = &rbwork->waiters;
 
-        if (!ret && !rb_watermark_hit(buffer, cpu, full) && signal_pending(current))
-                ret = -EINTR;
+        ret = wait_event_interruptible((*waitq),
+                                rb_wait_cond(rbwork, buffer, cpu, full, cond, data));
 
         return ret;
 }
@@ -959,21 +983,30 @@ __poll_t ring_buffer_poll_wait(struct trace_buffer *buffer, int cpu,
         }
 
         if (full) {
-                unsigned long flags;
-
                 poll_wait(filp, &rbwork->full_waiters, poll_table);
 
-                raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
+                if (rb_watermark_hit(buffer, cpu, full))
+                        return EPOLLIN | EPOLLRDNORM;
+                /*
+                 * Only allow full_waiters_pending update to be seen after
+                 * the shortest_full is set (in rb_watermark_hit). If the
+                 * writer sees the full_waiters_pending flag set, it will
+                 * compare the amount in the ring buffer to shortest_full.
+                 * If the amount in the ring buffer is greater than the
+                 * shortest_full percent, it will call the irq_work handler
+                 * to wake up this list. The irq_handler will reset shortest_full
+                 * back to zero. That's done under the reader_lock, but
+                 * the below smp_mb() makes sure that the update to
+                 * full_waiters_pending doesn't leak up into the above.
+                 */
+                smp_mb();
                 rbwork->full_waiters_pending = true;
-                if (!cpu_buffer->shortest_full ||
-                    cpu_buffer->shortest_full > full)
-                        cpu_buffer->shortest_full = full;
-                raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
-        } else {
-                poll_wait(filp, &rbwork->waiters, poll_table);
-                rbwork->waiters_pending = true;
+                return 0;
         }
 
+        poll_wait(filp, &rbwork->waiters, poll_table);
+        rbwork->waiters_pending = true;
+
         /*
          * There's a tight race between setting the waiters_pending and
          * checking if the ring buffer is empty. Once the waiters_pending bit
@@ -989,9 +1022,6 @@ __poll_t ring_buffer_poll_wait(struct trace_buffer *buffer, int cpu,
          */
         smp_mb();
 
-        if (full)
-                return full_hit(buffer, cpu, full) ? EPOLLIN | EPOLLRDNORM : 0;
-
         if ((cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer)) ||
             (cpu != RING_BUFFER_ALL_CPUS && !ring_buffer_empty_cpu(buffer, cpu)))
                 return EPOLLIN | EPOLLRDNORM;
kernel/trace/trace.c

Lines changed: 31 additions & 12 deletions

@@ -1955,15 +1955,36 @@ update_max_tr_single(struct trace_array *tr, struct task_struct *tsk, int cpu)
 
 #endif /* CONFIG_TRACER_MAX_TRACE */
 
+struct pipe_wait {
+        struct trace_iterator           *iter;
+        int                             wait_index;
+};
+
+static bool wait_pipe_cond(void *data)
+{
+        struct pipe_wait *pwait = data;
+        struct trace_iterator *iter = pwait->iter;
+
+        if (atomic_read_acquire(&iter->wait_index) != pwait->wait_index)
+                return true;
+
+        return iter->closed;
+}
+
 static int wait_on_pipe(struct trace_iterator *iter, int full)
 {
+        struct pipe_wait pwait;
         int ret;
 
         /* Iterators are static, they should be filled or empty */
         if (trace_buffer_iter(iter, iter->cpu_file))
                 return 0;
 
-        ret = ring_buffer_wait(iter->array_buffer->buffer, iter->cpu_file, full);
+        pwait.wait_index = atomic_read_acquire(&iter->wait_index);
+        pwait.iter = iter;
+
+        ret = ring_buffer_wait(iter->array_buffer->buffer, iter->cpu_file, full,
+                               wait_pipe_cond, &pwait);
 
 #ifdef CONFIG_TRACER_MAX_TRACE
         /*
@@ -8397,9 +8418,9 @@ static int tracing_buffers_flush(struct file *file, fl_owner_t id)
         struct ftrace_buffer_info *info = file->private_data;
         struct trace_iterator *iter = &info->iter;
 
-        iter->wait_index++;
+        iter->closed = true;
         /* Make sure the waiters see the new wait_index */
-        smp_wmb();
+        (void)atomic_fetch_inc_release(&iter->wait_index);
 
         ring_buffer_wake_waiters(iter->array_buffer->buffer, iter->cpu_file);
 
@@ -8499,6 +8520,7 @@ tracing_buffers_splice_read(struct file *file, loff_t *ppos,
                 .spd_release    = buffer_spd_release,
         };
         struct buffer_ref *ref;
+        bool woken = false;
         int page_size;
         int entries, i;
         ssize_t ret = 0;
@@ -8572,17 +8594,17 @@ tracing_buffers_splice_read(struct file *file, loff_t *ppos,
 
         /* did we read anything? */
         if (!spd.nr_pages) {
-                long wait_index;
 
                 if (ret)
                         goto out;
 
+                if (woken)
+                        goto out;
+
                 ret = -EAGAIN;
                 if ((file->f_flags & O_NONBLOCK) || (flags & SPLICE_F_NONBLOCK))
                         goto out;
 
-                wait_index = READ_ONCE(iter->wait_index);
-
                 ret = wait_on_pipe(iter, iter->snapshot ? 0 : iter->tr->buffer_percent);
                 if (ret)
                         goto out;
@@ -8591,10 +8613,8 @@ tracing_buffers_splice_read(struct file *file, loff_t *ppos,
                 if (!tracer_tracing_is_on(iter->tr))
                         goto out;
 
-                /* Make sure we see the new wait_index */
-                smp_rmb();
-                if (wait_index != iter->wait_index)
-                        goto out;
+                /* Iterate one more time to collect any new data then exit */
+                woken = true;
 
                 goto again;
         }
@@ -8617,9 +8637,8 @@ static long tracing_buffers_ioctl(struct file *file, unsigned int cmd, unsigned
 
         mutex_lock(&trace_types_lock);
 
-        iter->wait_index++;
         /* Make sure the waiters see the new wait_index */
-        smp_wmb();
+        (void)atomic_fetch_inc_release(&iter->wait_index);
 
         ring_buffer_wake_waiters(iter->array_buffer->buffer, iter->cpu_file);
 
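The wait_index handshake above replaces the open-coded smp_wmb()/smp_rmb() pairing with atomic_fetch_inc_release() on the waker side and atomic_read_acquire() in wait_pipe_cond() on the waiter side. Below is a minimal userspace C11 sketch of that release/acquire contract, with hypothetical names and plain <stdatomic.h>; it only illustrates the ordering guarantee, not the kernel API.

#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

/* Hypothetical stand-in for the two trace_iterator fields involved. */
struct waiter_state {
        bool closed;            /* plain store, published by the release below */
        atomic_int wait_index;  /* bumped with release, read with acquire */
};

/* Waker side, as in tracing_buffers_flush(): set the flag, then bump
 * wait_index with release semantics so a reader that observes the new
 * value also observes closed == true. */
static void wake_side(struct waiter_state *w)
{
        w->closed = true;
        atomic_fetch_add_explicit(&w->wait_index, 1, memory_order_release);
        /* ...the kernel code then calls ring_buffer_wake_waiters() */
}

/* Waiter side, as in wait_pipe_cond(): compare against a snapshot taken
 * before going to sleep; the acquire load pairs with the release above. */
static bool should_stop(struct waiter_state *w, int snapshot)
{
        if (atomic_load_explicit(&w->wait_index, memory_order_acquire) != snapshot)
                return true;
        return w->closed;
}

int main(void)
{
        struct waiter_state w = { .closed = false, .wait_index = 0 };
        int snapshot = atomic_load_explicit(&w.wait_index, memory_order_acquire);

        wake_side(&w);
        printf("stop waiting: %d\n", should_stop(&w, snapshot)); /* prints 1 */
        return 0;
}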
