Skip to content

Commit 57e14c5

Browse files
authored
feat(transport): support streamable http server (#152)
* feat(transport): support streamable http server * refactor(transport): add common module for shared transport utilities * perf(tracing): add more log for streamable http session * fix(test): fix port conflict in test with js * fix(transport): fix id generating and session management 1. cancel when deleting 2. use `wrapping` and `saturating` correctly * chore(naming): rename sse server to streamable http server
1 parent 0847c17 commit 57e14c5

File tree

17 files changed

+1247
-20
lines changed

17 files changed

+1247
-20
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,7 @@ See [examples](examples/README.md)
199199
- `transport-sse-server`: Server SSE transport
200200
- `transport-child-process`: Client stdio transport
201201
- `transport-sse`: Client sse transport
202+
- `transport-streamable-http-server` streamable http server transport
202203

203204
## Related Resources
204205

crates/rmcp/Cargo.toml

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,11 @@ tower-service = { version = "0.3", optional = true }
4848
# for ws transport
4949
# tokio-tungstenite ={ version = "0.26", optional = true }
5050

51-
# for sse-server transport
51+
# for http-server transport
5252
axum = { version = "0.8", features = [], optional = true }
5353
rand = { version = "0.9", optional = true }
5454
tokio-stream = { version = "0.1", optional = true }
55+
uuid = { version = "1", features = ["v4"], optional = true }
5556

5657
# macro
5758
rmcp-macros = { version = "0.1", workspace = true, optional = true }
@@ -72,6 +73,16 @@ transport-sse-server = [
7273
"dep:axum",
7374
"dep:rand",
7475
"dep:tokio-stream",
76+
"uuid",
77+
]
78+
transport-streamable-http-server = [
79+
"transport-streamable-http-server-session",
80+
"dep:axum",
81+
"uuid",
82+
]
83+
transport-streamable-http-server-session = [
84+
"transport-async-rw",
85+
"dep:tokio-stream",
7586
]
7687
# transport-ws = ["transport-io", "dep:tokio-tungstenite"]
7788
tower = ["dep:tower-service"]
@@ -102,7 +113,7 @@ path = "tests/test_with_python.rs"
102113

103114
[[test]]
104115
name = "test_with_js"
105-
required-features = ["server", "client", "transport-sse-server", "transport-child-process"]
116+
required-features = ["server", "client", "transport-sse-server", "transport-child-process", "transport-streamable-http-server"]
106117
path = "tests/test_with_js.rs"
107118

108119
[[test]]

crates/rmcp/src/model.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ impl<'de> Deserialize<'de> for NumberOrString {
186186

187187
pub type RequestId = NumberOrString;
188188

189-
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
189+
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Hash, Eq)]
190190
#[serde(transparent)]
191191
pub struct ProgressToken(pub NumberOrString);
192192
#[derive(Debug, Clone)]

crates/rmcp/src/service.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -591,7 +591,7 @@ where
591591
}
592592
};
593593

594-
tracing::debug!(?evt, "new event");
594+
tracing::trace!(?evt, "new event");
595595
match evt {
596596
// response and error
597597
Event::ToSink(m) => {
@@ -657,7 +657,7 @@ where
657657
Event::PeerMessage(JsonRpcMessage::Request(JsonRpcRequest {
658658
id, request, ..
659659
})) => {
660-
tracing::info!(%id, ?request, "received request");
660+
tracing::debug!(%id, ?request, "received request");
661661
{
662662
let service = shared_service.clone();
663663
let sink = sink_proxy_tx.clone();
@@ -675,7 +675,7 @@ where
675675
let result = service.handle_request(request, context).await;
676676
let response = match result {
677677
Ok(result) => {
678-
tracing::info!(%id, ?result, "response message");
678+
tracing::debug!(%id, ?result, "response message");
679679
JsonRpcMessage::response(result, id)
680680
}
681681
Err(error) => {

crates/rmcp/src/transport.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,13 @@ pub use auth::{AuthError, AuthorizationManager, AuthorizationSession, Authorized
7676

7777
// #[cfg(feature = "transport-ws")]
7878
// pub mod ws;
79+
#[cfg(feature = "transport-streamable-http-server-session")]
80+
pub mod streamable_http_server;
81+
#[cfg(feature = "transport-streamable-http-server")]
82+
pub use streamable_http_server::axum::StreamableHttpServer;
83+
84+
/// Common use codes
85+
pub mod common;
7986

8087
pub trait IntoTransport<R, E, A>: Send + 'static
8188
where

crates/rmcp/src/transport/common.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
#[cfg(any(
2+
feature = "transport-streamable-http-server",
3+
feature = "transport-sse-server"
4+
))]
5+
pub mod axum;
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
use std::{sync::Arc, time::Duration};
2+
3+
pub type SessionId = Arc<str>;
4+
5+
pub fn session_id() -> SessionId {
6+
uuid::Uuid::new_v4().to_string().into()
7+
}
8+
9+
pub const DEFAULT_AUTO_PING_INTERVAL: Duration = Duration::from_secs(15);

crates/rmcp/src/transport/sse_server.rs

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,13 @@ use crate::{
1919
RoleServer, Service,
2020
model::ClientJsonRpcMessage,
2121
service::{RxJsonRpcMessage, TxJsonRpcMessage},
22+
transport::common::axum::{DEFAULT_AUTO_PING_INTERVAL, SessionId, session_id},
2223
};
23-
type SessionId = Arc<str>;
24+
2425
type TxStore =
2526
Arc<tokio::sync::RwLock<HashMap<SessionId, tokio::sync::mpsc::Sender<ClientJsonRpcMessage>>>>;
2627
pub type TransportReceiver = ReceiverStream<RxJsonRpcMessage<RoleServer>>;
2728

28-
const DEFAULT_AUTO_PING_INTERVAL: Duration = Duration::from_secs(15);
29-
3029
#[derive(Clone)]
3130
struct App {
3231
txs: TxStore,
@@ -56,11 +55,6 @@ impl App {
5655
}
5756
}
5857

59-
fn session_id() -> SessionId {
60-
let id = format!("{:016x}", rand::random::<u128>());
61-
Arc::from(id)
62-
}
63-
6458
#[derive(Debug, serde::Deserialize)]
6559
#[serde(rename_all = "camelCase")]
6660
pub struct PostEventQuery {
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
#[cfg(feature = "transport-streamable-http-server")]
2+
pub mod axum;
3+
pub mod session;

0 commit comments

Comments
 (0)