Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
314 changes: 307 additions & 7 deletions concurrency/src/tasks/gen_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,88 @@ const DEFAULT_CALL_TIMEOUT: Duration = Duration::from_secs(5);

/// Execution backend for GenServer.
///
/// Determines how the GenServer's async loop is executed. Choose based on
/// the nature of your workload:
///
/// # Backend Comparison
///
/// | Backend | Execution Model | Best For | Limitations |
/// |---------|-----------------|----------|-------------|
/// | `Async` | Tokio task | Non-blocking I/O, async operations | Blocks runtime if sync code runs too long |
/// | `Blocking` | Tokio blocking pool | Short blocking operations (file I/O, DNS) | Shared pool with limited threads |
/// | `Thread` | Dedicated OS thread with own runtime | Long-running services, isolation from main runtime | Higher memory overhead per GenServer |
///
/// **Note**: All backends use async internally. For fully synchronous code without any async
/// runtime, use [`threads::GenServer`](crate::threads::GenServer) instead.
///
/// # Examples
///
/// ```ignore
/// // For typical async workloads (HTTP handlers, database queries)
/// let handle = MyServer::new().start();
///
/// // For occasional blocking operations (file reads, external commands)
/// let handle = MyServer::new().start_with_backend(Backend::Blocking);
///
/// // For CPU-intensive or permanently blocking services
/// let handle = MyServer::new().start_with_backend(Backend::Thread);
/// ```
///
/// # When to Use Each Backend
///
/// ## `Backend::Async` (Default)
/// - **Advantages**: Lightweight, efficient, good for high concurrency
/// - **Use when**: Your GenServer does mostly async I/O (network, database)
/// - **Avoid when**: Your code blocks (e.g., `std::thread::sleep`, heavy computation)
///
/// ## `Backend::Blocking`
/// - **Advantages**: Prevents blocking the async runtime, uses tokio's managed pool
/// - **Use when**: You have occasional blocking operations that complete quickly
/// - **Avoid when**: You need guaranteed thread availability or long-running blocks
///
/// ## `Backend::Thread`
/// - **Advantages**: Isolated from main runtime, dedicated thread won't affect other tasks
/// - **Use when**: Long-running singleton services that shouldn't share the main runtime
/// - **Avoid when**: You need many GenServers (each gets its own OS thread + runtime)
/// - **Note**: Still uses async internally (own runtime). For sync code, use `threads::GenServer`
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum Backend {
    /// Run on tokio async runtime (default).
    ///
    /// Best for non-blocking, async workloads. The GenServer runs as a
    /// lightweight tokio task, enabling high concurrency with minimal overhead.
    ///
    /// **Warning**: If your `handle_call` or `handle_cast` blocks synchronously
    /// (e.g., `std::thread::sleep`, CPU-heavy loops), it will block the entire
    /// tokio runtime thread, affecting other tasks.
    #[default]
    Async,

    /// Run on tokio's blocking thread pool.
    ///
    /// Use for GenServers that perform blocking operations like:
    /// - Synchronous file I/O
    /// - DNS lookups
    /// - External process calls
    /// - Short CPU-bound computations
    ///
    /// The pool is shared across all `spawn_blocking` calls and has a default
    /// limit of 512 threads. If the pool is exhausted, new blocking tasks wait.
    Blocking,

    /// Run on a dedicated OS thread with its own async runtime.
    ///
    /// Use for GenServers that:
    /// - Need isolation from the main tokio runtime
    /// - Are long-running singleton services
    /// - Should not compete with other tasks for runtime resources
    ///
    /// Each GenServer gets its own thread with a separate tokio runtime,
    /// providing isolation from other async tasks. Higher memory overhead
    /// (~2MB stack per thread plus runtime overhead).
    ///
    /// **Note**: This still uses async internally. For fully synchronous code
    /// without any async runtime, use [`threads::GenServer`](crate::threads::GenServer).
    Thread,
}

Expand Down Expand Up @@ -643,4 +711,236 @@ mod tests {
assert!(rx.is_closed())
});
}

// ==================== Backend enum tests ====================

#[test]
pub fn backend_default_is_async() {
    // The derived `Default` must select the `Async` variant.
    let default_backend = Backend::default();
    assert_eq!(default_backend, Backend::Async);
}

