Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions concurrency/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ pub enum GenServerError {
CallMsgUnused,
#[error("Unsupported Cast Messages on this GenServer")]
CastMsgUnused,
#[error("Call to GenServer timed out")]
CallTimeout,
}

impl<T> From<spawned_rt::threads::mpsc::SendError<T>> for GenServerError {
Expand Down
79 changes: 74 additions & 5 deletions concurrency/src/tasks/gen_server.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
//! GenServer trait and structs to create an abstraction similar to Erlang gen_server.
//! See examples/name_server for a usage example.
use futures::future::FutureExt as _;
use spawned_rt::tasks::{self as rt, mpsc, oneshot, CancellationToken};
use std::{fmt::Debug, future::Future, panic::AssertUnwindSafe};
use spawned_rt::tasks::{self as rt, mpsc, oneshot, timeout, CancellationToken};
use std::{fmt::Debug, future::Future, panic::AssertUnwindSafe, time::Duration};

use crate::error::GenServerError;

const DEFAULT_CALL_TIMEOUT: Duration = Duration::from_secs(5);

#[derive(Debug)]
pub struct GenServerHandle<G: GenServer + 'static> {
pub tx: mpsc::Sender<GenServerInMsg<G>>,
Expand Down Expand Up @@ -74,14 +76,24 @@ impl<G: GenServer> GenServerHandle<G> {
}

pub async fn call(&mut self, message: G::CallMsg) -> Result<G::OutMsg, GenServerError> {
self.call_with_timeout(message, DEFAULT_CALL_TIMEOUT).await
}

pub async fn call_with_timeout(
&mut self,
message: G::CallMsg,
duration: Duration,
) -> Result<G::OutMsg, GenServerError> {
let (oneshot_tx, oneshot_rx) = oneshot::channel::<Result<G::OutMsg, GenServerError>>();
self.tx.send(GenServerInMsg::Call {
sender: oneshot_tx,
message,
})?;
match oneshot_rx.await {
Ok(result) => result,
Err(_) => Err(GenServerError::Server),

match timeout(duration, oneshot_rx).await {
Ok(Ok(result)) => result,
Ok(Err(_)) => Err(GenServerError::Server),
Err(_) => Err(GenServerError::CallTimeout),
}
}

Expand Down Expand Up @@ -434,4 +446,61 @@ mod tests {
goodboy.call(InMessage::Stop).await.unwrap();
});
}

const TIMEOUT_DURATION: Duration = Duration::from_millis(100);

#[derive(Default)]
struct SomeTask;

#[derive(Clone)]
enum SomeTaskCallMsg {
SlowOperation,
FastOperation,
}

impl GenServer for SomeTask {
type CallMsg = SomeTaskCallMsg;
type CastMsg = ();
type OutMsg = ();
type State = ();
type Error = ();

async fn handle_call(
&mut self,
message: Self::CallMsg,
_handle: &GenServerHandle<Self>,
_state: Self::State,
) -> CallResponse<Self> {
match message {
SomeTaskCallMsg::SlowOperation => {
// Simulate a slow operation that will not resolve in time
rt::sleep(TIMEOUT_DURATION * 2).await;
CallResponse::Reply((), ())
}
SomeTaskCallMsg::FastOperation => {
// Simulate a fast operation that resolves in time
rt::sleep(TIMEOUT_DURATION / 2).await;
CallResponse::Reply((), ())
}
}
}
}

#[test]
pub fn unresolving_task_times_out() {
let runtime = rt::Runtime::new().unwrap();
runtime.block_on(async move {
let mut unresolving_task = SomeTask::start(());

let result = unresolving_task
.call_with_timeout(SomeTaskCallMsg::FastOperation, TIMEOUT_DURATION)
.await;
assert!(matches!(result, Ok(())));

let result = unresolving_task
.call_with_timeout(SomeTaskCallMsg::SlowOperation, TIMEOUT_DURATION)
.await;
assert!(matches!(result, Err(GenServerError::CallTimeout)));
});
}
}
1 change: 1 addition & 0 deletions rt/src/tasks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::tracing::init_tracing;
pub use crate::tasks::tokio::mpsc;
pub use crate::tasks::tokio::oneshot;
pub use crate::tasks::tokio::sleep;
pub use crate::tasks::tokio::timeout;
pub use crate::tasks::tokio::CancellationToken;
pub use crate::tasks::tokio::{spawn, spawn_blocking, JoinHandle, Runtime};
pub use crate::tasks::tokio::{BroadcastStream, ReceiverStream};
Expand Down
2 changes: 1 addition & 1 deletion rt/src/tasks/tokio/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ pub mod oneshot;
pub use tokio::{
runtime::Runtime,
task::{spawn, spawn_blocking, JoinHandle},
time::sleep,
time::{sleep, timeout},
};
pub use tokio_stream::wrappers::{BroadcastStream, UnboundedReceiverStream as ReceiverStream};
pub use tokio_util::sync::CancellationToken;