Skip to content

Commit d1d4fb2

Browse files
committed
Fix SSE parsing of Unicode line separator characters
1 parent 80c0d23 commit d1d4fb2

File tree

3 files changed

+342
-2
lines changed

3 files changed

+342
-2
lines changed

src/mcp/client/sse.py

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import logging
2+
from collections.abc import AsyncIterator
23
from contextlib import asynccontextmanager
34
from typing import Any
45
from urllib.parse import urljoin, urlparse
@@ -7,7 +8,8 @@
78
import httpx
89
from anyio.abc import TaskStatus
910
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
10-
from httpx_sse import aconnect_sse
11+
from httpx_sse import EventSource, ServerSentEvent, aconnect_sse
12+
from httpx_sse._decoders import SSEDecoder
1113

1214
import mcp.types as types
1315
from mcp.shared._httpx_utils import McpHttpClientFactory, create_mcp_http_client
@@ -18,6 +20,43 @@
1820

1921
def remove_request_params(url: str) -> str:
    """Return ``url`` with its parameters, query string and fragment removed.

    The URL is rebuilt from its parsed components instead of being joined with
    its own path. Joining with an empty path returns the base URL unchanged,
    so a URL such as ``http://host?token=x`` would keep its query string.

    Args:
        url: The URL to strip.

    Returns:
        The URL reduced to scheme, network location and path.
    """
    stripped = urlparse(url)._replace(params="", query="", fragment="")
    return stripped.geturl()
23+
24+
async def compliant_aiter_sse(event_source: EventSource) -> AsyncIterator[ServerSentEvent]:
    """
    Iterate over SSE events without treating U+2028/U+2029 as line breaks.

    This function replaces event_source.aiter_sse(). The raw byte stream is split
    into lines on the terminators the SSE specification defines (CRLF, LF and a
    lone CR); every other byte sequence, including the UTF-8 encodings of U+2028
    and U+2029, stays part of the line content.

    Lines are decoded as UTF-8 only after splitting, so a multi-byte character
    that arrives split across chunks is reassembled before decoding. Undecodable
    bytes are replaced instead of raising.

    Args:
        event_source: The EventSource whose response body is read

    Yields:
        ServerSentEvent objects parsed from the stream
    """
    # NOTE(review): the upstream aiter_sse() appears to validate the response
    # Content-Type before iterating; this replacement does not. Confirm whether
    # that check should be restored here.
    decoder = SSEDecoder()
    pending = b""  # bytes of the current line that has no terminator yet
    last_was_cr = False  # previous chunk ended on a lone CR

    # https://html.spec.whatwg.org/multipage/server-sent-events.html#parsing-an-event-stream
    async for chunk in event_source.response.aiter_bytes():
        if not chunk:
            continue

        # A CR that ended the previous chunk was already treated as a line end.
        # If this chunk opens with LF, the pair was one CRLF: drop the LF so it
        # does not produce a spurious empty line (which would dispatch an event).
        if last_was_cr and chunk.startswith(b"\n"):
            chunk = chunk[1:]
        last_was_cr = False

        # bytes.splitlines() breaks only on CR, LF and CRLF (unlike
        # str.splitlines(), which also breaks on U+2028/U+2029 and others).
        pieces = (pending + chunk).splitlines(keepends=True)
        pending = b""
        if pieces:
            if pieces[-1].endswith((b"\n", b"\r")):
                last_was_cr = pieces[-1].endswith(b"\r")
            else:
                # Unterminated tail: keep it until more data arrives.
                pending = pieces.pop()

        for piece in pieces:
            # Each piece carries exactly one terminator, at its end.
            line = piece.rstrip(b"\r\n").decode("utf-8", errors="replace")
            sse = decoder.decode(line)
            if sse is not None:
                yield sse

    # Feed any unterminated final line to the decoder, as before.
    if pending:
        sse = decoder.decode(pending.decode("utf-8", errors="replace"))
        if sse is not None:
            yield sse