#[test]
#[allow(clippy::clone_on_copy)]
pub fn backend_enum_is_copy_and_clone() {
    let original = Backend::Async;
    // Copy: `original` stays usable after being moved into `via_copy`.
    let via_copy = original;
    // Clone: exercised explicitly even though Copy makes it redundant.
    let via_clone = original.clone();
    assert_eq!(via_copy, original);
    assert_eq!(via_clone, original);
}

#[test]
pub fn backend_enum_debug_format() {
    // Table-driven check: each variant's Debug output is its bare name.
    let cases = [
        (Backend::Async, "Async"),
        (Backend::Blocking, "Blocking"),
        (Backend::Thread, "Thread"),
    ];
    for (backend, expected) in cases {
        assert_eq!(format!("{:?}", backend), expected);
    }
}

#[test]
pub fn backend_enum_equality() {
    // Pairwise check: a variant equals only itself.
    let variants = [Backend::Async, Backend::Blocking, Backend::Thread];
    for (i, left) in variants.iter().enumerate() {
        for (j, right) in variants.iter().enumerate() {
            if i == j {
                assert_eq!(left, right);
            } else {
                assert_ne!(left, right);
            }
        }
    }
}

// ==================== Backend functionality tests ====================

/// Minimal stateful GenServer used to exercise every execution backend.
struct Counter {
    // Running tally; bumped by Increment messages (call or cast).
    count: u64,
}

/// Call (request/response) messages understood by `Counter`.
#[derive(Clone)]
enum CounterCall {
    // Read the current count without modifying it.
    Get,
    // Bump the count and reply with the new value.
    Increment,
    // Shut the server down, replying with the final count.
    Stop,
}

/// Cast (fire-and-forget) messages understood by `Counter`.
#[derive(Clone)]
enum CounterCast {
    // Bump the count without producing a reply.
    Increment,
}

impl GenServer for Counter {
    type CallMsg = CounterCall;
    type CastMsg = CounterCast;
    type OutMsg = u64;
    type Error = ();

    /// Calls read or mutate the count; `Stop` terminates with the final value.
    async fn handle_call(
        &mut self,
        msg: Self::CallMsg,
        _handle: &GenServerHandle<Self>,
    ) -> CallResponse<Self> {
        match msg {
            CounterCall::Get => CallResponse::Reply(self.count),
            CounterCall::Increment => {
                let next = self.count + 1;
                self.count = next;
                CallResponse::Reply(next)
            }
            CounterCall::Stop => CallResponse::Stop(self.count),
        }
    }

    /// Casts bump the count silently; no reply is produced.
    async fn handle_cast(
        &mut self,
        msg: Self::CastMsg,
        _handle: &GenServerHandle<Self>,
    ) -> CastResponse {
        // Single-variant enum: the pattern is irrefutable.
        let CounterCast::Increment = msg;
        self.count += 1;
        CastResponse::NoReply
    }
}

#[test]
pub fn backend_async_handles_call_and_cast() {
    let runtime = rt::Runtime::new().unwrap();
    runtime.block_on(async move {
        // Default backend (Async) via plain `start()`.
        let mut handle = Counter { count: 0 }.start();

        // Calls: read, then increment (reply carries the new value).
        assert_eq!(handle.call(CounterCall::Get).await.unwrap(), 0);
        assert_eq!(handle.call(CounterCall::Increment).await.unwrap(), 1);

        // Cast is fire-and-forget; give the server a moment to process it.
        handle.cast(CounterCast::Increment).await.unwrap();
        rt::sleep(Duration::from_millis(10)).await;
        assert_eq!(handle.call(CounterCall::Get).await.unwrap(), 2);

        // Stop replies with the final count.
        assert_eq!(handle.call(CounterCall::Stop).await.unwrap(), 2);
    });
}

