Skip to content

Include reqwest in transport-streamble-http-client feature #376

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions crates/rmcp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,14 @@ server-side-http = [
client-side-sse = ["dep:sse-stream", "dep:http"]

transport-sse-client = ["client-side-sse", "transport-worker"]
transport-sse-client-reqwest = ["transport-sse-client", "reqwest"]

transport-worker = ["dep:tokio-stream"]


# Streamable HTTP client
transport-streamable-http-client = ["client-side-sse", "transport-worker"]
transport-streamable-http-client-reqwest = ["transport-streamable-http-client", "reqwest"]


transport-async-rw = ["tokio/io-util", "tokio-util/codec"]
Expand Down
6 changes: 4 additions & 2 deletions crates/rmcp/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,10 @@ RMCP uses feature flags to control which components are included:
- `transport-async-rw`: Async read/write support
- `transport-io`: I/O stream support
- `transport-child-process`: Child process support
- `transport-sse-client` / `transport-sse-server`: SSE support
- `transport-streamable-http-client` / `transport-streamable-http-server`: HTTP streaming
- `transport-sse-client` / `transport-sse-server`: SSE support (client agnostic)
- `transport-sse-client-reqwest`: a default `reqwest` implementation of the SSE client
- `transport-streamable-http-client` / `transport-streamable-http-server`: HTTP streaming (client agnostic, see [`StreamableHttpClientTransport`] for details)
- `transport-streamable-http-client-reqwest`: a default `reqwest` implementation of the streamable http client
- `auth`: OAuth2 authentication support
- `schemars`: JSON Schema generation (for tool definitions)

Expand Down
8 changes: 4 additions & 4 deletions crates/rmcp/src/transport/common/reqwest.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#[cfg(feature = "transport-streamable-http-client")]
#[cfg_attr(docsrs, doc(cfg(feature = "transport-streamable-http-client")))]
#[cfg(feature = "transport-streamable-http-client-reqwest")]
#[cfg_attr(docsrs, doc(cfg(feature = "transport-streamable-http-client-reqwest")))]
mod streamable_http_client;

#[cfg(feature = "transport-sse-client")]
#[cfg_attr(docsrs, doc(cfg(feature = "transport-sse-client")))]
#[cfg(feature = "transport-sse-client-reqwest")]
#[cfg_attr(docsrs, doc(cfg(feature = "transport-sse-client-reqwest")))]
mod sse_client;
34 changes: 33 additions & 1 deletion crates/rmcp/src/transport/common/reqwest/sse_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ use crate::transport::{
sse_client::{SseClient, SseClientConfig, SseTransportError},
};

impl From<reqwest::Error> for SseTransportError<reqwest::Error> {
fn from(e: reqwest::Error) -> Self {
SseTransportError::Client(e)
}
}

impl SseClient for reqwest::Client {
type Error = reqwest::Error;

Expand Down Expand Up @@ -55,7 +61,9 @@ impl SseClient for reqwest::Client {
match response.headers().get(reqwest::header::CONTENT_TYPE) {
Some(ct) => {
if !ct.as_bytes().starts_with(EVENT_STREAM_MIME_TYPE.as_bytes()) {
return Err(SseTransportError::UnexpectedContentType(Some(ct.clone())));
return Err(SseTransportError::UnexpectedContentType(Some(
String::from_utf8_lossy(ct.as_bytes()).to_string(),
)));
}
}
None => {
Expand All @@ -68,6 +76,30 @@ impl SseClient for reqwest::Client {
}

impl SseClientTransport<reqwest::Client> {
/// Creates a new transport using reqwest with the specified SSE endpoint.
///
/// This is a convenience method that creates a transport using the default
/// reqwest client. This method is only available when the
/// `transport-sse-client-reqwest` feature is enabled.
///
/// # Arguments
///
/// * `uri` - The SSE endpoint to connect to
///
/// # Example
///
/// ```rust
/// use rmcp::transport::SseClientTransport;
///
/// // Enable the reqwest feature in Cargo.toml:
/// // rmcp = { version = "0.5", features = ["transport-sse-client-reqwest"] }
///
/// let transport = SseClientTransport::start("http://localhost:8000/sse").await?;
/// ```
///
/// # Feature requirement
///
/// This method requires the `transport-sse-client-reqwest` feature.
pub async fn start(
uri: impl Into<Arc<str>>,
) -> Result<Self, SseTransportError<reqwest::Error>> {
Expand Down
30 changes: 30 additions & 0 deletions crates/rmcp/src/transport/common/reqwest/streamable_http_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ use crate::{
},
};

impl From<reqwest::Error> for StreamableHttpError<reqwest::Error> {
fn from(e: reqwest::Error) -> Self {
StreamableHttpError::Client(e)
}
}

impl StreamableHttpClient for reqwest::Client {
type Error = reqwest::Error;

Expand Down Expand Up @@ -125,6 +131,30 @@ impl StreamableHttpClient for reqwest::Client {
}

impl StreamableHttpClientTransport<reqwest::Client> {
/// Creates a new transport using reqwest with the specified URI.
///
/// This is a convenience method that creates a transport using the default
/// reqwest client. This method is only available when the
/// `transport-streamable-http-client-reqwest` feature is enabled.
///
/// # Arguments
///
/// * `uri` - The server URI to connect to
///
/// # Example
///
/// ```rust
/// use rmcp::transport::StreamableHttpClientTransport;
///
/// // Enable the reqwest feature in Cargo.toml:
/// // rmcp = { version = "0.5", features = ["transport-streamable-http-client-reqwest"] }
///
/// let transport = StreamableHttpClientTransport::from_uri("http://localhost:8000/mcp");
/// ```
///
/// # Feature requirement
///
/// This method requires the `transport-streamable-http-client-reqwest` feature.
pub fn from_uri(uri: impl Into<Arc<str>>) -> Self {
StreamableHttpClientTransport::with_client(
reqwest::Client::default(),
Expand Down
52 changes: 44 additions & 8 deletions crates/rmcp/src/transport/sse_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{pin::Pin, sync::Arc};

use futures::{StreamExt, future::BoxFuture};
use http::Uri;
use reqwest::header::HeaderValue;

use sse_stream::Error as SseError;
use thiserror::Error;

Expand All @@ -28,7 +28,7 @@ pub enum SseTransportError<E: std::error::Error + Send + Sync + 'static> {
#[error("unexpected end of stream")]
UnexpectedEndOfStream,
#[error("Unexpected content type: {0:?}")]
UnexpectedContentType(Option<HeaderValue>),
UnexpectedContentType(Option<String>),
#[cfg(feature = "auth")]
#[cfg_attr(docsrs, doc(cfg(feature = "auth")))]
#[error("Auth error: {0}")]
Expand All @@ -39,12 +39,6 @@ pub enum SseTransportError<E: std::error::Error + Send + Sync + 'static> {
InvalidUriParts(#[from] http::uri::InvalidUriParts),
}

impl From<reqwest::Error> for SseTransportError<reqwest::Error> {
fn from(e: reqwest::Error) -> Self {
SseTransportError::Client(e)
}
}

pub trait SseClient: Clone + Send + Sync + 'static {
type Error: std::error::Error + Send + Sync + 'static;
fn post_message(
Expand Down Expand Up @@ -77,6 +71,48 @@ impl<C: SseClient> SseStreamReconnect for SseClientReconnect<C> {
}
}
type ServerMessageStream<C> = Pin<Box<SseAutoReconnectStream<SseClientReconnect<C>>>>;

/// A client-agnostic SSE transport for RMCP that supports Server-Sent Events.
///
/// This transport allows you to choose your preferred HTTP client implementation
/// by implementing the [`SseClient`] trait. The transport handles SSE streaming
/// and automatic reconnection.
///
/// # Usage
///
/// ## Using reqwest (most common)
///
/// ```rust
/// use rmcp::transport::SseClientTransport;
///
/// // Enable the reqwest feature in Cargo.toml:
/// // rmcp = { version = "0.5", features = ["transport-sse-client-reqwest"] }
///
/// let transport = SseClientTransport::start("http://localhost:8000/sse").await?;
/// ```
///
/// ## Using a custom HTTP client
///
/// ```rust
/// use rmcp::transport::sse_client::{SseClient, SseClientTransport, SseClientConfig};
///
/// struct MyHttpClient;
/// impl SseClient for MyHttpClient {
/// type Error = MyError;
/// // ... implement the trait methods
/// }
///
/// let config = SseClientConfig {
/// sse_endpoint: "http://localhost:8000/sse".into(),
/// ..Default::default()
/// };
/// let transport = SseClientTransport::start_with_client(MyHttpClient, config).await?;
/// ```
///
/// # Feature Flags
///
/// - `transport-sse-client`: Base feature providing the generic transport infrastructure
/// - `transport-sse-client-reqwest`: Includes reqwest HTTP client support with convenience methods
pub struct SseClientTransport<C: SseClient> {
client: C,
config: SseClientConfig,
Expand Down
73 changes: 68 additions & 5 deletions crates/rmcp/src/transport/streamable_http_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,6 @@ pub enum StreamableHttpError<E: std::error::Error + Send + Sync + 'static> {
Auth(#[from] crate::transport::auth::AuthError),
}

impl From<reqwest::Error> for StreamableHttpError<reqwest::Error> {
fn from(e: reqwest::Error) -> Self {
StreamableHttpError::Client(e)
}
}

pub enum StreamableHttpPostResponse {
Accepted,
Expand Down Expand Up @@ -483,9 +478,77 @@ impl<C: StreamableHttpClient> Worker for StreamableHttpClientWorker<C> {
}
}

/// A client-agnostic HTTP transport for RMCP that supports streaming responses.
///
/// This transport allows you to choose your preferred HTTP client implementation
/// by implementing the [`StreamableHttpClient`] trait. The transport handles
/// session management, SSE streaming, and automatic reconnection.
///
/// # Usage
///
/// ## Using reqwest
///
/// ```rust
/// use rmcp::transport::StreamableHttpClientTransport;
///
/// // Enable the reqwest feature in Cargo.toml:
/// // rmcp = { version = "0.5", features = ["transport-streamable-http-client-reqwest"] }
///
/// let transport = StreamableHttpClientTransport::from_uri("http://localhost:8000/mcp");
/// ```
///
/// ## Using a custom HTTP client
///
/// ```rust
/// use rmcp::transport::streamable_http_client::{
/// StreamableHttpClient,
/// StreamableHttpClientTransport,
/// StreamableHttpClientTransportConfig
/// };
///
/// struct MyHttpClient;
/// impl StreamableHttpClient for MyHttpClient {
/// type Error = MyError;
/// // ... implement the trait methods
/// }
///
/// let transport = StreamableHttpClientTransport::with_client(
/// MyHttpClient,
/// StreamableHttpClientTransportConfig::with_uri("http://localhost:8000/mcp")
/// );
/// ```
///
/// # Feature Flags
///
/// - `transport-streamable-http-client`: Base feature providing the generic transport infrastructure
/// - `transport-streamable-http-client-reqwest`: Includes reqwest HTTP client support with convenience methods
pub type StreamableHttpClientTransport<C> = WorkerTransport<StreamableHttpClientWorker<C>>;

impl<C: StreamableHttpClient> StreamableHttpClientTransport<C> {
/// Creates a new transport with a custom HTTP client implementation.
///
/// This method allows you to use any HTTP client that implements the [`StreamableHttpClient`] trait.
/// Use this when you want to use a custom HTTP client or when the reqwest feature is not enabled.
///
/// # Arguments
///
/// * `client` - Your HTTP client implementation
/// * `config` - Transport configuration including the server URI
///
/// # Example
///
/// ```rust
/// use rmcp::transport::streamable_http_client::{
/// StreamableHttpClient,
/// StreamableHttpClientTransport,
/// StreamableHttpClientTransportConfig
/// };
///
/// let transport = StreamableHttpClientTransport::with_client(
/// MyHttpClient,
/// StreamableHttpClientTransportConfig::with_uri("http://localhost:8000/mcp")
/// );
/// ```
pub fn with_client(client: C, config: StreamableHttpClientTransportConfig) -> Self {
let worker = StreamableHttpClientWorker::new(client, config);
WorkerTransport::spawn(worker)
Expand Down
2 changes: 1 addition & 1 deletion examples/clients/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ rmcp = { workspace = true, features = [
"client",
"transport-sse-client",
"reqwest",
"transport-streamable-http-client",
"transport-streamable-http-client-reqwest",
"transport-child-process",
"tower",
"auth"
Expand Down
5 changes: 2 additions & 3 deletions examples/rig-integration/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@ rig-core = "0.15.1"
tokio = { version = "1", features = ["full"] }
rmcp = { workspace = true, features = [
"client",
"reqwest",
"transport-child-process",
"transport-sse-client",
"transport-streamable-http-client"
"transport-sse-client-reqwest",
"transport-streamable-http-client-reqwest"
] }
anyhow = "1.0"
serde_json = "1"
Expand Down
5 changes: 2 additions & 3 deletions examples/simple-chat-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@ toml = "0.9"
rmcp = { workspace = true, features = [
"client",
"transport-child-process",
"transport-sse-client",
"transport-streamable-http-client",
"reqwest"
"transport-sse-client-reqwest",
"transport-streamable-http-client-reqwest"
], no-default-features = true }
clap = { version = "4.0", features = ["derive"] }