From 76dd9020234a6df99192c893c2e2d48b37643440 Mon Sep 17 00:00:00 2001 From: james7132 Date: Sun, 24 Aug 2025 01:26:29 -0700 Subject: [PATCH 1/3] Don't explicitly panic when avoidable --- src/lib.rs | 73 +++++++++++++++++++++++++++++++++--------------------- 1 file changed, 45 insertions(+), 28 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 74e02db..e009958 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -46,7 +46,7 @@ use std::panic::{RefUnwindSafe, UnwindSafe}; use std::pin::Pin; use std::rc::Rc; use std::sync::atomic::{AtomicBool, AtomicPtr, Ordering}; -use std::sync::{Arc, Mutex, MutexGuard, RwLock, TryLockError}; +use std::sync::{Arc, Mutex, MutexGuard, PoisonError, RwLock, TryLockError}; use std::task::{Context, Poll, Waker}; use async_task::{Builder, Runnable}; @@ -350,7 +350,8 @@ impl<'a> Executor<'a> { // TODO: If possible, push into the current local queue and notify the ticker. move |runnable| { - state.queue.push(runnable).unwrap(); + let result = state.queue.push(runnable); + debug_assert!(result.is_ok()); state.notify(); } } @@ -696,7 +697,7 @@ impl State { /// Returns a reference to currently active tasks. fn active(&self) -> MutexGuard<'_, Slab> { - self.active.lock().unwrap_or_else(|e| e.into_inner()) + self.active.lock().unwrap_or_else(PoisonError::into_inner) } /// Notifies a sleeping ticker. @@ -707,7 +708,11 @@ impl State { .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire) .is_ok() { - let waker = self.sleepers.lock().unwrap().notify(); + let waker = self + .sleepers + .lock() + .unwrap_or_else(PoisonError::into_inner) + .notify(); if let Some(w) = waker { w.wake(); } @@ -852,7 +857,11 @@ impl Ticker<'_> { /// /// Returns `false` if the ticker was already sleeping and unnotified. fn sleep(&mut self, waker: &Waker) -> bool { - let mut sleepers = self.state.sleepers.lock().unwrap(); + let mut sleepers = self + .state + .sleepers + .lock() + .unwrap_or_else(PoisonError::into_inner); match self.sleeping { // Move to sleeping state. @@ -878,7 +887,11 @@ impl Ticker<'_> { /// Moves the ticker into woken state. fn wake(&mut self) { if self.sleeping != 0 { - let mut sleepers = self.state.sleepers.lock().unwrap(); + let mut sleepers = self + .state + .sleepers + .lock() + .unwrap_or_else(PoisonError::into_inner); sleepers.remove(self.sleeping); self.state @@ -926,7 +939,11 @@ impl Drop for Ticker<'_> { fn drop(&mut self) { // If this ticker is in sleeping state, it must be removed from the sleepers list. if self.sleeping != 0 { - let mut sleepers = self.state.sleepers.lock().unwrap(); + let mut sleepers = self + .state + .sleepers + .lock() + .unwrap_or_else(PoisonError::into_inner); let notified = sleepers.remove(self.sleeping); self.state @@ -971,7 +988,7 @@ impl Runner<'_> { state .local_queues .write() - .unwrap() + .unwrap_or_else(PoisonError::into_inner) .push(runner.local.clone()); runner } @@ -993,25 +1010,25 @@ impl Runner<'_> { } // Try stealing from other runners. - let local_queues = self.state.local_queues.read().unwrap(); - - // Pick a random starting point in the iterator list and rotate the list. - let n = local_queues.len(); - let start = rng.usize(..n); - let iter = local_queues - .iter() - .chain(local_queues.iter()) - .skip(start) - .take(n); - - // Remove this runner's local queue. - let iter = iter.filter(|local| !Arc::ptr_eq(local, &self.local)); - - // Try stealing from each local queue in the list. - for local in iter { - steal(local, &self.local); - if let Ok(r) = self.local.pop() { - return Some(r); + if let Ok(local_queues) = self.state.local_queues.try_read() { + // Pick a random starting point in the iterator list and rotate the list. + let n = local_queues.len(); + let start = rng.usize(..n); + let iter = local_queues + .iter() + .chain(local_queues.iter()) + .skip(start) + .take(n); + + // Remove this runner's local queue. + let iter = iter.filter(|local| !Arc::ptr_eq(local, &self.local)); + + // Try stealing from each local queue in the list. + for local in iter { + steal(local, &self.local); + if let Ok(r) = self.local.pop() { + return Some(r); + } } } @@ -1037,7 +1054,7 @@ impl Drop for Runner<'_> { self.state .local_queues .write() - .unwrap() + .unwrap_or_else(PoisonError::into_inner) .retain(|local| !Arc::ptr_eq(local, &self.local)); // Re-schedule remaining tasks in the local queue. From a50d70a97f7129ddcf2c91779f44eb334f1639c4 Mon Sep 17 00:00:00 2001 From: James Liu Date: Sun, 24 Aug 2025 09:08:53 +0000 Subject: [PATCH 2/3] Update src/lib.rs Co-authored-by: Taiki Endo --- src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index e009958..85c161b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -351,7 +351,7 @@ impl<'a> Executor<'a> { // TODO: If possible, push into the current local queue and notify the ticker. move |runnable| { let result = state.queue.push(runnable); - debug_assert!(result.is_ok()); + debug_assert!(result.is_ok()); // Since we use unbounded queue, push will never fail. state.notify(); } } From 5a48063bed009fd6a772dcfa1597212718427c32 Mon Sep 17 00:00:00 2001 From: james7132 Date: Sun, 24 Aug 2025 02:12:35 -0700 Subject: [PATCH 3/3] Do the same for static executors --- src/static_executors.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/static_executors.rs b/src/static_executors.rs index c43679d..7b4e667 100644 --- a/src/static_executors.rs +++ b/src/static_executors.rs @@ -7,6 +7,7 @@ use std::{ future::Future, marker::PhantomData, panic::{RefUnwindSafe, UnwindSafe}, + sync::PoisonError, }; impl Executor<'static> { @@ -42,7 +43,7 @@ impl Executor<'static> { std::mem::forget(self); - let mut active = state.active.lock().unwrap(); + let mut active = state.active.lock().unwrap_or_else(PoisonError::into_inner); if !active.is_empty() { // Reschedule all of the active tasks. for waker in active.drain() { @@ -92,7 +93,7 @@ impl LocalExecutor<'static> { std::mem::forget(self); - let mut active = state.active.lock().unwrap(); + let mut active = state.active.lock().unwrap_or_else(PoisonError::into_inner); if !active.is_empty() { // Reschedule all of the active tasks. for waker in active.drain() { @@ -283,7 +284,8 @@ impl StaticExecutor { let state: &'static State = &self.state; // TODO: If possible, push into the current local queue and notify the ticker. move |runnable| { - state.queue.push(runnable).unwrap(); + let result = state.queue.push(runnable); + debug_assert!(result.is_ok()); // Since we use unbounded queue, push will never fail. state.notify(); } } @@ -468,7 +470,8 @@ impl StaticLocalExecutor { let state: &'static State = &self.state; // TODO: If possible, push into the current local queue and notify the ticker. move |runnable| { - state.queue.push(runnable).unwrap(); + let result = state.queue.push(runnable); + debug_assert!(result.is_ok()); // Since we use unbounded queue, push will never fail. state.notify(); } }