Skip to content

Commit 145b5de

Browse files
authored
fix(notification): fix wrongly error report in notification (#70)
1 parent b91fe54 commit 145b5de

File tree

4 files changed

+121
-10
lines changed

4 files changed

+121
-10
lines changed

crates/rmcp/Cargo.toml

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,4 +96,10 @@ path = "tests/test_with_python.rs"
9696
[[test]]
9797
name = "test_with_js"
9898
required-features = ["server", "client", "transport-sse-server", "transport-child-process"]
99-
path = "tests/test_with_js.rs"
99+
path = "tests/test_with_js.rs"
100+
101+
[[test]]
102+
name = "test_notification"
103+
required-features = ["server", "client"]
104+
path = "tests/test_notification.rs"
105+

crates/rmcp/src/handler/server.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ impl<H: ServerHandler> Service<RoleServer> for H {
103103
}
104104

105105
#[allow(unused_variables)]
106-
pub trait ServerHandler: Sized + Clone + Send + Sync + 'static {
106+
pub trait ServerHandler: Sized + Send + Sync + 'static {
107107
fn ping(
108108
&self,
109109
context: RequestContext<RoleServer>,

crates/rmcp/src/service.rs

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -342,10 +342,12 @@ impl<R: ServiceRole> Peer<R> {
342342
self.tx
343343
.send(PeerSinkMessage::Notification(notification, responder))
344344
.await
345-
.map_err(|_m| ServiceError::Transport(std::io::Error::other("disconnected")))?;
346-
receiver
347-
.await
348-
.map_err(|_e| ServiceError::Transport(std::io::Error::other("disconnected")))?
345+
.map_err(|_m| {
346+
ServiceError::Transport(std::io::Error::other("disconnected: receiver dropped"))
347+
})?;
348+
receiver.await.map_err(|_e| {
349+
ServiceError::Transport(std::io::Error::other("disconnected: responder dropped"))
350+
})?
349351
}
350352
pub async fn send_request(&self, request: R::Req) -> Result<R::PeerResp, ServiceError> {
351353
self.send_cancellable_request(request, PeerRequestOptions::no_options())
@@ -578,10 +580,12 @@ where
578580
let send_result = sink
579581
.send(Message::Notification(notification).into_json_rpc_message())
580582
.await;
581-
if let Err(e) = send_result {
582-
let _ =
583-
responder.send(Err(ServiceError::Transport(std::io::Error::other(e))));
584-
}
583+
let response = if let Err(e) = send_result {
584+
Err(ServiceError::Transport(std::io::Error::other(e)))
585+
} else {
586+
Ok(())
587+
};
588+
let _ = responder.send(response);
585589
if let Some(param) = cancellation_param {
586590
if let Some(responder) = local_responder_pool.remove(&param.request_id) {
587591
tracing::info!(id = %param.request_id, reason = param.reason, "cancelled");
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
use std::sync::Arc;
2+
3+
use rmcp::{
4+
ClientHandler, Peer, RoleClient, ServerHandler, ServiceExt,
5+
model::{
6+
ResourceUpdatedNotificationParam, ServerCapabilities, ServerInfo, SubscribeRequestParam,
7+
},
8+
};
9+
use tokio::sync::Notify;
10+
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
11+
12+
pub struct Server {}
13+
14+
impl ServerHandler for Server {
15+
fn get_info(&self) -> ServerInfo {
16+
ServerInfo {
17+
capabilities: ServerCapabilities::builder()
18+
.enable_resources()
19+
.enable_resources_subscribe()
20+
.enable_resources_list_changed()
21+
.build(),
22+
..Default::default()
23+
}
24+
}
25+
26+
async fn subscribe(
27+
&self,
28+
request: rmcp::model::SubscribeRequestParam,
29+
context: rmcp::service::RequestContext<rmcp::RoleServer>,
30+
) -> Result<(), rmcp::Error> {
31+
let uri = request.uri;
32+
let peer = context.peer;
33+
34+
tokio::spawn(async move {
35+
let span = tracing::info_span!("subscribe", uri = %uri);
36+
let _enter = span.enter();
37+
38+
if let Err(e) = peer
39+
.notify_resource_updated(ResourceUpdatedNotificationParam { uri: uri.clone() })
40+
.await
41+
{
42+
panic!("Failed to send notification: {}", e);
43+
}
44+
});
45+
46+
Ok(())
47+
}
48+
}
49+
50+
pub struct Client {
51+
receive_signal: Arc<Notify>,
52+
peer: Option<Peer<RoleClient>>,
53+
}
54+
55+
impl ClientHandler for Client {
56+
async fn on_resource_updated(&self, params: rmcp::model::ResourceUpdatedNotificationParam) {
57+
let uri = params.uri;
58+
tracing::info!("Resource updated: {}", uri);
59+
self.receive_signal.notify_one();
60+
}
61+
62+
fn set_peer(&mut self, peer: Peer<RoleClient>) {
63+
self.peer.replace(peer);
64+
}
65+
66+
fn get_peer(&self) -> Option<Peer<RoleClient>> {
67+
self.peer.clone()
68+
}
69+
}
70+
71+
#[tokio::test]
72+
async fn test_server_notification() -> anyhow::Result<()> {
73+
let _ = tracing_subscriber::registry()
74+
.with(
75+
tracing_subscriber::EnvFilter::try_from_default_env()
76+
.unwrap_or_else(|_| "debug".to_string().into()),
77+
)
78+
.with(tracing_subscriber::fmt::layer())
79+
.try_init();
80+
let (server_transport, client_transport) = tokio::io::duplex(4096);
81+
tokio::spawn(async move {
82+
let server = Server {}.serve(server_transport).await?;
83+
server.waiting().await?;
84+
anyhow::Ok(())
85+
});
86+
let receive_signal = Arc::new(Notify::new());
87+
let client = Client {
88+
peer: Default::default(),
89+
receive_signal: receive_signal.clone(),
90+
}
91+
.serve(client_transport)
92+
.await?;
93+
client
94+
.subscribe(SubscribeRequestParam {
95+
uri: "test://test-resource".to_owned(),
96+
})
97+
.await?;
98+
receive_signal.notified().await;
99+
client.cancel().await?;
100+
Ok(())
101+
}

0 commit comments

Comments
 (0)