
Commit 54c1ee4

Waiman Long authored and Peter Zijlstra committed
locking/rwsem: Conditionally wake waiters in reader/writer slowpaths
In an analysis of a recent vmcore, a reader-owned rwsem was found with 385 readers, but no writer, in the wait queue. That is rather unusual and may be caused by race conditions that we have not fully understood yet. In such a case, all the readers in the wait queue should join the other reader-owners and acquire the read lock.

In rwsem_down_write_slowpath(), an incoming writer will try to wake up the front readers under such a circumstance. That is not the case for rwsem_down_read_slowpath(). Add a new helper function, rwsem_cond_wake_waiter(), to do the wakeup and use it in both the reader and writer slowpaths so that the behavior is consistent and correct.

Signed-off-by: Waiman Long <[email protected]>
Signed-off-by: Peter Zijlstra (Intel) <[email protected]>
Link: https://lkml.kernel.org/r/[email protected]
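To make the helper's decision concrete, here is a minimal user-space sketch of the wake-type selection that rwsem_cond_wake_waiter() performs on the rwsem count word. The RWSEM_* mask values below mimic the kernel's count layout (writer-locked in bit 0, reader count starting at bit 8) but are reproduced only for illustration; RWSEM_WAKE_NONE is a stand-in invented for this sketch, and the real helper calls clear_nonspinnable()/rwsem_mark_wake() rather than returning a value.

	/*
	 * Illustrative sketch only: mask values mimic the kernel's rwsem count
	 * layout, RWSEM_WAKE_NONE is invented here, and the wake_q machinery,
	 * clear_nonspinnable() and rwsem_mark_wake() are not modeled.
	 */
	#include <stdio.h>

	#define RWSEM_WRITER_LOCKED	(1UL << 0)
	#define RWSEM_READER_SHIFT	8
	#define RWSEM_READER_BIAS	(1UL << RWSEM_READER_SHIFT)
	#define RWSEM_WRITER_MASK	RWSEM_WRITER_LOCKED
	#define RWSEM_READER_MASK	(~(RWSEM_READER_BIAS - 1))

	enum rwsem_wake_type { RWSEM_WAKE_NONE, RWSEM_WAKE_ANY, RWSEM_WAKE_READERS };

	/*
	 * Same branch structure as rwsem_cond_wake_waiter(): writer-owned -> do
	 * nothing, reader-owned -> wake queued readers, lock free -> wake any
	 * front waiter (the real code also clears the nonspinnable bits there).
	 */
	static enum rwsem_wake_type cond_wake_type(long count)
	{
		if (count & RWSEM_WRITER_MASK)
			return RWSEM_WAKE_NONE;
		if (count & RWSEM_READER_MASK)
			return RWSEM_WAKE_READERS;
		return RWSEM_WAKE_ANY;
	}

	int main(void)
	{
		/* 385 reader-owners, no writer: the case from the vmcore analysis */
		printf("reader-owned: %d\n", cond_wake_type(385 * RWSEM_READER_BIAS));
		printf("writer-owned: %d\n", cond_wake_type(RWSEM_WRITER_LOCKED));
		printf("unlocked:     %d\n", cond_wake_type(0));
		return 0;
	}

In the reader-owned case the helper selects RWSEM_WAKE_READERS, so queued readers are woken to join the existing reader-owners; a writer-owned count produces no wakeup at all, which is why the helper can be called unconditionally from both slowpaths.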
1 parent f9e21aa commit 54c1ee4

1 file changed: +32 −36 lines

kernel/locking/rwsem.c

Lines changed: 32 additions & 36 deletions
@@ -901,7 +901,7 @@ static bool rwsem_optimistic_spin(struct rw_semaphore *sem)
  */
 static inline void clear_nonspinnable(struct rw_semaphore *sem)
 {
-	if (rwsem_test_oflags(sem, RWSEM_NONSPINNABLE))
+	if (unlikely(rwsem_test_oflags(sem, RWSEM_NONSPINNABLE)))
 		atomic_long_andnot(RWSEM_NONSPINNABLE, &sem->owner);
 }
 
@@ -925,6 +925,31 @@ rwsem_spin_on_owner(struct rw_semaphore *sem)
 }
 #endif
 
+/*
+ * Prepare to wake up waiter(s) in the wait queue by putting them into the
+ * given wake_q if the rwsem lock owner isn't a writer. If rwsem is likely
+ * reader-owned, wake up read lock waiters in queue front or wake up any
+ * front waiter otherwise.
+
+ * This is being called from both reader and writer slow paths.
+ */
+static inline void rwsem_cond_wake_waiter(struct rw_semaphore *sem, long count,
+					  struct wake_q_head *wake_q)
+{
+	enum rwsem_wake_type wake_type;
+
+	if (count & RWSEM_WRITER_MASK)
+		return;
+
+	if (count & RWSEM_READER_MASK) {
+		wake_type = RWSEM_WAKE_READERS;
+	} else {
+		wake_type = RWSEM_WAKE_ANY;
+		clear_nonspinnable(sem);
+	}
+	rwsem_mark_wake(sem, wake_type, wake_q);
+}
+
 /*
  * Wait for the read lock to be granted
  */
@@ -935,7 +960,6 @@ rwsem_down_read_slowpath(struct rw_semaphore *sem, long count, unsigned int stat
 	long rcnt = (count >> RWSEM_READER_SHIFT);
 	struct rwsem_waiter waiter;
 	DEFINE_WAKE_Q(wake_q);
-	bool wake = false;
 
 	/*
 	 * To prevent a constant stream of readers from starving a sleeping
@@ -996,22 +1020,11 @@ rwsem_down_read_slowpath(struct rw_semaphore *sem, long count, unsigned int stat
 	/* we're now waiting on the lock, but no longer actively locking */
 	count = atomic_long_add_return(adjustment, &sem->count);
 
-	/*
-	 * If there are no active locks, wake the front queued process(es).
-	 *
-	 * If there are no writers and we are first in the queue,
-	 * wake our own waiter to join the existing active readers !
-	 */
-	if (!(count & RWSEM_LOCK_MASK)) {
-		clear_nonspinnable(sem);
-		wake = true;
-	}
-	if (wake || (!(count & RWSEM_WRITER_MASK) &&
-		     (adjustment & RWSEM_FLAG_WAITERS)))
-		rwsem_mark_wake(sem, RWSEM_WAKE_ANY, &wake_q);
-
+	rwsem_cond_wake_waiter(sem, count, &wake_q);
 	raw_spin_unlock_irq(&sem->wait_lock);
-	wake_up_q(&wake_q);
+
+	if (!wake_q_empty(&wake_q))
+		wake_up_q(&wake_q);
 
 	/* wait to be given the lock */
 	for (;;) {
@@ -1050,7 +1063,6 @@ rwsem_down_read_slowpath(struct rw_semaphore *sem, long count, unsigned int stat
 static struct rw_semaphore __sched *
 rwsem_down_write_slowpath(struct rw_semaphore *sem, int state)
 {
-	long count;
 	struct rwsem_waiter waiter;
 	DEFINE_WAKE_Q(wake_q);
 
@@ -1074,23 +1086,8 @@ rwsem_down_write_slowpath(struct rw_semaphore *sem, int state)
 
 	/* we're now waiting on the lock */
 	if (rwsem_first_waiter(sem) != &waiter) {
-		count = atomic_long_read(&sem->count);
-
-		/*
-		 * If there were already threads queued before us and:
-		 * 1) there are no active locks, wake the front
-		 *    queued process(es) as the handoff bit might be set.
-		 * 2) there are no active writers and some readers, the lock
-		 *    must be read owned; so we try to wake any read lock
-		 *    waiters that were queued ahead of us.
-		 */
-		if (count & RWSEM_WRITER_MASK)
-			goto wait;
-
-		rwsem_mark_wake(sem, (count & RWSEM_READER_MASK)
-					? RWSEM_WAKE_READERS
-					: RWSEM_WAKE_ANY, &wake_q);
-
+		rwsem_cond_wake_waiter(sem, atomic_long_read(&sem->count),
+				       &wake_q);
 		if (!wake_q_empty(&wake_q)) {
 			/*
 			 * We want to minimize wait_lock hold time especially
@@ -1105,7 +1102,6 @@ rwsem_down_write_slowpath(struct rw_semaphore *sem, int state)
 		atomic_long_or(RWSEM_FLAG_WAITERS, &sem->count);
 	}
 
-wait:
 	/* wait until we successfully acquire the lock */
 	set_current_state(state);
 	for (;;) {
