Skip to content

Commit b89b582

Browse files
committed
test: add tests for priming behavior on stream start and close
1 parent f9ea458 commit b89b582

File tree

2 files changed

+168
-0
lines changed

2 files changed

+168
-0
lines changed
Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
use std::time::Duration;
2+
3+
use rmcp::transport::streamable_http_server::{
4+
StreamableHttpServerConfig, StreamableHttpService, session::local::LocalSessionManager,
5+
};
6+
use tokio_util::sync::CancellationToken;
7+
8+
mod common;
9+
use common::calculator::Calculator;
10+
11+
#[tokio::test]
12+
async fn test_priming_on_stream_start() -> anyhow::Result<()> {
13+
let ct = CancellationToken::new();
14+
15+
// stateful_mode: true automatically enables priming with DEFAULT_RETRY_INTERVAL (3 seconds)
16+
let service: StreamableHttpService<Calculator, LocalSessionManager> =
17+
StreamableHttpService::new(
18+
|| Ok(Calculator::new()),
19+
Default::default(),
20+
StreamableHttpServerConfig {
21+
stateful_mode: true,
22+
sse_keep_alive: None,
23+
cancellation_token: ct.child_token(),
24+
..Default::default()
25+
},
26+
);
27+
28+
let router = axum::Router::new().nest_service("/mcp", service);
29+
let tcp_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
30+
let addr = tcp_listener.local_addr()?;
31+
32+
let handle = tokio::spawn({
33+
let ct = ct.clone();
34+
async move {
35+
let _ = axum::serve(tcp_listener, router)
36+
.with_graceful_shutdown(async move { ct.cancelled_owned().await })
37+
.await;
38+
}
39+
});
40+
41+
// Send initialize request
42+
let client = reqwest::Client::new();
43+
let response = client
44+
.post(format!("http://{addr}/mcp"))
45+
.header("Content-Type", "application/json")
46+
.header("Accept", "application/json, text/event-stream")
47+
.body(r#"{"jsonrpc":"2.0","id":1,"method":"initialize","params":{"protocolVersion":"2025-11-25","capabilities":{},"clientInfo":{"name":"test","version":"1.0"}}}"#)
48+
.send()
49+
.await?;
50+
51+
assert_eq!(response.status(), 200);
52+
53+
let body = response.text().await?;
54+
55+
// Split SSE events by double newline
56+
let events: Vec<&str> = body.split("\n\n").filter(|e| !e.is_empty()).collect();
57+
assert!(events.len() >= 2);
58+
59+
// Verify priming event (first event)
60+
let priming_event = events[0];
61+
assert!(priming_event.contains("id: 0"));
62+
assert!(priming_event.contains("retry: 3000"));
63+
assert!(priming_event.contains("data:"));
64+
65+
// Verify initialize response (second event)
66+
let response_event = events[1];
67+
assert!(response_event.contains(r#""jsonrpc":"2.0""#));
68+
assert!(response_event.contains(r#""id":1"#));
69+
70+
ct.cancel();
71+
handle.await?;
72+
73+
Ok(())
74+
}
75+
76+
#[tokio::test]
77+
async fn test_priming_on_stream_close() -> anyhow::Result<()> {
78+
use rmcp::transport::streamable_http_server::session::SessionId;
79+
use std::sync::Arc;
80+
81+
let ct = CancellationToken::new();
82+
let session_manager = Arc::new(LocalSessionManager::default());
83+
84+
// stateful_mode: true automatically enables priming with DEFAULT_RETRY_INTERVAL (3 seconds)
85+
let service = StreamableHttpService::new(
86+
|| Ok(Calculator::new()),
87+
session_manager.clone(),
88+
StreamableHttpServerConfig {
89+
stateful_mode: true,
90+
sse_keep_alive: None,
91+
cancellation_token: ct.child_token(),
92+
..Default::default()
93+
},
94+
);
95+
96+
let router = axum::Router::new().nest_service("/mcp", service);
97+
let tcp_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
98+
let addr = tcp_listener.local_addr()?;
99+
100+
let handle = tokio::spawn({
101+
let ct = ct.clone();
102+
async move {
103+
let _ = axum::serve(tcp_listener, router)
104+
.with_graceful_shutdown(async move { ct.cancelled_owned().await })
105+
.await;
106+
}
107+
});
108+
109+
// Send initialize request to create a session
110+
let client = reqwest::Client::new();
111+
let response = client
112+
.post(format!("http://{addr}/mcp"))
113+
.header("Content-Type", "application/json")
114+
.header("Accept", "application/json, text/event-stream")
115+
.body(r#"{"jsonrpc":"2.0","id":1,"method":"initialize","params":{"protocolVersion":"2025-11-25","capabilities":{},"clientInfo":{"name":"test","version":"1.0"}}}"#)
116+
.send()
117+
.await?;
118+
119+
let session_id: SessionId = response.headers()["mcp-session-id"].to_str()?.into();
120+
121+
// Open a standalone GET stream (send() returns when headers are received)
122+
let response = client
123+
.get(format!("http://{addr}/mcp"))
124+
.header("Accept", "text/event-stream")
125+
.header("mcp-session-id", session_id.to_string())
126+
.send()
127+
.await?;
128+
129+
assert_eq!(response.status(), 200);
130+
131+
// Spawn a task to read the response body (blocks until stream closes)
132+
let read_task = tokio::spawn(async move { response.text().await.unwrap() });
133+
134+
// Close the standalone stream with a 5-second retry hint
135+
let sessions = session_manager.sessions.read().await;
136+
let session = sessions.get(&session_id).unwrap();
137+
session
138+
.close_standalone_sse_stream(Some(Duration::from_secs(5)))
139+
.await?;
140+
drop(sessions);
141+
142+
// Wait for the read task to complete and verify the response
143+
let body = read_task.await?;
144+
145+
// Verify the stream received two priming events:
146+
// 1. At stream start (retry: 3000)
147+
// 2. Before close (retry: 5000)
148+
let events: Vec<&str> = body.split("\n\n").filter(|e| !e.is_empty()).collect();
149+
assert_eq!(events.len(), 2);
150+
151+
// First event: priming at stream start
152+
let start_priming = events[0];
153+
assert!(start_priming.contains("id:"));
154+
assert!(start_priming.contains("retry: 3000"));
155+
assert!(start_priming.contains("data:"));
156+
157+
// Second event: priming before close
158+
let close_priming = events[1];
159+
assert!(close_priming.contains("id:"));
160+
assert!(close_priming.contains("retry: 5000"));
161+
assert!(close_priming.contains("data:"));
162+
163+
ct.cancel();
164+
handle.await?;
165+
166+
Ok(())
167+
}

crates/rmcp/tests/test_with_js.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ async fn test_with_js_streamable_http_client() -> anyhow::Result<()> {
7272
stateful_mode: true,
7373
sse_keep_alive: None,
7474
cancellation_token: ct.child_token(),
75+
..Default::default()
7576
},
7677
);
7778
let router = axum::Router::new().nest_service("/mcp", service);

0 commit comments

Comments
 (0)