Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 35 additions & 5 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,14 @@ cargo flamegraph --bin example_name

### Core Crate Structure
- **`msg/`** - Main user-facing crate (facade pattern, re-exports everything)
- **`msg-socket/`** - Socket patterns (Req/Rep, Pub/Sub, with statistics and authentication)
- **`msg-socket/`** - Socket patterns (Req/Rep, Pub/Sub, with statistics and connection hooks)
- **`msg-transport/`** - Transport layer (TCP, QUIC, IPC with pluggable trait design)
- **`msg-wire/`** - Wire protocol (authentication, compression: gzip/lz4/snappy/zstd)
- **`msg-common/`** - Shared utilities (channels, task management, time utils)
- **`msg-sim/`** - Network simulation on Linux, powered by rtnetlink.

### Key Design Patterns
- **Trait-based extensibility** - Transport, Authenticator traits for pluggability
- **Trait-based extensibility** - Transport, ConnectionHook traits for pluggability
- **Async-first** - Built on Tokio, stream-based message handling
- **Statistics collection** - Built-in latency/throughput/drop metrics in sockets
- **Connection resilience** - Automatic reconnection with configurable backoff
Expand Down Expand Up @@ -90,11 +90,41 @@ cargo flamegraph --bin example_name
- **Book** - Comprehensive guide in `book/` directory, deployed to GitHub Pages
- **Examples** - Rich example set covering all major usage patterns in `msg/examples/`

### Doc Code Blocks
- Prefer `no_run` over `rust,ignore` in documentation code blocks whenever possible
- Use `rust,ignore` only when the code cannot compile (e.g., pseudo-code or incomplete examples)

## Important Implementation Notes

### Authentication System
- Implement `Authenticator` trait for custom auth logic
- Built-in support in wire protocol, examples available
### Connection Hooks
- Implement `ConnectionHook` trait for custom authentication, handshakes, or protocol negotiation
- Built-in token-based auth via `hooks::token::ServerHook` and `hooks::token::ClientHook`

### Tracing Pattern for Async Tasks
When spawning async tasks that need tracing context, use the `WithSpan` pattern from `msg_common::span`:

```rust
use msg_common::span::{EnterSpan as _, SpanExt as _, WithSpan};

// 1. Create a span with context
let span = tracing::info_span!("connection_hook", ?addr);

// 2. Wrap the future with the span
let fut = async move { /* ... */ };
self.tasks.spawn(fut.with_span(span));

// 3. Poll with .enter() to re-enter the span, access result via .inner
if let Poll::Ready(Some(Ok(result))) = self.tasks.poll_join_next(cx).enter() {
match result.inner {
Ok(value) => { /* ... */ }
Err(e) => { /* ... */ }
}
}
```

- The `JoinSet` type becomes `JoinSet<WithSpan<T>>` instead of `JoinSet<T>`
- Logs inside the span don't need to repeat span fields (e.g., no `?addr` needed)
- Add `#[allow(clippy::type_complexity)]` to structs with complex `WithSpan` types

### Statistics Collection
- All socket types collect latency, throughput, and packet drop metrics
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

152 changes: 152 additions & 0 deletions msg-socket/src/hooks/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
//! Connection hooks for customizing connection establishment.
//!
//! Connection hooks are attached when establishing connections and allow custom
//! authentication, handshakes, or protocol negotiations. The [`ConnectionHook`] trait
//! is called during connection setup, before the connection is used for messaging.
//!
//! # Built-in Hooks
//!
//! The [`token`] module provides ready-to-use token-based authentication hooks:
//! - [`token::ServerHook`] - Server-side hook that validates client tokens
//! - [`token::ClientHook`] - Client-side hook that sends a token to the server
//!
//! # Custom Hooks
//!
//! Implement [`ConnectionHook`] for custom authentication or protocol negotiation:
//!
//! ```no_run
//! use msg_socket::hooks::{ConnectionHook, Error, HookResult};
//! use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
//!
//! struct MyAuth;
//!
//! #[derive(Debug, thiserror::Error)]
//! enum MyAuthError {
//! #[error("invalid token")]
//! InvalidToken,
//! }
//!
//! impl<Io> ConnectionHook<Io> for MyAuth
//! where
//! Io: AsyncRead + AsyncWrite + Send + Unpin + 'static,
//! {
//! type Error = MyAuthError;
//!
//! async fn on_connection(&self, mut io: Io) -> HookResult<Io, Self::Error> {
//! let mut buf = [0u8; 32];
//! io.read_exact(&mut buf).await?;
//! if &buf == b"expected_token_value_32_bytes!!!" {
//! io.write_all(b"OK").await?;
//! Ok(io)
//! } else {
//! Err(Error::hook(MyAuthError::InvalidToken))
//! }
//! }
//! }
//! ```
//!
//! # Future Extensions
//!
//! TODO: Additional hooks may be added for different parts of the connection lifecycle
//! (e.g., disconnection, reconnection, periodic health checks).

