Skip to content

Commit 991effa

Browse files
committed
Updates to parker
- No longer possibility for spurious wake - Now reports unpark reason (timeout or unpark())Updates to parker
1 parent 99c3230 commit 991effa

File tree

3 files changed

+87
-55
lines changed

3 files changed

+87
-55
lines changed

crossbeam-utils/src/sync/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,6 @@ mod parker;
88
mod sharded_lock;
99
mod wait_group;
1010

11-
pub use self::parker::{Parker, Unparker};
11+
pub use self::parker::{Parker, UnparkReason, Unparker};
1212
pub use self::sharded_lock::{ShardedLock, ShardedLockReadGuard, ShardedLockWriteGuard};
1313
pub use self::wait_group::WaitGroup;

crossbeam-utils/src/sync/parker.rs

Lines changed: 73 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,19 @@
1-
use std::fmt;
21
use std::marker::PhantomData;
32
use std::sync::atomic::AtomicUsize;
43
use std::sync::atomic::Ordering::SeqCst;
54
use std::sync::{Arc, Condvar, Mutex};
65
use std::time::Duration;
6+
use std::{fmt, time::Instant};
77

88
/// A thread parking primitive.
99
///
1010
/// Conceptually, each `Parker` has an associated token which is initially not present:
1111
///
1212
/// * The [`park`] method blocks the current thread unless or until the token is available, at
13-
/// which point it automatically consumes the token. It may also return *spuriously*, without
14-
/// consuming the token.
13+
/// which point it automatically consumes the token.
1514
///
16-
/// * The [`park_timeout`] method works the same as [`park`], but blocks for a specified maximum
17-
/// time.
15+
/// * The [`park_timeout`] and [`park_deadline`] methods work the same as [`park`], but block for
16+
/// a specified maximum time.
1817
///
1918
/// * The [`unpark`] method atomically makes the token available if it wasn't already. Because the
2019
/// token is initially absent, [`unpark`] followed by [`park`] will result in the second call
@@ -43,13 +42,13 @@ use std::time::Duration;
4342
/// u.unpark();
4443
/// });
4544
///
46-
/// // Wakes up when `u.unpark()` provides the token, but may also wake up
47-
/// // spuriously before that without consuming the token.
45+
/// // Wakes up when `u.unpark()` provides the token.
4846
/// p.park();
4947
/// ```
5048
///
5149
/// [`park`]: Parker::park
5250
/// [`park_timeout`]: Parker::park_timeout
51+
/// [`park_deadline`]: Parker::park_deadline
5352
/// [`unpark`]: Unparker::unpark
5453
pub struct Parker {
5554
unparker: Unparker,
@@ -90,9 +89,6 @@ impl Parker {
9089

9190
/// Blocks the current thread until the token is made available.
9291
///
93-
/// A call to `park` may wake up spuriously without consuming the token, and callers should be
94-
/// prepared for this possibility.
95-
///
9692
/// # Examples
9793
///
9894
/// ```
@@ -113,9 +109,6 @@ impl Parker {
113109

114110
/// Blocks the current thread until the token is made available, but only for a limited time.
115111
///
116-
/// A call to `park_timeout` may wake up spuriously without consuming the token, and callers
117-
/// should be prepared for this possibility.
118-
///
119112
/// # Examples
120113
///
121114
/// ```
@@ -127,8 +120,26 @@ impl Parker {
127120
/// // Waits for the token to become available, but will not wait longer than 500 ms.
128121
/// p.park_timeout(Duration::from_millis(500));
129122
/// ```
130-
pub fn park_timeout(&self, timeout: Duration) {
131-
self.unparker.inner.park(Some(timeout));
123+
pub fn park_timeout(&self, timeout: Duration) -> UnparkReason {
124+
self.park_deadline(Instant::now() + timeout)
125+
}
126+
127+
/// Blocks the current thread until the token is made available, or until a certain deadline.
128+
///
129+
/// # Examples
130+
///
131+
/// ```
132+
/// use std::time::{Duration, Instant};
133+
/// use crossbeam_utils::sync::Parker;
134+
///
135+
/// let p = Parker::new();
136+
/// let deadline = Instant::now() + Duration::from_millis(500);
137+
///
138+
/// // Waits for the token to become available, but will not wait longer than 500 ms.
139+
/// p.park_deadline(deadline);
140+
/// ```
141+
pub fn park_deadline(&self, deadline: Instant) -> UnparkReason {
142+
self.unparker.inner.park(Some(deadline))
132143
}
133144

134145
/// Returns a reference to an associated [`Unparker`].
@@ -227,8 +238,7 @@ impl Unparker {
227238
/// u.unpark();
228239
/// });
229240
///
230-
/// // Wakes up when `u.unpark()` provides the token, but may also wake up
231-
/// // spuriously before that without consuming the token.
241+
/// // Wakes up when `u.unpark()` provides the token.
232242
/// p.park();
233243
/// ```
234244
///
@@ -291,6 +301,18 @@ impl Clone for Unparker {
291301
}
292302
}
293303

304+
/// An enum that reports whether a `Parker::park_timeout` or
305+
/// `Parker::park_deadline` returned because another thread called `unpark` or
306+
/// because of a timeout.
307+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
308+
pub enum UnparkReason {
309+
/// The park method returned due to a call to `unpark`.
310+
Unparked,
311+
312+
/// The park method returned due to a timeout.
313+
Timeout,
314+
}
315+
294316
const EMPTY: usize = 0;
295317
const PARKED: usize = 1;
296318
const NOTIFIED: usize = 2;
@@ -302,20 +324,20 @@ struct Inner {
302324
}
303325

304326
impl Inner {
305-
fn park(&self, timeout: Option<Duration>) {
327+
fn park(&self, deadline: Option<Instant>) -> UnparkReason {
306328
// If we were previously notified then we consume this notification and return quickly.
307329
if self
308330
.state
309331
.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
310332
.is_ok()
311333
{
312-
return;
334+
return UnparkReason::Unparked;
313335
}
314336

315337
// If the timeout is zero, then there is no need to actually block.
316-
if let Some(ref dur) = timeout {
317-
if *dur == Duration::from_millis(0) {
318-
return;
338+
if let Some(deadline) = deadline {
339+
if deadline <= Instant::now() {
340+
return UnparkReason::Timeout;
319341
}
320342
}
321343

@@ -333,41 +355,42 @@ impl Inner {
333355
// do that we must read from the write it made to `state`.
334356
let old = self.state.swap(EMPTY, SeqCst);
335357
assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
336-
return;
358+
return UnparkReason::Unparked;
337359
}
338360
Err(n) => panic!("inconsistent park_timeout state: {}", n),
339361
}
340362

341-
match timeout {
342-
None => {
343-
loop {
344-
// Block the current thread on the conditional variable.
345-
m = self.cvar.wait(m).unwrap();
346-
347-
if self
348-
.state
349-
.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
350-
.is_ok()
351-
{
352-
// got a notification
353-
return;
363+
loop {
364+
// Block the current thread on the conditional variable.
365+
m = match deadline {
366+
None => self.cvar.wait(m).unwrap(),
367+
Some(deadline) => match deadline.checked_duration_since(Instant::now()) {
368+
// We could check for a timeout here, but in the case that a timeout and an
369+
// unpark arrive simultaneously, we prefer to report the former.
370+
Some(duration) if duration > Duration::from_secs(0) => {
371+
self.cvar.wait_timeout(m, duration).unwrap().0
354372
}
355373

356-
// spurious wakeup, go back to sleep
357-
}
358-
}
359-
Some(timeout) => {
360-
// Wait with a timeout, and if we spuriously wake up or otherwise wake up from a
361-
// notification we just want to unconditionally set `state` back to `EMPTY`, either
362-
// consuming a notification or un-flagging ourselves as parked.
363-
let (_m, _result) = self.cvar.wait_timeout(m, timeout).unwrap();
364-
365-
match self.state.swap(EMPTY, SeqCst) {
366-
NOTIFIED => {} // got a notification
367-
PARKED => {} // no notification
368-
n => panic!("inconsistent park_timeout state: {}", n),
369-
}
374+
// We've timed out; swap out the state back to empty on our way out
375+
_ => match self.state.swap(EMPTY, SeqCst) {
376+
NOTIFIED => return UnparkReason::Unparked, // got a notification
377+
PARKED => return UnparkReason::Timeout, // no notification
378+
n => panic!("inconsistent park_timeout state: {}", n),
379+
},
380+
},
381+
};
382+
383+
if self
384+
.state
385+
.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
386+
.is_ok()
387+
{
388+
// got a notification
389+
return UnparkReason::Unparked;
370390
}
391+
392+
// Spurious wakeup, go back to sleep. Alternatively, if we timed out, it will be caught
393+
// in the branch above, when we discover the deadline is in the past
371394
}
372395
}
373396

crossbeam-utils/tests/parker.rs

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,23 +2,29 @@ use std::thread::sleep;
22
use std::time::Duration;
33
use std::u32;
44

5-
use crossbeam_utils::sync::Parker;
5+
use crossbeam_utils::sync::{Parker, UnparkReason};
66
use crossbeam_utils::thread;
77

88
#[test]
99
fn park_timeout_unpark_before() {
1010
let p = Parker::new();
1111
for _ in 0..10 {
1212
p.unparker().unpark();
13-
p.park_timeout(Duration::from_millis(u32::MAX as u64));
13+
assert_eq!(
14+
p.park_timeout(Duration::from_millis(u32::MAX as u64)),
15+
UnparkReason::Unparked,
16+
);
1417
}
1518
}
1619

1720
#[test]
1821
fn park_timeout_unpark_not_called() {
1922
let p = Parker::new();
2023
for _ in 0..10 {
21-
p.park_timeout(Duration::from_millis(10));
24+
assert_eq!(
25+
p.park_timeout(Duration::from_millis(10)),
26+
UnparkReason::Timeout,
27+
);
2228
}
2329
}
2430

@@ -34,7 +40,10 @@ fn park_timeout_unpark_called_other_thread() {
3440
u.unpark();
3541
});
3642

37-
p.park_timeout(Duration::from_millis(u32::MAX as u64));
43+
assert_eq!(
44+
p.park_timeout(Duration::from_millis(u32::MAX as u64)),
45+
UnparkReason::Unparked,
46+
);
3847
})
3948
.unwrap();
4049
}

0 commit comments

Comments
 (0)