Skip to content

Commit 7943238

Browse files
committed
Implement condvar using futex requeue in Linux
1 parent d6f8829 commit 7943238

File tree

3 files changed

+80
-3
lines changed

3 files changed

+80
-3
lines changed

library/std/src/sys/pal/unix/futex.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,17 @@ pub fn futex_wake_all(futex: &AtomicU32) {
115115
}
116116
}
117117

118+
/// Wakes one waiter on `futex` and requeue the remaining waiters to `futex2`.
119+
#[cfg(any(target_os = "linux", target_os = "android"))]
120+
pub fn futex_requeue(futex: &AtomicU32, futex2: &AtomicU32) {
121+
let ptr = futex as *const AtomicU32;
122+
let ptr2 = futex2 as *const AtomicU32;
123+
let op = libc::FUTEX_REQUEUE | libc::FUTEX_PRIVATE_FLAG;
124+
unsafe {
125+
libc::syscall(libc::SYS_futex, ptr, op, 1, i32::MAX, ptr2);
126+
}
127+
}
128+
118129
// FreeBSD doesn't tell us how many threads are woken up, so this always returns false.
119130
#[cfg(target_os = "freebsd")]
120131
pub fn futex_wake(futex: &AtomicU32) -> bool {
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
use crate::sync::atomic::Ordering::Relaxed;
2+
use crate::sys::futex::{Futex, futex_requeue, futex_wait, futex_wake};
3+
use crate::sys::sync::Mutex;
4+
use crate::time::Duration;
5+
6+
pub struct Condvar {
7+
// The value of this atomic is simply incremented on every notification.
8+
// This is used by `.wait()` to not miss any notifications after
9+
// unlocking the mutex and before waiting for notifications.
10+
futex: Futex,
11+
// The futex to requeue to.
12+
// Its value is simply incremented when a waiter has been woken and acquired mutex.
13+
futex2: Futex,
14+
}
15+
16+
impl Condvar {
17+
#[inline]
18+
pub const fn new() -> Self {
19+
Self { futex: Futex::new(0), futex2: Futex::new(0) }
20+
}
21+
22+
// All the memory orderings here are `Relaxed`,
23+
// because synchronization is done by unlocking and locking the mutex.
24+
25+
pub fn notify_one(&self) {
26+
self.futex.fetch_add(1, Relaxed);
27+
futex_wake(&self.futex);
28+
}
29+
30+
pub fn notify_all(&self) {
31+
self.futex.fetch_add(1, Relaxed);
32+
futex_requeue(&self.futex, &self.futex2);
33+
}
34+
35+
pub unsafe fn wait(&self, mutex: &Mutex) {
36+
self.wait_optional_timeout(mutex, None);
37+
}
38+
39+
pub unsafe fn wait_timeout(&self, mutex: &Mutex, timeout: Duration) -> bool {
40+
self.wait_optional_timeout(mutex, Some(timeout))
41+
}
42+
43+
unsafe fn wait_optional_timeout(&self, mutex: &Mutex, timeout: Option<Duration>) -> bool {
44+
// Examine the notification counter _before_ we unlock the mutex.
45+
let futex_value = self.futex.load(Relaxed);
46+
47+
// Unlock the mutex before going to sleep.
48+
mutex.unlock();
49+
50+
// Wait, but only if there hasn't been any
51+
// notification since we unlocked the mutex.
52+
let r = futex_wait(&self.futex, futex_value, timeout);
53+
54+
// Lock the mutex again.
55+
mutex.lock();
56+
57+
// Wake another waiter.
58+
if r {
59+
self.futex2.fetch_add(1, Relaxed);
60+
futex_wake(&self.futex2);
61+
}
62+
63+
r
64+
}
65+
}

library/std/src/sys/sync/condvar/mod.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
cfg_if::cfg_if! {
2-
if #[cfg(any(
2+
if #[cfg(any(target_os = "linux", target_os = "android"))] {
3+
mod futex_requeue;
4+
pub use futex_requeue::Condvar;
5+
} else if #[cfg(any(
36
all(target_os = "windows", not(target_vendor="win7")),
4-
target_os = "linux",
5-
target_os = "android",
67
target_os = "freebsd",
78
target_os = "openbsd",
89
target_os = "dragonfly",

0 commit comments

Comments
 (0)