diff --git a/crates/rmcp/Cargo.toml b/crates/rmcp/Cargo.toml index 8e9b48e9..7b2ceaee 100644 --- a/crates/rmcp/Cargo.toml +++ b/crates/rmcp/Cargo.toml @@ -102,12 +102,14 @@ server-side-http = [ client-side-sse = ["dep:sse-stream", "dep:http"] transport-sse-client = ["client-side-sse", "transport-worker"] +transport-sse-client-reqwest = ["transport-sse-client", "reqwest"] transport-worker = ["dep:tokio-stream"] # Streamable HTTP client transport-streamable-http-client = ["client-side-sse", "transport-worker"] +transport-streamable-http-client-reqwest = ["transport-streamable-http-client", "reqwest"] transport-async-rw = ["tokio/io-util", "tokio-util/codec"] diff --git a/crates/rmcp/README.md b/crates/rmcp/README.md index 578a228a..1ce43196 100644 --- a/crates/rmcp/README.md +++ b/crates/rmcp/README.md @@ -199,8 +199,10 @@ RMCP uses feature flags to control which components are included: - `transport-async-rw`: Async read/write support - `transport-io`: I/O stream support - `transport-child-process`: Child process support - - `transport-sse-client` / `transport-sse-server`: SSE support - - `transport-streamable-http-client` / `transport-streamable-http-server`: HTTP streaming + - `transport-sse-client` / `transport-sse-server`: SSE support (client agnostic) + - `transport-sse-client-reqwest`: a default `reqwest` implementation of the SSE client + - `transport-streamable-http-client` / `transport-streamable-http-server`: HTTP streaming (client agnostic, see [`StreamableHttpClientTransport`] for details) + - `transport-streamable-http-client-reqwest`: a default `reqwest` implementation of the streamable http client - `auth`: OAuth2 authentication support - `schemars`: JSON Schema generation (for tool definitions) diff --git a/crates/rmcp/src/transport/common/reqwest.rs b/crates/rmcp/src/transport/common/reqwest.rs index 5395d571..4f9dc0dc 100644 --- a/crates/rmcp/src/transport/common/reqwest.rs +++ b/crates/rmcp/src/transport/common/reqwest.rs @@ -1,7 +1,7 @@ -#[cfg(feature = "transport-streamable-http-client")] -#[cfg_attr(docsrs, doc(cfg(feature = "transport-streamable-http-client")))] +#[cfg(feature = "transport-streamable-http-client-reqwest")] +#[cfg_attr(docsrs, doc(cfg(feature = "transport-streamable-http-client-reqwest")))] mod streamable_http_client; -#[cfg(feature = "transport-sse-client")] -#[cfg_attr(docsrs, doc(cfg(feature = "transport-sse-client")))] +#[cfg(feature = "transport-sse-client-reqwest")] +#[cfg_attr(docsrs, doc(cfg(feature = "transport-sse-client-reqwest")))] mod sse_client; diff --git a/crates/rmcp/src/transport/common/reqwest/sse_client.rs b/crates/rmcp/src/transport/common/reqwest/sse_client.rs index 37fe7841..71764c2d 100644 --- a/crates/rmcp/src/transport/common/reqwest/sse_client.rs +++ b/crates/rmcp/src/transport/common/reqwest/sse_client.rs @@ -11,6 +11,12 @@ use crate::transport::{ sse_client::{SseClient, SseClientConfig, SseTransportError}, }; +impl From for SseTransportError { + fn from(e: reqwest::Error) -> Self { + SseTransportError::Client(e) + } +} + impl SseClient for reqwest::Client { type Error = reqwest::Error; @@ -55,7 +61,9 @@ impl SseClient for reqwest::Client { match response.headers().get(reqwest::header::CONTENT_TYPE) { Some(ct) => { if !ct.as_bytes().starts_with(EVENT_STREAM_MIME_TYPE.as_bytes()) { - return Err(SseTransportError::UnexpectedContentType(Some(ct.clone()))); + return Err(SseTransportError::UnexpectedContentType(Some( + String::from_utf8_lossy(ct.as_bytes()).to_string(), + ))); } } None => { @@ -68,6 +76,30 @@ impl SseClient for reqwest::Client { } impl SseClientTransport { + /// Creates a new transport using reqwest with the specified SSE endpoint. + /// + /// This is a convenience method that creates a transport using the default + /// reqwest client. This method is only available when the + /// `transport-sse-client-reqwest` feature is enabled. + /// + /// # Arguments + /// + /// * `uri` - The SSE endpoint to connect to + /// + /// # Example + /// + /// ```rust + /// use rmcp::transport::SseClientTransport; + /// + /// // Enable the reqwest feature in Cargo.toml: + /// // rmcp = { version = "0.5", features = ["transport-sse-client-reqwest"] } + /// + /// let transport = SseClientTransport::start("http://localhost:8000/sse").await?; + /// ``` + /// + /// # Feature requirement + /// + /// This method requires the `transport-sse-client-reqwest` feature. pub async fn start( uri: impl Into>, ) -> Result> { diff --git a/crates/rmcp/src/transport/common/reqwest/streamable_http_client.rs b/crates/rmcp/src/transport/common/reqwest/streamable_http_client.rs index fd3aa1d5..2b3ccc14 100644 --- a/crates/rmcp/src/transport/common/reqwest/streamable_http_client.rs +++ b/crates/rmcp/src/transport/common/reqwest/streamable_http_client.rs @@ -14,6 +14,12 @@ use crate::{ }, }; +impl From for StreamableHttpError { + fn from(e: reqwest::Error) -> Self { + StreamableHttpError::Client(e) + } +} + impl StreamableHttpClient for reqwest::Client { type Error = reqwest::Error; @@ -125,6 +131,30 @@ impl StreamableHttpClient for reqwest::Client { } impl StreamableHttpClientTransport { + /// Creates a new transport using reqwest with the specified URI. + /// + /// This is a convenience method that creates a transport using the default + /// reqwest client. This method is only available when the + /// `transport-streamable-http-client-reqwest` feature is enabled. + /// + /// # Arguments + /// + /// * `uri` - The server URI to connect to + /// + /// # Example + /// + /// ```rust + /// use rmcp::transport::StreamableHttpClientTransport; + /// + /// // Enable the reqwest feature in Cargo.toml: + /// // rmcp = { version = "0.5", features = ["transport-streamable-http-client-reqwest"] } + /// + /// let transport = StreamableHttpClientTransport::from_uri("http://localhost:8000/mcp"); + /// ``` + /// + /// # Feature requirement + /// + /// This method requires the `transport-streamable-http-client-reqwest` feature. pub fn from_uri(uri: impl Into>) -> Self { StreamableHttpClientTransport::with_client( reqwest::Client::default(), diff --git a/crates/rmcp/src/transport/sse_client.rs b/crates/rmcp/src/transport/sse_client.rs index 7b6f280c..fbf472b1 100644 --- a/crates/rmcp/src/transport/sse_client.rs +++ b/crates/rmcp/src/transport/sse_client.rs @@ -3,7 +3,7 @@ use std::{pin::Pin, sync::Arc}; use futures::{StreamExt, future::BoxFuture}; use http::Uri; -use reqwest::header::HeaderValue; + use sse_stream::Error as SseError; use thiserror::Error; @@ -28,7 +28,7 @@ pub enum SseTransportError { #[error("unexpected end of stream")] UnexpectedEndOfStream, #[error("Unexpected content type: {0:?}")] - UnexpectedContentType(Option), + UnexpectedContentType(Option), #[cfg(feature = "auth")] #[cfg_attr(docsrs, doc(cfg(feature = "auth")))] #[error("Auth error: {0}")] @@ -39,12 +39,6 @@ pub enum SseTransportError { InvalidUriParts(#[from] http::uri::InvalidUriParts), } -impl From for SseTransportError { - fn from(e: reqwest::Error) -> Self { - SseTransportError::Client(e) - } -} - pub trait SseClient: Clone + Send + Sync + 'static { type Error: std::error::Error + Send + Sync + 'static; fn post_message( @@ -77,6 +71,48 @@ impl SseStreamReconnect for SseClientReconnect { } } type ServerMessageStream = Pin>>>; + +/// A client-agnostic SSE transport for RMCP that supports Server-Sent Events. +/// +/// This transport allows you to choose your preferred HTTP client implementation +/// by implementing the [`SseClient`] trait. The transport handles SSE streaming +/// and automatic reconnection. +/// +/// # Usage +/// +/// ## Using reqwest (most common) +/// +/// ```rust +/// use rmcp::transport::SseClientTransport; +/// +/// // Enable the reqwest feature in Cargo.toml: +/// // rmcp = { version = "0.5", features = ["transport-sse-client-reqwest"] } +/// +/// let transport = SseClientTransport::start("http://localhost:8000/sse").await?; +/// ``` +/// +/// ## Using a custom HTTP client +/// +/// ```rust +/// use rmcp::transport::sse_client::{SseClient, SseClientTransport, SseClientConfig}; +/// +/// struct MyHttpClient; +/// impl SseClient for MyHttpClient { +/// type Error = MyError; +/// // ... implement the trait methods +/// } +/// +/// let config = SseClientConfig { +/// sse_endpoint: "http://localhost:8000/sse".into(), +/// ..Default::default() +/// }; +/// let transport = SseClientTransport::start_with_client(MyHttpClient, config).await?; +/// ``` +/// +/// # Feature Flags +/// +/// - `transport-sse-client`: Base feature providing the generic transport infrastructure +/// - `transport-sse-client-reqwest`: Includes reqwest HTTP client support with convenience methods pub struct SseClientTransport { client: C, config: SseClientConfig, diff --git a/crates/rmcp/src/transport/streamable_http_client.rs b/crates/rmcp/src/transport/streamable_http_client.rs index 20159a01..6d0b28c3 100644 --- a/crates/rmcp/src/transport/streamable_http_client.rs +++ b/crates/rmcp/src/transport/streamable_http_client.rs @@ -48,11 +48,6 @@ pub enum StreamableHttpError { Auth(#[from] crate::transport::auth::AuthError), } -impl From for StreamableHttpError { - fn from(e: reqwest::Error) -> Self { - StreamableHttpError::Client(e) - } -} pub enum StreamableHttpPostResponse { Accepted, @@ -483,9 +478,77 @@ impl Worker for StreamableHttpClientWorker { } } +/// A client-agnostic HTTP transport for RMCP that supports streaming responses. +/// +/// This transport allows you to choose your preferred HTTP client implementation +/// by implementing the [`StreamableHttpClient`] trait. The transport handles +/// session management, SSE streaming, and automatic reconnection. +/// +/// # Usage +/// +/// ## Using reqwest +/// +/// ```rust +/// use rmcp::transport::StreamableHttpClientTransport; +/// +/// // Enable the reqwest feature in Cargo.toml: +/// // rmcp = { version = "0.5", features = ["transport-streamable-http-client-reqwest"] } +/// +/// let transport = StreamableHttpClientTransport::from_uri("http://localhost:8000/mcp"); +/// ``` +/// +/// ## Using a custom HTTP client +/// +/// ```rust +/// use rmcp::transport::streamable_http_client::{ +/// StreamableHttpClient, +/// StreamableHttpClientTransport, +/// StreamableHttpClientTransportConfig +/// }; +/// +/// struct MyHttpClient; +/// impl StreamableHttpClient for MyHttpClient { +/// type Error = MyError; +/// // ... implement the trait methods +/// } +/// +/// let transport = StreamableHttpClientTransport::with_client( +/// MyHttpClient, +/// StreamableHttpClientTransportConfig::with_uri("http://localhost:8000/mcp") +/// ); +/// ``` +/// +/// # Feature Flags +/// +/// - `transport-streamable-http-client`: Base feature providing the generic transport infrastructure +/// - `transport-streamable-http-client-reqwest`: Includes reqwest HTTP client support with convenience methods pub type StreamableHttpClientTransport = WorkerTransport>; impl StreamableHttpClientTransport { + /// Creates a new transport with a custom HTTP client implementation. + /// + /// This method allows you to use any HTTP client that implements the [`StreamableHttpClient`] trait. + /// Use this when you want to use a custom HTTP client or when the reqwest feature is not enabled. + /// + /// # Arguments + /// + /// * `client` - Your HTTP client implementation + /// * `config` - Transport configuration including the server URI + /// + /// # Example + /// + /// ```rust + /// use rmcp::transport::streamable_http_client::{ + /// StreamableHttpClient, + /// StreamableHttpClientTransport, + /// StreamableHttpClientTransportConfig + /// }; + /// + /// let transport = StreamableHttpClientTransport::with_client( + /// MyHttpClient, + /// StreamableHttpClientTransportConfig::with_uri("http://localhost:8000/mcp") + /// ); + /// ``` pub fn with_client(client: C, config: StreamableHttpClientTransportConfig) -> Self { let worker = StreamableHttpClientWorker::new(client, config); WorkerTransport::spawn(worker) diff --git a/examples/clients/Cargo.toml b/examples/clients/Cargo.toml index 5dcb2dc6..bafb6c14 100644 --- a/examples/clients/Cargo.toml +++ b/examples/clients/Cargo.toml @@ -11,7 +11,7 @@ rmcp = { workspace = true, features = [ "client", "transport-sse-client", "reqwest", - "transport-streamable-http-client", + "transport-streamable-http-client-reqwest", "transport-child-process", "tower", "auth" diff --git a/examples/rig-integration/Cargo.toml b/examples/rig-integration/Cargo.toml index 4f643d8a..a472086d 100644 --- a/examples/rig-integration/Cargo.toml +++ b/examples/rig-integration/Cargo.toml @@ -17,10 +17,9 @@ rig-core = "0.15.1" tokio = { version = "1", features = ["full"] } rmcp = { workspace = true, features = [ "client", - "reqwest", "transport-child-process", - "transport-sse-client", - "transport-streamable-http-client" + "transport-sse-client-reqwest", + "transport-streamable-http-client-reqwest" ] } anyhow = "1.0" serde_json = "1" diff --git a/examples/simple-chat-client/Cargo.toml b/examples/simple-chat-client/Cargo.toml index c99353ee..612b7a86 100644 --- a/examples/simple-chat-client/Cargo.toml +++ b/examples/simple-chat-client/Cargo.toml @@ -17,8 +17,7 @@ toml = "0.9" rmcp = { workspace = true, features = [ "client", "transport-child-process", - "transport-sse-client", - "transport-streamable-http-client", - "reqwest" + "transport-sse-client-reqwest", + "transport-streamable-http-client-reqwest" ], no-default-features = true } clap = { version = "4.0", features = ["derive"] }