Skip to content

Commit 9959333

Browse files
committed
Merge branch 'pipe-exclusive-wakeup'
Merge thundering herd avoidance on pipe IO.

This would have been applied for 5.5 already, but got delayed because of a user-space race condition in the GNU make jobserver code. Now that there's a new GNU make 4.3 release, and most distributions seem to have at least applied the (almost three year old) fix for the problem, let's see if people notice.

And it might have been just bad random timing luck on my machine. If you do hit the race condition, things will still work, but the symptom is that you don't get nearly the expected parallelism when using "make -j<N>".

The jobserver bug can definitely happen without this patch too, but seems to be easier to trigger when we no longer wake up pipe waiters unnecessarily.

* pipe-exclusive-wakeup:
  pipe: use exclusive waits when reading or writing
2 parents f757165 + 0ddad21 commit 9959333

File tree

4 files changed

+51
-30
lines changed

4 files changed

+51
-30
lines changed

fs/coredump.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -517,15 +517,15 @@ static void wait_for_dump_helpers(struct file *file)
517517
pipe_lock(pipe);
518518
pipe->readers++;
519519
pipe->writers--;
520-
wake_up_interruptible_sync(&pipe->wait);
520+
wake_up_interruptible_sync(&pipe->rd_wait);
521521
kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
522522
pipe_unlock(pipe);
523523

524524
/*
525525
* We actually want wait_event_freezable() but then we need
526526
* to clear TIF_SIGPENDING and improve dump_interrupted().
527527
*/
528-
wait_event_interruptible(pipe->wait, pipe->readers == 1);
528+
wait_event_interruptible(pipe->rd_wait, pipe->readers == 1);
529529

530530
pipe_lock(pipe);
531531
pipe->readers--;

fs/pipe.c

Lines changed: 44 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -108,16 +108,19 @@ void pipe_double_lock(struct pipe_inode_info *pipe1,
108108
/* Drop the inode semaphore and wait for a pipe event, atomically */
109109
void pipe_wait(struct pipe_inode_info *pipe)
110110
{
111-
DEFINE_WAIT(wait);
111+
DEFINE_WAIT(rdwait);
112+
DEFINE_WAIT(wrwait);
112113

113114
/*
114115
* Pipes are system-local resources, so sleeping on them
115116
* is considered a noninteractive wait:
116117
*/
117-
prepare_to_wait(&pipe->wait, &wait, TASK_INTERRUPTIBLE);
118+
prepare_to_wait(&pipe->rd_wait, &rdwait, TASK_INTERRUPTIBLE);
119+
prepare_to_wait(&pipe->wr_wait, &wrwait, TASK_INTERRUPTIBLE);
118120
pipe_unlock(pipe);
119121
schedule();
120-
finish_wait(&pipe->wait, &wait);
122+
finish_wait(&pipe->rd_wait, &rdwait);
123+
finish_wait(&pipe->wr_wait, &wrwait);
121124
pipe_lock(pipe);
122125
}
123126

@@ -286,7 +289,7 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
286289
size_t total_len = iov_iter_count(to);
287290
struct file *filp = iocb->ki_filp;
288291
struct pipe_inode_info *pipe = filp->private_data;
289-
bool was_full;
292+
bool was_full, wake_next_reader = false;
290293
ssize_t ret;
291294

292295
/* Null read succeeds. */
@@ -344,10 +347,10 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
344347

345348
if (!buf->len) {
346349
pipe_buf_release(pipe, buf);
347-
spin_lock_irq(&pipe->wait.lock);
350+
spin_lock_irq(&pipe->rd_wait.lock);
348351
tail++;
349352
pipe->tail = tail;
350-
spin_unlock_irq(&pipe->wait.lock);
353+
spin_unlock_irq(&pipe->rd_wait.lock);
351354
}
352355
total_len -= chars;
353356
if (!total_len)
@@ -384,7 +387,7 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
384387
* no data.
385388
*/
386389
if (unlikely(was_full)) {
387-
wake_up_interruptible_sync_poll(&pipe->wait, EPOLLOUT | EPOLLWRNORM);
390+
wake_up_interruptible_sync_poll(&pipe->wr_wait, EPOLLOUT | EPOLLWRNORM);
388391
kill_fasync(&pipe->fasync_writers, SIGIO, POLL_OUT);
389392
}
390393

@@ -394,18 +397,23 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
394397
* since we've done any required wakeups and there's no need
395398
* to mark anything accessed. And we've dropped the lock.
396399
*/
397-
if (wait_event_interruptible(pipe->wait, pipe_readable(pipe)) < 0)
400+
if (wait_event_interruptible_exclusive(pipe->rd_wait, pipe_readable(pipe)) < 0)
398401
return -ERESTARTSYS;
399402

400403
__pipe_lock(pipe);
401404
was_full = pipe_full(pipe->head, pipe->tail, pipe->max_usage);
405+
wake_next_reader = true;
402406
}
407+
if (pipe_empty(pipe->head, pipe->tail))
408+
wake_next_reader = false;
403409
__pipe_unlock(pipe);
404410

