diff --git a/Cargo.lock b/Cargo.lock index fe5edec..25080b7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -732,15 +732,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "name_server_with_error" -version = "0.1.0" -dependencies = [ - "spawned-concurrency", - "spawned-rt", - "tracing", -] - [[package]] name = "native-tls" version = "0.2.14" diff --git a/Cargo.toml b/Cargo.toml index 1f22153..11f8201 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,7 +7,6 @@ members = [ "examples/bank", "examples/bank_threads", "examples/name_server", - "examples/name_server_with_error", "examples/ping_pong", "examples/ping_pong_threads", "examples/updater", diff --git a/concurrency/src/tasks/gen_server.rs b/concurrency/src/tasks/gen_server.rs index 5c31170..c5275c7 100644 --- a/concurrency/src/tasks/gen_server.rs +++ b/concurrency/src/tasks/gen_server.rs @@ -112,13 +112,13 @@ pub enum GenServerInMsg { } pub enum CallResponse { - Reply(G, G::OutMsg), + Reply(G::OutMsg), Unused, Stop(G::OutMsg), } -pub enum CastResponse { - NoReply(G), +pub enum CastResponse { + NoReply, Unused, Stop, } @@ -128,7 +128,7 @@ pub enum InitResult { NoSuccess(G), } -pub trait GenServer: Send + Sized + Clone { +pub trait GenServer: Send + Sized { type CallMsg: Clone + Send + Sized + Sync; type CastMsg: Clone + Send + Sized + Sync; type OutMsg: Send + Sized; @@ -154,7 +154,7 @@ pub trait GenServer: Send + Sized + Clone { ) -> impl Future> + Send { async { let res = match self.init(handle).await { - Ok(Success(new_state)) => new_state.main_loop(handle, rx).await, + Ok(Success(new_state)) => Ok(new_state.main_loop(handle, rx).await), Ok(NoSuccess(intermediate_state)) => { // new_state is NoSuccess, this means the initialization failed, but the error was handled // in callback. No need to report the error. @@ -191,53 +191,44 @@ pub trait GenServer: Send + Sized + Clone { mut self, handle: &GenServerHandle, rx: &mut mpsc::Receiver>, - ) -> impl Future> + Send { + ) -> impl Future + Send { async { loop { - let (new_state, cont) = self.receive(handle, rx).await?; - self = new_state; - if !cont { + if !self.receive(handle, rx).await { break; } } tracing::trace!("Stopping GenServer"); - Ok(self) + self } } fn receive( - self, + &mut self, handle: &GenServerHandle, rx: &mut mpsc::Receiver>, - ) -> impl Future> + Send { + ) -> impl Future + Send { async move { let message = rx.recv().await; - // Save current state in case of a rollback - let state_clone = self.clone(); - - let (keep_running, new_state) = match message { + let keep_running = match message { Some(GenServerInMsg::Call { sender, message }) => { - let (keep_running, new_state, response) = + let (keep_running, response) = match AssertUnwindSafe(self.handle_call(message, handle)) .catch_unwind() .await { Ok(response) => match response { - CallResponse::Reply(new_state, response) => { - (true, new_state, Ok(response)) - } - CallResponse::Stop(response) => (false, state_clone, Ok(response)), + CallResponse::Reply(response) => (true, Ok(response)), + CallResponse::Stop(response) => (false, Ok(response)), CallResponse::Unused => { tracing::error!("GenServer received unexpected CallMessage"); - (false, state_clone, Err(GenServerError::CallMsgUnused)) + (false, Err(GenServerError::CallMsgUnused)) } }, Err(error) => { - tracing::error!( - "Error in callback, reverting state - Error: '{error:?}'" - ); - (true, state_clone, Err(GenServerError::Callback)) + tracing::error!("Error in callback: '{error:?}'"); + (false, Err(GenServerError::Callback)) } }; // Send response back @@ -246,7 +237,7 @@ pub trait GenServer: Send + Sized + Clone { "GenServer failed to send response back, client must have died" ) }; - (keep_running, new_state) + keep_running } Some(GenServerInMsg::Cast { message }) => { match AssertUnwindSafe(self.handle_cast(message, handle)) @@ -254,32 +245,30 @@ pub trait GenServer: Send + Sized + Clone { .await { Ok(response) => match response { - CastResponse::NoReply(new_state) => (true, new_state), - CastResponse::Stop => (false, state_clone), + CastResponse::NoReply => true, + CastResponse::Stop => false, CastResponse::Unused => { tracing::error!("GenServer received unexpected CastMessage"); - (false, state_clone) + false } }, Err(error) => { - tracing::trace!( - "Error in callback, reverting state - Error: '{error:?}'" - ); - (true, state_clone) + tracing::trace!("Error in callback: '{error:?}'"); + false } } } None => { // Channel has been closed; won't receive further messages. Stop the server. - (false, self) + false } }; - Ok((new_state, keep_running)) + keep_running } } fn handle_call( - self, + &mut self, _message: Self::CallMsg, _handle: &GenServerHandle, ) -> impl Future> + Send { @@ -287,10 +276,10 @@ pub trait GenServer: Send + Sized + Clone { } fn handle_cast( - self, + &mut self, _message: Self::CastMsg, _handle: &GenServerHandle, - ) -> impl Future> + Send { + ) -> impl Future + Send { async { CastResponse::Unused } } @@ -316,7 +305,6 @@ mod tests { time::Duration, }; - #[derive(Clone)] struct BadlyBehavedTask; #[derive(Clone)] @@ -336,7 +324,7 @@ mod tests { type Error = Unused; async fn handle_call( - self, + &mut self, _: Self::CallMsg, _: &GenServerHandle, ) -> CallResponse { @@ -344,17 +332,16 @@ mod tests { } async fn handle_cast( - self, + &mut self, _: Self::CastMsg, _: &GenServerHandle, - ) -> CastResponse { + ) -> CastResponse { rt::sleep(Duration::from_millis(20)).await; thread::sleep(Duration::from_secs(2)); CastResponse::Stop } } - #[derive(Clone)] struct WellBehavedTask { pub count: u64, } @@ -366,28 +353,25 @@ mod tests { type Error = Unused; async fn handle_call( - self, + &mut self, message: Self::CallMsg, _: &GenServerHandle, ) -> CallResponse { match message { - InMessage::GetCount => { - let count = self.count; - CallResponse::Reply(self, OutMsg::Count(count)) - } + InMessage::GetCount => CallResponse::Reply(OutMsg::Count(self.count)), InMessage::Stop => CallResponse::Stop(OutMsg::Count(self.count)), } } async fn handle_cast( - mut self, + &mut self, _: Self::CastMsg, handle: &GenServerHandle, - ) -> CastResponse { + ) -> CastResponse { self.count += 1; println!("{:?}: good still alive", thread::current().id()); send_after(Duration::from_millis(100), handle.to_owned(), Unused); - CastResponse::NoReply(self) + CastResponse::NoReply } } @@ -433,7 +417,7 @@ mod tests { const TIMEOUT_DURATION: Duration = Duration::from_millis(100); - #[derive(Debug, Default, Clone)] + #[derive(Debug, Default)] struct SomeTask; #[derive(Clone)] @@ -449,7 +433,7 @@ mod tests { type Error = Unused; async fn handle_call( - self, + &mut self, message: Self::CallMsg, _handle: &GenServerHandle, ) -> CallResponse { @@ -457,12 +441,12 @@ mod tests { SomeTaskCallMsg::SlowOperation => { // Simulate a slow operation that will not resolve in time rt::sleep(TIMEOUT_DURATION * 2).await; - CallResponse::Reply(self, Unused) + CallResponse::Reply(Unused) } SomeTaskCallMsg::FastOperation => { // Simulate a fast operation that resolves in time rt::sleep(TIMEOUT_DURATION / 2).await; - CallResponse::Reply(self, Unused) + CallResponse::Reply(Unused) } } } @@ -486,7 +470,6 @@ mod tests { }); } - #[derive(Clone)] struct SomeTaskThatFailsOnInit { sender_channel: Arc>>, } diff --git a/concurrency/src/tasks/stream_tests.rs b/concurrency/src/tasks/stream_tests.rs index 09f8bf9..ecb2f36 100644 --- a/concurrency/src/tasks/stream_tests.rs +++ b/concurrency/src/tasks/stream_tests.rs @@ -8,7 +8,6 @@ use crate::tasks::{ type SummatoryHandle = GenServerHandle; -#[derive(Clone)] struct Summatory { count: u16, } @@ -40,26 +39,26 @@ impl GenServer for Summatory { type Error = (); async fn handle_cast( - mut self, + &mut self, message: Self::CastMsg, _handle: &GenServerHandle, - ) -> CastResponse { + ) -> CastResponse { match message { SummatoryCastMessage::Add(val) => { self.count += val; - CastResponse::NoReply(self) + CastResponse::NoReply } SummatoryCastMessage::Stop => CastResponse::Stop, } } async fn handle_call( - self, + &mut self, _message: Self::CallMsg, _handle: &SummatoryHandle, ) -> CallResponse { let current_value = self.count; - CallResponse::Reply(self, current_value) + CallResponse::Reply(current_value) } } diff --git a/concurrency/src/tasks/timer_tests.rs b/concurrency/src/tasks/timer_tests.rs index 9eef493..9697513 100644 --- a/concurrency/src/tasks/timer_tests.rs +++ b/concurrency/src/tasks/timer_tests.rs @@ -1,12 +1,10 @@ -use crate::tasks::{ - gen_server::InitResult, send_interval, CallResponse, CastResponse, GenServer, GenServerHandle, +use super::{ + send_after, send_interval, CallResponse, CastResponse, GenServer, GenServerHandle, InitResult, InitResult::Success, }; use spawned_rt::tasks::{self as rt, CancellationToken}; use std::time::Duration; -use super::send_after; - type RepeaterHandle = GenServerHandle; #[derive(Clone)] @@ -25,7 +23,6 @@ enum RepeaterOutMessage { Count(i32), } -#[derive(Clone)] struct Repeater { pub(crate) count: i32, pub(crate) cancellation_token: Option, @@ -73,19 +70,19 @@ impl GenServer for Repeater { } async fn handle_call( - self, + &mut self, _message: Self::CallMsg, _handle: &RepeaterHandle, ) -> CallResponse { let count = self.count; - CallResponse::Reply(self, RepeaterOutMessage::Count(count)) + CallResponse::Reply(RepeaterOutMessage::Count(count)) } async fn handle_cast( - mut self, + &mut self, message: Self::CastMsg, _handle: &GenServerHandle, - ) -> CastResponse { + ) -> CastResponse { match message { RepeaterCastMessage::Inc => { self.count += 1; @@ -96,7 +93,7 @@ impl GenServer for Repeater { }; } }; - CastResponse::NoReply(self) + CastResponse::NoReply } } @@ -148,7 +145,6 @@ enum DelayedOutMessage { Count(i32), } -#[derive(Clone)] struct Delayed { pub(crate) count: i32, } @@ -181,30 +177,30 @@ impl GenServer for Delayed { type Error = (); async fn handle_call( - self, + &mut self, message: Self::CallMsg, _handle: &DelayedHandle, ) -> CallResponse { match message { DelayedCallMessage::GetCount => { let count = self.count; - CallResponse::Reply(self, DelayedOutMessage::Count(count)) + CallResponse::Reply(DelayedOutMessage::Count(count)) } DelayedCallMessage::Stop => CallResponse::Stop(DelayedOutMessage::Count(self.count)), } } async fn handle_cast( - mut self, + &mut self, message: Self::CastMsg, _handle: &DelayedHandle, - ) -> CastResponse { + ) -> CastResponse { match message { DelayedCastMessage::Inc => { self.count += 1; } }; - CastResponse::NoReply(self) + CastResponse::NoReply } } diff --git a/examples/bank/src/server.rs b/examples/bank/src/server.rs index b290a07..2d6587a 100644 --- a/examples/bank/src/server.rs +++ b/examples/bank/src/server.rs @@ -13,7 +13,6 @@ use crate::messages::{BankError, BankInMessage as InMessage, BankOutMessage as O type MsgResult = Result; type BankHandle = GenServerHandle; -#[derive(Clone)] pub struct Bank { accounts: HashMap, } @@ -72,56 +71,45 @@ impl GenServer for Bank { } async fn handle_call( - mut self, + &mut self, message: Self::CallMsg, _handle: &BankHandle, ) -> CallResponse { match message.clone() { Self::CallMsg::New { who } => match self.accounts.get(&who) { - Some(_amount) => { - CallResponse::Reply(self, Err(BankError::AlreadyACustomer { who })) - } + Some(_amount) => CallResponse::Reply(Err(BankError::AlreadyACustomer { who })), None => { self.accounts.insert(who.clone(), 0); - CallResponse::Reply(self, Ok(OutMessage::Welcome { who })) + CallResponse::Reply(Ok(OutMessage::Welcome { who })) } }, Self::CallMsg::Add { who, amount } => match self.accounts.get(&who) { Some(current) => { let new_amount = current + amount; self.accounts.insert(who.clone(), new_amount); - CallResponse::Reply( - self, - Ok(OutMessage::Balance { - who, - amount: new_amount, - }), - ) + CallResponse::Reply(Ok(OutMessage::Balance { + who, + amount: new_amount, + })) } - None => CallResponse::Reply(self, Err(BankError::NotACustomer { who })), + None => CallResponse::Reply(Err(BankError::NotACustomer { who })), }, Self::CallMsg::Remove { who, amount } => match self.accounts.get(&who) { Some(¤t) => match current < amount { - true => CallResponse::Reply( - self, - Err(BankError::InsufficientBalance { - who, - amount: current, - }), - ), + true => CallResponse::Reply(Err(BankError::InsufficientBalance { + who, + amount: current, + })), false => { let new_amount = current - amount; self.accounts.insert(who.clone(), new_amount); - CallResponse::Reply( - self, - Ok(OutMessage::WidrawOk { - who, - amount: new_amount, - }), - ) + CallResponse::Reply(Ok(OutMessage::WidrawOk { + who, + amount: new_amount, + })) } }, - None => CallResponse::Reply(self, Err(BankError::NotACustomer { who })), + None => CallResponse::Reply(Err(BankError::NotACustomer { who })), }, Self::CallMsg::Stop => CallResponse::Stop(Ok(OutMessage::Stopped)), } diff --git a/examples/blocking_genserver/main.rs b/examples/blocking_genserver/main.rs index 98e670b..ca954a7 100644 --- a/examples/blocking_genserver/main.rs +++ b/examples/blocking_genserver/main.rs @@ -7,7 +7,6 @@ use spawned_concurrency::tasks::{ }; // We test a scenario with a badly behaved task -#[derive(Clone)] struct BadlyBehavedTask; impl BadlyBehavedTask { @@ -33,11 +32,15 @@ impl GenServer for BadlyBehavedTask { type OutMsg = (); type Error = (); - async fn handle_call(self, _: Self::CallMsg, _: &GenServerHandle) -> CallResponse { + async fn handle_call( + &mut self, + _: Self::CallMsg, + _: &GenServerHandle, + ) -> CallResponse { CallResponse::Stop(()) } - async fn handle_cast(self, _: Self::CastMsg, _: &GenServerHandle) -> CastResponse { + async fn handle_cast(&mut self, _: Self::CastMsg, _: &GenServerHandle) -> CastResponse { rt::sleep(Duration::from_millis(20)).await; loop { println!("{:?}: bad still alive", thread::current().id()); @@ -46,7 +49,6 @@ impl GenServer for BadlyBehavedTask { } } -#[derive(Clone)] struct WellBehavedTask { count: u64, } @@ -66,28 +68,28 @@ impl GenServer for WellBehavedTask { type Error = (); async fn handle_call( - self, + &mut self, message: Self::CallMsg, _: &GenServerHandle, ) -> CallResponse { match message { InMessage::GetCount => { let count = self.count; - CallResponse::Reply(self, OutMsg::Count(count)) + CallResponse::Reply(OutMsg::Count(count)) } InMessage::Stop => CallResponse::Stop(OutMsg::Count(self.count)), } } async fn handle_cast( - mut self, + &mut self, _: Self::CastMsg, handle: &GenServerHandle, - ) -> CastResponse { + ) -> CastResponse { self.count += 1; println!("{:?}: good still alive", thread::current().id()); send_after(Duration::from_millis(100), handle.to_owned(), ()); - CastResponse::NoReply(self) + CastResponse::NoReply } } diff --git a/examples/name_server/src/server.rs b/examples/name_server/src/server.rs index 54531f8..90d017e 100644 --- a/examples/name_server/src/server.rs +++ b/examples/name_server/src/server.rs @@ -9,7 +9,6 @@ use crate::messages::{NameServerInMessage as InMessage, NameServerOutMessage as type NameServerHandle = GenServerHandle; -#[derive(Clone)] pub struct NameServer { inner: HashMap, } @@ -45,21 +44,21 @@ impl GenServer for NameServer { type Error = std::fmt::Error; async fn handle_call( - mut self, + &mut self, message: Self::CallMsg, _handle: &NameServerHandle, ) -> CallResponse { match message.clone() { Self::CallMsg::Add { key, value } => { self.inner.insert(key, value); - CallResponse::Reply(self, Self::OutMsg::Ok) + CallResponse::Reply(Self::OutMsg::Ok) } Self::CallMsg::Find { key } => match self.inner.get(&key) { Some(result) => { let value = result.to_string(); - CallResponse::Reply(self, Self::OutMsg::Found { value }) + CallResponse::Reply(Self::OutMsg::Found { value }) } - None => CallResponse::Reply(self, Self::OutMsg::NotFound), + None => CallResponse::Reply(Self::OutMsg::NotFound), }, } } diff --git a/examples/name_server_with_error/Cargo.toml b/examples/name_server_with_error/Cargo.toml deleted file mode 100644 index 4b2c8a9..0000000 --- a/examples/name_server_with_error/Cargo.toml +++ /dev/null @@ -1,13 +0,0 @@ -[package] -name = "name_server_with_error" -version = "0.1.0" -edition = "2021" - -[dependencies] -spawned-rt = { workspace = true } -spawned-concurrency = { workspace = true } -tracing = { workspace = true } - -[[bin]] -name = "name_server_with_error" -path = "src/main.rs" \ No newline at end of file diff --git a/examples/name_server_with_error/src/main.rs b/examples/name_server_with_error/src/main.rs deleted file mode 100644 index eb5ab4c..0000000 --- a/examples/name_server_with_error/src/main.rs +++ /dev/null @@ -1,66 +0,0 @@ -//! Simple example to test concurrency/Process abstraction. -//! -//! Based on Joe's Armstrong book: Programming Erlang, Second edition -//! Section 22.1 - The Road to the Generic Server -//! -//! Erlang usage example: -//! 1> server1:start(name_server, name_server). -//! true -//! 2> name_server:add(joe, "at home"). -//! ok -//! 3> name_server:find(joe). -//! {ok,"at home"} - -mod messages; -mod server; - -use std::collections::HashMap; - -use messages::NameServerOutMessage; -use server::NameServer; -use spawned_concurrency::tasks::GenServer as _; -use spawned_rt::tasks as rt; - -fn main() { - rt::run(async { - let mut name_server = NameServer { - inner: HashMap::new(), - } - .start(); - - let result = - NameServer::add(&mut name_server, "Joe".to_string(), "At Home".to_string()).await; - tracing::info!("Storing value result: {result:?}"); - assert_eq!(result, NameServerOutMessage::Ok); - - let result = NameServer::find(&mut name_server, "Joe".to_string()).await; - tracing::info!("Retrieving value result: {result:?}"); - assert_eq!( - result, - NameServerOutMessage::Found { - value: "At Home".to_string() - } - ); - - let result = NameServer::find(&mut name_server, "Bob".to_string()).await; - tracing::info!("Retrieving value result: {result:?}"); - assert_eq!(result, NameServerOutMessage::NotFound); - - let result = NameServer::add( - &mut name_server, - "error".to_string(), - "Should not be added".to_string(), - ) - .await; - tracing::info!("Storing value result: {result:?}"); - assert_eq!(result, NameServerOutMessage::CallbackError); - - let result = NameServer::find(&mut name_server, "error".to_string()).await; - tracing::info!("Retrieving value result: {result:?}"); - assert_eq!( - result, - NameServerOutMessage::NotFound, - "Value must not be present as there was an error inserting it." - ); - }) -} diff --git a/examples/name_server_with_error/src/messages.rs b/examples/name_server_with_error/src/messages.rs deleted file mode 100644 index 268f926..0000000 --- a/examples/name_server_with_error/src/messages.rs +++ /dev/null @@ -1,15 +0,0 @@ -#[derive(Debug, Clone)] -pub enum NameServerInMessage { - Add { key: String, value: String }, - Find { key: String }, -} - -#[allow(dead_code)] -#[derive(Debug, Clone, PartialEq)] -pub enum NameServerOutMessage { - Ok, - Found { value: String }, - NotFound, - ServerError, - CallbackError, -} diff --git a/examples/name_server_with_error/src/server.rs b/examples/name_server_with_error/src/server.rs deleted file mode 100644 index 47b2552..0000000 --- a/examples/name_server_with_error/src/server.rs +++ /dev/null @@ -1,64 +0,0 @@ -use std::collections::HashMap; - -use spawned_concurrency::{ - error::GenServerError, - messages::Unused, - tasks::{CallResponse, GenServer, GenServerHandle}, -}; - -use crate::messages::{NameServerInMessage as InMessage, NameServerOutMessage as OutMessage}; - -type NameServerHandle = GenServerHandle; - -#[derive(Clone)] -pub struct NameServer { - pub inner: HashMap, -} - -impl NameServer { - pub async fn add(server: &mut NameServerHandle, key: String, value: String) -> OutMessage { - match server.call(InMessage::Add { key, value }).await { - Ok(_) => OutMessage::Ok, - Err(GenServerError::Callback) => OutMessage::CallbackError, - Err(_) => OutMessage::ServerError, - } - } - - pub async fn find(server: &mut NameServerHandle, key: String) -> OutMessage { - server - .call(InMessage::Find { key }) - .await - .unwrap_or(OutMessage::ServerError) - } -} - -impl GenServer for NameServer { - type CallMsg = InMessage; - type CastMsg = Unused; - type OutMsg = OutMessage; - type Error = std::fmt::Error; - - async fn handle_call( - mut self, - message: Self::CallMsg, - _handle: &NameServerHandle, - ) -> CallResponse { - match message.clone() { - Self::CallMsg::Add { key, value } => { - self.inner.insert(key.clone(), value); - if key == "error" { - panic!("error!") - } else { - CallResponse::Reply(self, Self::OutMsg::Ok) - } - } - Self::CallMsg::Find { key } => match self.inner.get(&key) { - Some(result) => { - let value = result.to_string(); - CallResponse::Reply(self, Self::OutMsg::Found { value }) - } - None => CallResponse::Reply(self, Self::OutMsg::NotFound), - }, - } - } -} diff --git a/examples/updater/src/server.rs b/examples/updater/src/server.rs index 31a9fdb..f40d59d 100644 --- a/examples/updater/src/server.rs +++ b/examples/updater/src/server.rs @@ -13,7 +13,6 @@ use crate::messages::{UpdaterInMessage as InMessage, UpdaterOutMessage as OutMes type UpdateServerHandle = GenServerHandle; -#[derive(Clone)] pub struct UpdaterServer { pub url: String, pub periodicity: Duration, @@ -47,17 +46,17 @@ impl GenServer for UpdaterServer { } async fn handle_cast( - self, + &mut self, message: Self::CastMsg, _handle: &UpdateServerHandle, - ) -> CastResponse { + ) -> CastResponse { match message { Self::CastMsg::Check => { let url = self.url.clone(); tracing::info!("Fetching: {url}"); let resp = req(url).await; tracing::info!("Response: {resp:?}"); - CastResponse::NoReply(self) + CastResponse::NoReply } } }