From 009fa8b66c69bc414a30232c590ad7e703d993b7 Mon Sep 17 00:00:00 2001 From: Matt Hicks Date: Sat, 23 Aug 2025 07:29:47 -0700 Subject: [PATCH 1/3] fix: tool cache refresh with nested handler invocation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fix MCP SDK issue #1298 where tool handlers fail to execute properly when streaming context is present in the parent process. The fix stores a direct reference to the list_tools function to avoid nested handler invocation which can disrupt async execution flow in streaming contexts. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- src/mcp/server/lowlevel/server.py | 18 +- tests/server/test_tool_cache_refresh_bug.py | 208 ++++++++++++++++++++ 2 files changed, 223 insertions(+), 3 deletions(-) create mode 100644 tests/server/test_tool_cache_refresh_bug.py diff --git a/src/mcp/server/lowlevel/server.py b/src/mcp/server/lowlevel/server.py index 8c459383c..be8e72686 100644 --- a/src/mcp/server/lowlevel/server.py +++ b/src/mcp/server/lowlevel/server.py @@ -150,6 +150,8 @@ def __init__( } self.notification_handlers: dict[type, Callable[..., Awaitable[None]]] = {} self._tool_cache: dict[str, types.Tool] = {} + # Store direct reference to list_tools function to avoid nested handler calls + self._list_tools_func: Callable[[], Awaitable[list[types.Tool]]] | None = None logger.debug("Initializing server %r", name) def create_initialization_options( @@ -383,6 +385,11 @@ async def handler(req: types.UnsubscribeRequest): def list_tools(self): def decorator(func: Callable[[], Awaitable[list[types.Tool]]]): logger.debug("Registering handler for ListToolsRequest") + + # Store direct reference to the function for cache refresh. + # This avoids nested handler invocation which can disrupt + # async execution flow in streaming contexts. + self._list_tools_func = func async def handler(_: Any): tools = await func() @@ -412,9 +419,15 @@ async def _get_cached_tool_definition(self, tool_name: str) -> types.Tool | None Returns the Tool object if found, None otherwise. """ if tool_name not in self._tool_cache: - if types.ListToolsRequest in self.request_handlers: + # Use direct function reference to avoid nested handler invocation + # which can disrupt async flow in streaming contexts + if self._list_tools_func is not None: logger.debug("Tool cache miss for %s, refreshing cache", tool_name) - await self.request_handlers[types.ListToolsRequest](None) + tools = await self._list_tools_func() + # Refresh the tool cache + self._tool_cache.clear() + for tool in tools: + self._tool_cache[tool.name] = tool tool = self._tool_cache.get(tool_name) if tool is None: @@ -458,7 +471,6 @@ async def handler(req: types.CallToolRequest): except jsonschema.ValidationError as e: return self._make_error_result(f"Input validation error: {e.message}") - # tool call results = await func(tool_name, arguments) # output normalization diff --git a/tests/server/test_tool_cache_refresh_bug.py b/tests/server/test_tool_cache_refresh_bug.py new file mode 100644 index 000000000..bbedc1c3e --- /dev/null +++ b/tests/server/test_tool_cache_refresh_bug.py @@ -0,0 +1,208 @@ +"""Test for tool cache refresh bug with nested handler invocation (issue #1298). + +This test verifies that cache refresh doesn't use nested handler invocation, +which can disrupt async execution in streaming contexts. +""" + +from typing import Any + +import anyio +import pytest + +from mcp.client.session import ClientSession +from mcp.server.lowlevel import Server +from mcp.types import ListToolsRequest, TextContent, Tool + + +@pytest.mark.anyio +async def test_no_nested_handler_invocation_on_cache_refresh(): + """Verify that cache refresh doesn't use nested handler invocation. + + Issue #1298: Tool handlers can fail when cache refresh triggers + nested handler invocation via self.request_handlers[ListToolsRequest](None), + which disrupts async execution flow in streaming contexts. + + This test verifies the fix by detecting whether nested handler + invocation occurs during cache refresh. + """ + server = Server("test-server") + + # Track handler invocations + handler_invocations = [] + + @server.list_tools() + async def list_tools(): + # Normal tool listing + await anyio.sleep(0.001) + return [ + Tool( + name="test_tool", + description="Test tool", + inputSchema={"type": "object", "properties": {}} + ) + ] + + @server.call_tool() + async def call_tool(name: str, arguments: dict[str, Any]): + # Simple tool implementation + return [TextContent(type="text", text="Tool result")] + + # Intercept the ListToolsRequest handler to detect nested invocation + original_handler = None + + def setup_handler_interceptor(): + nonlocal original_handler + original_handler = server.request_handlers.get(ListToolsRequest) + + async def interceptor(req): + # Track the invocation + # req is None for nested invocations (the problematic pattern) + # req is a proper request object for normal invocations + if req is None: + handler_invocations.append("nested") + else: + handler_invocations.append("normal") + + # Call the original handler + if original_handler: + return await original_handler(req) + return None + + server.request_handlers[ListToolsRequest] = interceptor + + # Set up the interceptor after decorators have run + setup_handler_interceptor() + + # Setup communication channels + from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream + from mcp.shared.message import SessionMessage + + server_to_client_send, server_to_client_receive = anyio.create_memory_object_stream[SessionMessage](10) + client_to_server_send, client_to_server_receive = anyio.create_memory_object_stream[SessionMessage](10) + + async def run_server(): + await server.run( + client_to_server_receive, + server_to_client_send, + server.create_initialization_options() + ) + + async with anyio.create_task_group() as tg: + tg.start_soon(run_server) + + async with ClientSession(server_to_client_receive, client_to_server_send) as session: + await session.initialize() + + # Clear the cache to force a refresh on next tool call + server._tool_cache.clear() + + # Make a tool call - this should trigger cache refresh + result = await session.call_tool("test_tool", {}) + + # Verify the tool call succeeded + assert result is not None + assert not result.isError + assert result.content[0].text == "Tool result" + + # Check if nested handler invocation occurred + has_nested_invocation = "nested" in handler_invocations + + # The bug is present if nested handler invocation occurs + assert not has_nested_invocation, ( + "Nested handler invocation detected during cache refresh. " + "This pattern (calling request_handlers[ListToolsRequest](None)) " + "can disrupt async execution in streaming contexts (issue #1298)." + ) + + tg.cancel_scope.cancel() + + +@pytest.mark.anyio +async def test_concurrent_cache_refresh_safety(): + """Verify that concurrent tool calls with cache refresh work correctly. + + Multiple concurrent tool calls that all trigger cache refresh should + not cause issues or result in nested handler invocations. + """ + server = Server("test-server") + + # Track concurrent handler invocations + nested_invocations = 0 + + @server.list_tools() + async def list_tools(): + await anyio.sleep(0.01) # Simulate some async work + return [ + Tool( + name=f"tool_{i}", + description=f"Tool {i}", + inputSchema={"type": "object", "properties": {}} + ) + for i in range(3) + ] + + @server.call_tool() + async def call_tool(name: str, arguments: dict[str, Any]): + await anyio.sleep(0.001) + return [TextContent(type="text", text=f"Result from {name}")] + + # Intercept handler to detect nested invocations + original_handler = server.request_handlers.get(ListToolsRequest) + + async def interceptor(req): + nonlocal nested_invocations + if req is None: + nested_invocations += 1 + if original_handler: + return await original_handler(req) + return None + + if original_handler: + server.request_handlers[ListToolsRequest] = interceptor + + # Setup communication + from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream + from mcp.shared.message import SessionMessage + + server_to_client_send, server_to_client_receive = anyio.create_memory_object_stream[SessionMessage](10) + client_to_server_send, client_to_server_receive = anyio.create_memory_object_stream[SessionMessage](10) + + async def run_server(): + await server.run( + client_to_server_receive, + server_to_client_send, + server.create_initialization_options() + ) + + async with anyio.create_task_group() as tg: + tg.start_soon(run_server) + + async with ClientSession(server_to_client_receive, client_to_server_send) as session: + await session.initialize() + + # Clear cache to force refresh + server._tool_cache.clear() + + # Make concurrent tool calls + import asyncio + results = await asyncio.gather( + session.call_tool("tool_0", {}), + session.call_tool("tool_1", {}), + session.call_tool("tool_2", {}), + return_exceptions=True + ) + + # Verify all calls succeeded + for i, result in enumerate(results): + assert not isinstance(result, Exception), f"Tool {i} failed: {result}" + assert not result.isError + assert f"tool_{i}" in result.content[0].text + + # Verify no nested invocations occurred + assert nested_invocations == 0, ( + f"Detected {nested_invocations} nested handler invocations " + "during concurrent cache refresh. This indicates the bug from " + "issue #1298 is present." + ) + + tg.cancel_scope.cancel() \ No newline at end of file From ef3c20199445c4f22e4b8495abb36b3583a0022f Mon Sep 17 00:00:00 2001 From: Matt Hicks Date: Sat, 23 Aug 2025 12:38:42 -0700 Subject: [PATCH 2/3] style: fix pre-commit formatting issues MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Remove trailing whitespace - Consolidate multi-line Tool() constructors - Remove unused imports from anyio.streams.memory - Add type annotations for test interceptor functions - Apply ruff formatting rules 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- src/mcp/server/lowlevel/server.py | 2 +- tests/server/test_tool_cache_refresh_bug.py | 129 +++++++++----------- 2 files changed, 58 insertions(+), 73 deletions(-) diff --git a/src/mcp/server/lowlevel/server.py b/src/mcp/server/lowlevel/server.py index be8e72686..58d47afbf 100644 --- a/src/mcp/server/lowlevel/server.py +++ b/src/mcp/server/lowlevel/server.py @@ -385,7 +385,7 @@ async def handler(req: types.UnsubscribeRequest): def list_tools(self): def decorator(func: Callable[[], Awaitable[list[types.Tool]]]): logger.debug("Registering handler for ListToolsRequest") - + # Store direct reference to the function for cache refresh. # This avoids nested handler invocation which can disrupt # async execution flow in streaming contexts. diff --git a/tests/server/test_tool_cache_refresh_bug.py b/tests/server/test_tool_cache_refresh_bug.py index bbedc1c3e..30603d035 100644 --- a/tests/server/test_tool_cache_refresh_bug.py +++ b/tests/server/test_tool_cache_refresh_bug.py @@ -4,7 +4,7 @@ which can disrupt async execution in streaming contexts. """ -from typing import Any +from typing import Any, cast import anyio import pytest @@ -17,44 +17,38 @@ @pytest.mark.anyio async def test_no_nested_handler_invocation_on_cache_refresh(): """Verify that cache refresh doesn't use nested handler invocation. - + Issue #1298: Tool handlers can fail when cache refresh triggers nested handler invocation via self.request_handlers[ListToolsRequest](None), which disrupts async execution flow in streaming contexts. - + This test verifies the fix by detecting whether nested handler invocation occurs during cache refresh. """ server = Server("test-server") - + # Track handler invocations handler_invocations = [] - + @server.list_tools() async def list_tools(): # Normal tool listing await anyio.sleep(0.001) - return [ - Tool( - name="test_tool", - description="Test tool", - inputSchema={"type": "object", "properties": {}} - ) - ] - + return [Tool(name="test_tool", description="Test tool", inputSchema={"type": "object", "properties": {}})] + @server.call_tool() async def call_tool(name: str, arguments: dict[str, Any]): # Simple tool implementation return [TextContent(type="text", text="Tool result")] - + # Intercept the ListToolsRequest handler to detect nested invocation original_handler = None - + def setup_handler_interceptor(): nonlocal original_handler original_handler = server.request_handlers.get(ListToolsRequest) - - async def interceptor(req): + + async def interceptor(req: Any) -> Any: # Track the invocation # req is None for nested invocations (the problematic pattern) # req is a proper request object for normal invocations @@ -62,147 +56,138 @@ async def interceptor(req): handler_invocations.append("nested") else: handler_invocations.append("normal") - + # Call the original handler if original_handler: return await original_handler(req) return None - - server.request_handlers[ListToolsRequest] = interceptor - + + server.request_handlers[ListToolsRequest] = cast(Any, interceptor) + # Set up the interceptor after decorators have run setup_handler_interceptor() - + # Setup communication channels - from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream from mcp.shared.message import SessionMessage - + server_to_client_send, server_to_client_receive = anyio.create_memory_object_stream[SessionMessage](10) client_to_server_send, client_to_server_receive = anyio.create_memory_object_stream[SessionMessage](10) - + async def run_server(): - await server.run( - client_to_server_receive, - server_to_client_send, - server.create_initialization_options() - ) - + await server.run(client_to_server_receive, server_to_client_send, server.create_initialization_options()) + async with anyio.create_task_group() as tg: tg.start_soon(run_server) - + async with ClientSession(server_to_client_receive, client_to_server_send) as session: await session.initialize() - + # Clear the cache to force a refresh on next tool call server._tool_cache.clear() - + # Make a tool call - this should trigger cache refresh result = await session.call_tool("test_tool", {}) - + # Verify the tool call succeeded assert result is not None assert not result.isError - assert result.content[0].text == "Tool result" - + content = result.content[0] + assert isinstance(content, TextContent) + assert content.text == "Tool result" + # Check if nested handler invocation occurred has_nested_invocation = "nested" in handler_invocations - + # The bug is present if nested handler invocation occurs assert not has_nested_invocation, ( "Nested handler invocation detected during cache refresh. " "This pattern (calling request_handlers[ListToolsRequest](None)) " "can disrupt async execution in streaming contexts (issue #1298)." ) - + tg.cancel_scope.cancel() @pytest.mark.anyio async def test_concurrent_cache_refresh_safety(): """Verify that concurrent tool calls with cache refresh work correctly. - + Multiple concurrent tool calls that all trigger cache refresh should not cause issues or result in nested handler invocations. """ server = Server("test-server") - + # Track concurrent handler invocations nested_invocations = 0 - + @server.list_tools() async def list_tools(): await anyio.sleep(0.01) # Simulate some async work return [ - Tool( - name=f"tool_{i}", - description=f"Tool {i}", - inputSchema={"type": "object", "properties": {}} - ) + Tool(name=f"tool_{i}", description=f"Tool {i}", inputSchema={"type": "object", "properties": {}}) for i in range(3) ] - + @server.call_tool() async def call_tool(name: str, arguments: dict[str, Any]): await anyio.sleep(0.001) return [TextContent(type="text", text=f"Result from {name}")] - + # Intercept handler to detect nested invocations original_handler = server.request_handlers.get(ListToolsRequest) - - async def interceptor(req): + + async def interceptor(req: Any) -> Any: nonlocal nested_invocations if req is None: nested_invocations += 1 if original_handler: return await original_handler(req) return None - + if original_handler: - server.request_handlers[ListToolsRequest] = interceptor - + server.request_handlers[ListToolsRequest] = cast(Any, interceptor) + # Setup communication - from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream from mcp.shared.message import SessionMessage - + server_to_client_send, server_to_client_receive = anyio.create_memory_object_stream[SessionMessage](10) client_to_server_send, client_to_server_receive = anyio.create_memory_object_stream[SessionMessage](10) - + async def run_server(): - await server.run( - client_to_server_receive, - server_to_client_send, - server.create_initialization_options() - ) - + await server.run(client_to_server_receive, server_to_client_send, server.create_initialization_options()) + async with anyio.create_task_group() as tg: tg.start_soon(run_server) - + async with ClientSession(server_to_client_receive, client_to_server_send) as session: await session.initialize() - + # Clear cache to force refresh server._tool_cache.clear() - + # Make concurrent tool calls import asyncio + results = await asyncio.gather( session.call_tool("tool_0", {}), session.call_tool("tool_1", {}), session.call_tool("tool_2", {}), - return_exceptions=True + return_exceptions=True, ) - + # Verify all calls succeeded for i, result in enumerate(results): assert not isinstance(result, Exception), f"Tool {i} failed: {result}" assert not result.isError - assert f"tool_{i}" in result.content[0].text - + content = result.content[0] + assert isinstance(content, TextContent) + assert f"tool_{i}" in content.text + # Verify no nested invocations occurred assert nested_invocations == 0, ( f"Detected {nested_invocations} nested handler invocations " "during concurrent cache refresh. This indicates the bug from " "issue #1298 is present." ) - - tg.cancel_scope.cancel() \ No newline at end of file + + tg.cancel_scope.cancel() From c800467616d32240939234c433fa42d3791ea404 Mon Sep 17 00:00:00 2001 From: Matt Hicks Date: Sun, 24 Aug 2025 08:30:08 -0700 Subject: [PATCH 3/3] fix: resolve pyright type errors in test file - Add type annotation for handler_invocations list - Import CallToolResult for proper type narrowing - Add explicit isinstance check for asyncio.gather results - Fixes type errors that were preventing CI from passing --- tests/server/test_tool_cache_refresh_bug.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/server/test_tool_cache_refresh_bug.py b/tests/server/test_tool_cache_refresh_bug.py index 30603d035..11d296707 100644 --- a/tests/server/test_tool_cache_refresh_bug.py +++ b/tests/server/test_tool_cache_refresh_bug.py @@ -11,7 +11,7 @@ from mcp.client.session import ClientSession from mcp.server.lowlevel import Server -from mcp.types import ListToolsRequest, TextContent, Tool +from mcp.types import CallToolResult, ListToolsRequest, TextContent, Tool @pytest.mark.anyio @@ -28,7 +28,7 @@ async def test_no_nested_handler_invocation_on_cache_refresh(): server = Server("test-server") # Track handler invocations - handler_invocations = [] + handler_invocations: list[str] = [] @server.list_tools() async def list_tools(): @@ -178,6 +178,8 @@ async def run_server(): # Verify all calls succeeded for i, result in enumerate(results): assert not isinstance(result, Exception), f"Tool {i} failed: {result}" + # Type narrowing: result is CallToolResult at this point, not Exception + assert isinstance(result, CallToolResult) assert not result.isError content = result.content[0] assert isinstance(content, TextContent)