diff --git a/src/lib.rs b/src/lib.rs index e49cf74..b8ac55f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -136,7 +136,7 @@ macro_rules! main { ) => { $(#[$attr])* fn $name () $(-> $ret)? { - $crate::__private::block_on(async { + async_io::block_on(async { $bl }) } @@ -149,8 +149,8 @@ macro_rules! main { ) => { $(#[$post_attr])* fn $name () $(-> $ret)? { - <$exty as $crate::__private::MainExecutor>::with_main(|ex| { - $crate::__private::block_on(ex.run(async move { + <$exty as $crate::main_executor::MainExecutor>::with_main(|ex| { + async_io::block_on(ex.run(async move { let $ex = ex; $bl })) @@ -216,126 +216,5 @@ macro_rules! test { }; } -#[doc(hidden)] -pub mod __private { - pub use async_io::block_on; - pub use std::rc::Rc; - - use crate::{Executor, LocalExecutor}; - use event_listener::Event; - use std::sync::atomic::{AtomicBool, Ordering}; - use std::sync::Arc; - use std::thread; - - /// Something that can be set up as an executor. - #[doc(hidden)] - pub trait MainExecutor: Sized { - /// Create this type and pass it into `main`. - fn with_main T>(f: F) -> T; - } - - impl MainExecutor for Arc> { - #[inline] - fn with_main T>(f: F) -> T { - let ex = Arc::new(Executor::new()); - with_thread_pool(&ex, || f(&ex)) - } - } - - impl MainExecutor for Executor<'_> { - #[inline] - fn with_main T>(f: F) -> T { - let ex = Executor::new(); - with_thread_pool(&ex, || f(&ex)) - } - } - - impl MainExecutor for Rc> { - #[inline] - fn with_main T>(f: F) -> T { - f(&Rc::new(LocalExecutor::new())) - } - } - - impl MainExecutor for LocalExecutor<'_> { - fn with_main T>(f: F) -> T { - f(&LocalExecutor::new()) - } - } - - /// Run a function that takes an `Executor` inside of a thread pool. - #[inline] - fn with_thread_pool(ex: &Executor<'_>, f: impl FnOnce() -> T) -> T { - let stopper = WaitForStop::new(); - - // Create a thread for each CPU. - thread::scope(|scope| { - let num_threads = thread::available_parallelism().map_or(1, |num| num.get()); - for i in 0..num_threads { - let ex = &ex; - let stopper = &stopper; - - thread::Builder::new() - .name(format!("smol-macros-{i}")) - .spawn_scoped(scope, || { - block_on(ex.run(stopper.wait())); - }) - .expect("failed to spawn thread"); - } - - let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(f)); - - stopper.stop(); - - match result { - Ok(value) => value, - Err(err) => std::panic::resume_unwind(err), - } - }) - } - - /// Wait for the executor to stop. - struct WaitForStop { - /// Whether or not we need to stop. - stopped: AtomicBool, - - /// Wait for the stop. - events: Event, - } - - impl WaitForStop { - /// Create a new wait for stop. - #[inline] - fn new() -> Self { - Self { - stopped: AtomicBool::new(false), - events: Event::new(), - } - } - - /// Wait for the event to stop. - #[inline] - async fn wait(&self) { - loop { - if self.stopped.load(Ordering::Relaxed) { - return; - } - - event_listener::listener!(&self.events => listener); - - if self.stopped.load(Ordering::Acquire) { - return; - } - - listener.await; - } - } - - /// Stop the waiter. - #[inline] - fn stop(&self) { - self.stopped.store(true, Ordering::SeqCst); - self.events.notify_additional(usize::MAX); - } - } -} +pub mod main_executor; +pub mod wait_for_stop; diff --git a/src/main_executor.rs b/src/main_executor.rs new file mode 100644 index 0000000..0eee8de --- /dev/null +++ b/src/main_executor.rs @@ -0,0 +1,72 @@ +use std::rc::Rc; +use std::sync::Arc; +use std::thread; + +use crate::wait_for_stop::WaitForStop; +use crate::{Executor, LocalExecutor}; + +/// Something that can be set up as an executor. +pub trait MainExecutor: Sized { + /// Create this type and pass it into `main`. + fn with_main T>(f: F) -> T; +} + +impl MainExecutor for Arc> { + #[inline] + fn with_main T>(f: F) -> T { + let ex = Arc::new(Executor::new()); + with_thread_pool(&ex, || f(&ex)) + } +} + +impl MainExecutor for Executor<'_> { + #[inline] + fn with_main T>(f: F) -> T { + let ex = Executor::new(); + with_thread_pool(&ex, || f(&ex)) + } +} + +impl MainExecutor for Rc> { + #[inline] + fn with_main T>(f: F) -> T { + f(&Rc::new(LocalExecutor::new())) + } +} + +impl MainExecutor for LocalExecutor<'_> { + fn with_main T>(f: F) -> T { + f(&LocalExecutor::new()) + } +} + +/// Run a function that takes an `Executor` inside of a thread pool. +#[inline] +fn with_thread_pool(ex: &Executor<'_>, f: impl FnOnce() -> T) -> T { + let stopper = WaitForStop::new(); + + // Create a thread for each CPU. + thread::scope(|scope| { + let num_threads = thread::available_parallelism().map_or(1, |num| num.get()); + for i in 0..num_threads { + let ex = &ex; + let stopper = &stopper; + + thread::Builder::new() + .name(format!("smol-macros-{i}")) + .spawn_scoped(scope, || { + async_io::block_on(ex.run(stopper.wait())); + }) + .expect("failed to spawn thread"); + } + + let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(f)); + + stopper.stop(); + + match result { + Ok(value) => value, + Err(err) => std::panic::resume_unwind(err), + } + }) +} \ No newline at end of file diff --git a/src/wait_for_stop.rs b/src/wait_for_stop.rs new file mode 100644 index 0000000..08f35a9 --- /dev/null +++ b/src/wait_for_stop.rs @@ -0,0 +1,47 @@ +use event_listener::Event; +use std::sync::atomic::{AtomicBool, Ordering}; + +/// Wait for the executor to stop. +pub(crate) struct WaitForStop { + /// Whether or not we need to stop. + stopped: AtomicBool, + + /// Wait for the stop. + events: Event, +} + +impl WaitForStop { + /// Create a new wait for stop. + #[inline] + pub(crate) fn new() -> Self { + Self { + stopped: AtomicBool::new(false), + events: Event::new(), + } + } + + /// Wait for the event to stop. + #[inline] + pub(crate) async fn wait(&self) { + loop { + if self.stopped.load(Ordering::Relaxed) { + return; + } + + event_listener::listener!(&self.events => listener); + + if self.stopped.load(Ordering::Acquire) { + return; + } + + listener.await; + } + } + + /// Stop the waiter. + #[inline] + pub(crate) fn stop(&self) { + self.stopped.store(true, Ordering::SeqCst); + self.events.notify_additional(usize::MAX); + } +}