Skip to content
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 45 additions & 28 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use std::panic::{RefUnwindSafe, UnwindSafe};
use std::pin::Pin;
use std::rc::Rc;
use std::sync::atomic::{AtomicBool, AtomicPtr, Ordering};
use std::sync::{Arc, Mutex, MutexGuard, RwLock, TryLockError};
use std::sync::{Arc, Mutex, MutexGuard, PoisonError, RwLock, TryLockError};
use std::task::{Context, Poll, Waker};

use async_task::{Builder, Runnable};
Expand Down Expand Up @@ -350,7 +350,8 @@ impl<'a> Executor<'a> {

// TODO: If possible, push into the current local queue and notify the ticker.
move |runnable| {
state.queue.push(runnable).unwrap();
let result = state.queue.push(runnable);
debug_assert!(result.is_ok()); // Since we use unbounded queue, push will never fail.
state.notify();
}
}
Expand Down Expand Up @@ -696,7 +697,7 @@ impl State {

/// Returns a reference to currently active tasks.
fn active(&self) -> MutexGuard<'_, Slab<Waker>> {
self.active.lock().unwrap_or_else(|e| e.into_inner())
self.active.lock().unwrap_or_else(PoisonError::into_inner)
}

/// Notifies a sleeping ticker.
Expand All @@ -707,7 +708,11 @@ impl State {
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
.is_ok()
{
let waker = self.sleepers.lock().unwrap().notify();
let waker = self
.sleepers
.lock()
.unwrap_or_else(PoisonError::into_inner)
.notify();
if let Some(w) = waker {
w.wake();
}
Expand Down Expand Up @@ -852,7 +857,11 @@ impl Ticker<'_> {
///
/// Returns `false` if the ticker was already sleeping and unnotified.
fn sleep(&mut self, waker: &Waker) -> bool {
let mut sleepers = self.state.sleepers.lock().unwrap();
let mut sleepers = self
.state
.sleepers
.lock()
.unwrap_or_else(PoisonError::into_inner);

match self.sleeping {
// Move to sleeping state.
Expand All @@ -878,7 +887,11 @@ impl Ticker<'_> {
/// Moves the ticker into woken state.
fn wake(&mut self) {
if self.sleeping != 0 {
let mut sleepers = self.state.sleepers.lock().unwrap();
let mut sleepers = self
.state
.sleepers
.lock()
.unwrap_or_else(PoisonError::into_inner);
sleepers.remove(self.sleeping);

self.state
Expand Down Expand Up @@ -926,7 +939,11 @@ impl Drop for Ticker<'_> {
fn drop(&mut self) {
// If this ticker is in sleeping state, it must be removed from the sleepers list.
if self.sleeping != 0 {
let mut sleepers = self.state.sleepers.lock().unwrap();
let mut sleepers = self
.state
.sleepers
.lock()
.unwrap_or_else(PoisonError::into_inner);
let notified = sleepers.remove(self.sleeping);

self.state
Expand Down Expand Up @@ -971,7 +988,7 @@ impl Runner<'_> {
state
.local_queues
.write()
.unwrap()
.unwrap_or_else(PoisonError::into_inner)
.push(runner.local.clone());
runner
}
Expand All @@ -993,25 +1010,25 @@ impl Runner<'_> {
}

// Try stealing from other runners.
let local_queues = self.state.local_queues.read().unwrap();

// Pick a random starting point in the iterator list and rotate the list.
let n = local_queues.len();
let start = rng.usize(..n);
let iter = local_queues
.iter()
.chain(local_queues.iter())
.skip(start)
.take(n);

// Remove this runner's local queue.
let iter = iter.filter(|local| !Arc::ptr_eq(local, &self.local));

// Try stealing from each local queue in the list.
for local in iter {
steal(local, &self.local);
if let Ok(r) = self.local.pop() {
return Some(r);
if let Ok(local_queues) = self.state.local_queues.try_read() {
// Pick a random starting point in the iterator list and rotate the list.
let n = local_queues.len();
let start = rng.usize(..n);
let iter = local_queues
.iter()
.chain(local_queues.iter())
.skip(start)
.take(n);

// Remove this runner's local queue.
let iter = iter.filter(|local| !Arc::ptr_eq(local, &self.local));

// Try stealing from each local queue in the list.
for local in iter {
steal(local, &self.local);
if let Ok(r) = self.local.pop() {
return Some(r);
}
}
}

Expand All @@ -1037,7 +1054,7 @@ impl Drop for Runner<'_> {
self.state
.local_queues
.write()
.unwrap()
.unwrap_or_else(PoisonError::into_inner)
.retain(|local| !Arc::ptr_eq(local, &self.local));

// Re-schedule remaining tasks in the local queue.
Expand Down