Skip to content

Commit 2955d84

Browse files
committed
fix: handle ClosedResourceError in StreamableHTTP message router
1 parent 71889d7 commit 2955d84

File tree

2 files changed

+239
-0
lines changed

2 files changed

+239
-0
lines changed

src/mcp/server/streamable_http.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -875,6 +875,11 @@ async def message_router():
875875
for message. Still processing message as the client
876876
might reconnect and replay."""
877877
)
878+
except anyio.ClosedResourceError:
879+
if self._terminated:
880+
logging.debug("Read stream closed by client")
881+
else:
882+
logging.exception("Unexpected closure of read stream in message router")
878883
except Exception:
879884
logger.exception("Error in message router")
880885

Lines changed: 234 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,234 @@
1+
"""Test for issue #1363 - Race condition in StreamableHTTP transport causes ClosedResourceError.
2+
3+
This test reproduces the race condition described in issue #1363 where MCP servers
4+
in HTTP Streamable mode experience ClosedResourceError exceptions when requests
5+
fail validation early (e.g., due to incorrect Accept headers).
6+
7+
The race condition occurs because:
8+
1. Transport setup creates a message_router task
9+
2. Message router enters async for write_stream_reader loop
10+
3. write_stream_reader calls checkpoint() in receive(), yielding control
11+
4. Request handling processes HTTP request
12+
5. If validation fails early, request returns immediately
13+
6. Transport termination closes all streams including write_stream_reader
14+
7. Message router may still be in checkpoint() yield and hasn't returned to check stream state
15+
8. When message router resumes, it encounters a closed stream, raising ClosedResourceError
16+
"""
17+
18+
import socket
19+
import subprocess
20+
import sys
21+
import time
22+
from collections.abc import AsyncGenerator
23+
from contextlib import asynccontextmanager
24+
25+
import httpx
26+
import pytest
27+
import uvicorn
28+
from starlette.applications import Starlette
29+
from starlette.routing import Mount
30+
from starlette.types import Receive, Scope, Send
31+
32+
from mcp.server import Server
33+
from mcp.server.streamable_http_manager import StreamableHTTPSessionManager
34+
from mcp.types import Tool
35+
36+
SERVER_NAME = "test_race_condition_server"
37+
38+
39+
def check_server_logs_for_errors(process, test_name: str):
40+
"""
41+
Check server logs for ClosedResourceError and other race condition errors.
42+
43+
Args:
44+
process: The server process
45+
test_name: Name of the test for better error messages
46+
"""
47+
# Get logs from the process
48+
try:
49+
stdout, stderr = process.communicate(timeout=10)
50+
server_logs = stderr + stdout
51+
except Exception:
52+
server_logs = ""
53+
54+
# Check for specific race condition errors
55+
errors_found = []
56+
57+
if "ClosedResourceError" in server_logs:
58+
errors_found.append("ClosedResourceError")
59+
60+
if "Error in message router" in server_logs:
61+
errors_found.append("Error in message router")
62+
63+
if "anyio.ClosedResourceError" in server_logs:
64+
errors_found.append("anyio.ClosedResourceError")
65+
66+
# Assert no race condition errors occurred
67+
if errors_found:
68+
error_msg = f"Test '{test_name}' found race condition errors: {', '.join(errors_found)}\n"
69+
error_msg += f"Server logs:\n{server_logs}"
70+
pytest.fail(error_msg)
71+
72+
# If we get here, no race condition errors were found
73+
print(f"✓ Test '{test_name}' passed: No race condition errors detected")
74+
75+
76+
@pytest.fixture
77+
def server_port() -> int:
78+
with socket.socket() as s:
79+
s.bind(("127.0.0.1", 0))
80+
return s.getsockname()[1]
81+
82+
83+
@pytest.fixture
84+
def server_url(server_port: int) -> str:
85+
return f"http://127.0.0.1:{server_port}"
86+
87+
88+
class RaceConditionTestServer(Server):
89+
def __init__(self):
90+
super().__init__(SERVER_NAME)
91+
92+
async def on_list_tools(self) -> list[Tool]:
93+
return []
94+
95+
96+
def run_server_with_logging(port: int):
97+
"""Run the StreamableHTTP server with logging to capture race condition errors."""
98+
app = RaceConditionTestServer()
99+
100+
# Create session manager
101+
session_manager = StreamableHTTPSessionManager(
102+
app=app,
103+
json_response=False,
104+
stateless=True, # Use stateless mode to trigger the race condition
105+
)
106+
107+
# Create the ASGI handler
108+
async def handle_streamable_http(scope: Scope, receive: Receive, send: Send) -> None:
109+
await session_manager.handle_request(scope, receive, send)
110+
111+
# Create Starlette app with lifespan
112+
@asynccontextmanager
113+
async def lifespan(app: Starlette) -> AsyncGenerator[None, None]:
114+
async with session_manager.run():
115+
yield
116+
117+
routes = [
118+
Mount("/", app=handle_streamable_http),
119+
]
120+
121+
starlette_app = Starlette(routes=routes, lifespan=lifespan)
122+
uvicorn.run(starlette_app, host="127.0.0.1", port=port, log_level="debug")
123+
124+
125+
def start_server_process(port: int):
126+
"""Start server in a separate process."""
127+
# Create a temporary script to run the server
128+
import os
129+
import tempfile
130+
131+
script_content = f"""
132+
import sys
133+
sys.path.insert(0, '{os.getcwd()}')
134+
from tests.issues.test_1363_race_condition_streamable_http import run_server_with_logging
135+
run_server_with_logging({port})
136+
"""
137+
138+
with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) as f:
139+
f.write(script_content)
140+
script_path = f.name
141+
142+
process = subprocess.Popen([sys.executable, script_path], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
143+
144+
# Give server time to start
145+
time.sleep(1)
146+
return process
147+
148+
149+
@pytest.mark.anyio
150+
async def test_race_condition_invalid_accept_headers(server_port: int):
151+
"""
152+
Test the race condition with invalid Accept headers.
153+
154+
This test reproduces the exact scenario described in issue #1363:
155+
- Send POST request with incorrect Accept headers (missing either application/json or text/event-stream)
156+
- Request fails validation early and returns quickly
157+
- This should trigger the race condition where message_router encounters ClosedResourceError
158+
"""
159+
process = start_server_process(server_port)
160+
161+
try:
162+
# Test with missing text/event-stream in Accept header
163+
async with httpx.AsyncClient(timeout=5.0) as client:
164+
response = await client.post(
165+
f"http://127.0.0.1:{server_port}/",
166+
json={"jsonrpc": "2.0", "method": "initialize", "id": 1, "params": {}},
167+
headers={
168+
"Accept": "application/json", # Missing text/event-stream
169+
"Content-Type": "application/json",
170+
},
171+
)
172+
# Should get 406 Not Acceptable due to missing text/event-stream
173+
assert response.status_code == 406
174+
175+
# Test with missing application/json in Accept header
176+
async with httpx.AsyncClient(timeout=5.0) as client:
177+
response = await client.post(
178+
f"http://127.0.0.1:{server_port}/",
179+
json={"jsonrpc": "2.0", "method": "initialize", "id": 1, "params": {}},
180+
headers={
181+
"Accept": "text/event-stream", # Missing application/json
182+
"Content-Type": "application/json",
183+
},
184+
)
185+
# Should get 406 Not Acceptable due to missing application/json
186+
assert response.status_code == 406
187+
188+
# Test with completely invalid Accept header
189+
async with httpx.AsyncClient(timeout=5.0) as client:
190+
response = await client.post(
191+
f"http://127.0.0.1:{server_port}/",
192+
json={"jsonrpc": "2.0", "method": "initialize", "id": 1, "params": {}},
193+
headers={
194+
"Accept": "text/plain", # Invalid Accept header
195+
"Content-Type": "application/json",
196+
},
197+
)
198+
# Should get 406 Not Acceptable
199+
assert response.status_code == 406
200+
201+
finally:
202+
process.terminate()
203+
process.wait()
204+
# Check server logs for race condition errors
205+
check_server_logs_for_errors(process, "test_race_condition_invalid_accept_headers")
206+
207+
208+
@pytest.mark.anyio
209+
async def test_race_condition_invalid_content_type(server_port: int):
210+
"""
211+
Test the race condition with invalid Content-Type headers.
212+
213+
This test reproduces the race condition scenario with Content-Type validation failure.
214+
"""
215+
process = start_server_process(server_port)
216+
217+
try:
218+
# Test with invalid Content-Type
219+
async with httpx.AsyncClient(timeout=5.0) as client:
220+
response = await client.post(
221+
f"http://127.0.0.1:{server_port}/",
222+
json={"jsonrpc": "2.0", "method": "initialize", "id": 1, "params": {}},
223+
headers={
224+
"Accept": "application/json, text/event-stream",
225+
"Content-Type": "text/plain", # Invalid Content-Type
226+
},
227+
)
228+
assert response.status_code == 400
229+
230+
finally:
231+
process.terminate()
232+
process.wait()
233+
# Check server logs for race condition errors
234+
check_server_logs_for_errors(process, "test_race_condition_invalid_content_type")

0 commit comments

Comments
 (0)