#[test]
pub fn backend_blocking_handles_call_and_cast() {
    let runtime = rt::Runtime::new().unwrap();
    runtime.block_on(async move {
        // Same contract as the Async backend, but on tokio's blocking pool.
        let mut handle = Counter { count: 0 }.start_with_backend(Backend::Blocking);

        // Calls: read, then increment (reply carries the new value).
        assert_eq!(handle.call(CounterCall::Get).await.unwrap(), 0);
        assert_eq!(handle.call(CounterCall::Increment).await.unwrap(), 1);

        // Cast is fire-and-forget; give the server a moment to process it.
        handle.cast(CounterCast::Increment).await.unwrap();
        rt::sleep(Duration::from_millis(50)).await;
        assert_eq!(handle.call(CounterCall::Get).await.unwrap(), 2);

        // Stop replies with the final count.
        assert_eq!(handle.call(CounterCall::Stop).await.unwrap(), 2);
    });
}

#[test]
pub fn backend_thread_handles_call_and_cast() {
    let runtime = rt::Runtime::new().unwrap();
    runtime.block_on(async move {
        // Same contract as the Async backend, but on a dedicated OS thread.
        let mut handle = Counter { count: 0 }.start_with_backend(Backend::Thread);

        // Calls: read, then increment (reply carries the new value).
        assert_eq!(handle.call(CounterCall::Get).await.unwrap(), 0);
        assert_eq!(handle.call(CounterCall::Increment).await.unwrap(), 1);

        // Cast is fire-and-forget; give the server a moment to process it.
        handle.cast(CounterCast::Increment).await.unwrap();
        rt::sleep(Duration::from_millis(50)).await;
        assert_eq!(handle.call(CounterCall::Get).await.unwrap(), 2);

        // Stop replies with the final count.
        assert_eq!(handle.call(CounterCall::Stop).await.unwrap(), 2);
    });
}

#[test]
pub fn backend_thread_isolates_blocking_work() {
    // Mirrors `badly_behaved_thread`, but quarantines the offender on its
    // own OS thread via Backend::Thread instead of the shared runtime.
    let runtime = rt::Runtime::new().unwrap();
    runtime.block_on(async move {
        let mut offender = BadlyBehavedTask.start_with_backend(Backend::Thread);
        let _ = offender.cast(Unused).await;

        let mut victim = WellBehavedTask { count: 0 }.start();
        let _ = victim.cast(Unused).await;

        rt::sleep(Duration::from_secs(1)).await;

        // The well-behaved task ticked normally because the blocking work
        // never touched the main runtime's worker threads.
        match victim.call(InMessage::GetCount).await.unwrap() {
            OutMsg::Count(ticks) => assert_eq!(ticks, 10),
        }
        victim.call(InMessage::Stop).await.unwrap();
    });
}

#[test]
pub fn multiple_backends_concurrent() {
    let runtime = rt::Runtime::new().unwrap();
    runtime.block_on(async move {
        // One counter per backend, each seeded with a distinct base value
        // so any cross-talk between servers would be detectable.
        let mut on_async = Counter { count: 0 }.start();
        let mut on_blocking = Counter { count: 100 }.start_with_backend(Backend::Blocking);
        let mut on_thread = Counter { count: 200 }.start_with_backend(Backend::Thread);

        // Bump each counter exactly once.
        on_async.call(CounterCall::Increment).await.unwrap();
        on_blocking.call(CounterCall::Increment).await.unwrap();
        on_thread.call(CounterCall::Increment).await.unwrap();

        // Each server keeps fully independent state.
        assert_eq!(on_async.call(CounterCall::Get).await.unwrap(), 1);
        assert_eq!(on_blocking.call(CounterCall::Get).await.unwrap(), 101);
        assert_eq!(on_thread.call(CounterCall::Get).await.unwrap(), 201);

        // Shut all three down.
        on_async.call(CounterCall::Stop).await.unwrap();
        on_blocking.call(CounterCall::Stop).await.unwrap();
        on_thread.call(CounterCall::Stop).await.unwrap();
    });
}

#[test]
pub fn backend_default_works_in_start() {
    let runtime = rt::Runtime::new().unwrap();
    runtime.block_on(async move {
        // `Backend::default()` must behave the same as `Backend::Async`.
        // Pass the actual `Default` value here: the previous version passed
        // `Backend::Async` explicitly, so the `Default` impl was never
        // exercised by this test despite its name.
        let mut counter = Counter { count: 42 }.start_with_backend(Backend::default());

        let result = counter.call(CounterCall::Get).await.unwrap();
        assert_eq!(result, 42);

        counter.call(CounterCall::Stop).await.unwrap();
    });
}
}