Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ name = "async-executor"
version = "1.13.2"
authors = ["Stjepan Glavina <[email protected]>", "John Nunley <[email protected]>"]
edition = "2021"
rust-version = "1.63"
rust-version = "1.65"
description = "Async executor"
license = "Apache-2.0 OR MIT"
repository = "https://github.com/smol-rs/async-executor"
Expand Down
82 changes: 39 additions & 43 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ pub use static_executors::*;
/// ```
pub struct Executor<'a> {
/// The executor state.
state: AtomicPtr<State>,
pub(crate) state: AtomicPtr<State>,

/// Makes the `'a` lifetime invariant.
_marker: PhantomData<std::cell::UnsafeCell<&'a ()>>,
Expand Down Expand Up @@ -163,10 +163,11 @@ impl<'a> Executor<'a> {
/// });
/// ```
pub fn spawn<T: Send + 'a>(&self, future: impl Future<Output = T> + Send + 'a) -> Task<T> {
let mut active = self.state().active();
let state = self.state();
let mut active = state.active();

// SAFETY: `T` and the future are `Send`.
unsafe { self.spawn_inner(future, &mut active) }
unsafe { Self::spawn_inner(state, future, &mut active) }
}

/// Spawns many tasks onto the executor.
Expand Down Expand Up @@ -214,12 +215,13 @@ impl<'a> Executor<'a> {
futures: impl IntoIterator<Item = F>,
handles: &mut impl Extend<Task<F::Output>>,
) {
let mut active = Some(self.state().active());
let state = self.state();
let mut active = Some(state.as_ref().active());

// Convert the futures into tasks.
let tasks = futures.into_iter().enumerate().map(move |(i, future)| {
// SAFETY: `T` and the future are `Send`.
let task = unsafe { self.spawn_inner(future, active.as_mut().unwrap()) };
let task = unsafe { Self::spawn_inner(state, future, active.as_mut().unwrap()) };

// Yield the lock every once in a while to ease contention.
if i.wrapping_sub(1) % 500 == 0 {
Expand All @@ -240,14 +242,13 @@ impl<'a> Executor<'a> {
///
/// If this is an `Executor`, `F` and `T` must be `Send`.
unsafe fn spawn_inner<T: 'a>(
&self,
state: Pin<&'a State>,
future: impl Future<Output = T> + 'a,
active: &mut Slab<Waker>,
) -> Task<T> {
// Remove the task from the set of active tasks when the future finishes.
let entry = active.vacant_entry();
let index = entry.key();
let state = self.state_as_arc();
let future = AsyncCallOnDrop::new(future, move || drop(state.active().try_remove(index)));

// Create the task and register it in the set of active tasks.
Expand All @@ -269,12 +270,16 @@ impl<'a> Executor<'a> {
// the `Executor` is drained of all of its runnables. This ensures that
// runnables are dropped and this precondition is satisfied.
//
// `self.schedule()` is `Send`, `Sync` and `'static`, as checked below.
// `Self::schedule` is `Send`, `Sync` and `'static`, as checked below.
// Therefore we do not need to worry about what is done with the
// `Waker`.
//
// `Self::schedule` may not be k`'static`, but we make sure that the `Waker` does
// not outlive `'a`. When the executor is dropped, the `active` field is
// drained and all of the `Waker`s are woken.
let (runnable, task) = Builder::new()
.propagate_panic(true)
.spawn_unchecked(|()| future, self.schedule());
.spawn_unchecked(|()| future, Self::schedule(state));
entry.insert(runnable.waker());

runnable.schedule();
Expand Down Expand Up @@ -345,9 +350,7 @@ impl<'a> Executor<'a> {
}

/// Returns a function that schedules a runnable task when it gets woken up.
fn schedule(&self) -> impl Fn(Runnable) + Send + Sync + 'static {
let state = self.state_as_arc();

fn schedule(state: Pin<&'a State>) -> impl Fn(Runnable) + Send + Sync + 'a {
// TODO: If possible, push into the current local queue and notify the ticker.
move |runnable| {
state.queue.push(runnable).unwrap();
Expand All @@ -357,12 +360,11 @@ impl<'a> Executor<'a> {

/// Returns a pointer to the inner state.
#[inline]
fn state_ptr(&self) -> *const State {
fn state(&self) -> Pin<&'a State> {
#[cold]
fn alloc_state(atomic_ptr: &AtomicPtr<State>) -> *mut State {
let state = Arc::new(State::new());
// TODO: Switch this to use cast_mut once the MSRV can be bumped past 1.65
let ptr = Arc::into_raw(state) as *mut State;
let ptr = Arc::into_raw(state).cast_mut();
if let Err(actual) = atomic_ptr.compare_exchange(
std::ptr::null_mut(),
ptr,
Expand All @@ -381,26 +383,10 @@ impl<'a> Executor<'a> {
if ptr.is_null() {
ptr = alloc_state(&self.state);
}
ptr
}

/// Returns a reference to the inner state.
#[inline]
fn state(&self) -> &State {
// SAFETY: So long as an Executor lives, it's state pointer will always be valid
// when accessed through state_ptr.
unsafe { &*self.state_ptr() }
}

// Clones the inner state Arc
#[inline]
fn state_as_arc(&self) -> Arc<State> {
// SAFETY: So long as an Executor lives, it's state pointer will always be a valid
// Arc when accessed through state_ptr.
let arc = unsafe { Arc::from_raw(self.state_ptr()) };
let clone = arc.clone();
std::mem::forget(arc);
clone
// and will never be moved until it's dropped.
Pin::new(unsafe { &*ptr })
}
}

Expand All @@ -415,7 +401,7 @@ impl Drop for Executor<'_> {
// via Arc::into_raw in state_ptr.
let state = unsafe { Arc::from_raw(ptr) };

let mut active = state.active();
let mut active = state.pin().active();
for w in active.drain() {
w.wake();
}
Expand Down Expand Up @@ -517,11 +503,12 @@ impl<'a> LocalExecutor<'a> {
/// });
/// ```
pub fn spawn<T: 'a>(&self, future: impl Future<Output = T> + 'a) -> Task<T> {
let mut active = self.inner().state().active();
let state = self.inner().state();
let mut active = state.active();

// SAFETY: This executor is not thread safe, so the future and its result
// cannot be sent to another thread.
unsafe { self.inner().spawn_inner(future, &mut active) }
unsafe { Executor::spawn_inner(state, future, &mut active) }
}

/// Spawns many tasks onto the executor.
Expand Down Expand Up @@ -569,13 +556,14 @@ impl<'a> LocalExecutor<'a> {
futures: impl IntoIterator<Item = F>,
handles: &mut impl Extend<Task<F::Output>>,
) {
let mut active = self.inner().state().active();
let state = self.inner().state();
let mut active = state.active();

// Convert all of the futures to tasks.
let tasks = futures.into_iter().map(|future| {
// SAFETY: This executor is not thread safe, so the future and its result
// cannot be sent to another thread.
unsafe { self.inner().spawn_inner(future, &mut active) }
unsafe { Executor::spawn_inner(state, future, &mut active) }

// As only one thread can spawn or poll tasks at a time, there is no need
// to release lock contention here.
Expand Down Expand Up @@ -694,9 +682,16 @@ impl State {
}
}

fn pin(&self) -> Pin<&Self> {
Pin::new(self)
}

/// Returns a reference to currently active tasks.
fn active(&self) -> MutexGuard<'_, Slab<Waker>> {
self.active.lock().unwrap_or_else(|e| e.into_inner())
fn active(self: Pin<&Self>) -> MutexGuard<'_, Slab<Waker>> {
self.get_ref()
.active
.lock()
.unwrap_or_else(|e| e.into_inner())
}

/// Notifies a sleeping ticker.
Expand Down Expand Up @@ -1192,13 +1187,14 @@ fn _ensure_send_and_sync() {
is_sync::<Executor<'_>>(Executor::new());

let ex = Executor::new();
let state = ex.state();
is_send(ex.run(pending::<()>()));
is_sync(ex.run(pending::<()>()));
is_send(ex.tick());
is_sync(ex.tick());
is_send(ex.schedule());
is_sync(ex.schedule());
is_static(ex.schedule());
is_send(Executor::schedule(state));
is_sync(Executor::schedule(state));
is_static(Executor::schedule(state));

/// ```compile_fail
/// use async_executor::LocalExecutor;
Expand Down
31 changes: 21 additions & 10 deletions src/static_executors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::{
future::Future,
marker::PhantomData,
panic::{RefUnwindSafe, UnwindSafe},
sync::atomic::Ordering,
};

impl Executor<'static> {
Expand Down Expand Up @@ -34,11 +35,16 @@ impl Executor<'static> {
/// future::block_on(ex.run(task));
/// ```
pub fn leak(self) -> &'static StaticExecutor {
let ptr = self.state_ptr();
// SAFETY: So long as an Executor lives, it's state pointer will always be valid
// when accessed through state_ptr. This executor will live for the full 'static
// lifetime so this isn't an arbitrary lifetime extension.
let state: &'static State = unsafe { &*ptr };
let ptr = self.state.load(Ordering::Relaxed);

let state: &'static State = if ptr.is_null() {
Box::leak(Box::new(State::new()))
} else {
// SAFETY: So long as an Executor lives, it's state pointer will always be valid
// when accessed through state_ptr. This executor will live for the full 'static
// lifetime so this isn't an arbitrary lifetime extension.
unsafe { &*ptr }
};

std::mem::forget(self);

Expand Down Expand Up @@ -84,11 +90,16 @@ impl LocalExecutor<'static> {
/// future::block_on(ex.run(task));
/// ```
pub fn leak(self) -> &'static StaticLocalExecutor {
let ptr = self.inner.state_ptr();
// SAFETY: So long as a LocalExecutor lives, it's state pointer will always be valid
// when accessed through state_ptr. This executor will live for the full 'static
// lifetime so this isn't an arbitrary lifetime extension.
let state: &'static State = unsafe { &*ptr };
let ptr = self.inner.state.load(Ordering::Relaxed);

let state: &'static State = if ptr.is_null() {
Box::leak(Box::new(State::new()))
} else {
// SAFETY: So long as an Executor lives, it's state pointer will always be valid
// when accessed through state_ptr. This executor will live for the full 'static
// lifetime so this isn't an arbitrary lifetime extension.
unsafe { &*ptr }
};

std::mem::forget(self);

Expand Down