diff --git a/examples/mcp/streamablehttp_custom_client_example/README.md b/examples/mcp/streamablehttp_custom_client_example/README.md new file mode 100644 index 000000000..1569b3c28 --- /dev/null +++ b/examples/mcp/streamablehttp_custom_client_example/README.md @@ -0,0 +1,62 @@ +# Custom HTTP Client Factory Example + +This example demonstrates how to use the new `httpx_client_factory` parameter in `MCPServerStreamableHttp` to configure custom HTTP client behavior for MCP StreamableHTTP connections. + +## Features Demonstrated + +- **Custom SSL Configuration**: Configure SSL certificates and verification settings +- **Custom Headers**: Add custom headers to all HTTP requests +- **Custom Timeouts**: Set custom timeout values for requests +- **Proxy Configuration**: Configure HTTP proxy settings +- **Custom Retry Logic**: Set up custom retry behavior (through httpx configuration) + +## Running the Example + +1. Make sure you have `uv` installed: https://docs.astral.sh/uv/getting-started/installation/ + +2. Run the example: + ```bash + cd examples/mcp/streamablehttp_custom_client_example + uv run main.py + ``` + +## Code Examples + +### Basic Custom Client + +```python +import httpx +from agents.mcp import MCPServerStreamableHttp + +def create_custom_http_client() -> httpx.AsyncClient: + return httpx.AsyncClient( + verify=False, # Disable SSL verification for testing + timeout=httpx.Timeout(60.0, read=120.0), + headers={"X-Custom-Client": "my-app"}, + ) + +async with MCPServerStreamableHttp( + name="Custom Client Server", + params={ + "url": "http://localhost:8000/mcp", + "httpx_client_factory": create_custom_http_client, + }, +) as server: + # Use the server... +``` + +## Use Cases + +- **Corporate Networks**: Configure proxy settings for corporate environments +- **SSL/TLS Requirements**: Use custom SSL certificates for secure connections +- **Custom Authentication**: Add custom headers for API authentication +- **Network Optimization**: Configure timeouts and connection pooling +- **Debugging**: Disable SSL verification for development environments + +## Benefits + +- **Flexibility**: Configure HTTP client behavior to match your network requirements +- **Security**: Use custom SSL certificates and authentication methods +- **Performance**: Optimize timeouts and connection settings for your use case +- **Compatibility**: Work with corporate proxies and network restrictions + diff --git a/examples/mcp/streamablehttp_custom_client_example/main.py b/examples/mcp/streamablehttp_custom_client_example/main.py new file mode 100644 index 000000000..41e26ec35 --- /dev/null +++ b/examples/mcp/streamablehttp_custom_client_example/main.py @@ -0,0 +1,116 @@ +"""Example demonstrating custom httpx_client_factory for MCPServerStreamableHttp. + +This example shows how to configure custom HTTP client behavior for MCP StreamableHTTP +connections, including SSL certificates, proxy settings, and custom timeouts. +""" + +import asyncio +import os +import shutil +import subprocess +import time +from typing import Any + +import httpx + +from agents import Agent, Runner, gen_trace_id, trace +from agents.mcp import MCPServer, MCPServerStreamableHttp +from agents.model_settings import ModelSettings + + +def create_custom_http_client( + headers: dict[str, str] | None = None, + timeout: httpx.Timeout | None = None, + auth: httpx.Auth | None = None, +) -> httpx.AsyncClient: + """Create a custom HTTP client with specific configurations. + + This function demonstrates how to configure: + - Custom SSL verification settings + - Custom timeouts + - Custom headers + - Proxy settings (commented out) + """ + if headers is None: + headers = { + "X-Custom-Client": "agents-mcp-example", + "User-Agent": "OpenAI-Agents-MCP/1.0", + } + if timeout is None: + timeout = httpx.Timeout(60.0, read=120.0) + if auth is None: + auth = None + return httpx.AsyncClient( + # Disable SSL verification for testing (not recommended for production) + verify=False, + # Set custom timeout + timeout=httpx.Timeout(60.0, read=120.0), + # Add custom headers that will be sent with every request + headers=headers, + ) + + +async def run_with_custom_client(mcp_server: MCPServer): + """Run the agent with a custom HTTP client configuration.""" + agent = Agent( + name="Assistant", + instructions="Use the tools to answer the questions.", + mcp_servers=[mcp_server], + model_settings=ModelSettings(tool_choice="required"), + ) + + # Use the `add` tool to add two numbers + message = "Add these numbers: 7 and 22." + print(f"Running: {message}") + result = await Runner.run(starting_agent=agent, input=message) + print(result.final_output) + + +async def main(): + """Main function demonstrating different HTTP client configurations.""" + + print("=== Example: Custom HTTP Client with SSL disabled and custom headers ===") + async with MCPServerStreamableHttp( + name="Streamable HTTP with Custom Client", + params={ + "url": "http://localhost:8000/mcp", + "httpx_client_factory": create_custom_http_client, + }, + ) as server: + trace_id = gen_trace_id() + with trace(workflow_name="Custom HTTP Client Example", trace_id=trace_id): + print(f"View trace: https://platform.openai.com/logs/trace?trace_id={trace_id}\n") + await run_with_custom_client(server) + + +if __name__ == "__main__": + # Let's make sure the user has uv installed + if not shutil.which("uv"): + raise RuntimeError( + "uv is not installed. Please install it: https://docs.astral.sh/uv/getting-started/installation/" + ) + + # We'll run the Streamable HTTP server in a subprocess. Usually this would be a remote server, but for this + # demo, we'll run it locally at http://localhost:8000/mcp + process: subprocess.Popen[Any] | None = None + try: + this_dir = os.path.dirname(os.path.abspath(__file__)) + server_file = os.path.join(this_dir, "server.py") + + print("Starting Streamable HTTP server at http://localhost:8000/mcp ...") + + # Run `uv run server.py` to start the Streamable HTTP server + process = subprocess.Popen(["uv", "run", server_file]) + # Give it 3 seconds to start + time.sleep(3) + + print("Streamable HTTP server started. Running example...\n\n") + except Exception as e: + print(f"Error starting Streamable HTTP server: {e}") + exit(1) + + try: + asyncio.run(main()) + finally: + if process: + process.terminate() diff --git a/examples/mcp/streamablehttp_custom_client_example/server.py b/examples/mcp/streamablehttp_custom_client_example/server.py new file mode 100644 index 000000000..a078ee00f --- /dev/null +++ b/examples/mcp/streamablehttp_custom_client_example/server.py @@ -0,0 +1,23 @@ +import random + +from mcp.server.fastmcp import FastMCP + +# Create server +mcp = FastMCP("Echo Server") + + +@mcp.tool() +def add(a: int, b: int) -> int: + """Add two numbers""" + print(f"[debug-server] add({a}, {b})") + return a + b + + +@mcp.tool() +def get_secret_word() -> str: + print("[debug-server] get_secret_word()") + return random.choice(["apple", "banana", "cherry"]) + + +if __name__ == "__main__": + mcp.run(transport="streamable-http") diff --git a/src/agents/mcp/server.py b/src/agents/mcp/server.py index 0acb1345a..0dd493653 100644 --- a/src/agents/mcp/server.py +++ b/src/agents/mcp/server.py @@ -20,7 +20,7 @@ from ..exceptions import UserError from ..logger import logger from ..run_context import RunContextWrapper -from .util import ToolFilter, ToolFilterContext, ToolFilterStatic +from .util import HttpClientFactory, ToolFilter, ToolFilterContext, ToolFilterStatic T = TypeVar("T") @@ -575,6 +575,9 @@ class MCPServerStreamableHttpParams(TypedDict): terminate_on_close: NotRequired[bool] """Terminate on close""" + httpx_client_factory: NotRequired[HttpClientFactory] + """Custom HTTP client factory for configuring httpx.AsyncClient behavior.""" + class MCPServerStreamableHttp(_MCPServerWithClientSession): """MCP server implementation that uses the Streamable HTTP transport. See the [spec] @@ -597,9 +600,9 @@ def __init__( Args: params: The params that configure the server. This includes the URL of the server, - the headers to send to the server, the timeout for the HTTP request, and the - timeout for the Streamable HTTP connection and whether we need to - terminate on close. + the headers to send to the server, the timeout for the HTTP request, the + timeout for the Streamable HTTP connection, whether we need to + terminate on close, and an optional custom HTTP client factory. cache_tools_list: Whether to cache the tools list. If `True`, the tools list will be cached and only fetched from the server once. If `False`, the tools list will be @@ -645,13 +648,24 @@ def create_streams( ] ]: """Create the streams for the server.""" - return streamablehttp_client( - url=self.params["url"], - headers=self.params.get("headers", None), - timeout=self.params.get("timeout", 5), - sse_read_timeout=self.params.get("sse_read_timeout", 60 * 5), - terminate_on_close=self.params.get("terminate_on_close", True), - ) + # Only pass httpx_client_factory if it's provided + if "httpx_client_factory" in self.params: + return streamablehttp_client( + url=self.params["url"], + headers=self.params.get("headers", None), + timeout=self.params.get("timeout", 5), + sse_read_timeout=self.params.get("sse_read_timeout", 60 * 5), + terminate_on_close=self.params.get("terminate_on_close", True), + httpx_client_factory=self.params["httpx_client_factory"], + ) + else: + return streamablehttp_client( + url=self.params["url"], + headers=self.params.get("headers", None), + timeout=self.params.get("timeout", 5), + sse_read_timeout=self.params.get("sse_read_timeout", 60 * 5), + terminate_on_close=self.params.get("terminate_on_close", True), + ) @property def name(self) -> str: diff --git a/src/agents/mcp/util.py b/src/agents/mcp/util.py index 07c556439..6cfe5c96d 100644 --- a/src/agents/mcp/util.py +++ b/src/agents/mcp/util.py @@ -1,8 +1,9 @@ import functools import json from dataclasses import dataclass -from typing import TYPE_CHECKING, Any, Callable, Optional, Union +from typing import TYPE_CHECKING, Any, Callable, Optional, Protocol, Union +import httpx from typing_extensions import NotRequired, TypedDict from .. import _debug @@ -21,6 +22,21 @@ from .server import MCPServer +class HttpClientFactory(Protocol): + """Protocol for HTTP client factory functions. + + This interface matches the MCP SDK's McpHttpClientFactory but is defined locally + to avoid accessing internal MCP SDK modules. + """ + + def __call__( + self, + headers: Optional[dict[str, str]] = None, + timeout: Optional[httpx.Timeout] = None, + auth: Optional[httpx.Auth] = None, + ) -> httpx.AsyncClient: ... + + @dataclass class ToolFilterContext: """Context information available to tool filter functions.""" diff --git a/tests/mcp/test_streamable_http_client_factory.py b/tests/mcp/test_streamable_http_client_factory.py new file mode 100644 index 000000000..f78807c13 --- /dev/null +++ b/tests/mcp/test_streamable_http_client_factory.py @@ -0,0 +1,247 @@ +"""Tests for MCPServerStreamableHttp httpx_client_factory functionality.""" + +from unittest.mock import MagicMock, patch + +import httpx +import pytest + +from agents.mcp import MCPServerStreamableHttp + + +class TestMCPServerStreamableHttpClientFactory: + """Test cases for custom httpx_client_factory parameter.""" + + @pytest.mark.asyncio + async def test_default_httpx_client_factory(self): + """Test that default behavior works when no custom factory is provided.""" + # Mock the streamablehttp_client to avoid actual network calls + with patch("agents.mcp.server.streamablehttp_client") as mock_client: + mock_client.return_value = MagicMock() + + server = MCPServerStreamableHttp( + params={ + "url": "http://localhost:8000/mcp", + "headers": {"Authorization": "Bearer token"}, + "timeout": 10, + } + ) + + # Create streams should not pass httpx_client_factory when not provided + server.create_streams() + + # Verify streamablehttp_client was called with correct parameters + mock_client.assert_called_once_with( + url="http://localhost:8000/mcp", + headers={"Authorization": "Bearer token"}, + timeout=10, + sse_read_timeout=300, # Default value + terminate_on_close=True, # Default value + # httpx_client_factory should not be passed when not provided + ) + + @pytest.mark.asyncio + async def test_custom_httpx_client_factory(self): + """Test that custom httpx_client_factory is passed correctly.""" + + # Create a custom factory function + def custom_factory( + headers: dict[str, str] | None = None, + timeout: httpx.Timeout | None = None, + auth: httpx.Auth | None = None, + ) -> httpx.AsyncClient: + return httpx.AsyncClient( + verify=False, # Disable SSL verification for testing + timeout=httpx.Timeout(60.0), + headers={"X-Custom-Header": "test"}, + ) + + # Mock the streamablehttp_client to avoid actual network calls + with patch("agents.mcp.server.streamablehttp_client") as mock_client: + mock_client.return_value = MagicMock() + + server = MCPServerStreamableHttp( + params={ + "url": "http://localhost:8000/mcp", + "headers": {"Authorization": "Bearer token"}, + "timeout": 10, + "httpx_client_factory": custom_factory, + } + ) + + # Create streams should pass the custom factory + server.create_streams() + + # Verify streamablehttp_client was called with the custom factory + mock_client.assert_called_once_with( + url="http://localhost:8000/mcp", + headers={"Authorization": "Bearer token"}, + timeout=10, + sse_read_timeout=300, # Default value + terminate_on_close=True, # Default value + httpx_client_factory=custom_factory, + ) + + @pytest.mark.asyncio + async def test_custom_httpx_client_factory_with_ssl_cert(self): + """Test custom factory with SSL certificate configuration.""" + + def ssl_cert_factory( + headers: dict[str, str] | None = None, + timeout: httpx.Timeout | None = None, + auth: httpx.Auth | None = None, + ) -> httpx.AsyncClient: + return httpx.AsyncClient( + verify="/path/to/cert.pem", # Custom SSL certificate + timeout=httpx.Timeout(120.0), + ) + + with patch("agents.mcp.server.streamablehttp_client") as mock_client: + mock_client.return_value = MagicMock() + + server = MCPServerStreamableHttp( + params={ + "url": "https://secure-server.com/mcp", + "timeout": 30, + "httpx_client_factory": ssl_cert_factory, + } + ) + + server.create_streams() + + mock_client.assert_called_once_with( + url="https://secure-server.com/mcp", + headers=None, + timeout=30, + sse_read_timeout=300, + terminate_on_close=True, + httpx_client_factory=ssl_cert_factory, + ) + + @pytest.mark.asyncio + async def test_custom_httpx_client_factory_with_proxy(self): + """Test custom factory with proxy configuration.""" + + def proxy_factory( + headers: dict[str, str] | None = None, + timeout: httpx.Timeout | None = None, + auth: httpx.Auth | None = None, + ) -> httpx.AsyncClient: + return httpx.AsyncClient( + proxy="http://proxy.example.com:8080", + timeout=httpx.Timeout(60.0), + ) + + with patch("agents.mcp.server.streamablehttp_client") as mock_client: + mock_client.return_value = MagicMock() + + server = MCPServerStreamableHttp( + params={ + "url": "http://localhost:8000/mcp", + "httpx_client_factory": proxy_factory, + } + ) + + server.create_streams() + + mock_client.assert_called_once_with( + url="http://localhost:8000/mcp", + headers=None, + timeout=5, # Default value + sse_read_timeout=300, + terminate_on_close=True, + httpx_client_factory=proxy_factory, + ) + + @pytest.mark.asyncio + async def test_custom_httpx_client_factory_with_retry_logic(self): + """Test custom factory with retry logic configuration.""" + + def retry_factory( + headers: dict[str, str] | None = None, + timeout: httpx.Timeout | None = None, + auth: httpx.Auth | None = None, + ) -> httpx.AsyncClient: + return httpx.AsyncClient( + timeout=httpx.Timeout(30.0), + # Note: httpx doesn't have built-in retry, but this shows how + # a custom factory could be used to configure retry behavior + # through middleware or other mechanisms + ) + + with patch("agents.mcp.server.streamablehttp_client") as mock_client: + mock_client.return_value = MagicMock() + + server = MCPServerStreamableHttp( + params={ + "url": "http://localhost:8000/mcp", + "httpx_client_factory": retry_factory, + } + ) + + server.create_streams() + + mock_client.assert_called_once_with( + url="http://localhost:8000/mcp", + headers=None, + timeout=5, + sse_read_timeout=300, + terminate_on_close=True, + httpx_client_factory=retry_factory, + ) + + def test_httpx_client_factory_type_annotation(self): + """Test that the type annotation is correct for httpx_client_factory.""" + from agents.mcp.server import MCPServerStreamableHttpParams + + # This test ensures the type annotation is properly set + # We can't easily test the TypedDict at runtime, but we can verify + # that the import works and the type is available + assert hasattr(MCPServerStreamableHttpParams, "__annotations__") + + # Verify that the httpx_client_factory parameter is in the annotations + annotations = MCPServerStreamableHttpParams.__annotations__ + assert "httpx_client_factory" in annotations + + # The annotation should contain the string representation of the type + annotation_str = str(annotations["httpx_client_factory"]) + assert "HttpClientFactory" in annotation_str + + @pytest.mark.asyncio + async def test_all_parameters_with_custom_factory(self): + """Test that all parameters work together with custom factory.""" + + def comprehensive_factory( + headers: dict[str, str] | None = None, + timeout: httpx.Timeout | None = None, + auth: httpx.Auth | None = None, + ) -> httpx.AsyncClient: + return httpx.AsyncClient( + verify=False, + timeout=httpx.Timeout(90.0), + headers={"X-Test": "value"}, + ) + + with patch("agents.mcp.server.streamablehttp_client") as mock_client: + mock_client.return_value = MagicMock() + + server = MCPServerStreamableHttp( + params={ + "url": "https://api.example.com/mcp", + "headers": {"Authorization": "Bearer token"}, + "timeout": 45, + "sse_read_timeout": 600, + "terminate_on_close": False, + "httpx_client_factory": comprehensive_factory, + } + ) + + server.create_streams() + + mock_client.assert_called_once_with( + url="https://api.example.com/mcp", + headers={"Authorization": "Bearer token"}, + timeout=45, + sse_read_timeout=600, + terminate_on_close=False, + httpx_client_factory=comprehensive_factory, + )