diff --git a/Cargo.toml b/Cargo.toml index cfe7151d..80fd782b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,6 +38,7 @@ time = "0.3" corosensei = "0.2" core_affinity = "0.8" crossbeam-utils = "0.8" +crossbeam-skiplist = "0.1" nix = "0.29" io-uring = "0.7" windows-sys = "0.59" diff --git a/core/Cargo.toml b/core/Cargo.toml index bc058abf..fb221583 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -11,13 +11,6 @@ license.workspace = true readme.workspace = true [dependencies] -cfg-if.workspace = true -once_cell.workspace = true -dashmap.workspace = true -num_cpus.workspace = true -rand.workspace = true -st3.workspace = true -crossbeam-deque.workspace = true tracing = { workspace = true, default-features = false, optional = true } tracing-subscriber = { workspace = true, features = [ "fmt", @@ -32,6 +25,14 @@ uuid = { workspace = true, features = [ educe = { workspace = true, optional = true } core_affinity = { workspace = true, optional = true } crossbeam-utils = { workspace = true, optional = true } +cfg-if.workspace = true +once_cell.workspace = true +dashmap.workspace = true +num_cpus.workspace = true +rand.workspace = true +st3.workspace = true +crossbeam-deque.workspace = true +crossbeam-skiplist.workspace = true psm.workspace = true [target.'cfg(unix)'.dependencies] diff --git a/core/src/common/mod.rs b/core/src/common/mod.rs index 44150798..19f62213 100644 --- a/core/src/common/mod.rs +++ b/core/src/common/mod.rs @@ -56,6 +56,48 @@ pub mod timer; /// pub mod work_steal; +/// Suppose a thread in a work-stealing scheduler is idle and looking for the next task to run. To +/// find an available task, it might do the following: +/// +/// 1. Try popping one task from the local worker queue. +/// 2. Try popping and stealing tasks from another local worker queue. +/// 3. Try popping and stealing a batch of tasks from the global injector queue. 
+/// +/// A queue implementation of work-stealing strategy: +/// +/// # Examples +/// +/// ``` +/// use open_coroutine_core::common::ordered_work_steal::OrderedWorkStealQueue; +/// +/// let queue = OrderedWorkStealQueue::new(2, 64); +/// for i in 6..8 { +/// queue.push_with_priority(i, i); +/// } +/// let local0 = queue.local_queue(); +/// for i in 2..6 { +/// local0.push_with_priority(i, i); +/// } +/// let local1 = queue.local_queue(); +/// for i in 0..2 { +/// local1.push_with_priority(i, i); +/// } +/// for i in 0..2 { +/// assert_eq!(local1.pop_front(), Some(i)); +/// } +/// for i in (2..6).rev() { +/// assert_eq!(local1.pop_front(), Some(i)); +/// } +/// for i in 6..8 { +/// assert_eq!(local1.pop_front(), Some(i)); +/// } +/// assert_eq!(local0.pop_front(), None); +/// assert_eq!(local1.pop_front(), None); +/// assert_eq!(queue.pop(), None); +/// ``` +/// +pub mod ordered_work_steal; + #[cfg(target_os = "linux")] extern "C" { fn linux_version_code() -> c_int; diff --git a/core/src/common/ordered_work_steal.rs b/core/src/common/ordered_work_steal.rs new file mode 100644 index 00000000..c2bf8dab --- /dev/null +++ b/core/src/common/ordered_work_steal.rs @@ -0,0 +1,397 @@ +use crossbeam_deque::{Injector, Steal}; +use crossbeam_skiplist::SkipMap; +use rand::Rng; +use st3::fifo::Worker; +use std::collections::VecDeque; +use std::ffi::c_longlong; +use std::fmt::Debug; +use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering}; + +/// Work stealing global queue, shared by multiple threads. +#[repr(C)] +#[derive(Debug)] +pub struct OrderedWorkStealQueue { + shared_queue: SkipMap>, + /// Number of pending tasks in the queue. This helps prevent unnecessary + /// locking in the hot path. 
+ len: AtomicUsize, + local_capacity: usize, + local_queues: VecDeque>>, + index: AtomicUsize, +} + +impl Drop for OrderedWorkStealQueue { + fn drop(&mut self) { + if !std::thread::panicking() { + for local_queue in &self.local_queues { + for entry in local_queue { + assert!(entry.value().pop().is_none(), "local queue not empty"); + } + } + assert!(self.pop().is_none(), "global queue not empty"); + } + } +} + +impl OrderedWorkStealQueue { + /// Create a new `WorkStealQueue` instance. + #[must_use] + pub fn new(local_queues_size: usize, local_capacity: usize) -> Self { + OrderedWorkStealQueue { + shared_queue: SkipMap::new(), + len: AtomicUsize::new(0), + local_capacity, + local_queues: (0..local_queues_size).map(|_| SkipMap::new()).collect(), + index: AtomicUsize::new(0), + } + } + + /// Returns `true` if the global queue is empty. + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// Get the size of the global queue. + pub fn len(&self) -> usize { + self.len.load(Ordering::Acquire) + } + + /// Push an element to the global queue. + pub fn push_with_priority(&self, priority: c_longlong, item: T) { + self.shared_queue + .get_or_insert_with(priority, Injector::new) + .value() + .push(item); + //add count + self.len + .store(self.len().saturating_add(1), Ordering::Release); + } + + /// Pop an element from the global queue. + pub fn pop(&self) -> Option { + // Fast path, if len == 0, then there are no values + if self.is_empty() { + return None; + } + for entry in &self.shared_queue { + loop { + match entry.value().steal() { + Steal::Success(item) => { + // Decrement the count. + self.len + .store(self.len().saturating_sub(1), Ordering::Release); + return Some(item); + } + Steal::Retry => continue, + Steal::Empty => break, + } + } + } + None + } + + /// Get a local queue, this method should be called up to `local_queue_size` times. 
+ /// + /// # Panics + /// should never happen + pub fn local_queue(&self) -> OrderedLocalQueue<'_, T> { + let mut index = self.index.fetch_add(1, Ordering::Relaxed); + if index == usize::MAX { + self.index.store(0, Ordering::Relaxed); + } + index %= self.local_queues.len(); + let local = self + .local_queues + .get(index) + .unwrap_or_else(|| panic!("local queue {index} init failed!")); + OrderedLocalQueue::new(self, local) + } +} + +impl Default for OrderedWorkStealQueue { + fn default() -> Self { + Self::new(num_cpus::get(), 256) + } +} + +/// The work stealing local queue, exclusive to thread. +#[repr(C)] +#[derive(Debug)] +pub struct OrderedLocalQueue<'l, T: Debug> { + /// Used to schedule bookkeeping tasks every so often. + tick: AtomicU32, + shared: &'l OrderedWorkStealQueue, + stealing: AtomicBool, + queue: &'l SkipMap>, + len: AtomicUsize, +} + +impl Drop for OrderedLocalQueue<'_, T> { + fn drop(&mut self) { + if !std::thread::panicking() { + for entry in self.queue { + assert!(entry.value().pop().is_none(), "local queue not empty"); + } + } + } +} + +impl<'l, T: Debug> OrderedLocalQueue<'l, T> { + fn new( + shared: &'l OrderedWorkStealQueue, + queue: &'l SkipMap>, + ) -> Self { + OrderedLocalQueue { + tick: AtomicU32::new(0), + shared, + stealing: AtomicBool::new(false), + queue, + len: AtomicUsize::new(0), + } + } + + /// Returns `true` if the local queue is empty. + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// Returns `true` if the local queue is full. 
+ /// + /// # Examples + /// + /// ``` + /// use open_coroutine_core::common::ordered_work_steal::OrderedWorkStealQueue; + /// + /// let queue = OrderedWorkStealQueue::new(1, 2); + /// let local = queue.local_queue(); + /// assert!(local.is_empty()); + /// for i in 0..2 { + /// local.push_with_priority(i, i); + /// } + /// assert!(local.is_full()); + /// assert_eq!(local.pop_front(), Some(0)); + /// assert_eq!(local.len(), 1); + /// assert_eq!(local.pop_front(), Some(1)); + /// assert_eq!(local.pop_front(), None); + /// assert!(local.is_empty()); + /// ``` + pub fn is_full(&self) -> bool { + self.len() >= self.shared.local_capacity + } + + fn max_steal(&self) -> usize { + //最多偷取本地最长的一半 + self.shared + .local_capacity + .saturating_add(1) + .saturating_div(2) + .saturating_sub(self.len()) + } + + fn can_steal(&self) -> bool { + self.len() + < self + .shared + .local_capacity + .saturating_add(1) + .saturating_div(2) + } + + /// Returns the number of elements in the queue. + pub fn len(&self) -> usize { + self.len.load(Ordering::Acquire) + } + + fn try_lock(&self) -> bool { + self.stealing + .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed) + .is_ok() + } + + fn release_lock(&self) { + self.stealing.store(false, Ordering::Release); + } + + /// If the queue is full, first push half to global, + /// then push the item to global. 
+ /// + /// # Examples + /// + /// ``` + /// use open_coroutine_core::common::ordered_work_steal::OrderedWorkStealQueue; + /// + /// let queue = OrderedWorkStealQueue::new(1, 2); + /// let local = queue.local_queue(); + /// for i in 0..4 { + /// local.push_with_priority(i, i); + /// } + /// for i in 0..4 { + /// assert_eq!(local.pop_front(), Some(i)); + /// } + /// assert_eq!(local.pop_front(), None); + /// ``` + pub fn push_with_priority(&self, priority: c_longlong, item: T) { + if self.is_full() { + self.push_to_global(priority, item); + return; + } + if let Err(item) = self + .queue + .get_or_insert_with(priority, || Worker::new(self.shared.local_capacity)) + .value() + .push(item) + { + self.push_to_global(priority, item); + } else { + //add count + self.len + .store(self.len().saturating_add(1), Ordering::Release); + } + } + + fn push_to_global(&self, priority: c_longlong, item: T) { + //把本地队列的一半放到全局队列 + let count = self.len() / 2; + for _ in 0..count { + for entry in self.queue.iter().rev() { + if let Some(item) = entry.value().pop() { + self.shared.push_with_priority(*entry.key(), item); + } + } + } + //直接放到全局队列 + self.shared.push_with_priority(priority, item); + } + + /// Increment the tick + fn tick(&self) -> u32 { + let val = self.tick.fetch_add(1, Ordering::Release); + if val == u32::MAX { + self.tick.store(0, Ordering::Release); + return 0; + } + val.saturating_add(1) + } + + /// If the queue is empty, first try steal from global, + /// then try steal from siblings. 
+ /// + /// # Examples + /// + /// ``` + /// use open_coroutine_core::common::ordered_work_steal::OrderedWorkStealQueue; + /// + /// let queue = OrderedWorkStealQueue::new(1, 32); + /// for i in 0..4 { + /// queue.push_with_priority(i, i); + /// } + /// let local = queue.local_queue(); + /// for i in 0..4 { + /// assert_eq!(local.pop_front(), Some(i)); + /// } + /// assert_eq!(local.pop_front(), None); + /// assert_eq!(queue.pop(), None); + /// ``` + /// + /// # Examples + /// ``` + /// use open_coroutine_core::common::ordered_work_steal::OrderedWorkStealQueue; + /// + /// let queue = OrderedWorkStealQueue::new(2, 64); + /// let local0 = queue.local_queue(); + /// for i in 2..6 { + /// local0.push_with_priority(i, i); + /// } + /// assert_eq!(local0.len(), 4); + /// let local1 = queue.local_queue(); + /// for i in 0..2 { + /// local1.push_with_priority(i, i); + /// } + /// assert_eq!(local1.len(), 2); + /// for i in 0..2 { + /// assert_eq!(local1.pop_front(), Some(i)); + /// } + /// for i in (2..6).rev() { + /// assert_eq!(local1.pop_front(), Some(i)); + /// } + /// assert_eq!(local0.pop_front(), None); + /// assert_eq!(local1.pop_front(), None); + /// assert_eq!(queue.pop(), None); + /// ``` + pub fn pop_front(&self) -> Option { + //每从本地弹出61次,就从全局队列弹出 + if self.tick() % 61 == 0 { + if let Some(val) = self.shared.pop() { + return Some(val); + } + } + if let Some(val) = self.pop() { + return Some(val); + } + if self.try_lock() { + //尝试从其他本地队列steal + let local_queues = &self.shared.local_queues; + let num = local_queues.len(); + let start = rand::thread_rng().gen_range(0..num); + for i in 0..num { + let i = (start + i) % num; + if let Some(another) = local_queues.get(i) { + if !self.can_steal() { + //本地队列超过一半,不再steal + break; + } + if std::ptr::eq(&another, &self.queue) { + //不能偷自己 + continue; + } + for entry in another.iter().rev() { + let worker = entry.value(); + if worker.is_empty() { + //其他队列为空 + continue; + } + let into_entry = 
+                            self.queue.get_or_insert_with(*entry.key(), || {
+                                Worker::new(self.shared.local_capacity)
+                            });
+                        let into_queue = into_entry.value();
+                        if let Ok(stolen) = worker.stealer().steal(into_queue, |n| {
+                            //可偷取的最大长度与本地队列可偷长度做比较
+                            n.min(self.max_steal())
+                                //与其他队列当前长度的一半做比较
+                                .min(
+                                    worker
+                                        .capacity()
+                                        .saturating_sub(worker.spare_capacity())
+                                        .saturating_add(1)
+                                        .saturating_div(2),
+                                )
+                        }) {
+                            //the stolen tasks now live in this local queue, keep `len`
+                            //in sync, otherwise `is_full()`/`max_steal()` drift
+                            //NOTE(review): the victim's `len` counter is unreachable
+                            //from here and stays inflated - confirm acceptable
+                            self.len
+                                .store(self.len().saturating_add(stolen), Ordering::Release);
+                            self.release_lock();
+                            return self.pop();
+                        }
+                    }
+                }
+            }
+            self.release_lock();
+        }
+        //都steal不到,只好从shared里pop
+        self.shared.pop()
+    }
+
+    fn pop(&self) -> Option<T> {
+        //从本地队列弹出元素
+        for entry in self.queue {
+            if let Some(val) = entry.value().pop() {
+                // Decrement the count.
+                self.len
+                    .store(self.len().saturating_sub(1), Ordering::Release);
+                return Some(val);
+            }
+        }
+        None
+    }
+}
diff --git a/core/src/common/work_steal.rs b/core/src/common/work_steal.rs
index 93ac58df..a2590b34 100644
--- a/core/src/common/work_steal.rs
+++ b/core/src/common/work_steal.rs
@@ -56,7 +56,8 @@ impl<T: Debug> WorkStealQueue<T> {
     pub fn push(&self, item: T) {
         self.shared_queue.push(item);
         //add count
-        self.len.store(self.len() + 1, Ordering::Release);
+        self.len
+            .store(self.len().saturating_add(1), Ordering::Release);
     }
 
     /// Pop an element from the global queue.
@@ -69,7 +70,8 @@
             match self.shared_queue.steal() {
                 Steal::Success(item) => {
                     // Decrement the count.
-                    self.len.store(self.len() - 1, Ordering::Release);
+                    self.len
+                        .store(self.len().saturating_sub(1), Ordering::Release);
                     return Some(item);
                 }
                 Steal::Retry => continue,
@@ -81,7 +83,7 @@
     /// Get a local queue, this method should be called up to `local_queue_size` times.
/// /// # Panics - /// should never happens + /// should never happen pub fn local_queue(&self) -> LocalQueue<'_, T> { let mut index = self.index.fetch_add(1, Ordering::Relaxed); if index == usize::MAX { @@ -160,9 +162,23 @@ impl<'l, T: Debug> LocalQueue<'l, T> { self.queue.spare_capacity() == 0 } + fn max_steal(&self) -> usize { + self.queue + .capacity() + .saturating_add(1) + .saturating_div(2) + .saturating_sub(self.len()) + } + + fn can_steal(&self) -> bool { + self.queue.spare_capacity() >= self.queue.capacity().saturating_add(1).saturating_div(2) + } + /// Returns the number of elements in the queue. pub fn len(&self) -> usize { - self.queue.capacity() - self.queue.spare_capacity() + self.queue + .capacity() + .saturating_sub(self.queue.spare_capacity()) } fn try_lock(&self) -> bool { @@ -215,7 +231,7 @@ impl<'l, T: Debug> LocalQueue<'l, T> { self.tick.store(0, Ordering::Release); return 0; } - val + 1 + val.saturating_add(1) } /// If the queue is empty, first try steal from global, @@ -248,9 +264,11 @@ impl<'l, T: Debug> LocalQueue<'l, T> { /// local0.push_back(3); /// local0.push_back(4); /// local0.push_back(5); + /// assert_eq!(local0.len(), 4); /// let local1 = queue.local_queue(); /// local1.push_back(0); /// local1.push_back(1); + /// assert_eq!(local1.len(), 2); /// for i in 0..6 { /// assert_eq!(local1.pop_front(), Some(i)); /// } @@ -277,6 +295,10 @@ impl<'l, T: Debug> LocalQueue<'l, T> { for i in 0..num { let i = (start + i) % num; if let Some(another) = local_queues.get(i) { + if !self.can_steal() { + //本地队列超过一半,不再steal + break; + } if std::ptr::eq(&another, &self.queue) { //不能偷自己 continue; @@ -285,17 +307,19 @@ impl<'l, T: Debug> LocalQueue<'l, T> { //其他队列为空 continue; } - if self.queue.spare_capacity() == 0 { - //本地队列已满 - continue; - } if another .stealer() .steal(self.queue, |n| { - //可偷取的最大长度与本地队列空闲长度做比较 - n.min(self.queue.spare_capacity()) + //可偷取的最大长度与本地队列可偷长度做比较 + n.min(self.max_steal()) //与其他队列当前长度的一半做比较 - .min(((another.capacity() - 
another.spare_capacity()) + 1) / 2) + .min( + another + .capacity() + .saturating_sub(another.spare_capacity()) + .saturating_add(1) + .saturating_div(2), + ) }) .is_ok() { diff --git a/core/src/coroutine/local.rs b/core/src/coroutine/local.rs index 42ddf52a..f9611eb7 100644 --- a/core/src/coroutine/local.rs +++ b/core/src/coroutine/local.rs @@ -4,7 +4,6 @@ use std::ffi::c_void; use std::fmt::Debug; /// todo provide macro like [`std::thread_local`] - /// A struct for coroutines handles local args. #[repr(C)] #[derive(Debug, Default)] diff --git a/core/src/coroutine/suspender.rs b/core/src/coroutine/suspender.rs index 725125d7..197f8f9b 100644 --- a/core/src/coroutine/suspender.rs +++ b/core/src/coroutine/suspender.rs @@ -49,7 +49,7 @@ impl Suspender<'_, Param, Yield> { } #[allow(clippy::must_use_candidate)] -impl<'s, Param> Suspender<'s, Param, ()> { +impl Suspender<'_, Param, ()> { /// see the `suspend_with` documents. pub fn suspend(&self) -> Param { self.suspend_with(()) diff --git a/core/src/syscall/unix/select.rs b/core/src/syscall/unix/select.rs index c63af454..806f5d4b 100644 --- a/core/src/syscall/unix/select.rs +++ b/core/src/syscall/unix/select.rs @@ -97,7 +97,7 @@ impl SelectSyscall for NioSelectSyscall { } _ = EventLoops::wait_event(Some(Duration::from_millis(u64::from(t.min(x))))); if t != c_uint::MAX { - t = if t > x { t - x } else { 0 }; + t = t.saturating_sub(x); } if x < 16 { x <<= 1;