Skip to content

feat: Add Worker transport for isolated task execution #92

@avrabe

Description

@avrabe

Summary

Add a Worker transport abstraction that enables running MCP handlers in isolated tokio tasks with channel-based communication.

Motivation

Useful for:

  • Parallel tool execution with isolation
  • Custom runtime architectures
  • Background processing with cancellation support
  • Sandboxed execution contexts

Proposed Implementation

/// Worker trait - implement to define custom worker behavior
#[async_trait]
pub trait Worker: Send + 'static {
    type Error: std::error::Error + Send;
    
    async fn run(self, ctx: WorkerContext) -> Result<(), Self::Error>;
}

/// Context provided to workers
pub struct WorkerContext {
    rx: mpsc::Receiver<JsonRpcMessage>,
    tx: mpsc::Sender<JsonRpcMessage>,
    cancellation: CancellationToken,
}

impl WorkerContext {
    pub async fn recv(&mut self) -> Option<JsonRpcMessage>;
    pub async fn send(&self, msg: JsonRpcMessage) -> Result<(), SendError>;
    pub fn cancellation_token(&self) -> CancellationToken;
}

/// Transport wrapper that spawns worker on tokio task
pub struct WorkerTransport {
    tx: mpsc::Sender<JsonRpcMessage>,
    rx: mpsc::Receiver<JsonRpcMessage>,
    handle: JoinHandle<Result<(), WorkerError>>,
    cancellation: CancellationToken,
}

impl WorkerTransport {
    pub fn spawn<W: Worker>(worker: W, buffer_size: usize) -> Self;
    pub async fn cancel(&self);
}

impl Transport for WorkerTransport {
    // Channel-based send/receive
}

Use Case Example

struct ToolWorker {
    tool_registry: Arc<ToolRegistry>,
}

#[async_trait]
impl Worker for ToolWorker {
    type Error = ToolError;
    
    async fn run(self, mut ctx: WorkerContext) -> Result<(), Self::Error> {
        while let Some(msg) = ctx.recv().await {
            let result = self.tool_registry.handle(msg).await;
            ctx.send(result).await?;
        }
        Ok(())
    }
}

// Spawn isolated tool executor
let transport = WorkerTransport::spawn(ToolWorker { tool_registry }, 32);

Code Reuse

Existing Reuse
Transport trait Interface
CancellationToken From tokio-util
JSON-RPC types From mcp-protocol

Priority

Medium - enables advanced architectures but not required for basic functionality.

References

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions