Skip to content

Attempt to optimize LocalExecutor #144

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 22 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -918,6 +918,15 @@ fn debug_state(state: &State, name: &str, f: &mut fmt::Formatter<'_>) -> fmt::Re
.finish()
}

struct AbortOnPanic;

impl Drop for AbortOnPanic {
fn drop(&mut self) {
// Panicking while already panicking will result in an abort
panic!("Panicked while in a critical section. Abortign the process");
}
}

/// Runs a closure when dropped.
struct CallOnDrop<F: FnMut()>(F);

Expand Down
73 changes: 41 additions & 32 deletions src/local_executor.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
use std::cell::{Cell, UnsafeCell};
use std::collections::VecDeque;
use std::fmt;
use std::marker::PhantomData;
use std::panic::{RefUnwindSafe, UnwindSafe};
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Poll, Waker};
use std::{fmt, mem};

use async_task::{Builder, Runnable};
use futures_lite::{future, prelude::*};
use slab::Slab;

use crate::{AsyncCallOnDrop, Sleepers};
use crate::{AbortOnPanic, AsyncCallOnDrop, Sleepers};
#[doc(no_inline)]
pub use async_task::Task;

Expand Down Expand Up @@ -184,46 +184,55 @@ impl<'a> LocalExecutor<'a> {
// Remove the task from the set of active tasks when the future finishes.
let entry = active.vacant_entry();
let index = entry.key();
let builder = Builder::new().propagate_panic(true);

let future = AsyncCallOnDrop::new(future, move || {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice if Builder::new().propagate_panic(true) could be saved in a binding above this line.

It is not really necessary, but it makes it obvious that no panics can occur between the construction of AsyncCallOnDrop and the call to .spawn_unchecked, at which point the handling of drop unwinding gets taken over by Task, which automatically schedules itself in that case and notifies this LocalExecutor.

Note that entry.insert might allocate, and subsequently panic on OOM.
I think the easiest way to prevent panics in this function in a sort-of "critical section" w.r.t. state.active would be to invoke active.reserve(1); at the top of the function.

Copy link
Contributor Author

@james7132 james7132 Aug 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Really good catch! Unwinds continue to be the one control flow path that seems to be hard to find.

I went ahead and moved the builder outside of the "critical section".

reserve addresses the allocator OOM from the Slab, but it may also occur in the schedule with the VecDeque and adds extra work on the hotpath. I'd rather use a more robust / zero cost method and just opted to put an AbortOnPanic guard around the critical section. Rust's drop order should force it to run before the AsyncCallOnDrop is dropped.

// SAFETY: All UnsafeCell accesses to active are tightly scoped, and because
// `LocalExecutor` is !Send, there is no way to have concurrent access to the
// values in `State`, including the active field.
drop(unsafe { &mut *state.active.get() }.try_remove(index))
});

// Create the task and register it in the set of active tasks.
//
// SAFETY:
//
// `future` may not `Send`. Since `LocalExecutor` is `!Sync`,
// `try_tick`, `tick` and `run` can only be called from the origin
// thread of the `LocalExecutor`. Similarly, `spawn` can only be called
// from the origin thread, ensuring that `future` and the executor share
// the same origin thread. The `Runnable` can be scheduled from other
// threads, but because of the above `Runnable` can only be called or
// dropped on the origin thread.
//
// `future` is not `'static`, but we make sure that the `Runnable` does
// not outlive `'a`. When the executor is dropped, the `active` field is
// drained and all of the `Waker`s are woken. Then, the queue inside of
// the `Executor` is drained of all of its runnables. This ensures that
// runnables are dropped and this precondition is satisfied.
// This is a critical section which will result in UB by aliasing active
// if the AsyncCallOnDrop is called while still in this function.
//
// `schedule` is not `Send` nor `Sync`. As LocalExecutor is not
// `Send`, the `Waker` is guaranteed// to only be used on the same thread
// it was spawned on.
//
// `Self::schedule` may not be `'static`, but we make sure that the `Waker` does
// not outlive `'a`. When the executor is dropped, the `active` field is
// drained and all of the `Waker`s are woken.
let (runnable, task) = unsafe {
Builder::new()
.propagate_panic(true)
.spawn_unchecked(|()| future, Self::schedule(state))
};
// To avoid this, this guard will abort the process if it does
// panic. Rust's drop order will ensure that this will run before
// executor, and thus before the above AsyncCallOnDrop is dropped.
let _panic_guard = AbortOnPanic;

let (runnable, task) =
// Create the task and register it in the set of active tasks.
//
// SAFETY:
//
// `future` may not `Send`. Since `LocalExecutor` is `!Sync`,
// `try_tick`, `tick` and `run` can only be called from the origin
// thread of the `LocalExecutor`. Similarly, `spawn` can only be called
// from the origin thread, ensuring that `future` and the executor share
// the same origin thread. The `Runnable` cannot be scheduled from other
// threads, and because of the above `Runnable` can only be called or
// dropped on the origin thread.
//
// `future` is not `'static`, but we make sure that the `Runnable` does
// not outlive `'a`. When the executor is dropped, the `active` field is
// drained and all of the `Waker`s are woken. Then, the queue inside of
// the `Executor` is drained of all of its runnables. This ensures that
// runnables are dropped and this precondition is satisfied.
//
// `schedule` is not `Send` nor `Sync`. As LocalExecutor is not
// `Send`, the `Waker` is guaranteed// to only be used on the same thread
// it was spawned on.
//
// `Self::schedule` may not be `'static`, but we make sure that the `Waker` does
// not outlive `'a`. When the executor is dropped, the `active` field is
// drained and all of the `Waker`s are woken.
unsafe { builder.spawn_unchecked(|()| future, Self::schedule(state)) };
entry.insert(runnable.waker());

runnable.schedule();

// Critical section over. Forget the guard to avoid panicking on return.
mem::forget(_panic_guard);
task
}

Expand Down