diff --git a/Cargo.lock b/Cargo.lock index 97be21f..6593385 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -26,15 +26,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "bank_threads" -version = "0.1.0" -dependencies = [ - "spawned-concurrency", - "spawned-rt", - "tracing", -] - [[package]] name = "base64" version = "0.21.7" @@ -823,15 +814,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "ping_pong_threads" -version = "0.1.0" -dependencies = [ - "spawned-concurrency", - "spawned-rt", - "tracing", -] - [[package]] name = "pkg-config" version = "0.3.32" @@ -1406,17 +1388,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "updater_threads" -version = "0.1.0" -dependencies = [ - "futures", - "reqwest", - "spawned-concurrency", - "spawned-rt", - "tracing", -] - [[package]] name = "url" version = "2.5.7" diff --git a/Cargo.toml b/Cargo.toml index 14d1aad..ce5c4cd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,12 +5,9 @@ members = [ "rt", "concurrency", "examples/bank", - "examples/bank_threads", "examples/name_server", "examples/ping_pong", - "examples/ping_pong_threads", "examples/updater", - "examples/updater_threads", "examples/blocking_genserver", "examples/busy_genserver_warning", ] diff --git a/concurrency/src/tasks/gen_server.rs b/concurrency/src/gen_server.rs similarity index 60% rename from concurrency/src/tasks/gen_server.rs rename to concurrency/src/gen_server.rs index 15108a1..ffeab2a 100644 --- a/concurrency/src/tasks/gen_server.rs +++ b/concurrency/src/gen_server.rs @@ -1,9 +1,7 @@ //! GenServer trait and structs to create an abstraction similar to Erlang gen_server. //! See examples/name_server for a usage example. -use crate::{ - error::GenServerError, - tasks::InitResult::{NoSuccess, Success}, -}; +use crate::error::GenServerError; +use InitResult::{NoSuccess, Success}; use core::pin::pin; use futures::future::{self, FutureExt as _}; use spawned_rt::{ @@ -14,6 +12,86 @@ use std::{fmt::Debug, future::Future, panic::AssertUnwindSafe, time::Duration}; const DEFAULT_CALL_TIMEOUT: Duration = Duration::from_secs(5); +/// Execution backend for GenServer. +/// +/// Determines how the GenServer's async loop is executed. Choose based on +/// the nature of your workload: +/// +/// # Backend Comparison +/// +/// | Backend | Execution Model | Best For | Limitations | +/// |---------|-----------------|----------|-------------| +/// | `Async` | Tokio task | Non-blocking I/O, async operations | Blocks runtime if sync code runs too long | +/// | `Blocking` | Tokio blocking pool | Short blocking operations (file I/O, DNS) | Shared pool with limited threads | +/// | `Thread` | Dedicated OS thread | Long-running blocking work, CPU-heavy tasks | Higher memory overhead per GenServer | +/// +/// # Examples +/// +/// ```ignore +/// // For typical async workloads (HTTP handlers, database queries) +/// let handle = MyServer::new().start(Backend::Async); +/// +/// // For occasional blocking operations (file reads, external commands) +/// let handle = MyServer::new().start(Backend::Blocking); +/// +/// // For CPU-intensive or permanently blocking services +/// let handle = MyServer::new().start(Backend::Thread); +/// ``` +/// +/// # When to Use Each Backend +/// +/// ## `Backend::Async` (Default) +/// - **Advantages**: Lightweight, efficient, good for high concurrency +/// - **Use when**: Your GenServer does mostly async I/O (network, database) +/// - **Avoid when**: Your code blocks (e.g., `std::thread::sleep`, heavy computation) +/// +/// ## `Backend::Blocking` +/// - **Advantages**: Prevents blocking the async runtime, uses tokio's managed pool +/// - **Use when**: You have occasional blocking operations that complete quickly +/// - **Avoid when**: You need guaranteed thread availability or long-running blocks +/// +/// ## `Backend::Thread` +/// - **Advantages**: Complete isolation, no interference with async runtime +/// - **Use when**: Long-running blocking work, singleton services, CPU-bound tasks +/// - **Avoid when**: You need many GenServers (each gets its own OS thread) +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)] +pub enum Backend { + /// Run on tokio async runtime (default). + /// + /// Best for non-blocking, async workloads. The GenServer runs as a + /// lightweight tokio task, enabling high concurrency with minimal overhead. + /// + /// **Warning**: If your `handle_call` or `handle_cast` blocks synchronously + /// (e.g., `std::thread::sleep`, CPU-heavy loops), it will block the entire + /// tokio runtime thread, affecting other tasks. + #[default] + Async, + + /// Run on tokio's blocking thread pool. + /// + /// Use for GenServers that perform blocking operations like: + /// - Synchronous file I/O + /// - DNS lookups + /// - External process calls + /// - Short CPU-bound computations + /// + /// The pool is shared across all `spawn_blocking` calls and has a default + /// limit of 512 threads. If the pool is exhausted, new blocking tasks wait. + Blocking, + + /// Run on a dedicated OS thread. + /// + /// Use for GenServers that: + /// - Block indefinitely or for long periods + /// - Need guaranteed thread availability + /// - Should not compete with other blocking tasks + /// - Run CPU-intensive workloads + /// + /// Each GenServer gets its own thread, providing complete isolation from + /// the async runtime. Higher memory overhead (~2MB stack per thread). + Thread, +} + #[derive(Debug)] pub struct GenServerHandle { pub tx: mpsc::Sender>, @@ -163,26 +241,19 @@ pub trait GenServer: Send + Sized { type OutMsg: Send + Sized; type Error: Debug + Send; - fn start(self) -> GenServerHandle { - GenServerHandle::new(self) - } - - /// Tokio tasks depend on a coolaborative multitasking model. "work stealing" can't - /// happen if the task is blocking the thread. As such, for sync compute task - /// or other blocking tasks need to be in their own separate thread, and the OS - /// will manage them through hardware interrupts. - /// Start blocking provides such thread. - fn start_blocking(self) -> GenServerHandle { - GenServerHandle::new_blocking(self) - } - - /// For some "singleton" GenServers that run througout the whole execution of the - /// program, it makes sense to run in their own dedicated thread to avoid interference - /// with the rest of the tasks' runtime. - /// The use of tokio::task::spawm_blocking is not recommended for these scenarios - /// as it is a limited thread pool better suited for blocking IO tasks that eventually end - fn start_on_thread(self) -> GenServerHandle { - GenServerHandle::new_on_thread(self) + /// Start the GenServer with the specified backend. + /// + /// # Arguments + /// * `backend` - The execution backend to use: + /// - `Backend::Async` - Run on tokio async runtime (default, best for non-blocking workloads) + /// - `Backend::Blocking` - Run on tokio's blocking thread pool (for blocking operations) + /// - `Backend::Thread` - Run on a dedicated OS thread (for long-running blocking services) + fn start(self, backend: Backend) -> GenServerHandle { + match backend { + Backend::Async => GenServerHandle::new(self), + Backend::Blocking => GenServerHandle::new_blocking(self), + Backend::Thread => GenServerHandle::new_on_thread(self), + } } fn run( @@ -407,7 +478,7 @@ mod warn_on_block { mod tests { use super::*; - use crate::{messages::Unused, tasks::send_after}; + use crate::{messages::Unused, send_after}; use std::{ sync::{Arc, Mutex}, thread, @@ -484,13 +555,16 @@ mod tests { } } + const ASYNC: Backend = Backend::Async; + const BLOCKING: Backend = Backend::Blocking; + #[test] pub fn badly_behaved_thread_non_blocking() { let runtime = rt::Runtime::new().unwrap(); runtime.block_on(async move { - let mut badboy = BadlyBehavedTask.start(); + let mut badboy = BadlyBehavedTask.start(ASYNC); let _ = badboy.cast(Unused).await; - let mut goodboy = WellBehavedTask { count: 0 }.start(); + let mut goodboy = WellBehavedTask { count: 0 }.start(ASYNC); let _ = goodboy.cast(Unused).await; rt::sleep(Duration::from_secs(1)).await; let count = goodboy.call(InMessage::GetCount).await.unwrap(); @@ -508,9 +582,9 @@ mod tests { pub fn badly_behaved_thread() { let runtime = rt::Runtime::new().unwrap(); runtime.block_on(async move { - let mut badboy = BadlyBehavedTask.start_blocking(); + let mut badboy = BadlyBehavedTask.start(BLOCKING); let _ = badboy.cast(Unused).await; - let mut goodboy = WellBehavedTask { count: 0 }.start(); + let mut goodboy = WellBehavedTask { count: 0 }.start(ASYNC); let _ = goodboy.cast(Unused).await; rt::sleep(Duration::from_secs(1)).await; let count = goodboy.call(InMessage::GetCount).await.unwrap(); @@ -565,7 +639,7 @@ mod tests { pub fn unresolving_task_times_out() { let runtime = rt::Runtime::new().unwrap(); runtime.block_on(async move { - let mut unresolving_task = SomeTask.start(); + let mut unresolving_task = SomeTask.start(ASYNC); let result = unresolving_task .call_with_timeout(SomeTaskCallMsg::FastOperation, TIMEOUT_DURATION) @@ -615,7 +689,7 @@ mod tests { runtime.block_on(async move { let (rx, tx) = mpsc::channel::(); let sender_channel = Arc::new(Mutex::new(tx)); - let _task = SomeTaskThatFailsOnInit::new(sender_channel).start(); + let _task = SomeTaskThatFailsOnInit::new(sender_channel).start(ASYNC); // Wait a while to ensure the task has time to run and fail rt::sleep(Duration::from_secs(1)).await; @@ -624,4 +698,236 @@ mod tests { assert!(rx.is_closed()) }); } + + // ==================== Backend enum tests ==================== + + #[test] + pub fn backend_default_is_async() { + assert_eq!(Backend::default(), Backend::Async); + } + + #[test] + #[allow(clippy::clone_on_copy)] + pub fn backend_enum_is_copy_and_clone() { + let backend = Backend::Async; + let copied = backend; // Copy + let cloned = backend.clone(); // Clone - intentionally testing Clone trait + assert_eq!(backend, copied); + assert_eq!(backend, cloned); + } + + #[test] + pub fn backend_enum_debug_format() { + assert_eq!(format!("{:?}", Backend::Async), "Async"); + assert_eq!(format!("{:?}", Backend::Blocking), "Blocking"); + assert_eq!(format!("{:?}", Backend::Thread), "Thread"); + } + + #[test] + pub fn backend_enum_equality() { + assert_eq!(Backend::Async, Backend::Async); + assert_eq!(Backend::Blocking, Backend::Blocking); + assert_eq!(Backend::Thread, Backend::Thread); + assert_ne!(Backend::Async, Backend::Blocking); + assert_ne!(Backend::Async, Backend::Thread); + assert_ne!(Backend::Blocking, Backend::Thread); + } + + // ==================== Backend functionality tests ==================== + + /// Simple counter GenServer for testing all backends + struct Counter { + count: u64, + } + + #[derive(Clone)] + enum CounterCall { + Get, + Increment, + Stop, + } + + #[derive(Clone)] + enum CounterCast { + Increment, + } + + impl GenServer for Counter { + type CallMsg = CounterCall; + type CastMsg = CounterCast; + type OutMsg = u64; + type Error = (); + + async fn handle_call( + &mut self, + message: Self::CallMsg, + _: &GenServerHandle, + ) -> CallResponse { + match message { + CounterCall::Get => CallResponse::Reply(self.count), + CounterCall::Increment => { + self.count += 1; + CallResponse::Reply(self.count) + } + CounterCall::Stop => CallResponse::Stop(self.count), + } + } + + async fn handle_cast( + &mut self, + message: Self::CastMsg, + _: &GenServerHandle, + ) -> CastResponse { + match message { + CounterCast::Increment => { + self.count += 1; + CastResponse::NoReply + } + } + } + } + + #[test] + pub fn backend_async_handles_call_and_cast() { + let runtime = rt::Runtime::new().unwrap(); + runtime.block_on(async move { + let mut counter = Counter { count: 0 }.start(Backend::Async); + + // Test call + let result = counter.call(CounterCall::Get).await.unwrap(); + assert_eq!(result, 0); + + let result = counter.call(CounterCall::Increment).await.unwrap(); + assert_eq!(result, 1); + + // Test cast + counter.cast(CounterCast::Increment).await.unwrap(); + rt::sleep(Duration::from_millis(10)).await; // Give time for cast to process + + let result = counter.call(CounterCall::Get).await.unwrap(); + assert_eq!(result, 2); + + // Stop + let final_count = counter.call(CounterCall::Stop).await.unwrap(); + assert_eq!(final_count, 2); + }); + } + + #[test] + pub fn backend_blocking_handles_call_and_cast() { + let runtime = rt::Runtime::new().unwrap(); + runtime.block_on(async move { + let mut counter = Counter { count: 0 }.start(Backend::Blocking); + + // Test call + let result = counter.call(CounterCall::Get).await.unwrap(); + assert_eq!(result, 0); + + let result = counter.call(CounterCall::Increment).await.unwrap(); + assert_eq!(result, 1); + + // Test cast + counter.cast(CounterCast::Increment).await.unwrap(); + rt::sleep(Duration::from_millis(50)).await; // Give time for cast to process + + let result = counter.call(CounterCall::Get).await.unwrap(); + assert_eq!(result, 2); + + // Stop + let final_count = counter.call(CounterCall::Stop).await.unwrap(); + assert_eq!(final_count, 2); + }); + } + + #[test] + pub fn backend_thread_handles_call_and_cast() { + let runtime = rt::Runtime::new().unwrap(); + runtime.block_on(async move { + let mut counter = Counter { count: 0 }.start(Backend::Thread); + + // Test call + let result = counter.call(CounterCall::Get).await.unwrap(); + assert_eq!(result, 0); + + let result = counter.call(CounterCall::Increment).await.unwrap(); + assert_eq!(result, 1); + + // Test cast + counter.cast(CounterCast::Increment).await.unwrap(); + rt::sleep(Duration::from_millis(50)).await; // Give time for cast to process + + let result = counter.call(CounterCall::Get).await.unwrap(); + assert_eq!(result, 2); + + // Stop + let final_count = counter.call(CounterCall::Stop).await.unwrap(); + assert_eq!(final_count, 2); + }); + } + + #[test] + pub fn backend_thread_isolates_blocking_work() { + // Similar to badly_behaved_thread but using Backend::Thread + let runtime = rt::Runtime::new().unwrap(); + runtime.block_on(async move { + let mut badboy = BadlyBehavedTask.start(Backend::Thread); + let _ = badboy.cast(Unused).await; + let mut goodboy = WellBehavedTask { count: 0 }.start(ASYNC); + let _ = goodboy.cast(Unused).await; + rt::sleep(Duration::from_secs(1)).await; + let count = goodboy.call(InMessage::GetCount).await.unwrap(); + + // goodboy should have run normally because badboy is on a separate thread + match count { + OutMsg::Count(num) => { + assert_eq!(num, 10); + } + } + goodboy.call(InMessage::Stop).await.unwrap(); + }); + } + + #[test] + pub fn multiple_backends_concurrent() { + let runtime = rt::Runtime::new().unwrap(); + runtime.block_on(async move { + // Start counters on all three backends + let mut async_counter = Counter { count: 0 }.start(Backend::Async); + let mut blocking_counter = Counter { count: 100 }.start(Backend::Blocking); + let mut thread_counter = Counter { count: 200 }.start(Backend::Thread); + + // Increment each + async_counter.call(CounterCall::Increment).await.unwrap(); + blocking_counter.call(CounterCall::Increment).await.unwrap(); + thread_counter.call(CounterCall::Increment).await.unwrap(); + + // Verify each has independent state + let async_val = async_counter.call(CounterCall::Get).await.unwrap(); + let blocking_val = blocking_counter.call(CounterCall::Get).await.unwrap(); + let thread_val = thread_counter.call(CounterCall::Get).await.unwrap(); + + assert_eq!(async_val, 1); + assert_eq!(blocking_val, 101); + assert_eq!(thread_val, 201); + + // Clean up + async_counter.call(CounterCall::Stop).await.unwrap(); + blocking_counter.call(CounterCall::Stop).await.unwrap(); + thread_counter.call(CounterCall::Stop).await.unwrap(); + }); + } + + #[test] + pub fn backend_default_works_in_start() { + let runtime = rt::Runtime::new().unwrap(); + runtime.block_on(async move { + // Using Backend::default() should work the same as Backend::Async + let mut counter = Counter { count: 42 }.start(Backend::default()); + + let result = counter.call(CounterCall::Get).await.unwrap(); + assert_eq!(result, 42); + + counter.call(CounterCall::Stop).await.unwrap(); + }); + } } diff --git a/concurrency/src/lib.rs b/concurrency/src/lib.rs index 0edcab8..4894147 100644 --- a/concurrency/src/lib.rs +++ b/concurrency/src/lib.rs @@ -1,6 +1,22 @@ //! spawned concurrency //! Some basic traits and structs to implement concurrent code à-la-Erlang. pub mod error; +mod gen_server; pub mod messages; -pub mod tasks; -pub mod threads; +mod process; +mod stream; +mod time; + +#[cfg(test)] +mod stream_tests; +#[cfg(test)] +mod timer_tests; + +pub use error::GenServerError; +pub use gen_server::{ + send_message_on, Backend, CallResponse, CastResponse, GenServer, GenServerHandle, + GenServerInMsg, InitResult, InitResult::NoSuccess, InitResult::Success, +}; +pub use process::{send, Process, ProcessInfo}; +pub use stream::spawn_listener; +pub use time::{send_after, send_interval}; diff --git a/concurrency/src/tasks/process.rs b/concurrency/src/process.rs similarity index 100% rename from concurrency/src/tasks/process.rs rename to concurrency/src/process.rs diff --git a/concurrency/src/tasks/stream.rs b/concurrency/src/stream.rs similarity index 97% rename from concurrency/src/tasks/stream.rs rename to concurrency/src/stream.rs index 492c4f9..1448d04 100644 --- a/concurrency/src/tasks/stream.rs +++ b/concurrency/src/stream.rs @@ -1,4 +1,4 @@ -use crate::tasks::{GenServer, GenServerHandle}; +use crate::{GenServer, GenServerHandle}; use futures::{future::select, Stream, StreamExt}; use spawned_rt::tasks::JoinHandle; diff --git a/concurrency/src/tasks/stream_tests.rs b/concurrency/src/stream_tests.rs similarity index 92% rename from concurrency/src/tasks/stream_tests.rs rename to concurrency/src/stream_tests.rs index bebc023..e266eff 100644 --- a/concurrency/src/tasks/stream_tests.rs +++ b/concurrency/src/stream_tests.rs @@ -1,5 +1,5 @@ -use crate::tasks::{ - send_after, stream::spawn_listener, CallResponse, CastResponse, GenServer, GenServerHandle, +use crate::{ + send_after, spawn_listener, Backend, CallResponse, CastResponse, GenServer, GenServerHandle, }; use futures::{stream, StreamExt}; use spawned_rt::tasks::{self as rt, BroadcastStream, ReceiverStream}; @@ -67,7 +67,7 @@ impl GenServer for Summatory { pub fn test_sum_numbers_from_stream() { let runtime = rt::Runtime::new().unwrap(); runtime.block_on(async move { - let mut summatory_handle = Summatory::new(0).start(); + let mut summatory_handle = Summatory::new(0).start(Backend::Async); let stream = stream::iter(vec![1u16, 2, 3, 4, 5].into_iter().map(Ok::)); spawn_listener( @@ -87,7 +87,7 @@ pub fn test_sum_numbers_from_stream() { pub fn test_sum_numbers_from_channel() { let runtime = rt::Runtime::new().unwrap(); runtime.block_on(async move { - let mut summatory_handle = Summatory::new(0).start(); + let mut summatory_handle = Summatory::new(0).start(Backend::Async); let (tx, rx) = spawned_rt::tasks::mpsc::channel::>(); // Spawn a task to send numbers to the channel @@ -115,7 +115,7 @@ pub fn test_sum_numbers_from_channel() { pub fn test_sum_numbers_from_broadcast_channel() { let runtime = rt::Runtime::new().unwrap(); runtime.block_on(async move { - let mut summatory_handle = Summatory::new(0).start(); + let mut summatory_handle = Summatory::new(0).start(Backend::Async); let (tx, rx) = tokio::sync::broadcast::channel::(5); // Spawn a task to send numbers to the channel @@ -145,7 +145,7 @@ pub fn test_stream_cancellation() { let runtime = rt::Runtime::new().unwrap(); runtime.block_on(async move { - let mut summatory_handle = Summatory::new(0).start(); + let mut summatory_handle = Summatory::new(0).start(Backend::Async); let (tx, rx) = spawned_rt::tasks::mpsc::channel::>(); // Spawn a task to send numbers to the channel @@ -192,7 +192,7 @@ pub fn test_stream_cancellation() { pub fn test_halting_on_stream_error() { let runtime = rt::Runtime::new().unwrap(); runtime.block_on(async move { - let mut summatory_handle = Summatory::new(0).start(); + let mut summatory_handle = Summatory::new(0).start(Backend::Async); let stream = tokio_stream::iter(vec![Ok(1u16), Ok(2), Ok(3), Err(()), Ok(4), Ok(5)]); let msg_stream = stream.filter_map(|value| async move { match value { @@ -216,7 +216,7 @@ pub fn test_halting_on_stream_error() { pub fn test_skipping_on_stream_error() { let runtime = rt::Runtime::new().unwrap(); runtime.block_on(async move { - let mut summatory_handle = Summatory::new(0).start(); + let mut summatory_handle = Summatory::new(0).start(Backend::Async); let stream = tokio_stream::iter(vec![Ok(1u16), Ok(2), Ok(3), Err(()), Ok(4), Ok(5)]); let msg_stream = stream.filter_map(|value| async move { match value { diff --git a/concurrency/src/tasks/mod.rs b/concurrency/src/tasks/mod.rs deleted file mode 100644 index 6936162..0000000 --- a/concurrency/src/tasks/mod.rs +++ /dev/null @@ -1,20 +0,0 @@ -//! spawned concurrency -//! Runtime tasks-based traits and structs to implement concurrent code à-la-Erlang. - -mod gen_server; -mod process; -mod stream; -mod time; - -#[cfg(test)] -mod stream_tests; -#[cfg(test)] -mod timer_tests; - -pub use gen_server::{ - send_message_on, CallResponse, CastResponse, GenServer, GenServerHandle, GenServerInMsg, - InitResult, InitResult::NoSuccess, InitResult::Success, -}; -pub use process::{send, Process, ProcessInfo}; -pub use stream::spawn_listener; -pub use time::{send_after, send_interval}; diff --git a/concurrency/src/threads/gen_server.rs b/concurrency/src/threads/gen_server.rs deleted file mode 100644 index 0237b85..0000000 --- a/concurrency/src/threads/gen_server.rs +++ /dev/null @@ -1,217 +0,0 @@ -//! GenServer trait and structs to create an abstraction similar to Erlang gen_server. -//! See examples/name_server for a usage example. -use spawned_rt::threads::{self as rt, mpsc, oneshot, CancellationToken}; -use std::{ - fmt::Debug, - panic::{catch_unwind, AssertUnwindSafe}, -}; - -use crate::error::GenServerError; - -#[derive(Debug)] -pub struct GenServerHandle { - pub tx: mpsc::Sender>, - cancellation_token: CancellationToken, -} - -impl Clone for GenServerHandle { - fn clone(&self) -> Self { - Self { - tx: self.tx.clone(), - cancellation_token: self.cancellation_token.clone(), - } - } -} - -impl GenServerHandle { - pub(crate) fn new(gen_server: G) -> Self { - let (tx, mut rx) = mpsc::channel::>(); - let cancellation_token = CancellationToken::new(); - let handle = GenServerHandle { - tx, - cancellation_token, - }; - let handle_clone = handle.clone(); - // Ignore the JoinHandle for now. Maybe we'll use it in the future - let _join_handle = rt::spawn(move || { - if gen_server.run(&handle, &mut rx).is_err() { - tracing::trace!("GenServer crashed") - }; - }); - handle_clone - } - - pub fn sender(&self) -> mpsc::Sender> { - self.tx.clone() - } - - pub fn call(&mut self, message: G::CallMsg) -> Result { - let (oneshot_tx, oneshot_rx) = oneshot::channel::>(); - self.tx.send(GenServerInMsg::Call { - sender: oneshot_tx, - message, - })?; - match oneshot_rx.recv() { - Ok(result) => result, - Err(_) => Err(GenServerError::Server), - } - } - - pub fn cast(&mut self, message: G::CastMsg) -> Result<(), GenServerError> { - self.tx - .send(GenServerInMsg::Cast { message }) - .map_err(|_error| GenServerError::Server) - } - - pub fn cancellation_token(&self) -> CancellationToken { - self.cancellation_token.clone() - } -} - -pub enum GenServerInMsg { - Call { - sender: oneshot::Sender>, - message: G::CallMsg, - }, - Cast { - message: G::CastMsg, - }, -} - -pub enum CallResponse { - Reply(G::OutMsg), - Unused, - Stop(G::OutMsg), -} - -pub enum CastResponse { - NoReply, - Unused, - Stop, -} - -pub trait GenServer: Send + Sized { - type CallMsg: Clone + Send + Sized; - type CastMsg: Clone + Send + Sized; - type OutMsg: Send + Sized; - type Error: Debug; - - fn start(self) -> GenServerHandle { - GenServerHandle::new(self) - } - - /// We copy the same interface as tasks, but all threads can work - /// while blocking by default - fn start_blocking(self) -> GenServerHandle { - GenServerHandle::new(self) - } - - fn run( - self, - handle: &GenServerHandle, - rx: &mut mpsc::Receiver>, - ) -> Result<(), GenServerError> { - let mut cancellation_token = handle.cancellation_token.clone(); - let res = match self.init(handle) { - Ok(new_state) => Ok(new_state.main_loop(handle, rx)?), - Err(err) => { - tracing::error!("Initialization failed: {err:?}"); - Err(GenServerError::Initialization) - } - }; - cancellation_token.cancel(); - res - } - - /// Initialization function. It's called before main loop. It - /// can be overrided on implementations in case initial steps are - /// required. - fn init(self, _handle: &GenServerHandle) -> Result { - Ok(self) - } - - fn main_loop( - mut self, - handle: &GenServerHandle, - rx: &mut mpsc::Receiver>, - ) -> Result<(), GenServerError> { - loop { - if !self.receive(handle, rx)? { - break; - } - } - tracing::trace!("Stopping GenServer"); - Ok(()) - } - - fn receive( - &mut self, - handle: &GenServerHandle, - rx: &mut mpsc::Receiver>, - ) -> Result { - let message = rx.recv().ok(); - - let keep_running = match message { - Some(GenServerInMsg::Call { sender, message }) => { - let (keep_running, response) = match catch_unwind(AssertUnwindSafe(|| { - self.handle_call(message, handle) - })) { - Ok(response) => match response { - CallResponse::Reply(response) => (true, Ok(response)), - CallResponse::Stop(response) => (false, Ok(response)), - CallResponse::Unused => { - tracing::error!("GenServer received unexpected CallMessage"); - (false, Err(GenServerError::CallMsgUnused)) - } - }, - Err(error) => { - tracing::trace!("Error in callback, reverting state - Error: '{error:?}'"); - (true, Err(GenServerError::Callback)) - } - }; - // Send response back - if sender.send(response).is_err() { - tracing::trace!("GenServer failed to send response back, client must have died") - }; - keep_running - } - Some(GenServerInMsg::Cast { message }) => { - match catch_unwind(AssertUnwindSafe(|| self.handle_cast(message, handle))) { - Ok(response) => match response { - CastResponse::NoReply => true, - CastResponse::Stop => false, - CastResponse::Unused => { - tracing::error!("GenServer received unexpected CastMessage"); - false - } - }, - Err(error) => { - tracing::trace!("Error in callback, reverting state - Error: '{error:?}'"); - true - } - } - } - None => { - // Channel has been closed; won't receive further messages. Stop the server. - false - } - }; - Ok(keep_running) - } - - fn handle_call( - &mut self, - _message: Self::CallMsg, - _handle: &GenServerHandle, - ) -> CallResponse { - CallResponse::Unused - } - - fn handle_cast( - &mut self, - _message: Self::CastMsg, - _handle: &GenServerHandle, - ) -> CastResponse { - CastResponse::Unused - } -} diff --git a/concurrency/src/threads/mod.rs b/concurrency/src/threads/mod.rs deleted file mode 100644 index 193af89..0000000 --- a/concurrency/src/threads/mod.rs +++ /dev/null @@ -1,15 +0,0 @@ -//! spawned concurrency -//! IO threads-based traits and structs to implement concurrent code à-la-Erlang. - -mod gen_server; -mod process; -mod stream; -mod time; - -#[cfg(test)] -mod timer_tests; - -pub use gen_server::{CallResponse, CastResponse, GenServer, GenServerHandle, GenServerInMsg}; -pub use process::{send, Process, ProcessInfo}; -pub use stream::spawn_listener; -pub use time::{send_after, send_interval}; diff --git a/concurrency/src/threads/process.rs b/concurrency/src/threads/process.rs deleted file mode 100644 index 3dfd87d..0000000 --- a/concurrency/src/threads/process.rs +++ /dev/null @@ -1,71 +0,0 @@ -//! Process trait and struct to create a process abstraction similar to Erlang processes. -//! See examples/ping_pong for a usage example. - -use spawned_rt::threads::{self as rt, mpsc, JoinHandle}; - -#[derive(Debug)] -pub struct ProcessInfo { - pub tx: mpsc::Sender, - pub handle: JoinHandle<()>, -} - -impl ProcessInfo { - pub fn sender(&self) -> mpsc::Sender { - self.tx.clone() - } - - pub fn handle(self) -> JoinHandle<()> { - self.handle - } -} - -pub trait Process -where - Self: Send + Sync + Sized + 'static, -{ - fn spawn(mut self) -> ProcessInfo { - let (tx, mut rx) = mpsc::channel::(); - let tx_clone = tx.clone(); - let handle = rt::spawn(move || self.run(&tx_clone, &mut rx)); - ProcessInfo { tx, handle } - } - - fn run(&mut self, tx: &mpsc::Sender, rx: &mut mpsc::Receiver) { - self.init(tx); - self.main_loop(tx, rx); - } - - fn main_loop(&mut self, tx: &mpsc::Sender, rx: &mut mpsc::Receiver) { - loop { - if self.should_stop() { - break; - } - - self.receive(tx, rx); - } - } - - fn should_stop(&self) -> bool { - false - } - - fn init(&mut self, _tx: &mpsc::Sender) { - {} - } - - fn receive(&mut self, tx: &mpsc::Sender, rx: &mut mpsc::Receiver) -> T { - match rx.recv().ok() { - Some(message) => self.handle(message, tx), - None => todo!(), - } - } - - fn handle(&mut self, message: T, tx: &mpsc::Sender) -> T; -} - -pub fn send(tx: &mpsc::Sender, message: T) -where - T: Send, -{ - let _ = tx.send(message); -} diff --git a/concurrency/src/threads/stream.rs b/concurrency/src/threads/stream.rs deleted file mode 100644 index a4fd749..0000000 --- a/concurrency/src/threads/stream.rs +++ /dev/null @@ -1,17 +0,0 @@ -use crate::threads::{GenServer, GenServerHandle}; - -use futures::Stream; - -/// Spawns a listener that listens to a stream and sends messages to a GenServer. -/// -/// Items sent through the stream are required to be wrapped in a Result type. -pub fn spawn_listener(_handle: GenServerHandle, _message_builder: F, _stream: S) -where - T: GenServer + 'static, - F: Fn(I) -> T::CastMsg + Send + 'static, - I: Send + 'static, - E: std::fmt::Debug + Send + 'static, - S: Unpin + Send + Stream> + 'static, -{ - unimplemented!("Unsupported function in threads mode") -} diff --git a/concurrency/src/threads/time.rs b/concurrency/src/threads/time.rs deleted file mode 100644 index 3d47c05..0000000 --- a/concurrency/src/threads/time.rs +++ /dev/null @@ -1,59 +0,0 @@ -use std::time::Duration; - -use spawned_rt::threads::{self as rt, CancellationToken, JoinHandle}; - -use super::{GenServer, GenServerHandle}; - -pub struct TimerHandle { - pub join_handle: JoinHandle<()>, - pub cancellation_token: CancellationToken, -} - -// Sends a message after a given period to the specified GenServer. The task terminates -// once the send has completed -pub fn send_after( - period: Duration, - mut handle: GenServerHandle, - message: T::CastMsg, -) -> TimerHandle -where - T: GenServer + 'static, -{ - let cancellation_token = CancellationToken::new(); - let mut cloned_token = cancellation_token.clone(); - let join_handle = rt::spawn(move || { - rt::sleep(period); - if !cloned_token.is_cancelled() { - let _ = handle.cast(message); - }; - }); - TimerHandle { - join_handle, - cancellation_token, - } -} - -// Sends a message to the specified GenServe repeatedly after `Time` milliseconds. -pub fn send_interval( - period: Duration, - mut handle: GenServerHandle, - message: T::CastMsg, -) -> TimerHandle -where - T: GenServer + 'static, -{ - let cancellation_token = CancellationToken::new(); - let mut cloned_token = cancellation_token.clone(); - let join_handle = rt::spawn(move || loop { - rt::sleep(period); - if cloned_token.is_cancelled() { - break; - } else { - let _ = handle.cast(message.clone()); - }; - }); - TimerHandle { - join_handle, - cancellation_token, - } -} diff --git a/concurrency/src/threads/timer_tests.rs b/concurrency/src/threads/timer_tests.rs deleted file mode 100644 index 446b147..0000000 --- a/concurrency/src/threads/timer_tests.rs +++ /dev/null @@ -1,221 +0,0 @@ -use crate::threads::{send_interval, CallResponse, CastResponse, GenServer, GenServerHandle}; -use spawned_rt::threads::{self as rt, CancellationToken}; -use std::time::Duration; - -use super::send_after; - -type RepeaterHandle = GenServerHandle; - -#[derive(Clone)] -enum RepeaterCastMessage { - Inc, - StopTimer, -} - -#[derive(Clone)] -enum RepeaterCallMessage { - GetCount, -} - -#[derive(PartialEq, Debug)] -enum RepeaterOutMessage { - Count(i32), -} - -#[derive(Clone)] -struct Repeater { - pub(crate) count: i32, - pub(crate) cancellation_token: Option, -} - -impl Repeater { - pub fn new(initial_count: i32) -> Self { - Repeater { - count: initial_count, - cancellation_token: None, - } - } -} - -impl Repeater { - pub fn stop_timer(server: &mut RepeaterHandle) -> Result<(), ()> { - server.cast(RepeaterCastMessage::StopTimer).map_err(|_| ()) - } - - pub fn get_count(server: &mut RepeaterHandle) -> Result { - server.call(RepeaterCallMessage::GetCount).map_err(|_| ()) - } -} - -impl GenServer for Repeater { - type CallMsg = RepeaterCallMessage; - type CastMsg = RepeaterCastMessage; - type OutMsg = RepeaterOutMessage; - type Error = (); - - fn init(mut self, handle: &RepeaterHandle) -> Result { - let timer = send_interval( - Duration::from_millis(100), - handle.clone(), - RepeaterCastMessage::Inc, - ); - self.cancellation_token = Some(timer.cancellation_token); - Ok(self) - } - - fn handle_call( - &mut self, - _message: Self::CallMsg, - _handle: &RepeaterHandle, - ) -> CallResponse { - let count = self.count; - CallResponse::Reply(RepeaterOutMessage::Count(count)) - } - - fn handle_cast( - &mut self, - message: Self::CastMsg, - _handle: &GenServerHandle, - ) -> CastResponse { - match message { - RepeaterCastMessage::Inc => { - self.count += 1; - } - RepeaterCastMessage::StopTimer => { - if let Some(mut ct) = self.cancellation_token.clone() { - ct.cancel() - }; - } - }; - CastResponse::NoReply - } -} - -#[test] -pub fn test_send_interval_and_cancellation() { - // Start a Repeater - let mut repeater = Repeater::new(0).start(); - - // Wait for 1 second - rt::sleep(Duration::from_secs(1)); - - // Check count - let count = Repeater::get_count(&mut repeater).unwrap(); - - // 9 messages in 1 second (after first 100 milliseconds sleep) - assert_eq!(RepeaterOutMessage::Count(9), count); - - // Pause timer - Repeater::stop_timer(&mut repeater).unwrap(); - - // Wait another second - rt::sleep(Duration::from_secs(1)); - - // Check count again - let count2 = Repeater::get_count(&mut repeater).unwrap(); - - // As timer was paused, count should remain at 9 - assert_eq!(RepeaterOutMessage::Count(9), count2); -} - -type DelayedHandle = GenServerHandle; - -#[derive(Clone)] -enum DelayedCastMessage { - Inc, -} - -#[derive(Clone)] -enum DelayedCallMessage { - GetCount, -} - -#[derive(PartialEq, Debug)] -enum DelayedOutMessage { - Count(i32), -} - -#[derive(Clone)] -struct Delayed { - pub(crate) count: i32, -} - -impl Delayed { - pub fn new(initial_count: i32) -> Self { - Delayed { - count: initial_count, - } - } -} - -impl Delayed { - pub fn get_count(server: &mut DelayedHandle) -> Result { - server.call(DelayedCallMessage::GetCount).map_err(|_| ()) - } -} - -impl GenServer for Delayed { - type CallMsg = DelayedCallMessage; - type CastMsg = DelayedCastMessage; - type OutMsg = DelayedOutMessage; - type Error = (); - - fn handle_call( - &mut self, - _message: Self::CallMsg, - _handle: &DelayedHandle, - ) -> CallResponse { - let count = self.count; - CallResponse::Reply(DelayedOutMessage::Count(count)) - } - - fn handle_cast(&mut self, message: Self::CastMsg, _handle: &DelayedHandle) -> CastResponse { - match message { - DelayedCastMessage::Inc => { - self.count += 1; - } - }; - CastResponse::NoReply - } -} - -#[test] -pub fn test_send_after_and_cancellation() { - // Start a Delayed - let mut repeater = Delayed::new(0).start(); - - // Set a just once timed message - let _ = send_after( - Duration::from_millis(100), - repeater.clone(), - DelayedCastMessage::Inc, - ); - - // Wait for 200 milliseconds - rt::sleep(Duration::from_millis(200)); - - // Check count - let count = Delayed::get_count(&mut repeater).unwrap(); - - // Only one message (no repetition) - assert_eq!(DelayedOutMessage::Count(1), count); - - // New timer - let mut timer = send_after( - Duration::from_millis(100), - repeater.clone(), - DelayedCastMessage::Inc, - ); - - // Cancel the new timer before timeout - timer.cancellation_token.cancel(); - - // Wait another 200 milliseconds - rt::sleep(Duration::from_millis(200)); - - // Check count again - let count2 = Delayed::get_count(&mut repeater).unwrap(); - - // As timer was cancelled, count should remain at 1 - assert_eq!(DelayedOutMessage::Count(1), count2); -} diff --git a/concurrency/src/tasks/time.rs b/concurrency/src/time.rs similarity index 100% rename from concurrency/src/tasks/time.rs rename to concurrency/src/time.rs diff --git a/concurrency/src/tasks/timer_tests.rs b/concurrency/src/timer_tests.rs similarity index 95% rename from concurrency/src/tasks/timer_tests.rs rename to concurrency/src/timer_tests.rs index 9697513..b625e4a 100644 --- a/concurrency/src/tasks/timer_tests.rs +++ b/concurrency/src/timer_tests.rs @@ -1,6 +1,6 @@ use super::{ - send_after, send_interval, CallResponse, CastResponse, GenServer, GenServerHandle, InitResult, - InitResult::Success, + send_after, send_interval, Backend, CallResponse, CastResponse, GenServer, GenServerHandle, + InitResult, InitResult::Success, }; use spawned_rt::tasks::{self as rt, CancellationToken}; use std::time::Duration; @@ -102,7 +102,7 @@ pub fn test_send_interval_and_cancellation() { let runtime = rt::Runtime::new().unwrap(); runtime.block_on(async move { // Start a Repeater - let mut repeater = Repeater::new(0).start(); + let mut repeater = Repeater::new(0).start(Backend::Async); // Wait for 1 second rt::sleep(Duration::from_secs(1)).await; @@ -209,7 +209,7 @@ pub fn test_send_after_and_cancellation() { let runtime = rt::Runtime::new().unwrap(); runtime.block_on(async move { // Start a Delayed - let mut repeater = Delayed::new(0).start(); + let mut repeater = Delayed::new(0).start(Backend::Async); // Set a just once timed message let _ = send_after( @@ -253,7 +253,7 @@ pub fn test_send_after_gen_server_teardown() { let runtime = rt::Runtime::new().unwrap(); runtime.block_on(async move { // Start a Delayed - let mut repeater = Delayed::new(0).start(); + let mut repeater = Delayed::new(0).start(Backend::Async); // Set a just once timed message let _ = send_after( diff --git a/examples/bank/src/main.rs b/examples/bank/src/main.rs index 37485c8..bbc243a 100644 --- a/examples/bank/src/main.rs +++ b/examples/bank/src/main.rs @@ -24,13 +24,13 @@ mod server; use messages::{BankError, BankOutMessage}; use server::Bank; -use spawned_concurrency::tasks::GenServer as _; +use spawned_concurrency::{Backend, GenServer as _}; use spawned_rt::tasks as rt; fn main() { rt::run(async { // Starting the bank - let mut name_server = Bank::new().start(); + let mut name_server = Bank::new().start(Backend::Async); // Testing initial balance for "main" account let result = Bank::withdraw(&mut name_server, "main".to_string(), 15).await; diff --git a/examples/bank/src/server.rs b/examples/bank/src/server.rs index 2d6587a..d0f288e 100644 --- a/examples/bank/src/server.rs +++ b/examples/bank/src/server.rs @@ -1,11 +1,7 @@ use std::collections::HashMap; use spawned_concurrency::{ - messages::Unused, - tasks::{ - CallResponse, GenServer, GenServerHandle, - InitResult::{self, Success}, - }, + messages::Unused, CallResponse, GenServer, GenServerHandle, InitResult, Success, }; use crate::messages::{BankError, BankInMessage as InMessage, BankOutMessage as OutMessage}; diff --git a/examples/bank_threads/Cargo.toml b/examples/bank_threads/Cargo.toml deleted file mode 100644 index 0f4f4e0..0000000 --- a/examples/bank_threads/Cargo.toml +++ /dev/null @@ -1,13 +0,0 @@ -[package] -name = "bank_threads" -version = "0.1.0" -edition = "2021" - -[dependencies] -spawned-rt = { workspace = true } -spawned-concurrency = { workspace = true } -tracing = { workspace = true } - -[[bin]] -name = "bank_threads" -path = "src/main.rs" \ No newline at end of file diff --git a/examples/bank_threads/src/main.rs b/examples/bank_threads/src/main.rs deleted file mode 100644 index 4fbca29..0000000 --- a/examples/bank_threads/src/main.rs +++ /dev/null @@ -1,118 +0,0 @@ -//! Simple example to test concurrency/Process abstraction. -//! -//! Based on Joe's Armstrong book: Programming Erlang, Second edition -//! Section 22.1 - The Road to the Generic Server -//! -//! Erlang usage example: -//! 1> my_bank:start(). -//! {ok,<0.33.0>} -//! 2> my_bank:deposit("joe", 10). -//! not_a_customer -//! 3> my_bank:new_account("joe"). -//! {welcome,"joe"} -//! 4> my_bank:deposit("joe", 10). -//! {thanks,"joe",your_balance_is,10} -//! 5> my_bank:deposit("joe", 30). -//! {thanks,"joe",your_balance_is,40} -//! 6> my_bank:withdraw("joe", 15). -//! {thanks,"joe",your_balance_is,25} -//! 7> my_bank:withdraw("joe", 45). -//! {sorry,"joe",you_only_have,25,in_the_bank - -mod messages; -mod server; - -use messages::{BankError, BankOutMessage}; -use server::Bank; -use spawned_concurrency::threads::GenServer as _; -use spawned_rt::threads as rt; - -fn main() { - rt::run(|| { - // Starting the bank - let mut name_server = Bank::new().start(); - - // Testing initial balance for "main" account - let result = Bank::withdraw(&mut name_server, "main".to_string(), 15); - tracing::info!("Withdraw result {result:?}"); - assert_eq!( - result, - Ok(BankOutMessage::WidrawOk { - who: "main".to_string(), - amount: 985 - }) - ); - - let joe = "Joe".to_string(); - - // Error on deposit for an unexistent account - let result = Bank::deposit(&mut name_server, joe.clone(), 10); - tracing::info!("Deposit result {result:?}"); - assert_eq!(result, Err(BankError::NotACustomer { who: joe.clone() })); - - // Account creation - let result = Bank::new_account(&mut name_server, "Joe".to_string()); - tracing::info!("New account result {result:?}"); - assert_eq!(result, Ok(BankOutMessage::Welcome { who: joe.clone() })); - - // Deposit - let result = Bank::deposit(&mut name_server, "Joe".to_string(), 10); - tracing::info!("Deposit result {result:?}"); - assert_eq!( - result, - Ok(BankOutMessage::Balance { - who: joe.clone(), - amount: 10 - }) - ); - - // Deposit - let result = Bank::deposit(&mut name_server, "Joe".to_string(), 30); - tracing::info!("Deposit result {result:?}"); - assert_eq!( - result, - Ok(BankOutMessage::Balance { - who: joe.clone(), - amount: 40 - }) - ); - - // Withdrawal - let result = Bank::withdraw(&mut name_server, "Joe".to_string(), 15); - tracing::info!("Withdraw result {result:?}"); - assert_eq!( - result, - Ok(BankOutMessage::WidrawOk { - who: joe.clone(), - amount: 25 - }) - ); - - // Withdrawal with not enough balance - let result = Bank::withdraw(&mut name_server, "Joe".to_string(), 45); - tracing::info!("Withdraw result {result:?}"); - assert_eq!( - result, - Err(BankError::InsufficientBalance { - who: joe.clone(), - amount: 25 - }) - ); - - // Full withdrawal - let result = Bank::withdraw(&mut name_server, "Joe".to_string(), 25); - tracing::info!("Withdraw result {result:?}"); - assert_eq!( - result, - Ok(BankOutMessage::WidrawOk { - who: joe, - amount: 0 - }) - ); - - // Stopping the bank - let result = Bank::stop(&mut name_server); - tracing::info!("Stop result {result:?}"); - assert_eq!(result, Ok(BankOutMessage::Stopped)); - }) -} diff --git a/examples/bank_threads/src/messages.rs b/examples/bank_threads/src/messages.rs deleted file mode 100644 index d58ae9d..0000000 --- a/examples/bank_threads/src/messages.rs +++ /dev/null @@ -1,25 +0,0 @@ -#[derive(Debug, Clone)] -pub enum BankInMessage { - New { who: String }, - Add { who: String, amount: i32 }, - Remove { who: String, amount: i32 }, - Stop, -} - -#[allow(dead_code)] -#[derive(Debug, Clone, PartialEq)] -pub enum BankOutMessage { - Welcome { who: String }, - Balance { who: String, amount: i32 }, - WidrawOk { who: String, amount: i32 }, - Stopped, -} - -#[allow(dead_code)] -#[derive(Debug, Clone, PartialEq)] -pub enum BankError { - AlreadyACustomer { who: String }, - NotACustomer { who: String }, - InsufficientBalance { who: String, amount: i32 }, - ServerError, -} diff --git a/examples/bank_threads/src/server.rs b/examples/bank_threads/src/server.rs deleted file mode 100644 index baeb71a..0000000 --- a/examples/bank_threads/src/server.rs +++ /dev/null @@ -1,104 +0,0 @@ -use std::collections::HashMap; - -use spawned_concurrency::{ - messages::Unused, - threads::{CallResponse, GenServer, GenServerHandle}, -}; - -use crate::messages::{BankError, BankInMessage as InMessage, BankOutMessage as OutMessage}; - -type MsgResult = Result; -type BankHandle = GenServerHandle; - -#[derive(Clone)] -pub struct Bank { - accounts: HashMap, -} - -impl Bank { - pub fn new() -> Self { - Bank { - accounts: HashMap::new(), - } - } -} - -impl Bank { - pub fn stop(server: &mut BankHandle) -> MsgResult { - server - .call(InMessage::Stop) - .unwrap_or(Err(BankError::ServerError)) - } - - pub fn new_account(server: &mut BankHandle, who: String) -> MsgResult { - server - .call(InMessage::New { who }) - .unwrap_or(Err(BankError::ServerError)) - } - - pub fn deposit(server: &mut BankHandle, who: String, amount: i32) -> MsgResult { - server - .call(InMessage::Add { who, amount }) - .unwrap_or(Err(BankError::ServerError)) - } - - pub fn withdraw(server: &mut BankHandle, who: String, amount: i32) -> MsgResult { - server - .call(InMessage::Remove { who, amount }) - .unwrap_or(Err(BankError::ServerError)) - } -} - -impl GenServer for Bank { - type CallMsg = InMessage; - type CastMsg = Unused; - type OutMsg = MsgResult; - type Error = BankError; - - // Initializing "main" account with 1000 in balance to test init() callback. - fn init(mut self, _handle: &GenServerHandle) -> Result { - self.accounts.insert("main".to_string(), 1000); - Ok(self) - } - - fn handle_call(&mut self, message: Self::CallMsg, _handle: &BankHandle) -> CallResponse { - match message.clone() { - Self::CallMsg::New { who } => match self.accounts.get(&who) { - Some(_amount) => CallResponse::Reply(Err(BankError::AlreadyACustomer { who })), - None => { - self.accounts.insert(who.clone(), 0); - CallResponse::Reply(Ok(OutMessage::Welcome { who })) - } - }, - Self::CallMsg::Add { who, amount } => match self.accounts.get(&who) { - Some(current) => { - let new_amount = current + amount; - self.accounts.insert(who.clone(), new_amount); - CallResponse::Reply(Ok(OutMessage::Balance { - who, - amount: new_amount, - })) - } - None => CallResponse::Reply(Err(BankError::NotACustomer { who })), - }, - Self::CallMsg::Remove { who, amount } => match self.accounts.get(&who) { - Some(¤t) => match current < amount { - true => CallResponse::Reply(Err(BankError::InsufficientBalance { - who, - amount: current, - })), - false => { - let new_amount = current - amount; - self.accounts.insert(who.clone(), new_amount); - CallResponse::Reply(Ok(OutMessage::WidrawOk { - who, - amount: new_amount, - })) - } - }, - None => CallResponse::Reply(Err(BankError::NotACustomer { who })), - }, - Self::CallMsg::Stop => CallResponse::Stop(Ok(OutMessage::Stopped)), - } - } -} diff --git a/examples/blocking_genserver/main.rs b/examples/blocking_genserver/main.rs index 981f5ab..8188d51 100644 --- a/examples/blocking_genserver/main.rs +++ b/examples/blocking_genserver/main.rs @@ -2,8 +2,8 @@ use spawned_rt::tasks as rt; use std::time::Duration; use std::{process::exit, thread}; -use spawned_concurrency::tasks::{ - CallResponse, CastResponse, GenServer, GenServerHandle, send_after, +use spawned_concurrency::{ + Backend, CallResponse, CastResponse, GenServer, GenServerHandle, send_after, }; // We test a scenario with a badly behaved task @@ -93,15 +93,15 @@ impl GenServer for WellBehavedTask { } } -/// Example of start_blocking to fix issues #8 https://github.com/lambdaclass/spawned/issues/8 +/// Example of Backend::Thread to fix issues #8 https://github.com/lambdaclass/spawned/issues/8 /// Tasks that block can block the entire tokio runtime (and other cooperative multitasking models) -/// To fix this we implement start_blocking, which under the hood launches a new thread to deal with the issue +/// To fix this we use Backend::Thread, which under the hood launches a new thread to deal with the issue pub fn main() { rt::run(async move { - // If we change BadlyBehavedTask to start instead, it can stop the entire program - let mut badboy = BadlyBehavedTask::new().start_on_thread(); + // If we change BadlyBehavedTask to Backend::Async instead, it can stop the entire program + let mut badboy = BadlyBehavedTask::new().start(Backend::Thread); let _ = badboy.cast(()).await; - let mut goodboy = WellBehavedTask::new(0).start(); + let mut goodboy = WellBehavedTask::new(0).start(Backend::Async); let _ = goodboy.cast(()).await; rt::sleep(Duration::from_secs(1)).await; let count = goodboy.call(InMessage::GetCount).await.unwrap(); diff --git a/examples/busy_genserver_warning/main.rs b/examples/busy_genserver_warning/main.rs index 2d6d6ef..dd6e325 100644 --- a/examples/busy_genserver_warning/main.rs +++ b/examples/busy_genserver_warning/main.rs @@ -3,7 +3,7 @@ use std::time::Duration; use std::{process::exit, thread}; use tracing::info; -use spawned_concurrency::tasks::{CallResponse, CastResponse, GenServer, GenServerHandle}; +use spawned_concurrency::{Backend, CallResponse, CastResponse, GenServer, GenServerHandle}; // We test a scenario with a badly behaved task struct BusyWorker; @@ -63,8 +63,8 @@ impl GenServer for BusyWorker { /// whenever we detect tasks that take too long to run. pub fn main() { rt::run(async move { - // If we change BusyWorker to start_blocking instead, it won't print the warning - let mut badboy = BusyWorker::new().start(); + // If we change BusyWorker to Backend::Blocking instead, it won't print the warning + let mut badboy = BusyWorker::new().start(Backend::Async); let _ = badboy.cast(()).await; rt::sleep(Duration::from_secs(5)).await; diff --git a/examples/name_server/src/main.rs b/examples/name_server/src/main.rs index 22e91c7..abfe73f 100644 --- a/examples/name_server/src/main.rs +++ b/examples/name_server/src/main.rs @@ -16,12 +16,12 @@ mod server; use messages::NameServerOutMessage; use server::NameServer; -use spawned_concurrency::tasks::GenServer as _; +use spawned_concurrency::{Backend, GenServer as _}; use spawned_rt::tasks as rt; fn main() { rt::run(async { - let mut name_server = NameServer::new().start(); + let mut name_server = NameServer::new().start(Backend::Async); let result = NameServer::add(&mut name_server, "Joe".to_string(), "At Home".to_string()).await; diff --git a/examples/name_server/src/server.rs b/examples/name_server/src/server.rs index 90d017e..f7b152e 100644 --- a/examples/name_server/src/server.rs +++ b/examples/name_server/src/server.rs @@ -1,8 +1,7 @@ use std::collections::HashMap; use spawned_concurrency::{ - messages::Unused, - tasks::{CallResponse, GenServer, GenServerHandle}, + messages::Unused, CallResponse, GenServer, GenServerHandle, }; use crate::messages::{NameServerInMessage as InMessage, NameServerOutMessage as OutMessage}; diff --git a/examples/ping_pong/src/consumer.rs b/examples/ping_pong/src/consumer.rs index 8ead269..0efe7c6 100644 --- a/examples/ping_pong/src/consumer.rs +++ b/examples/ping_pong/src/consumer.rs @@ -1,4 +1,4 @@ -use spawned_concurrency::tasks::{self as concurrency, Process, ProcessInfo}; +use spawned_concurrency::{self as concurrency, Process, ProcessInfo}; use spawned_rt::tasks::mpsc::Sender; use crate::messages::Message; diff --git a/examples/ping_pong/src/producer.rs b/examples/ping_pong/src/producer.rs index 71829a1..7bf04a5 100644 --- a/examples/ping_pong/src/producer.rs +++ b/examples/ping_pong/src/producer.rs @@ -1,4 +1,4 @@ -use spawned_concurrency::tasks::{self as concurrency, Process, ProcessInfo}; +use spawned_concurrency::{self as concurrency, Process, ProcessInfo}; use spawned_rt::tasks::mpsc::Sender; use crate::messages::Message; diff --git a/examples/ping_pong_threads/Cargo.toml b/examples/ping_pong_threads/Cargo.toml deleted file mode 100644 index fb2b28a..0000000 --- a/examples/ping_pong_threads/Cargo.toml +++ /dev/null @@ -1,13 +0,0 @@ -[package] -name = "ping_pong_threads" -version = "0.1.0" -edition = "2021" - -[dependencies] -spawned-rt = { workspace = true } -spawned-concurrency = { workspace = true } -tracing = { workspace = true } - -[[bin]] -name = "ping_pong_threads" -path = "src/main.rs" \ No newline at end of file diff --git a/examples/ping_pong_threads/src/consumer.rs b/examples/ping_pong_threads/src/consumer.rs deleted file mode 100644 index 44777c4..0000000 --- a/examples/ping_pong_threads/src/consumer.rs +++ /dev/null @@ -1,26 +0,0 @@ -use spawned_concurrency::threads::{self as concurrency, Process, ProcessInfo}; -use spawned_rt::threads::mpsc::Sender; - -use crate::messages::Message; - -pub struct Consumer {} - -impl Consumer { - pub fn spawn_new() -> ProcessInfo { - Self {}.spawn() - } -} - -impl Process for Consumer { - fn handle(&mut self, message: Message, _tx: &Sender) -> Message { - tracing::info!("Consumer received {message:?}"); - match message.clone() { - Message::Ping { from } => { - tracing::info!("Consumer sent Pong"); - concurrency::send(&from, Message::Pong); - } - Message::Pong => (), - }; - message - } -} diff --git a/examples/ping_pong_threads/src/main.rs b/examples/ping_pong_threads/src/main.rs deleted file mode 100644 index 73fc4d6..0000000 --- a/examples/ping_pong_threads/src/main.rs +++ /dev/null @@ -1,55 +0,0 @@ -//! Simple example to test concurrency/Process abstraction -//! -//! Based on an Erlang example: -//! -module(ping). -//! -//! -export([ping/1, pong/0, spawn_consumer/0, spawn_producer/1, start/0]). -//! -//! ping(Pid) -> -//! Pid ! {ping, self()}, -//! receive -//! pong -> -//! io:format("Received pong!!!~n"), -//! ping(Pid) -//! end. -//! -//! pong() -> -//! receive -//! {ping, Pid} -> -//! io:format("Received ping!!~n"), -//! Pid ! pong, -//! pong(); -//! die -> -//! ok -//! end. -//! -//! spawn_consumer() -> -//! spawn(ping, pong, []). -//! -//! spawn_producer(Pid) -> -//! spawn(ping, ping, [Pid]). -//! -//! start() -> -//! Pid = spawn_consumer(), -//! spawn_producer(Pid). - -mod consumer; -mod messages; -mod producer; - -use std::{thread, time::Duration}; - -use consumer::Consumer; -use producer::Producer; -use spawned_rt::threads as rt; - -fn main() { - rt::run(|| { - let consumer = Consumer::spawn_new(); - - Producer::spawn_new(consumer.sender()); - - // giving it some time before ending - thread::sleep(Duration::from_millis(1)); - }) -} diff --git a/examples/ping_pong_threads/src/messages.rs b/examples/ping_pong_threads/src/messages.rs deleted file mode 100644 index e8a07ef..0000000 --- a/examples/ping_pong_threads/src/messages.rs +++ /dev/null @@ -1,7 +0,0 @@ -use spawned_rt::threads::mpsc::Sender; - -#[derive(Debug, Clone)] -pub enum Message { - Ping { from: Sender }, - Pong, -} diff --git a/examples/ping_pong_threads/src/producer.rs b/examples/ping_pong_threads/src/producer.rs deleted file mode 100644 index 01dd564..0000000 --- a/examples/ping_pong_threads/src/producer.rs +++ /dev/null @@ -1,32 +0,0 @@ -use spawned_concurrency::threads::{self as concurrency, Process, ProcessInfo}; -use spawned_rt::threads::mpsc::Sender; - -use crate::messages::Message; - -pub struct Producer { - consumer: Sender, -} - -impl Producer { - pub fn spawn_new(consumer: Sender) -> ProcessInfo { - Self { consumer }.spawn() - } - - fn send_ping(&self, tx: &Sender, consumer: &Sender) { - let message = Message::Ping { from: tx.clone() }; - tracing::info!("Producer sent Ping"); - concurrency::send(consumer, message); - } -} - -impl Process for Producer { - fn init(&mut self, tx: &Sender) { - self.send_ping(tx, &self.consumer); - } - - fn handle(&mut self, message: Message, tx: &Sender) -> Message { - tracing::info!("Producer received {message:?}"); - self.send_ping(tx, &self.consumer); - message - } -} diff --git a/examples/updater/src/main.rs b/examples/updater/src/main.rs index a0db2cb..4cb3028 100644 --- a/examples/updater/src/main.rs +++ b/examples/updater/src/main.rs @@ -9,7 +9,7 @@ mod server; use std::{thread, time::Duration}; use server::UpdaterServer; -use spawned_concurrency::tasks::GenServer as _; +use spawned_concurrency::{Backend, GenServer as _}; use spawned_rt::tasks as rt; fn main() { @@ -19,7 +19,7 @@ fn main() { "https://httpbin.org/ip".to_string(), Duration::from_millis(1000), ) - .start(); + .start(Backend::Async); // giving it some time before ending thread::sleep(Duration::from_secs(10)); diff --git a/examples/updater/src/server.rs b/examples/updater/src/server.rs index f40d59d..ceaa5c0 100644 --- a/examples/updater/src/server.rs +++ b/examples/updater/src/server.rs @@ -1,11 +1,7 @@ use std::time::Duration; use spawned_concurrency::{ - messages::Unused, - tasks::{ - send_interval, CastResponse, GenServer, GenServerHandle, - InitResult::{self, Success}, - }, + messages::Unused, send_interval, CastResponse, GenServer, GenServerHandle, InitResult, Success, }; use spawned_rt::tasks::CancellationToken; diff --git a/examples/updater_threads/Cargo.toml b/examples/updater_threads/Cargo.toml deleted file mode 100644 index 7266750..0000000 --- a/examples/updater_threads/Cargo.toml +++ /dev/null @@ -1,15 +0,0 @@ -[package] -name = "updater_threads" -version = "0.1.0" -edition = "2021" - -[dependencies] -spawned-rt = { workspace = true } -spawned-concurrency = { workspace = true } -tracing = { workspace = true } -reqwest = { version = "0.11", features = ["blocking"] } -futures = "0.3.1" - -[[bin]] -name = "updater_threads" -path = "src/main.rs" \ No newline at end of file diff --git a/examples/updater_threads/src/main.rs b/examples/updater_threads/src/main.rs deleted file mode 100644 index aad6dba..0000000 --- a/examples/updater_threads/src/main.rs +++ /dev/null @@ -1,26 +0,0 @@ -//! Example to test a recurrent gen_server. -//! -//! Just activates periodically and performs an http request -//! - -mod messages; -mod server; - -use std::{thread, time::Duration}; - -use server::UpdaterServer; -use spawned_concurrency::threads::GenServer as _; -use spawned_rt::threads as rt; - -fn main() { - rt::run(|| { - UpdaterServer { - url: "https://httpbin.org/ip".to_string(), - periodicity: Duration::from_millis(1000), - } - .start(); - - // giving it some time before ending - thread::sleep(Duration::from_secs(10)); - }) -} diff --git a/examples/updater_threads/src/messages.rs b/examples/updater_threads/src/messages.rs deleted file mode 100644 index daa0589..0000000 --- a/examples/updater_threads/src/messages.rs +++ /dev/null @@ -1,11 +0,0 @@ -#[derive(Debug, Clone)] -pub enum UpdaterInMessage { - Check, -} - -#[allow(dead_code)] -#[derive(Debug, Clone, PartialEq)] -pub enum UpdaterOutMessage { - Ok, - Error, -} diff --git a/examples/updater_threads/src/server.rs b/examples/updater_threads/src/server.rs deleted file mode 100644 index 23eafc1..0000000 --- a/examples/updater_threads/src/server.rs +++ /dev/null @@ -1,49 +0,0 @@ -use std::time::Duration; - -use spawned_concurrency::{ - messages::Unused, - threads::{send_after, CastResponse, GenServer, GenServerHandle}, -}; -use spawned_rt::threads::block_on; - -use crate::messages::{UpdaterInMessage as InMessage, UpdaterOutMessage as OutMessage}; - -type UpdateServerHandle = GenServerHandle; - -#[derive(Clone)] -pub struct UpdaterServer { - pub url: String, - pub periodicity: Duration, -} - -impl GenServer for UpdaterServer { - type CallMsg = Unused; - type CastMsg = InMessage; - type OutMsg = OutMessage; - type Error = std::fmt::Error; - - // Initializing GenServer to start periodic checks. - fn init(self, handle: &GenServerHandle) -> Result { - send_after(self.periodicity, handle.clone(), InMessage::Check); - Ok(self) - } - - fn handle_cast(&mut self, message: Self::CastMsg, handle: &UpdateServerHandle) -> CastResponse { - match message { - Self::CastMsg::Check => { - send_after(self.periodicity, handle.clone(), InMessage::Check); - let url = self.url.clone(); - tracing::info!("Fetching: {url}"); - let resp = block_on(req(url)); - - tracing::info!("Response: {resp:?}"); - - CastResponse::NoReply - } - } - } -} - -async fn req(url: String) -> Result { - reqwest::get(url).await?.text().await -}