Skip to content

Commit b19a70f

Browse files
committed
feat(rmcp): enhance transport features by decoupling reqwest
- Added `transport-sse-client-reqwest` and `transport-streamable-http-client-reqwest` features for reqwest-based implementations. - Updated documentation to reflect new client-agnostic transport capabilities. - Modified error handling in SSE transport to use `String` for content type. - Updated dependencies to include new features in examples.
1 parent 4ea231a commit b19a70f

File tree

10 files changed

+185
-23
lines changed

10 files changed

+185
-23
lines changed

crates/rmcp/Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,12 +102,14 @@ server-side-http = [
102102
client-side-sse = ["dep:sse-stream", "dep:http"]
103103

104104
transport-sse-client = ["client-side-sse", "transport-worker"]
105+
transport-sse-client-reqwest = ["transport-sse-client", "reqwest"]
105106

106107
transport-worker = ["dep:tokio-stream"]
107108

108109

109110
# Streamable HTTP client
110-
transport-streamable-http-client = ["client-side-sse", "transport-worker", "reqwest"]
111+
transport-streamable-http-client = ["client-side-sse", "transport-worker"]
112+
transport-streamable-http-client-reqwest = ["transport-streamable-http-client", "reqwest"]
111113

112114

113115
transport-async-rw = ["tokio/io-util", "tokio-util/codec"]

crates/rmcp/README.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -199,8 +199,10 @@ RMCP uses feature flags to control which components are included:
199199
- `transport-async-rw`: Async read/write support
200200
- `transport-io`: I/O stream support
201201
- `transport-child-process`: Child process support
202-
- `transport-sse-client` / `transport-sse-server`: SSE support
203-
- `transport-streamable-http-client` / `transport-streamable-http-server`: HTTP streaming
202+
- `transport-sse-client` / `transport-sse-server`: SSE support (client agnostic)
203+
- `transport-sse-client-reqwest`: a default `reqwest` implementation of the SSE client
204+
- `transport-streamable-http-client` / `transport-streamable-http-server`: HTTP streaming (client agnostic, see [`StreamableHttpClientTransport`] for details)
205+
- `transport-streamable-http-client-reqwest`: a default `reqwest` implementation of the streamable http client
204206
- `auth`: OAuth2 authentication support
205207
- `schemars`: JSON Schema generation (for tool definitions)
206208

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
#[cfg(feature = "transport-streamable-http-client")]
2-
#[cfg_attr(docsrs, doc(cfg(feature = "transport-streamable-http-client")))]
1+
#[cfg(feature = "transport-streamable-http-client-reqwest")]
2+
#[cfg_attr(docsrs, doc(cfg(feature = "transport-streamable-http-client-reqwest")))]
33
mod streamable_http_client;
44

5-
#[cfg(feature = "transport-sse-client")]
6-
#[cfg_attr(docsrs, doc(cfg(feature = "transport-sse-client")))]
5+
#[cfg(feature = "transport-sse-client-reqwest")]
6+
#[cfg_attr(docsrs, doc(cfg(feature = "transport-sse-client-reqwest")))]
77
mod sse_client;

crates/rmcp/src/transport/common/reqwest/sse_client.rs

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,12 @@ use crate::transport::{
1111
sse_client::{SseClient, SseClientConfig, SseTransportError},
1212
};
1313

14+
impl From<reqwest::Error> for SseTransportError<reqwest::Error> {
15+
fn from(e: reqwest::Error) -> Self {
16+
SseTransportError::Client(e)
17+
}
18+
}
19+
1420
impl SseClient for reqwest::Client {
1521
type Error = reqwest::Error;
1622

@@ -55,7 +61,9 @@ impl SseClient for reqwest::Client {
5561
match response.headers().get(reqwest::header::CONTENT_TYPE) {
5662
Some(ct) => {
5763
if !ct.as_bytes().starts_with(EVENT_STREAM_MIME_TYPE.as_bytes()) {
58-
return Err(SseTransportError::UnexpectedContentType(Some(ct.clone())));
64+
return Err(SseTransportError::UnexpectedContentType(Some(
65+
String::from_utf8_lossy(ct.as_bytes()).to_string(),
66+
)));
5967
}
6068
}
6169
None => {
@@ -68,6 +76,30 @@ impl SseClient for reqwest::Client {
6876
}
6977

7078
impl SseClientTransport<reqwest::Client> {
79+
/// Creates a new transport using reqwest with the specified SSE endpoint.
80+
///
81+
/// This is a convenience method that creates a transport using the default
82+
/// reqwest client. This method is only available when the
83+
/// `transport-sse-client-reqwest` feature is enabled.
84+
///
85+
/// # Arguments
86+
///
87+
/// * `uri` - The SSE endpoint to connect to
88+
///
89+
/// # Example
90+
///
91+
/// ```rust
92+
/// use rmcp::transport::SseClientTransport;
93+
///
94+
/// // Enable the reqwest feature in Cargo.toml:
95+
/// // rmcp = { version = "0.5", features = ["transport-sse-client-reqwest"] }
96+
///
97+
/// let transport = SseClientTransport::start("http://localhost:8000/sse").await?;
98+
/// ```
99+
///
100+
/// # Feature requirement
101+
///
102+
/// This method requires the `transport-sse-client-reqwest` feature.
71103
pub async fn start(
72104
uri: impl Into<Arc<str>>,
73105
) -> Result<Self, SseTransportError<reqwest::Error>> {

crates/rmcp/src/transport/common/reqwest/streamable_http_client.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,30 @@ impl StreamableHttpClient for reqwest::Client {
131131
}
132132

133133
impl StreamableHttpClientTransport<reqwest::Client> {
134+
/// Creates a new transport using reqwest with the specified URI.
135+
///
136+
/// This is a convenience method that creates a transport using the default
137+
/// reqwest client. This method is only available when the
138+
/// `transport-streamable-http-client-reqwest` feature is enabled.
139+
///
140+
/// # Arguments
141+
///
142+
/// * `uri` - The server URI to connect to
143+
///
144+
/// # Example
145+
///
146+
/// ```rust
147+
/// use rmcp::transport::StreamableHttpClientTransport;
148+
///
149+
/// // Enable the reqwest feature in Cargo.toml:
150+
/// // rmcp = { version = "0.5", features = ["transport-streamable-http-client-reqwest"] }
151+
///
152+
/// let transport = StreamableHttpClientTransport::from_uri("http://localhost:8000/mcp");
153+
/// ```
154+
///
155+
/// # Feature requirement
156+
///
157+
/// This method requires the `transport-streamable-http-client-reqwest` feature.
134158
pub fn from_uri(uri: impl Into<Arc<str>>) -> Self {
135159
StreamableHttpClientTransport::with_client(
136160
reqwest::Client::default(),

crates/rmcp/src/transport/sse_client.rs

Lines changed: 44 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::{pin::Pin, sync::Arc};
33

44
use futures::{StreamExt, future::BoxFuture};
55
use http::Uri;
6-
use reqwest::header::HeaderValue;
6+
77
use sse_stream::Error as SseError;
88
use thiserror::Error;
99

@@ -28,7 +28,7 @@ pub enum SseTransportError<E: std::error::Error + Send + Sync + 'static> {
2828
#[error("unexpected end of stream")]
2929
UnexpectedEndOfStream,
3030
#[error("Unexpected content type: {0:?}")]
31-
UnexpectedContentType(Option<HeaderValue>),
31+
UnexpectedContentType(Option<String>),
3232
#[cfg(feature = "auth")]
3333
#[cfg_attr(docsrs, doc(cfg(feature = "auth")))]
3434
#[error("Auth error: {0}")]
@@ -39,12 +39,6 @@ pub enum SseTransportError<E: std::error::Error + Send + Sync + 'static> {
3939
InvalidUriParts(#[from] http::uri::InvalidUriParts),
4040
}
4141

42-
impl From<reqwest::Error> for SseTransportError<reqwest::Error> {
43-
fn from(e: reqwest::Error) -> Self {
44-
SseTransportError::Client(e)
45-
}
46-
}
47-
4842
pub trait SseClient: Clone + Send + Sync + 'static {
4943
type Error: std::error::Error + Send + Sync + 'static;
5044
fn post_message(
@@ -77,6 +71,48 @@ impl<C: SseClient> SseStreamReconnect for SseClientReconnect<C> {
7771
}
7872
}
7973
type ServerMessageStream<C> = Pin<Box<SseAutoReconnectStream<SseClientReconnect<C>>>>;
74+
75+
/// A client-agnostic SSE transport for RMCP that supports Server-Sent Events.
76+
///
77+
/// This transport allows you to choose your preferred HTTP client implementation
78+
/// by implementing the [`SseClient`] trait. The transport handles SSE streaming
79+
/// and automatic reconnection.
80+
///
81+
/// # Usage
82+
///
83+
/// ## Using reqwest (most common)
84+
///
85+
/// ```rust
86+
/// use rmcp::transport::SseClientTransport;
87+
///
88+
/// // Enable the reqwest feature in Cargo.toml:
89+
/// // rmcp = { version = "0.5", features = ["transport-sse-client-reqwest"] }
90+
///
91+
/// let transport = SseClientTransport::start("http://localhost:8000/sse").await?;
92+
/// ```
93+
///
94+
/// ## Using a custom HTTP client
95+
///
96+
/// ```rust
97+
/// use rmcp::transport::sse_client::{SseClient, SseClientTransport, SseClientConfig};
98+
///
99+
/// struct MyHttpClient;
100+
/// impl SseClient for MyHttpClient {
101+
/// type Error = MyError;
102+
/// // ... implement the trait methods
103+
/// }
104+
///
105+
/// let config = SseClientConfig {
106+
/// sse_endpoint: "http://localhost:8000/sse".into(),
107+
/// ..Default::default()
108+
/// };
109+
/// let transport = SseClientTransport::start_with_client(MyHttpClient, config).await?;
110+
/// ```
111+
///
112+
/// # Feature Flags
113+
///
114+
/// - `transport-sse-client`: Base feature providing the generic transport infrastructure
115+
/// - `transport-sse-client-reqwest`: Includes reqwest HTTP client support with convenience methods
80116
pub struct SseClientTransport<C: SseClient> {
81117
client: C,
82118
config: SseClientConfig,

crates/rmcp/src/transport/streamable_http_client.rs

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -478,9 +478,77 @@ impl<C: StreamableHttpClient> Worker for StreamableHttpClientWorker<C> {
478478
}
479479
}
480480

481+
/// A client-agnostic HTTP transport for RMCP that supports streaming responses.
482+
///
483+
/// This transport allows you to choose your preferred HTTP client implementation
484+
/// by implementing the [`StreamableHttpClient`] trait. The transport handles
485+
/// session management, SSE streaming, and automatic reconnection.
486+
///
487+
/// # Usage
488+
///
489+
/// ## Using reqwest
490+
///
491+
/// ```rust
492+
/// use rmcp::transport::StreamableHttpClientTransport;
493+
///
494+
/// // Enable the reqwest feature in Cargo.toml:
495+
/// // rmcp = { version = "0.5", features = ["transport-streamable-http-client-reqwest"] }
496+
///
497+
/// let transport = StreamableHttpClientTransport::from_uri("http://localhost:8000/mcp");
498+
/// ```
499+
///
500+
/// ## Using a custom HTTP client
501+
///
502+
/// ```rust
503+
/// use rmcp::transport::streamable_http_client::{
504+
/// StreamableHttpClient,
505+
/// StreamableHttpClientTransport,
506+
/// StreamableHttpClientTransportConfig
507+
/// };
508+
///
509+
/// struct MyHttpClient;
510+
/// impl StreamableHttpClient for MyHttpClient {
511+
/// type Error = MyError;
512+
/// // ... implement the trait methods
513+
/// }
514+
///
515+
/// let transport = StreamableHttpClientTransport::with_client(
516+
/// MyHttpClient,
517+
/// StreamableHttpClientTransportConfig::with_uri("http://localhost:8000/mcp")
518+
/// );
519+
/// ```
520+
///
521+
/// # Feature Flags
522+
///
523+
/// - `transport-streamable-http-client`: Base feature providing the generic transport infrastructure
524+
/// - `transport-streamable-http-client-reqwest`: Includes reqwest HTTP client support with convenience methods
481525
pub type StreamableHttpClientTransport<C> = WorkerTransport<StreamableHttpClientWorker<C>>;
482526

483527
impl<C: StreamableHttpClient> StreamableHttpClientTransport<C> {
528+
/// Creates a new transport with a custom HTTP client implementation.
529+
///
530+
/// This method allows you to use any HTTP client that implements the [`StreamableHttpClient`] trait.
531+
/// Use this when you want to use a custom HTTP client or when the reqwest feature is not enabled.
532+
///
533+
/// # Arguments
534+
///
535+
/// * `client` - Your HTTP client implementation
536+
/// * `config` - Transport configuration including the server URI
537+
///
538+
/// # Example
539+
///
540+
/// ```rust
541+
/// use rmcp::transport::streamable_http_client::{
542+
/// StreamableHttpClient,
543+
/// StreamableHttpClientTransport,
544+
/// StreamableHttpClientTransportConfig
545+
/// };
546+
///
547+
/// let transport = StreamableHttpClientTransport::with_client(
548+
/// MyHttpClient,
549+
/// StreamableHttpClientTransportConfig::with_uri("http://localhost:8000/mcp")
550+
/// );
551+
/// ```
484552
pub fn with_client(client: C, config: StreamableHttpClientTransportConfig) -> Self {
485553
let worker = StreamableHttpClientWorker::new(client, config);
486554
WorkerTransport::spawn(worker)

examples/clients/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ rmcp = { workspace = true, features = [
1111
"client",
1212
"transport-sse-client",
1313
"reqwest",
14-
"transport-streamable-http-client",
14+
"transport-streamable-http-client-reqwest",
1515
"transport-child-process",
1616
"tower",
1717
"auth"

examples/rig-integration/Cargo.toml

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,9 @@ rig-core = "0.15.1"
1717
tokio = { version = "1", features = ["full"] }
1818
rmcp = { workspace = true, features = [
1919
"client",
20-
"reqwest",
2120
"transport-child-process",
22-
"transport-sse-client",
23-
"transport-streamable-http-client"
21+
"transport-sse-client-reqwest",
22+
"transport-streamable-http-client-reqwest"
2423
] }
2524
anyhow = "1.0"
2625
serde_json = "1"

examples/simple-chat-client/Cargo.toml

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,7 @@ toml = "0.9"
1717
rmcp = { workspace = true, features = [
1818
"client",
1919
"transport-child-process",
20-
"transport-sse-client",
21-
"transport-streamable-http-client",
22-
"reqwest"
20+
"transport-sse-client-reqwest",
21+
"transport-streamable-http-client-reqwest"
2322
], no-default-features = true }
2423
clap = { version = "4.0", features = ["derive"] }

0 commit comments

Comments
 (0)