Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions src/google/adk/tools/mcp_tool/mcp_session_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,22 @@
from typing import Any
from typing import Dict
from typing import Optional
from typing import Protocol
from typing import runtime_checkable
from typing import TextIO
from typing import Union

import anyio
from pydantic import BaseModel
from pydantic import ConfigDict

try:
from mcp import ClientSession
from mcp import StdioServerParameters
from mcp.client.sse import sse_client
from mcp.client.stdio import stdio_client
from mcp.client.streamable_http import create_mcp_http_client
from mcp.client.streamable_http import McpHttpClientFactory
from mcp.client.streamable_http import streamablehttp_client
except ImportError as e:

Expand Down Expand Up @@ -84,6 +89,11 @@ class SseConnectionParams(BaseModel):
sse_read_timeout: float = 60 * 5.0


@runtime_checkable
class CheckableMcpHttpClientFactory(McpHttpClientFactory, Protocol):
pass


class StreamableHTTPConnectionParams(BaseModel):
"""Parameters for the MCP Streamable HTTP connection.

Expand All @@ -99,13 +109,18 @@ class StreamableHTTPConnectionParams(BaseModel):
Streamable HTTP server.
terminate_on_close: Whether to terminate the MCP Streamable HTTP server
when the connection is closed.
httpx_client_factory: Factory function to create a custom HTTPX client. If
not provided, a default factory will be used.
"""

model_config = ConfigDict(arbitrary_types_allowed=True)

url: str
headers: dict[str, Any] | None = None
timeout: float = 5.0
sse_read_timeout: float = 60 * 5.0
terminate_on_close: bool = True
httpx_client_factory: CheckableMcpHttpClientFactory = create_mcp_http_client


def retry_on_closed_resource(func):
Expand Down Expand Up @@ -285,6 +300,7 @@ def _create_client(self, merged_headers: Optional[Dict[str, str]] = None):
seconds=self._connection_params.sse_read_timeout
),
terminate_on_close=self._connection_params.terminate_on_close,
httpx_client_factory=self._connection_params.httpx_client_factory,
)
else:
raise ValueError(
Expand Down
53 changes: 53 additions & 0 deletions tests/unittests/tools/mcp_tool/test_mcp_session_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,59 @@ def test_init_with_streamable_http_params(self):

assert manager._connection_params == http_params

@patch("google.adk.tools.mcp_tool.mcp_session_manager.streamablehttp_client")
def test_init_with_streamable_http_custom_httpx_factory(
self, mock_streamablehttp_client
):
"""Test that streamablehttp_client is called with custom httpx_client_factory."""
from datetime import timedelta

custom_httpx_factory = Mock()

http_params = StreamableHTTPConnectionParams(
url="https://example.com/mcp",
timeout=15.0,
httpx_client_factory=custom_httpx_factory,
)
manager = MCPSessionManager(http_params)

manager._create_client()

mock_streamablehttp_client.assert_called_once_with(
url="https://example.com/mcp",
headers=None,
timeout=timedelta(seconds=15.0),
sse_read_timeout=timedelta(seconds=300.0),
terminate_on_close=True,
httpx_client_factory=custom_httpx_factory,
)

@pytest.mark.asyncio
@patch("google.adk.tools.mcp_tool.mcp_session_manager.streamablehttp_client")
async def test_init_with_streamable_http_default_httpx_factory(
self, mock_streamablehttp_client
):
"""Test that streamablehttp_client is called with custom httpx_client_factory."""
from datetime import timedelta

from mcp.client.streamable_http import create_mcp_http_client

http_params = StreamableHTTPConnectionParams(
url="https://example.com/mcp", timeout=15.0
)
manager = MCPSessionManager(http_params)

manager._create_client()

mock_streamablehttp_client.assert_called_once_with(
url="https://example.com/mcp",
headers=None,
timeout=timedelta(seconds=15.0),
sse_read_timeout=timedelta(seconds=300.0),
terminate_on_close=True,
httpx_client_factory=create_mcp_http_client,
)

def test_generate_session_key_stdio(self):
"""Test session key generation for stdio connections."""
manager = MCPSessionManager(self.mock_stdio_connection_params)
Expand Down