2160

2261

2362
@asynccontextmanager
@@ -69,7 +108,8 @@ async def sse_reader(
69108
task_status: TaskStatus[str] = anyio.TASK_STATUS_IGNORED,
70109
):
71110
try:
72-
async for sse in event_source.aiter_sse():
111+
# Use our compliant SSE iterator to handle Unicode correctly (issue #1356)
112+
async for sse in compliant_aiter_sse(event_source):
73113
logger.debug(f"Received SSE event: {sse.event}")
74114
match sse.event:
75115
case "endpoint":

tests/client/test_sse_unicode.py

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
"""Test for SSE client Unicode handling."""
2+
3+
from collections.abc import AsyncIterator
4+
from unittest.mock import AsyncMock, MagicMock
5+
6+
import pytest
7+
from httpx_sse import EventSource
8+
9+
from mcp.client.sse import compliant_aiter_sse
10+
11+
pytestmark = pytest.mark.anyio
12+
13+
14+
def create_mock_event_source(data_chunks: list[bytes]) -> EventSource:
    """Build a stand-in EventSource whose response replays ``data_chunks``.

    The returned mock exposes ``response.aiter_bytes()``, which yields the
    given chunks one by one, in order.
    """

    async def replay_chunks() -> AsyncIterator[bytes]:
        for piece in data_chunks:
            yield piece

    fake_response = AsyncMock()
    fake_response.aiter_bytes = replay_chunks

    fake_source = MagicMock(spec=EventSource)
    fake_source.response = fake_response
    return fake_source
26+
27+
28+
async def test_compliant_aiter_sse_handles_unicode_line_separators():
    """U+2028 (LINE SEPARATOR) inside a data field must not split the event."""

    # Wire format: event: message\ndata: {"text":"Hello\u2028World"}\n\n
    # delivered in five chunks, with the U+2028 bytes in a chunk of their own.
    chunks = [
        b"event: message\n",
        b'data: {"text":"Hello',
        b"\xe2\x80\xa8",  # UTF-8 encoding of U+2028
        b'World"}\n',
        b"\n",
    ]

    received = []
    async for sse in compliant_aiter_sse(create_mock_event_source(chunks)):
        received.append(sse)

    # Exactly one "message" event, with the separator still in its payload.
    assert len(received) == 1
    assert received[0].event == "message"
    assert "\u2028" in received[0].data
    assert received[0].data == '{"text":"Hello\u2028World"}'
52+
53+
54+
async def test_compliant_aiter_sse_handles_paragraph_separator():
    """U+2029 (PARAGRAPH SEPARATOR) inside a data field must not split the event."""

    chunks = [
        b"event: test\ndata: Line1",
        b"\xe2\x80\xa9",  # UTF-8 encoding of U+2029
        b"Line2\n\n",
    ]

    received = []
    async for sse in compliant_aiter_sse(create_mock_event_source(chunks)):
        received.append(sse)

    assert len(received) == 1
    assert received[0].event == "test"
    # The separator is payload, not a line break.
    assert "\u2029" in received[0].data
    assert received[0].data == "Line1\u2029Line2"
73+
74+
75+
async def test_compliant_aiter_sse_handles_crlf():
    """A stream that uses \\r\\n line endings is parsed into one clean event."""

    chunks = [
        b"event: message\r\n",
        b"data: test data\r\n",
        b"\r\n",
    ]

    received = []
    async for sse in compliant_aiter_sse(create_mock_event_source(chunks)):
        received.append(sse)

    assert len(received) == 1
    assert received[0].event == "message"
    assert received[0].data == "test data"
92+
93+
94+
async def test_compliant_aiter_sse_handles_split_utf8():
    """A multi-byte UTF-8 character split across two chunks is decoded intact."""

    # The emoji 🎉 is \xf0\x9f\x8e\x89; it is delivered as two 2-byte chunks.
    chunks = [
        b"event: message\n",
        b"data: Party ",
        b"\xf0\x9f",  # First half of emoji
        b"\x8e\x89",  # Second half of emoji
        b" time!\n\n",
    ]

    received = []
    async for sse in compliant_aiter_sse(create_mock_event_source(chunks)):
        received.append(sse)

    assert len(received) == 1
    assert received[0].event == "message"
    assert received[0].data == "Party 🎉 time!"
