Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 12 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1993,12 +1993,12 @@ Run from the repository root:
import asyncio

from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client
from mcp.client.streamable_http import streamable_http_client


async def main():
# Connect to a streamable HTTP server
async with streamablehttp_client("http://localhost:8000/mcp") as (
async with streamable_http_client("http://localhost:8000/mcp") as (
read_stream,
write_stream,
_,
Expand Down Expand Up @@ -2126,7 +2126,8 @@ from pydantic import AnyUrl

from mcp import ClientSession
from mcp.client.auth import OAuthClientProvider, TokenStorage
from mcp.client.streamable_http import streamablehttp_client
from mcp.client.streamable_http import streamable_http_client
from mcp.shared._httpx_utils import create_mcp_http_client
from mcp.shared.auth import OAuthClientInformationFull, OAuthClientMetadata, OAuthToken


Expand Down Expand Up @@ -2180,15 +2181,16 @@ async def main():
callback_handler=handle_callback,
)

async with streamablehttp_client("http://localhost:8001/mcp", auth=oauth_auth) as (read, write, _):
async with ClientSession(read, write) as session:
await session.initialize()
async with create_mcp_http_client(auth=oauth_auth) as custom_client:
async with streamable_http_client("http://localhost:8001/mcp", httpx_client=custom_client) as (read, write, _):
async with ClientSession(read, write) as session:
await session.initialize()

tools = await session.list_tools()
print(f"Available tools: {[tool.name for tool in tools.tools]}")
tools = await session.list_tools()
print(f"Available tools: {[tool.name for tool in tools.tools]}")

resources = await session.list_resources()
print(f"Available resources: {[r.uri for r in resources.resources]}")
resources = await session.list_resources()
print(f"Available resources: {[r.uri for r in resources.resources]}")


def run():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@
import threading
import time
import webbrowser
from datetime import timedelta
from http.server import BaseHTTPRequestHandler, HTTPServer
from typing import Any
from urllib.parse import parse_qs, urlparse

from mcp.client.auth import OAuthClientProvider, TokenStorage
from mcp.client.session import ClientSession
from mcp.client.sse import sse_client
from mcp.client.streamable_http import streamablehttp_client
from mcp.client.streamable_http import streamable_http_client
from mcp.shared._httpx_utils import create_mcp_http_client
from mcp.shared.auth import OAuthClientInformationFull, OAuthClientMetadata, OAuthToken


Expand Down Expand Up @@ -205,12 +205,12 @@ async def _default_redirect_handler(authorization_url: str) -> None:
await self._run_session(read_stream, write_stream, None)
else:
print("📡 Opening StreamableHTTP transport connection with auth...")
async with streamablehttp_client(
url=self.server_url,
auth=oauth_auth,
timeout=timedelta(seconds=60),
) as (read_stream, write_stream, get_session_id):
await self._run_session(read_stream, write_stream, get_session_id)
async with create_mcp_http_client(auth=oauth_auth) as custom_client:
async with streamable_http_client(
url=self.server_url,
httpx_client=custom_client,
) as (read_stream, write_stream, get_session_id):
await self._run_session(read_stream, write_stream, get_session_id)

except Exception as e:
print(f"❌ Failed to connect: {e}")
Expand Down
18 changes: 10 additions & 8 deletions examples/snippets/clients/oauth_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@

from mcp import ClientSession
from mcp.client.auth import OAuthClientProvider, TokenStorage
from mcp.client.streamable_http import streamablehttp_client
from mcp.client.streamable_http import streamable_http_client
from mcp.shared._httpx_utils import create_mcp_http_client
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should have private methods being exposed in examples. 🤔

from mcp.shared.auth import OAuthClientInformationFull, OAuthClientMetadata, OAuthToken


Expand Down Expand Up @@ -68,15 +69,16 @@ async def main():
callback_handler=handle_callback,
)

