Skip to content

Commit eb54b1f

Browse files
authored
Add method to call with timeout (#31)
* add call w/ timeout fn * improve test * always call with timeout
1 parent 5fab043 commit eb54b1f

File tree

4 files changed

+78
-6
lines changed

4 files changed

+78
-6
lines changed

concurrency/src/error.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ pub enum GenServerError {
1010
CallMsgUnused,
1111
#[error("Unsupported Cast Messages on this GenServer")]
1212
CastMsgUnused,
13+
#[error("Call to GenServer timed out")]
14+
CallTimeout,
1315
}
1416

1517
impl<T> From<spawned_rt::threads::mpsc::SendError<T>> for GenServerError {

concurrency/src/tasks/gen_server.rs

Lines changed: 74 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
//! GenServer trait and structs to create an abstraction similar to Erlang gen_server.
22
//! See examples/name_server for a usage example.
33
use futures::future::FutureExt as _;
4-
use spawned_rt::tasks::{self as rt, mpsc, oneshot, CancellationToken};
5-
use std::{fmt::Debug, future::Future, panic::AssertUnwindSafe};
4+
use spawned_rt::tasks::{self as rt, mpsc, oneshot, timeout, CancellationToken};
5+
use std::{fmt::Debug, future::Future, panic::AssertUnwindSafe, time::Duration};
66

77
use crate::error::GenServerError;
88

9+
const DEFAULT_CALL_TIMEOUT: Duration = Duration::from_secs(5);
10+
911
#[derive(Debug)]
1012
pub struct GenServerHandle<G: GenServer + 'static> {
1113
pub tx: mpsc::Sender<GenServerInMsg<G>>,
@@ -74,14 +76,24 @@ impl<G: GenServer> GenServerHandle<G> {
7476
}
7577

7678
pub async fn call(&mut self, message: G::CallMsg) -> Result<G::OutMsg, GenServerError> {
79+
self.call_with_timeout(message, DEFAULT_CALL_TIMEOUT).await
80+
}
81+
82+
pub async fn call_with_timeout(
83+
&mut self,
84+
message: G::CallMsg,
85+
duration: Duration,
86+
) -> Result<G::OutMsg, GenServerError> {
7787
let (oneshot_tx, oneshot_rx) = oneshot::channel::<Result<G::OutMsg, GenServerError>>();
7888
self.tx.send(GenServerInMsg::Call {
7989
sender: oneshot_tx,
8090
message,
8191
})?;
82-
match oneshot_rx.await {
83-
Ok(result) => result,
84-
Err(_) => Err(GenServerError::Server),
92+
93+
match timeout(duration, oneshot_rx).await {
94+
Ok(Ok(result)) => result,
95+
Ok(Err(_)) => Err(GenServerError::Server),
96+
Err(_) => Err(GenServerError::CallTimeout),
8597
}
8698
}
8799

@@ -434,4 +446,61 @@ mod tests {
434446
goodboy.call(InMessage::Stop).await.unwrap();
435447
});
436448
}
449+
450+
const TIMEOUT_DURATION: Duration = Duration::from_millis(100);
451+
452+
#[derive(Default)]
453+
struct SomeTask;
454+
455+
#[derive(Clone)]
456+
enum SomeTaskCallMsg {
457+
SlowOperation,
458+
FastOperation,
459+
}
460+
461+
impl GenServer for SomeTask {
462+
type CallMsg = SomeTaskCallMsg;
463+
type CastMsg = ();
464+
type OutMsg = ();
465+
type State = ();
466+
type Error = ();
467+
468+
async fn handle_call(
469+
&mut self,
470+
message: Self::CallMsg,
471+
_handle: &GenServerHandle<Self>,
472+
_state: Self::State,
473+
) -> CallResponse<Self> {
474+
match message {
475+
SomeTaskCallMsg::SlowOperation => {
476+
// Simulate a slow operation that will not resolve in time
477+
rt::sleep(TIMEOUT_DURATION * 2).await;
478+
CallResponse::Reply((), ())
479+
}
480+
SomeTaskCallMsg::FastOperation => {
481+
// Simulate a fast operation that resolves in time
482+
rt::sleep(TIMEOUT_DURATION / 2).await;
483+
CallResponse::Reply((), ())
484+
}
485+
}
486+
}
487+
}
488+
489+
#[test]
490+
pub fn unresolving_task_times_out() {
491+
let runtime = rt::Runtime::new().unwrap();
492+
runtime.block_on(async move {
493+
let mut unresolving_task = SomeTask::start(());
494+
495+
let result = unresolving_task
496+
.call_with_timeout(SomeTaskCallMsg::FastOperation, TIMEOUT_DURATION)
497+
.await;
498+
assert!(matches!(result, Ok(())));
499+
500+
let result = unresolving_task
501+
.call_with_timeout(SomeTaskCallMsg::SlowOperation, TIMEOUT_DURATION)
502+
.await;
503+
assert!(matches!(result, Err(GenServerError::CallTimeout)));
504+
});
505+
}
437506
}

rt/src/tasks/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use crate::tracing::init_tracing;
1616
pub use crate::tasks::tokio::mpsc;
1717
pub use crate::tasks::tokio::oneshot;
1818
pub use crate::tasks::tokio::sleep;
19+
pub use crate::tasks::tokio::timeout;
1920
pub use crate::tasks::tokio::CancellationToken;
2021
pub use crate::tasks::tokio::{spawn, spawn_blocking, JoinHandle, Runtime};
2122
pub use crate::tasks::tokio::{BroadcastStream, ReceiverStream};

rt/src/tasks/tokio/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ pub mod oneshot;
55
pub use tokio::{
66
runtime::Runtime,
77
task::{spawn, spawn_blocking, JoinHandle},
8-
time::sleep,
8+
time::{sleep, timeout},
99
};
1010
pub use tokio_stream::wrappers::{BroadcastStream, UnboundedReceiverStream as ReceiverStream};
1111
pub use tokio_util::sync::CancellationToken;

0 commit comments

Comments
 (0)