113+
114+
115+
async def test_compliant_aiter_sse_handles_multiple_events():
    """Several consecutive events are each delivered, including unnamed ones."""

    # One complete event per chunk; the first two carry U+2028 / U+2029.
    chunks = [
        b"event: first\ndata: Hello\xe2\x80\xa8World\n\n",
        b"event: second\ndata: Test\xe2\x80\xa9Data\n\n",
        b"data: No event name\n\n",
    ]

    received = []
    async for sse in compliant_aiter_sse(create_mock_event_source(chunks)):
        received.append(sse)

    assert len(received) == 3
    first, second, unnamed = received

    assert first.event == "first"
    assert "\u2028" in first.data

    assert second.event == "second"
    assert "\u2029" in second.data

    # An event without an "event:" field gets the default type "message".
    assert unnamed.event == "message"
    assert unnamed.data == "No event name"
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
"""Test for issue #1356: SSE parsing fails with Unicode line separator characters."""
2+
3+
import multiprocessing
4+
import socket
5+
import time
6+
from collections.abc import Generator
7+
from typing import Any
8+
9+
import anyio
10+
import pytest
11+
import uvicorn
12+
from starlette.applications import Starlette
13+
from starlette.requests import Request
14+
from starlette.responses import Response
15+
from starlette.routing import Mount, Route
16+
17+
from mcp.client.session import ClientSession
18+
from mcp.client.sse import sse_client
19+
from mcp.server import Server
20+
from mcp.server.sse import SseServerTransport
21+
from mcp.server.transport_security import TransportSecuritySettings
22+
from mcp.shared.exceptions import McpError
23+
from mcp.types import TextContent, Tool
24+
25+
pytestmark = pytest.mark.anyio
26+
27+
28+
class ProblematicUnicodeServer(Server):
    """Test server whose single tool returns text containing U+2028."""

    def __init__(self):
        super().__init__("ProblematicUnicodeServer")

        @self.list_tools()
        async def handle_list_tools() -> list[Tool]:
            # Advertise the one tool this server implements.
            unicode_tool = Tool(
                name="get_problematic_unicode",
                description="Returns text with problematic Unicode character U+2028",
                inputSchema={"type": "object", "properties": {}},
            )
            return [unicode_tool]

        @self.call_tool()
        async def handle_call_tool(name: str, args: dict[str, Any]) -> list[TextContent]:
            if name != "get_problematic_unicode":
                return [TextContent(type="text", text=f"Unknown tool: {name}")]

            # U+2028 (LINE SEPARATOR) is a valid Unicode character, but it is the
            # character this test suite uses to provoke the SSE parsing problem.
            payload = "This text contains a line separator\u2028character that may break JSON parsing"
            return [TextContent(type="text", text=payload)]
52+
53+
54+
def make_problematic_server_app() -> Starlette:
    """Assemble the Starlette app that serves ProblematicUnicodeServer over SSE.

    The app exposes ``/sse`` for the event stream and mounts the transport's
    POST handler under ``/messages/``.
    """
    server = ProblematicUnicodeServer()
    transport = SseServerTransport(
        "/messages/",
        security_settings=TransportSecuritySettings(
            allowed_hosts=["127.0.0.1:*", "localhost:*"],
            allowed_origins=["http://127.0.0.1:*", "http://localhost:*"],
        ),
    )

    async def handle_sse(request: Request) -> Response:
        # Run the MCP server on the stream pair for as long as the SSE connection lives.
        async with transport.connect_sse(request.scope, request.receive, request._send) as streams:
            read_stream, write_stream = streams[0], streams[1]
            await server.run(read_stream, write_stream, server.create_initialization_options())
        return Response()

    routes = [
        Route("/sse", endpoint=handle_sse),
        Mount("/messages/", app=transport.handle_post_message),
    ]
    return Starlette(routes=routes)
