diff --git a/tests/server/test_streamable_http_manager.py b/tests/server/test_streamable_http_manager.py index a406adfa3..9a4c695b8 100644 --- a/tests/server/test_streamable_http_manager.py +++ b/tests/server/test_streamable_http_manager.py @@ -1,10 +1,11 @@ """Tests for StreamableHTTPSessionManager.""" -from unittest.mock import AsyncMock +from unittest.mock import AsyncMock, patch import anyio import pytest +from mcp.server import streamable_http_manager from mcp.server.lowlevel import Server from mcp.server.streamable_http import MCP_SESSION_ID_HEADER from mcp.server.streamable_http_manager import StreamableHTTPSessionManager @@ -197,3 +198,65 @@ async def mock_receive(): "Session ID should be removed from _server_instances after an exception" ) assert not manager._server_instances, "No sessions should be tracked after the only session crashes" + + +@pytest.mark.anyio +async def test_stateless_requests_memory_cleanup(): + """Test that stateless requests actually clean up resources using real transports.""" + app = Server("test-stateless-real-cleanup") + manager = StreamableHTTPSessionManager(app=app, stateless=True) + + # Track created transport instances + created_transports = [] + + # Patch StreamableHTTPServerTransport constructor to track instances + + original_constructor = streamable_http_manager.StreamableHTTPServerTransport + + def track_transport(*args, **kwargs): + transport = original_constructor(*args, **kwargs) + created_transports.append(transport) + return transport + + with patch.object(streamable_http_manager, "StreamableHTTPServerTransport", side_effect=track_transport): + async with manager.run(): + # Mock app.run to complete immediately + app.run = AsyncMock(return_value=None) + + # Send a simple request + sent_messages = [] + + async def mock_send(message): + sent_messages.append(message) + + scope = { + "type": "http", + "method": "POST", + "path": "/mcp", + "headers": [ + (b"content-type", b"application/json"), + (b"accept", b"application/json, text/event-stream"), + ], + } + + # Empty body to trigger early return + async def mock_receive(): + return { + "type": "http.request", + "body": b"", + "more_body": False, + } + + # Send a request + await manager.handle_request(scope, mock_receive, mock_send) + + # Verify transport was created + assert len(created_transports) == 1, "Should have created one transport" + + transport = created_transports[0] + + # The key assertion - transport should be terminated + assert transport._terminated, "Transport should be terminated after stateless request" + + # Verify internal state is cleaned up + assert len(transport._request_streams) == 0, "Transport should have no active request streams"