From d641e7d5e8bb414156460164ea0ff75adbf0d1fb Mon Sep 17 00:00:00 2001 From: james7132 Date: Tue, 22 Jul 2025 18:18:32 -0700 Subject: [PATCH 01/22] First attempt at a more optimized LocalExecutor --- src/lib.rs | 232 +------------ src/local_executor.rs | 742 ++++++++++++++++++++++++++++++++++++++++ src/static_executors.rs | 50 ++- tests/drop.rs | 131 ++++++- tests/larger_tasks.rs | 56 ++- tests/local_queue.rs | 24 +- tests/panic_prop.rs | 14 +- 7 files changed, 1002 insertions(+), 247 deletions(-) create mode 100644 src/local_executor.rs diff --git a/src/lib.rs b/src/lib.rs index 74e02db..61ae9ec 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -44,7 +44,6 @@ use std::fmt; use std::marker::PhantomData; use std::panic::{RefUnwindSafe, UnwindSafe}; use std::pin::Pin; -use std::rc::Rc; use std::sync::atomic::{AtomicBool, AtomicPtr, Ordering}; use std::sync::{Arc, Mutex, MutexGuard, RwLock, TryLockError}; use std::task::{Context, Poll, Waker}; @@ -55,11 +54,13 @@ use futures_lite::{future, prelude::*}; use pin_project_lite::pin_project; use slab::Slab; +mod local_executor; #[cfg(feature = "static")] mod static_executors; #[doc(no_inline)] pub use async_task::{FallibleTask, Task}; +pub use local_executor::*; #[cfg(feature = "static")] #[cfg_attr(docsrs, doc(cfg(any(feature = "static"))))] pub use static_executors::*; @@ -431,235 +432,6 @@ impl<'a> Default for Executor<'a> { } } -/// A thread-local executor. -/// -/// The executor can only be run on the thread that created it. -/// -/// # Examples -/// -/// ``` -/// use async_executor::LocalExecutor; -/// use futures_lite::future; -/// -/// let local_ex = LocalExecutor::new(); -/// -/// future::block_on(local_ex.run(async { -/// println!("Hello world!"); -/// })); -/// ``` -pub struct LocalExecutor<'a> { - /// The inner executor. - inner: Executor<'a>, - - /// Makes the type `!Send` and `!Sync`. - _marker: PhantomData>, -} - -impl UnwindSafe for LocalExecutor<'_> {} -impl RefUnwindSafe for LocalExecutor<'_> {} - -impl fmt::Debug for LocalExecutor<'_> { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - debug_executor(&self.inner, "LocalExecutor", f) - } -} - -impl<'a> LocalExecutor<'a> { - /// Creates a single-threaded executor. - /// - /// # Examples - /// - /// ``` - /// use async_executor::LocalExecutor; - /// - /// let local_ex = LocalExecutor::new(); - /// ``` - pub const fn new() -> LocalExecutor<'a> { - LocalExecutor { - inner: Executor::new(), - _marker: PhantomData, - } - } - - /// Returns `true` if there are no unfinished tasks. - /// - /// # Examples - /// - /// ``` - /// use async_executor::LocalExecutor; - /// - /// let local_ex = LocalExecutor::new(); - /// assert!(local_ex.is_empty()); - /// - /// let task = local_ex.spawn(async { - /// println!("Hello world"); - /// }); - /// assert!(!local_ex.is_empty()); - /// - /// assert!(local_ex.try_tick()); - /// assert!(local_ex.is_empty()); - /// ``` - pub fn is_empty(&self) -> bool { - self.inner().is_empty() - } - - /// Spawns a task onto the executor. - /// - /// # Examples - /// - /// ``` - /// use async_executor::LocalExecutor; - /// - /// let local_ex = LocalExecutor::new(); - /// - /// let task = local_ex.spawn(async { - /// println!("Hello world"); - /// }); - /// ``` - pub fn spawn(&self, future: impl Future + 'a) -> Task { - let mut active = self.inner().state().active(); - - // SAFETY: This executor is not thread safe, so the future and its result - // cannot be sent to another thread. - unsafe { self.inner().spawn_inner(future, &mut active) } - } - - /// Spawns many tasks onto the executor. - /// - /// As opposed to the [`spawn`] method, this locks the executor's inner task lock once and - /// spawns all of the tasks in one go. With large amounts of tasks this can improve - /// contention. - /// - /// It is assumed that the iterator provided does not block; blocking iterators can lock up - /// the internal mutex and therefore the entire executor. Unlike [`Executor::spawn`], the - /// mutex is not released, as there are no other threads that can poll this executor. - /// - /// ## Example - /// - /// ``` - /// use async_executor::LocalExecutor; - /// use futures_lite::{stream, prelude::*}; - /// use std::future::ready; - /// - /// # futures_lite::future::block_on(async { - /// let mut ex = LocalExecutor::new(); - /// - /// let futures = [ - /// ready(1), - /// ready(2), - /// ready(3) - /// ]; - /// - /// // Spawn all of the futures onto the executor at once. - /// let mut tasks = vec![]; - /// ex.spawn_many(futures, &mut tasks); - /// - /// // Await all of them. - /// let results = ex.run(async move { - /// stream::iter(tasks).then(|x| x).collect::>().await - /// }).await; - /// assert_eq!(results, [1, 2, 3]); - /// # }); - /// ``` - /// - /// [`spawn`]: LocalExecutor::spawn - /// [`Executor::spawn_many`]: Executor::spawn_many - pub fn spawn_many + 'a>( - &self, - futures: impl IntoIterator, - handles: &mut impl Extend>, - ) { - let mut active = self.inner().state().active(); - - // Convert all of the futures to tasks. - let tasks = futures.into_iter().map(|future| { - // SAFETY: This executor is not thread safe, so the future and its result - // cannot be sent to another thread. - unsafe { self.inner().spawn_inner(future, &mut active) } - - // As only one thread can spawn or poll tasks at a time, there is no need - // to release lock contention here. - }); - - // Push them to the user's collection. - handles.extend(tasks); - } - - /// Attempts to run a task if at least one is scheduled. - /// - /// Running a scheduled task means simply polling its future once. - /// - /// # Examples - /// - /// ``` - /// use async_executor::LocalExecutor; - /// - /// let ex = LocalExecutor::new(); - /// assert!(!ex.try_tick()); // no tasks to run - /// - /// let task = ex.spawn(async { - /// println!("Hello world"); - /// }); - /// assert!(ex.try_tick()); // a task was found - /// ``` - pub fn try_tick(&self) -> bool { - self.inner().try_tick() - } - - /// Runs a single task. - /// - /// Running a task means simply polling its future once. - /// - /// If no tasks are scheduled when this method is called, it will wait until one is scheduled. - /// - /// # Examples - /// - /// ``` - /// use async_executor::LocalExecutor; - /// use futures_lite::future; - /// - /// let ex = LocalExecutor::new(); - /// - /// let task = ex.spawn(async { - /// println!("Hello world"); - /// }); - /// future::block_on(ex.tick()); // runs the task - /// ``` - pub async fn tick(&self) { - self.inner().tick().await - } - - /// Runs the executor until the given future completes. - /// - /// # Examples - /// - /// ``` - /// use async_executor::LocalExecutor; - /// use futures_lite::future; - /// - /// let local_ex = LocalExecutor::new(); - /// - /// let task = local_ex.spawn(async { 1 + 2 }); - /// let res = future::block_on(local_ex.run(async { task.await * 2 })); - /// - /// assert_eq!(res, 6); - /// ``` - pub async fn run(&self, future: impl Future) -> T { - self.inner().run(future).await - } - - /// Returns a reference to the inner executor. - fn inner(&self) -> &Executor<'a> { - &self.inner - } -} - -impl<'a> Default for LocalExecutor<'a> { - fn default() -> LocalExecutor<'a> { - LocalExecutor::new() - } -} - /// The state of a executor. struct State { /// The global queue. diff --git a/src/local_executor.rs b/src/local_executor.rs new file mode 100644 index 0000000..21170fe --- /dev/null +++ b/src/local_executor.rs @@ -0,0 +1,742 @@ +use std::cell::{BorrowError, Cell, RefCell, RefMut}; +use std::collections::VecDeque; +use std::fmt; +use std::marker::PhantomData; +use std::panic::{RefUnwindSafe, UnwindSafe}; +use std::rc::Rc; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::task::{Poll, Waker}; + +use async_task::{Builder, Runnable}; +use futures_lite::{future, prelude::*}; +use slab::Slab; + +use crate::AsyncCallOnDrop; +#[doc(no_inline)] +pub use async_task::Task; + +/// A thread-local executor. +/// +/// The executor can only be run on the thread that created it. +/// +/// # Examples +/// +/// ``` +/// use async_executor::LocalExecutor; +/// use futures_lite::future; +/// +/// let local_ex = LocalExecutor::new(); +/// +/// future::block_on(local_ex.run(async { +/// println!("Hello world!"); +/// })); +/// ``` +pub struct LocalExecutor<'a> { + /// The executor state. + state: Cell<*mut State>, + + /// Makes the `'a` lifetime invariant. + _marker: PhantomData>, +} + +impl UnwindSafe for LocalExecutor<'_> {} +impl RefUnwindSafe for LocalExecutor<'_> {} + +impl fmt::Debug for LocalExecutor<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + debug_executor(self, "LocalExecutor", f) + } +} + +impl<'a> LocalExecutor<'a> { + /// Creates a single-threaded executor. + /// + /// # Examples + /// + /// ``` + /// use async_executor::LocalExecutor; + /// + /// let local_ex = LocalExecutor::new(); + /// ``` + pub const fn new() -> LocalExecutor<'a> { + LocalExecutor { + state: Cell::new(std::ptr::null_mut()), + _marker: PhantomData, + } + } + + /// Returns `true` if there are no unfinished tasks. + /// + /// # Examples + /// + /// ``` + /// use async_executor::LocalExecutor; + /// + /// let local_ex = LocalExecutor::new(); + /// assert!(local_ex.is_empty()); + /// + /// let task = local_ex.spawn(async { + /// println!("Hello world"); + /// }); + /// assert!(!local_ex.is_empty()); + /// + /// assert!(local_ex.try_tick()); + /// assert!(local_ex.is_empty()); + /// ``` + pub fn is_empty(&self) -> bool { + self.state().active().is_empty() + } + + /// Spawns a task onto the executor. + /// + /// # Examples + /// + /// ``` + /// use async_executor::LocalExecutor; + /// + /// let local_ex = LocalExecutor::new(); + /// + /// let task = local_ex.spawn(async { + /// println!("Hello world"); + /// }); + /// ``` + pub fn spawn(&self, future: impl Future + 'a) -> Task { + let mut active = self.state().active(); + + // SAFETY: `T` and the future are `Send`. + unsafe { self.spawn_inner(future, &mut active) } + } + + /// Spawns many tasks onto the executor. + /// + /// As opposed to the [`spawn`] method, this locks the executor's inner task lock once and + /// spawns all of the tasks in one go. With large amounts of tasks this can improve + /// contention. + /// + /// It is assumed that the iterator provided does not block; blocking iterators can lock up + /// the internal mutex and therefore the entire executor. Unlike [`Executor::spawn`], the + /// mutex is not released, as there are no other threads that can poll this executor. + /// + /// ## Example + /// + /// ``` + /// use async_executor::LocalExecutor; + /// use futures_lite::{stream, prelude::*}; + /// use std::future::ready; + /// + /// # futures_lite::future::block_on(async { + /// let mut ex = LocalExecutor::new(); + /// + /// let futures = [ + /// ready(1), + /// ready(2), + /// ready(3) + /// ]; + /// + /// // Spawn all of the futures onto the executor at once. + /// let mut tasks = vec![]; + /// ex.spawn_many(futures, &mut tasks); + /// + /// // Await all of them. + /// let results = ex.run(async move { + /// stream::iter(tasks).then(|x| x).collect::>().await + /// }).await; + /// assert_eq!(results, [1, 2, 3]); + /// # }); + /// ``` + /// + /// [`spawn`]: LocalExecutor::spawn + /// [`Executor::spawn_many`]: Executor::spawn_many + pub fn spawn_many + 'a>( + &self, + futures: impl IntoIterator, + handles: &mut impl Extend>, + ) { + let mut active = Some(self.state().active()); + + // Convert the futures into tasks. + let tasks = futures.into_iter().enumerate().map(move |(i, future)| { + // SAFETY: `T` and the future are `Send`. + let task = unsafe { self.spawn_inner(future, active.as_mut().unwrap()) }; + + // Yield the lock every once in a while to ease contention. + if i.wrapping_sub(1) % 500 == 0 { + drop(active.take()); + active = Some(self.state().active()); + } + + task + }); + + // Push the tasks to the user's collection. + handles.extend(tasks); + } + + /// Spawn a future while holding the inner lock. + /// + /// # Safety + /// + /// If this is an `Executor`, `F` and `T` must be `Send`. + unsafe fn spawn_inner( + &self, + future: impl Future + 'a, + active: &mut Slab, + ) -> Task { + // Remove the task from the set of active tasks when the future finishes. + let entry = active.vacant_entry(); + let index = entry.key(); + let state = self.state_as_rc(); + let future = AsyncCallOnDrop::new(future, move || drop(state.active().try_remove(index))); + + // Create the task and register it in the set of active tasks. + // + // SAFETY: + // + // If `future` is not `Send`, this must be a `LocalExecutor` as per this + // function's unsafe precondition. Since `LocalExecutor` is `!Sync`, + // `try_tick`, `tick` and `run` can only be called from the origin + // thread of the `LocalExecutor`. Similarly, `spawn` can only be called + // from the origin thread, ensuring that `future` and the executor share + // the same origin thread. The `Runnable` can be scheduled from other + // threads, but because of the above `Runnable` can only be called or + // dropped on the origin thread. + // + // `future` is not `'static`, but we make sure that the `Runnable` does + // not outlive `'a`. When the executor is dropped, the `active` field is + // drained and all of the `Waker`s are woken. Then, the queue inside of + // the `Executor` is drained of all of its runnables. This ensures that + // runnables are dropped and this precondition is satisfied. + // + // `self.schedule()` is `Send`, `Sync` and `'static`, as checked below. + // Therefore we do not need to worry about what is done with the + // `Waker`. + let (runnable, task) = Builder::new() + .propagate_panic(true) + .spawn_unchecked(|()| future, self.schedule()); + entry.insert(runnable.waker()); + + runnable.schedule(); + task + } + + /// Attempts to run a task if at least one is scheduled. + /// + /// Running a scheduled task means simply polling its future once. + /// + /// # Examples + /// + /// ``` + /// use async_executor::LocalExecutor; + /// + /// let ex = LocalExecutor::new(); + /// assert!(!ex.try_tick()); // no tasks to run + /// + /// let task = ex.spawn(async { + /// println!("Hello world"); + /// }); + /// assert!(ex.try_tick()); // a task was found + /// ``` + pub fn try_tick(&self) -> bool { + self.state().try_tick() + } + + /// Runs a single task. + /// + /// Running a task means simply polling its future once. + /// + /// If no tasks are scheduled when this method is called, it will wait until one is scheduled. + /// + /// # Examples + /// + /// ``` + /// use async_executor::LocalExecutor; + /// use futures_lite::future; + /// + /// let ex = LocalExecutor::new(); + /// + /// let task = ex.spawn(async { + /// println!("Hello world"); + /// }); + /// future::block_on(ex.tick()); // runs the task + /// ``` + pub async fn tick(&self) { + self.state().tick().await; + } + + /// Runs the executor until the given future completes. + /// + /// # Examples + /// + /// ``` + /// use async_executor::LocalExecutor; + /// use futures_lite::future; + /// + /// let local_ex = LocalExecutor::new(); + /// + /// let task = local_ex.spawn(async { 1 + 2 }); + /// let res = future::block_on(local_ex.run(async { task.await * 2 })); + /// + /// assert_eq!(res, 6); + /// ``` + pub async fn run(&self, future: impl Future) -> T { + self.state().run(future).await + } + + /// Returns a function that schedules a runnable task when it gets woken up. + fn schedule(&self) -> impl Fn(Runnable) + 'static { + let state = self.state_as_rc(); + + // TODO: If possible, push into the current local queue and notify the ticker. + move |runnable| { + state.queue.borrow_mut().push_front(runnable); + state.notify(); + } + } + + /// Returns a pointer to the inner state. + #[inline] + pub(crate) fn state_ptr(&self) -> *const State { + #[cold] + fn alloc_state(cell: &Cell<*mut State>) -> *mut State { + debug_assert!(cell.get().is_null()); + let state = Rc::new(State::new()); + let ptr = Rc::into_raw(state) as *mut State; + cell.set(ptr); + ptr + } + + let mut ptr = self.state.get(); + if ptr.is_null() { + ptr = alloc_state(&self.state); + } + ptr + } + + /// Returns a reference to the inner state. + #[inline] + fn state(&self) -> &State { + // SAFETY: So long as an Executor lives, it's state pointer will always be valid + // when accessed through state_ptr. + unsafe { &*self.state_ptr() } + } + + // Clones the inner state Arc + #[inline] + fn state_as_rc(&self) -> Rc { + // SAFETY: So long as an Executor lives, it's state pointer will always be a valid + // Arc when accessed through state_ptr. + let arc = unsafe { Rc::from_raw(self.state_ptr()) }; + let clone = arc.clone(); + std::mem::forget(arc); + clone + } +} + +impl Drop for LocalExecutor<'_> { + fn drop(&mut self) { + let ptr = *self.state.get_mut(); + if ptr.is_null() { + return; + } + + // SAFETY: As ptr is not null, it was allocated via Arc::new and converted + // via Arc::into_raw in state_ptr. + let state = unsafe { Rc::from_raw(ptr) }; + + let mut active = state.active(); + for w in active.drain() { + w.wake(); + } + drop(active); + + state.queue.borrow_mut().clear(); + } +} + +impl<'a> Default for LocalExecutor<'a> { + fn default() -> LocalExecutor<'a> { + LocalExecutor::new() + } +} + +/// The state of a executor. +pub(crate) struct State { + /// The global queue. + pub(crate) queue: RefCell>, + + /// Set to `true` when a sleeping ticker is notified or no tickers are sleeping. + notified: AtomicBool, + + /// A list of sleeping tickers. + sleepers: RefCell, + + /// Currently active tasks. + pub(crate) active: RefCell>, +} + +impl State { + /// Creates state for a new executor. + pub(crate) const fn new() -> State { + State { + queue: RefCell::new(VecDeque::new()), + notified: AtomicBool::new(true), + sleepers: RefCell::new(Sleepers { + count: 0, + wakers: Vec::new(), + free_ids: Vec::new(), + }), + active: RefCell::new(Slab::new()), + } + } + + /// Returns a reference to currently active tasks. + fn active(&self) -> RefMut<'_, Slab> { + self.active.borrow_mut() + } + + /// Notifies a sleeping ticker. + #[inline] + fn notify(&self) { + if self + .notified + .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire) + .is_ok() + { + let waker = self.sleepers.borrow_mut().notify(); + if let Some(w) = waker { + w.wake(); + } + } + } + + pub(crate) fn try_tick(&self) -> bool { + match self.queue.borrow_mut().pop_back() { + None => false, + Some(runnable) => { + // Notify another ticker now to pick up where this ticker left off, just in case + // running the task takes a long time. + self.notify(); + + // Run the task. + runnable.run(); + true + } + } + } + + pub(crate) async fn tick(&self) { + let runnable = Ticker::new(self).runnable().await; + runnable.run(); + } + + pub async fn run(&self, future: impl Future) -> T { + let mut runner = Runner::new(self); + + // A future that runs tasks forever. + let run_forever = async { + loop { + for _ in 0..200 { + let runnable = runner.runnable().await; + runnable.run(); + } + future::yield_now().await; + } + }; + + // Run `future` and `run_forever` concurrently until `future` completes. + future.or(run_forever).await + } +} + +/// A list of sleeping tickers. +struct Sleepers { + /// Number of sleeping tickers (both notified and unnotified). + count: usize, + + /// IDs and wakers of sleeping unnotified tickers. + /// + /// A sleeping ticker is notified when its waker is missing from this list. + wakers: Vec<(usize, Waker)>, + + /// Reclaimed IDs. + free_ids: Vec, +} + +impl Sleepers { + /// Inserts a new sleeping ticker. + fn insert(&mut self, waker: &Waker) -> usize { + let id = match self.free_ids.pop() { + Some(id) => id, + None => self.count + 1, + }; + self.count += 1; + self.wakers.push((id, waker.clone())); + id + } + + /// Re-inserts a sleeping ticker's waker if it was notified. + /// + /// Returns `true` if the ticker was notified. + fn update(&mut self, id: usize, waker: &Waker) -> bool { + for item in &mut self.wakers { + if item.0 == id { + item.1.clone_from(waker); + return false; + } + } + + self.wakers.push((id, waker.clone())); + true + } + + /// Removes a previously inserted sleeping ticker. + /// + /// Returns `true` if the ticker was notified. + fn remove(&mut self, id: usize) -> bool { + self.count -= 1; + self.free_ids.push(id); + + for i in (0..self.wakers.len()).rev() { + if self.wakers[i].0 == id { + self.wakers.remove(i); + return false; + } + } + true + } + + /// Returns `true` if a sleeping ticker is notified or no tickers are sleeping. + fn is_notified(&self) -> bool { + self.count == 0 || self.count > self.wakers.len() + } + + /// Returns notification waker for a sleeping ticker. + /// + /// If a ticker was notified already or there are no tickers, `None` will be returned. + fn notify(&mut self) -> Option { + if self.wakers.len() == self.count { + self.wakers.pop().map(|item| item.1) + } else { + None + } + } +} + +/// Runs task one by one. +struct Ticker<'a> { + /// The executor state. + state: &'a State, + + /// Set to a non-zero sleeper ID when in sleeping state. + /// + /// States a ticker can be in: + /// 1) Woken. + /// 2a) Sleeping and unnotified. + /// 2b) Sleeping and notified. + sleeping: usize, +} + +impl Ticker<'_> { + /// Creates a ticker. + fn new(state: &State) -> Ticker<'_> { + Ticker { state, sleeping: 0 } + } + + /// Moves the ticker into sleeping and unnotified state. + /// + /// Returns `false` if the ticker was already sleeping and unnotified. + fn sleep(&mut self, waker: &Waker) -> bool { + let mut sleepers = self.state.sleepers.borrow_mut(); + + match self.sleeping { + // Move to sleeping state. + 0 => { + self.sleeping = sleepers.insert(waker); + } + + // Already sleeping, check if notified. + id => { + if !sleepers.update(id, waker) { + return false; + } + } + } + + self.state + .notified + .store(sleepers.is_notified(), Ordering::Release); + + true + } + + /// Moves the ticker into woken state. + fn wake(&mut self) { + if self.sleeping != 0 { + let mut sleepers = self.state.sleepers.borrow_mut(); + sleepers.remove(self.sleeping); + + self.state + .notified + .store(sleepers.is_notified(), Ordering::Release); + } + self.sleeping = 0; + } + + /// Waits for the next runnable task to run. + async fn runnable(&mut self) -> Runnable { + self.runnable_with(|| self.state.queue.borrow_mut().pop_back()) + .await + } + + /// Waits for the next runnable task to run, given a function that searches for a task. + async fn runnable_with(&mut self, mut search: impl FnMut() -> Option) -> Runnable { + future::poll_fn(|cx| { + loop { + match search() { + None => { + // Move to sleeping and unnotified state. + if !self.sleep(cx.waker()) { + // If already sleeping and unnotified, return. + return Poll::Pending; + } + } + Some(r) => { + // Wake up. + self.wake(); + + // Notify another ticker now to pick up where this ticker left off, just in + // case running the task takes a long time. + self.state.notify(); + + return Poll::Ready(r); + } + } + } + }) + .await + } +} + +impl Drop for Ticker<'_> { + fn drop(&mut self) { + // If this ticker is in sleeping state, it must be removed from the sleepers list. + if self.sleeping != 0 { + let mut sleepers = self.state.sleepers.borrow_mut(); + let notified = sleepers.remove(self.sleeping); + + self.state + .notified + .store(sleepers.is_notified(), Ordering::Release); + + // If this ticker was notified, then notify another ticker. + if notified { + drop(sleepers); + self.state.notify(); + } + } + } +} + +/// A worker in a work-stealing executor. +/// +/// This is just a ticker that also has an associated local queue for improved cache locality. +struct Runner<'a> { + /// The executor state. + state: &'a State, + + /// Inner ticker. + ticker: Ticker<'a>, + + /// Bumped every time a runnable task is found. + ticks: usize, +} + +impl Runner<'_> { + /// Creates a runner and registers it in the executor state. + fn new(state: &State) -> Runner<'_> { + let runner = Runner { + state, + ticker: Ticker::new(state), + ticks: 0, + }; + runner + } + + /// Waits for the next runnable task to run. + async fn runnable(&mut self) -> Runnable { + let runnable = self + .ticker + .runnable_with(|| { + // Try popping from the queue. + self.state.queue.borrow_mut().pop_back() + }) + .await; + + // Bump the tick counter. + self.ticks = self.ticks.wrapping_add(1); + + runnable + } +} + +/// Debug implementation for `Executor` and `LocalExecutor`. +fn debug_executor( + executor: &LocalExecutor<'_>, + name: &str, + f: &mut fmt::Formatter<'_>, +) -> fmt::Result { + // Get a reference to the state. + let ptr = executor.state.get(); + if ptr.is_null() { + // The executor has not been initialized. + struct Uninitialized; + + impl fmt::Debug for Uninitialized { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("") + } + } + + return f.debug_tuple(name).field(&Uninitialized).finish(); + } + + // SAFETY: If the state pointer is not null, it must have been + // allocated properly by Arc::new and converted via Arc::into_raw + // in state_ptr. + let state = unsafe { &*ptr }; + + debug_state(state, name, f) +} + +/// Debug implementation for `Executor` and `LocalExecutor`. +pub(crate) fn debug_state(state: &State, name: &str, f: &mut fmt::Formatter<'_>) -> fmt::Result { + /// Debug wrapper for the number of active tasks. + struct ActiveTasks<'a>(&'a RefCell>); + + impl fmt::Debug for ActiveTasks<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self.0.try_borrow() { + Ok(lock) => fmt::Debug::fmt(&lock.len(), f), + Err(BorrowError { .. }) => f.write_str(""), + } + } + } + + /// Debug wrapper for the sleepers. + struct SleepCount<'a>(&'a RefCell); + + impl fmt::Debug for SleepCount<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self.0.try_borrow() { + Ok(sleepers) => fmt::Debug::fmt(&sleepers.count, f), + Err(BorrowError { .. }) => f.write_str(""), + } + } + } + + f.debug_struct(name) + .field("active", &ActiveTasks(&state.active)) + .field("global_tasks", &state.queue.borrow().len()) + .field("sleepers", &SleepCount(&state.sleepers)) + .finish() +} diff --git a/src/static_executors.rs b/src/static_executors.rs index c43679d..32002c3 100644 --- a/src/static_executors.rs +++ b/src/static_executors.rs @@ -1,3 +1,4 @@ +use crate::local_executor::State as LocalState; use crate::{debug_state, Executor, LocalExecutor, State}; use async_task::{Builder, Runnable, Task}; use slab::Slab; @@ -84,15 +85,15 @@ impl LocalExecutor<'static> { /// future::block_on(ex.run(task)); /// ``` pub fn leak(self) -> &'static StaticLocalExecutor { - let ptr = self.inner.state_ptr(); + let ptr = self.state_ptr(); // SAFETY: So long as a LocalExecutor lives, it's state pointer will always be valid // when accessed through state_ptr. This executor will live for the full 'static // lifetime so this isn't an arbitrary lifetime extension. - let state: &'static State = unsafe { &*ptr }; + let state: &'static LocalState = unsafe { &*ptr }; std::mem::forget(self); - let mut active = state.active.lock().unwrap(); + let mut active = state.active.borrow_mut(); if !active.is_empty() { // Reschedule all of the active tasks. for waker in active.drain() { @@ -311,7 +312,7 @@ impl Default for StaticExecutor { /// [`thread_local]: https://doc.rust-lang.org/std/macro.thread_local.html #[repr(transparent)] pub struct StaticLocalExecutor { - state: State, + state: LocalState, marker_: PhantomData>, } @@ -320,7 +321,7 @@ impl RefUnwindSafe for StaticLocalExecutor {} impl fmt::Debug for StaticLocalExecutor { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - debug_state(&self.state, "StaticLocalExecutor", f) + crate::local_executor::debug_state(&self.state, "StaticLocalExecutor", f) } } @@ -338,7 +339,7 @@ impl StaticLocalExecutor { /// ``` pub const fn new() -> Self { Self { - state: State::new(), + state: LocalState::new(), marker_: PhantomData, } } @@ -360,9 +361,33 @@ impl StaticLocalExecutor { /// }); /// ``` pub fn spawn(&'static self, future: impl Future + 'static) -> Task { - let (runnable, task) = Builder::new() - .propagate_panic(true) - .spawn_local(|()| future, self.schedule()); + // Create the task and register it in the set of active tasks. + // + // SAFETY: + // + // If `future` is not `Send`, this must be a `LocalExecutor` as per this + // function's unsafe precondition. Since `LocalExecutor` is `!Sync`, + // `try_tick`, `tick` and `run` can only be called from the origin + // thread of the `LocalExecutor`. Similarly, `spawn` can only be called + // from the origin thread, ensuring that `future` and the executor share + // the same origin thread. The `Runnable` can be scheduled from other + // threads, but because of the above `Runnable` can only be called or + // dropped on the origin thread. + // + // `future` is not `'static`, but we make sure that the `Runnable` does + // not outlive `'a`. When the executor is dropped, the `active` field is + // drained and all of the `Waker`s are woken. Then, the queue inside of + // the `Executor` is drained of all of its runnables. This ensures that + // runnables are dropped and this precondition is satisfied. + // + // `self.schedule()` is `Send`, `Sync` and `'static`, as checked below. + // Therefore we do not need to worry about what is done with the + // `Waker`. + let (runnable, task) = unsafe { + Builder::new() + .propagate_panic(true) + .spawn_unchecked(|()| future, self.schedule()) + }; runnable.schedule(); task } @@ -464,12 +489,11 @@ impl StaticLocalExecutor { } /// Returns a function that schedules a runnable task when it gets woken up. - fn schedule(&'static self) -> impl Fn(Runnable) + Send + Sync + 'static { - let state: &'static State = &self.state; + fn schedule(&'static self) -> impl Fn(Runnable) + 'static { + let state: &'static LocalState = &self.state; // TODO: If possible, push into the current local queue and notify the ticker. move |runnable| { - state.queue.push(runnable).unwrap(); - state.notify(); + state.queue.borrow_mut().push_front(runnable); } } } diff --git a/tests/drop.rs b/tests/drop.rs index 5d089b5..50cdf14 100644 --- a/tests/drop.rs +++ b/tests/drop.rs @@ -5,7 +5,7 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Mutex; use std::task::{Poll, Waker}; -use async_executor::{Executor, Task}; +use async_executor::{Executor, LocalExecutor, Task}; use futures_lite::future; use once_cell::sync::Lazy; @@ -39,6 +39,36 @@ fn executor_cancels_everything() { assert_eq!(DROP.load(Ordering::SeqCst), 1); } +#[test] +fn local_executor_cancels_everything() { + static DROP: AtomicUsize = AtomicUsize::new(0); + static WAKER: Lazy>> = Lazy::new(Default::default); + + let ex = LocalExecutor::new(); + + let task = ex.spawn(async { + let _guard = CallOnDrop(|| { + DROP.fetch_add(1, Ordering::SeqCst); + }); + + future::poll_fn(|cx| { + *WAKER.lock().unwrap() = Some(cx.waker().clone()); + Poll::Pending::<()> + }) + .await; + }); + + future::block_on(ex.tick()); + assert!(WAKER.lock().unwrap().is_some()); + assert_eq!(DROP.load(Ordering::SeqCst), 0); + + drop(ex); + assert_eq!(DROP.load(Ordering::SeqCst), 1); + + assert!(catch_unwind(|| future::block_on(task)).is_err()); + assert_eq!(DROP.load(Ordering::SeqCst), 1); +} + #[cfg(not(miri))] #[test] fn leaked_executor_leaks_everything() { @@ -70,6 +100,37 @@ fn leaked_executor_leaks_everything() { assert_eq!(DROP.load(Ordering::SeqCst), 0); } +#[cfg(not(miri))] +#[test] +fn leaked_local_executor_leaks_everything() { + static DROP: AtomicUsize = AtomicUsize::new(0); + static WAKER: Lazy>> = Lazy::new(Default::default); + + let ex = LocalExecutor::new(); + + let task = ex.spawn(async { + let _guard = CallOnDrop(|| { + DROP.fetch_add(1, Ordering::SeqCst); + }); + + future::poll_fn(|cx| { + *WAKER.lock().unwrap() = Some(cx.waker().clone()); + Poll::Pending::<()> + }) + .await; + }); + + future::block_on(ex.tick()); + assert!(WAKER.lock().unwrap().is_some()); + assert_eq!(DROP.load(Ordering::SeqCst), 0); + + mem::forget(ex); + assert_eq!(DROP.load(Ordering::SeqCst), 0); + + assert!(future::block_on(future::poll_once(task)).is_none()); + assert_eq!(DROP.load(Ordering::SeqCst), 0); +} + #[test] fn await_task_after_dropping_executor() { let s: String = "hello".into(); @@ -83,6 +144,19 @@ fn await_task_after_dropping_executor() { drop(s); } +#[test] +fn await_task_after_dropping_local_executor() { + let s: String = "hello".into(); + + let ex = LocalExecutor::new(); + let task: Task<&str> = ex.spawn(async { &*s }); + assert!(ex.try_tick()); + + drop(ex); + assert_eq!(future::block_on(task), "hello"); + drop(s); +} + #[test] fn drop_executor_and_then_drop_finished_task() { static DROP: AtomicUsize = AtomicUsize::new(0); @@ -102,6 +176,25 @@ fn drop_executor_and_then_drop_finished_task() { assert_eq!(DROP.load(Ordering::SeqCst), 1); } +#[test] +fn drop_local_executor_and_then_drop_finished_task() { + static DROP: AtomicUsize = AtomicUsize::new(0); + + let ex = LocalExecutor::new(); + let task = ex.spawn(async { + CallOnDrop(|| { + DROP.fetch_add(1, Ordering::SeqCst); + }) + }); + assert!(ex.try_tick()); + + assert_eq!(DROP.load(Ordering::SeqCst), 0); + drop(ex); + assert_eq!(DROP.load(Ordering::SeqCst), 0); + drop(task); + assert_eq!(DROP.load(Ordering::SeqCst), 1); +} + #[test] fn drop_finished_task_and_then_drop_executor() { static DROP: AtomicUsize = AtomicUsize::new(0); @@ -121,6 +214,25 @@ fn drop_finished_task_and_then_drop_executor() { assert_eq!(DROP.load(Ordering::SeqCst), 1); } +#[test] +fn drop_finished_task_and_then_drop_local_executor() { + static DROP: AtomicUsize = AtomicUsize::new(0); + + let ex = LocalExecutor::new(); + let task = ex.spawn(async { + CallOnDrop(|| { + DROP.fetch_add(1, Ordering::SeqCst); + }) + }); + assert!(ex.try_tick()); + + assert_eq!(DROP.load(Ordering::SeqCst), 0); + drop(task); + assert_eq!(DROP.load(Ordering::SeqCst), 1); + drop(ex); + assert_eq!(DROP.load(Ordering::SeqCst), 1); +} + #[test] fn iterator_panics_mid_run() { let ex = Executor::new(); @@ -138,6 +250,23 @@ fn iterator_panics_mid_run() { assert_eq!(future::block_on(ex.run(task)), 0); } +#[test] +fn local_iterator_panics_mid_run() { + let ex = LocalExecutor::new(); + + let panic = std::panic::catch_unwind(|| { + let mut handles = vec![]; + ex.spawn_many( + (0..50).map(|i| if i == 25 { panic!() } else { future::ready(i) }), + &mut handles, + ) + }); + assert!(panic.is_err()); + + let task = ex.spawn(future::ready(0)); + assert_eq!(future::block_on(ex.run(task)), 0); +} + struct CallOnDrop(F); impl Drop for CallOnDrop { diff --git a/tests/larger_tasks.rs b/tests/larger_tasks.rs index cc57988..5384440 100644 --- a/tests/larger_tasks.rs +++ b/tests/larger_tasks.rs @@ -1,6 +1,6 @@ //! Test for larger tasks. -use async_executor::Executor; +use async_executor::{Executor, LocalExecutor}; use futures_lite::future::{self, block_on}; use futures_lite::prelude::*; @@ -80,6 +80,42 @@ fn do_run>(mut f: impl FnMut(Arc>) -> }); } +fn do_run_local>(mut f: impl FnMut(Arc>) -> Fut) { + // This should not run for longer than two minutes. + #[cfg(not(miri))] + let _stop_timeout = { + let (stop_timeout, stopper) = async_channel::bounded::<()>(1); + thread::spawn(move || { + block_on(async move { + let timeout = async { + async_io::Timer::after(Duration::from_secs(2 * 60)).await; + eprintln!("test timed out after 2m"); + std::process::exit(1) + }; + + let _ = stopper.recv().or(timeout).await; + }) + }); + stop_timeout + }; + + let ex = Arc::new(LocalExecutor::new()); + + // Test 1: Use the `run` command. + block_on(ex.run(f(ex.clone()))); + + // Test 2: Loop on `tick`. + block_on(async { + let ticker = async { + loop { + ex.tick().await; + } + }; + + f(ex.clone()).or(ticker).await + }); +} + #[test] fn smoke() { do_run(|ex| async move { ex.spawn(async {}).await }); @@ -97,3 +133,21 @@ fn timer() { .await; }) } + +#[test] +fn smoke_local() { + do_run_local(|ex| async move { ex.spawn(async {}).await }); +} + +#[test] +fn yield_now_local() { + do_run_local(|ex| async move { ex.spawn(future::yield_now()).await }) +} + +#[test] +fn timer_local() { + do_run_local(|ex| async move { + ex.spawn(async_io::Timer::after(Duration::from_millis(5))) + .await; + }) +} diff --git a/tests/local_queue.rs b/tests/local_queue.rs index 4678366..62fe7f2 100644 --- a/tests/local_queue.rs +++ b/tests/local_queue.rs @@ -1,4 +1,4 @@ -use async_executor::Executor; +use async_executor::{Executor, LocalExecutor}; use futures_lite::{future, pin}; #[test] @@ -22,3 +22,25 @@ fn two_queues() { assert!(future::poll_once(run2.as_mut()).await.is_none()); }); } + +#[test] +fn local_two_queues() { + future::block_on(async { + // Create an executor with two runners. + let ex = LocalExecutor::new(); + let (run1, run2) = ( + ex.run(future::pending::<()>()), + ex.run(future::pending::<()>()), + ); + let mut run1 = Box::pin(run1); + pin!(run2); + + // Poll them both. + assert!(future::poll_once(run1.as_mut()).await.is_none()); + assert!(future::poll_once(run2.as_mut()).await.is_none()); + + // Drop the first one, which should leave the local queue in the `None` state. + drop(run1); + assert!(future::poll_once(run2.as_mut()).await.is_none()); + }); +} diff --git a/tests/panic_prop.rs b/tests/panic_prop.rs index eab4901..4c2a736 100644 --- a/tests/panic_prop.rs +++ b/tests/panic_prop.rs @@ -1,4 +1,4 @@ -use async_executor::Executor; +use async_executor::{Executor, LocalExecutor}; use futures_lite::{future, prelude::*}; #[test] @@ -12,3 +12,15 @@ fn test_panic_propagation() { // Polling the task should. assert!(future::block_on(task.catch_unwind()).is_err()); } + +#[test] +fn test_local_panic_propagation() { + let ex = LocalExecutor::new(); + let task = ex.spawn(async { panic!("should be caught by the task") }); + + // Running the executor should not panic. + assert!(ex.try_tick()); + + // Polling the task should. + assert!(future::block_on(task.catch_unwind()).is_err()); +} From 9c04535a4a14922f775ca9ac8a04b82f4ad15169 Mon Sep 17 00:00:00 2001 From: james7132 Date: Tue, 22 Jul 2025 18:21:10 -0700 Subject: [PATCH 02/22] Remove remaining atomics --- src/local_executor.rs | 34 +++------------------------------- 1 file changed, 3 insertions(+), 31 deletions(-) diff --git a/src/local_executor.rs b/src/local_executor.rs index 21170fe..5238967 100644 --- a/src/local_executor.rs +++ b/src/local_executor.rs @@ -4,7 +4,6 @@ use std::fmt; use std::marker::PhantomData; use std::panic::{RefUnwindSafe, UnwindSafe}; use std::rc::Rc; -use std::sync::atomic::{AtomicBool, Ordering}; use std::task::{Poll, Waker}; use async_task::{Builder, Runnable}; @@ -364,9 +363,6 @@ pub(crate) struct State { /// The global queue. pub(crate) queue: RefCell>, - /// Set to `true` when a sleeping ticker is notified or no tickers are sleeping. - notified: AtomicBool, - /// A list of sleeping tickers. sleepers: RefCell, @@ -379,7 +375,6 @@ impl State { pub(crate) const fn new() -> State { State { queue: RefCell::new(VecDeque::new()), - notified: AtomicBool::new(true), sleepers: RefCell::new(Sleepers { count: 0, wakers: Vec::new(), @@ -397,15 +392,9 @@ impl State { /// Notifies a sleeping ticker. #[inline] fn notify(&self) { - if self - .notified - .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire) - .is_ok() - { - let waker = self.sleepers.borrow_mut().notify(); - if let Some(w) = waker { - w.wake(); - } + let waker = self.sleepers.borrow_mut().notify(); + if let Some(w) = waker { + w.wake(); } } @@ -505,11 +494,6 @@ impl Sleepers { true } - /// Returns `true` if a sleeping ticker is notified or no tickers are sleeping. - fn is_notified(&self) -> bool { - self.count == 0 || self.count > self.wakers.len() - } - /// Returns notification waker for a sleeping ticker. /// /// If a ticker was notified already or there are no tickers, `None` will be returned. @@ -562,10 +546,6 @@ impl Ticker<'_> { } } - self.state - .notified - .store(sleepers.is_notified(), Ordering::Release); - true } @@ -574,10 +554,6 @@ impl Ticker<'_> { if self.sleeping != 0 { let mut sleepers = self.state.sleepers.borrow_mut(); sleepers.remove(self.sleeping); - - self.state - .notified - .store(sleepers.is_notified(), Ordering::Release); } self.sleeping = 0; } @@ -624,10 +600,6 @@ impl Drop for Ticker<'_> { let mut sleepers = self.state.sleepers.borrow_mut(); let notified = sleepers.remove(self.sleeping); - self.state - .notified - .store(sleepers.is_notified(), Ordering::Release); - // If this ticker was notified, then notify another ticker. if notified { drop(sleepers); From 9c526bbb6a01a898c41f91f68138dc9b9fe44f06 Mon Sep 17 00:00:00 2001 From: james7132 Date: Tue, 22 Jul 2025 19:00:45 -0700 Subject: [PATCH 03/22] Update the safety comments --- src/local_executor.rs | 206 +++++++--------------------------------- src/static_executors.rs | 34 ++++--- 2 files changed, 53 insertions(+), 187 deletions(-) diff --git a/src/local_executor.rs b/src/local_executor.rs index 5238967..5d039e9 100644 --- a/src/local_executor.rs +++ b/src/local_executor.rs @@ -10,7 +10,7 @@ use async_task::{Builder, Runnable}; use futures_lite::{future, prelude::*}; use slab::Slab; -use crate::AsyncCallOnDrop; +use crate::{AsyncCallOnDrop, Sleepers}; #[doc(no_inline)] pub use async_task::Task; @@ -101,9 +101,7 @@ impl<'a> LocalExecutor<'a> { /// ``` pub fn spawn(&self, future: impl Future + 'a) -> Task { let mut active = self.state().active(); - - // SAFETY: `T` and the future are `Send`. - unsafe { self.spawn_inner(future, &mut active) } + self.spawn_inner(future, &mut active) } /// Spawns many tasks onto the executor. @@ -151,20 +149,11 @@ impl<'a> LocalExecutor<'a> { futures: impl IntoIterator, handles: &mut impl Extend>, ) { - let mut active = Some(self.state().active()); + let mut active = self.state().active(); // Convert the futures into tasks. - let tasks = futures.into_iter().enumerate().map(move |(i, future)| { - // SAFETY: `T` and the future are `Send`. - let task = unsafe { self.spawn_inner(future, active.as_mut().unwrap()) }; - - // Yield the lock every once in a while to ease contention. - if i.wrapping_sub(1) % 500 == 0 { - drop(active.take()); - active = Some(self.state().active()); - } - - task + let tasks = futures.into_iter().map(move |future| { + self.spawn_inner(future, &mut active) }); // Push the tasks to the user's collection. @@ -172,11 +161,7 @@ impl<'a> LocalExecutor<'a> { } /// Spawn a future while holding the inner lock. - /// - /// # Safety - /// - /// If this is an `Executor`, `F` and `T` must be `Send`. - unsafe fn spawn_inner( + fn spawn_inner( &self, future: impl Future + 'a, active: &mut Slab, @@ -191,8 +176,7 @@ impl<'a> LocalExecutor<'a> { // // SAFETY: // - // If `future` is not `Send`, this must be a `LocalExecutor` as per this - // function's unsafe precondition. Since `LocalExecutor` is `!Sync`, + // `future` may not `Send`. Since `LocalExecutor` is `!Sync`, // `try_tick`, `tick` and `run` can only be called from the origin // thread of the `LocalExecutor`. Similarly, `spawn` can only be called // from the origin thread, ensuring that `future` and the executor share @@ -206,12 +190,17 @@ impl<'a> LocalExecutor<'a> { // the `Executor` is drained of all of its runnables. This ensures that // runnables are dropped and this precondition is satisfied. // - // `self.schedule()` is `Send`, `Sync` and `'static`, as checked below. - // Therefore we do not need to worry about what is done with the - // `Waker`. - let (runnable, task) = Builder::new() + // `self.schedule()` is not `Send` nor `Sync`. As LocalExecutor is not + // `Send`, the `Waker` is guaranteed// to only be used on the same thread + // it was spawned on. + // + // `self.schedule()` is `'static`, and thus will outlive all borrowed + // variables in the future. + let (runnable, task) = unsafe { + Builder::new() .propagate_panic(true) - .spawn_unchecked(|()| future, self.schedule()); + .spawn_unchecked(|()| future, self.schedule()) + }; entry.insert(runnable.waker()); runnable.schedule(); @@ -314,7 +303,7 @@ impl<'a> LocalExecutor<'a> { /// Returns a reference to the inner state. #[inline] fn state(&self) -> &State { - // SAFETY: So long as an Executor lives, it's state pointer will always be valid + // SAFETY: So long as a LocalExecutor lives, it's state pointer will always be valid // when accessed through state_ptr. unsafe { &*self.state_ptr() } } @@ -322,11 +311,11 @@ impl<'a> LocalExecutor<'a> { // Clones the inner state Arc #[inline] fn state_as_rc(&self) -> Rc { - // SAFETY: So long as an Executor lives, it's state pointer will always be a valid + // SAFETY: So long as a LocalExecutor lives, it's state pointer will always be a valid // Arc when accessed through state_ptr. - let arc = unsafe { Rc::from_raw(self.state_ptr()) }; - let clone = arc.clone(); - std::mem::forget(arc); + let rc = unsafe { Rc::from_raw(self.state_ptr()) }; + let clone = rc.clone(); + std::mem::forget(rc); clone } } @@ -367,7 +356,7 @@ pub(crate) struct State { sleepers: RefCell, /// Currently active tasks. - pub(crate) active: RefCell>, + active: RefCell>, } impl State { @@ -385,15 +374,14 @@ impl State { } /// Returns a reference to currently active tasks. - fn active(&self) -> RefMut<'_, Slab> { + pub(crate) fn active(&self) -> RefMut<'_, Slab> { self.active.borrow_mut() } /// Notifies a sleeping ticker. #[inline] - fn notify(&self) { - let waker = self.sleepers.borrow_mut().notify(); - if let Some(w) = waker { + pub(crate) fn notify(&self) { + if let Some(w) = self.sleepers.borrow_mut().notify() { w.wake(); } } @@ -402,10 +390,6 @@ impl State { match self.queue.borrow_mut().pop_back() { None => false, Some(runnable) => { - // Notify another ticker now to pick up where this ticker left off, just in case - // running the task takes a long time. - self.notify(); - // Run the task. runnable.run(); true @@ -414,20 +398,16 @@ impl State { } pub(crate) async fn tick(&self) { - let runnable = Ticker::new(self).runnable().await; - runnable.run(); + Ticker::new(self).runnable().await.run(); } pub async fn run(&self, future: impl Future) -> T { - let mut runner = Runner::new(self); + let mut ticker = Ticker::new(self); // A future that runs tasks forever. let run_forever = async { loop { - for _ in 0..200 { - let runnable = runner.runnable().await; - runnable.run(); - } + ticker.runnable().await.run(); future::yield_now().await; } }; @@ -437,75 +417,6 @@ impl State { } } -/// A list of sleeping tickers. -struct Sleepers { - /// Number of sleeping tickers (both notified and unnotified). - count: usize, - - /// IDs and wakers of sleeping unnotified tickers. - /// - /// A sleeping ticker is notified when its waker is missing from this list. - wakers: Vec<(usize, Waker)>, - - /// Reclaimed IDs. - free_ids: Vec, -} - -impl Sleepers { - /// Inserts a new sleeping ticker. - fn insert(&mut self, waker: &Waker) -> usize { - let id = match self.free_ids.pop() { - Some(id) => id, - None => self.count + 1, - }; - self.count += 1; - self.wakers.push((id, waker.clone())); - id - } - - /// Re-inserts a sleeping ticker's waker if it was notified. - /// - /// Returns `true` if the ticker was notified. - fn update(&mut self, id: usize, waker: &Waker) -> bool { - for item in &mut self.wakers { - if item.0 == id { - item.1.clone_from(waker); - return false; - } - } - - self.wakers.push((id, waker.clone())); - true - } - - /// Removes a previously inserted sleeping ticker. - /// - /// Returns `true` if the ticker was notified. - fn remove(&mut self, id: usize) -> bool { - self.count -= 1; - self.free_ids.push(id); - - for i in (0..self.wakers.len()).rev() { - if self.wakers[i].0 == id { - self.wakers.remove(i); - return false; - } - } - true - } - - /// Returns notification waker for a sleeping ticker. - /// - /// If a ticker was notified already or there are no tickers, `None` will be returned. - fn notify(&mut self) -> Option { - if self.wakers.len() == self.count { - self.wakers.pop().map(|item| item.1) - } else { - None - } - } -} - /// Runs task one by one. struct Ticker<'a> { /// The executor state. @@ -552,23 +463,16 @@ impl Ticker<'_> { /// Moves the ticker into woken state. fn wake(&mut self) { if self.sleeping != 0 { - let mut sleepers = self.state.sleepers.borrow_mut(); - sleepers.remove(self.sleeping); + self.state.sleepers.borrow_mut().remove(self.sleeping); } self.sleeping = 0; } - /// Waits for the next runnable task to run. - async fn runnable(&mut self) -> Runnable { - self.runnable_with(|| self.state.queue.borrow_mut().pop_back()) - .await - } - /// Waits for the next runnable task to run, given a function that searches for a task. - async fn runnable_with(&mut self, mut search: impl FnMut() -> Option) -> Runnable { + async fn runnable(&mut self) -> Runnable { future::poll_fn(|cx| { loop { - match search() { + match self.state.queue.borrow_mut().pop_back() { None => { // Move to sleeping and unnotified state. if !self.sleep(cx.waker()) { @@ -580,10 +484,6 @@ impl Ticker<'_> { // Wake up. self.wake(); - // Notify another ticker now to pick up where this ticker left off, just in - // case running the task takes a long time. - self.state.notify(); - return Poll::Ready(r); } } @@ -609,48 +509,6 @@ impl Drop for Ticker<'_> { } } -/// A worker in a work-stealing executor. -/// -/// This is just a ticker that also has an associated local queue for improved cache locality. -struct Runner<'a> { - /// The executor state. - state: &'a State, - - /// Inner ticker. - ticker: Ticker<'a>, - - /// Bumped every time a runnable task is found. - ticks: usize, -} - -impl Runner<'_> { - /// Creates a runner and registers it in the executor state. - fn new(state: &State) -> Runner<'_> { - let runner = Runner { - state, - ticker: Ticker::new(state), - ticks: 0, - }; - runner - } - - /// Waits for the next runnable task to run. - async fn runnable(&mut self) -> Runnable { - let runnable = self - .ticker - .runnable_with(|| { - // Try popping from the queue. - self.state.queue.borrow_mut().pop_back() - }) - .await; - - // Bump the tick counter. - self.ticks = self.ticks.wrapping_add(1); - - runnable - } -} - /// Debug implementation for `Executor` and `LocalExecutor`. fn debug_executor( executor: &LocalExecutor<'_>, diff --git a/src/static_executors.rs b/src/static_executors.rs index 32002c3..3dc0b8e 100644 --- a/src/static_executors.rs +++ b/src/static_executors.rs @@ -93,7 +93,7 @@ impl LocalExecutor<'static> { std::mem::forget(self); - let mut active = state.active.borrow_mut(); + let mut active = state.active(); if !active.is_empty() { // Reschedule all of the active tasks. for waker in active.drain() { @@ -402,20 +402,27 @@ impl StaticLocalExecutor { &'static self, future: impl Future + 'a, ) -> Task { + // Create the task and register it in the set of active tasks. + // // SAFETY: // - // - `future` is not `Send` but `StaticLocalExecutor` is `!Sync`, - // `try_tick`, `tick` and `run` can only be called from the origin - // thread of the `StaticLocalExecutor`. Similarly, `spawn_scoped` can only - // be called from the origin thread, ensuring that `future` and the executor - // share the same origin thread. The `Runnable` can be scheduled from other - // threads, but because of the above `Runnable` can only be called or - // dropped on the origin thread. - // - `future` is not `'static`, but the caller guarantees that the - // task, and thus its `Runnable` must not live longer than `'a`. - // - `self.schedule()` is `Send`, `Sync` and `'static`, as checked below. - // Therefore we do not need to worry about what is done with the - // `Waker`. + // `future` may not `Send` but `StaticLocalExecutor` is `!Sync`, + // `try_tick`, `tick` and `run` can only be called from the origin + // thread of the `StaticLocalExecutor`. Similarly, `spawn_scoped` can only + // be called from the origin thread, ensuring that `future` and the executor + // share the same origin thread. The `Runnable` can be scheduled from other + // threads, but because of the above `Runnable` can only be called or + // dropped on the origin thread. + // + // `future` is not `'static`, but the caller guarantees that the + // task, and thus its `Runnable` must not live longer than `'a`. + // + // `self.schedule()` is not `Send` nor `Sync`. As StaticLocalExecutor is not + // `Send`, the `Waker` is guaranteed// to only be used on the same thread + // it was spawned on. + // + // `self.schedule()` is `'static`, and thus will outlive all borrowed + // variables in the future. let (runnable, task) = unsafe { Builder::new() .propagate_panic(true) @@ -494,6 +501,7 @@ impl StaticLocalExecutor { // TODO: If possible, push into the current local queue and notify the ticker. move |runnable| { state.queue.borrow_mut().push_front(runnable); + state.notify(); } } } From f3ad06df8a6573be5b12604001813ec095d9063b Mon Sep 17 00:00:00 2001 From: james7132 Date: Fri, 8 Aug 2025 13:04:41 -0700 Subject: [PATCH 04/22] CI Cleanup --- src/local_executor.rs | 18 ++++++++++-------- src/static_executors.rs | 4 ++-- tests/larger_tasks.rs | 3 ++- 3 files changed, 14 insertions(+), 11 deletions(-) diff --git a/src/local_executor.rs b/src/local_executor.rs index 5d039e9..9e4d94f 100644 --- a/src/local_executor.rs +++ b/src/local_executor.rs @@ -152,9 +152,9 @@ impl<'a> LocalExecutor<'a> { let mut active = self.state().active(); // Convert the futures into tasks. - let tasks = futures.into_iter().map(move |future| { - self.spawn_inner(future, &mut active) - }); + let tasks = futures + .into_iter() + .map(move |future| self.spawn_inner(future, &mut active)); // Push the tasks to the user's collection. handles.extend(tasks); @@ -190,16 +190,16 @@ impl<'a> LocalExecutor<'a> { // the `Executor` is drained of all of its runnables. This ensures that // runnables are dropped and this precondition is satisfied. // - // `self.schedule()` is not `Send` nor `Sync`. As LocalExecutor is not + // `self.schedule()` is not `Send` nor `Sync`. As LocalExecutor is not // `Send`, the `Waker` is guaranteed// to only be used on the same thread // it was spawned on. // - // `self.schedule()` is `'static`, and thus will outlive all borrowed + // `self.schedule()` is `'static`, and thus will outlive all borrowed // variables in the future. let (runnable, task) = unsafe { Builder::new() - .propagate_panic(true) - .spawn_unchecked(|()| future, self.schedule()) + .propagate_panic(true) + .spawn_unchecked(|()| future, self.schedule()) }; entry.insert(runnable.waker()); @@ -407,7 +407,9 @@ impl State { // A future that runs tasks forever. let run_forever = async { loop { - ticker.runnable().await.run(); + for _ in 0..200 { + ticker.runnable().await.run(); + } future::yield_now().await; } }; diff --git a/src/static_executors.rs b/src/static_executors.rs index 3dc0b8e..c81f561 100644 --- a/src/static_executors.rs +++ b/src/static_executors.rs @@ -417,11 +417,11 @@ impl StaticLocalExecutor { // `future` is not `'static`, but the caller guarantees that the // task, and thus its `Runnable` must not live longer than `'a`. // - // `self.schedule()` is not `Send` nor `Sync`. As StaticLocalExecutor is not + // `self.schedule()` is not `Send` nor `Sync`. As StaticLocalExecutor is not // `Send`, the `Waker` is guaranteed// to only be used on the same thread // it was spawned on. // - // `self.schedule()` is `'static`, and thus will outlive all borrowed + // `self.schedule()` is `'static`, and thus will outlive all borrowed // variables in the future. let (runnable, task) = unsafe { Builder::new() diff --git a/tests/larger_tasks.rs b/tests/larger_tasks.rs index 5384440..51e555c 100644 --- a/tests/larger_tasks.rs +++ b/tests/larger_tasks.rs @@ -5,6 +5,7 @@ use futures_lite::future::{self, block_on}; use futures_lite::prelude::*; use std::sync::Arc; +use std::rc::Rc; use std::thread; use std::time::Duration; @@ -99,7 +100,7 @@ fn do_run_local>(mut f: impl FnMut(Arc Date: Fri, 8 Aug 2025 22:00:56 -0700 Subject: [PATCH 05/22] Update benchmarks --- Cargo.toml | 8 + benches/local_executor.rs | 426 ++++++++++++++++++++++++++++++++++++++ tests/larger_tasks.rs | 4 +- 3 files changed, 436 insertions(+), 2 deletions(-) create mode 100644 benches/local_executor.rs diff --git a/Cargo.toml b/Cargo.toml index c8feeb5..c0dd00c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,9 @@ exclude = ["/.*"] # Adds support for executors optimized for use in static variables. static = [] +[lib] +bench = false + [dependencies] async-task = "4.4.0" concurrent-queue = "2.5.0" @@ -44,5 +47,10 @@ name = "executor" harness = false required-features = ["static"] +[[bench]] +name = "local_executor" +harness = false +required-features = ["static"] + [package.metadata.docs.rs] all-features = true diff --git a/benches/local_executor.rs b/benches/local_executor.rs new file mode 100644 index 0000000..2fcccc9 --- /dev/null +++ b/benches/local_executor.rs @@ -0,0 +1,426 @@ +use std::mem; + +use async_executor::{Executor, LocalExecutor, StaticLocalExecutor}; +use criterion::{criterion_group, criterion_main, Criterion}; +use futures_lite::{future, prelude::*}; + +const TASKS: usize = 300; +const STEPS: usize = 300; +const LIGHT_TASKS: usize = 25_000; + +fn run(f: impl FnOnce(&'static LocalExecutor<'_>)) { + f(Box::leak(Box::new(LocalExecutor::new()))); +} + +fn run_static(f: impl FnOnce(&'static StaticLocalExecutor)) { + f(LocalExecutor::new().leak()) +} + +fn create(c: &mut Criterion) { + c.bench_function("executor::create", |b| { + b.iter(|| { + let ex = Executor::new(); + let task = ex.spawn(async {}); + future::block_on(ex.run(task)); + }) + }); +} + +fn running_benches(c: &mut Criterion) { + for (prefix, with_static) in [("local_executor", false), ("static_local_executor", true)] { + let mut group = c.benchmark_group("single_thread"); + + group.bench_function(format!("{prefix}::spawn_one"), |b| { + if with_static { + run_static(|ex| { + b.iter(|| { + let task = ex.spawn(async {}); + future::block_on(ex.run(task)); + }); + }); + } else { + run(|ex| { + b.iter(|| { + let task = ex.spawn(async {}); + future::block_on(ex.run(task)); + }); + }); + } + }); + + if !with_static { + group.bench_function("executor::spawn_batch", |b| { + run(|ex| { + let mut handles = vec![]; + + b.iter(|| { + ex.spawn_many((0..250).map(|_| future::yield_now()), &mut handles); + handles.clear(); + }); + + handles.clear(); + }) + }); + } + + group.bench_function(format!("{prefix}::spawn_many_local"), |b| { + if with_static { + run_static(|ex| { + b.iter(move || { + future::block_on(ex.run(async { + let mut tasks = Vec::new(); + for _ in 0..LIGHT_TASKS { + tasks.push(ex.spawn(async {})); + } + for task in tasks { + task.await; + } + })); + }); + }); + } else { + run(|ex| { + b.iter(move || { + future::block_on(ex.run(async { + let mut tasks = Vec::new(); + for _ in 0..LIGHT_TASKS { + tasks.push(ex.spawn(async {})); + } + for task in tasks { + task.await; + } + })); + }); + }); + } + }); + + group.bench_function(format!("{prefix}::spawn_recursively"), |b| { + #[allow(clippy::manual_async_fn)] + fn go( + ex: &'static LocalExecutor<'static>, + i: usize, + ) -> impl Future + 'static { + async move { + if i != 0 { + ex.spawn(async move { + let fut = Box::pin(go(ex, i - 1)); + fut.await; + }) + .await; + } + } + } + + #[allow(clippy::manual_async_fn)] + fn go_static( + ex: &'static StaticLocalExecutor, + i: usize, + ) -> impl Future + 'static { + async move { + if i != 0 { + ex.spawn(async move { + let fut = Box::pin(go_static(ex, i - 1)); + fut.await; + }) + .await; + } + } + } + + if with_static { + run_static(|ex| { + b.iter(move || { + future::block_on(ex.run(async { + let mut tasks = Vec::new(); + for _ in 0..TASKS { + tasks.push(ex.spawn(go_static(ex, STEPS))); + } + for task in tasks { + task.await; + } + })); + }); + }); + } else { + run(|ex| { + b.iter(move || { + future::block_on(ex.run(async { + let mut tasks = Vec::new(); + for _ in 0..TASKS { + tasks.push(ex.spawn(go(ex, STEPS))); + } + for task in tasks { + task.await; + } + })); + }); + }); + } + }); + + group.bench_function(format!("{prefix}::yield_now"), |b| { + if with_static { + run_static(|ex| { + b.iter(move || { + future::block_on(ex.run(async { + let mut tasks = Vec::new(); + for _ in 0..TASKS { + tasks.push(ex.spawn(async move { + for _ in 0..STEPS { + future::yield_now().await; + } + })); + } + for task in tasks { + task.await; + } + })); + }); + }); + } else { + run(|ex| { + b.iter(move || { + future::block_on(ex.run(async { + let mut tasks = Vec::new(); + for _ in 0..TASKS { + tasks.push(ex.spawn(async move { + for _ in 0..STEPS { + future::yield_now().await; + } + })); + } + for task in tasks { + task.await; + } + })); + }); + }); + } + }); + + group.bench_function(format!("{prefix}::channels"), |b| { + if with_static { + run_static(|ex| { + b.iter(move || { + future::block_on(ex.run(async { + // Create channels. + let mut tasks = Vec::new(); + let (first_send, first_recv) = async_channel::bounded(1); + let mut current_recv = first_recv; + + for _ in 0..TASKS { + let (next_send, next_recv) = async_channel::bounded(1); + let current_recv = mem::replace(&mut current_recv, next_recv); + + tasks.push(ex.spawn(async move { + // Send a notification on to the next task. + for _ in 0..STEPS { + current_recv.recv().await.unwrap(); + next_send.send(()).await.unwrap(); + } + })); + } + + for _ in 0..STEPS { + first_send.send(()).await.unwrap(); + current_recv.recv().await.unwrap(); + } + + for task in tasks { + task.await; + } + })); + }); + }) + } else { + run(|ex| { + b.iter(move || { + future::block_on(ex.run(async { + // Create channels. + let mut tasks = Vec::new(); + let (first_send, first_recv) = async_channel::bounded(1); + let mut current_recv = first_recv; + + for _ in 0..TASKS { + let (next_send, next_recv) = async_channel::bounded(1); + let current_recv = mem::replace(&mut current_recv, next_recv); + + tasks.push(ex.spawn(async move { + // Send a notification on to the next task. + for _ in 0..STEPS { + current_recv.recv().await.unwrap(); + next_send.send(()).await.unwrap(); + } + })); + } + + for _ in 0..STEPS { + first_send.send(()).await.unwrap(); + current_recv.recv().await.unwrap(); + } + + for task in tasks { + task.await; + } + })); + }); + }) + } + }); + + group.bench_function(format!("{prefix}::web_server"), |b| { + if with_static { + run_static(|ex| { + b.iter(move || { + future::block_on(ex.run(async { + let (db_send, db_recv) = + async_channel::bounded::>(TASKS / 5); + let mut db_rng = fastrand::Rng::with_seed(0x12345678); + let mut web_rng = db_rng.fork(); + + // This task simulates a database. + let db_task = ex.spawn(async move { + loop { + // Wait for a new task. + let incoming = match db_recv.recv().await { + Ok(incoming) => incoming, + Err(_) => break, + }; + + // Process the task. Maybe it takes a while. + for _ in 0..db_rng.usize(..10) { + future::yield_now().await; + } + + // Send the data back. + incoming.send(db_rng.usize(..)).await.ok(); + } + }); + + // This task simulates a web server waiting for new tasks. + let server_task = ex.spawn(async move { + for i in 0..TASKS { + // Get a new connection. + if web_rng.usize(..=16) == 16 { + future::yield_now().await; + } + + let mut web_rng = web_rng.fork(); + let db_send = db_send.clone(); + let task = ex.spawn(async move { + // Check if the data is cached... + if web_rng.bool() { + // ...it's in cache! + future::yield_now().await; + return; + } + + // Otherwise we have to make a DB call or two. + for _ in 0..web_rng.usize(STEPS / 2..STEPS) { + let (resp_send, resp_recv) = async_channel::bounded(1); + db_send.send(resp_send).await.unwrap(); + criterion::black_box(resp_recv.recv().await.unwrap()); + } + + // Send the data back... + for _ in 0..web_rng.usize(3..16) { + future::yield_now().await; + } + }); + + task.detach(); + + if i & 16 == 0 { + future::yield_now().await; + } + } + }); + + // Spawn and wait for it to stop. + server_task.await; + db_task.await; + })); + }) + }) + } else { + run(|ex| { + b.iter(move || { + future::block_on(ex.run(async { + let (db_send, db_recv) = + async_channel::bounded::>(TASKS / 5); + let mut db_rng = fastrand::Rng::with_seed(0x12345678); + let mut web_rng = db_rng.fork(); + + // This task simulates a database. + let db_task = ex.spawn(async move { + loop { + // Wait for a new task. + let incoming = match db_recv.recv().await { + Ok(incoming) => incoming, + Err(_) => break, + }; + + // Process the task. Maybe it takes a while. + for _ in 0..db_rng.usize(..10) { + future::yield_now().await; + } + + // Send the data back. + incoming.send(db_rng.usize(..)).await.ok(); + } + }); + + // This task simulates a web server waiting for new tasks. + let server_task = ex.spawn(async move { + for i in 0..TASKS { + // Get a new connection. + if web_rng.usize(..=16) == 16 { + future::yield_now().await; + } + + let mut web_rng = web_rng.fork(); + let db_send = db_send.clone(); + let task = ex.spawn(async move { + // Check if the data is cached... + if web_rng.bool() { + // ...it's in cache! + future::yield_now().await; + return; + } + + // Otherwise we have to make a DB call or two. + for _ in 0..web_rng.usize(STEPS / 2..STEPS) { + let (resp_send, resp_recv) = async_channel::bounded(1); + db_send.send(resp_send).await.unwrap(); + criterion::black_box(resp_recv.recv().await.unwrap()); + } + + // Send the data back... + for _ in 0..web_rng.usize(3..16) { + future::yield_now().await; + } + }); + + task.detach(); + + if i & 16 == 0 { + future::yield_now().await; + } + } + }); + + // Spawn and wait for it to stop. + server_task.await; + db_task.await; + })); + }) + }) + } + }); + } +} + +criterion_group!(benches, create, running_benches); + +criterion_main!(benches); diff --git a/tests/larger_tasks.rs b/tests/larger_tasks.rs index 51e555c..8961646 100644 --- a/tests/larger_tasks.rs +++ b/tests/larger_tasks.rs @@ -4,8 +4,8 @@ use async_executor::{Executor, LocalExecutor}; use futures_lite::future::{self, block_on}; use futures_lite::prelude::*; -use std::sync::Arc; use std::rc::Rc; +use std::sync::Arc; use std::thread; use std::time::Duration; @@ -81,7 +81,7 @@ fn do_run>(mut f: impl FnMut(Arc>) -> }); } -fn do_run_local>(mut f: impl FnMut(Arc>) -> Fut) { +fn do_run_local>(mut f: impl FnMut(Rc>) -> Fut) { // This should not run for longer than two minutes. #[cfg(not(miri))] let _stop_timeout = { From b03a9d20b8e3213cd8f446225bfe9da8e2f0ef89 Mon Sep 17 00:00:00 2001 From: james7132 Date: Fri, 8 Aug 2025 22:01:15 -0700 Subject: [PATCH 06/22] Use UnsafeCell instead of RefCell --- src/local_executor.rs | 89 +++++++++++++++++++++-------------------- src/static_executors.rs | 21 ++++++---- 2 files changed, 58 insertions(+), 52 deletions(-) diff --git a/src/local_executor.rs b/src/local_executor.rs index 9e4d94f..52b38fd 100644 --- a/src/local_executor.rs +++ b/src/local_executor.rs @@ -1,4 +1,4 @@ -use std::cell::{BorrowError, Cell, RefCell, RefMut}; +use std::cell::{Cell, UnsafeCell}; use std::collections::VecDeque; use std::fmt; use std::marker::PhantomData; @@ -83,7 +83,7 @@ impl<'a> LocalExecutor<'a> { /// assert!(local_ex.is_empty()); /// ``` pub fn is_empty(&self) -> bool { - self.state().active().is_empty() + unsafe { &*self.state().active.get() }.is_empty() } /// Spawns a task onto the executor. @@ -100,8 +100,8 @@ impl<'a> LocalExecutor<'a> { /// }); /// ``` pub fn spawn(&self, future: impl Future + 'a) -> Task { - let mut active = self.state().active(); - self.spawn_inner(future, &mut active) + let active = unsafe { &mut *self.state().active.get() }; + self.spawn_inner(future, active) } /// Spawns many tasks onto the executor. @@ -149,12 +149,12 @@ impl<'a> LocalExecutor<'a> { futures: impl IntoIterator, handles: &mut impl Extend>, ) { - let mut active = self.state().active(); + let active = unsafe { &mut *self.state().active.get() }; // Convert the futures into tasks. let tasks = futures .into_iter() - .map(move |future| self.spawn_inner(future, &mut active)); + .map(move |future| self.spawn_inner(future, active)); // Push the tasks to the user's collection. handles.extend(tasks); @@ -170,7 +170,9 @@ impl<'a> LocalExecutor<'a> { let entry = active.vacant_entry(); let index = entry.key(); let state = self.state_as_rc(); - let future = AsyncCallOnDrop::new(future, move || drop(state.active().try_remove(index))); + let future = AsyncCallOnDrop::new(future, move || { + drop(unsafe { &mut *state.active.get() }.try_remove(index)) + }); // Create the task and register it in the set of active tasks. // @@ -276,7 +278,10 @@ impl<'a> LocalExecutor<'a> { // TODO: If possible, push into the current local queue and notify the ticker. move |runnable| { - state.queue.borrow_mut().push_front(runnable); + { + let queue = unsafe { &mut *state.queue.get() }; + queue.push_front(runnable); + } state.notify(); } } @@ -331,13 +336,14 @@ impl Drop for LocalExecutor<'_> { // via Arc::into_raw in state_ptr. let state = unsafe { Rc::from_raw(ptr) }; - let mut active = state.active(); - for w in active.drain() { - w.wake(); + { + let active = unsafe { &mut *state.active.get() }; + for w in active.drain() { + w.wake(); + } } - drop(active); - state.queue.borrow_mut().clear(); + unsafe { &mut *state.queue.get() }.clear(); } } @@ -350,44 +356,41 @@ impl<'a> Default for LocalExecutor<'a> { /// The state of a executor. pub(crate) struct State { /// The global queue. - pub(crate) queue: RefCell>, + pub(crate) queue: UnsafeCell>, /// A list of sleeping tickers. - sleepers: RefCell, + sleepers: UnsafeCell, /// Currently active tasks. - active: RefCell>, + pub(crate) active: UnsafeCell>, } impl State { /// Creates state for a new executor. pub(crate) const fn new() -> State { State { - queue: RefCell::new(VecDeque::new()), - sleepers: RefCell::new(Sleepers { + queue: UnsafeCell::new(VecDeque::new()), + sleepers: UnsafeCell::new(Sleepers { count: 0, wakers: Vec::new(), free_ids: Vec::new(), }), - active: RefCell::new(Slab::new()), + active: UnsafeCell::new(Slab::new()), } } - /// Returns a reference to currently active tasks. - pub(crate) fn active(&self) -> RefMut<'_, Slab> { - self.active.borrow_mut() - } - /// Notifies a sleeping ticker. #[inline] pub(crate) fn notify(&self) { - if let Some(w) = self.sleepers.borrow_mut().notify() { + let waker = unsafe { &mut *self.sleepers.get() }.notify(); + if let Some(w) = waker { w.wake(); } } pub(crate) fn try_tick(&self) -> bool { - match self.queue.borrow_mut().pop_back() { + let runnable = unsafe { &mut *self.queue.get() }.pop_back(); + match runnable { None => false, Some(runnable) => { // Run the task. @@ -443,16 +446,16 @@ impl Ticker<'_> { /// /// Returns `false` if the ticker was already sleeping and unnotified. fn sleep(&mut self, waker: &Waker) -> bool { - let mut sleepers = self.state.sleepers.borrow_mut(); - match self.sleeping { // Move to sleeping state. 0 => { + let sleepers = unsafe { &mut *self.state.sleepers.get() }; self.sleeping = sleepers.insert(waker); } // Already sleeping, check if notified. id => { + let sleepers = unsafe { &mut *self.state.sleepers.get() }; if !sleepers.update(id, waker) { return false; } @@ -465,7 +468,8 @@ impl Ticker<'_> { /// Moves the ticker into woken state. fn wake(&mut self) { if self.sleeping != 0 { - self.state.sleepers.borrow_mut().remove(self.sleeping); + let sleepers = unsafe { &mut *self.state.sleepers.get() }; + sleepers.remove(self.sleeping); } self.sleeping = 0; } @@ -474,7 +478,7 @@ impl Ticker<'_> { async fn runnable(&mut self) -> Runnable { future::poll_fn(|cx| { loop { - match self.state.queue.borrow_mut().pop_back() { + match unsafe { &mut *self.state.queue.get() }.pop_back() { None => { // Move to sleeping and unnotified state. if !self.sleep(cx.waker()) { @@ -499,12 +503,13 @@ impl Drop for Ticker<'_> { fn drop(&mut self) { // If this ticker is in sleeping state, it must be removed from the sleepers list. if self.sleeping != 0 { - let mut sleepers = self.state.sleepers.borrow_mut(); - let notified = sleepers.remove(self.sleeping); + let notified = { + let sleepers = unsafe { &mut *self.state.sleepers.get() }; + sleepers.remove(self.sleeping) + }; // If this ticker was notified, then notify another ticker. if notified { - drop(sleepers); self.state.notify(); } } @@ -543,32 +548,28 @@ fn debug_executor( /// Debug implementation for `Executor` and `LocalExecutor`. pub(crate) fn debug_state(state: &State, name: &str, f: &mut fmt::Formatter<'_>) -> fmt::Result { /// Debug wrapper for the number of active tasks. - struct ActiveTasks<'a>(&'a RefCell>); + struct ActiveTasks<'a>(&'a UnsafeCell>); impl fmt::Debug for ActiveTasks<'_> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self.0.try_borrow() { - Ok(lock) => fmt::Debug::fmt(&lock.len(), f), - Err(BorrowError { .. }) => f.write_str(""), - } + let active = unsafe { &*self.0.get() }; + fmt::Debug::fmt(&active.len(), f) } } /// Debug wrapper for the sleepers. - struct SleepCount<'a>(&'a RefCell); + struct SleepCount<'a>(&'a UnsafeCell); impl fmt::Debug for SleepCount<'_> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self.0.try_borrow() { - Ok(sleepers) => fmt::Debug::fmt(&sleepers.count, f), - Err(BorrowError { .. }) => f.write_str(""), - } + let sleepers = unsafe { &*self.0.get() }; + fmt::Debug::fmt(&sleepers.count, f) } } f.debug_struct(name) .field("active", &ActiveTasks(&state.active)) - .field("global_tasks", &state.queue.borrow().len()) + .field("global_tasks", &unsafe { &*state.queue.get() }.len()) .field("sleepers", &SleepCount(&state.sleepers)) .finish() } diff --git a/src/static_executors.rs b/src/static_executors.rs index c81f561..5e88538 100644 --- a/src/static_executors.rs +++ b/src/static_executors.rs @@ -93,14 +93,16 @@ impl LocalExecutor<'static> { std::mem::forget(self); - let mut active = state.active(); - if !active.is_empty() { - // Reschedule all of the active tasks. - for waker in active.drain() { - waker.wake(); + { + let active = unsafe { &mut *state.active.get() }; + if !active.is_empty() { + // Reschedule all of the active tasks. + for waker in active.drain() { + waker.wake(); + } + // Overwrite to ensure that the slab is deallocated. + *active = Slab::new(); } - // Overwrite to ensure that the slab is deallocated. - *active = Slab::new(); } // SAFETY: StaticLocalExecutor has the same memory layout as State as it's repr(transparent). @@ -500,7 +502,10 @@ impl StaticLocalExecutor { let state: &'static LocalState = &self.state; // TODO: If possible, push into the current local queue and notify the ticker. move |runnable| { - state.queue.borrow_mut().push_front(runnable); + { + let queue = unsafe { &mut *state.queue.get() }; + queue.push_front(runnable); + } state.notify(); } } From 7e6cfe6b58ac2a5fccf5fce75d4adc78a5218d6d Mon Sep 17 00:00:00 2001 From: james7132 Date: Fri, 8 Aug 2025 22:08:59 -0700 Subject: [PATCH 07/22] Safety comments --- Cargo.toml | 2 +- src/local_executor.rs | 52 ++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 52 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index c0dd00c..b1c5d9c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,7 +36,7 @@ futures-lite = { version = "2.0.0", default-features = false, features = ["std"] async-channel = "2.0.0" async-io = "2.1.0" async-lock = "3.0.0" -criterion = { version = "0.7", default-features = false, features = ["cargo_bench_support"] } +criterion = { version = "0.5", default-features = false, features = ["cargo_bench_support"] } easy-parallel = "3.1.0" fastrand = "2.0.0" futures-lite = "2.0.0" diff --git a/src/local_executor.rs b/src/local_executor.rs index 52b38fd..fb8e4b9 100644 --- a/src/local_executor.rs +++ b/src/local_executor.rs @@ -83,6 +83,9 @@ impl<'a> LocalExecutor<'a> { /// assert!(local_ex.is_empty()); /// ``` pub fn is_empty(&self) -> bool { + // SAFETY: All UnsafeCell accesses to active are tightly scoped, and because + // `LocalExecutor` is !Send, there is no way to have concurrent access to the + // values in `State`, including the active field. unsafe { &*self.state().active.get() }.is_empty() } @@ -100,6 +103,9 @@ impl<'a> LocalExecutor<'a> { /// }); /// ``` pub fn spawn(&self, future: impl Future + 'a) -> Task { + // SAFETY: All UnsafeCell accesses to active are tightly scoped, and because + // `LocalExecutor` is !Send, there is no way to have concurrent access to the + // values in `State`, including the active field. let active = unsafe { &mut *self.state().active.get() }; self.spawn_inner(future, active) } @@ -149,6 +155,9 @@ impl<'a> LocalExecutor<'a> { futures: impl IntoIterator, handles: &mut impl Extend>, ) { + // SAFETY: All UnsafeCell accesses to active are tightly scoped, and because + // `LocalExecutor` is !Send, there is no way to have concurrent access to the + // values in `State`, including the active field. let active = unsafe { &mut *self.state().active.get() }; // Convert the futures into tasks. @@ -171,6 +180,9 @@ impl<'a> LocalExecutor<'a> { let index = entry.key(); let state = self.state_as_rc(); let future = AsyncCallOnDrop::new(future, move || { + // SAFETY: All UnsafeCell accesses to active are tightly scoped, and because + // `LocalExecutor` is !Send, there is no way to have concurrent access to the + // values in `State`, including the active field. drop(unsafe { &mut *state.active.get() }.try_remove(index)) }); @@ -276,9 +288,11 @@ impl<'a> LocalExecutor<'a> { fn schedule(&self) -> impl Fn(Runnable) + 'static { let state = self.state_as_rc(); - // TODO: If possible, push into the current local queue and notify the ticker. move |runnable| { { + // SAFETY: All UnsafeCell accesses to queue are tightly scoped, and because + // `LocalExecutor` is !Send, there is no way to have concurrent access to the + // values in `State`, including the queue field. let queue = unsafe { &mut *state.queue.get() }; queue.push_front(runnable); } @@ -337,12 +351,18 @@ impl Drop for LocalExecutor<'_> { let state = unsafe { Rc::from_raw(ptr) }; { + // SAFETY: All UnsafeCell accesses to active are tightly scoped, and because + // `LocalExecutor` is !Send, there is no way to have concurrent access to the + // values in `State`, including the active field. let active = unsafe { &mut *state.active.get() }; for w in active.drain() { w.wake(); } } + // SAFETY: All UnsafeCell accesses to queue are tightly scoped, and because + // `LocalExecutor` is !Send, there is no way to have concurrent access to the + // values in `State`, including the queue field. unsafe { &mut *state.queue.get() }.clear(); } } @@ -382,6 +402,9 @@ impl State { /// Notifies a sleeping ticker. #[inline] pub(crate) fn notify(&self) { + // SAFETY: All UnsafeCell accesses to sleepers are tightly scoped, and because + // `LocalExecutor` is !Send, there is no way to have concurrent access to the + // values in `State`, including the sleepers field. let waker = unsafe { &mut *self.sleepers.get() }.notify(); if let Some(w) = waker { w.wake(); @@ -389,6 +412,9 @@ impl State { } pub(crate) fn try_tick(&self) -> bool { + // SAFETY: All UnsafeCell accesses to queue are tightly scoped, and because + // `LocalExecutor` is !Send, there is no way to have concurrent access to the + // values in `State`, including the queue field. let runnable = unsafe { &mut *self.queue.get() }.pop_back(); match runnable { None => false, @@ -449,12 +475,18 @@ impl Ticker<'_> { match self.sleeping { // Move to sleeping state. 0 => { + // SAFETY: All UnsafeCell accesses to sleepers are tightly scoped, and because + // `LocalExecutor` is !Send, there is no way to have concurrent access to the + // values in `State`, including the sleepers field. let sleepers = unsafe { &mut *self.state.sleepers.get() }; self.sleeping = sleepers.insert(waker); } // Already sleeping, check if notified. id => { + // SAFETY: All UnsafeCell accesses to sleepers are tightly scoped, and because + // `LocalExecutor` is !Send, there is no way to have concurrent access to the + // values in `State`, including the sleepers field. let sleepers = unsafe { &mut *self.state.sleepers.get() }; if !sleepers.update(id, waker) { return false; @@ -468,6 +500,9 @@ impl Ticker<'_> { /// Moves the ticker into woken state. fn wake(&mut self) { if self.sleeping != 0 { + // SAFETY: All UnsafeCell accesses to sleepers are tightly scoped, and because + // `LocalExecutor` is !Send, there is no way to have concurrent access to the + // values in `State`, including the sleepers field. let sleepers = unsafe { &mut *self.state.sleepers.get() }; sleepers.remove(self.sleeping); } @@ -478,6 +513,9 @@ impl Ticker<'_> { async fn runnable(&mut self) -> Runnable { future::poll_fn(|cx| { loop { + // SAFETY: All UnsafeCell accesses to queue are tightly scoped, and because + // `LocalExecutor` is !Send, there is no way to have concurrent access to the + // values in `State`, including the queue field. match unsafe { &mut *self.state.queue.get() }.pop_back() { None => { // Move to sleeping and unnotified state. @@ -504,6 +542,9 @@ impl Drop for Ticker<'_> { // If this ticker is in sleeping state, it must be removed from the sleepers list. if self.sleeping != 0 { let notified = { + // SAFETY: All UnsafeCell accesses to sleepers are tightly scoped, and because + // `LocalExecutor` is !Send, there is no way to have concurrent access to the + // values in `State`, including the sleepers field. let sleepers = unsafe { &mut *self.state.sleepers.get() }; sleepers.remove(self.sleeping) }; @@ -552,6 +593,9 @@ pub(crate) fn debug_state(state: &State, name: &str, f: &mut fmt::Formatter<'_>) impl fmt::Debug for ActiveTasks<'_> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + // SAFETY: All UnsafeCell accesses to active are tightly scoped, and because + // `LocalExecutor` is !Send, there is no way to have concurrent access to the + // values in `State`, including the active field. let active = unsafe { &*self.0.get() }; fmt::Debug::fmt(&active.len(), f) } @@ -562,6 +606,9 @@ pub(crate) fn debug_state(state: &State, name: &str, f: &mut fmt::Formatter<'_>) impl fmt::Debug for SleepCount<'_> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + // SAFETY: All UnsafeCell accesses to sleepers are tightly scoped, and because + // `LocalExecutor` is !Send, there is no way to have concurrent access to the + // values in `State`, including the sleepers field. let sleepers = unsafe { &*self.0.get() }; fmt::Debug::fmt(&sleepers.count, f) } @@ -569,6 +616,9 @@ pub(crate) fn debug_state(state: &State, name: &str, f: &mut fmt::Formatter<'_>) f.debug_struct(name) .field("active", &ActiveTasks(&state.active)) + // SAFETY: All UnsafeCell accesses to queue are tightly scoped, and because + // `LocalExecutor` is !Send, there is no way to have concurrent access to the + // values in `State`, including the queue field. .field("global_tasks", &unsafe { &*state.queue.get() }.len()) .field("sleepers", &SleepCount(&state.sleepers)) .finish() From 844aa791afe3074c5b295baad08e113cf8a82638 Mon Sep 17 00:00:00 2001 From: james7132 Date: Fri, 8 Aug 2025 22:14:16 -0700 Subject: [PATCH 08/22] More safety comments --- src/static_executors.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/static_executors.rs b/src/static_executors.rs index 5e88538..34fac92 100644 --- a/src/static_executors.rs +++ b/src/static_executors.rs @@ -94,6 +94,9 @@ impl LocalExecutor<'static> { std::mem::forget(self); { + // SAFETY: All UnsafeCell accesses to active are tightly scoped, and because + // `LocalExecutor` is !Send, there is no way to have concurrent access to the + // values in `State`, including the active field. let active = unsafe { &mut *state.active.get() }; if !active.is_empty() { // Reschedule all of the active tasks. @@ -500,9 +503,11 @@ impl StaticLocalExecutor { /// Returns a function that schedules a runnable task when it gets woken up. fn schedule(&'static self) -> impl Fn(Runnable) + 'static { let state: &'static LocalState = &self.state; - // TODO: If possible, push into the current local queue and notify the ticker. move |runnable| { { + // SAFETY: All UnsafeCell accesses to queue are tightly scoped, and because + // `LocalExecutor` is !Send, there is no way to have concurrent access to the + // values in `State`, including the queue field. let queue = unsafe { &mut *state.queue.get() }; queue.push_front(runnable); } From f25ec4e44d63475ade2292bf2b1337c795411494 Mon Sep 17 00:00:00 2001 From: james7132 Date: Fri, 8 Aug 2025 22:17:00 -0700 Subject: [PATCH 09/22] Fix deprecated items --- benches/executor.rs | 4 +++- benches/local_executor.rs | 4 ++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/benches/executor.rs b/benches/executor.rs index 5fc140f..2184755 100644 --- a/benches/executor.rs +++ b/benches/executor.rs @@ -462,7 +462,9 @@ fn running_benches(c: &mut Criterion) { let (resp_send, resp_recv) = async_channel::bounded(1); db_send.send(resp_send).await.unwrap(); - black_box(resp_recv.recv().await.unwrap()); + core::hint::black_box( + resp_recv.recv().await.unwrap(), + ); } // Send the data back... diff --git a/benches/local_executor.rs b/benches/local_executor.rs index 2fcccc9..f004d5b 100644 --- a/benches/local_executor.rs +++ b/benches/local_executor.rs @@ -320,7 +320,7 @@ fn running_benches(c: &mut Criterion) { for _ in 0..web_rng.usize(STEPS / 2..STEPS) { let (resp_send, resp_recv) = async_channel::bounded(1); db_send.send(resp_send).await.unwrap(); - criterion::black_box(resp_recv.recv().await.unwrap()); + core::hint::black_box(resp_recv.recv().await.unwrap()); } // Send the data back... @@ -393,7 +393,7 @@ fn running_benches(c: &mut Criterion) { for _ in 0..web_rng.usize(STEPS / 2..STEPS) { let (resp_send, resp_recv) = async_channel::bounded(1); db_send.send(resp_send).await.unwrap(); - criterion::black_box(resp_recv.recv().await.unwrap()); + core::hint::black_box(resp_recv.recv().await.unwrap()); } // Send the data back... From 2f27ec05b975d71ebd8cd6f06fb0299f3245c68e Mon Sep 17 00:00:00 2001 From: james7132 Date: Fri, 8 Aug 2025 22:18:24 -0700 Subject: [PATCH 10/22] Update MSRV to 1.68 --- Cargo.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index b1c5d9c..b7a55b4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,7 +6,7 @@ name = "async-executor" version = "1.13.2" authors = ["Stjepan Glavina ", "John Nunley "] edition = "2021" -rust-version = "1.63" +rust-version = "1.68" description = "Async executor" license = "Apache-2.0 OR MIT" repository = "https://github.com/smol-rs/async-executor" @@ -36,7 +36,7 @@ futures-lite = { version = "2.0.0", default-features = false, features = ["std"] async-channel = "2.0.0" async-io = "2.1.0" async-lock = "3.0.0" -criterion = { version = "0.5", default-features = false, features = ["cargo_bench_support"] } +criterion = { version = "0.6", default-features = false, features = ["cargo_bench_support"] } easy-parallel = "3.1.0" fastrand = "2.0.0" futures-lite = "2.0.0" From 48a17a5b3181433eb79a93caeaec5937847abe84 Mon Sep 17 00:00:00 2001 From: james7132 Date: Fri, 8 Aug 2025 22:25:42 -0700 Subject: [PATCH 11/22] Use criterion 0.7 --- Cargo.toml | 2 +- benches/executor.rs | 2 +- benches/local_executor.rs | 5 +++-- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index b7a55b4..f3a7340 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,7 +36,7 @@ futures-lite = { version = "2.0.0", default-features = false, features = ["std"] async-channel = "2.0.0" async-io = "2.1.0" async-lock = "3.0.0" -criterion = { version = "0.6", default-features = false, features = ["cargo_bench_support"] } +criterion = { version = "0.7", default-features = false, features = ["cargo_bench_support"] } easy-parallel = "3.1.0" fastrand = "2.0.0" futures-lite = "2.0.0" diff --git a/benches/executor.rs b/benches/executor.rs index 2184755..696f4cc 100644 --- a/benches/executor.rs +++ b/benches/executor.rs @@ -462,7 +462,7 @@ fn running_benches(c: &mut Criterion) { let (resp_send, resp_recv) = async_channel::bounded(1); db_send.send(resp_send).await.unwrap(); - core::hint::black_box( + black_box( resp_recv.recv().await.unwrap(), ); } diff --git a/benches/local_executor.rs b/benches/local_executor.rs index f004d5b..bbe57be 100644 --- a/benches/local_executor.rs +++ b/benches/local_executor.rs @@ -1,4 +1,5 @@ use std::mem; +use std::hint::black_box; use async_executor::{Executor, LocalExecutor, StaticLocalExecutor}; use criterion::{criterion_group, criterion_main, Criterion}; @@ -320,7 +321,7 @@ fn running_benches(c: &mut Criterion) { for _ in 0..web_rng.usize(STEPS / 2..STEPS) { let (resp_send, resp_recv) = async_channel::bounded(1); db_send.send(resp_send).await.unwrap(); - core::hint::black_box(resp_recv.recv().await.unwrap()); + black_box(resp_recv.recv().await.unwrap()); } // Send the data back... @@ -393,7 +394,7 @@ fn running_benches(c: &mut Criterion) { for _ in 0..web_rng.usize(STEPS / 2..STEPS) { let (resp_send, resp_recv) = async_channel::bounded(1); db_send.send(resp_send).await.unwrap(); - core::hint::black_box(resp_recv.recv().await.unwrap()); + black_box(resp_recv.recv().await.unwrap()); } // Send the data back... From 099072bbaca6336584008089ebf948c2db20eb5b Mon Sep 17 00:00:00 2001 From: james7132 Date: Fri, 8 Aug 2025 22:26:39 -0700 Subject: [PATCH 12/22] Formatting --- benches/executor.rs | 4 +--- benches/local_executor.rs | 2 +- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/benches/executor.rs b/benches/executor.rs index 696f4cc..5fc140f 100644 --- a/benches/executor.rs +++ b/benches/executor.rs @@ -462,9 +462,7 @@ fn running_benches(c: &mut Criterion) { let (resp_send, resp_recv) = async_channel::bounded(1); db_send.send(resp_send).await.unwrap(); - black_box( - resp_recv.recv().await.unwrap(), - ); + black_box(resp_recv.recv().await.unwrap()); } // Send the data back... diff --git a/benches/local_executor.rs b/benches/local_executor.rs index bbe57be..c0bc52d 100644 --- a/benches/local_executor.rs +++ b/benches/local_executor.rs @@ -1,5 +1,5 @@ -use std::mem; use std::hint::black_box; +use std::mem; use async_executor::{Executor, LocalExecutor, StaticLocalExecutor}; use criterion::{criterion_group, criterion_main, Criterion}; From 68344f3baaff3a2dc53526c7393cc60a58498223 Mon Sep 17 00:00:00 2001 From: james7132 Date: Fri, 8 Aug 2025 23:00:19 -0700 Subject: [PATCH 13/22] Early-out in Runner when the queue is empty. --- src/local_executor.rs | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/local_executor.rs b/src/local_executor.rs index fb8e4b9..b514c9c 100644 --- a/src/local_executor.rs +++ b/src/local_executor.rs @@ -431,13 +431,19 @@ impl State { } pub async fn run(&self, future: impl Future) -> T { - let mut ticker = Ticker::new(self); - // A future that runs tasks forever. let run_forever = async { loop { for _ in 0..200 { - ticker.runnable().await.run(); + // SAFETY: All UnsafeCell accesses to queue are tightly scoped, and because + // `LocalExecutor` is !Send, there is no way to have concurrent access to the + // values in `State`, including the queue field. + match unsafe { &mut *self.queue.get() }.pop_back() { + Some(runnable) => { + runnable.run(); + } + None => break, + } } future::yield_now().await; } From c4f077fadb84ced9f110e95cfdcaac8cee155a8f Mon Sep 17 00:00:00 2001 From: james7132 Date: Wed, 13 Aug 2025 02:31:12 -0700 Subject: [PATCH 14/22] Remove module qualification --- src/local_executor.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/local_executor.rs b/src/local_executor.rs index b514c9c..882b97a 100644 --- a/src/local_executor.rs +++ b/src/local_executor.rs @@ -35,7 +35,7 @@ pub struct LocalExecutor<'a> { state: Cell<*mut State>, /// Makes the `'a` lifetime invariant. - _marker: PhantomData>, + _marker: PhantomData>, } impl UnwindSafe for LocalExecutor<'_> {} From 646c77a707af05144683bb24e2b2bb4cafc6a2af Mon Sep 17 00:00:00 2001 From: james7132 Date: Wed, 13 Aug 2025 16:50:49 -0700 Subject: [PATCH 15/22] Arc -> Rc --- src/local_executor.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/local_executor.rs b/src/local_executor.rs index 882b97a..1a057d1 100644 --- a/src/local_executor.rs +++ b/src/local_executor.rs @@ -327,11 +327,11 @@ impl<'a> LocalExecutor<'a> { unsafe { &*self.state_ptr() } } - // Clones the inner state Arc + // Clones the inner state Rc #[inline] fn state_as_rc(&self) -> Rc { // SAFETY: So long as a LocalExecutor lives, it's state pointer will always be a valid - // Arc when accessed through state_ptr. + // Rc when accessed through state_ptr. let rc = unsafe { Rc::from_raw(self.state_ptr()) }; let clone = rc.clone(); std::mem::forget(rc); @@ -346,8 +346,8 @@ impl Drop for LocalExecutor<'_> { return; } - // SAFETY: As ptr is not null, it was allocated via Arc::new and converted - // via Arc::into_raw in state_ptr. + // SAFETY: As ptr is not null, it was allocated via Rc::new and converted + // via Rc::into_raw in state_ptr. let state = unsafe { Rc::from_raw(ptr) }; { @@ -585,7 +585,7 @@ fn debug_executor( } // SAFETY: If the state pointer is not null, it must have been - // allocated properly by Arc::new and converted via Arc::into_raw + // allocated properly by Rc::new and converted via Rc::into_raw // in state_ptr. let state = unsafe { &*ptr }; From d0a1f9e43fb6d447bebb57f7ea6e18d56fef38ac Mon Sep 17 00:00:00 2001 From: james7132 Date: Wed, 13 Aug 2025 16:58:24 -0700 Subject: [PATCH 16/22] Refactor spawn_inner --- src/local_executor.rs | 41 +++++++++++++++++++++++------------------ 1 file changed, 23 insertions(+), 18 deletions(-) diff --git a/src/local_executor.rs b/src/local_executor.rs index 1a057d1..368400b 100644 --- a/src/local_executor.rs +++ b/src/local_executor.rs @@ -6,7 +6,7 @@ use std::panic::{RefUnwindSafe, UnwindSafe}; use std::rc::Rc; use std::task::{Poll, Waker}; -use async_task::{Builder, Runnable}; +use async_task::{Builder, Runnable, Schedule}; use futures_lite::{future, prelude::*}; use slab::Slab; @@ -103,11 +103,13 @@ impl<'a> LocalExecutor<'a> { /// }); /// ``` pub fn spawn(&self, future: impl Future + 'a) -> Task { + let state = self.state_as_rc(); // SAFETY: All UnsafeCell accesses to active are tightly scoped, and because // `LocalExecutor` is !Send, there is no way to have concurrent access to the // values in `State`, including the active field. let active = unsafe { &mut *self.state().active.get() }; - self.spawn_inner(future, active) + let schedule = Self::schedule(state.clone()); + Self::spawn_inner(state, future, active, schedule) } /// Spawns many tasks onto the executor. @@ -155,15 +157,20 @@ impl<'a> LocalExecutor<'a> { futures: impl IntoIterator, handles: &mut impl Extend>, ) { - // SAFETY: All UnsafeCell accesses to active are tightly scoped, and because - // `LocalExecutor` is !Send, there is no way to have concurrent access to the - // values in `State`, including the active field. - let active = unsafe { &mut *self.state().active.get() }; + let tasks = { + let state = self.state_as_rc(); - // Convert the futures into tasks. - let tasks = futures - .into_iter() - .map(move |future| self.spawn_inner(future, active)); + // SAFETY: All UnsafeCell accesses to active are tightly scoped, and because + // `LocalExecutor` is !Send, there is no way to have concurrent access to the + // values in `State`, including the active field. + let active = unsafe { &mut *state.active.get() }; + + // Convert the futures into tasks. + futures.into_iter().map(move |future| { + let schedule = Self::schedule(state.clone()); + Self::spawn_inner(state.clone(), future, active, schedule) + }) + }; // Push the tasks to the user's collection. handles.extend(tasks); @@ -171,14 +178,14 @@ impl<'a> LocalExecutor<'a> { /// Spawn a future while holding the inner lock. fn spawn_inner( - &self, + state: Rc, future: impl Future + 'a, active: &mut Slab, + schedule: impl Schedule + 'static, ) -> Task { // Remove the task from the set of active tasks when the future finishes. let entry = active.vacant_entry(); let index = entry.key(); - let state = self.state_as_rc(); let future = AsyncCallOnDrop::new(future, move || { // SAFETY: All UnsafeCell accesses to active are tightly scoped, and because // `LocalExecutor` is !Send, there is no way to have concurrent access to the @@ -204,16 +211,16 @@ impl<'a> LocalExecutor<'a> { // the `Executor` is drained of all of its runnables. This ensures that // runnables are dropped and this precondition is satisfied. // - // `self.schedule()` is not `Send` nor `Sync`. As LocalExecutor is not + // `schedule` is not `Send` nor `Sync`. As LocalExecutor is not // `Send`, the `Waker` is guaranteed// to only be used on the same thread // it was spawned on. // - // `self.schedule()` is `'static`, and thus will outlive all borrowed + // `schedule` is `'static`, and thus will outlive all borrowed // variables in the future. let (runnable, task) = unsafe { Builder::new() .propagate_panic(true) - .spawn_unchecked(|()| future, self.schedule()) + .spawn_unchecked(|()| future, schedule) }; entry.insert(runnable.waker()); @@ -285,9 +292,7 @@ impl<'a> LocalExecutor<'a> { } /// Returns a function that schedules a runnable task when it gets woken up. - fn schedule(&self) -> impl Fn(Runnable) + 'static { - let state = self.state_as_rc(); - + fn schedule(state: Rc) -> impl Fn(Runnable) + 'static { move |runnable| { { // SAFETY: All UnsafeCell accesses to queue are tightly scoped, and because From c7a5a1db2a6adfe26c3613d0c15bf194d3e65a07 Mon Sep 17 00:00:00 2001 From: james7132 Date: Wed, 13 Aug 2025 18:59:08 -0700 Subject: [PATCH 17/22] Use Pin instead of Rc cloning --- src/local_executor.rs | 46 +++++++++++++------------------------------ 1 file changed, 14 insertions(+), 32 deletions(-) diff --git a/src/local_executor.rs b/src/local_executor.rs index 368400b..84db9dd 100644 --- a/src/local_executor.rs +++ b/src/local_executor.rs @@ -3,10 +3,11 @@ use std::collections::VecDeque; use std::fmt; use std::marker::PhantomData; use std::panic::{RefUnwindSafe, UnwindSafe}; +use std::pin::Pin; use std::rc::Rc; use std::task::{Poll, Waker}; -use async_task::{Builder, Runnable, Schedule}; +use async_task::{Builder, Runnable}; use futures_lite::{future, prelude::*}; use slab::Slab; @@ -103,13 +104,12 @@ impl<'a> LocalExecutor<'a> { /// }); /// ``` pub fn spawn(&self, future: impl Future + 'a) -> Task { - let state = self.state_as_rc(); + let state = self.state(); // SAFETY: All UnsafeCell accesses to active are tightly scoped, and because // `LocalExecutor` is !Send, there is no way to have concurrent access to the // values in `State`, including the active field. let active = unsafe { &mut *self.state().active.get() }; - let schedule = Self::schedule(state.clone()); - Self::spawn_inner(state, future, active, schedule) + Self::spawn_inner(state, future, active) } /// Spawns many tasks onto the executor. @@ -158,7 +158,7 @@ impl<'a> LocalExecutor<'a> { handles: &mut impl Extend>, ) { let tasks = { - let state = self.state_as_rc(); + let state = self.state(); // SAFETY: All UnsafeCell accesses to active are tightly scoped, and because // `LocalExecutor` is !Send, there is no way to have concurrent access to the @@ -166,10 +166,9 @@ impl<'a> LocalExecutor<'a> { let active = unsafe { &mut *state.active.get() }; // Convert the futures into tasks. - futures.into_iter().map(move |future| { - let schedule = Self::schedule(state.clone()); - Self::spawn_inner(state.clone(), future, active, schedule) - }) + futures + .into_iter() + .map(move |future| Self::spawn_inner(state, future, active)) }; // Push the tasks to the user's collection. @@ -178,10 +177,9 @@ impl<'a> LocalExecutor<'a> { /// Spawn a future while holding the inner lock. fn spawn_inner( - state: Rc, + state: Pin<&'a State>, future: impl Future + 'a, active: &mut Slab, - schedule: impl Schedule + 'static, ) -> Task { // Remove the task from the set of active tasks when the future finishes. let entry = active.vacant_entry(); @@ -220,7 +218,7 @@ impl<'a> LocalExecutor<'a> { let (runnable, task) = unsafe { Builder::new() .propagate_panic(true) - .spawn_unchecked(|()| future, schedule) + .spawn_unchecked(|()| future, Self::schedule(state)) }; entry.insert(runnable.waker()); @@ -292,7 +290,7 @@ impl<'a> LocalExecutor<'a> { } /// Returns a function that schedules a runnable task when it gets woken up. - fn schedule(state: Rc) -> impl Fn(Runnable) + 'static { + fn schedule(state: Pin<&'a State>) -> impl Fn(Runnable) + 'a { move |runnable| { { // SAFETY: All UnsafeCell accesses to queue are tightly scoped, and because @@ -307,12 +305,12 @@ impl<'a> LocalExecutor<'a> { /// Returns a pointer to the inner state. #[inline] - pub(crate) fn state_ptr(&self) -> *const State { + pub(crate) fn state(&self) -> Pin<&'a State> { #[cold] fn alloc_state(cell: &Cell<*mut State>) -> *mut State { debug_assert!(cell.get().is_null()); let state = Rc::new(State::new()); - let ptr = Rc::into_raw(state) as *mut State; + let ptr = Rc::into_raw(state).cast_mut(); cell.set(ptr); ptr } @@ -321,26 +319,10 @@ impl<'a> LocalExecutor<'a> { if ptr.is_null() { ptr = alloc_state(&self.state); } - ptr - } - /// Returns a reference to the inner state. - #[inline] - fn state(&self) -> &State { // SAFETY: So long as a LocalExecutor lives, it's state pointer will always be valid // when accessed through state_ptr. - unsafe { &*self.state_ptr() } - } - - // Clones the inner state Rc - #[inline] - fn state_as_rc(&self) -> Rc { - // SAFETY: So long as a LocalExecutor lives, it's state pointer will always be a valid - // Rc when accessed through state_ptr. - let rc = unsafe { Rc::from_raw(self.state_ptr()) }; - let clone = rc.clone(); - std::mem::forget(rc); - clone + Pin::new(unsafe { &*ptr }) } } From 62a9331853f837c497acfcc52ed64b581d4038ff Mon Sep 17 00:00:00 2001 From: james7132 Date: Wed, 13 Aug 2025 19:03:15 -0700 Subject: [PATCH 18/22] Fix static executor builds --- src/local_executor.rs | 2 +- src/static_executors.rs | 15 ++++++++++----- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/src/local_executor.rs b/src/local_executor.rs index 84db9dd..8e362fd 100644 --- a/src/local_executor.rs +++ b/src/local_executor.rs @@ -33,7 +33,7 @@ pub use async_task::Task; /// ``` pub struct LocalExecutor<'a> { /// The executor state. - state: Cell<*mut State>, + pub(crate) state: Cell<*mut State>, /// Makes the `'a` lifetime invariant. _marker: PhantomData>, diff --git a/src/static_executors.rs b/src/static_executors.rs index 34fac92..c3b7012 100644 --- a/src/static_executors.rs +++ b/src/static_executors.rs @@ -85,11 +85,16 @@ impl LocalExecutor<'static> { /// future::block_on(ex.run(task)); /// ``` pub fn leak(self) -> &'static StaticLocalExecutor { - let ptr = self.state_ptr(); - // SAFETY: So long as a LocalExecutor lives, it's state pointer will always be valid - // when accessed through state_ptr. This executor will live for the full 'static - // lifetime so this isn't an arbitrary lifetime extension. - let state: &'static LocalState = unsafe { &*ptr }; + let ptr = self.state.get(); + + let state: &'static LocalState = if ptr.is_null() { + Box::leak(Box::new(LocalState::new())) + } else { + // SAFETY: So long as a LocalExecutor lives, it's state pointer will always be valid + // when accessed through state_ptr. This executor will live for the full 'static + // lifetime so this isn't an arbitrary lifetime extension. + unsafe { &*ptr } + }; std::mem::forget(self); From 45e5b900b655f1e63c4adfabbeaeddd7d6ac70d8 Mon Sep 17 00:00:00 2001 From: james7132 Date: Wed, 13 Aug 2025 19:13:01 -0700 Subject: [PATCH 19/22] Formatting --- src/static_executors.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/static_executors.rs b/src/static_executors.rs index c3b7012..3385595 100644 --- a/src/static_executors.rs +++ b/src/static_executors.rs @@ -88,7 +88,7 @@ impl LocalExecutor<'static> { let ptr = self.state.get(); let state: &'static LocalState = if ptr.is_null() { - Box::leak(Box::new(LocalState::new())) + Box::leak(Box::new(LocalState::new())) } else { // SAFETY: So long as a LocalExecutor lives, it's state pointer will always be valid // when accessed through state_ptr. This executor will live for the full 'static From 7523891ed606f989eecfdc5a2961db09c7e6b45d Mon Sep 17 00:00:00 2001 From: james7132 Date: Wed, 13 Aug 2025 19:17:52 -0700 Subject: [PATCH 20/22] Update safety comment --- src/local_executor.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/local_executor.rs b/src/local_executor.rs index 8e362fd..66c90e7 100644 --- a/src/local_executor.rs +++ b/src/local_executor.rs @@ -213,8 +213,9 @@ impl<'a> LocalExecutor<'a> { // `Send`, the `Waker` is guaranteed// to only be used on the same thread // it was spawned on. // - // `schedule` is `'static`, and thus will outlive all borrowed - // variables in the future. + // `Self::schedule` may not be `'static`, but we make sure that the `Waker` does + // not outlive `'a`. When the executor is dropped, the `active` field is + // drained and all of the `Waker`s are woken. let (runnable, task) = unsafe { Builder::new() .propagate_panic(true) From 6d841340a4ddda280bae309baf64775cc6cd9fd8 Mon Sep 17 00:00:00 2001 From: james7132 Date: Thu, 14 Aug 2025 09:50:22 -0700 Subject: [PATCH 21/22] Create a critical section that will abort on panic --- src/lib.rs | 9 ++++++ src/local_executor.rs | 73 ++++++++++++++++++++++++------------------- 2 files changed, 50 insertions(+), 32 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 61ae9ec..0eefbb6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -918,6 +918,15 @@ fn debug_state(state: &State, name: &str, f: &mut fmt::Formatter<'_>) -> fmt::Re .finish() } +struct AbortOnPanic; + +impl Drop for AbortOnPanic { + fn drop(&mut self) { + // Panicking while already panicking will result in an abort + panic!("Panicked while in a critical section. Abortign the process"); + } +} + /// Runs a closure when dropped. struct CallOnDrop(F); diff --git a/src/local_executor.rs b/src/local_executor.rs index 66c90e7..b59293e 100644 --- a/src/local_executor.rs +++ b/src/local_executor.rs @@ -1,17 +1,17 @@ use std::cell::{Cell, UnsafeCell}; use std::collections::VecDeque; -use std::fmt; use std::marker::PhantomData; use std::panic::{RefUnwindSafe, UnwindSafe}; use std::pin::Pin; use std::rc::Rc; use std::task::{Poll, Waker}; +use std::{fmt, mem}; use async_task::{Builder, Runnable}; use futures_lite::{future, prelude::*}; use slab::Slab; -use crate::{AsyncCallOnDrop, Sleepers}; +use crate::{AbortOnPanic, AsyncCallOnDrop, Sleepers}; #[doc(no_inline)] pub use async_task::Task; @@ -184,6 +184,8 @@ impl<'a> LocalExecutor<'a> { // Remove the task from the set of active tasks when the future finishes. let entry = active.vacant_entry(); let index = entry.key(); + let builder = Builder::new().propagate_panic(true); + let future = AsyncCallOnDrop::new(future, move || { // SAFETY: All UnsafeCell accesses to active are tightly scoped, and because // `LocalExecutor` is !Send, there is no way to have concurrent access to the @@ -191,39 +193,46 @@ impl<'a> LocalExecutor<'a> { drop(unsafe { &mut *state.active.get() }.try_remove(index)) }); - // Create the task and register it in the set of active tasks. - // - // SAFETY: - // - // `future` may not `Send`. Since `LocalExecutor` is `!Sync`, - // `try_tick`, `tick` and `run` can only be called from the origin - // thread of the `LocalExecutor`. Similarly, `spawn` can only be called - // from the origin thread, ensuring that `future` and the executor share - // the same origin thread. The `Runnable` can be scheduled from other - // threads, but because of the above `Runnable` can only be called or - // dropped on the origin thread. - // - // `future` is not `'static`, but we make sure that the `Runnable` does - // not outlive `'a`. When the executor is dropped, the `active` field is - // drained and all of the `Waker`s are woken. Then, the queue inside of - // the `Executor` is drained of all of its runnables. This ensures that - // runnables are dropped and this precondition is satisfied. + // This is a critical section which will result in UB by aliasing active + // if the AsyncCallOnDrop is called while still in this function. // - // `schedule` is not `Send` nor `Sync`. As LocalExecutor is not - // `Send`, the `Waker` is guaranteed// to only be used on the same thread - // it was spawned on. - // - // `Self::schedule` may not be `'static`, but we make sure that the `Waker` does - // not outlive `'a`. When the executor is dropped, the `active` field is - // drained and all of the `Waker`s are woken. - let (runnable, task) = unsafe { - Builder::new() - .propagate_panic(true) - .spawn_unchecked(|()| future, Self::schedule(state)) - }; + // To avoid this, this guard will abort the process if it does + // panic. Rust's drop order will ensure that this will run before + // executor, and thus before the above AsyncCallOnDrop is dropped. + let _panic_guard = AbortOnPanic; + + let (runnable, task) = + // Create the task and register it in the set of active tasks. + // + // SAFETY: + // + // `future` may not `Send`. Since `LocalExecutor` is `!Sync`, + // `try_tick`, `tick` and `run` can only be called from the origin + // thread of the `LocalExecutor`. Similarly, `spawn` can only be called + // from the origin thread, ensuring that `future` and the executor share + // the same origin thread. The `Runnable` cannot be scheduled from other + // threads, and because of the above `Runnable` can only be called or + // dropped on the origin thread. + // + // `future` is not `'static`, but we make sure that the `Runnable` does + // not outlive `'a`. When the executor is dropped, the `active` field is + // drained and all of the `Waker`s are woken. Then, the queue inside of + // the `Executor` is drained of all of its runnables. This ensures that + // runnables are dropped and this precondition is satisfied. + // + // `schedule` is not `Send` nor `Sync`. As LocalExecutor is not + // `Send`, the `Waker` is guaranteed// to only be used on the same thread + // it was spawned on. + // + // `Self::schedule` may not be `'static`, but we make sure that the `Waker` does + // not outlive `'a`. When the executor is dropped, the `active` field is + // drained and all of the `Waker`s are woken. + unsafe { builder.spawn_unchecked(|()| future, Self::schedule(state)) }; entry.insert(runnable.waker()); - runnable.schedule(); + + // Critical section over. Forget the guard to avoid panicking on return. + mem::forget(_panic_guard); task } From 5c8ed3a517522cd5ae9e54df20cbd59713fa2f1a Mon Sep 17 00:00:00 2001 From: James Liu Date: Sat, 16 Aug 2025 20:17:42 +0000 Subject: [PATCH 22/22] Update src/lib.rs Co-authored-by: Ellen Emilia Anna Zscheile --- src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index 0eefbb6..5b30593 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -923,7 +923,7 @@ struct AbortOnPanic; impl Drop for AbortOnPanic { fn drop(&mut self) { // Panicking while already panicking will result in an abort - panic!("Panicked while in a critical section. Abortign the process"); + panic!("Panicked while in a critical section. Aborting the process"); } }