Skip to content

Commit f2cb7f5

Browse files
committed
feat: implement SEP-1699 SSE polling via server-side disconnect
1 parent 61f7b7b commit f2cb7f5

File tree

5 files changed

+395
-22
lines changed

5 files changed

+395
-22
lines changed

crates/rmcp/Cargo.toml

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ server-side-http = [
9797
"dep:http-body-util",
9898
"dep:bytes",
9999
"dep:sse-stream",
100+
"dep:axum",
100101
"tower",
101102
]
102103

@@ -201,4 +202,9 @@ path = "tests/test_elicitation.rs"
201202
[[test]]
202203
name = "test_task"
203204
required-features = ["server", "client", "macros"]
204-
path = "tests/test_task.rs"
205+
path = "tests/test_task.rs"
206+
207+
[[test]]
208+
name = "test_streamable_http_priming"
209+
required-features = ["server", "client", "transport-streamable-http-server", "reqwest"]
210+
path = "tests/test_streamable_http_priming.rs"

crates/rmcp/src/transport/common/server_side_http.rs

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,14 @@ impl sse_stream::Timer for TokioTimer {
5959

6060
#[derive(Debug, Clone)]
6161
pub struct ServerSseMessage {
62+
/// The event ID for this message. When set, clients can use this ID
63+
/// with the `Last-Event-ID` header to resume the stream from this point.
6264
pub event_id: Option<String>,
63-
pub message: Arc<ServerJsonRpcMessage>,
65+
/// The JSON-RPC message content. For priming events, set this to `None`.
66+
pub message: Option<Arc<ServerJsonRpcMessage>>,
67+
/// The retry interval hint for clients. Clients should wait this duration
68+
/// before attempting to reconnect. This maps to the SSE `retry:` field.
69+
pub retry: Option<Duration>,
6470
}
6571

6672
pub(crate) fn sse_stream_response(
@@ -71,9 +77,20 @@ pub(crate) fn sse_stream_response(
7177
use futures::StreamExt;
7278
let stream = stream
7379
.map(|message| {
74-
let data = serde_json::to_string(&message.message).expect("valid message");
75-
let mut sse = Sse::default().data(data);
80+
let mut sse = if let Some(ref msg) = message.message {
81+
let data = serde_json::to_string(msg.as_ref()).expect("valid message");
82+
Sse::default().data(data)
83+
} else {
84+
// Priming event: empty data per SSE spec (just "data:\n")
85+
Sse::default().data("")
86+
};
87+
7688
sse.id = message.event_id;
89+
90+
if let Some(retry) = message.retry {
91+
sse.retry = Some(retry.as_millis() as u64);
92+
}
93+
7794
Result::<Sse, Infallible>::Ok(sse)
7895
})
7996
.take_until(async move { ct.cancelled().await });

crates/rmcp/src/transport/streamable_http_server/session/local.rs

Lines changed: 138 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ impl CachedTx {
201201
Self::new(tx, None)
202202
}
203203

204-
async fn send(&mut self, message: ServerJsonRpcMessage) {
204+
fn next_event_id(&self) -> EventId {
205205
let index = self.cache.back().map_or(0, |m| {
206206
m.event_id
207207
.as_deref()
@@ -211,14 +211,33 @@ impl CachedTx {
211211
.index
212212
+ 1
213213
});
214-
let event_id = EventId {
214+
EventId {
215215
http_request_id: self.http_request_id,
216216
index,
217+
}
218+
}
219+
220+
async fn send(&mut self, message: ServerJsonRpcMessage) {
221+
let event_id = self.next_event_id();
222+
let message = ServerSseMessage {
223+
event_id: Some(event_id.to_string()),
224+
message: Some(Arc::new(message)),
225+
retry: None,
217226
};
227+
self.cache_and_send(message).await;
228+
}
229+
230+
async fn send_priming(&mut self, retry: Duration) {
231+
let event_id = self.next_event_id();
218232
let message = ServerSseMessage {
219233
event_id: Some(event_id.to_string()),
220-
message: Arc::new(message),
234+
message: None,
235+
retry: Some(retry),
221236
};
237+
self.cache_and_send(message).await;
238+
}
239+
240+
async fn cache_and_send(&mut self, message: ServerSseMessage) {
222241
if self.cache.len() >= self.capacity {
223242
self.cache.pop_front();
224243
self.cache.push_back(message.clone());
@@ -525,7 +544,53 @@ impl LocalSessionWorker {
525544
}
526545
}
527546
}
547+
548+
async fn close_sse_stream(
549+
&mut self,
550+
http_request_id: Option<HttpRequestId>,
551+
retry_interval: Option<Duration>,
552+
) -> Result<(), SessionError> {
553+
match http_request_id {
554+
// Close a request-wise stream
555+
Some(id) => {
556+
let request_wise = self
557+
.tx_router
558+
.get_mut(&id)
559+
.ok_or(SessionError::ChannelClosed(Some(id)))?;
560+
561+
// Send priming event if retry interval is specified
562+
if let Some(interval) = retry_interval {
563+
request_wise.tx.send_priming(interval).await;
564+
}
565+
566+
// Close the stream by dropping the sender
567+
let (tx, _rx) = tokio::sync::mpsc::channel(1);
568+
request_wise.tx.tx = tx;
569+
570+
tracing::debug!(
571+
http_request_id = id,
572+
"closed SSE stream for server-initiated disconnection"
573+
);
574+
Ok(())
575+
}
576+
// Close the standalone (common) stream
577+
None => {
578+
// Send priming event if retry interval is specified
579+
if let Some(interval) = retry_interval {
580+
self.common.send_priming(interval).await;
581+
}
582+
583+
// Close the stream by dropping the sender
584+
let (tx, _rx) = tokio::sync::mpsc::channel(1);
585+
self.common.tx = tx;
586+
587+
tracing::debug!("closed standalone SSE stream for server-initiated disconnection");
588+
Ok(())
589+
}
590+
}
591+
}
528592
}
593+
529594
#[derive(Debug)]
530595
pub enum SessionEvent {
531596
ClientMessage {
@@ -548,6 +613,13 @@ pub enum SessionEvent {
548613
responder: oneshot::Sender<Result<ServerJsonRpcMessage, SessionError>>,
549614
},
550615
Close,
616+
CloseSseStream {
617+
/// The HTTP request ID to close. If `None`, closes the standalone (common) stream.
618+
http_request_id: Option<HttpRequestId>,
619+
/// Optional retry interval. If provided, a priming event is sent before closing.
620+
retry_interval: Option<Duration>,
621+
responder: oneshot::Sender<Result<(), SessionError>>,
622+
},
551623
}
552624

553625
#[derive(Debug, Clone)]
@@ -683,6 +755,60 @@ impl LocalSessionHandle {
683755
rx.await
684756
.map_err(|_| SessionError::SessionServiceTerminated)?
685757
}
758+
759+
/// Close an SSE stream for a specific request.
760+
///
761+
/// This closes the SSE connection for a POST request stream, but keeps the session
762+
/// and message cache active. Clients can reconnect using the `Last-Event-ID` header
763+
/// via a GET request to resume receiving messages.
764+
///
765+
/// # Arguments
766+
///
767+
/// * `http_request_id` - The HTTP request ID of the stream to close
768+
/// * `retry_interval` - Optional retry interval. If provided, a priming event is sent
769+
pub async fn close_sse_stream(
770+
&self,
771+
http_request_id: HttpRequestId,
772+
retry_interval: Option<Duration>,
773+
) -> Result<(), SessionError> {
774+
let (tx, rx) = tokio::sync::oneshot::channel();
775+
self.event_tx
776+
.send(SessionEvent::CloseSseStream {
777+
http_request_id: Some(http_request_id),
778+
retry_interval,
779+
responder: tx,
780+
})
781+
.await
782+
.map_err(|_| SessionError::SessionServiceTerminated)?;
783+
rx.await
784+
.map_err(|_| SessionError::SessionServiceTerminated)?
785+
}
786+
787+
/// Close the standalone SSE stream.
788+
///
789+
/// This closes the standalone SSE connection (established via GET request),
790+
/// but keeps the session and message cache active. Clients can reconnect using
791+
/// the `Last-Event-ID` header via a GET request to resume receiving messages.
792+
///
793+
/// # Arguments
794+
///
795+
/// * `retry_interval` - Optional retry interval. If provided, a priming event is sent
796+
pub async fn close_standalone_sse_stream(
797+
&self,
798+
retry_interval: Option<Duration>,
799+
) -> Result<(), SessionError> {
800+
let (tx, rx) = tokio::sync::oneshot::channel();
801+
self.event_tx
802+
.send(SessionEvent::CloseSseStream {
803+
http_request_id: None,
804+
retry_interval,
805+
responder: tx,
806+
})
807+
.await
808+
.map_err(|_| SessionError::SessionServiceTerminated)?;
809+
rx.await
810+
.map_err(|_| SessionError::SessionServiceTerminated)?
811+
}
686812
}
687813

688814
pub type SessionTransport = WorkerTransport<LocalSessionWorker>;
@@ -848,6 +974,15 @@ impl Worker for LocalSessionWorker {
848974
InnerEvent::FromHttpService(SessionEvent::Close) => {
849975
return Err(WorkerQuitReason::TransportClosed);
850976
}
977+
InnerEvent::FromHttpService(SessionEvent::CloseSseStream {
978+
http_request_id,
979+
retry_interval,
980+
responder,
981+
}) => {
982+
let handle_result =
983+
self.close_sse_stream(http_request_id, retry_interval).await;
984+
let _ = responder.send(handle_result);
985+
}
851986
_ => {
852987
// ignore
853988
}

crates/rmcp/src/transport/streamable_http_server/tower.rs

Lines changed: 62 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,10 @@ use crate::{
3232
pub struct StreamableHttpServerConfig {
3333
/// The ping message duration for SSE connections.
3434
pub sse_keep_alive: Option<Duration>,
35+
/// The retry interval for SSE priming events.
36+
pub sse_retry: Option<Duration>,
3537
/// If true, the server will create a session for each request and keep it alive.
38+
/// When enabled, SSE priming events are sent to enable client reconnection.
3639
pub stateful_mode: bool,
3740
/// Cancellation token for the Streamable HTTP server.
3841
///
@@ -45,6 +48,7 @@ impl Default for StreamableHttpServerConfig {
4548
fn default() -> Self {
4649
Self {
4750
sse_keep_alive: Some(Duration::from_secs(15)),
51+
sse_retry: Some(Duration::from_secs(3)),
4852
stateful_mode: true,
4953
cancellation_token: CancellationToken::new(),
5054
}
@@ -216,6 +220,7 @@ where
216220
.resume(&session_id, last_event_id)
217221
.await
218222
.map_err(internal_error_response("resume session"))?;
223+
// Resume doesn't need priming - client already has the event ID
219224
Ok(sse_stream_response(
220225
stream,
221226
self.config.sse_keep_alive,
@@ -228,6 +233,19 @@ where
228233
.create_standalone_stream(&session_id)
229234
.await
230235
.map_err(internal_error_response("create standalone stream"))?;
236+
// Prepend priming event if sse_retry configured
237+
let stream = if let Some(retry) = self.config.sse_retry {
238+
let priming = ServerSseMessage {
239+
event_id: Some("0".into()),
240+
message: None,
241+
retry: Some(retry),
242+
};
243+
futures::stream::once(async move { priming })
244+
.chain(stream)
245+
.left_stream()
246+
} else {
247+
stream.right_stream()
248+
};
231249
Ok(sse_stream_response(
232250
stream,
233251
self.config.sse_keep_alive,
@@ -322,6 +340,19 @@ where
322340
.create_stream(&session_id, message)
323341
.await
324342
.map_err(internal_error_response("get session"))?;
343+
// Prepend priming event if sse_retry configured
344+
let stream = if let Some(retry) = self.config.sse_retry {
345+
let priming = ServerSseMessage {
346+
event_id: Some("0".into()),
347+
message: None,
348+
retry: Some(retry),
349+
};
350+
futures::stream::once(async move { priming })
351+
.chain(stream)
352+
.left_stream()
353+
} else {
354+
stream.right_stream()
355+
};
325356
Ok(sse_stream_response(
326357
stream,
327358
self.config.sse_keep_alive,
@@ -389,15 +420,28 @@ where
389420
.initialize_session(&session_id, message)
390421
.await
391422
.map_err(internal_error_response("create stream"))?;
423+
let stream = futures::stream::once(async move {
424+
ServerSseMessage {
425+
event_id: None,
426+
message: Some(Arc::new(response)),
427+
retry: None,
428+
}
429+
});
430+
// Prepend priming event if sse_retry configured
431+
let stream = if let Some(retry) = self.config.sse_retry {
432+
let priming = ServerSseMessage {
433+
event_id: Some("0".into()),
434+
message: None,
435+
retry: Some(retry),
436+
};
437+
futures::stream::once(async move { priming })
438+
.chain(stream)
439+
.left_stream()
440+
} else {
441+
stream.right_stream()
442+
};
392443
let mut response = sse_stream_response(
393-
futures::stream::once({
394-
async move {
395-
ServerSseMessage {
396-
event_id: None,
397-
message: response.into(),
398-
}
399-
}
400-
}),
444+
stream,
401445
self.config.sse_keep_alive,
402446
self.config.cancellation_token.child_token(),
403447
);
@@ -424,14 +468,17 @@ where
424468
// on service created
425469
let _ = service.waiting().await;
426470
});
471+
// Stateless mode: no priming (no session to resume)
472+
let stream = ReceiverStream::new(receiver).map(|message| {
473+
tracing::info!(?message);
474+
ServerSseMessage {
475+
event_id: None,
476+
message: Some(Arc::new(message)),
477+
retry: None,
478+
}
479+
});
427480
Ok(sse_stream_response(
428-
ReceiverStream::new(receiver).map(|message| {
429-
tracing::info!(?message);
430-
ServerSseMessage {
431-
event_id: None,
432-
message: message.into(),
433-
}
434-
}),
481+
stream,
435482
self.config.sse_keep_alive,
436483
self.config.cancellation_token.child_token(),
437484
))

0 commit comments

Comments
 (0)