diff --git a/src/lib.rs b/src/lib.rs index 2ec014a..710a650 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -44,7 +44,7 @@ use std::marker::PhantomData; use std::panic::{RefUnwindSafe, UnwindSafe}; use std::rc::Rc; use std::sync::atomic::{AtomicBool, AtomicPtr, Ordering}; -use std::sync::{Arc, Mutex, RwLock, TryLockError}; +use std::sync::{Arc, Mutex, MutexGuard, RwLock, TryLockError}; use std::task::{Poll, Waker}; use async_task::{Builder, Runnable}; @@ -143,7 +143,7 @@ impl<'a> Executor<'a> { /// assert!(ex.is_empty()); /// ``` pub fn is_empty(&self) -> bool { - self.state().active.lock().unwrap().is_empty() + self.state().active().is_empty() } /// Spawns a task onto the executor. @@ -160,7 +160,7 @@ impl<'a> Executor<'a> { /// }); /// ``` pub fn spawn(&self, future: impl Future + Send + 'a) -> Task { - let mut active = self.state().active.lock().unwrap(); + let mut active = self.state().active(); // SAFETY: `T` and the future are `Send`. unsafe { self.spawn_inner(future, &mut active) } @@ -211,7 +211,7 @@ impl<'a> Executor<'a> { futures: impl IntoIterator, handles: &mut impl Extend>, ) { - let mut active = Some(self.state().active.lock().unwrap()); + let mut active = Some(self.state().active()); // Convert the futures into tasks. let tasks = futures.into_iter().enumerate().map(move |(i, future)| { @@ -221,7 +221,7 @@ impl<'a> Executor<'a> { // Yield the lock every once in a while to ease contention. if i.wrapping_sub(1) % 500 == 0 { drop(active.take()); - active = Some(self.state().active.lock().unwrap()); + active = Some(self.state().active()); } task @@ -246,7 +246,7 @@ impl<'a> Executor<'a> { let index = entry.key(); let state = self.state_as_arc(); let future = async move { - let _guard = CallOnDrop(move || drop(state.active.lock().unwrap().try_remove(index))); + let _guard = CallOnDrop(move || drop(state.active().try_remove(index))); future.await }; @@ -415,7 +415,7 @@ impl Drop for Executor<'_> { // via Arc::into_raw in state_ptr. let state = unsafe { Arc::from_raw(ptr) }; - let mut active = state.active.lock().unwrap_or_else(|e| e.into_inner()); + let mut active = state.active(); for w in active.drain() { w.wake(); } @@ -517,7 +517,7 @@ impl<'a> LocalExecutor<'a> { /// }); /// ``` pub fn spawn(&self, future: impl Future + 'a) -> Task { - let mut active = self.inner().state().active.lock().unwrap(); + let mut active = self.inner().state().active(); // SAFETY: This executor is not thread safe, so the future and its result // cannot be sent to another thread. @@ -569,7 +569,7 @@ impl<'a> LocalExecutor<'a> { futures: impl IntoIterator, handles: &mut impl Extend>, ) { - let mut active = self.inner().state().active.lock().unwrap(); + let mut active = self.inner().state().active(); // Convert all of the futures to tasks. let tasks = futures.into_iter().map(|future| { @@ -694,6 +694,11 @@ impl State { } } + /// Returns a reference to currently active tasks. + fn active(&self) -> MutexGuard<'_, Slab> { + self.active.lock().unwrap_or_else(|e| e.into_inner()) + } + /// Notifies a sleeping ticker. #[inline] fn notify(&self) { @@ -1099,7 +1104,7 @@ fn debug_state(state: &State, name: &str, f: &mut fmt::Formatter<'_>) -> fmt::Re match self.0.try_lock() { Ok(lock) => fmt::Debug::fmt(&lock.len(), f), Err(TryLockError::WouldBlock) => f.write_str(""), - Err(TryLockError::Poisoned(_)) => f.write_str(""), + Err(TryLockError::Poisoned(err)) => fmt::Debug::fmt(&err.into_inner().len(), f), } } } diff --git a/tests/drop.rs b/tests/drop.rs index 54a0741..5d089b5 100644 --- a/tests/drop.rs +++ b/tests/drop.rs @@ -133,6 +133,9 @@ fn iterator_panics_mid_run() { ) }); assert!(panic.is_err()); + + let task = ex.spawn(future::ready(0)); + assert_eq!(future::block_on(ex.run(task)), 0); } struct CallOnDrop(F);