Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions concurrency/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ pub enum GenServerError {
CallMsgUnused,
#[error("Unsupported Cast Messages on this GenServer")]
CastMsgUnused,
#[error("Call to GenServer timed out")]
CallTimeout,
}

impl<T> From<spawned_rt::threads::mpsc::SendError<T>> for GenServerError {
Expand Down
79 changes: 77 additions & 2 deletions concurrency/src/tasks/gen_server.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
//! GenServer trait and structs to create an abstraction similar to Erlang gen_server.
//! See examples/name_server for a usage example.
use futures::future::FutureExt as _;
use spawned_rt::tasks::{self as rt, mpsc, oneshot, CancellationToken};
use std::{fmt::Debug, future::Future, panic::AssertUnwindSafe};
use spawned_rt::tasks::{self as rt, mpsc, oneshot, timeout, CancellationToken};
use std::{fmt::Debug, future::Future, panic::AssertUnwindSafe, time::Duration};

use crate::error::GenServerError;

Expand Down Expand Up @@ -85,6 +85,24 @@ impl<G: GenServer> GenServerHandle<G> {
}
}

pub async fn call_with_timeout(
&mut self,
message: G::CallMsg,
duration: Duration,
) -> Result<G::OutMsg, GenServerError> {
let (oneshot_tx, oneshot_rx) = oneshot::channel::<Result<G::OutMsg, GenServerError>>();
self.tx.send(GenServerInMsg::Call {
sender: oneshot_tx,
message,
})?;

match timeout(duration, oneshot_rx).await {
Ok(Ok(result)) => result,
Ok(Err(_)) => Err(GenServerError::Server),
Err(_) => Err(GenServerError::CallTimeout),
}
}

pub async fn cast(&mut self, message: G::CastMsg) -> Result<(), GenServerError> {
self.tx
.send(GenServerInMsg::Cast { message })
Expand Down Expand Up @@ -434,4 +452,61 @@ mod tests {
goodboy.call(InMessage::Stop).await.unwrap();
});
}

const TIMEOUT_DURATION: Duration = Duration::from_millis(100);

#[derive(Default)]
struct SomeTask;

#[derive(Clone)]
enum SomeTaskCallMsg {
SlowOperation,
FastOperation,
}

impl GenServer for SomeTask {
type CallMsg = SomeTaskCallMsg;
type CastMsg = ();
type OutMsg = ();
type State = ();
type Error = ();

async fn handle_call(
&mut self,
message: Self::CallMsg,
_handle: &GenServerHandle<Self>,
_state: Self::State,
) -> CallResponse<Self> {
match message {
SomeTaskCallMsg::SlowOperation => {
// Simulate a slow operation that will not resolve in time
rt::sleep(TIMEOUT_DURATION * 2).await;
CallResponse::Reply((), ())
}
SomeTaskCallMsg::FastOperation => {
// Simulate a fast operation that resolves in time
rt::sleep(TIMEOUT_DURATION / 2).await;
CallResponse::Reply((), ())
}
}
}
}

#[test]
pub fn unresolving_task_times_out() {
let runtime = rt::Runtime::new().unwrap();
runtime.block_on(async move {
let mut unresolving_task = SomeTask::start(());

let result = unresolving_task
.call_with_timeout(SomeTaskCallMsg::FastOperation, TIMEOUT_DURATION)
.await;
assert!(matches!(result, Ok(())));

let result = unresolving_task
.call_with_timeout(SomeTaskCallMsg::SlowOperation, TIMEOUT_DURATION)
.await;
assert!(matches!(result, Err(GenServerError::CallTimeout)));
});
}
}
1 change: 1 addition & 0 deletions rt/src/tasks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::tracing::init_tracing;
pub use crate::tasks::tokio::mpsc;
pub use crate::tasks::tokio::oneshot;
pub use crate::tasks::tokio::sleep;
pub use crate::tasks::tokio::timeout;
pub use crate::tasks::tokio::CancellationToken;
pub use crate::tasks::tokio::{spawn, spawn_blocking, JoinHandle, Runtime};
pub use crate::tasks::tokio::{BroadcastStream, ReceiverStream};
Expand Down
2 changes: 1 addition & 1 deletion rt/src/tasks/tokio/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ pub mod oneshot;
pub use tokio::{
runtime::Runtime,
task::{spawn, spawn_blocking, JoinHandle},
time::sleep,
time::{sleep, timeout},
};
pub use tokio_stream::wrappers::{BroadcastStream, UnboundedReceiverStream as ReceiverStream};
pub use tokio_util::sync::CancellationToken;