Skip to content

Commit fdded3b

Browse files
committed
feat(transport): expose internal worker error in fatal
1 parent 9190457 commit fdded3b

File tree

3 files changed

+41
-36
lines changed

3 files changed

+41
-36
lines changed

crates/rmcp/src/transport/streamable_http_client.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,7 @@ impl<C: StreamableHttpClient> Worker for StreamableHttpClientWorker<C> {
269269
async fn run(
270270
self,
271271
mut context: super::worker::WorkerContext<Self>,
272-
) -> Result<(), WorkerQuitReason> {
272+
) -> Result<(), WorkerQuitReason<Self::Error>> {
273273
let channel_buffer_capacity = self.config.channel_buffer_capacity;
274274
let (sse_worker_tx, mut sse_worker_rx) =
275275
tokio::sync::mpsc::channel::<ServerJsonRpcMessage>(channel_buffer_capacity);
@@ -286,7 +286,7 @@ impl<C: StreamableHttpClient> Worker for StreamableHttpClientWorker<C> {
286286
.post_message(config.uri.clone(), initialize_request, None, None)
287287
.await
288288
.map_err(WorkerQuitReason::fatal_context("send initialize request"))?
289-
.expect_initialized::<Self::Error>()
289+
.expect_initialized::<C::Error>()
290290
.await
291291
.map_err(WorkerQuitReason::fatal_context(
292292
"process initialize response",
@@ -346,7 +346,7 @@ impl<C: StreamableHttpClient> Worker for StreamableHttpClientWorker<C> {
346346
.map_err(WorkerQuitReason::fatal_context(
347347
"send initialized notification",
348348
))?
349-
.expect_accepted::<Self::Error>()
349+
.expect_accepted::<C::Error>()
350350
.map_err(WorkerQuitReason::fatal_context(
351351
"process initialized notification response",
352352
))?;

crates/rmcp/src/transport/streamable_http_server/session/local.rs

Lines changed: 25 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -296,12 +296,8 @@ pub enum SessionError {
296296
SessionServiceTerminated,
297297
#[error("Invalid event id")]
298298
InvalidEventId,
299-
#[error("Transport closed")]
300-
TransportClosed,
301299
#[error("IO error: {0}")]
302300
Io(#[from] std::io::Error),
303-
#[error("Tokio join error {0}")]
304-
TokioJoinError(#[from] tokio::task::JoinError),
305301
}
306302

307303
impl From<SessionError> for std::io::Error {
@@ -696,24 +692,30 @@ impl LocalSessionHandle {
696692
pub type SessionTransport = WorkerTransport<LocalSessionWorker>;
697693

698694
#[derive(Debug, Error)]
699-
pub enum LocalSessionError {
695+
pub enum LocalSessionWorkerError {
700696
#[error("transport terminated")]
701697
TransportTerminated,
702698
#[error("unexpected message: {0:?}")]
703699
UnexpectedEvent(SessionEvent),
704700
#[error("fail to send initialize request {0}")]
705701
FailToSendInitializeRequest(SessionError),
706-
#[error("keep alive timeout")]
707-
KeepAliveTimeout,
702+
#[error("fail to handle message: {0}")]
703+
FailToHandleMessage(SessionError),
704+
#[error("keep alive timeout after {}ms", _0.as_millis())]
705+
KeepAliveTimeout(Duration),
706+
#[error("Transport closed")]
707+
TransportClosed,
708+
#[error("Tokio join error {0}")]
709+
TokioJoinError(#[from] tokio::task::JoinError),
708710
}
709711
impl Worker for LocalSessionWorker {
710-
type Error = SessionError;
712+
type Error = LocalSessionWorkerError;
711713
type Role = RoleServer;
712714
fn err_closed() -> Self::Error {
713-
SessionError::TransportClosed
715+
LocalSessionWorkerError::TransportClosed
714716
}
715717
fn err_join(e: tokio::task::JoinError) -> Self::Error {
716-
SessionError::TokioJoinError(e)
718+
LocalSessionWorkerError::TokioJoinError(e)
717719
}
718720
fn config(&self) -> crate::transport::worker::WorkerConfig {
719721
crate::transport::worker::WorkerConfig {
@@ -722,21 +724,24 @@ impl Worker for LocalSessionWorker {
722724
}
723725
}
724726
#[instrument(name = "streamable_http_session", skip_all, fields(id = self.id.as_ref()))]
725-
async fn run(mut self, mut context: WorkerContext<Self>) -> Result<(), WorkerQuitReason> {
727+
async fn run(
728+
mut self,
729+
mut context: WorkerContext<Self>,
730+
) -> Result<(), WorkerQuitReason<Self::Error>> {
726731
enum InnerEvent {
727732
FromHttpService(SessionEvent),
728733
FromHandler(WorkerSendRequest<LocalSessionWorker>),
729734
}
730735
// waiting for initialize request
731736
let evt = self.event_rx.recv().await.ok_or_else(|| {
732737
WorkerQuitReason::fatal(
733-
LocalSessionError::TransportTerminated,
738+
LocalSessionWorkerError::TransportTerminated,
734739
"get initialize request",
735740
)
736741
})?;
737742
let SessionEvent::InitializeRequest { request, responder } = evt else {
738743
return Err(WorkerQuitReason::fatal(
739-
LocalSessionError::UnexpectedEvent(evt),
744+
LocalSessionWorkerError::UnexpectedEvent(evt),
740745
"get initialize request",
741746
));
742747
};
@@ -746,7 +751,7 @@ impl Worker for LocalSessionWorker {
746751
.send(Ok(send_initialize_response.message))
747752
.map_err(|_| {
748753
WorkerQuitReason::fatal(
749-
LocalSessionError::FailToSendInitializeRequest(
754+
LocalSessionWorkerError::FailToSendInitializeRequest(
750755
SessionError::SessionServiceTerminated,
751756
),
752757
"send initialize response",
@@ -765,7 +770,7 @@ impl Worker for LocalSessionWorker {
765770
if let Some(event) = event {
766771
InnerEvent::FromHttpService(event)
767772
} else {
768-
return Err(WorkerQuitReason::fatal(LocalSessionError::TransportTerminated, "waiting next session event"))
773+
return Err(WorkerQuitReason::fatal(LocalSessionWorkerError::TransportTerminated, "waiting next session event"))
769774
}
770775
},
771776
from_handler = context.recv_from_handler() => {
@@ -775,7 +780,7 @@ impl Worker for LocalSessionWorker {
775780
return Err(WorkerQuitReason::Cancelled)
776781
}
777782
_ = keep_alive_timeout => {
778-
return Err(WorkerQuitReason::fatal(LocalSessionError::KeepAliveTimeout, "poll next session event"))
783+
return Err(WorkerQuitReason::fatal(LocalSessionWorkerError::KeepAliveTimeout(keep_alive), "poll next session event"))
779784
}
780785
};
781786
match event {
@@ -795,7 +800,10 @@ impl Worker for LocalSessionWorker {
795800
// no need to unregister resource
796801
}
797802
};
798-
let handle_result = self.handle_server_message(message).await;
803+
let handle_result = self
804+
.handle_server_message(message)
805+
.await
806+
.map_err(LocalSessionWorkerError::FailToHandleMessage);
799807
let _ = responder.send(handle_result).inspect_err(|error| {
800808
tracing::warn!(?error, "failed to send message to http service handler");
801809
});

crates/rmcp/src/transport/worker.rs

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,12 @@ use super::{IntoTransport, Transport};
77
use crate::service::{RxJsonRpcMessage, ServiceRole, TxJsonRpcMessage};
88

99
#[derive(Debug, thiserror::Error)]
10-
pub enum WorkerQuitReason {
10+
pub enum WorkerQuitReason<E> {
1111
#[error("Join error {0}")]
1212
Join(#[from] tokio::task::JoinError),
1313
#[error("Transport fatal {error}, when {context}")]
1414
Fatal {
15-
error: Box<dyn std::error::Error + Send>,
15+
error: E,
1616
context: Cow<'static, str>,
1717
},
1818
#[error("Transport canncelled")]
@@ -23,21 +23,16 @@ pub enum WorkerQuitReason {
2323
HandlerTerminated,
2424
}
2525

26-
impl WorkerQuitReason {
27-
pub fn fatal(
28-
error: impl std::error::Error + Send + 'static,
29-
context: impl Into<Cow<'static, str>>,
30-
) -> Self {
26+
impl<E: std::error::Error + Send + 'static> WorkerQuitReason<E> {
27+
pub fn fatal(error: E, context: impl Into<Cow<'static, str>>) -> Self {
3128
Self::Fatal {
32-
error: Box::new(error),
29+
error,
3330
context: context.into(),
3431
}
3532
}
36-
pub fn fatal_context<E: std::error::Error + Send + 'static>(
37-
context: impl Into<Cow<'static, str>>,
38-
) -> impl FnOnce(E) -> Self {
33+
pub fn fatal_context(context: impl Into<Cow<'static, str>>) -> impl FnOnce(E) -> Self {
3934
|e| Self::Fatal {
40-
error: Box::new(e),
35+
error: e,
4136
context: context.into(),
4237
}
4338
}
@@ -51,7 +46,7 @@ pub trait Worker: Sized + Send + 'static {
5146
fn run(
5247
self,
5348
context: WorkerContext<Self>,
54-
) -> impl Future<Output = Result<(), WorkerQuitReason>> + Send;
49+
) -> impl Future<Output = Result<(), WorkerQuitReason<Self::Error>>> + Send;
5550
fn config(&self) -> WorkerConfig {
5651
WorkerConfig::default()
5752
}
@@ -65,7 +60,7 @@ pub struct WorkerSendRequest<W: Worker> {
6560
pub struct WorkerTransport<W: Worker> {
6661
rx: tokio::sync::mpsc::Receiver<RxJsonRpcMessage<W::Role>>,
6762
send_service: tokio::sync::mpsc::Sender<WorkerSendRequest<W>>,
68-
join_handle: Option<tokio::task::JoinHandle<Result<(), WorkerQuitReason>>>,
63+
join_handle: Option<tokio::task::JoinHandle<Result<(), WorkerQuitReason<W::Error>>>>,
6964
_drop_guard: tokio_util::sync::DropGuard,
7065
ct: CancellationToken,
7166
}
@@ -162,14 +157,16 @@ impl<W: Worker> WorkerContext<W> {
162157
pub async fn send_to_handler(
163158
&mut self,
164159
item: RxJsonRpcMessage<W::Role>,
165-
) -> Result<(), WorkerQuitReason> {
160+
) -> Result<(), WorkerQuitReason<W::Error>> {
166161
self.to_handler_tx
167162
.send(item)
168163
.await
169164
.map_err(|_| WorkerQuitReason::HandlerTerminated)
170165
}
171166

172-
pub async fn recv_from_handler(&mut self) -> Result<WorkerSendRequest<W>, WorkerQuitReason> {
167+
pub async fn recv_from_handler(
168+
&mut self,
169+
) -> Result<WorkerSendRequest<W>, WorkerQuitReason<W::Error>> {
173170
self.from_handler_rx
174171
.recv()
175172
.await

0 commit comments

Comments
 (0)