405411
if (was_full) {
406-
wake_up_interruptible_sync_poll(&pipe->wait, EPOLLOUT | EPOLLWRNORM);
412+
wake_up_interruptible_sync_poll(&pipe->wr_wait, EPOLLOUT | EPOLLWRNORM);
407413
kill_fasync(&pipe->fasync_writers, SIGIO, POLL_OUT);
408414
}
415+
if (wake_next_reader)
416+
wake_up_interruptible_sync_poll(&pipe->rd_wait, EPOLLIN | EPOLLRDNORM);
409417
if (ret > 0)
410418
file_accessed(filp);
411419
return ret;
@@ -437,6 +445,7 @@ pipe_write(struct kiocb *iocb, struct iov_iter *from)
437445
size_t total_len = iov_iter_count(from);
438446
ssize_t chars;
439447
bool was_empty = false;
448+
bool wake_next_writer = false;
440449

441450
/* Null write succeeds. */
442451
if (unlikely(total_len == 0))
@@ -515,16 +524,16 @@ pipe_write(struct kiocb *iocb, struct iov_iter *from)
515524
* it, either the reader will consume it or it'll still
516525
* be there for the next write.
517526
*/
518-
spin_lock_irq(&pipe->wait.lock);
527+
spin_lock_irq(&pipe->rd_wait.lock);
519528

520529
head = pipe->head;
521530
if (pipe_full(head, pipe->tail, pipe->max_usage)) {
522-
spin_unlock_irq(&pipe->wait.lock);
531+
spin_unlock_irq(&pipe->rd_wait.lock);
523532
continue;
524533
}
525534

526535
pipe->head = head + 1;
527-
spin_unlock_irq(&pipe->wait.lock);
536+
spin_unlock_irq(&pipe->rd_wait.lock);
528537

529538
/* Insert it into the buffer array */
530539
buf = &pipe->bufs[head & mask];
@@ -576,14 +585,17 @@ pipe_write(struct kiocb *iocb, struct iov_iter *from)
576585
*/
577586
__pipe_unlock(pipe);
578587
if (was_empty) {
579-
wake_up_interruptible_sync_poll(&pipe->wait, EPOLLIN | EPOLLRDNORM);
588+
wake_up_interruptible_sync_poll(&pipe->rd_wait, EPOLLIN | EPOLLRDNORM);
580589
kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
581590
}
582-
wait_event_interruptible(pipe->wait, pipe_writable(pipe));
591+
wait_event_interruptible_exclusive(pipe->wr_wait, pipe_writable(pipe));
583592
__pipe_lock(pipe);
584593
was_empty = pipe_empty(pipe->head, pipe->tail);
594+
wake_next_writer = true;
585595
}
586596
out:
597+
if (pipe_full(pipe->head, pipe->tail, pipe->max_usage))
598+
wake_next_writer = false;
587599
__pipe_unlock(pipe);
588600

