Skip to content

Commit c1e282e

Browse files
authored
fix: more friendly interface to get service error (#190)
1 parent 64995cc commit c1e282e

File tree

4 files changed

+17
-65
lines changed

4 files changed

+17
-65
lines changed

crates/rmcp/src/handler/client.rs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -53,14 +53,6 @@ impl<H: ClientHandler> Service<RoleClient> for H {
5353
Ok(())
5454
}
5555

56-
fn get_peer(&self) -> Option<Peer<RoleClient>> {
57-
self.get_peer()
58-
}
59-
60-
fn set_peer(&mut self, peer: Peer<RoleClient>) {
61-
self.set_peer(peer);
62-
}
63-
6456
fn get_info(&self) -> <RoleClient as ServiceRole>::Info {
6557
self.get_info()
6658
}

crates/rmcp/src/handler/server.rs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -89,14 +89,6 @@ impl<H: ServerHandler> Service<RoleServer> for H {
8989
Ok(())
9090
}
9191

92-
fn get_peer(&self) -> Option<Peer<RoleServer>> {
93-
self.get_peer()
94-
}
95-
96-
fn set_peer(&mut self, peer: Peer<RoleServer>) {
97-
self.set_peer(peer);
98-
}
99-
10092
fn get_info(&self) -> <RoleServer as ServiceRole>::Info {
10193
self.get_info()
10294
}

crates/rmcp/src/service.rs

Lines changed: 16 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,10 @@ use tracing::instrument;
3636
pub enum ServiceError {
3737
#[error("Mcp error: {0}")]
3838
McpError(McpError),
39-
#[error("Transport error: {0}")]
40-
Transport(std::io::Error),
39+
#[error("Transport send error: {0}")]
40+
TransportSend(Box<dyn std::error::Error + Send + Sync>),
41+
#[error("Transport closed")]
42+
TransportClosed,
4143
#[error("Unexpected response type")]
4244
UnexpectedResponse,
4345
#[error("task cancelled for reason {}", reason.as_deref().unwrap_or("<unknown>"))]
@@ -98,8 +100,6 @@ pub trait Service<R: ServiceRole>: Send + Sync + 'static {
98100
&self,
99101
notification: R::PeerNot,
100102
) -> impl Future<Output = Result<(), McpError>> + Send + '_;
101-
fn get_peer(&self) -> Option<Peer<R>>;
102-
fn set_peer(&mut self, peer: Peer<R>);
103103
fn get_info(&self) -> R::Info;
104104
}
105105

@@ -148,14 +148,6 @@ impl<R: ServiceRole> Service<R> for Box<dyn DynService<R>> {
148148
DynService::handle_notification(self.as_ref(), notification)
149149
}
150150

151-
fn get_peer(&self) -> Option<Peer<R>> {
152-
DynService::get_peer(self.as_ref())
153-
}
154-
155-
fn set_peer(&mut self, peer: Peer<R>) {
156-
DynService::set_peer(self.as_mut(), peer)
157-
}
158-
159151
fn get_info(&self) -> R::Info {
160152
DynService::get_info(self.as_ref())
161153
}
@@ -168,8 +160,6 @@ pub trait DynService<R: ServiceRole>: Send + Sync {
168160
context: RequestContext<R>,
169161
) -> BoxFuture<Result<R::Resp, McpError>>;
170162
fn handle_notification(&self, notification: R::PeerNot) -> BoxFuture<Result<(), McpError>>;
171-
fn get_peer(&self) -> Option<Peer<R>>;
172-
fn set_peer(&mut self, peer: Peer<R>);
173163
fn get_info(&self) -> R::Info;
174164
}
175165