async with streamablehttp_client("http://localhost:8001/mcp", auth=oauth_auth) as (read, write, _):
async with ClientSession(read, write) as session:
await session.initialize()
async with create_mcp_http_client(auth=oauth_auth) as custom_client:
async with streamable_http_client("http://localhost:8001/mcp", httpx_client=custom_client) as (read, write, _):
async with ClientSession(read, write) as session:
await session.initialize()

tools = await session.list_tools()
print(f"Available tools: {[tool.name for tool in tools.tools]}")
tools = await session.list_tools()
print(f"Available tools: {[tool.name for tool in tools.tools]}")

resources = await session.list_resources()
print(f"Available resources: {[r.uri for r in resources.resources]}")
resources = await session.list_resources()
print(f"Available resources: {[r.uri for r in resources.resources]}")


def run():
Expand Down
4 changes: 2 additions & 2 deletions examples/snippets/clients/streamable_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@
import asyncio

from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client
from mcp.client.streamable_http import streamable_http_client


async def main():
# Connect to a streamable HTTP server
async with streamablehttp_client("http://localhost:8000/mcp") as (
async with streamable_http_client("http://localhost:8000/mcp") as (
read_stream,
write_stream,
_,
Expand Down
21 changes: 15 additions & 6 deletions src/mcp/client/session_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,16 @@
from typing import Any, TypeAlias

import anyio
import httpx
from pydantic import BaseModel
from typing_extensions import Self

import mcp
from mcp import types
from mcp.client.sse import sse_client
from mcp.client.stdio import StdioServerParameters
from mcp.client.streamable_http import streamablehttp_client
from mcp.client.streamable_http import streamable_http_client
from mcp.shared._httpx_utils import create_mcp_http_client
from mcp.shared.exceptions import McpError


Expand All @@ -44,7 +46,7 @@ class SseServerParameters(BaseModel):


class StreamableHttpParameters(BaseModel):
"""Parameters for intializing a streamablehttp_client."""
"""Parameters for intializing a streamable_http_client."""

# The endpoint URL.
url: str
Expand Down Expand Up @@ -250,11 +252,18 @@ async def _establish_session(
)
read, write = await session_stack.enter_async_context(client)
else:
client = streamablehttp_client(
url=server_params.url,
httpx_client = create_mcp_http_client(
headers=server_params.headers,
timeout=server_params.timeout,
sse_read_timeout=server_params.sse_read_timeout,
timeout=httpx.Timeout(
server_params.timeout.total_seconds(),
read=server_params.sse_read_timeout.total_seconds(),
),
)
await session_stack.enter_async_context(httpx_client)

client = streamable_http_client(
url=server_params.url,
httpx_client=httpx_client,
terminate_on_close=server_params.terminate_on_close,
)
read, write, _ = await session_stack.enter_async_context(client)
Expand Down
108 changes: 90 additions & 18 deletions src/mcp/client/streamable_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
and session management.
"""

import contextlib
import logging
from collections.abc import AsyncGenerator, Awaitable, Callable
from contextlib import asynccontextmanager
Expand All @@ -17,8 +18,14 @@
from anyio.abc import TaskGroup
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
from httpx_sse import EventSource, ServerSentEvent, aconnect_sse
from typing_extensions import deprecated

from mcp.shared._httpx_utils import McpHttpClientFactory, create_mcp_http_client
from mcp.shared._httpx_utils import (
MCP_DEFAULT_SSE_READ_TIMEOUT,
MCP_DEFAULT_TIMEOUT,
McpHttpClientFactory,
create_mcp_http_client,
)
from mcp.shared.message import ClientMessageMetadata, SessionMessage
from mcp.types import (
ErrorData,
Expand Down Expand Up @@ -101,9 +108,9 @@ def __init__(
self.session_id = None
self.protocol_version = None
self.request_headers = {
**self.headers,
Copy link
Contributor

@felixweinberger felixweinberger Oct 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Important change 1/2 ⚠️

Previously we would have streamablehttp_client create the httpx.AsyncClient via a factory after initializing the transport. That means we could use these transport.request_headers when creating the httpx.AsyncClient to ensure the client and the transport have the same headers.

If we now accept httpx_client as an argument to streamable_http_client, that client might have custom headers! In fact by default httpx.AsyncClient creates headers here that need to be overriden (hence moving the ACCEPT and CONTENT_TYPE after)

Therefore the transport needs to override these headers now if they're present to be configured correctly.

ACCEPT: f"{JSON}, {SSE}",
CONTENT_TYPE: JSON,
**self.headers,
}

def _prepare_request_headers(self, base_headers: dict[str, str]) -> dict[str, str]:
Expand Down Expand Up @@ -442,14 +449,11 @@ def get_session_id(self) -> str | None:


@asynccontextmanager
async def streamablehttp_client(
async def streamable_http_client(
url: str,
headers: dict[str, str] | None = None,
timeout: float | timedelta = 30,
sse_read_timeout: float | timedelta = 60 * 5,
*,
httpx_client: httpx.AsyncClient | None = None,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
httpx_client: httpx.AsyncClient | None = None,
http_client: httpx.AsyncClient | None = None,

terminate_on_close: bool = True,
httpx_client_factory: McpHttpClientFactory = create_mcp_http_client,
auth: httpx.Auth | None = None,
) -> AsyncGenerator[
tuple[
MemoryObjectReceiveStream[SessionMessage | Exception],
Expand All @@ -461,30 +465,57 @@ async def streamablehttp_client(
"""
Client transport for StreamableHTTP.

`sse_read_timeout` determines how long (in seconds) the client will wait for a new
event before disconnecting. All other HTTP operations are controlled by `timeout`.
Args:
url: The MCP server endpoint URL.
httpx_client: Optional pre-configured httpx.AsyncClient. If None, a default
client with recommended MCP timeouts will be created. To configure headers,
authentication, or other HTTP settings, create an httpx.AsyncClient and pass it here.
terminate_on_close: If True, send a DELETE request to terminate the session
when the context exits.

Yields:
Tuple containing:
- read_stream: Stream for reading messages from the server
- write_stream: Stream for sending messages to the server
- get_session_id_callback: Function to retrieve the current session ID
"""
transport = StreamableHTTPTransport(url, headers, timeout, sse_read_timeout, auth)

Example:
See examples/snippets/clients/ for usage patterns.
"""
read_stream_writer, read_stream = anyio.create_memory_object_stream[SessionMessage | Exception](0)
write_stream, write_stream_reader = anyio.create_memory_object_stream[SessionMessage](0)

# Determine if we need to create and manage the client
client_provided = httpx_client is not None
client = httpx_client

if client is None:
# Create default client with recommended MCP timeouts
client = create_mcp_http_client()

# Extract configuration from the client to pass to transport
headers_dict = dict(client.headers) if client.headers else None
timeout = client.timeout.connect if (client.timeout and client.timeout.connect is not None) else MCP_DEFAULT_TIMEOUT
sse_read_timeout = (
client.timeout.read if (client.timeout and client.timeout.read is not None) else MCP_DEFAULT_SSE_READ_TIMEOUT
)
auth = client.auth

# Create transport with extracted configuration
transport = StreamableHTTPTransport(url, headers_dict, timeout, sse_read_timeout, auth)

# Sync client headers with transport's merged headers (includes MCP protocol requirements)
client.headers.update(transport.request_headers)
Comment on lines +507 to +508
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Important change 2/2 ⚠️

We need to sync the headers on the client back so that they match with the transport after the transport has overriden them. If the user passes in a httpx_client they created, that could have arbitrary headers that no longer match the headers on the transport.

Overall I find this quite horrible - but I'm not sure how else to make this work via an object instead of a factory. Open for suggestions...


async with anyio.create_task_group() as tg:
try:
logger.debug(f"Connecting to StreamableHTTP endpoint: {url}")

async with httpx_client_factory(
headers=transport.request_headers,
timeout=httpx.Timeout(transport.timeout, read=transport.sse_read_timeout),
auth=transport.auth,
) as client:
# Define callbacks that need access to tg
async with contextlib.AsyncExitStack() as stack:
# Only manage client lifecycle if we created it
if not client_provided:
await stack.enter_async_context(client)

def start_get_stream() -> None:
tg.start_soon(transport.handle_get_stream, client, read_stream_writer)

Expand All @@ -511,3 +542,44 @@ def start_get_stream() -> None:
finally:
await read_stream_writer.aclose()
await write_stream.aclose()


@deprecated("Use `streamable_http_client` instead.")
@asynccontextmanager
async def streamablehttp_client(
url: str,
headers: dict[str, str] | None = None,
timeout: float | timedelta = 30,
sse_read_timeout: float | timedelta = 60 * 5,
terminate_on_close: bool = True,
httpx_client_factory: McpHttpClientFactory = create_mcp_http_client,
auth: httpx.Auth | None = None,
) -> AsyncGenerator[
tuple[
MemoryObjectReceiveStream[SessionMessage | Exception],
MemoryObjectSendStream[SessionMessage],
GetSessionIdCallback,
],
None,
]:
# Convert timeout parameters
timeout_seconds = timeout.total_seconds() if isinstance(timeout, timedelta) else timeout
sse_read_timeout_seconds = (
sse_read_timeout.total_seconds() if isinstance(sse_read_timeout, timedelta) else sse_read_timeout
)

# Create httpx client using the factory with old-style parameters
client = httpx_client_factory(
headers=headers,
timeout=httpx.Timeout(timeout_seconds, read=sse_read_timeout_seconds),
auth=auth,
)

# Manage client lifecycle since we created it
async with client:
async with streamable_http_client(
url,
httpx_client=client,
terminate_on_close=terminate_on_close,
) as streams:
yield streams
8 changes: 6 additions & 2 deletions src/mcp/shared/_httpx_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@

import httpx

__all__ = ["create_mcp_http_client"]
__all__ = ["create_mcp_http_client", "MCP_DEFAULT_TIMEOUT", "MCP_DEFAULT_SSE_READ_TIMEOUT"]

# Default MCP timeout configuration
MCP_DEFAULT_TIMEOUT = 30.0 # General operations (seconds)
MCP_DEFAULT_SSE_READ_TIMEOUT = 300.0 # SSE streams - 5 minutes (seconds)


class McpHttpClientFactory(Protocol):
Expand Down Expand Up @@ -68,7 +72,7 @@ def create_mcp_http_client(

# Handle timeout
if timeout is None:
kwargs["timeout"] = httpx.Timeout(30.0)
kwargs["timeout"] = httpx.Timeout(MCP_DEFAULT_TIMEOUT, read=MCP_DEFAULT_SSE_READ_TIMEOUT)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously we'd rely on the transport setting this to 60 * 5 when creating our client, but we can't rely on that anymore as the client may now be created before the transport.

else:
kwargs["timeout"] = timeout

Expand Down
6 changes: 3 additions & 3 deletions tests/client/test_http_unicode.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import pytest

from mcp.client.session import ClientSession
from mcp.client.streamable_http import streamablehttp_client
from mcp.client.streamable_http import streamable_http_client

# Test constants with various Unicode characters
UNICODE_TEST_STRINGS = {
Expand Down Expand Up @@ -189,7 +189,7 @@ async def test_streamable_http_client_unicode_tool_call(running_unicode_server:
base_url = running_unicode_server
endpoint_url = f"{base_url}/mcp"

async with streamablehttp_client(endpoint_url) as (read_stream, write_stream, _get_session_id):
async with streamable_http_client(endpoint_url) as (read_stream, write_stream, _get_session_id):
async with ClientSession(read_stream, write_stream) as session:
await session.initialize()

Expand Down Expand Up @@ -221,7 +221,7 @@ async def test_streamable_http_client_unicode_prompts(running_unicode_server: st
base_url = running_unicode_server
endpoint_url = f"{base_url}/mcp"

async with streamablehttp_client(endpoint_url) as (read_stream, write_stream, _get_session_id):
async with streamable_http_client(endpoint_url) as (read_stream, write_stream, _get_session_id):
async with ClientSession(read_stream, write_stream) as session:
await session.initialize()

Expand Down
Loading
Loading