Skip to content

Commit ce2ac8b

Browse files
committed
feat(rmcp): enhance transport features by decoupling reqwest
- Added reqwest features for reqwest-based implementations. - Updated documentation - Modified error handling in SSE transport to use `String` for content type. - Updated examples to include new features
1 parent 4ea231a commit ce2ac8b

File tree

10 files changed

+321
-25
lines changed

10 files changed

+321
-25
lines changed

crates/rmcp/Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,12 +102,14 @@ server-side-http = [
102102
client-side-sse = ["dep:sse-stream", "dep:http"]
103103

104104
transport-sse-client = ["client-side-sse", "transport-worker"]
105+
transport-sse-client-reqwest = ["transport-sse-client", "reqwest"]
105106

106107
transport-worker = ["dep:tokio-stream"]
107108

108109

109110
# Streamable HTTP client
110-
transport-streamable-http-client = ["client-side-sse", "transport-worker", "reqwest"]
111+
transport-streamable-http-client = ["client-side-sse", "transport-worker"]
112+
transport-streamable-http-client-reqwest = ["transport-streamable-http-client", "reqwest"]
111113

112114

113115
transport-async-rw = ["tokio/io-util", "tokio-util/codec"]

crates/rmcp/README.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -199,8 +199,10 @@ RMCP uses feature flags to control which components are included:
199199
- `transport-async-rw`: Async read/write support
200200
- `transport-io`: I/O stream support
201201
- `transport-child-process`: Child process support
202-
- `transport-sse-client` / `transport-sse-server`: SSE support
203-
- `transport-streamable-http-client` / `transport-streamable-http-server`: HTTP streaming
202+
- `transport-sse-client` / `transport-sse-server`: SSE support (client agnostic)
203+
- `transport-sse-client-reqwest`: a default `reqwest` implementation of the SSE client
204+
- `transport-streamable-http-client` / `transport-streamable-http-server`: HTTP streaming (client agnostic, see [`StreamableHttpClientTransport`] for details)
205+
- `transport-streamable-http-client-reqwest`: a default `reqwest` implementation of the streamable http client
204206
- `auth`: OAuth2 authentication support
205207
- `schemars`: JSON Schema generation (for tool definitions)
206208

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
#[cfg(feature = "transport-streamable-http-client")]
2-
#[cfg_attr(docsrs, doc(cfg(feature = "transport-streamable-http-client")))]
1+
#[cfg(feature = "transport-streamable-http-client-reqwest")]
2+
#[cfg_attr(docsrs, doc(cfg(feature = "transport-streamable-http-client-reqwest")))]
33
mod streamable_http_client;
44

5-
#[cfg(feature = "transport-sse-client")]
6-
#[cfg_attr(docsrs, doc(cfg(feature = "transport-sse-client")))]
5+
#[cfg(feature = "transport-sse-client-reqwest")]
6+
#[cfg_attr(docsrs, doc(cfg(feature = "transport-sse-client-reqwest")))]
77
mod sse_client;

crates/rmcp/src/transport/common/reqwest/sse_client.rs

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,12 @@ use crate::transport::{
1111
sse_client::{SseClient, SseClientConfig, SseTransportError},
1212
};
1313

14+
impl From<reqwest::Error> for SseTransportError<reqwest::Error> {
15+
fn from(e: reqwest::Error) -> Self {
16+
SseTransportError::Client(e)
17+
}
18+
}
19+
1420
impl SseClient for reqwest::Client {
1521
type Error = reqwest::Error;
1622

@@ -55,7 +61,9 @@ impl SseClient for reqwest::Client {
5561
match response.headers().get(reqwest::header::CONTENT_TYPE) {
5662
Some(ct) => {
5763
if !ct.as_bytes().starts_with(EVENT_STREAM_MIME_TYPE.as_bytes()) {
58-
return Err(SseTransportError::UnexpectedContentType(Some(ct.clone())));
64+
return Err(SseTransportError::UnexpectedContentType(Some(
65+
String::from_utf8_lossy(ct.as_bytes()).to_string(),
66+
)));
5967
}
6068
}
6169
None => {
@@ -68,6 +76,33 @@ impl SseClient for reqwest::Client {
6876
}
6977

7078
impl SseClientTransport<reqwest::Client> {
79+
/// Creates a new transport using reqwest with the specified SSE endpoint.
80+
///
81+
/// This is a convenience method that creates a transport using the default
82+
/// reqwest client. This method is only available when the
83+
/// `transport-sse-client-reqwest` feature is enabled.
84+
///
85+
/// # Arguments
86+
///
87+
/// * `uri` - The SSE endpoint to connect to
88+
///
89+
/// # Example
90+
///
91+
/// ```rust
92+
/// use rmcp::transport::SseClientTransport;
93+
///
94+
/// // Enable the reqwest feature in Cargo.toml:
95+
/// // rmcp = { version = "0.5", features = ["transport-sse-client-reqwest"] }
96+
///
97+
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
98+
/// let transport = SseClientTransport::start("http://localhost:8000/sse").await?;
99+
/// # Ok(())
100+
/// # }
101+
/// ```
102+
///
103+
/// # Feature requirement
104+
///
105+
/// This method requires the `transport-sse-client-reqwest` feature.
71106
pub async fn start(
72107
uri: impl Into<Arc<str>>,
73108
) -> Result<Self, SseTransportError<reqwest::Error>> {

crates/rmcp/src/transport/common/reqwest/streamable_http_client.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,30 @@ impl StreamableHttpClient for reqwest::Client {
131131
}
132132

133133
impl StreamableHttpClientTransport<reqwest::Client> {
134+
/// Creates a new transport using reqwest with the specified URI.
135+
///
136+
/// This is a convenience method that creates a transport using the default
137+
/// reqwest client. This method is only available when the
138+
/// `transport-streamable-http-client-reqwest` feature is enabled.
139+
///
140+
/// # Arguments
141+
///
142+
/// * `uri` - The server URI to connect to
143+
///
144+
/// # Example
145+
///
146+
/// ```rust,no_run
147+
/// use rmcp::transport::StreamableHttpClientTransport;
148+
///
149+
/// // Enable the reqwest feature in Cargo.toml:
150+
/// // rmcp = { version = "0.5", features = ["transport-streamable-http-client-reqwest"] }
151+
///
152+
/// let transport = StreamableHttpClientTransport::from_uri("http://localhost:8000/mcp");
153+
/// ```
154+
///
155+
/// # Feature requirement
156+
///
157+
/// This method requires the `transport-streamable-http-client-reqwest` feature.
134158
pub fn from_uri(uri: impl Into<Arc<str>>) -> Self {
135159
StreamableHttpClientTransport::with_client(
136160
reqwest::Client::default(),

crates/rmcp/src/transport/sse_client.rs

Lines changed: 83 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::{pin::Pin, sync::Arc};
33

44
use futures::{StreamExt, future::BoxFuture};
55
use http::Uri;
6-
use reqwest::header::HeaderValue;
6+
77
use sse_stream::Error as SseError;
88
use thiserror::Error;
99

@@ -28,7 +28,7 @@ pub enum SseTransportError<E: std::error::Error + Send + Sync + 'static> {
2828
#[error("unexpected end of stream")]
2929
UnexpectedEndOfStream,
3030
#[error("Unexpected content type: {0:?}")]
31-
UnexpectedContentType(Option<HeaderValue>),
31+
UnexpectedContentType(Option<String>),
3232
#[cfg(feature = "auth")]
3333
#[cfg_attr(docsrs, doc(cfg(feature = "auth")))]
3434
#[error("Auth error: {0}")]
@@ -39,12 +39,6 @@ pub enum SseTransportError<E: std::error::Error + Send + Sync + 'static> {
3939
InvalidUriParts(#[from] http::uri::InvalidUriParts),
4040
}
4141

42-
impl From<reqwest::Error> for SseTransportError<reqwest::Error> {
43-
fn from(e: reqwest::Error) -> Self {
44-
SseTransportError::Client(e)
45-
}
46-
}
47-
4842
pub trait SseClient: Clone + Send + Sync + 'static {
4943
type Error: std::error::Error + Send + Sync + 'static;
5044
fn post_message(
@@ -77,6 +71,87 @@ impl<C: SseClient> SseStreamReconnect for SseClientReconnect<C> {
7771
}
7872
}
7973
type ServerMessageStream<C> = Pin<Box<SseAutoReconnectStream<SseClientReconnect<C>>>>;
74+
75+
/// A client-agnostic SSE transport for RMCP that supports Server-Sent Events.
76+
///
77+
/// This transport allows you to choose your preferred HTTP client implementation
78+
/// by implementing the [`SseClient`] trait. The transport handles SSE streaming
79+
/// and automatic reconnection.
80+
///
81+
/// # Usage
82+
///
83+
/// ## Using reqwest
84+
///
85+
/// ```rust
86+
/// use rmcp::transport::SseClientTransport;
87+
///
88+
/// // Enable the reqwest feature in Cargo.toml:
89+
/// // rmcp = { version = "0.5", features = ["transport-sse-client-reqwest"] }
90+
///
91+
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
92+
/// let transport = SseClientTransport::start("http://localhost:8000/sse").await?;
93+
/// # Ok(())
94+
/// # }
95+
/// ```
96+
///
97+
/// ## Using a custom HTTP client
98+
///
99+
/// ```rust
100+
/// use rmcp::transport::sse_client::{SseClient, SseClientTransport, SseClientConfig};
101+
/// use std::sync::Arc;
102+
/// use futures::stream::BoxStream;
103+
/// use rmcp::model::ClientJsonRpcMessage;
104+
/// use sse_stream::{Sse, Error as SseError};
105+
/// use http::Uri;
106+
///
107+
/// #[derive(Clone)]
108+
/// struct MyHttpClient;
109+
///
110+
/// #[derive(Debug, thiserror::Error)]
111+
/// struct MyError;
112+
///
113+
/// impl std::fmt::Display for MyError {
114+
/// fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
115+
/// write!(f, "MyError")
116+
/// }
117+
/// }
118+
///
119+
/// impl SseClient for MyHttpClient {
120+
/// type Error = MyError;
121+
///
122+
/// async fn post_message(
123+
/// &self,
124+
/// _uri: Uri,
125+
/// _message: ClientJsonRpcMessage,
126+
/// _auth_token: Option<String>,
127+
/// ) -> Result<(), rmcp::transport::sse_client::SseTransportError<Self::Error>> {
128+
/// todo!()
129+
/// }
130+
///
131+
/// async fn get_stream(
132+
/// &self,
133+
/// _uri: Uri,
134+
/// _last_event_id: Option<String>,
135+
/// _auth_token: Option<String>,
136+
/// ) -> Result<BoxStream<'static, Result<Sse, SseError>>, rmcp::transport::sse_client::SseTransportError<Self::Error>> {
137+
/// todo!()
138+
/// }
139+
/// }
140+
///
141+
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
142+
/// let config = SseClientConfig {
143+
/// sse_endpoint: "http://localhost:8000/sse".into(),
144+
/// ..Default::default()
145+
/// };
146+
/// let transport = SseClientTransport::start_with_client(MyHttpClient, config).await?;
147+
/// # Ok(())
148+
/// # }
149+
/// ```
150+
///
151+
/// # Feature Flags
152+
///
153+
/// - `transport-sse-client`: Base feature providing the generic transport infrastructure
154+
/// - `transport-sse-client-reqwest`: Includes reqwest HTTP client support with convenience methods
80155
pub struct SseClientTransport<C: SseClient> {
81156
client: C,
82157
config: SseClientConfig,

0 commit comments

Comments
 (0)