Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,20 @@
- [Prompts](#prompts)
- [Images](#images)
- [Context](#context)
- [Authentication](#authentication)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My IDE added those... Which seems correct.

- [Running Your Server](#running-your-server)
- [Development Mode](#development-mode)
- [Claude Desktop Integration](#claude-desktop-integration)
- [Direct Execution](#direct-execution)
- [Streamable HTTP Transport](#streamable-http-transport)
- [Mounting to an Existing ASGI Server](#mounting-to-an-existing-asgi-server)
- [Examples](#examples)
- [Echo Server](#echo-server)
- [SQLite Explorer](#sqlite-explorer)
- [Advanced Usage](#advanced-usage)
- [Low-Level Server](#low-level-server)
- [Writing MCP Clients](#writing-mcp-clients)
- [OAuth Authentication for Clients](#oauth-authentication-for-clients)
- [MCP Primitives](#mcp-primitives)
- [Server Capabilities](#server-capabilities)
- [Documentation](#documentation)
Expand Down Expand Up @@ -73,7 +76,7 @@ The Model Context Protocol allows applications to provide context for LLMs in a

### Adding MCP to your python project

We recommend using [uv](https://docs.astral.sh/uv/) to manage your Python projects.
We recommend using [uv](https://docs.astral.sh/uv/) to manage your Python projects.

If you haven't created a uv-managed project yet, create one:

Expand Down Expand Up @@ -790,13 +793,13 @@ if __name__ == "__main__":
Clients can also connect using [Streamable HTTP transport](https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#streamable-http):

```python
from mcp.client.streamable_http import streamablehttp_client
from mcp.client.streamable_http import streamable_http_client
from mcp import ClientSession


async def main():
# Connect to a streamable HTTP server
async with streamablehttp_client("example/mcp") as (
async with streamable_http_client("example/mcp") as (
read_stream,
write_stream,
_,
Expand All @@ -816,7 +819,7 @@ The SDK includes [authorization support](https://modelcontextprotocol.io/specifi
```python
from mcp.client.auth import OAuthClientProvider, TokenStorage
from mcp.client.session import ClientSession
from mcp.client.streamable_http import streamablehttp_client
from mcp.client.streamable_http import streamable_http_client
from mcp.shared.auth import OAuthClientInformationFull, OAuthClientMetadata, OAuthToken


Expand Down Expand Up @@ -852,7 +855,7 @@ async def main():
)

# Use with streamable HTTP client
async with streamablehttp_client(
async with streamable_http_client(
"https://api.example.com/mcp", auth=oauth_auth
) as (read, write, _):
async with ClientSession(read, write) as session:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from mcp.client.auth import OAuthClientProvider, TokenStorage
from mcp.client.session import ClientSession
from mcp.client.sse import sse_client
from mcp.client.streamable_http import streamablehttp_client
from mcp.client.streamable_http import streamable_http_client
from mcp.shared.auth import OAuthClientInformationFull, OAuthClientMetadata, OAuthToken


Expand Down Expand Up @@ -208,7 +208,7 @@ async def _default_redirect_handler(authorization_url: str) -> None:
await self._run_session(read_stream, write_stream, None)
else:
print("📡 Opening StreamableHTTP transport connection with auth...")
async with streamablehttp_client(
async with streamable_http_client(
url=self.server_url,
auth=oauth_auth,
timeout=timedelta(seconds=60),
Expand Down
10 changes: 5 additions & 5 deletions src/mcp/client/session_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from mcp import types
from mcp.client.sse import sse_client
from mcp.client.stdio import StdioServerParameters
from mcp.client.streamable_http import streamablehttp_client
from mcp.client.streamable_http import streamable_http_client
from mcp.shared.exceptions import McpError


Expand All @@ -44,7 +44,7 @@ class SseServerParameters(BaseModel):


class StreamableHttpParameters(BaseModel):
"""Parameters for intializing a streamablehttp_client."""
"""Parameters for intializing a streamable_http_client."""

# The endpoint URL.
url: str
Expand Down Expand Up @@ -252,11 +252,11 @@ async def _establish_session(
)
read, write = await session_stack.enter_async_context(client)
else:
client = streamablehttp_client(
client = streamable_http_client(
url=server_params.url,
headers=server_params.headers,
timeout=server_params.timeout,
sse_read_timeout=server_params.sse_read_timeout,
timeout=server_params.timeout.total_seconds(),
sse_read_timeout=server_params.sse_read_timeout.total_seconds(),
terminate_on_close=server_params.terminate_on_close,
)
read, write, _ = await session_stack.enter_async_context(client)
Expand Down
86 changes: 71 additions & 15 deletions src/mcp/client/streamable_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"""

import logging
import warnings
from collections.abc import AsyncGenerator, Awaitable, Callable
from contextlib import asynccontextmanager
from dataclasses import dataclass
Expand Down Expand Up @@ -71,7 +72,7 @@ class RequestContext:
session_message: SessionMessage
metadata: ClientMessageMetadata | None
read_stream_writer: StreamWriter
sse_read_timeout: timedelta
sse_read_timeout: float


class StreamableHTTPTransport:
Expand All @@ -81,8 +82,8 @@ def __init__(
self,
url: str,
headers: dict[str, Any] | None = None,
timeout: timedelta = timedelta(seconds=30),
sse_read_timeout: timedelta = timedelta(seconds=60 * 5),
timeout: float | timedelta = 30,
sse_read_timeout: float | timedelta = 60 * 5,
Comment on lines +85 to +86
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is backwards compatible.

auth: httpx.Auth | None = None,
) -> None:
"""Initialize the StreamableHTTP transport.
Expand All @@ -96,8 +97,25 @@ def __init__(
"""
self.url = url
self.headers = headers or {}

if isinstance(timeout, timedelta):
warnings.warn(
"`timeout` as `timedelta` is deprecated. Use `float` instead.",
DeprecationWarning,
stacklevel=2,
)
timeout = timeout.total_seconds()
self.timeout = timeout

if isinstance(sse_read_timeout, timedelta):
warnings.warn(
"`sse_read_timeout` as `timedelta` is deprecated. Use `float` instead.",
DeprecationWarning,
stacklevel=2,
)
sse_read_timeout = sse_read_timeout.total_seconds()
self.sse_read_timeout = sse_read_timeout

self.auth = auth
self.session_id: str | None = None
self.request_headers = {
Expand Down Expand Up @@ -194,9 +212,7 @@ async def handle_get_stream(
"GET",
self.url,
headers=headers,
timeout=httpx.Timeout(
self.timeout.seconds, read=self.sse_read_timeout.seconds
),
timeout=httpx.Timeout(self.timeout, read=self.sse_read_timeout),
) as event_source:
event_source.response.raise_for_status()
logger.debug("GET SSE connection established")
Expand Down Expand Up @@ -225,9 +241,7 @@ async def _handle_resumption_request(self, ctx: RequestContext) -> None:
"GET",
self.url,
headers=headers,
timeout=httpx.Timeout(
self.timeout.seconds, read=ctx.sse_read_timeout.seconds
),
timeout=httpx.Timeout(self.timeout, read=ctx.sse_read_timeout),
) as event_source:
event_source.response.raise_for_status()
logger.debug("Resumption GET SSE connection established")
Expand Down Expand Up @@ -446,6 +460,52 @@ async def streamablehttp_client(
`sse_read_timeout` determines how long (in seconds) the client will wait for a new
event before disconnecting. All other HTTP operations are controlled by `timeout`.

Yields:
Tuple containing:
- read_stream: Stream for reading messages from the server
- write_stream: Stream for sending messages to the server
- get_session_id_callback: Function to retrieve the current session ID
"""
warnings.warn(
"`streamablehttp_client` is deprecated. Use `streamable_http_client` instead.",
DeprecationWarning,
stacklevel=2,
)
async with streamable_http_client(
url,
headers,
timeout.total_seconds(),
sse_read_timeout.total_seconds(),
Comment on lines +477 to +478
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should use total_seconds(), not seconds. Only seconds was wrong.

terminate_on_close,
httpx_client_factory,
auth,
) as (read_stream, write_stream, get_session_id):
yield (read_stream, write_stream, get_session_id)


@asynccontextmanager
async def streamable_http_client(
url: str,
headers: dict[str, Any] | None = None,
timeout: float = 30,
sse_read_timeout: float = 60 * 5,
terminate_on_close: bool = True,
httpx_client_factory: McpHttpClientFactory = create_mcp_http_client,
auth: httpx.Auth | None = None,
) -> AsyncGenerator[
tuple[
MemoryObjectReceiveStream[SessionMessage | Exception],
MemoryObjectSendStream[SessionMessage],
GetSessionIdCallback,
],
None,
]:
"""
Client transport for StreamableHTTP.

`sse_read_timeout` determines how long (in seconds) the client will wait for a new
event before disconnecting. All other HTTP operations are controlled by `timeout`.

Yields:
Tuple containing:
- read_stream: Stream for reading messages from the server
Expand All @@ -468,7 +528,7 @@ async def streamablehttp_client(
async with httpx_client_factory(
headers=transport.request_headers,
timeout=httpx.Timeout(
transport.timeout.seconds, read=transport.sse_read_timeout.seconds
transport.timeout, read=transport.sse_read_timeout
),
auth=transport.auth,
) as client:
Expand All @@ -489,11 +549,7 @@ def start_get_stream() -> None:
)

try:
yield (
read_stream,
write_stream,
transport.get_session_id,
)
yield (read_stream, write_stream, transport.get_session_id)
finally:
if transport.session_id and terminate_on_close:
await transport.terminate_session(client)
Expand Down
4 changes: 2 additions & 2 deletions tests/client/test_session_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ async def test_disconnect_non_existent_server(self):
url="http://test.com/stream", terminate_on_close=False
),
"streamablehttp",
"mcp.client.session_group.streamablehttp_client",
"mcp.client.session_group.streamable_http_client",
), # url, headers, timeout, sse_read_timeout, terminate_on_close
],
)
Expand All @@ -316,7 +316,7 @@ async def test_establish_session_parameterized(
mock_read_stream = mock.AsyncMock(name=f"{client_type_name}Read")
mock_write_stream = mock.AsyncMock(name=f"{client_type_name}Write")

# streamablehttp_client's __aenter__ returns three values
# streamable_http_client's __aenter__ returns three values
if client_type_name == "streamablehttp":
mock_extra_stream_val = mock.AsyncMock(name="StreamableExtra")
mock_client_cm_instance.__aenter__.return_value = (
Expand Down
8 changes: 4 additions & 4 deletions tests/server/fastmcp/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import mcp.types as types
from mcp.client.session import ClientSession
from mcp.client.sse import sse_client
from mcp.client.streamable_http import streamablehttp_client
from mcp.client.streamable_http import streamable_http_client
from mcp.server.fastmcp import FastMCP
from mcp.server.fastmcp.resources import FunctionResource
from mcp.shared.context import RequestContext
Expand Down Expand Up @@ -464,7 +464,7 @@ async def test_fastmcp_streamable_http(
) -> None:
"""Test that FastMCP works with StreamableHTTP transport."""
# Connect to the server using StreamableHTTP
async with streamablehttp_client(http_server_url + "/mcp") as (
async with streamable_http_client(http_server_url + "/mcp") as (
read_stream,
write_stream,
_,
Expand All @@ -489,7 +489,7 @@ async def test_fastmcp_stateless_streamable_http(
) -> None:
"""Test that FastMCP works with stateless StreamableHTTP transport."""
# Connect to the server using StreamableHTTP
async with streamablehttp_client(stateless_http_server_url + "/mcp") as (
async with streamable_http_client(stateless_http_server_url + "/mcp") as (
read_stream,
write_stream,
_,
Expand Down Expand Up @@ -909,7 +909,7 @@ async def test_fastmcp_all_features_streamable_http(
collector = NotificationCollector()

# Connect to the server using StreamableHTTP
async with streamablehttp_client(everything_http_server_url + "/mcp") as (
async with streamable_http_client(everything_http_server_url + "/mcp") as (
read_stream,
write_stream,
_,
Expand Down
Loading
Loading