Skip to content

Commit 5e08a9a

Browse files
author
Stjepan Glavina
committed
Use atomics to make run() and tick() futures Send + Sync
1 parent 05456ef commit 5e08a9a

File tree

1 file changed

+15
-15
lines changed

1 file changed

+15
-15
lines changed

src/lib.rs

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,12 @@
2121
#![forbid(unsafe_code)]
2222
#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
2323

24-
use std::cell::Cell;
2524
use std::future::Future;
2625
use std::marker::PhantomData;
2726
use std::panic::{RefUnwindSafe, UnwindSafe};
2827
use std::pin::Pin;
2928
use std::rc::Rc;
30-
use std::sync::atomic::{AtomicBool, Ordering};
29+
use std::sync::atomic::{AtomicBool, Ordering, AtomicU64, AtomicUsize};
3130
use std::sync::{Arc, Mutex, RwLock};
3231
use std::task::{Context, Poll, Waker};
3332

@@ -200,7 +199,7 @@ impl State {
200199
sleepers: Mutex::new(Sleepers {
201200
count: 0,
202201
wakers: Vec::new(),
203-
id_gen: 0,
202+
id_gen: 1,
204203
}),
205204
}
206205
}
@@ -489,21 +488,21 @@ struct Ticker<'a> {
489488
/// The executor state.
490489
state: &'a State,
491490

492-
/// Set to `true` when in sleeping state.
491+
/// Set to a non-zero sleeper ID when in sleeping state.
493492
///
494493
/// States a ticker can be in:
495494
/// 1) Woken.
496495
/// 2a) Sleeping and unnotified.
497496
/// 2b) Sleeping and notified.
498-
sleeping: Cell<Option<u64>>,
497+
sleeping: AtomicU64,
499498
}
500499

501500
impl Ticker<'_> {
502501
/// Creates a ticker.
503502
fn new(state: &State) -> Ticker<'_> {
504503
Ticker {
505504
state,
506-
sleeping: Cell::new(None),
505+
sleeping: AtomicU64::new(0),
507506
}
508507
}
509508

@@ -513,12 +512,12 @@ impl Ticker<'_> {
513512
fn sleep(&self, waker: &Waker) -> bool {
514513
let mut sleepers = self.state.sleepers.lock().unwrap();
515514

516-
match self.sleeping.get() {
515+
match self.sleeping.load(Ordering::SeqCst) {
517516
// Move to sleeping state.
518-
None => self.sleeping.set(Some(sleepers.insert(waker))),
517+
0 => self.sleeping.store(sleepers.insert(waker), Ordering::SeqCst),
519518

520519
// Already sleeping, check if notified.
521-
Some(id) => {
520+
id => {
522521
if !sleepers.update(id, waker) {
523522
return false;
524523
}
@@ -534,7 +533,8 @@ impl Ticker<'_> {
534533

535534
/// Moves the ticker into woken state.
536535
fn wake(&self) {
537-
if let Some(id) = self.sleeping.take() {
536+
let id = self.sleeping.swap(0, Ordering::SeqCst);
537+
if id != 0 {
538538
let mut sleepers = self.state.sleepers.lock().unwrap();
539539
sleepers.remove(id);
540540

@@ -581,7 +581,8 @@ impl Ticker<'_> {
581581
impl Drop for Ticker<'_> {
582582
fn drop(&mut self) {
583583
// If this ticker is in sleeping state, it must be removed from the sleepers list.
584-
if let Some(id) = self.sleeping.take() {
584+
let id = self.sleeping.swap(0, Ordering::SeqCst);
585+
if id != 0 {
585586
let mut sleepers = self.state.sleepers.lock().unwrap();
586587
let notified = sleepers.remove(id);
587588

@@ -613,7 +614,7 @@ struct Runner<'a> {
613614
local: Arc<ConcurrentQueue<Runnable>>,
614615

615616
/// Bumped every time a runnable task is found.
616-
ticks: Cell<usize>,
617+
ticks: AtomicUsize,
617618
}
618619

619620
impl Runner<'_> {
@@ -623,7 +624,7 @@ impl Runner<'_> {
623624
state,
624625
ticker: Ticker::new(state),
625626
local: Arc::new(ConcurrentQueue::bounded(512)),
626-
ticks: Cell::new(0),
627+
ticks: AtomicUsize::new(0),
627628
};
628629
state
629630
.local_queues
@@ -677,8 +678,7 @@ impl Runner<'_> {
677678
.await;
678679

679680
// Bump the tick counter.
680-
let ticks = self.ticks.get();
681-
self.ticks.set(ticks.wrapping_add(1));
681+
let ticks = self.ticks.fetch_add(1, Ordering::SeqCst);
682682

683683
if ticks % 64 == 0 {
684684
// Steal tasks from the global queue to ensure fair task scheduling.

0 commit comments

Comments
 (0)