From 92f48636dff659cbe341cc1735608e312378f4b3 Mon Sep 17 00:00:00 2001 From: Juan Munoz Date: Fri, 18 Jul 2025 15:00:16 -0300 Subject: [PATCH 1/3] add call w/ timeout fn --- concurrency/src/error.rs | 2 + concurrency/src/tasks/gen_server.rs | 58 ++++++++++++++++++++++++++++- rt/src/tasks/mod.rs | 1 + rt/src/tasks/tokio/mod.rs | 2 +- 4 files changed, 60 insertions(+), 3 deletions(-) diff --git a/concurrency/src/error.rs b/concurrency/src/error.rs index 9faa0c4..c1a37db 100644 --- a/concurrency/src/error.rs +++ b/concurrency/src/error.rs @@ -10,6 +10,8 @@ pub enum GenServerError { CallMsgUnused, #[error("Unsupported Cast Messages on this GenServer")] CastMsgUnused, + #[error("Call to GenServer timed out")] + CallTimeout, } impl From> for GenServerError { diff --git a/concurrency/src/tasks/gen_server.rs b/concurrency/src/tasks/gen_server.rs index f04bb79..cc4c1f5 100644 --- a/concurrency/src/tasks/gen_server.rs +++ b/concurrency/src/tasks/gen_server.rs @@ -1,8 +1,8 @@ //! GenServer trait and structs to create an abstraction similar to Erlang gen_server. //! See examples/name_server for a usage example. use futures::future::FutureExt as _; -use spawned_rt::tasks::{self as rt, mpsc, oneshot, CancellationToken}; -use std::{fmt::Debug, future::Future, panic::AssertUnwindSafe}; +use spawned_rt::tasks::{self as rt, mpsc, oneshot, timeout, CancellationToken}; +use std::{fmt::Debug, future::Future, panic::AssertUnwindSafe, time::Duration}; use crate::error::GenServerError; @@ -85,6 +85,24 @@ impl GenServerHandle { } } + pub async fn call_with_timeout( + &mut self, + message: G::CallMsg, + duration: Duration, + ) -> Result { + let (oneshot_tx, oneshot_rx) = oneshot::channel::>(); + self.tx.send(GenServerInMsg::Call { + sender: oneshot_tx, + message, + })?; + + match timeout(duration, oneshot_rx).await { + Ok(Ok(result)) => result, + Ok(Err(_)) => Err(GenServerError::Server), + Err(_) => Err(GenServerError::CallTimeout), + } + } + pub async fn cast(&mut self, message: G::CastMsg) -> Result<(), GenServerError> { self.tx .send(GenServerInMsg::Cast { message }) @@ -434,4 +452,40 @@ mod tests { goodboy.call(InMessage::Stop).await.unwrap(); }); } + + const TIMEOUT_DURATION: Duration = Duration::from_millis(100); + + #[derive(Default)] + struct UnresolvingTask; + + impl GenServer for UnresolvingTask { + type CallMsg = (); + type CastMsg = (); + type OutMsg = (); + type State = (); + type Error = (); + + async fn handle_call( + &mut self, + _message: Self::CallMsg, + _handle: &GenServerHandle, + _state: Self::State, + ) -> CallResponse { + // Simulate a task that we know won't resolve in time + rt::sleep(TIMEOUT_DURATION * 2).await; + CallResponse::Reply((), ()) + } + } + + #[test] + pub fn unresolving_task_times_out() { + let runtime = rt::Runtime::new().unwrap(); + runtime.block_on(async move { + let mut unresolving_task = UnresolvingTask::start(()); + let result = unresolving_task + .call_with_timeout((), TIMEOUT_DURATION) + .await; + assert!(matches!(result, Err(GenServerError::CallTimeout))); + }); + } } diff --git a/rt/src/tasks/mod.rs b/rt/src/tasks/mod.rs index 10de5fd..5291f69 100644 --- a/rt/src/tasks/mod.rs +++ b/rt/src/tasks/mod.rs @@ -16,6 +16,7 @@ use crate::tracing::init_tracing; pub use crate::tasks::tokio::mpsc; pub use crate::tasks::tokio::oneshot; pub use crate::tasks::tokio::sleep; +pub use crate::tasks::tokio::timeout; pub use crate::tasks::tokio::CancellationToken; pub use crate::tasks::tokio::{spawn, spawn_blocking, JoinHandle, Runtime}; pub use crate::tasks::tokio::{BroadcastStream, ReceiverStream}; diff --git a/rt/src/tasks/tokio/mod.rs b/rt/src/tasks/tokio/mod.rs index 6abf60d..eac39e0 100644 --- a/rt/src/tasks/tokio/mod.rs +++ b/rt/src/tasks/tokio/mod.rs @@ -5,7 +5,7 @@ pub mod oneshot; pub use tokio::{ runtime::Runtime, task::{spawn, spawn_blocking, JoinHandle}, - time::sleep, + time::{sleep, timeout}, }; pub use tokio_stream::wrappers::{BroadcastStream, UnboundedReceiverStream as ReceiverStream}; pub use tokio_util::sync::CancellationToken; From 5649740cd173afe071cf09c277ec684a8b4841ee Mon Sep 17 00:00:00 2001 From: Juan Munoz Date: Fri, 18 Jul 2025 15:25:15 -0300 Subject: [PATCH 2/3] improve test --- concurrency/src/tasks/gen_server.rs | 39 ++++++++++++++++++++++------- 1 file changed, 30 insertions(+), 9 deletions(-) diff --git a/concurrency/src/tasks/gen_server.rs b/concurrency/src/tasks/gen_server.rs index cc4c1f5..74ba7e5 100644 --- a/concurrency/src/tasks/gen_server.rs +++ b/concurrency/src/tasks/gen_server.rs @@ -456,10 +456,16 @@ mod tests { const TIMEOUT_DURATION: Duration = Duration::from_millis(100); #[derive(Default)] - struct UnresolvingTask; + struct SomeTask; - impl GenServer for UnresolvingTask { - type CallMsg = (); + #[derive(Clone)] + enum SomeTaskCallMsg { + SlowOperation, + FastOperation, + } + + impl GenServer for SomeTask { + type CallMsg = SomeTaskCallMsg; type CastMsg = (); type OutMsg = (); type State = (); @@ -467,13 +473,22 @@ mod tests { async fn handle_call( &mut self, - _message: Self::CallMsg, + message: Self::CallMsg, _handle: &GenServerHandle, _state: Self::State, ) -> CallResponse { - // Simulate a task that we know won't resolve in time - rt::sleep(TIMEOUT_DURATION * 2).await; - CallResponse::Reply((), ()) + match message { + SomeTaskCallMsg::SlowOperation => { + // Simulate a slow operation that will not resolve in time + rt::sleep(TIMEOUT_DURATION * 2).await; + CallResponse::Reply((), ()) + } + SomeTaskCallMsg::FastOperation => { + // Simulate a fast operation that resolves in time + rt::sleep(TIMEOUT_DURATION / 2).await; + CallResponse::Reply((), ()) + } + } } } @@ -481,9 +496,15 @@ mod tests { pub fn unresolving_task_times_out() { let runtime = rt::Runtime::new().unwrap(); runtime.block_on(async move { - let mut unresolving_task = UnresolvingTask::start(()); + let mut unresolving_task = SomeTask::start(()); + + let result = unresolving_task + .call_with_timeout(SomeTaskCallMsg::FastOperation, TIMEOUT_DURATION) + .await; + assert!(matches!(result, Ok(()))); + let result = unresolving_task - .call_with_timeout((), TIMEOUT_DURATION) + .call_with_timeout(SomeTaskCallMsg::SlowOperation, TIMEOUT_DURATION) .await; assert!(matches!(result, Err(GenServerError::CallTimeout))); }); From c3ce6291447801087a1544d43025386b6e9b9c7a Mon Sep 17 00:00:00 2001 From: Juan Munoz Date: Fri, 18 Jul 2025 16:23:05 -0300 Subject: [PATCH 3/3] always call with timeout --- concurrency/src/tasks/gen_server.rs | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/concurrency/src/tasks/gen_server.rs b/concurrency/src/tasks/gen_server.rs index 74ba7e5..fd03f00 100644 --- a/concurrency/src/tasks/gen_server.rs +++ b/concurrency/src/tasks/gen_server.rs @@ -6,6 +6,8 @@ use std::{fmt::Debug, future::Future, panic::AssertUnwindSafe, time::Duration}; use crate::error::GenServerError; +const DEFAULT_CALL_TIMEOUT: Duration = Duration::from_secs(5); + #[derive(Debug)] pub struct GenServerHandle { pub tx: mpsc::Sender>, @@ -74,15 +76,7 @@ impl GenServerHandle { } pub async fn call(&mut self, message: G::CallMsg) -> Result { - let (oneshot_tx, oneshot_rx) = oneshot::channel::>(); - self.tx.send(GenServerInMsg::Call { - sender: oneshot_tx, - message, - })?; - match oneshot_rx.await { - Ok(result) => result, - Err(_) => Err(GenServerError::Server), - } + self.call_with_timeout(message, DEFAULT_CALL_TIMEOUT).await } pub async fn call_with_timeout(