76+
77+
78+
def run_problematic_server(server_port: int) -> None:
    """Serve the problematic Unicode test app on 127.0.0.1 at ``server_port``.

    The call returns only when ``uvicorn.Server.run()`` returns.
    """
    config = uvicorn.Config(
        app=make_problematic_server_app(),
        host="127.0.0.1",
        port=server_port,
        log_level="error",
    )
    uvicorn.Server(config=config).run()
85+
86+
87+
@pytest.fixture
def problematic_server_port() -> int:
    """Pick a free TCP port on 127.0.0.1 for the test server.

    Binding to port 0 lets the operating system choose the port. The probe
    socket is closed before the number is returned.
    """
    with socket.socket() as probe:
        probe.bind(("127.0.0.1", 0))
        port = probe.getsockname()[1]
    return port
93+
94+
95+
@pytest.fixture
def problematic_server(problematic_server_port: int) -> Generator[str, None, None]:
    """Start the problematic Unicode test server in a separate process.

    The fixture polls the port until a TCP connection succeeds, then yields the
    server's base URL. The child process is killed on teardown, and also when
    the server never comes up.

    Raises:
        RuntimeError: If no connection succeeds within the polling budget.
    """
    proc = multiprocessing.Process(
        target=run_problematic_server, kwargs={"server_port": problematic_server_port}, daemon=True
    )
    proc.start()

    # try/finally so the child is cleaned up even if startup polling fails.
    try:
        # Wait for server to be running
        max_attempts = 20
        for _ in range(max_attempts):
            try:
                with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                    s.connect(("127.0.0.1", problematic_server_port))
                break
            except ConnectionRefusedError:
                time.sleep(0.1)
        else:
            raise RuntimeError(f"Server failed to start after {max_attempts} attempts")

        yield f"http://127.0.0.1:{problematic_server_port}"
    finally:
        # Clean up
        proc.kill()
        proc.join(timeout=2)
122+
123+
124+
async def test_json_parsing_with_problematic_unicode(problematic_server: str) -> None:
    """Test that special Unicode characters like U+2028 are handled properly.

    This test reproduces issue #1356 where special Unicode characters
    cause JSON parsing to fail and the raw exception is sent to the stream,
    preventing proper error handling.
    """
    expected = "This text contains a line separator\u2028character that may break JSON parsing"

    # Connect to the server using SSE client
    async with sse_client(problematic_server + "/sse") as streams:
        async with ClientSession(*streams) as session:
            # Initialize the connection
            result = await session.initialize()
            assert result.serverInfo.name == "ProblematicUnicodeServer"

            # Call the tool that returns problematic Unicode; this should succeed
            # and not hang. The try wraps the whole fail_after block because the
            # TimeoutError is raised when that block exits, not inside it.
            try:
                with anyio.fail_after(5):  # 5 second timeout
                    response = await session.call_tool("get_problematic_unicode", {})
            except McpError:
                pytest.fail("Unexpected error with tool call")
            except TimeoutError:
                # If we timeout, the issue is confirmed - the client hangs
                pytest.fail("Client hangs when handling problematic Unicode (issue #1356 confirmed)")
            except Exception as e:
                # We should not get raw exceptions - they should be wrapped as McpError
                pytest.fail(f"Got raw exception instead of McpError: {type(e).__name__}: {e}")

            # The assertions sit outside the try so a failed assertion is reported
            # as itself, not as a "raw exception".
            assert len(response.content) == 1
            text_content = response.content[0]
            assert hasattr(text_content, "text"), f"Response doesn't have text: {text_content}"
            assert text_content.text == expected, f"Expected: {expected!r}, Got: {text_content.text!r}"

0 commit comments

Comments
 (0)