589601
/*
@@ -596,9 +608,11 @@ pipe_write(struct kiocb *iocb, struct iov_iter *from)
596608
* wake up pending jobs
597609
*/
598610
if (was_empty) {
599-
wake_up_interruptible_sync_poll(&pipe->wait, EPOLLIN | EPOLLRDNORM);
611+
wake_up_interruptible_sync_poll(&pipe->rd_wait, EPOLLIN | EPOLLRDNORM);
600612
kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
601613
}
614+
if (wake_next_writer)
615+
wake_up_interruptible_sync_poll(&pipe->wr_wait, EPOLLOUT | EPOLLWRNORM);
602616
if (ret > 0 && sb_start_write_trylock(file_inode(filp)->i_sb)) {
603617
int err = file_update_time(filp);
604618
if (err)
@@ -642,12 +656,15 @@ pipe_poll(struct file *filp, poll_table *wait)
642656
unsigned int head, tail;
643657

644658
/*
645-
* Reading only -- no need for acquiring the semaphore.
659+
* Reading pipe state only -- no need for acquiring the semaphore.
646660
*
647661
* But because this is racy, the code has to add the
648662
* entry to the poll table _first_ ..
649663
*/
650-
poll_wait(filp, &pipe->wait, wait);
664+
if (filp->f_mode & FMODE_READ)
665+
poll_wait(filp, &pipe->rd_wait, wait);
666+
if (filp->f_mode & FMODE_WRITE)
667+
poll_wait(filp, &pipe->wr_wait, wait);
651668

652669
/*
653670
* .. and only then can you do the racy tests. That way,
@@ -706,7 +723,8 @@ pipe_release(struct inode *inode, struct file *file)
706723
pipe->writers--;
707724

708725
if (pipe->readers || pipe->writers) {
709-
wake_up_interruptible_sync_poll(&pipe->wait, EPOLLIN | EPOLLOUT | EPOLLRDNORM | EPOLLWRNORM | EPOLLERR | EPOLLHUP);
726+
wake_up_interruptible_sync_poll(&pipe->rd_wait, EPOLLIN | EPOLLRDNORM | EPOLLERR | EPOLLHUP);
727+
wake_up_interruptible_sync_poll(&pipe->wr_wait, EPOLLOUT | EPOLLWRNORM | EPOLLERR | EPOLLHUP);
710728
kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
711729
kill_fasync(&pipe->fasync_writers, SIGIO, POLL_OUT);
712730
}
@@ -789,7 +807,8 @@ struct pipe_inode_info *alloc_pipe_info(void)
789807
GFP_KERNEL_ACCOUNT);
790808

791809
if (pipe->bufs) {
792-
init_waitqueue_head(&pipe->wait);
810+
init_waitqueue_head(&pipe->rd_wait);
811+
init_waitqueue_head(&pipe->wr_wait);
793812
pipe->r_counter = pipe->w_counter = 1;
794813
pipe->max_usage = pipe_bufs;
795814
pipe->ring_size = pipe_bufs;
@@ -1007,7 +1026,8 @@ static int wait_for_partner(struct pipe_inode_info *pipe, unsigned int *cnt)
10071026

10081027
static void wake_up_partner(struct pipe_inode_info *pipe)
10091028
{
1010-
wake_up_interruptible(&pipe->wait);
1029+
wake_up_interruptible(&pipe->rd_wait);
1030+
wake_up_interruptible(&pipe->wr_wait);
10111031
}
10121032

10131033
static int fifo_open(struct inode *inode, struct file *filp)
@@ -1118,13 +1138,13 @@ static int fifo_open(struct inode *inode, struct file *filp)
11181138

11191139
err_rd:
11201140
if (!--pipe->readers)
1121-
wake_up_interruptible(&pipe->wait);
1141+
wake_up_interruptible(&pipe->wr_wait);
11221142
ret = -ERESTARTSYS;
11231143
goto err;
11241144

11251145
err_wr:
11261146
if (!--pipe->writers)
1127-
wake_up_interruptible(&pipe->wait);
1147+
wake_up_interruptible(&pipe->rd_wait);
11281148
ret = -ERESTARTSYS;
11291149
goto err;
11301150

@@ -1251,7 +1271,8 @@ static long pipe_set_size(struct pipe_inode_info *pipe, unsigned long arg)
12511271
pipe->max_usage = nr_slots;
12521272
pipe->tail = tail;
12531273
pipe->head = head;
1254-
wake_up_interruptible_all(&pipe->wait);
1274+
wake_up_interruptible_all(&pipe->rd_wait);
1275+
wake_up_interruptible_all(&pipe->wr_wait);
12551276
return pipe->max_usage * PAGE_SIZE;
12561277

12571278
out_revert_acct:

fs/splice.c

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -165,8 +165,8 @@ static const struct pipe_buf_operations user_page_pipe_buf_ops = {
165165
static void wakeup_pipe_readers(struct pipe_inode_info *pipe)
166166
{
167167
smp_mb();
168-
if (waitqueue_active(&pipe->wait))
169-
wake_up_interruptible(&pipe->wait);
168+
if (waitqueue_active(&pipe->rd_wait))
169+
wake_up_interruptible(&pipe->rd_wait);
170170
kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
171171
}
172172

@@ -462,8 +462,8 @@ static int pipe_to_sendpage(struct pipe_inode_info *pipe,
462462
static void wakeup_pipe_writers(struct pipe_inode_info *pipe)
463463
{
464464
smp_mb();
465-
if (waitqueue_active(&pipe->wait))
466-
wake_up_interruptible(&pipe->wait);
465+
if (waitqueue_active(&pipe->wr_wait))
466+
wake_up_interruptible(&pipe->wr_wait);
467467
kill_fasync(&pipe->fasync_writers, SIGIO, POLL_OUT);
468468
}
469469

include/linux/pipe_fs_i.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ struct pipe_buffer {
4747
**/
4848
struct pipe_inode_info {
4949
struct mutex mutex;
50-
wait_queue_head_t wait;
50+
wait_queue_head_t rd_wait, wr_wait;
5151
unsigned int head;
5252
unsigned int tail;
5353
unsigned int max_usage;

0 commit comments

Comments
 (0)