Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion tokio/src/io/split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,11 @@ impl<T: AsyncWrite> AsyncWrite for WriteHalf<T> {

impl<T> Inner<T> {
fn poll_lock(&self, cx: &mut Context<'_>) -> Poll<Guard<'_, T>> {
if !self.locked.compare_and_swap(false, true, Acquire) {
if self
.locked
.compare_exchange(false, true, Acquire, Acquire)
.is_ok()
{
Poll::Ready(Guard { inner: self })
} else {
// Spin... but investigate a better strategy
Expand Down
6 changes: 3 additions & 3 deletions tokio/src/loom/std/atomic_u64.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,16 @@ mod imp {
prev
}

pub(crate) fn compare_and_swap(&self, old: u64, new: u64, _: Ordering) -> u64 {
pub(crate) fn compare_exchange(&self, old: u64, new: u64, _: Ordering) -> Result<u64, u64> {
let mut lock = self.inner.lock().unwrap();
let prev = *lock;

if prev != old {
return prev;
return Err(prev);
}

*lock = new;
prev
Ok(prev)
}
}
}
20 changes: 12 additions & 8 deletions tokio/src/runtime/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::runtime::task;
use std::marker::PhantomData;
use std::mem::MaybeUninit;
use std::ptr::{self, NonNull};
use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};

/// Producer handle. May only be used from a single thread.
pub(super) struct Local<T: 'static> {
Expand Down Expand Up @@ -194,13 +194,17 @@ impl<T> Local<T> {
// work. This is because all tasks are pushed into the queue from the
// current thread (or memory has been acquired if the local queue handle
// moved).
let actual = self.inner.head.compare_and_swap(
prev,
pack(head.wrapping_add(n), head.wrapping_add(n)),
Release,
);

if actual != prev {
if self
.inner
.head
.compare_exchange(
prev,
pack(head.wrapping_add(n), head.wrapping_add(n)),
Release,
Relaxed,
)
.is_err()
{
// We failed to claim the tasks, losing the race. Return out of
// this function and try the full `push` routine again. The queue
// may not be full anymore.
Expand Down
18 changes: 10 additions & 8 deletions tokio/src/sync/mpsc/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,13 +258,15 @@ impl<T> Block<T> {
pub(crate) unsafe fn try_push(
&self,
block: &mut NonNull<Block<T>>,
ordering: Ordering,
success: Ordering,
failure: Ordering,
) -> Result<(), NonNull<Block<T>>> {
block.as_mut().start_index = self.start_index.wrapping_add(BLOCK_CAP);

let next_ptr = self
.next
.compare_and_swap(ptr::null_mut(), block.as_ptr(), ordering);
.compare_exchange(ptr::null_mut(), block.as_ptr(), success, failure)
.unwrap_or_else(|x| x);

match NonNull::new(next_ptr) {
Some(next_ptr) => Err(next_ptr),
Expand Down Expand Up @@ -306,11 +308,11 @@ impl<T> Block<T> {
//
// `Release` ensures that the newly allocated block is available to
// other threads acquiring the next pointer.
let next = NonNull::new(self.next.compare_and_swap(
ptr::null_mut(),
new_block.as_ptr(),
AcqRel,
));
let next = NonNull::new(
self.next
.compare_exchange(ptr::null_mut(), new_block.as_ptr(), AcqRel, Acquire)
.unwrap_or_else(|x| x),
);

let next = match next {
Some(next) => next,
Expand All @@ -333,7 +335,7 @@ impl<T> Block<T> {

// TODO: Should this iteration be capped?
loop {
let actual = unsafe { curr.as_ref().try_push(&mut new_block, AcqRel) };
let actual = unsafe { curr.as_ref().try_push(&mut new_block, AcqRel, Acquire) };

curr = match actual {
Ok(_) => {
Expand Down
12 changes: 6 additions & 6 deletions tokio/src/sync/mpsc/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,11 @@ impl<T> Tx<T> {
//
// Acquire is not needed as any "actual" value is not accessed.
// At this point, the linked list is walked to acquire blocks.
let actual =
self.block_tail
.compare_and_swap(block_ptr, next_block.as_ptr(), Release);

if actual == block_ptr {
if self
.block_tail
.compare_exchange(block_ptr, next_block.as_ptr(), Release, Relaxed)
.is_ok()
{
// Synchronize with any senders
let tail_position = self.tail_position.fetch_add(0, Release);

Expand Down Expand Up @@ -191,7 +191,7 @@ impl<T> Tx<T> {

// TODO: Unify this logic with Block::grow
for _ in 0..3 {
match curr.as_ref().try_push(&mut block, AcqRel) {
match curr.as_ref().try_push(&mut block, AcqRel, Acquire) {
Ok(_) => {
reused = true;
break;
Expand Down
6 changes: 5 additions & 1 deletion tokio/src/sync/task/atomic_waker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,11 @@ impl AtomicWaker {
where
W: WakerRef,
{
match self.state.compare_and_swap(WAITING, REGISTERING, Acquire) {
match self
.state
.compare_exchange(WAITING, REGISTERING, Acquire, Acquire)
.unwrap_or_else(|x| x)
{
WAITING => {
unsafe {
// Locked acquired, update the waker cell
Expand Down