Skip to content

Commit 2535277

Browse files
committed
Added spawn_on_thread and send_message_on
1 parent 7ec21e9 commit 2535277

File tree

3 files changed

+67
-6
lines changed

3 files changed

+67
-6
lines changed

concurrency/src/tasks/gen_server.rs

Lines changed: 64 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,10 @@ use crate::{
55
tasks::InitResult::{NoSuccess, Success},
66
};
77
use futures::future::FutureExt as _;
8-
use spawned_rt::tasks::{self as rt, mpsc, oneshot, timeout, CancellationToken};
8+
use spawned_rt::{
9+
tasks::{self as rt, mpsc, oneshot, timeout, CancellationToken, JoinHandle},
10+
threads,
11+
};
912
use std::{fmt::Debug, future::Future, panic::AssertUnwindSafe, time::Duration};
1013

1114
const DEFAULT_CALL_TIMEOUT: Duration = Duration::from_secs(5);
@@ -27,7 +30,7 @@ impl<G: GenServer> Clone for GenServerHandle<G> {
2730
}
2831

2932
impl<G: GenServer> GenServerHandle<G> {
30-
pub(crate) fn new(gen_server: G) -> Self {
33+
fn new(gen_server: G) -> Self {
3134
let (tx, mut rx) = mpsc::channel::<GenServerInMsg<G>>();
3235
let cancellation_token = CancellationToken::new();
3336
let handle = GenServerHandle {
@@ -51,7 +54,7 @@ impl<G: GenServer> GenServerHandle<G> {
5154
handle_clone
5255
}
5356

54-
pub(crate) fn new_blocking(gen_server: G) -> Self {
57+
fn new_blocking(gen_server: G) -> Self {
5558
let (tx, mut rx) = mpsc::channel::<GenServerInMsg<G>>();
5659
let cancellation_token = CancellationToken::new();
5760
let handle = GenServerHandle {
@@ -70,6 +73,25 @@ impl<G: GenServer> GenServerHandle<G> {
7073
handle_clone
7174
}
7275

76+
fn new_on_thread(gen_server: G) -> Self {
77+
let (tx, mut rx) = mpsc::channel::<GenServerInMsg<G>>();
78+
let cancellation_token = CancellationToken::new();
79+
let handle = GenServerHandle {
80+
tx,
81+
cancellation_token,
82+
};
83+
let handle_clone = handle.clone();
84+
// Ignore the JoinHandle for now. Maybe we'll use it in the future
85+
let _join_handle = threads::spawn(|| {
86+
threads::block_on(async move {
87+
if let Err(error) = gen_server.run(&handle, &mut rx).await {
88+
tracing::trace!(%error, "GenServer crashed")
89+
};
90+
})
91+
});
92+
handle_clone
93+
}
94+
7395
pub fn sender(&self) -> mpsc::Sender<GenServerInMsg<G>> {
7496
self.tx.clone()
7597
}
@@ -153,6 +175,15 @@ pub trait GenServer: Send + Sized {
153175
GenServerHandle::new_blocking(self)
154176
}
155177

178+
/// For some "singleton" GenServers that run througout the whole execution of the
179+
/// program, it makes sense to run in their own dedicated thread to avoid interference
180+
/// with the rest of the tasks' runtime.
181+
/// The use of tokio::task::spawm_blocking is not recommended for these scenarios
182+
/// as it is a limited thread pool better suited for blocking IO tasks that eventually end
183+
fn start_on_thread(self) -> GenServerHandle<Self> {
184+
GenServerHandle::new_on_thread(self)
185+
}
186+
156187
fn run(
157188
self,
158189
handle: &GenServerHandle<Self>,
@@ -300,6 +331,36 @@ pub trait GenServer: Send + Sized {
300331
}
301332
}
302333

334+
/// Spawns a task that awaits on a future and sends messages to a GenServer.
335+
///
336+
/// This function returns a handle to the spawned task.
337+
pub fn send_message_on<T, U>(
338+
handle: GenServerHandle<T>,
339+
future: U,
340+
message: T::CastMsg,
341+
) -> JoinHandle<()>
342+
where
343+
T: GenServer,
344+
U: Future + Send + 'static,
345+
<U as Future>::Output: Send,
346+
{
347+
let cancelation_token = handle.cancellation_token();
348+
let mut handle_clone = handle.clone();
349+
let join_handle = spawned_rt::tasks::spawn(async move {
350+
tracing::info!("Ctrl+C listener started");
351+
let is_cancelled = core::pin::pin!(cancelation_token.cancelled());
352+
let signal = core::pin::pin!(future);
353+
match futures::future::select(is_cancelled, signal).await {
354+
futures::future::Either::Left(_) => tracing::error!("GenServer stopped"),
355+
futures::future::Either::Right(_) => {
356+
tracing::info!("Sending shutdown to PeerTable Server");
357+
handle_clone.cast(message).await.unwrap();
358+
}
359+
}
360+
});
361+
join_handle
362+
}
363+
303364
#[cfg(debug_assertions)]
304365
mod warn_on_block {
305366
use super::*;

concurrency/src/tasks/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ mod stream_tests;
1212
mod timer_tests;
1313

1414
pub use gen_server::{
15-
CallResponse, CastResponse, GenServer, GenServerHandle, GenServerInMsg, InitResult,
16-
InitResult::NoSuccess, InitResult::Success,
15+
send_message_on, CallResponse, CastResponse, GenServer, GenServerHandle, GenServerInMsg,
16+
InitResult, InitResult::NoSuccess, InitResult::Success,
1717
};
1818
pub use process::{send, Process, ProcessInfo};
1919
pub use stream::spawn_listener;

examples/blocking_genserver/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ impl GenServer for WellBehavedTask {
9999
pub fn main() {
100100
rt::run(async move {
101101
// If we change BadlyBehavedTask to start instead, it can stop the entire program
102-
let mut badboy = BadlyBehavedTask::new().start_blocking();
102+
let mut badboy = BadlyBehavedTask::new().start_on_thread();
103103
let _ = badboy.cast(()).await;
104104
let mut goodboy = WellBehavedTask::new(0).start();
105105
let _ = goodboy.cast(()).await;

0 commit comments

Comments
 (0)