Skip to content
Closed
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ members = [
"examples/updater_threads",
"examples/blocking_genserver",
"examples/busy_genserver_warning",
"examples/supervisor",
]

[workspace.dependencies]
Expand Down
20 changes: 10 additions & 10 deletions concurrency/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,26 @@
#[derive(Debug, thiserror::Error)]
pub enum GenServerError {
pub enum ActorError {
#[error("Callback Error")]
Callback,
#[error("Initialization error")]
Initialization,
#[error("Server error")]
Server,
#[error("Unsupported Call Messages on this GenServer")]
CallMsgUnused,
#[error("Unsupported Cast Messages on this GenServer")]
CastMsgUnused,
#[error("Call to GenServer timed out")]
CallTimeout,
#[error("Unsupported Request on this Actor")]
RequestUnused,
#[error("Unsupported Message on this Actor")]
MessageUnused,
#[error("Request to Actor timed out")]
RequestTimeout,
}

impl<T> From<spawned_rt::threads::mpsc::SendError<T>> for GenServerError {
impl<T> From<spawned_rt::threads::mpsc::SendError<T>> for ActorError {
fn from(_value: spawned_rt::threads::mpsc::SendError<T>) -> Self {
Self::Server
}
}

impl<T> From<spawned_rt::tasks::mpsc::SendError<T>> for GenServerError {
impl<T> From<spawned_rt::tasks::mpsc::SendError<T>> for ActorError {
fn from(_value: spawned_rt::tasks::mpsc::SendError<T>) -> Self {
Self::Server
}
Expand All @@ -32,7 +32,7 @@ mod tests {

#[test]
fn test_error_into_std_error() {
let error: &dyn std::error::Error = &GenServerError::Callback;
let error: &dyn std::error::Error = &ActorError::Callback;
assert_eq!(error.to_string(), "Callback Error");
}
}
35 changes: 35 additions & 0 deletions concurrency/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,41 @@
//! spawned concurrency
//! Some basic traits and structs to implement concurrent code à-la-Erlang.
pub mod error;
pub mod link;
pub mod messages;
pub mod pid;
pub mod process_table;
pub mod registry;
pub mod supervisor;
pub mod tasks;
pub mod threads;

/// Backend selection for Actor execution.
///
/// Determines how an Actor is spawned and executed:
/// - `Async`: Runs on the async runtime (tokio tasks) - cooperative multitasking
/// - `Blocking`: Runs on a blocking thread pool (spawn_blocking) - for blocking I/O
/// - `Thread`: Runs on a dedicated OS thread - for long-running singletons
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum Backend {
/// Run on the async runtime (default). Best for non-blocking, I/O-bound tasks.
#[default]
Async,
/// Run on a blocking thread pool. Best for blocking I/O or CPU-bound tasks.
Blocking,
/// Run on a dedicated OS thread. Best for long-running singleton actors.
Thread,
}

// Re-export commonly used types at the crate root
pub use link::{MonitorRef, SystemMessage};
pub use pid::{ExitReason, HasPid, Pid};
pub use process_table::LinkError;
pub use registry::RegistryError;
pub use supervisor::{
BoxedChildHandle, ChildHandle, ChildInfo, ChildSpec, ChildType, DynamicSupervisor,
DynamicSupervisorCall, DynamicSupervisorCast, DynamicSupervisorError, DynamicSupervisorResponse,
DynamicSupervisorSpec, RestartIntensityTracker, RestartStrategy, RestartType, Shutdown,
Supervisor, SupervisorCall, SupervisorCast, SupervisorCounts, SupervisorError,
SupervisorResponse, SupervisorSpec,
};
177 changes: 177 additions & 0 deletions concurrency/src/link.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
//! Process linking and monitoring types.
//!
//! This module provides the types used for process linking and monitoring:
//! - `MonitorRef`: A reference to an active monitor
//! - `SystemMessage`: Messages delivered by the runtime (DOWN, EXIT, Timeout)

use crate::pid::{ExitReason, Pid};
use std::sync::atomic::{AtomicU64, Ordering};

/// Global counter for generating unique monitor references.
static NEXT_MONITOR_REF: AtomicU64 = AtomicU64::new(1);

/// A reference to an active monitor.
///
/// When you monitor another process, you receive a `MonitorRef` that
/// can be used to cancel the monitor later.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub struct MonitorRef(u64);

impl MonitorRef {
/// Create a new unique monitor reference.
pub(crate) fn new() -> Self {
Self(NEXT_MONITOR_REF.fetch_add(1, Ordering::SeqCst))
}

/// Get the raw ID.
pub fn id(&self) -> u64 {
self.0
}
}

impl std::fmt::Display for MonitorRef {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "#Ref<{}>", self.0)
}
}

/// System messages delivered to actors via handle_info.
///
/// These messages are automatically generated by the runtime when:
/// - A monitored process exits (Down)
/// - A linked process exits (Exit)
/// - A timer fires (Timeout)
#[derive(Clone, Debug, PartialEq)]
pub enum SystemMessage {
/// A monitored process has exited.
///
/// Received when a process you are monitoring terminates.
/// Unlike links, monitors don't cause the monitoring process to crash.
Down {
/// The Pid of the process that exited.
pid: Pid,
/// The monitor reference (same as returned by `monitor()`).
monitor_ref: MonitorRef,
/// Why the process exited.
reason: ExitReason,
},

/// A linked process has exited.
///
/// Only received if `trap_exit(true)` was called.
/// Otherwise, linked process exits cause the current process to crash.
Exit {
/// The Pid of the linked process that exited.
pid: Pid,
/// Why the process exited.
reason: ExitReason,
},

/// A timer has fired.
///
/// Received when a timer set with `send_after_info` or similar fires.
Timeout {
/// Optional reference to identify which timer fired.
reference: Option<u64>,
},
}

impl SystemMessage {
/// Check if this is a Down message.
pub fn is_down(&self) -> bool {
matches!(self, SystemMessage::Down { .. })
}

/// Check if this is an Exit message.
pub fn is_exit(&self) -> bool {
matches!(self, SystemMessage::Exit { .. })
}

/// Check if this is a Timeout message.
pub fn is_timeout(&self) -> bool {
matches!(self, SystemMessage::Timeout { .. })
}

/// Get the Pid from a Down or Exit message.
pub fn pid(&self) -> Option<Pid> {
match self {
SystemMessage::Down { pid, .. } => Some(*pid),
SystemMessage::Exit { pid, .. } => Some(*pid),
SystemMessage::Timeout { .. } => None,
}
}

/// Get the exit reason from a Down or Exit message.
pub fn reason(&self) -> Option<&ExitReason> {
match self {
SystemMessage::Down { reason, .. } => Some(reason),
SystemMessage::Exit { reason, .. } => Some(reason),
SystemMessage::Timeout { .. } => None,
}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn monitor_ref_uniqueness() {
let ref1 = MonitorRef::new();
let ref2 = MonitorRef::new();
let ref3 = MonitorRef::new();

assert_ne!(ref1, ref2);
assert_ne!(ref2, ref3);
assert_ne!(ref1, ref3);

// IDs should be monotonically increasing
assert!(ref1.id() < ref2.id());
assert!(ref2.id() < ref3.id());
}

#[test]
fn system_message_down() {
let pid = Pid::new();
let monitor_ref = MonitorRef::new();
let msg = SystemMessage::Down {
pid,
monitor_ref,
reason: ExitReason::Normal,
};

assert!(msg.is_down());
assert!(!msg.is_exit());
assert!(!msg.is_timeout());
assert_eq!(msg.pid(), Some(pid));
assert_eq!(msg.reason(), Some(&ExitReason::Normal));
}

#[test]
fn system_message_exit() {
let pid = Pid::new();
let msg = SystemMessage::Exit {
pid,
reason: ExitReason::Shutdown,
};

assert!(!msg.is_down());
assert!(msg.is_exit());
assert!(!msg.is_timeout());
assert_eq!(msg.pid(), Some(pid));
assert_eq!(msg.reason(), Some(&ExitReason::Shutdown));
}

#[test]
fn system_message_timeout() {
let msg = SystemMessage::Timeout {
reference: Some(42),
};

assert!(!msg.is_down());
assert!(!msg.is_exit());
assert!(msg.is_timeout());
assert_eq!(msg.pid(), None);
assert_eq!(msg.reason(), None);
}
}
Loading