Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 64 additions & 1 deletion tests/server/test_streamable_http_manager.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
"""Tests for StreamableHTTPSessionManager."""

from unittest.mock import AsyncMock
from unittest.mock import AsyncMock, patch

import anyio
import pytest

from mcp.server import streamable_http_manager
from mcp.server.lowlevel import Server
from mcp.server.streamable_http import MCP_SESSION_ID_HEADER
from mcp.server.streamable_http_manager import StreamableHTTPSessionManager
Expand Down Expand Up @@ -197,3 +198,65 @@ async def mock_receive():
"Session ID should be removed from _server_instances after an exception"
)
assert not manager._server_instances, "No sessions should be tracked after the only session crashes"


@pytest.mark.anyio
async def test_stateless_requests_memory_cleanup():
"""Test that stateless requests actually clean up resources using real transports."""
app = Server("test-stateless-real-cleanup")
manager = StreamableHTTPSessionManager(app=app, stateless=True)

# Track created transport instances
created_transports = []

# Patch StreamableHTTPServerTransport constructor to track instances

original_constructor = streamable_http_manager.StreamableHTTPServerTransport

def track_transport(*args, **kwargs):
transport = original_constructor(*args, **kwargs)
created_transports.append(transport)
return transport

with patch.object(streamable_http_manager, "StreamableHTTPServerTransport", side_effect=track_transport):
async with manager.run():
# Mock app.run to complete immediately
app.run = AsyncMock(return_value=None)

# Send a simple request
sent_messages = []

async def mock_send(message):
sent_messages.append(message)

scope = {
"type": "http",
"method": "POST",
"path": "/mcp",
"headers": [
(b"content-type", b"application/json"),
(b"accept", b"application/json, text/event-stream"),
],
}

# Empty body to trigger early return
async def mock_receive():
return {
"type": "http.request",
"body": b"",
"more_body": False,
}

# Send a request
await manager.handle_request(scope, mock_receive, mock_send)

# Verify transport was created
assert len(created_transports) == 1, "Should have created one transport"

transport = created_transports[0]

# The key assertion - transport should be terminated
assert transport._terminated, "Transport should be terminated after stateless request"

# Verify internal state is cleaned up
assert len(transport._request_streams) == 0, "Transport should have no active request streams"
Loading