use std::{error::Error as StdError, future::Future, io, pin::Pin, sync::Arc};

use tokio::io::{AsyncRead, AsyncWrite};

pub mod token;

/// Error type for connection hooks.
///
/// Distinguishes between I/O errors and hook-specific errors.
#[derive(Debug, thiserror::Error)]
pub enum Error<E> {
/// An I/O error occurred.
#[error("IO error: {0}")]
Io(#[from] io::Error),
/// A hook-specific error.
#[error("Hook error: {0}")]
Hook(#[source] E),
}

impl<E> Error<E> {
/// Create a hook error from a hook-specific error.
pub fn hook(err: E) -> Self {
Error::Hook(err)
}
}

/// Result type for connection hooks.
///
/// This is intentionally named `HookResult` (not `Result`) to make it clear this is not
/// `std::result::Result`. A `HookResult` can be:
/// - `Ok(io)` - success, returns the IO stream
/// - `Err(Error::Io(..))` - an I/O error occurred
/// - `Err(Error::Hook(..))` - a hook-specific error occurred
pub type HookResult<T, E> = std::result::Result<T, Error<E>>;

/// Type-erased hook result used internally by drivers.
pub(crate) type ErasedHookResult<T> = HookResult<T, Box<dyn StdError + Send + Sync>>;

/// Connection hook executed during connection establishment.
///
/// For server sockets: called when a connection is accepted.
/// For client sockets: called after connecting.
///
/// The connection hook receives the raw IO stream and has full control over the handshake protocol.
pub trait ConnectionHook<Io>: Send + Sync + 'static
where
Io: AsyncRead + AsyncWrite + Send + Unpin + 'static,
{
/// The hook-specific error type.
type Error: StdError + Send + Sync + 'static;

/// Called when a connection is established.
///
/// # Arguments
/// * `io` - The raw IO stream for this connection
///
/// # Returns
/// - `Ok(io)` - The IO stream on success (potentially wrapped/transformed)
/// - `Err(Error::Io(..))` - An I/O error occurred
/// - `Err(Error::Hook(Self::Error))` - A hook-specific error to reject the connection
fn on_connection(&self, io: Io) -> impl Future<Output = HookResult<Io, Self::Error>> + Send;
}

// ============================================================================
// Type-erased connection hook for internal use
// ============================================================================

/// Type-erased connection hook for internal use.
///
/// This trait allows storing connection hooks with different concrete types behind a single
/// `Arc<dyn ConnectionHookErased<Io>>`. The hook error type is erased to `Box<dyn Error>`.
pub(crate) trait ConnectionHookErased<Io>: Send + Sync + 'static
where
Io: AsyncRead + AsyncWrite + Send + Unpin + 'static,
{
fn on_connection(
self: Arc<Self>,
io: Io,
) -> Pin<Box<dyn Future<Output = ErasedHookResult<Io>> + Send + 'static>>;
}

impl<T, Io> ConnectionHookErased<Io> for T
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice pattern! Simplifies implementation for any consumers of this API

where
T: ConnectionHook<Io>,
Io: AsyncRead + AsyncWrite + Send + Unpin + 'static,
{
fn on_connection(
self: Arc<Self>,
io: Io,
) -> Pin<Box<dyn Future<Output = ErasedHookResult<Io>> + Send + 'static>> {
Box::pin(async move {
ConnectionHook::on_connection(&*self, io).await.map_err(|e| match e {
Error::Io(io_err) => Error::Io(io_err),
Error::Hook(hook_err) => {
Error::Hook(Box::new(hook_err) as Box<dyn StdError + Send + Sync>)
}
})
})
}
}
Loading
Loading