|
| 1 | +// Copyright 2016 Amanieu d'Antras |
| 2 | +// |
| 3 | +// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or |
| 4 | +// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or |
| 5 | +// http://opensource.org/licenses/MIT>, at your option. This file may not be |
| 6 | +// copied, modified, or distributed except according to those terms. |
| 7 | + |
| 8 | +use libc; |
| 9 | +use ptr; |
| 10 | +use sync::atomic::{AtomicI32, Ordering}; |
| 11 | +use time::Instant; |
| 12 | +use super::tv_nsec_t; |
| 13 | + |
| 14 | +const FUTEX_WAIT: i32 = 0; |
| 15 | +const FUTEX_WAKE: i32 = 1; |
| 16 | +const FUTEX_PRIVATE: i32 = 128; |
| 17 | + |
| 18 | +// Helper type for putting a thread to sleep until some other thread wakes it up |
| 19 | +pub struct ThreadParker { |
| 20 | + futex: AtomicI32, |
| 21 | +} |
| 22 | + |
| 23 | +impl ThreadParker { |
| 24 | + pub const IS_CHEAP_TO_CONSTRUCT: bool = true; |
| 25 | + |
| 26 | + pub fn new() -> ThreadParker { |
| 27 | + ThreadParker { |
| 28 | + futex: AtomicI32::new(0), |
| 29 | + } |
| 30 | + } |
| 31 | + |
| 32 | + // Prepares the parker. This should be called before adding it to the queue. |
| 33 | + pub fn prepare_park(&self) { |
| 34 | + self.futex.store(1, Ordering::Relaxed); |
| 35 | + } |
| 36 | + |
| 37 | + // Checks if the park timed out. This should be called while holding the |
| 38 | + // queue lock after park_until has returned false. |
| 39 | + pub fn timed_out(&self) -> bool { |
| 40 | + self.futex.load(Ordering::Relaxed) != 0 |
| 41 | + } |
| 42 | + |
| 43 | + // Parks the thread until it is unparked. This should be called after it has |
| 44 | + // been added to the queue, after unlocking the queue. |
| 45 | + pub fn park(&self) { |
| 46 | + while self.futex.load(Ordering::Acquire) != 0 { |
| 47 | + self.futex_wait(None); |
| 48 | + } |
| 49 | + } |
| 50 | + |
| 51 | + // Parks the thread until it is unparked or the timeout is reached. This |
| 52 | + // should be called after it has been added to the queue, after unlocking |
| 53 | + // the queue. Returns true if we were unparked and false if we timed out. |
| 54 | + pub fn park_until(&self, timeout: Instant) -> bool { |
| 55 | + while self.futex.load(Ordering::Acquire) != 0 { |
| 56 | + let now = Instant::now(); |
| 57 | + if timeout <= now { |
| 58 | + return false; |
| 59 | + } |
| 60 | + let diff = timeout - now; |
| 61 | + if diff.as_secs() as libc::time_t as u64 != diff.as_secs() { |
| 62 | + // Timeout overflowed, just sleep indefinitely |
| 63 | + self.park(); |
| 64 | + return true; |
| 65 | + } |
| 66 | + let ts = libc::timespec { |
| 67 | + tv_sec: diff.as_secs() as libc::time_t, |
| 68 | + tv_nsec: diff.subsec_nanos() as tv_nsec_t, |
| 69 | + }; |
| 70 | + self.futex_wait(Some(ts)); |
| 71 | + } |
| 72 | + true |
| 73 | + } |
| 74 | + |
| 75 | + #[inline] |
| 76 | + fn futex_wait(&self, ts: Option<libc::timespec>) { |
| 77 | + let ts_ptr = ts |
| 78 | + .as_ref() |
| 79 | + .map(|ts_ref| ts_ref as *const _) |
| 80 | + .unwrap_or(ptr::null()); |
| 81 | + let r = unsafe { |
| 82 | + libc::syscall( |
| 83 | + libc::SYS_futex, |
| 84 | + &self.futex, |
| 85 | + FUTEX_WAIT | FUTEX_PRIVATE, |
| 86 | + 1, |
| 87 | + ts_ptr, |
| 88 | + ) |
| 89 | + }; |
| 90 | + debug_assert!(r == 0 || r == -1); |
| 91 | + if r == -1 { |
| 92 | + unsafe { |
| 93 | + debug_assert!( |
| 94 | + *libc::__errno_location() == libc::EINTR |
| 95 | + || *libc::__errno_location() == libc::EAGAIN |
| 96 | + || (ts.is_some() && *libc::__errno_location() == libc::ETIMEDOUT) |
| 97 | + ); |
| 98 | + } |
| 99 | + } |
| 100 | + } |
| 101 | + |
| 102 | + // Locks the parker to prevent the target thread from exiting. This is |
| 103 | + // necessary to ensure that thread-local ThreadData objects remain valid. |
| 104 | + // This should be called while holding the queue lock. |
| 105 | + pub fn unpark_lock(&self) -> UnparkHandle { |
| 106 | + // We don't need to lock anything, just clear the state |
| 107 | + self.futex.store(0, Ordering::Release); |
| 108 | + |
| 109 | + UnparkHandle { futex: &self.futex } |
| 110 | + } |
| 111 | +} |
| 112 | + |
| 113 | +// Handle for a thread that is about to be unparked. We need to mark the thread |
| 114 | +// as unparked while holding the queue lock, but we delay the actual unparking |
| 115 | +// until after the queue lock is released. |
| 116 | +pub struct UnparkHandle { |
| 117 | + futex: *const AtomicI32, |
| 118 | +} |
| 119 | + |
| 120 | +impl UnparkHandle { |
| 121 | + // Wakes up the parked thread. This should be called after the queue lock is |
| 122 | + // released to avoid blocking the queue for too long. |
| 123 | + pub fn unpark(self) { |
| 124 | + // The thread data may have been freed at this point, but it doesn't |
| 125 | + // matter since the syscall will just return EFAULT in that case. |
| 126 | + let r = |
| 127 | + unsafe { libc::syscall(libc::SYS_futex, self.futex, FUTEX_WAKE | FUTEX_PRIVATE, 1) }; |
| 128 | + debug_assert!(r == 0 || r == 1 || r == -1); |
| 129 | + if r == -1 { |
| 130 | + debug_assert_eq!(unsafe { *libc::__errno_location() }, libc::EFAULT); |
| 131 | + } |
| 132 | + } |
| 133 | +} |
0 commit comments