diff --git a/Cargo.toml b/Cargo.toml index c8feeb5..f3a7340 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,7 +6,7 @@ name = "async-executor" version = "1.13.2" authors = ["Stjepan Glavina ", "John Nunley "] edition = "2021" -rust-version = "1.63" +rust-version = "1.68" description = "Async executor" license = "Apache-2.0 OR MIT" repository = "https://github.com/smol-rs/async-executor" @@ -18,6 +18,9 @@ exclude = ["/.*"] # Adds support for executors optimized for use in static variables. static = [] +[lib] +bench = false + [dependencies] async-task = "4.4.0" concurrent-queue = "2.5.0" @@ -44,5 +47,10 @@ name = "executor" harness = false required-features = ["static"] +[[bench]] +name = "local_executor" +harness = false +required-features = ["static"] + [package.metadata.docs.rs] all-features = true diff --git a/benches/local_executor.rs b/benches/local_executor.rs new file mode 100644 index 0000000..c0bc52d --- /dev/null +++ b/benches/local_executor.rs @@ -0,0 +1,427 @@ +use std::hint::black_box; +use std::mem; + +use async_executor::{Executor, LocalExecutor, StaticLocalExecutor}; +use criterion::{criterion_group, criterion_main, Criterion}; +use futures_lite::{future, prelude::*}; + +const TASKS: usize = 300; +const STEPS: usize = 300; +const LIGHT_TASKS: usize = 25_000; + +fn run(f: impl FnOnce(&'static LocalExecutor<'_>)) { + f(Box::leak(Box::new(LocalExecutor::new()))); +} + +fn run_static(f: impl FnOnce(&'static StaticLocalExecutor)) { + f(LocalExecutor::new().leak()) +} + +fn create(c: &mut Criterion) { + c.bench_function("executor::create", |b| { + b.iter(|| { + let ex = Executor::new(); + let task = ex.spawn(async {}); + future::block_on(ex.run(task)); + }) + }); +} + +fn running_benches(c: &mut Criterion) { + for (prefix, with_static) in [("local_executor", false), ("static_local_executor", true)] { + let mut group = c.benchmark_group("single_thread"); + + group.bench_function(format!("{prefix}::spawn_one"), |b| { + if with_static { + run_static(|ex| { + b.iter(|| { + let task = ex.spawn(async {}); + future::block_on(ex.run(task)); + }); + }); + } else { + run(|ex| { + b.iter(|| { + let task = ex.spawn(async {}); + future::block_on(ex.run(task)); + }); + }); + } + }); + + if !with_static { + group.bench_function("executor::spawn_batch", |b| { + run(|ex| { + let mut handles = vec![]; + + b.iter(|| { + ex.spawn_many((0..250).map(|_| future::yield_now()), &mut handles); + handles.clear(); + }); + + handles.clear(); + }) + }); + } + + group.bench_function(format!("{prefix}::spawn_many_local"), |b| { + if with_static { + run_static(|ex| { + b.iter(move || { + future::block_on(ex.run(async { + let mut tasks = Vec::new(); + for _ in 0..LIGHT_TASKS { + tasks.push(ex.spawn(async {})); + } + for task in tasks { + task.await; + } + })); + }); + }); + } else { + run(|ex| { + b.iter(move || { + future::block_on(ex.run(async { + let mut tasks = Vec::new(); + for _ in 0..LIGHT_TASKS { + tasks.push(ex.spawn(async {})); + } + for task in tasks { + task.await; + } + })); + }); + }); + } + }); + + group.bench_function(format!("{prefix}::spawn_recursively"), |b| { + #[allow(clippy::manual_async_fn)] + fn go( + ex: &'static LocalExecutor<'static>, + i: usize, + ) -> impl Future + 'static { + async move { + if i != 0 { + ex.spawn(async move { + let fut = Box::pin(go(ex, i - 1)); + fut.await; + }) + .await; + } + } + } + + #[allow(clippy::manual_async_fn)] + fn go_static( + ex: &'static StaticLocalExecutor, + i: usize, + ) -> impl Future + 'static { + async move { + if i != 0 { + ex.spawn(async move { + let fut = Box::pin(go_static(ex, i - 1)); + fut.await; + }) + .await; + } + } + } + + if with_static { + run_static(|ex| { + b.iter(move || { + future::block_on(ex.run(async { + let mut tasks = Vec::new(); + for _ in 0..TASKS { + tasks.push(ex.spawn(go_static(ex, STEPS))); + } + for task in tasks { + task.await; + } + })); + }); + }); + } else { + run(|ex| { + b.iter(move || { + future::block_on(ex.run(async { + let mut tasks = Vec::new(); + for _ in 0..TASKS { + tasks.push(ex.spawn(go(ex, STEPS))); + } + for task in tasks { + task.await; + } + })); + }); + }); + } + }); + + group.bench_function(format!("{prefix}::yield_now"), |b| { + if with_static { + run_static(|ex| { + b.iter(move || { + future::block_on(ex.run(async { + let mut tasks = Vec::new(); + for _ in 0..TASKS { + tasks.push(ex.spawn(async move { + for _ in 0..STEPS { + future::yield_now().await; + } + })); + } + for task in tasks { + task.await; + } + })); + }); + }); + } else { + run(|ex| { + b.iter(move || { + future::block_on(ex.run(async { + let mut tasks = Vec::new(); + for _ in 0..TASKS { + tasks.push(ex.spawn(async move { + for _ in 0..STEPS { + future::yield_now().await; + } + })); + } + for task in tasks { + task.await; + } + })); + }); + }); + } + }); + + group.bench_function(format!("{prefix}::channels"), |b| { + if with_static { + run_static(|ex| { + b.iter(move || { + future::block_on(ex.run(async { + // Create channels. + let mut tasks = Vec::new(); + let (first_send, first_recv) = async_channel::bounded(1); + let mut current_recv = first_recv; + + for _ in 0..TASKS { + let (next_send, next_recv) = async_channel::bounded(1); + let current_recv = mem::replace(&mut current_recv, next_recv); + + tasks.push(ex.spawn(async move { + // Send a notification on to the next task. + for _ in 0..STEPS { + current_recv.recv().await.unwrap(); + next_send.send(()).await.unwrap(); + } + })); + } + + for _ in 0..STEPS { + first_send.send(()).await.unwrap(); + current_recv.recv().await.unwrap(); + } + + for task in tasks { + task.await; + } + })); + }); + }) + } else { + run(|ex| { + b.iter(move || { + future::block_on(ex.run(async { + // Create channels. + let mut tasks = Vec::new(); + let (first_send, first_recv) = async_channel::bounded(1); + let mut current_recv = first_recv; + + for _ in 0..TASKS { + let (next_send, next_recv) = async_channel::bounded(1); + let current_recv = mem::replace(&mut current_recv, next_recv); + + tasks.push(ex.spawn(async move { + // Send a notification on to the next task. + for _ in 0..STEPS { + current_recv.recv().await.unwrap(); + next_send.send(()).await.unwrap(); + } + })); + } + + for _ in 0..STEPS { + first_send.send(()).await.unwrap(); + current_recv.recv().await.unwrap(); + } + + for task in tasks { + task.await; + } + })); + }); + }) + } + }); + + group.bench_function(format!("{prefix}::web_server"), |b| { + if with_static { + run_static(|ex| { + b.iter(move || { + future::block_on(ex.run(async { + let (db_send, db_recv) = + async_channel::bounded::>(TASKS / 5); + let mut db_rng = fastrand::Rng::with_seed(0x12345678); + let mut web_rng = db_rng.fork(); + + // This task simulates a database. + let db_task = ex.spawn(async move { + loop { + // Wait for a new task. + let incoming = match db_recv.recv().await { + Ok(incoming) => incoming, + Err(_) => break, + }; + + // Process the task. Maybe it takes a while. + for _ in 0..db_rng.usize(..10) { + future::yield_now().await; + } + + // Send the data back. + incoming.send(db_rng.usize(..)).await.ok(); + } + }); + + // This task simulates a web server waiting for new tasks. + let server_task = ex.spawn(async move { + for i in 0..TASKS { + // Get a new connection. + if web_rng.usize(..=16) == 16 { + future::yield_now().await; + } + + let mut web_rng = web_rng.fork(); + let db_send = db_send.clone(); + let task = ex.spawn(async move { + // Check if the data is cached... + if web_rng.bool() { + // ...it's in cache! + future::yield_now().await; + return; + } + + // Otherwise we have to make a DB call or two. + for _ in 0..web_rng.usize(STEPS / 2..STEPS) { + let (resp_send, resp_recv) = async_channel::bounded(1); + db_send.send(resp_send).await.unwrap(); + black_box(resp_recv.recv().await.unwrap()); + } + + // Send the data back... + for _ in 0..web_rng.usize(3..16) { + future::yield_now().await; + } + }); + + task.detach(); + + if i & 16 == 0 { + future::yield_now().await; + } + } + }); + + // Spawn and wait for it to stop. + server_task.await; + db_task.await; + })); + }) + }) + } else { + run(|ex| { + b.iter(move || { + future::block_on(ex.run(async { + let (db_send, db_recv) = + async_channel::bounded::>(TASKS / 5); + let mut db_rng = fastrand::Rng::with_seed(0x12345678); + let mut web_rng = db_rng.fork(); + + // This task simulates a database. + let db_task = ex.spawn(async move { + loop { + // Wait for a new task. + let incoming = match db_recv.recv().await { + Ok(incoming) => incoming, + Err(_) => break, + }; + + // Process the task. Maybe it takes a while. + for _ in 0..db_rng.usize(..10) { + future::yield_now().await; + } + + // Send the data back. + incoming.send(db_rng.usize(..)).await.ok(); + } + }); + + // This task simulates a web server waiting for new tasks. + let server_task = ex.spawn(async move { + for i in 0..TASKS { + // Get a new connection. + if web_rng.usize(..=16) == 16 { + future::yield_now().await; + } + + let mut web_rng = web_rng.fork(); + let db_send = db_send.clone(); + let task = ex.spawn(async move { + // Check if the data is cached... + if web_rng.bool() { + // ...it's in cache! + future::yield_now().await; + return; + } + + // Otherwise we have to make a DB call or two. + for _ in 0..web_rng.usize(STEPS / 2..STEPS) { + let (resp_send, resp_recv) = async_channel::bounded(1); + db_send.send(resp_send).await.unwrap(); + black_box(resp_recv.recv().await.unwrap()); + } + + // Send the data back... + for _ in 0..web_rng.usize(3..16) { + future::yield_now().await; + } + }); + + task.detach(); + + if i & 16 == 0 { + future::yield_now().await; + } + } + }); + + // Spawn and wait for it to stop. + server_task.await; + db_task.await; + })); + }) + }) + } + }); + } +} + +criterion_group!(benches, create, running_benches); + +criterion_main!(benches); diff --git a/src/lib.rs b/src/lib.rs index 74e02db..5b30593 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -44,7 +44,6 @@ use std::fmt; use std::marker::PhantomData; use std::panic::{RefUnwindSafe, UnwindSafe}; use std::pin::Pin; -use std::rc::Rc; use std::sync::atomic::{AtomicBool, AtomicPtr, Ordering}; use std::sync::{Arc, Mutex, MutexGuard, RwLock, TryLockError}; use std::task::{Context, Poll, Waker}; @@ -55,11 +54,13 @@ use futures_lite::{future, prelude::*}; use pin_project_lite::pin_project; use slab::Slab; +mod local_executor; #[cfg(feature = "static")] mod static_executors; #[doc(no_inline)] pub use async_task::{FallibleTask, Task}; +pub use local_executor::*; #[cfg(feature = "static")] #[cfg_attr(docsrs, doc(cfg(any(feature = "static"))))] pub use static_executors::*; @@ -431,235 +432,6 @@ impl<'a> Default for Executor<'a> { } } -/// A thread-local executor. -/// -/// The executor can only be run on the thread that created it. -/// -/// # Examples -/// -/// ``` -/// use async_executor::LocalExecutor; -/// use futures_lite::future; -/// -/// let local_ex = LocalExecutor::new(); -/// -/// future::block_on(local_ex.run(async { -/// println!("Hello world!"); -/// })); -/// ``` -pub struct LocalExecutor<'a> { - /// The inner executor. - inner: Executor<'a>, - - /// Makes the type `!Send` and `!Sync`. - _marker: PhantomData>, -} - -impl UnwindSafe for LocalExecutor<'_> {} -impl RefUnwindSafe for LocalExecutor<'_> {} - -impl fmt::Debug for LocalExecutor<'_> { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - debug_executor(&self.inner, "LocalExecutor", f) - } -} - -impl<'a> LocalExecutor<'a> { - /// Creates a single-threaded executor. - /// - /// # Examples - /// - /// ``` - /// use async_executor::LocalExecutor; - /// - /// let local_ex = LocalExecutor::new(); - /// ``` - pub const fn new() -> LocalExecutor<'a> { - LocalExecutor { - inner: Executor::new(), - _marker: PhantomData, - } - } - - /// Returns `true` if there are no unfinished tasks. - /// - /// # Examples - /// - /// ``` - /// use async_executor::LocalExecutor; - /// - /// let local_ex = LocalExecutor::new(); - /// assert!(local_ex.is_empty()); - /// - /// let task = local_ex.spawn(async { - /// println!("Hello world"); - /// }); - /// assert!(!local_ex.is_empty()); - /// - /// assert!(local_ex.try_tick()); - /// assert!(local_ex.is_empty()); - /// ``` - pub fn is_empty(&self) -> bool { - self.inner().is_empty() - } - - /// Spawns a task onto the executor. - /// - /// # Examples - /// - /// ``` - /// use async_executor::LocalExecutor; - /// - /// let local_ex = LocalExecutor::new(); - /// - /// let task = local_ex.spawn(async { - /// println!("Hello world"); - /// }); - /// ``` - pub fn spawn(&self, future: impl Future + 'a) -> Task { - let mut active = self.inner().state().active(); - - // SAFETY: This executor is not thread safe, so the future and its result - // cannot be sent to another thread. - unsafe { self.inner().spawn_inner(future, &mut active) } - } - - /// Spawns many tasks onto the executor. - /// - /// As opposed to the [`spawn`] method, this locks the executor's inner task lock once and - /// spawns all of the tasks in one go. With large amounts of tasks this can improve - /// contention. - /// - /// It is assumed that the iterator provided does not block; blocking iterators can lock up - /// the internal mutex and therefore the entire executor. Unlike [`Executor::spawn`], the - /// mutex is not released, as there are no other threads that can poll this executor. - /// - /// ## Example - /// - /// ``` - /// use async_executor::LocalExecutor; - /// use futures_lite::{stream, prelude::*}; - /// use std::future::ready; - /// - /// # futures_lite::future::block_on(async { - /// let mut ex = LocalExecutor::new(); - /// - /// let futures = [ - /// ready(1), - /// ready(2), - /// ready(3) - /// ]; - /// - /// // Spawn all of the futures onto the executor at once. - /// let mut tasks = vec![]; - /// ex.spawn_many(futures, &mut tasks); - /// - /// // Await all of them. - /// let results = ex.run(async move { - /// stream::iter(tasks).then(|x| x).collect::>().await - /// }).await; - /// assert_eq!(results, [1, 2, 3]); - /// # }); - /// ``` - /// - /// [`spawn`]: LocalExecutor::spawn - /// [`Executor::spawn_many`]: Executor::spawn_many - pub fn spawn_many + 'a>( - &self, - futures: impl IntoIterator, - handles: &mut impl Extend>, - ) { - let mut active = self.inner().state().active(); - - // Convert all of the futures to tasks. - let tasks = futures.into_iter().map(|future| { - // SAFETY: This executor is not thread safe, so the future and its result - // cannot be sent to another thread. - unsafe { self.inner().spawn_inner(future, &mut active) } - - // As only one thread can spawn or poll tasks at a time, there is no need - // to release lock contention here. - }); - - // Push them to the user's collection. - handles.extend(tasks); - } - - /// Attempts to run a task if at least one is scheduled. - /// - /// Running a scheduled task means simply polling its future once. - /// - /// # Examples - /// - /// ``` - /// use async_executor::LocalExecutor; - /// - /// let ex = LocalExecutor::new(); - /// assert!(!ex.try_tick()); // no tasks to run - /// - /// let task = ex.spawn(async { - /// println!("Hello world"); - /// }); - /// assert!(ex.try_tick()); // a task was found - /// ``` - pub fn try_tick(&self) -> bool { - self.inner().try_tick() - } - - /// Runs a single task. - /// - /// Running a task means simply polling its future once. - /// - /// If no tasks are scheduled when this method is called, it will wait until one is scheduled. - /// - /// # Examples - /// - /// ``` - /// use async_executor::LocalExecutor; - /// use futures_lite::future; - /// - /// let ex = LocalExecutor::new(); - /// - /// let task = ex.spawn(async { - /// println!("Hello world"); - /// }); - /// future::block_on(ex.tick()); // runs the task - /// ``` - pub async fn tick(&self) { - self.inner().tick().await - } - - /// Runs the executor until the given future completes. - /// - /// # Examples - /// - /// ``` - /// use async_executor::LocalExecutor; - /// use futures_lite::future; - /// - /// let local_ex = LocalExecutor::new(); - /// - /// let task = local_ex.spawn(async { 1 + 2 }); - /// let res = future::block_on(local_ex.run(async { task.await * 2 })); - /// - /// assert_eq!(res, 6); - /// ``` - pub async fn run(&self, future: impl Future) -> T { - self.inner().run(future).await - } - - /// Returns a reference to the inner executor. - fn inner(&self) -> &Executor<'a> { - &self.inner - } -} - -impl<'a> Default for LocalExecutor<'a> { - fn default() -> LocalExecutor<'a> { - LocalExecutor::new() - } -} - /// The state of a executor. struct State { /// The global queue. @@ -1146,6 +918,15 @@ fn debug_state(state: &State, name: &str, f: &mut fmt::Formatter<'_>) -> fmt::Re .finish() } +struct AbortOnPanic; + +impl Drop for AbortOnPanic { + fn drop(&mut self) { + // Panicking while already panicking will result in an abort + panic!("Panicked while in a critical section. Aborting the process"); + } +} + /// Runs a closure when dropped. struct CallOnDrop(F); diff --git a/src/local_executor.rs b/src/local_executor.rs new file mode 100644 index 0000000..b59293e --- /dev/null +++ b/src/local_executor.rs @@ -0,0 +1,628 @@ +use std::cell::{Cell, UnsafeCell}; +use std::collections::VecDeque; +use std::marker::PhantomData; +use std::panic::{RefUnwindSafe, UnwindSafe}; +use std::pin::Pin; +use std::rc::Rc; +use std::task::{Poll, Waker}; +use std::{fmt, mem}; + +use async_task::{Builder, Runnable}; +use futures_lite::{future, prelude::*}; +use slab::Slab; + +use crate::{AbortOnPanic, AsyncCallOnDrop, Sleepers}; +#[doc(no_inline)] +pub use async_task::Task; + +/// A thread-local executor. +/// +/// The executor can only be run on the thread that created it. +/// +/// # Examples +/// +/// ``` +/// use async_executor::LocalExecutor; +/// use futures_lite::future; +/// +/// let local_ex = LocalExecutor::new(); +/// +/// future::block_on(local_ex.run(async { +/// println!("Hello world!"); +/// })); +/// ``` +pub struct LocalExecutor<'a> { + /// The executor state. + pub(crate) state: Cell<*mut State>, + + /// Makes the `'a` lifetime invariant. + _marker: PhantomData>, +} + +impl UnwindSafe for LocalExecutor<'_> {} +impl RefUnwindSafe for LocalExecutor<'_> {} + +impl fmt::Debug for LocalExecutor<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + debug_executor(self, "LocalExecutor", f) + } +} + +impl<'a> LocalExecutor<'a> { + /// Creates a single-threaded executor. + /// + /// # Examples + /// + /// ``` + /// use async_executor::LocalExecutor; + /// + /// let local_ex = LocalExecutor::new(); + /// ``` + pub const fn new() -> LocalExecutor<'a> { + LocalExecutor { + state: Cell::new(std::ptr::null_mut()), + _marker: PhantomData, + } + } + + /// Returns `true` if there are no unfinished tasks. + /// + /// # Examples + /// + /// ``` + /// use async_executor::LocalExecutor; + /// + /// let local_ex = LocalExecutor::new(); + /// assert!(local_ex.is_empty()); + /// + /// let task = local_ex.spawn(async { + /// println!("Hello world"); + /// }); + /// assert!(!local_ex.is_empty()); + /// + /// assert!(local_ex.try_tick()); + /// assert!(local_ex.is_empty()); + /// ``` + pub fn is_empty(&self) -> bool { + // SAFETY: All UnsafeCell accesses to active are tightly scoped, and because + // `LocalExecutor` is !Send, there is no way to have concurrent access to the + // values in `State`, including the active field. + unsafe { &*self.state().active.get() }.is_empty() + } + + /// Spawns a task onto the executor. + /// + /// # Examples + /// + /// ``` + /// use async_executor::LocalExecutor; + /// + /// let local_ex = LocalExecutor::new(); + /// + /// let task = local_ex.spawn(async { + /// println!("Hello world"); + /// }); + /// ``` + pub fn spawn(&self, future: impl Future + 'a) -> Task { + let state = self.state(); + // SAFETY: All UnsafeCell accesses to active are tightly scoped, and because + // `LocalExecutor` is !Send, there is no way to have concurrent access to the + // values in `State`, including the active field. + let active = unsafe { &mut *self.state().active.get() }; + Self::spawn_inner(state, future, active) + } + + /// Spawns many tasks onto the executor. + /// + /// As opposed to the [`spawn`] method, this locks the executor's inner task lock once and + /// spawns all of the tasks in one go. With large amounts of tasks this can improve + /// contention. + /// + /// It is assumed that the iterator provided does not block; blocking iterators can lock up + /// the internal mutex and therefore the entire executor. Unlike [`Executor::spawn`], the + /// mutex is not released, as there are no other threads that can poll this executor. + /// + /// ## Example + /// + /// ``` + /// use async_executor::LocalExecutor; + /// use futures_lite::{stream, prelude::*}; + /// use std::future::ready; + /// + /// # futures_lite::future::block_on(async { + /// let mut ex = LocalExecutor::new(); + /// + /// let futures = [ + /// ready(1), + /// ready(2), + /// ready(3) + /// ]; + /// + /// // Spawn all of the futures onto the executor at once. + /// let mut tasks = vec![]; + /// ex.spawn_many(futures, &mut tasks); + /// + /// // Await all of them. + /// let results = ex.run(async move { + /// stream::iter(tasks).then(|x| x).collect::>().await + /// }).await; + /// assert_eq!(results, [1, 2, 3]); + /// # }); + /// ``` + /// + /// [`spawn`]: LocalExecutor::spawn + /// [`Executor::spawn_many`]: Executor::spawn_many + pub fn spawn_many + 'a>( + &self, + futures: impl IntoIterator, + handles: &mut impl Extend>, + ) { + let tasks = { + let state = self.state(); + + // SAFETY: All UnsafeCell accesses to active are tightly scoped, and because + // `LocalExecutor` is !Send, there is no way to have concurrent access to the + // values in `State`, including the active field. + let active = unsafe { &mut *state.active.get() }; + + // Convert the futures into tasks. + futures + .into_iter() + .map(move |future| Self::spawn_inner(state, future, active)) + }; + + // Push the tasks to the user's collection. + handles.extend(tasks); + } + + /// Spawn a future while holding the inner lock. + fn spawn_inner( + state: Pin<&'a State>, + future: impl Future + 'a, + active: &mut Slab, + ) -> Task { + // Remove the task from the set of active tasks when the future finishes. + let entry = active.vacant_entry(); + let index = entry.key(); + let builder = Builder::new().propagate_panic(true); + + let future = AsyncCallOnDrop::new(future, move || { + // SAFETY: All UnsafeCell accesses to active are tightly scoped, and because + // `LocalExecutor` is !Send, there is no way to have concurrent access to the + // values in `State`, including the active field. + drop(unsafe { &mut *state.active.get() }.try_remove(index)) + }); + + // This is a critical section which will result in UB by aliasing active + // if the AsyncCallOnDrop is called while still in this function. + // + // To avoid this, this guard will abort the process if it does + // panic. Rust's drop order will ensure that this will run before + // executor, and thus before the above AsyncCallOnDrop is dropped. + let _panic_guard = AbortOnPanic; + + let (runnable, task) = + // Create the task and register it in the set of active tasks. + // + // SAFETY: + // + // `future` may not `Send`. Since `LocalExecutor` is `!Sync`, + // `try_tick`, `tick` and `run` can only be called from the origin + // thread of the `LocalExecutor`. Similarly, `spawn` can only be called + // from the origin thread, ensuring that `future` and the executor share + // the same origin thread. The `Runnable` cannot be scheduled from other + // threads, and because of the above `Runnable` can only be called or + // dropped on the origin thread. + // + // `future` is not `'static`, but we make sure that the `Runnable` does + // not outlive `'a`. When the executor is dropped, the `active` field is + // drained and all of the `Waker`s are woken. Then, the queue inside of + // the `Executor` is drained of all of its runnables. This ensures that + // runnables are dropped and this precondition is satisfied. + // + // `schedule` is not `Send` nor `Sync`. As LocalExecutor is not + // `Send`, the `Waker` is guaranteed// to only be used on the same thread + // it was spawned on. + // + // `Self::schedule` may not be `'static`, but we make sure that the `Waker` does + // not outlive `'a`. When the executor is dropped, the `active` field is + // drained and all of the `Waker`s are woken. + unsafe { builder.spawn_unchecked(|()| future, Self::schedule(state)) }; + entry.insert(runnable.waker()); + runnable.schedule(); + + // Critical section over. Forget the guard to avoid panicking on return. + mem::forget(_panic_guard); + task + } + + /// Attempts to run a task if at least one is scheduled. + /// + /// Running a scheduled task means simply polling its future once. + /// + /// # Examples + /// + /// ``` + /// use async_executor::LocalExecutor; + /// + /// let ex = LocalExecutor::new(); + /// assert!(!ex.try_tick()); // no tasks to run + /// + /// let task = ex.spawn(async { + /// println!("Hello world"); + /// }); + /// assert!(ex.try_tick()); // a task was found + /// ``` + pub fn try_tick(&self) -> bool { + self.state().try_tick() + } + + /// Runs a single task. + /// + /// Running a task means simply polling its future once. + /// + /// If no tasks are scheduled when this method is called, it will wait until one is scheduled. + /// + /// # Examples + /// + /// ``` + /// use async_executor::LocalExecutor; + /// use futures_lite::future; + /// + /// let ex = LocalExecutor::new(); + /// + /// let task = ex.spawn(async { + /// println!("Hello world"); + /// }); + /// future::block_on(ex.tick()); // runs the task + /// ``` + pub async fn tick(&self) { + self.state().tick().await; + } + + /// Runs the executor until the given future completes. + /// + /// # Examples + /// + /// ``` + /// use async_executor::LocalExecutor; + /// use futures_lite::future; + /// + /// let local_ex = LocalExecutor::new(); + /// + /// let task = local_ex.spawn(async { 1 + 2 }); + /// let res = future::block_on(local_ex.run(async { task.await * 2 })); + /// + /// assert_eq!(res, 6); + /// ``` + pub async fn run(&self, future: impl Future) -> T { + self.state().run(future).await + } + + /// Returns a function that schedules a runnable task when it gets woken up. + fn schedule(state: Pin<&'a State>) -> impl Fn(Runnable) + 'a { + move |runnable| { + { + // SAFETY: All UnsafeCell accesses to queue are tightly scoped, and because + // `LocalExecutor` is !Send, there is no way to have concurrent access to the + // values in `State`, including the queue field. + let queue = unsafe { &mut *state.queue.get() }; + queue.push_front(runnable); + } + state.notify(); + } + } + + /// Returns a pointer to the inner state. + #[inline] + pub(crate) fn state(&self) -> Pin<&'a State> { + #[cold] + fn alloc_state(cell: &Cell<*mut State>) -> *mut State { + debug_assert!(cell.get().is_null()); + let state = Rc::new(State::new()); + let ptr = Rc::into_raw(state).cast_mut(); + cell.set(ptr); + ptr + } + + let mut ptr = self.state.get(); + if ptr.is_null() { + ptr = alloc_state(&self.state); + } + + // SAFETY: So long as a LocalExecutor lives, it's state pointer will always be valid + // when accessed through state_ptr. + Pin::new(unsafe { &*ptr }) + } +} + +impl Drop for LocalExecutor<'_> { + fn drop(&mut self) { + let ptr = *self.state.get_mut(); + if ptr.is_null() { + return; + } + + // SAFETY: As ptr is not null, it was allocated via Rc::new and converted + // via Rc::into_raw in state_ptr. + let state = unsafe { Rc::from_raw(ptr) }; + + { + // SAFETY: All UnsafeCell accesses to active are tightly scoped, and because + // `LocalExecutor` is !Send, there is no way to have concurrent access to the + // values in `State`, including the active field. + let active = unsafe { &mut *state.active.get() }; + for w in active.drain() { + w.wake(); + } + } + + // SAFETY: All UnsafeCell accesses to queue are tightly scoped, and because + // `LocalExecutor` is !Send, there is no way to have concurrent access to the + // values in `State`, including the queue field. + unsafe { &mut *state.queue.get() }.clear(); + } +} + +impl<'a> Default for LocalExecutor<'a> { + fn default() -> LocalExecutor<'a> { + LocalExecutor::new() + } +} + +/// The state of a executor. +pub(crate) struct State { + /// The global queue. + pub(crate) queue: UnsafeCell>, + + /// A list of sleeping tickers. + sleepers: UnsafeCell, + + /// Currently active tasks. + pub(crate) active: UnsafeCell>, +} + +impl State { + /// Creates state for a new executor. + pub(crate) const fn new() -> State { + State { + queue: UnsafeCell::new(VecDeque::new()), + sleepers: UnsafeCell::new(Sleepers { + count: 0, + wakers: Vec::new(), + free_ids: Vec::new(), + }), + active: UnsafeCell::new(Slab::new()), + } + } + + /// Notifies a sleeping ticker. + #[inline] + pub(crate) fn notify(&self) { + // SAFETY: All UnsafeCell accesses to sleepers are tightly scoped, and because + // `LocalExecutor` is !Send, there is no way to have concurrent access to the + // values in `State`, including the sleepers field. + let waker = unsafe { &mut *self.sleepers.get() }.notify(); + if let Some(w) = waker { + w.wake(); + } + } + + pub(crate) fn try_tick(&self) -> bool { + // SAFETY: All UnsafeCell accesses to queue are tightly scoped, and because + // `LocalExecutor` is !Send, there is no way to have concurrent access to the + // values in `State`, including the queue field. + let runnable = unsafe { &mut *self.queue.get() }.pop_back(); + match runnable { + None => false, + Some(runnable) => { + // Run the task. + runnable.run(); + true + } + } + } + + pub(crate) async fn tick(&self) { + Ticker::new(self).runnable().await.run(); + } + + pub async fn run(&self, future: impl Future) -> T { + // A future that runs tasks forever. + let run_forever = async { + loop { + for _ in 0..200 { + // SAFETY: All UnsafeCell accesses to queue are tightly scoped, and because + // `LocalExecutor` is !Send, there is no way to have concurrent access to the + // values in `State`, including the queue field. + match unsafe { &mut *self.queue.get() }.pop_back() { + Some(runnable) => { + runnable.run(); + } + None => break, + } + } + future::yield_now().await; + } + }; + + // Run `future` and `run_forever` concurrently until `future` completes. + future.or(run_forever).await + } +} + +/// Runs task one by one. +struct Ticker<'a> { + /// The executor state. + state: &'a State, + + /// Set to a non-zero sleeper ID when in sleeping state. + /// + /// States a ticker can be in: + /// 1) Woken. + /// 2a) Sleeping and unnotified. + /// 2b) Sleeping and notified. + sleeping: usize, +} + +impl Ticker<'_> { + /// Creates a ticker. + fn new(state: &State) -> Ticker<'_> { + Ticker { state, sleeping: 0 } + } + + /// Moves the ticker into sleeping and unnotified state. + /// + /// Returns `false` if the ticker was already sleeping and unnotified. + fn sleep(&mut self, waker: &Waker) -> bool { + match self.sleeping { + // Move to sleeping state. + 0 => { + // SAFETY: All UnsafeCell accesses to sleepers are tightly scoped, and because + // `LocalExecutor` is !Send, there is no way to have concurrent access to the + // values in `State`, including the sleepers field. + let sleepers = unsafe { &mut *self.state.sleepers.get() }; + self.sleeping = sleepers.insert(waker); + } + + // Already sleeping, check if notified. + id => { + // SAFETY: All UnsafeCell accesses to sleepers are tightly scoped, and because + // `LocalExecutor` is !Send, there is no way to have concurrent access to the + // values in `State`, including the sleepers field. + let sleepers = unsafe { &mut *self.state.sleepers.get() }; + if !sleepers.update(id, waker) { + return false; + } + } + } + + true + } + + /// Moves the ticker into woken state. + fn wake(&mut self) { + if self.sleeping != 0 { + // SAFETY: All UnsafeCell accesses to sleepers are tightly scoped, and because + // `LocalExecutor` is !Send, there is no way to have concurrent access to the + // values in `State`, including the sleepers field. + let sleepers = unsafe { &mut *self.state.sleepers.get() }; + sleepers.remove(self.sleeping); + } + self.sleeping = 0; + } + + /// Waits for the next runnable task to run, given a function that searches for a task. + async fn runnable(&mut self) -> Runnable { + future::poll_fn(|cx| { + loop { + // SAFETY: All UnsafeCell accesses to queue are tightly scoped, and because + // `LocalExecutor` is !Send, there is no way to have concurrent access to the + // values in `State`, including the queue field. + match unsafe { &mut *self.state.queue.get() }.pop_back() { + None => { + // Move to sleeping and unnotified state. + if !self.sleep(cx.waker()) { + // If already sleeping and unnotified, return. + return Poll::Pending; + } + } + Some(r) => { + // Wake up. + self.wake(); + + return Poll::Ready(r); + } + } + } + }) + .await + } +} + +impl Drop for Ticker<'_> { + fn drop(&mut self) { + // If this ticker is in sleeping state, it must be removed from the sleepers list. + if self.sleeping != 0 { + let notified = { + // SAFETY: All UnsafeCell accesses to sleepers are tightly scoped, and because + // `LocalExecutor` is !Send, there is no way to have concurrent access to the + // values in `State`, including the sleepers field. + let sleepers = unsafe { &mut *self.state.sleepers.get() }; + sleepers.remove(self.sleeping) + }; + + // If this ticker was notified, then notify another ticker. + if notified { + self.state.notify(); + } + } + } +} + +/// Debug implementation for `Executor` and `LocalExecutor`. +fn debug_executor( + executor: &LocalExecutor<'_>, + name: &str, + f: &mut fmt::Formatter<'_>, +) -> fmt::Result { + // Get a reference to the state. + let ptr = executor.state.get(); + if ptr.is_null() { + // The executor has not been initialized. + struct Uninitialized; + + impl fmt::Debug for Uninitialized { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("") + } + } + + return f.debug_tuple(name).field(&Uninitialized).finish(); + } + + // SAFETY: If the state pointer is not null, it must have been + // allocated properly by Rc::new and converted via Rc::into_raw + // in state_ptr. + let state = unsafe { &*ptr }; + + debug_state(state, name, f) +} + +/// Debug implementation for `Executor` and `LocalExecutor`. +pub(crate) fn debug_state(state: &State, name: &str, f: &mut fmt::Formatter<'_>) -> fmt::Result { + /// Debug wrapper for the number of active tasks. + struct ActiveTasks<'a>(&'a UnsafeCell>); + + impl fmt::Debug for ActiveTasks<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + // SAFETY: All UnsafeCell accesses to active are tightly scoped, and because + // `LocalExecutor` is !Send, there is no way to have concurrent access to the + // values in `State`, including the active field. + let active = unsafe { &*self.0.get() }; + fmt::Debug::fmt(&active.len(), f) + } + } + + /// Debug wrapper for the sleepers. + struct SleepCount<'a>(&'a UnsafeCell); + + impl fmt::Debug for SleepCount<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + // SAFETY: All UnsafeCell accesses to sleepers are tightly scoped, and because + // `LocalExecutor` is !Send, there is no way to have concurrent access to the + // values in `State`, including the sleepers field. + let sleepers = unsafe { &*self.0.get() }; + fmt::Debug::fmt(&sleepers.count, f) + } + } + + f.debug_struct(name) + .field("active", &ActiveTasks(&state.active)) + // SAFETY: All UnsafeCell accesses to queue are tightly scoped, and because + // `LocalExecutor` is !Send, there is no way to have concurrent access to the + // values in `State`, including the queue field. + .field("global_tasks", &unsafe { &*state.queue.get() }.len()) + .field("sleepers", &SleepCount(&state.sleepers)) + .finish() +} diff --git a/src/static_executors.rs b/src/static_executors.rs index c43679d..3385595 100644 --- a/src/static_executors.rs +++ b/src/static_executors.rs @@ -1,3 +1,4 @@ +use crate::local_executor::State as LocalState; use crate::{debug_state, Executor, LocalExecutor, State}; use async_task::{Builder, Runnable, Task}; use slab::Slab; @@ -84,22 +85,32 @@ impl LocalExecutor<'static> { /// future::block_on(ex.run(task)); /// ``` pub fn leak(self) -> &'static StaticLocalExecutor { - let ptr = self.inner.state_ptr(); - // SAFETY: So long as a LocalExecutor lives, it's state pointer will always be valid - // when accessed through state_ptr. This executor will live for the full 'static - // lifetime so this isn't an arbitrary lifetime extension. - let state: &'static State = unsafe { &*ptr }; + let ptr = self.state.get(); + + let state: &'static LocalState = if ptr.is_null() { + Box::leak(Box::new(LocalState::new())) + } else { + // SAFETY: So long as a LocalExecutor lives, it's state pointer will always be valid + // when accessed through state_ptr. This executor will live for the full 'static + // lifetime so this isn't an arbitrary lifetime extension. + unsafe { &*ptr } + }; std::mem::forget(self); - let mut active = state.active.lock().unwrap(); - if !active.is_empty() { - // Reschedule all of the active tasks. - for waker in active.drain() { - waker.wake(); + { + // SAFETY: All UnsafeCell accesses to active are tightly scoped, and because + // `LocalExecutor` is !Send, there is no way to have concurrent access to the + // values in `State`, including the active field. + let active = unsafe { &mut *state.active.get() }; + if !active.is_empty() { + // Reschedule all of the active tasks. + for waker in active.drain() { + waker.wake(); + } + // Overwrite to ensure that the slab is deallocated. + *active = Slab::new(); } - // Overwrite to ensure that the slab is deallocated. - *active = Slab::new(); } // SAFETY: StaticLocalExecutor has the same memory layout as State as it's repr(transparent). @@ -311,7 +322,7 @@ impl Default for StaticExecutor { /// [`thread_local]: https://doc.rust-lang.org/std/macro.thread_local.html #[repr(transparent)] pub struct StaticLocalExecutor { - state: State, + state: LocalState, marker_: PhantomData>, } @@ -320,7 +331,7 @@ impl RefUnwindSafe for StaticLocalExecutor {} impl fmt::Debug for StaticLocalExecutor { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - debug_state(&self.state, "StaticLocalExecutor", f) + crate::local_executor::debug_state(&self.state, "StaticLocalExecutor", f) } } @@ -338,7 +349,7 @@ impl StaticLocalExecutor { /// ``` pub const fn new() -> Self { Self { - state: State::new(), + state: LocalState::new(), marker_: PhantomData, } } @@ -360,9 +371,33 @@ impl StaticLocalExecutor { /// }); /// ``` pub fn spawn(&'static self, future: impl Future + 'static) -> Task { - let (runnable, task) = Builder::new() - .propagate_panic(true) - .spawn_local(|()| future, self.schedule()); + // Create the task and register it in the set of active tasks. + // + // SAFETY: + // + // If `future` is not `Send`, this must be a `LocalExecutor` as per this + // function's unsafe precondition. Since `LocalExecutor` is `!Sync`, + // `try_tick`, `tick` and `run` can only be called from the origin + // thread of the `LocalExecutor`. Similarly, `spawn` can only be called + // from the origin thread, ensuring that `future` and the executor share + // the same origin thread. The `Runnable` can be scheduled from other + // threads, but because of the above `Runnable` can only be called or + // dropped on the origin thread. + // + // `future` is not `'static`, but we make sure that the `Runnable` does + // not outlive `'a`. When the executor is dropped, the `active` field is + // drained and all of the `Waker`s are woken. Then, the queue inside of + // the `Executor` is drained of all of its runnables. This ensures that + // runnables are dropped and this precondition is satisfied. + // + // `self.schedule()` is `Send`, `Sync` and `'static`, as checked below. + // Therefore we do not need to worry about what is done with the + // `Waker`. + let (runnable, task) = unsafe { + Builder::new() + .propagate_panic(true) + .spawn_unchecked(|()| future, self.schedule()) + }; runnable.schedule(); task } @@ -377,20 +412,27 @@ impl StaticLocalExecutor { &'static self, future: impl Future + 'a, ) -> Task { + // Create the task and register it in the set of active tasks. + // // SAFETY: // - // - `future` is not `Send` but `StaticLocalExecutor` is `!Sync`, - // `try_tick`, `tick` and `run` can only be called from the origin - // thread of the `StaticLocalExecutor`. Similarly, `spawn_scoped` can only - // be called from the origin thread, ensuring that `future` and the executor - // share the same origin thread. The `Runnable` can be scheduled from other - // threads, but because of the above `Runnable` can only be called or - // dropped on the origin thread. - // - `future` is not `'static`, but the caller guarantees that the - // task, and thus its `Runnable` must not live longer than `'a`. - // - `self.schedule()` is `Send`, `Sync` and `'static`, as checked below. - // Therefore we do not need to worry about what is done with the - // `Waker`. + // `future` may not `Send` but `StaticLocalExecutor` is `!Sync`, + // `try_tick`, `tick` and `run` can only be called from the origin + // thread of the `StaticLocalExecutor`. Similarly, `spawn_scoped` can only + // be called from the origin thread, ensuring that `future` and the executor + // share the same origin thread. The `Runnable` can be scheduled from other + // threads, but because of the above `Runnable` can only be called or + // dropped on the origin thread. + // + // `future` is not `'static`, but the caller guarantees that the + // task, and thus its `Runnable` must not live longer than `'a`. + // + // `self.schedule()` is not `Send` nor `Sync`. As StaticLocalExecutor is not + // `Send`, the `Waker` is guaranteed// to only be used on the same thread + // it was spawned on. + // + // `self.schedule()` is `'static`, and thus will outlive all borrowed + // variables in the future. let (runnable, task) = unsafe { Builder::new() .propagate_panic(true) @@ -464,11 +506,16 @@ impl StaticLocalExecutor { } /// Returns a function that schedules a runnable task when it gets woken up. - fn schedule(&'static self) -> impl Fn(Runnable) + Send + Sync + 'static { - let state: &'static State = &self.state; - // TODO: If possible, push into the current local queue and notify the ticker. + fn schedule(&'static self) -> impl Fn(Runnable) + 'static { + let state: &'static LocalState = &self.state; move |runnable| { - state.queue.push(runnable).unwrap(); + { + // SAFETY: All UnsafeCell accesses to queue are tightly scoped, and because + // `LocalExecutor` is !Send, there is no way to have concurrent access to the + // values in `State`, including the queue field. + let queue = unsafe { &mut *state.queue.get() }; + queue.push_front(runnable); + } state.notify(); } } diff --git a/tests/drop.rs b/tests/drop.rs index 5d089b5..50cdf14 100644 --- a/tests/drop.rs +++ b/tests/drop.rs @@ -5,7 +5,7 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Mutex; use std::task::{Poll, Waker}; -use async_executor::{Executor, Task}; +use async_executor::{Executor, LocalExecutor, Task}; use futures_lite::future; use once_cell::sync::Lazy; @@ -39,6 +39,36 @@ fn executor_cancels_everything() { assert_eq!(DROP.load(Ordering::SeqCst), 1); } +#[test] +fn local_executor_cancels_everything() { + static DROP: AtomicUsize = AtomicUsize::new(0); + static WAKER: Lazy>> = Lazy::new(Default::default); + + let ex = LocalExecutor::new(); + + let task = ex.spawn(async { + let _guard = CallOnDrop(|| { + DROP.fetch_add(1, Ordering::SeqCst); + }); + + future::poll_fn(|cx| { + *WAKER.lock().unwrap() = Some(cx.waker().clone()); + Poll::Pending::<()> + }) + .await; + }); + + future::block_on(ex.tick()); + assert!(WAKER.lock().unwrap().is_some()); + assert_eq!(DROP.load(Ordering::SeqCst), 0); + + drop(ex); + assert_eq!(DROP.load(Ordering::SeqCst), 1); + + assert!(catch_unwind(|| future::block_on(task)).is_err()); + assert_eq!(DROP.load(Ordering::SeqCst), 1); +} + #[cfg(not(miri))] #[test] fn leaked_executor_leaks_everything() { @@ -70,6 +100,37 @@ fn leaked_executor_leaks_everything() { assert_eq!(DROP.load(Ordering::SeqCst), 0); } +#[cfg(not(miri))] +#[test] +fn leaked_local_executor_leaks_everything() { + static DROP: AtomicUsize = AtomicUsize::new(0); + static WAKER: Lazy>> = Lazy::new(Default::default); + + let ex = LocalExecutor::new(); + + let task = ex.spawn(async { + let _guard = CallOnDrop(|| { + DROP.fetch_add(1, Ordering::SeqCst); + }); + + future::poll_fn(|cx| { + *WAKER.lock().unwrap() = Some(cx.waker().clone()); + Poll::Pending::<()> + }) + .await; + }); + + future::block_on(ex.tick()); + assert!(WAKER.lock().unwrap().is_some()); + assert_eq!(DROP.load(Ordering::SeqCst), 0); + + mem::forget(ex); + assert_eq!(DROP.load(Ordering::SeqCst), 0); + + assert!(future::block_on(future::poll_once(task)).is_none()); + assert_eq!(DROP.load(Ordering::SeqCst), 0); +} + #[test] fn await_task_after_dropping_executor() { let s: String = "hello".into(); @@ -83,6 +144,19 @@ fn await_task_after_dropping_executor() { drop(s); } +#[test] +fn await_task_after_dropping_local_executor() { + let s: String = "hello".into(); + + let ex = LocalExecutor::new(); + let task: Task<&str> = ex.spawn(async { &*s }); + assert!(ex.try_tick()); + + drop(ex); + assert_eq!(future::block_on(task), "hello"); + drop(s); +} + #[test] fn drop_executor_and_then_drop_finished_task() { static DROP: AtomicUsize = AtomicUsize::new(0); @@ -102,6 +176,25 @@ fn drop_executor_and_then_drop_finished_task() { assert_eq!(DROP.load(Ordering::SeqCst), 1); } +#[test] +fn drop_local_executor_and_then_drop_finished_task() { + static DROP: AtomicUsize = AtomicUsize::new(0); + + let ex = LocalExecutor::new(); + let task = ex.spawn(async { + CallOnDrop(|| { + DROP.fetch_add(1, Ordering::SeqCst); + }) + }); + assert!(ex.try_tick()); + + assert_eq!(DROP.load(Ordering::SeqCst), 0); + drop(ex); + assert_eq!(DROP.load(Ordering::SeqCst), 0); + drop(task); + assert_eq!(DROP.load(Ordering::SeqCst), 1); +} + #[test] fn drop_finished_task_and_then_drop_executor() { static DROP: AtomicUsize = AtomicUsize::new(0); @@ -121,6 +214,25 @@ fn drop_finished_task_and_then_drop_executor() { assert_eq!(DROP.load(Ordering::SeqCst), 1); } +#[test] +fn drop_finished_task_and_then_drop_local_executor() { + static DROP: AtomicUsize = AtomicUsize::new(0); + + let ex = LocalExecutor::new(); + let task = ex.spawn(async { + CallOnDrop(|| { + DROP.fetch_add(1, Ordering::SeqCst); + }) + }); + assert!(ex.try_tick()); + + assert_eq!(DROP.load(Ordering::SeqCst), 0); + drop(task); + assert_eq!(DROP.load(Ordering::SeqCst), 1); + drop(ex); + assert_eq!(DROP.load(Ordering::SeqCst), 1); +} + #[test] fn iterator_panics_mid_run() { let ex = Executor::new(); @@ -138,6 +250,23 @@ fn iterator_panics_mid_run() { assert_eq!(future::block_on(ex.run(task)), 0); } +#[test] +fn local_iterator_panics_mid_run() { + let ex = LocalExecutor::new(); + + let panic = std::panic::catch_unwind(|| { + let mut handles = vec![]; + ex.spawn_many( + (0..50).map(|i| if i == 25 { panic!() } else { future::ready(i) }), + &mut handles, + ) + }); + assert!(panic.is_err()); + + let task = ex.spawn(future::ready(0)); + assert_eq!(future::block_on(ex.run(task)), 0); +} + struct CallOnDrop(F); impl Drop for CallOnDrop { diff --git a/tests/larger_tasks.rs b/tests/larger_tasks.rs index cc57988..8961646 100644 --- a/tests/larger_tasks.rs +++ b/tests/larger_tasks.rs @@ -1,9 +1,10 @@ //! Test for larger tasks. -use async_executor::Executor; +use async_executor::{Executor, LocalExecutor}; use futures_lite::future::{self, block_on}; use futures_lite::prelude::*; +use std::rc::Rc; use std::sync::Arc; use std::thread; use std::time::Duration; @@ -80,6 +81,42 @@ fn do_run>(mut f: impl FnMut(Arc>) -> }); } +fn do_run_local>(mut f: impl FnMut(Rc>) -> Fut) { + // This should not run for longer than two minutes. + #[cfg(not(miri))] + let _stop_timeout = { + let (stop_timeout, stopper) = async_channel::bounded::<()>(1); + thread::spawn(move || { + block_on(async move { + let timeout = async { + async_io::Timer::after(Duration::from_secs(2 * 60)).await; + eprintln!("test timed out after 2m"); + std::process::exit(1) + }; + + let _ = stopper.recv().or(timeout).await; + }) + }); + stop_timeout + }; + + let ex = Rc::new(LocalExecutor::new()); + + // Test 1: Use the `run` command. + block_on(ex.run(f(ex.clone()))); + + // Test 2: Loop on `tick`. + block_on(async { + let ticker = async { + loop { + ex.tick().await; + } + }; + + f(ex.clone()).or(ticker).await + }); +} + #[test] fn smoke() { do_run(|ex| async move { ex.spawn(async {}).await }); @@ -97,3 +134,21 @@ fn timer() { .await; }) } + +#[test] +fn smoke_local() { + do_run_local(|ex| async move { ex.spawn(async {}).await }); +} + +#[test] +fn yield_now_local() { + do_run_local(|ex| async move { ex.spawn(future::yield_now()).await }) +} + +#[test] +fn timer_local() { + do_run_local(|ex| async move { + ex.spawn(async_io::Timer::after(Duration::from_millis(5))) + .await; + }) +} diff --git a/tests/local_queue.rs b/tests/local_queue.rs index 4678366..62fe7f2 100644 --- a/tests/local_queue.rs +++ b/tests/local_queue.rs @@ -1,4 +1,4 @@ -use async_executor::Executor; +use async_executor::{Executor, LocalExecutor}; use futures_lite::{future, pin}; #[test] @@ -22,3 +22,25 @@ fn two_queues() { assert!(future::poll_once(run2.as_mut()).await.is_none()); }); } + +#[test] +fn local_two_queues() { + future::block_on(async { + // Create an executor with two runners. + let ex = LocalExecutor::new(); + let (run1, run2) = ( + ex.run(future::pending::<()>()), + ex.run(future::pending::<()>()), + ); + let mut run1 = Box::pin(run1); + pin!(run2); + + // Poll them both. + assert!(future::poll_once(run1.as_mut()).await.is_none()); + assert!(future::poll_once(run2.as_mut()).await.is_none()); + + // Drop the first one, which should leave the local queue in the `None` state. + drop(run1); + assert!(future::poll_once(run2.as_mut()).await.is_none()); + }); +} diff --git a/tests/panic_prop.rs b/tests/panic_prop.rs index eab4901..4c2a736 100644 --- a/tests/panic_prop.rs +++ b/tests/panic_prop.rs @@ -1,4 +1,4 @@ -use async_executor::Executor; +use async_executor::{Executor, LocalExecutor}; use futures_lite::{future, prelude::*}; #[test] @@ -12,3 +12,15 @@ fn test_panic_propagation() { // Polling the task should. assert!(future::block_on(task.catch_unwind()).is_err()); } + +#[test] +fn test_local_panic_propagation() { + let ex = LocalExecutor::new(); + let task = ex.spawn(async { panic!("should be caught by the task") }); + + // Running the executor should not panic. + assert!(ex.try_tick()); + + // Polling the task should. + assert!(future::block_on(task.catch_unwind()).is_err()); +}