Skip to content

feat: keep internal error in worker's quit reason #372

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Aug 18, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ impl StreamableHttpClient for reqwest::Client {
}
let response = request_builder.send().await?;
if response.status() == reqwest::StatusCode::METHOD_NOT_ALLOWED {
return Err(StreamableHttpError::SeverDoesNotSupportSse);
return Err(StreamableHttpError::ServerDoesNotSupportSse);
}
let response = response.error_for_status()?;
match response.headers().get(reqwest::header::CONTENT_TYPE) {
Expand Down
26 changes: 17 additions & 9 deletions crates/rmcp/src/transport/streamable_http_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,17 @@ pub enum StreamableHttpError<E: std::error::Error + Send + Sync + 'static> {
#[error("Unexpected content type: {0:?}")]
UnexpectedContentType(Option<String>),
#[error("Server does not support SSE")]
SeverDoesNotSupportSse,
ServerDoesNotSupportSse,
#[error("Server does not support delete session")]
SeverDoesNotSupportDeleteSession,
ServerDoesNotSupportDeleteSession,
#[error("Tokio join error: {0}")]
TokioJoinError(#[from] tokio::task::JoinError),
#[error("Deserialize error: {0}")]
Deserialize(#[from] serde_json::Error),
#[error("Transport channel closed")]
TransportChannelClosed,
#[error("Missing session id in response")]
MissingSessionIdInResponse,
#[cfg(feature = "auth")]
#[cfg_attr(docsrs, doc(cfg(feature = "auth")))]
#[error("Auth error: {0}")]
Expand All @@ -54,6 +56,12 @@ impl From<reqwest::Error> for StreamableHttpError<reqwest::Error> {
}
}

#[derive(Debug, Clone, Error)]
pub enum StreamableHttpProtocolError {
#[error("Missing session id in response")]
MissingSessionIdInResponse,
}

pub enum StreamableHttpPostResponse {
Accepted,
Json(ServerJsonRpcMessage, Option<String>),
Expand Down Expand Up @@ -261,7 +269,7 @@ impl<C: StreamableHttpClient> Worker for StreamableHttpClientWorker<C> {
async fn run(
self,
mut context: super::worker::WorkerContext<Self>,
) -> Result<(), WorkerQuitReason> {
) -> Result<(), WorkerQuitReason<Self::Error>> {
let channel_buffer_capacity = self.config.channel_buffer_capacity;
let (sse_worker_tx, mut sse_worker_rx) =
tokio::sync::mpsc::channel::<ServerJsonRpcMessage>(channel_buffer_capacity);
Expand All @@ -278,7 +286,7 @@ impl<C: StreamableHttpClient> Worker for StreamableHttpClientWorker<C> {
.post_message(config.uri.clone(), initialize_request, None, None)
.await
.map_err(WorkerQuitReason::fatal_context("send initialize request"))?
.expect_initialized::<Self::Error>()
.expect_initialized::<C::Error>()
.await
.map_err(WorkerQuitReason::fatal_context(
"process initialize response",
Expand All @@ -288,7 +296,7 @@ impl<C: StreamableHttpClient> Worker for StreamableHttpClientWorker<C> {
} else {
if !self.config.allow_stateless {
return Err(WorkerQuitReason::fatal(
"missing session id in initialize response",
StreamableHttpError::<C::Error>::MissingSessionIdInResponse,
"process initialize response",
));
}
Expand All @@ -308,7 +316,7 @@ impl<C: StreamableHttpClient> Worker for StreamableHttpClientWorker<C> {
Ok(_) => {
tracing::info!(session_id = session_id.as_ref(), "delete session success")
}
Err(StreamableHttpError::SeverDoesNotSupportDeleteSession) => {
Err(StreamableHttpError::ServerDoesNotSupportDeleteSession) => {
tracing::info!(
session_id = session_id.as_ref(),
"server doesn't support delete session"
Expand Down Expand Up @@ -338,7 +346,7 @@ impl<C: StreamableHttpClient> Worker for StreamableHttpClientWorker<C> {
.map_err(WorkerQuitReason::fatal_context(
"send initialized notification",
))?
.expect_accepted::<Self::Error>()
.expect_accepted::<C::Error>()
.map_err(WorkerQuitReason::fatal_context(
"process initialized notification response",
))?;
Expand Down Expand Up @@ -373,14 +381,14 @@ impl<C: StreamableHttpClient> Worker for StreamableHttpClientWorker<C> {
));
tracing::debug!("got common stream");
}
Err(StreamableHttpError::SeverDoesNotSupportSse) => {
Err(StreamableHttpError::ServerDoesNotSupportSse) => {
tracing::debug!("server doesn't support sse, skip common stream");
}
Err(e) => {
// fail to get common stream
tracing::error!("fail to get common stream: {e}");
return Err(WorkerQuitReason::fatal(
"fail to get general purpose event stream",
e,
"get general purpose event stream",
));
}
Expand Down
58 changes: 41 additions & 17 deletions crates/rmcp/src/transport/streamable_http_server/session/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,12 +296,8 @@ pub enum SessionError {
SessionServiceTerminated,
#[error("Invalid event id")]
InvalidEventId,
#[error("Transport closed")]
TransportClosed,
#[error("IO error: {0}")]
Io(#[from] std::io::Error),
#[error("Tokio join error {0}")]
TokioJoinError(#[from] tokio::task::JoinError),
}

impl From<SessionError> for std::io::Error {
Expand All @@ -317,7 +313,7 @@ enum OutboundChannel {
RequestWise { id: HttpRequestId, close: bool },
Common,
}

#[derive(Debug)]
pub struct StreamableHttpMessageReceiver {
pub http_request_id: Option<HttpRequestId>,
pub inner: Receiver<ServerSseMessage>,
Expand Down Expand Up @@ -534,8 +530,8 @@ impl LocalSessionWorker {
}
}
}

enum SessionEvent {
#[derive(Debug)]
pub enum SessionEvent {
ClientMessage {
message: ClientJsonRpcMessage,
http_request_id: Option<HttpRequestId>,
Expand Down Expand Up @@ -695,14 +691,31 @@ impl LocalSessionHandle {

pub type SessionTransport = WorkerTransport<LocalSessionWorker>;

#[derive(Debug, Error)]
pub enum LocalSessionWorkerError {
#[error("transport terminated")]
TransportTerminated,
#[error("unexpected message: {0:?}")]
UnexpectedEvent(SessionEvent),
#[error("fail to send initialize request {0}")]
FailToSendInitializeRequest(SessionError),
#[error("fail to handle message: {0}")]
FailToHandleMessage(SessionError),
#[error("keep alive timeout after {}ms", _0.as_millis())]
KeepAliveTimeout(Duration),
#[error("Transport closed")]
TransportClosed,
#[error("Tokio join error {0}")]
TokioJoinError(#[from] tokio::task::JoinError),
}
impl Worker for LocalSessionWorker {
type Error = SessionError;
type Error = LocalSessionWorkerError;
type Role = RoleServer;
fn err_closed() -> Self::Error {
SessionError::TransportClosed
LocalSessionWorkerError::TransportClosed
}
fn err_join(e: tokio::task::JoinError) -> Self::Error {
SessionError::TokioJoinError(e)
LocalSessionWorkerError::TokioJoinError(e)
}
fn config(&self) -> crate::transport::worker::WorkerConfig {
crate::transport::worker::WorkerConfig {
Expand All @@ -711,18 +724,24 @@ impl Worker for LocalSessionWorker {
}
}
#[instrument(name = "streamable_http_session", skip_all, fields(id = self.id.as_ref()))]
async fn run(mut self, mut context: WorkerContext<Self>) -> Result<(), WorkerQuitReason> {
async fn run(
mut self,
mut context: WorkerContext<Self>,
) -> Result<(), WorkerQuitReason<Self::Error>> {
enum InnerEvent {
FromHttpService(SessionEvent),
FromHandler(WorkerSendRequest<LocalSessionWorker>),
}
// waiting for initialize request
let evt = self.event_rx.recv().await.ok_or_else(|| {
WorkerQuitReason::fatal("transport terminated", "get initialize request")
WorkerQuitReason::fatal(
LocalSessionWorkerError::TransportTerminated,
"get initialize request",
)
})?;
let SessionEvent::InitializeRequest { request, responder } = evt else {
return Err(WorkerQuitReason::fatal(
"unexpected message",
LocalSessionWorkerError::UnexpectedEvent(evt),
"get initialize request",
));
};
Expand All @@ -732,7 +751,9 @@ impl Worker for LocalSessionWorker {
.send(Ok(send_initialize_response.message))
.map_err(|_| {
WorkerQuitReason::fatal(
"failed to send initialize response to http service",
LocalSessionWorkerError::FailToSendInitializeRequest(
SessionError::SessionServiceTerminated,
),
"send initialize response",
)
})?;
Expand All @@ -749,7 +770,7 @@ impl Worker for LocalSessionWorker {
if let Some(event) = event {
InnerEvent::FromHttpService(event)
} else {
return Err(WorkerQuitReason::fatal("session dropped", "waiting next session event"))
return Err(WorkerQuitReason::fatal(LocalSessionWorkerError::TransportTerminated, "waiting next session event"))
}
},
from_handler = context.recv_from_handler() => {
Expand All @@ -759,7 +780,7 @@ impl Worker for LocalSessionWorker {
return Err(WorkerQuitReason::Cancelled)
}
_ = keep_alive_timeout => {
return Err(WorkerQuitReason::fatal("keep live timeout", "poll next session event"))
return Err(WorkerQuitReason::fatal(LocalSessionWorkerError::KeepAliveTimeout(keep_alive), "poll next session event"))
}
};
match event {
Expand All @@ -779,7 +800,10 @@ impl Worker for LocalSessionWorker {
// no need to unregister resource
}
};
let handle_result = self.handle_server_message(message).await;
let handle_result = self
.handle_server_message(message)
.await
.map_err(LocalSessionWorkerError::FailToHandleMessage);
let _ = responder.send(handle_result).inspect_err(|error| {
tracing::warn!(?error, "failed to send message to http service handler");
});
Expand Down
26 changes: 13 additions & 13 deletions crates/rmcp/src/transport/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ use super::{IntoTransport, Transport};
use crate::service::{RxJsonRpcMessage, ServiceRole, TxJsonRpcMessage};

#[derive(Debug, thiserror::Error)]
pub enum WorkerQuitReason {
pub enum WorkerQuitReason<E> {
#[error("Join error {0}")]
Join(#[from] tokio::task::JoinError),
#[error("Transport fatal {error}, when {context}")]
Fatal {
error: Cow<'static, str>,
error: E,
context: Cow<'static, str>,
},
#[error("Transport canncelled")]
Expand All @@ -23,18 +23,16 @@ pub enum WorkerQuitReason {
HandlerTerminated,
}

impl WorkerQuitReason {
pub fn fatal(msg: impl Into<Cow<'static, str>>, context: impl Into<Cow<'static, str>>) -> Self {
impl<E: std::error::Error + Send + 'static> WorkerQuitReason<E> {
pub fn fatal(error: E, context: impl Into<Cow<'static, str>>) -> Self {
Self::Fatal {
error: msg.into(),
error,
context: context.into(),
}
}
pub fn fatal_context<E: std::error::Error>(
context: impl Into<Cow<'static, str>>,
) -> impl FnOnce(E) -> Self {
pub fn fatal_context(context: impl Into<Cow<'static, str>>) -> impl FnOnce(E) -> Self {
|e| Self::Fatal {
error: Cow::Owned(format!("{e}")),
error: e,
context: context.into(),
}
}
Expand All @@ -48,7 +46,7 @@ pub trait Worker: Sized + Send + 'static {
fn run(
self,
context: WorkerContext<Self>,
) -> impl Future<Output = Result<(), WorkerQuitReason>> + Send;
) -> impl Future<Output = Result<(), WorkerQuitReason<Self::Error>>> + Send;
fn config(&self) -> WorkerConfig {
WorkerConfig::default()
}
Expand All @@ -62,7 +60,7 @@ pub struct WorkerSendRequest<W: Worker> {
pub struct WorkerTransport<W: Worker> {
rx: tokio::sync::mpsc::Receiver<RxJsonRpcMessage<W::Role>>,
send_service: tokio::sync::mpsc::Sender<WorkerSendRequest<W>>,
join_handle: Option<tokio::task::JoinHandle<Result<(), WorkerQuitReason>>>,
join_handle: Option<tokio::task::JoinHandle<Result<(), WorkerQuitReason<W::Error>>>>,
_drop_guard: tokio_util::sync::DropGuard,
ct: CancellationToken,
}
Expand Down Expand Up @@ -159,14 +157,16 @@ impl<W: Worker> WorkerContext<W> {
pub async fn send_to_handler(
&mut self,
item: RxJsonRpcMessage<W::Role>,
) -> Result<(), WorkerQuitReason> {
) -> Result<(), WorkerQuitReason<W::Error>> {
self.to_handler_tx
.send(item)
.await
.map_err(|_| WorkerQuitReason::HandlerTerminated)
}

pub async fn recv_from_handler(&mut self) -> Result<WorkerSendRequest<W>, WorkerQuitReason> {
pub async fn recv_from_handler(
&mut self,
) -> Result<WorkerSendRequest<W>, WorkerQuitReason<W::Error>> {
self.from_handler_rx
.recv()
.await
Expand Down