@@ -184,12 +174,6 @@ impl<R: ServiceRole, S: Service<R>> DynService<R> for S {
184174
fn handle_notification(&self, notification: R::PeerNot) -> BoxFuture<Result<(), McpError>> {
185175
Box::pin(self.handle_notification(notification))
186176
}
187-
fn get_peer(&self) -> Option<Peer<R>> {
188-
self.get_peer()
189-
}
190-
fn set_peer(&mut self, peer: Peer<R>) {
191-
self.set_peer(peer)
192-
}
193177
fn get_info(&self) -> R::Info {
194178
self.get_info()
195179
}
@@ -255,9 +239,7 @@ impl<R: ServiceRole> RequestHandle<R> {
255239
pub async fn await_response(self) -> Result<R::PeerResp, ServiceError> {
256240
if let Some(timeout) = self.options.timeout {
257241
let timeout_result = tokio::time::timeout(timeout, async move {
258-
self.rx
259-
.await
260-
.map_err(|_e| ServiceError::Transport(std::io::Error::other("disconnected")))?
242+
self.rx.await.map_err(|_e| ServiceError::TransportClosed)?
261243
})
262244
.await;
263245
match timeout_result {
@@ -278,9 +260,7 @@ impl<R: ServiceRole> RequestHandle<R> {
278260
}
279261
}
280262
} else {
281-
self.rx
282-
.await
283-
.map_err(|_e| ServiceError::Transport(std::io::Error::other("disconnected")))?
263+
self.rx.await.map_err(|_e| ServiceError::TransportClosed)?
284264
}
285265
}
286266

@@ -373,12 +353,8 @@ impl<R: ServiceRole> Peer<R> {
373353
responder,
374354
})
375355
.await
376-
.map_err(|_m| {
377-
ServiceError::Transport(std::io::Error::other("disconnected: receiver dropped"))
378-
})?;
379-
receiver.await.map_err(|_e| {
380-
ServiceError::Transport(std::io::Error::other("disconnected: responder dropped"))
381-
})?
356+
.map_err(|_m| ServiceError::TransportClosed)?;
357+
receiver.await.map_err(|_e| ServiceError::TransportClosed)?
382358
}
383359
pub async fn send_request(&self, request: R::Req) -> Result<R::PeerResp, ServiceError> {
384360
self.send_request_with_option(request, PeerRequestOptions::no_options())
@@ -416,7 +392,7 @@ impl<R: ServiceRole> Peer<R> {
416392
responder,
417393
})
418394
.await
419-
.map_err(|_m| ServiceError::Transport(std::io::Error::other("disconnected")))?;
395+
.map_err(|_m| ServiceError::TransportClosed)?;
420396
Ok(RequestHandle {
421397
id,
422398
rx: receiver,
@@ -428,6 +404,10 @@ impl<R: ServiceRole> Peer<R> {
428404
pub fn peer_info(&self) -> &R::PeerInfo {
429405
&self.info
430406
}
407+
408+
pub fn is_transport_closed(&self) -> bool {
409+
self.tx.is_closed()
410+
}
431411
}
432412

433413
#[derive(Debug)]
@@ -518,7 +498,7 @@ where
518498

519499
#[instrument(skip_all)]
520500
async fn serve_inner<R, S, T, E, A>(
521-
mut service: S,
501+
service: S,
522502
transport: T,
523503
peer: Peer<R>,
524504
mut peer_rx: tokio::sync::mpsc::Receiver<PeerSinkMessage<R>>,
@@ -540,7 +520,6 @@ where
540520
tracing::info!(?peer_info, "Service initialized as server");
541521
}
542522

543-
service.set_peer(peer.clone());
544523
let mut local_responder_pool =
545524
HashMap::<RequestId, Responder<Result<R::PeerResp, ServiceError>>>::new();
546525
let mut local_ct_pool = HashMap::<RequestId, CancellationToken>::new();
@@ -631,8 +610,7 @@ where
631610
Event::SendTaskResult(SendTaskResult::Request { id, result }) => {
632611
if let Err(e) = result {
633612
if let Some(responder) = local_responder_pool.remove(&id) {
634-
let _ = responder
635-
.send(Err(ServiceError::Transport(std::io::Error::other(e))));
613+
let _ = responder.send(Err(ServiceError::TransportSend(Box::new(e))));
636614
}
637615
}
638616
}
@@ -642,7 +620,7 @@ where
642620
cancellation_param,
643621
}) => {
644622
let response = if let Err(e) = result {
645-
Err(ServiceError::Transport(std::io::Error::other(e)))
623+
Err(ServiceError::TransportSend(Box::new(e)))
646624
} else {
647625
Ok(())
648626
};

crates/rmcp/src/service/tower.rs

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,11 @@ use std::{future::poll_fn, marker::PhantomData};
22

33
use tower_service::Service as TowerService;
44

5-
use crate::service::{Peer, RequestContext, Service, ServiceRole};
5+
use crate::service::{RequestContext, Service, ServiceRole};
66

77
pub struct TowerHandler<S, R: ServiceRole> {
88
pub service: S,
99
pub info: R::Info,
10-
pub peer: Option<Peer<R>>,
1110
role: PhantomData<R>,
1211
}
1312

@@ -17,7 +16,6 @@ impl<S, R: ServiceRole> TowerHandler<S, R> {
1716
service,
1817
role: PhantomData,
1918
info,
20-
peer: None,
2119
}
2220
}
2321
}
@@ -48,14 +46,6 @@ where
4846
std::future::ready(Ok(()))
4947
}
5048

51-
fn get_peer(&self) -> Option<Peer<R>> {
52-
self.peer.clone()
53-
}
54-
55-
fn set_peer(&mut self, peer: Peer<R>) {
56-
self.peer = Some(peer);
57-
}
58-
5949
fn get_info(&self) -> R::Info {
6050
self.info.clone()
6151
}

0 commit comments

Comments
 (0)