diff --git a/Cargo.toml b/Cargo.toml index c9f03f7c4..7947e34e7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,6 +42,7 @@ compio-ws = { path = "./compio-ws", version = "0.3.0", default-features = false bytes = "1.7.1" cfg_aliases = "0.2.1" cfg-if = "1.0.0" +compio-send-wrapper = "0.7.0" criterion = "0.8.0" crossbeam-queue = "0.3.8" flume = { version = "0.12.0", default-features = false } diff --git a/compio-runtime/Cargo.toml b/compio-runtime/Cargo.toml index 1d88dbd96..d1b5dba49 100644 --- a/compio-runtime/Cargo.toml +++ b/compio-runtime/Cargo.toml @@ -23,6 +23,7 @@ compio-io = { workspace = true, optional = true } async-task = "4.5.0" cfg-if = { workspace = true } criterion = { workspace = true, optional = true } +compio-send-wrapper = { workspace = true } core_affinity = "0.8.3" crossbeam-queue = { workspace = true } futures-util = { workspace = true } diff --git a/compio-runtime/src/runtime/mod.rs b/compio-runtime/src/runtime/mod.rs index 1e208be62..36642fe73 100644 --- a/compio-runtime/src/runtime/mod.rs +++ b/compio-runtime/src/runtime/mod.rs @@ -39,9 +39,6 @@ mod scheduler; mod opt_waker; pub use opt_waker::OptWaker; -mod send_wrapper; -use send_wrapper::SendWrapper; - #[cfg(feature = "time")] use crate::runtime::time::{TimerFuture, TimerKey, TimerRuntime}; use crate::{BufResult, affinity::bind_to_cpu_set, runtime::scheduler::Scheduler}; diff --git a/compio-runtime/src/runtime/opt_waker.rs b/compio-runtime/src/runtime/opt_waker.rs index e64e670ac..e4ce79ba4 100644 --- a/compio-runtime/src/runtime/opt_waker.rs +++ b/compio-runtime/src/runtime/opt_waker.rs @@ -7,7 +7,7 @@ use std::{ }; #[cfg(not(feature = "notify-always"))] -use crate::runtime::send_wrapper::SendWrapper; +use compio_send_wrapper::SendWrapper; /// An optimized waker that avoids unnecessary wake-ups on the same thread. pub struct OptWaker { diff --git a/compio-runtime/src/runtime/scheduler/mod.rs b/compio-runtime/src/runtime/scheduler/mod.rs index 7a578145f..d16a6c1c2 100644 --- a/compio-runtime/src/runtime/scheduler/mod.rs +++ b/compio-runtime/src/runtime/scheduler/mod.rs @@ -8,13 +8,11 @@ use std::{ }; use async_task::{Runnable, Task}; +use compio_send_wrapper::SendWrapper; use crossbeam_queue::SegQueue; use slab::Slab; -use crate::runtime::{ - SendWrapper, - scheduler::{drop_hook::DropHook, local_queue::LocalQueue}, -}; +use crate::runtime::scheduler::{drop_hook::DropHook, local_queue::LocalQueue}; mod drop_hook; mod local_queue; diff --git a/compio-runtime/src/runtime/send_wrapper.rs b/compio-runtime/src/runtime/send_wrapper.rs deleted file mode 100644 index aefcb3682..000000000 --- a/compio-runtime/src/runtime/send_wrapper.rs +++ /dev/null @@ -1,138 +0,0 @@ -// Copyright 2017 Thomas Keh. -// Copyright 2024 compio-rs -// -// Licensed under the Apache License, Version 2.0 or the MIT license -// , at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -#[cfg(feature = "current_thread_id")] -use std::thread::current_id; -use std::{ - mem::{self, ManuallyDrop}, - thread::{self, ThreadId}, -}; - -#[cfg(not(feature = "current_thread_id"))] -mod imp { - use std::{ - cell::Cell, - thread::{self, ThreadId}, - }; - thread_local! { - static THREAD_ID: Cell = Cell::new(thread::current().id()); - } - - pub fn current_id() -> ThreadId { - THREAD_ID.get() - } -} - -#[cfg(not(feature = "current_thread_id"))] -use imp::current_id; - -/// A wrapper that copied from `send_wrapper` crate, with our own optimizations. -pub struct SendWrapper { - data: ManuallyDrop, - thread_id: ThreadId, -} - -impl SendWrapper { - /// Create a `SendWrapper` wrapper around a value of type `T`. - /// The wrapper takes ownership of the value. - #[inline] - pub fn new(data: T) -> SendWrapper { - SendWrapper { - data: ManuallyDrop::new(data), - thread_id: current_id(), - } - } - - /// Returns `true` if the value can be safely accessed from within the - /// current thread. - #[inline] - pub fn valid(&self) -> bool { - self.thread_id == current_id() - } - - /// Returns a reference to the contained value. - /// - /// # Safety - /// - /// The caller should be in the same thread as the creator. - #[inline] - pub unsafe fn get_unchecked(&self) -> &T { - &self.data - } - - /// Returns a reference to the contained value, if valid. - #[inline] - #[allow(dead_code)] - pub fn get(&self) -> Option<&T> { - if self.valid() { Some(&self.data) } else { None } - } - - /// Returns a tracker that can be used to check if the current thread is - /// the same as the creator thread. - #[inline] - pub fn tracker(&self) -> SendWrapper<()> { - SendWrapper { - data: ManuallyDrop::new(()), - thread_id: self.thread_id, - } - } -} - -unsafe impl Send for SendWrapper {} -unsafe impl Sync for SendWrapper {} - -impl Drop for SendWrapper { - /// Drops the contained value. - /// - /// # Panics - /// - /// Dropping panics if it is done from a different thread than the one the - /// `SendWrapper` instance has been created with. - /// - /// Exceptions: - /// - There is no extra panic if the thread is already panicking/unwinding. - /// This is because otherwise there would be double panics (usually - /// resulting in an abort) when dereferencing from a wrong thread. - /// - If `T` has a trivial drop ([`needs_drop::()`] is false) then this - /// method never panics. - /// - /// [`needs_drop::()`]: std::mem::needs_drop - #[track_caller] - fn drop(&mut self) { - // If the drop is trivial (`needs_drop` = false), then dropping `T` can't access - // it and so it can be safely dropped on any thread. - if !mem::needs_drop::() || self.valid() { - unsafe { - // Drop the inner value - // - // SAFETY: - // - We've just checked that it's valid to drop `T` on this thread - // - We only move out from `self.data` here and in drop, so `self.data` is - // present - ManuallyDrop::drop(&mut self.data); - } - } else { - invalid_drop() - } - } -} - -#[cold] -#[inline(never)] -#[track_caller] -fn invalid_drop() { - const DROP_ERROR: &str = "Dropped SendWrapper variable from a thread different to the one \ - it has been created with."; - - if !thread::panicking() { - // panic because of dropping from wrong thread - // only do this while not unwinding (could be caused by deref from wrong thread) - panic!("{}", DROP_ERROR) - } -}