Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 39 additions & 20 deletions concurrency/src/tasks/gen_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,25 @@ use std::{fmt::Debug, future::Future, panic::AssertUnwindSafe, time::Duration};

const DEFAULT_CALL_TIMEOUT: Duration = Duration::from_secs(5);

/// Execution backend for GenServer.
///
/// Determines how the GenServer's async loop is executed.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum Backend {
/// Run on tokio async runtime (default).
/// Best for non-blocking, async workloads.
#[default]
Async,
/// Run on tokio's blocking thread pool.
/// Use for blocking operations that eventually complete.
/// The pool is shared and limited in size.
Blocking,
/// Run on a dedicated OS thread.
/// Use for long-running blocking operations or singleton services
/// that should not interfere with the async runtime.
Thread,
}

#[derive(Debug)]
pub struct GenServerHandle<G: GenServer + 'static> {
pub tx: mpsc::Sender<GenServerInMsg<G>>,
Expand Down Expand Up @@ -163,26 +182,24 @@ pub trait GenServer: Send + Sized {
type OutMsg: Send + Sized;
type Error: Debug + Send;

/// Start the GenServer with the default backend (Async).
fn start(self) -> GenServerHandle<Self> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since Backend::Async is default, we can keep start(self) that uses that Backend internally, to avoid the breaking change.

fn start(self) -> GenServerHandle<Self> {
    self.start(Backend::Async)
}

GenServerHandle::new(self)
}

/// Tokio tasks depend on a coolaborative multitasking model. "work stealing" can't
/// happen if the task is blocking the thread. As such, for sync compute task
/// or other blocking tasks need to be in their own separate thread, and the OS
/// will manage them through hardware interrupts.
/// Start blocking provides such thread.
fn start_blocking(self) -> GenServerHandle<Self> {
GenServerHandle::new_blocking(self)
}

/// For some "singleton" GenServers that run througout the whole execution of the
/// program, it makes sense to run in their own dedicated thread to avoid interference
/// with the rest of the tasks' runtime.
/// The use of tokio::task::spawm_blocking is not recommended for these scenarios
/// as it is a limited thread pool better suited for blocking IO tasks that eventually end
fn start_on_thread(self) -> GenServerHandle<Self> {
GenServerHandle::new_on_thread(self)
self.start_with_backend(Backend::default())
}

/// Start the GenServer with the specified backend.
///
/// # Arguments
/// * `backend` - The execution backend to use:
/// - `Backend::Async` - Run on tokio async runtime (default, best for non-blocking workloads)
/// - `Backend::Blocking` - Run on tokio's blocking thread pool (for blocking operations)
/// - `Backend::Thread` - Run on a dedicated OS thread (for long-running blocking services)
fn start_with_backend(self, backend: Backend) -> GenServerHandle<Self> {
match backend {
Backend::Async => GenServerHandle::new(self),
Backend::Blocking => GenServerHandle::new_blocking(self),
Backend::Thread => GenServerHandle::new_on_thread(self),
}
}

fn run(
Expand Down Expand Up @@ -484,6 +501,8 @@ mod tests {
}
}

const BLOCKING: Backend = Backend::Blocking;

#[test]
pub fn badly_behaved_thread_non_blocking() {
let runtime = rt::Runtime::new().unwrap();
Expand All @@ -508,7 +527,7 @@ mod tests {
pub fn badly_behaved_thread() {
let runtime = rt::Runtime::new().unwrap();
runtime.block_on(async move {
let mut badboy = BadlyBehavedTask.start_blocking();
let mut badboy = BadlyBehavedTask.start_with_backend(BLOCKING);
let _ = badboy.cast(Unused).await;
let mut goodboy = WellBehavedTask { count: 0 }.start();
let _ = goodboy.cast(Unused).await;
Expand Down
4 changes: 2 additions & 2 deletions concurrency/src/tasks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ mod stream_tests;
mod timer_tests;

pub use gen_server::{
send_message_on, CallResponse, CastResponse, GenServer, GenServerHandle, GenServerInMsg,
InitResult, InitResult::NoSuccess, InitResult::Success,
send_message_on, Backend, CallResponse, CastResponse, GenServer, GenServerHandle,
GenServerInMsg, InitResult, InitResult::NoSuccess, InitResult::Success,
};
pub use process::{send, Process, ProcessInfo};
pub use stream::spawn_listener;
Expand Down
10 changes: 5 additions & 5 deletions examples/blocking_genserver/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::time::Duration;
use std::{process::exit, thread};

use spawned_concurrency::tasks::{
CallResponse, CastResponse, GenServer, GenServerHandle, send_after,
Backend, CallResponse, CastResponse, GenServer, GenServerHandle, send_after,
};

// We test a scenario with a badly behaved task
Expand Down Expand Up @@ -93,13 +93,13 @@ impl GenServer for WellBehavedTask {
}
}

/// Example of start_blocking to fix issues #8 https://github.com/lambdaclass/spawned/issues/8
/// Example of Backend::Thread to fix issues #8 https://github.com/lambdaclass/spawned/issues/8
/// Tasks that block can block the entire tokio runtime (and other cooperative multitasking models)
/// To fix this we implement start_blocking, which under the hood launches a new thread to deal with the issue
/// To fix this we use Backend::Thread, which under the hood launches a new thread to deal with the issue
pub fn main() {
rt::run(async move {
// If we change BadlyBehavedTask to start instead, it can stop the entire program
let mut badboy = BadlyBehavedTask::new().start_on_thread();
// If we change BadlyBehavedTask to Backend::Async instead, it can stop the entire program
let mut badboy = BadlyBehavedTask::new().start_with_backend(Backend::Thread);
let _ = badboy.cast(()).await;
let mut goodboy = WellBehavedTask::new(0).start();
let _ = goodboy.cast(()).await;
Expand Down
2 changes: 1 addition & 1 deletion examples/busy_genserver_warning/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl GenServer for BusyWorker {
/// whenever we detect tasks that take too long to run.
pub fn main() {
rt::run(async move {
// If we change BusyWorker to start_blocking instead, it won't print the warning
// If we change BusyWorker to Backend::Blocking instead, it won't print the warning
let mut badboy = BusyWorker::new().start();
let _ = badboy.cast(()).await;

Expand Down