From d1d4fb272e3971b205e8e5885628181808f035bc Mon Sep 17 00:00:00 2001
From: Peter Alexander
Date: Wed, 10 Sep 2025 13:05:51 +0100
Subject: [PATCH 1/8] Fix SSE parsing of Unicode line separator characters

---
 src/mcp/client/sse.py                         |  44 ++++-
 tests/client/test_sse_unicode.py              | 139 +++++++++++++++
 .../test_1356_sse_parsing_line_separator.py   | 161 ++++++++++++++++++
 3 files changed, 342 insertions(+), 2 deletions(-)
 create mode 100644 tests/client/test_sse_unicode.py
 create mode 100644 tests/issues/test_1356_sse_parsing_line_separator.py

diff --git a/src/mcp/client/sse.py b/src/mcp/client/sse.py
index 7ca8d19afd..5029637a30 100644
--- a/src/mcp/client/sse.py
+++ b/src/mcp/client/sse.py
@@ -1,4 +1,5 @@
 import logging
+from collections.abc import AsyncIterator
 from contextlib import asynccontextmanager
 from typing import Any
 from urllib.parse import urljoin, urlparse
@@ -7,7 +8,8 @@
 import httpx
 from anyio.abc import TaskStatus
 from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
-from httpx_sse import aconnect_sse
+from httpx_sse import EventSource, ServerSentEvent, aconnect_sse
+from httpx_sse._decoders import SSEDecoder
 
 import mcp.types as types
 from mcp.shared._httpx_utils import McpHttpClientFactory, create_mcp_http_client
@@ -18,6 +20,43 @@ def remove_request_params(url: str) -> str:
     return urljoin(url, urlparse(url).path)
 
+
+async def compliant_aiter_sse(event_source: EventSource) -> AsyncIterator[ServerSentEvent]:
+    """
+    Safely iterate over SSE events, working around httpx issue where U+2028 and U+2029
+    are incorrectly treated as newlines, breaking SSE stream parsing.
+
+    This function replaces event_source.aiter_sse() to handle these Unicode characters
+    correctly by processing the raw byte stream and only splitting on actual newlines.
+
+    Args:
+        event_source: The EventSource to iterate over
+
+    Yields:
+        ServerSentEvent objects parsed from the stream
+    """
+    decoder = SSEDecoder()
+    buffer = b""
+
+    async for chunk in event_source.response.aiter_bytes():
+        buffer += chunk
+
+        # Split on "\n" only (not U+2028/U+2029 or other anything else)
+        # https://html.spec.whatwg.org/multipage/server-sent-events.html#parsing-an-event-stream
+        while b"\n" in buffer:
+            line_bytes, buffer = buffer.split(b"\n", 1)
+            line = line_bytes.decode('utf-8', errors='replace').rstrip("\r")
+            sse = decoder.decode(line)
+            if sse is not None:
+                yield sse
+
+    # Process any remaining data in buffer
+    if buffer:
+        assert b"\n" not in buffer
+        line = buffer.decode('utf-8', errors='replace').rstrip("\r")
+        sse = decoder.decode(line)
+        if sse is not None:
+            yield sse
 
 
 @asynccontextmanager
@@ -69,7 +108,8 @@ async def sse_reader(
                 task_status: TaskStatus[str] = anyio.TASK_STATUS_IGNORED,
             ):
                 try:
-                    async for sse in event_source.aiter_sse():
+                    # Use our compliant SSE iterator to handle Unicode correctly (issue #1356)
+                    async for sse in compliant_aiter_sse(event_source):
                         logger.debug(f"Received SSE event: {sse.event}")
                         match sse.event:
                             case "endpoint":
diff --git a/tests/client/test_sse_unicode.py b/tests/client/test_sse_unicode.py
new file mode 100644
index 0000000000..4a008257c3
--- /dev/null
+++ b/tests/client/test_sse_unicode.py
@@ -0,0 +1,139 @@
+"""Test for SSE client Unicode handling."""
+
+from collections.abc import AsyncIterator
+from unittest.mock import AsyncMock, MagicMock
+
+import pytest
+from httpx_sse import EventSource
+
+from mcp.client.sse import compliant_aiter_sse
+
+pytestmark = pytest.mark.anyio
+
+
+def create_mock_event_source(data_chunks: list[bytes]) -> EventSource:
+    """Create a mock EventSource that yields the given data chunks."""
+    event_source = MagicMock(spec=EventSource)
+    response = AsyncMock()
+    event_source.response = response
+
+    async def mock_aiter_bytes() -> AsyncIterator[bytes]:
+        for chunk in data_chunks:
+            yield chunk
+
+    response.aiter_bytes = mock_aiter_bytes
+    return event_source
+
+
+async def test_compliant_aiter_sse_handles_unicode_line_separators():
+    """Test that compliant_aiter_sse correctly handles U+2028 and U+2029 characters."""
+
+    # Simulate SSE data with U+2028 in JSON content
+    # The server sends: event: message\ndata: {"text":"Hello\u2028World"}\n\n
+    test_data = [
+        b'event: message\n',
+        b'data: {"text":"Hello',
+        b'\xe2\x80\xa8',  # UTF-8 encoding of U+2028
+        b'World"}\n',
+        b'\n',
+    ]
+
+    event_source = create_mock_event_source(test_data)
+
+    # Collect events
+    events = [event async for event in compliant_aiter_sse(event_source)]
+
+    # Should receive one message event
+    assert len(events) == 1
+    assert events[0].event == "message"
+    # The U+2028 should be preserved in the data
+    assert '\u2028' in events[0].data
+    assert events[0].data == '{"text":"Hello\u2028World"}'
+
+
+async def test_compliant_aiter_sse_handles_paragraph_separator():
+    """Test that compliant_aiter_sse correctly handles U+2029 (PARAGRAPH SEPARATOR)."""
+
+    # Simulate SSE data with U+2029
+    test_data = [
+        b'event: test\ndata: Line1',
+        b'\xe2\x80\xa9',  # UTF-8 encoding of U+2029
+        b'Line2\n\n',
+    ]
+
+    event_source = create_mock_event_source(test_data)
+
+    events = [event async for event in compliant_aiter_sse(event_source)]
+
+    assert len(events) == 1
+    assert events[0].event == "test"
+    # U+2029 should be preserved, not treated as a newline
+    assert '\u2029' in events[0].data
+    assert events[0].data == 'Line1\u2029Line2'
+
+
+async def test_compliant_aiter_sse_handles_crlf(): + """Test that compliant_aiter_sse correctly handles \\r\\n line endings.""" + + # Simulate SSE data with CRLF line endings + test_data = [ + b'event: message\r\n', + b'data: test data\r\n', + b'\r\n', + ] + + event_source = create_mock_event_source(test_data) + + events = [event async for event in compliant_aiter_sse(event_source)] + + assert len(events) == 1 + assert events[0].event == "message" + assert events[0].data == "test data" + + +async def test_compliant_aiter_sse_handles_split_utf8(): + """Test that compliant_aiter_sse handles UTF-8 characters split across chunks.""" + + # Split a UTF-8 emoji (🎉 = \xf0\x9f\x8e\x89) across chunks + test_data = [ + b'event: message\n', + b'data: Party ', + b'\xf0\x9f', # First half of emoji + b'\x8e\x89', # Second half of emoji + b' time!\n\n', + ] + + event_source = create_mock_event_source(test_data) + + events = [event async for event in compliant_aiter_sse(event_source)] + + assert len(events) == 1 + assert events[0].event == "message" + assert events[0].data == "Party 🎉 time!" + + +async def test_compliant_aiter_sse_handles_multiple_events(): + """Test that compliant_aiter_sse correctly handles multiple SSE events.""" + + # Multiple events with problematic Unicode + test_data = [ + b'event: first\ndata: Hello\xe2\x80\xa8World\n\n', + b'event: second\ndata: Test\xe2\x80\xa9Data\n\n', + b'data: No event name\n\n', + ] + + event_source = create_mock_event_source(test_data) + + events = [event async for event in compliant_aiter_sse(event_source)] + + assert len(events) == 3 + + assert events[0].event == "first" + assert '\u2028' in events[0].data + + assert events[1].event == "second" + assert '\u2029' in events[1].data + + # Default event type is "message" + assert events[2].event == "message" + assert events[2].data == "No event name" \ No newline at end of file diff --git a/tests/issues/test_1356_sse_parsing_line_separator.py b/tests/issues/test_1356_sse_parsing_line_separator.py new file mode 100644 index 0000000000..2d2f1da355 --- /dev/null +++ b/tests/issues/test_1356_sse_parsing_line_separator.py @@ -0,0 +1,161 @@ +"""Test for issue #1356: SSE parsing fails with Unicode line separator characters.""" + +import multiprocessing +import socket +import time +from collections.abc import Generator +from typing import Any + +import anyio +import pytest +import uvicorn +from starlette.applications import Starlette +from starlette.requests import Request +from starlette.responses import Response +from starlette.routing import Mount, Route + +from mcp.client.session import ClientSession +from mcp.client.sse import sse_client +from mcp.server import Server +from mcp.server.sse import SseServerTransport +from mcp.server.transport_security import TransportSecuritySettings +from mcp.shared.exceptions import McpError +from mcp.types import TextContent, Tool + +pytestmark = pytest.mark.anyio + + +class ProblematicUnicodeServer(Server): + """Test server that returns problematic Unicode characters.""" + + def __init__(self): + super().__init__("ProblematicUnicodeServer") + + @self.list_tools() + async def handle_list_tools() -> list[Tool]: + return [ + Tool( + name="get_problematic_unicode", + description="Returns text with problematic Unicode character U+2028", + inputSchema={"type": "object", "properties": {}}, + ) + ] + + @self.call_tool() + async def handle_call_tool(name: str, args: dict[str, Any]) -> list[TextContent]: + if name == "get_problematic_unicode": + # Return text with U+2028 (LINE 
SEPARATOR) which can cause JSON parsing issues + # U+2028 is a valid Unicode character but can break JSON parsing in some contexts + problematic_text = "This text contains a line separator\u2028character that may break JSON parsing" + return [TextContent(type="text", text=problematic_text)] + return [TextContent(type="text", text=f"Unknown tool: {name}")] + + +def make_problematic_server_app() -> Starlette: + """Create test Starlette app with SSE transport.""" + security_settings = TransportSecuritySettings( + allowed_hosts=["127.0.0.1:*", "localhost:*"], + allowed_origins=["http://127.0.0.1:*", "http://localhost:*"], + ) + sse = SseServerTransport("/messages/", security_settings=security_settings) + server = ProblematicUnicodeServer() + + async def handle_sse(request: Request) -> Response: + async with sse.connect_sse(request.scope, request.receive, request._send) as streams: + await server.run(streams[0], streams[1], server.create_initialization_options()) + return Response() + + app = Starlette( + routes=[ + Route("/sse", endpoint=handle_sse), + Mount("/messages/", app=sse.handle_post_message), + ] + ) + + return app + + +def run_problematic_server(server_port: int) -> None: + """Run the problematic Unicode test server.""" + app = make_problematic_server_app() + server = uvicorn.Server( + config=uvicorn.Config(app=app, host="127.0.0.1", port=server_port, log_level="error") + ) + server.run() + + +@pytest.fixture +def problematic_server_port() -> int: + """Get an available port for the test server.""" + with socket.socket() as s: + s.bind(("127.0.0.1", 0)) + return s.getsockname()[1] + + +@pytest.fixture +def problematic_server(problematic_server_port: int) -> Generator[str, None, None]: + """Start the problematic Unicode test server in a separate process.""" + proc = multiprocessing.Process( + target=run_problematic_server, kwargs={"server_port": problematic_server_port}, daemon=True + ) + proc.start() + + # Wait for server to be running + max_attempts = 20 + attempt = 0 + while attempt < max_attempts: + try: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.connect(("127.0.0.1", problematic_server_port)) + break + except ConnectionRefusedError: + time.sleep(0.1) + attempt += 1 + else: + raise RuntimeError(f"Server failed to start after {max_attempts} attempts") + + yield f"http://127.0.0.1:{problematic_server_port}" + + # Clean up + proc.kill() + proc.join(timeout=2) + + +async def test_json_parsing_with_problematic_unicode(problematic_server: str) -> None: + """Test that special Unicode characters like U+2028 are handled properly. + + This test reproduces issue #1356 where special Unicode characters + cause JSON parsing to fail and the raw exception is sent to the stream, + preventing proper error handling. 
+ """ + # Connect to the server using SSE client + async with sse_client(problematic_server + "/sse") as streams: + async with ClientSession(*streams) as session: + # Initialize the connection + result = await session.initialize() + assert result.serverInfo.name == "ProblematicUnicodeServer" + + # Call the tool that returns problematic Unicode + # This should succeed and not hang + + # Use a timeout to detect if we're hanging + with anyio.fail_after(5): # 5 second timeout + try: + response = await session.call_tool("get_problematic_unicode", {}) + + # If we get here, the Unicode was handled properly + assert len(response.content) == 1 + text_content = response.content[0] + assert hasattr(text_content, "text"), f"Response doesn't have text: {text_content}" + + expected = "This text contains a line separator\u2028character that may break JSON parsing" + assert text_content.text == expected, f"Expected: {expected!r}, Got: {text_content.text!r}" + + except McpError: + pytest.fail("Unexpected error with tool call") + except TimeoutError: + # If we timeout, the issue is confirmed - the client hangs + pytest.fail("Client hangs when handling problematic Unicode (issue #1356 confirmed)") + except Exception as e: + # We should not get raw exceptions - they should be wrapped as McpError + pytest.fail(f"Got raw exception instead of McpError: {type(e).__name__}: {e}") From e781013a55cc35157922082f29cffe29127e6c6d Mon Sep 17 00:00:00 2001 From: Peter Alexander Date: Wed, 10 Sep 2025 13:08:22 +0100 Subject: [PATCH 2/8] Apply Unicode line separator fix to streamable HTTP client Use compliant_aiter_sse in streamable_http.py to handle Unicode line separator characters correctly, preventing the same issue that affected the SSE client. --- src/mcp/client/streamable_http.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/mcp/client/streamable_http.py b/src/mcp/client/streamable_http.py index 57df647057..2dbd477afc 100644 --- a/src/mcp/client/streamable_http.py +++ b/src/mcp/client/streamable_http.py @@ -18,6 +18,7 @@ from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream from httpx_sse import EventSource, ServerSentEvent, aconnect_sse +from mcp.client.sse import compliant_aiter_sse from mcp.shared._httpx_utils import McpHttpClientFactory, create_mcp_http_client from mcp.shared.message import ClientMessageMetadata, SessionMessage from mcp.types import ( @@ -211,7 +212,8 @@ async def handle_get_stream( event_source.response.raise_for_status() logger.debug("GET SSE connection established") - async for sse in event_source.aiter_sse(): + # Use compliant SSE iterator to handle Unicode correctly (issue #1356) + async for sse in compliant_aiter_sse(event_source): await self._handle_sse_event(sse, read_stream_writer) except Exception as exc: @@ -240,7 +242,8 @@ async def _handle_resumption_request(self, ctx: RequestContext) -> None: event_source.response.raise_for_status() logger.debug("Resumption GET SSE connection established") - async for sse in event_source.aiter_sse(): + # Use compliant SSE iterator to handle Unicode correctly (issue #1356) + async for sse in compliant_aiter_sse(event_source): is_complete = await self._handle_sse_event( sse, ctx.read_stream_writer, @@ -323,7 +326,8 @@ async def _handle_sse_response( """Handle SSE response from the server.""" try: event_source = EventSource(response) - async for sse in event_source.aiter_sse(): + # Use compliant SSE iterator to handle Unicode correctly (issue #1356) + async for sse in 
compliant_aiter_sse(event_source): is_complete = await self._handle_sse_event( sse, ctx.read_stream_writer, From 094c76d42bbe2bb16d29153d03f1428c15a0635a Mon Sep 17 00:00:00 2001 From: Peter Alexander Date: Wed, 10 Sep 2025 14:37:39 +0100 Subject: [PATCH 3/8] Fix SSE parsing to handle split CRLF across chunks Previously, the SSE parser could incorrectly handle CRLF line endings when \r appeared at the end of one chunk and \n at the beginning of the next chunk, potentially treating them as two separate line breaks instead of a single CRLF sequence. This fix implements proper CRLF handling by: - Tracking when a chunk ends with \r using a skip_leading_lf flag - Skipping a leading \n in the next chunk if the previous ended with \r - Ensuring Unicode line/paragraph separators (U+2028/U+2029) are treated as regular content, not line breaks, per the SSE specification Added comprehensive test coverage for the edge case of split CRLF sequences across chunk boundaries. --- src/mcp/client/sse.py | 36 ++++++++++++++++++++++++++------ tests/client/test_sse_unicode.py | 24 ++++++++++++++++++++- 2 files changed, 53 insertions(+), 7 deletions(-) diff --git a/src/mcp/client/sse.py b/src/mcp/client/sse.py index 5029637a30..b70275edc8 100644 --- a/src/mcp/client/sse.py +++ b/src/mcp/client/sse.py @@ -37,15 +37,38 @@ async def compliant_aiter_sse(event_source: EventSource) -> AsyncIterator[Server """ decoder = SSEDecoder() buffer = b"" + + # Split on "\r\n", "\r", or "\n" only, no other new line characters. + # https://html.spec.whatwg.org/multipage/server-sent-events.html#parsing-an-event-stream + + # Note: this is tricky, because we could have a "\r" at the end of a chunk and not yet + # know if the next chunk starts with a "\n" or not. + skip_leading_lf = False async for chunk in event_source.response.aiter_bytes(): buffer += chunk - # Split on "\n" only (not U+2028/U+2029 or other anything else) - # https://html.spec.whatwg.org/multipage/server-sent-events.html#parsing-an-event-stream - while b"\n" in buffer: - line_bytes, buffer = buffer.split(b"\n", 1) - line = line_bytes.decode('utf-8', errors='replace').rstrip("\r") + while len(buffer) != 0: + if skip_leading_lf and buffer.startswith(b"\n"): + buffer = buffer[1:] + skip_leading_lf = False + + # Find first "\r" or "\n" + cr = buffer.find(b"\r") + lf = buffer.find(b"\n") + pos = cr if lf == -1 else lf if cr == -1 else min(cr, lf) + + if pos == -1: + # No lines, need another chunk + break + + line_bytes = buffer[:pos] + buffer = buffer[pos + 1:] + + # If we have a CR first, skip any LF immediately after (may be in next chunk) + skip_leading_lf = (pos == cr) + + line = line_bytes.decode('utf-8', errors='replace') sse = decoder.decode(line) if sse is not None: yield sse @@ -53,7 +76,8 @@ async def compliant_aiter_sse(event_source: EventSource) -> AsyncIterator[Server # Process any remaining data in buffer if buffer: assert b"\n" not in buffer - line = buffer.decode('utf-8', errors='replace').rstrip("\r") + assert b"\r" not in buffer + line = buffer.decode('utf-8', errors='replace') sse = decoder.decode(line) if sse is not None: yield sse diff --git a/tests/client/test_sse_unicode.py b/tests/client/test_sse_unicode.py index 4a008257c3..f55f82176b 100644 --- a/tests/client/test_sse_unicode.py +++ b/tests/client/test_sse_unicode.py @@ -136,4 +136,26 @@ async def test_compliant_aiter_sse_handles_multiple_events(): # Default event type is "message" assert events[2].event == "message" - assert events[2].data == "No event name" \ No newline at end of file + 
assert events[2].data == "No event name" + + +async def test_compliant_aiter_sse_handles_split_crlf(): + """Test that \r at end of chunk followed by \n in next chunk is treated as one newline.""" + + # Test case where \r is at the end of one chunk and \n starts the next + # This should be treated as a single CRLF line ending, not two separate newlines + test_data = [ + b'event: test\r', # \r at end of chunk + b'\ndata: line1\r', # \n at start of next chunk, then another \r at end + b'\ndata: line2\n\n', # \n at start, completing the CRLF + ] + + event_source = create_mock_event_source(test_data) + + events = [event async for event in compliant_aiter_sse(event_source)] + + # Should get exactly one event with both data lines + assert len(events) == 1 + assert events[0].event == "test" + # The SSE decoder concatenates multiple data fields with \n + assert events[0].data == "line1\nline2" \ No newline at end of file From f310e3959669cc2a9d8884379202185da3d8c309 Mon Sep 17 00:00:00 2001 From: Peter Alexander Date: Wed, 10 Sep 2025 14:42:26 +0100 Subject: [PATCH 4/8] Apply ruff formatting --- src/mcp/client/sse.py | 23 ++-- tests/client/test_sse_unicode.py | 110 +++++++++--------- .../test_1356_sse_parsing_line_separator.py | 14 +-- 3 files changed, 73 insertions(+), 74 deletions(-) diff --git a/src/mcp/client/sse.py b/src/mcp/client/sse.py index b70275edc8..2125952f26 100644 --- a/src/mcp/client/sse.py +++ b/src/mcp/client/sse.py @@ -20,18 +20,19 @@ def remove_request_params(url: str) -> str: return urljoin(url, urlparse(url).path) - + + async def compliant_aiter_sse(event_source: EventSource) -> AsyncIterator[ServerSentEvent]: """ Safely iterate over SSE events, working around httpx issue where U+2028 and U+2029 are incorrectly treated as newlines, breaking SSE stream parsing. - + This function replaces event_source.aiter_sse() to handle these Unicode characters correctly by processing the raw byte stream and only splitting on actual newlines. - + Args: event_source: The EventSource to iterate over - + Yields: ServerSentEvent objects parsed from the stream """ @@ -44,10 +45,10 @@ async def compliant_aiter_sse(event_source: EventSource) -> AsyncIterator[Server # Note: this is tricky, because we could have a "\r" at the end of a chunk and not yet # know if the next chunk starts with a "\n" or not. 
skip_leading_lf = False - + async for chunk in event_source.response.aiter_bytes(): buffer += chunk - + while len(buffer) != 0: if skip_leading_lf and buffer.startswith(b"\n"): buffer = buffer[1:] @@ -63,21 +64,21 @@ async def compliant_aiter_sse(event_source: EventSource) -> AsyncIterator[Server break line_bytes = buffer[:pos] - buffer = buffer[pos + 1:] + buffer = buffer[pos + 1 :] # If we have a CR first, skip any LF immediately after (may be in next chunk) - skip_leading_lf = (pos == cr) + skip_leading_lf = pos == cr - line = line_bytes.decode('utf-8', errors='replace') + line = line_bytes.decode("utf-8", errors="replace") sse = decoder.decode(line) if sse is not None: yield sse - + # Process any remaining data in buffer if buffer: assert b"\n" not in buffer assert b"\r" not in buffer - line = buffer.decode('utf-8', errors='replace') + line = buffer.decode("utf-8", errors="replace") sse = decoder.decode(line) if sse is not None: yield sse diff --git a/tests/client/test_sse_unicode.py b/tests/client/test_sse_unicode.py index f55f82176b..b85e595fab 100644 --- a/tests/client/test_sse_unicode.py +++ b/tests/client/test_sse_unicode.py @@ -16,76 +16,76 @@ def create_mock_event_source(data_chunks: list[bytes]) -> EventSource: event_source = MagicMock(spec=EventSource) response = AsyncMock() event_source.response = response - + async def mock_aiter_bytes() -> AsyncIterator[bytes]: for chunk in data_chunks: yield chunk - + response.aiter_bytes = mock_aiter_bytes return event_source async def test_compliant_aiter_sse_handles_unicode_line_separators(): """Test that compliant_aiter_sse correctly handles U+2028 and U+2029 characters.""" - + # Simulate SSE data with U+2028 in JSON content # The server sends: event: message\ndata: {"text":"Hello\u2028World"}\n\n test_data = [ - b'event: message\n', + b"event: message\n", b'data: {"text":"Hello', - b'\xe2\x80\xa8', # UTF-8 encoding of U+2028 + b"\xe2\x80\xa8", # UTF-8 encoding of U+2028 b'World"}\n', - b'\n', + b"\n", ] - + event_source = create_mock_event_source(test_data) - + # Collect events events = [event async for event in compliant_aiter_sse(event_source)] - + # Should receive one message event assert len(events) == 1 assert events[0].event == "message" # The U+2028 should be preserved in the data - assert '\u2028' in events[0].data + assert "\u2028" in events[0].data assert events[0].data == '{"text":"Hello\u2028World"}' async def test_compliant_aiter_sse_handles_paragraph_separator(): """Test that compliant_aiter_sse correctly handles U+2029 (PARAGRAPH SEPARATOR).""" - + # Simulate SSE data with U+2029 test_data = [ - b'event: test\ndata: Line1', - b'\xe2\x80\xa9', # UTF-8 encoding of U+2029 - b'Line2\n\n', + b"event: test\ndata: Line1", + b"\xe2\x80\xa9", # UTF-8 encoding of U+2029 + b"Line2\n\n", ] - + event_source = create_mock_event_source(test_data) - + events = [event async for event in compliant_aiter_sse(event_source)] - + assert len(events) == 1 assert events[0].event == "test" # U+2029 should be preserved, not treated as a newline - assert '\u2029' in events[0].data - assert events[0].data == 'Line1\u2029Line2' + assert "\u2029" in events[0].data + assert events[0].data == "Line1\u2029Line2" async def test_compliant_aiter_sse_handles_crlf(): """Test that compliant_aiter_sse correctly handles \\r\\n line endings.""" - + # Simulate SSE data with CRLF line endings test_data = [ - b'event: message\r\n', - b'data: test data\r\n', - b'\r\n', + b"event: message\r\n", + b"data: test data\r\n", + b"\r\n", ] - + event_source = 
create_mock_event_source(test_data) - + events = [event async for event in compliant_aiter_sse(event_source)] - + assert len(events) == 1 assert events[0].event == "message" assert events[0].data == "test data" @@ -93,20 +93,20 @@ async def test_compliant_aiter_sse_handles_crlf(): async def test_compliant_aiter_sse_handles_split_utf8(): """Test that compliant_aiter_sse handles UTF-8 characters split across chunks.""" - + # Split a UTF-8 emoji (🎉 = \xf0\x9f\x8e\x89) across chunks test_data = [ - b'event: message\n', - b'data: Party ', - b'\xf0\x9f', # First half of emoji - b'\x8e\x89', # Second half of emoji - b' time!\n\n', + b"event: message\n", + b"data: Party ", + b"\xf0\x9f", # First half of emoji + b"\x8e\x89", # Second half of emoji + b" time!\n\n", ] - + event_source = create_mock_event_source(test_data) - + events = [event async for event in compliant_aiter_sse(event_source)] - + assert len(events) == 1 assert events[0].event == "message" assert events[0].data == "Party 🎉 time!" @@ -114,26 +114,26 @@ async def test_compliant_aiter_sse_handles_split_utf8(): async def test_compliant_aiter_sse_handles_multiple_events(): """Test that compliant_aiter_sse correctly handles multiple SSE events.""" - + # Multiple events with problematic Unicode test_data = [ - b'event: first\ndata: Hello\xe2\x80\xa8World\n\n', - b'event: second\ndata: Test\xe2\x80\xa9Data\n\n', - b'data: No event name\n\n', + b"event: first\ndata: Hello\xe2\x80\xa8World\n\n", + b"event: second\ndata: Test\xe2\x80\xa9Data\n\n", + b"data: No event name\n\n", ] - + event_source = create_mock_event_source(test_data) - + events = [event async for event in compliant_aiter_sse(event_source)] - + assert len(events) == 3 - + assert events[0].event == "first" - assert '\u2028' in events[0].data - + assert "\u2028" in events[0].data + assert events[1].event == "second" - assert '\u2029' in events[1].data - + assert "\u2029" in events[1].data + # Default event type is "message" assert events[2].event == "message" assert events[2].data == "No event name" @@ -141,21 +141,21 @@ async def test_compliant_aiter_sse_handles_multiple_events(): async def test_compliant_aiter_sse_handles_split_crlf(): """Test that \r at end of chunk followed by \n in next chunk is treated as one newline.""" - + # Test case where \r is at the end of one chunk and \n starts the next # This should be treated as a single CRLF line ending, not two separate newlines test_data = [ - b'event: test\r', # \r at end of chunk - b'\ndata: line1\r', # \n at start of next chunk, then another \r at end - b'\ndata: line2\n\n', # \n at start, completing the CRLF + b"event: test\r", # \r at end of chunk + b"\ndata: line1\r", # \n at start of next chunk, then another \r at end + b"\ndata: line2\n\n", # \n at start, completing the CRLF ] - + event_source = create_mock_event_source(test_data) - + events = [event async for event in compliant_aiter_sse(event_source)] - + # Should get exactly one event with both data lines assert len(events) == 1 assert events[0].event == "test" # The SSE decoder concatenates multiple data fields with \n - assert events[0].data == "line1\nline2" \ No newline at end of file + assert events[0].data == "line1\nline2" diff --git a/tests/issues/test_1356_sse_parsing_line_separator.py b/tests/issues/test_1356_sse_parsing_line_separator.py index 2d2f1da355..23e2d4a343 100644 --- a/tests/issues/test_1356_sse_parsing_line_separator.py +++ b/tests/issues/test_1356_sse_parsing_line_separator.py @@ -78,9 +78,7 @@ async def handle_sse(request: Request) -> Response: 
def run_problematic_server(server_port: int) -> None: """Run the problematic Unicode test server.""" app = make_problematic_server_app() - server = uvicorn.Server( - config=uvicorn.Config(app=app, host="127.0.0.1", port=server_port, log_level="error") - ) + server = uvicorn.Server(config=uvicorn.Config(app=app, host="127.0.0.1", port=server_port, log_level="error")) server.run() @@ -123,7 +121,7 @@ def problematic_server(problematic_server_port: int) -> Generator[str, None, Non async def test_json_parsing_with_problematic_unicode(problematic_server: str) -> None: """Test that special Unicode characters like U+2028 are handled properly. - + This test reproduces issue #1356 where special Unicode characters cause JSON parsing to fail and the raw exception is sent to the stream, preventing proper error handling. @@ -137,20 +135,20 @@ async def test_json_parsing_with_problematic_unicode(problematic_server: str) -> # Call the tool that returns problematic Unicode # This should succeed and not hang - + # Use a timeout to detect if we're hanging with anyio.fail_after(5): # 5 second timeout try: response = await session.call_tool("get_problematic_unicode", {}) - + # If we get here, the Unicode was handled properly assert len(response.content) == 1 text_content = response.content[0] assert hasattr(text_content, "text"), f"Response doesn't have text: {text_content}" - + expected = "This text contains a line separator\u2028character that may break JSON parsing" assert text_content.text == expected, f"Expected: {expected!r}, Got: {text_content.text!r}" - + except McpError: pytest.fail("Unexpected error with tool call") except TimeoutError: From b2dde3df3367d327f341423b7f1937bcb11409e8 Mon Sep 17 00:00:00 2001 From: Peter Alexander Date: Wed, 10 Sep 2025 15:16:37 +0100 Subject: [PATCH 5/8] Fix type narrowing for TextContent in test --- tests/issues/test_1356_sse_parsing_line_separator.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/issues/test_1356_sse_parsing_line_separator.py b/tests/issues/test_1356_sse_parsing_line_separator.py index 23e2d4a343..9c15c8c348 100644 --- a/tests/issues/test_1356_sse_parsing_line_separator.py +++ b/tests/issues/test_1356_sse_parsing_line_separator.py @@ -145,6 +145,10 @@ async def test_json_parsing_with_problematic_unicode(problematic_server: str) -> assert len(response.content) == 1 text_content = response.content[0] assert hasattr(text_content, "text"), f"Response doesn't have text: {text_content}" + + # Type narrowing for pyright + from mcp.types import TextContent + assert isinstance(text_content, TextContent) expected = "This text contains a line separator\u2028character that may break JSON parsing" assert text_content.text == expected, f"Expected: {expected!r}, Got: {text_content.text!r}" From 1c2d4c651428515d0bbdaa18446357efae8dbe92 Mon Sep 17 00:00:00 2001 From: Peter Alexander Date: Wed, 10 Sep 2025 15:34:37 +0100 Subject: [PATCH 6/8] Fix ruff --- tests/issues/test_1356_sse_parsing_line_separator.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/issues/test_1356_sse_parsing_line_separator.py b/tests/issues/test_1356_sse_parsing_line_separator.py index 9c15c8c348..5e87d17ac4 100644 --- a/tests/issues/test_1356_sse_parsing_line_separator.py +++ b/tests/issues/test_1356_sse_parsing_line_separator.py @@ -145,9 +145,10 @@ async def test_json_parsing_with_problematic_unicode(problematic_server: str) -> assert len(response.content) == 1 text_content = response.content[0] assert hasattr(text_content, "text"), f"Response doesn't 
have text: {text_content}" - + # Type narrowing for pyright from mcp.types import TextContent + assert isinstance(text_content, TextContent) expected = "This text contains a line separator\u2028character that may break JSON parsing" From 45109d204920c28bc83003ec7f5537c8d3d0464b Mon Sep 17 00:00:00 2001 From: Peter Alexander Date: Tue, 7 Oct 2025 16:25:29 +0100 Subject: [PATCH 7/8] Revert SSE parsing workaround - fixed in httpx-sse 0.4.2 The Unicode line separator issue (U+2028 and U+2029 characters being incorrectly treated as newlines) has been fixed in httpx-sse 0.4.2. See: https://github.com/florimondmanca/httpx-sse/pull/39 Revert the compliant_aiter_sse workaround and use the standard event_source.aiter_sse() method again. Upgrade httpx-sse to >=0.4.2 to get the fix. Keep the high-level issue test to ensure the problem doesn't regress. Github-Issue:#1356 --- pyproject.toml | 2 +- src/mcp/client/sse.py | 69 +------------ src/mcp/client/streamable_http.py | 10 +- tests/client/test_sse_unicode.py | 161 ------------------------------ uv.lock | 6 +- 5 files changed, 9 insertions(+), 239 deletions(-) delete mode 100644 tests/client/test_sse_unicode.py diff --git a/pyproject.toml b/pyproject.toml index c6119867ef..fc2cc70576 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,7 +24,7 @@ classifiers = [ dependencies = [ "anyio>=4.5", "httpx>=0.27.1", - "httpx-sse>=0.4", + "httpx-sse>=0.4.2", "pydantic>=2.11.0,<3.0.0", "starlette>=0.27", "python-multipart>=0.0.9", diff --git a/src/mcp/client/sse.py b/src/mcp/client/sse.py index 2125952f26..7ca8d19afd 100644 --- a/src/mcp/client/sse.py +++ b/src/mcp/client/sse.py @@ -1,5 +1,4 @@ import logging -from collections.abc import AsyncIterator from contextlib import asynccontextmanager from typing import Any from urllib.parse import urljoin, urlparse @@ -8,8 +7,7 @@ import httpx from anyio.abc import TaskStatus from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream -from httpx_sse import EventSource, ServerSentEvent, aconnect_sse -from httpx_sse._decoders import SSEDecoder +from httpx_sse import aconnect_sse import mcp.types as types from mcp.shared._httpx_utils import McpHttpClientFactory, create_mcp_http_client @@ -22,68 +20,6 @@ def remove_request_params(url: str) -> str: return urljoin(url, urlparse(url).path) -async def compliant_aiter_sse(event_source: EventSource) -> AsyncIterator[ServerSentEvent]: - """ - Safely iterate over SSE events, working around httpx issue where U+2028 and U+2029 - are incorrectly treated as newlines, breaking SSE stream parsing. - - This function replaces event_source.aiter_sse() to handle these Unicode characters - correctly by processing the raw byte stream and only splitting on actual newlines. - - Args: - event_source: The EventSource to iterate over - - Yields: - ServerSentEvent objects parsed from the stream - """ - decoder = SSEDecoder() - buffer = b"" - - # Split on "\r\n", "\r", or "\n" only, no other new line characters. - # https://html.spec.whatwg.org/multipage/server-sent-events.html#parsing-an-event-stream - - # Note: this is tricky, because we could have a "\r" at the end of a chunk and not yet - # know if the next chunk starts with a "\n" or not. 
- skip_leading_lf = False - - async for chunk in event_source.response.aiter_bytes(): - buffer += chunk - - while len(buffer) != 0: - if skip_leading_lf and buffer.startswith(b"\n"): - buffer = buffer[1:] - skip_leading_lf = False - - # Find first "\r" or "\n" - cr = buffer.find(b"\r") - lf = buffer.find(b"\n") - pos = cr if lf == -1 else lf if cr == -1 else min(cr, lf) - - if pos == -1: - # No lines, need another chunk - break - - line_bytes = buffer[:pos] - buffer = buffer[pos + 1 :] - - # If we have a CR first, skip any LF immediately after (may be in next chunk) - skip_leading_lf = pos == cr - - line = line_bytes.decode("utf-8", errors="replace") - sse = decoder.decode(line) - if sse is not None: - yield sse - - # Process any remaining data in buffer - if buffer: - assert b"\n" not in buffer - assert b"\r" not in buffer - line = buffer.decode("utf-8", errors="replace") - sse = decoder.decode(line) - if sse is not None: - yield sse - - @asynccontextmanager async def sse_client( url: str, @@ -133,8 +69,7 @@ async def sse_reader( task_status: TaskStatus[str] = anyio.TASK_STATUS_IGNORED, ): try: - # Use our compliant SSE iterator to handle Unicode correctly (issue #1356) - async for sse in compliant_aiter_sse(event_source): + async for sse in event_source.aiter_sse(): logger.debug(f"Received SSE event: {sse.event}") match sse.event: case "endpoint": diff --git a/src/mcp/client/streamable_http.py b/src/mcp/client/streamable_http.py index 2dbd477afc..57df647057 100644 --- a/src/mcp/client/streamable_http.py +++ b/src/mcp/client/streamable_http.py @@ -18,7 +18,6 @@ from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream from httpx_sse import EventSource, ServerSentEvent, aconnect_sse -from mcp.client.sse import compliant_aiter_sse from mcp.shared._httpx_utils import McpHttpClientFactory, create_mcp_http_client from mcp.shared.message import ClientMessageMetadata, SessionMessage from mcp.types import ( @@ -212,8 +211,7 @@ async def handle_get_stream( event_source.response.raise_for_status() logger.debug("GET SSE connection established") - # Use compliant SSE iterator to handle Unicode correctly (issue #1356) - async for sse in compliant_aiter_sse(event_source): + async for sse in event_source.aiter_sse(): await self._handle_sse_event(sse, read_stream_writer) except Exception as exc: @@ -242,8 +240,7 @@ async def _handle_resumption_request(self, ctx: RequestContext) -> None: event_source.response.raise_for_status() logger.debug("Resumption GET SSE connection established") - # Use compliant SSE iterator to handle Unicode correctly (issue #1356) - async for sse in compliant_aiter_sse(event_source): + async for sse in event_source.aiter_sse(): is_complete = await self._handle_sse_event( sse, ctx.read_stream_writer, @@ -326,8 +323,7 @@ async def _handle_sse_response( """Handle SSE response from the server.""" try: event_source = EventSource(response) - # Use compliant SSE iterator to handle Unicode correctly (issue #1356) - async for sse in compliant_aiter_sse(event_source): + async for sse in event_source.aiter_sse(): is_complete = await self._handle_sse_event( sse, ctx.read_stream_writer, diff --git a/tests/client/test_sse_unicode.py b/tests/client/test_sse_unicode.py deleted file mode 100644 index b85e595fab..0000000000 --- a/tests/client/test_sse_unicode.py +++ /dev/null @@ -1,161 +0,0 @@ -"""Test for SSE client Unicode handling.""" - -from collections.abc import AsyncIterator -from unittest.mock import AsyncMock, MagicMock - -import pytest -from httpx_sse import 
EventSource - -from mcp.client.sse import compliant_aiter_sse - -pytestmark = pytest.mark.anyio - - -def create_mock_event_source(data_chunks: list[bytes]) -> EventSource: - """Create a mock EventSource that yields the given data chunks.""" - event_source = MagicMock(spec=EventSource) - response = AsyncMock() - event_source.response = response - - async def mock_aiter_bytes() -> AsyncIterator[bytes]: - for chunk in data_chunks: - yield chunk - - response.aiter_bytes = mock_aiter_bytes - return event_source - - -async def test_compliant_aiter_sse_handles_unicode_line_separators(): - """Test that compliant_aiter_sse correctly handles U+2028 and U+2029 characters.""" - - # Simulate SSE data with U+2028 in JSON content - # The server sends: event: message\ndata: {"text":"Hello\u2028World"}\n\n - test_data = [ - b"event: message\n", - b'data: {"text":"Hello', - b"\xe2\x80\xa8", # UTF-8 encoding of U+2028 - b'World"}\n', - b"\n", - ] - - event_source = create_mock_event_source(test_data) - - # Collect events - events = [event async for event in compliant_aiter_sse(event_source)] - - # Should receive one message event - assert len(events) == 1 - assert events[0].event == "message" - # The U+2028 should be preserved in the data - assert "\u2028" in events[0].data - assert events[0].data == '{"text":"Hello\u2028World"}' - - -async def test_compliant_aiter_sse_handles_paragraph_separator(): - """Test that compliant_aiter_sse correctly handles U+2029 (PARAGRAPH SEPARATOR).""" - - # Simulate SSE data with U+2029 - test_data = [ - b"event: test\ndata: Line1", - b"\xe2\x80\xa9", # UTF-8 encoding of U+2029 - b"Line2\n\n", - ] - - event_source = create_mock_event_source(test_data) - - events = [event async for event in compliant_aiter_sse(event_source)] - - assert len(events) == 1 - assert events[0].event == "test" - # U+2029 should be preserved, not treated as a newline - assert "\u2029" in events[0].data - assert events[0].data == "Line1\u2029Line2" - - -async def test_compliant_aiter_sse_handles_crlf(): - """Test that compliant_aiter_sse correctly handles \\r\\n line endings.""" - - # Simulate SSE data with CRLF line endings - test_data = [ - b"event: message\r\n", - b"data: test data\r\n", - b"\r\n", - ] - - event_source = create_mock_event_source(test_data) - - events = [event async for event in compliant_aiter_sse(event_source)] - - assert len(events) == 1 - assert events[0].event == "message" - assert events[0].data == "test data" - - -async def test_compliant_aiter_sse_handles_split_utf8(): - """Test that compliant_aiter_sse handles UTF-8 characters split across chunks.""" - - # Split a UTF-8 emoji (🎉 = \xf0\x9f\x8e\x89) across chunks - test_data = [ - b"event: message\n", - b"data: Party ", - b"\xf0\x9f", # First half of emoji - b"\x8e\x89", # Second half of emoji - b" time!\n\n", - ] - - event_source = create_mock_event_source(test_data) - - events = [event async for event in compliant_aiter_sse(event_source)] - - assert len(events) == 1 - assert events[0].event == "message" - assert events[0].data == "Party 🎉 time!" 
- - -async def test_compliant_aiter_sse_handles_multiple_events(): - """Test that compliant_aiter_sse correctly handles multiple SSE events.""" - - # Multiple events with problematic Unicode - test_data = [ - b"event: first\ndata: Hello\xe2\x80\xa8World\n\n", - b"event: second\ndata: Test\xe2\x80\xa9Data\n\n", - b"data: No event name\n\n", - ] - - event_source = create_mock_event_source(test_data) - - events = [event async for event in compliant_aiter_sse(event_source)] - - assert len(events) == 3 - - assert events[0].event == "first" - assert "\u2028" in events[0].data - - assert events[1].event == "second" - assert "\u2029" in events[1].data - - # Default event type is "message" - assert events[2].event == "message" - assert events[2].data == "No event name" - - -async def test_compliant_aiter_sse_handles_split_crlf(): - """Test that \r at end of chunk followed by \n in next chunk is treated as one newline.""" - - # Test case where \r is at the end of one chunk and \n starts the next - # This should be treated as a single CRLF line ending, not two separate newlines - test_data = [ - b"event: test\r", # \r at end of chunk - b"\ndata: line1\r", # \n at start of next chunk, then another \r at end - b"\ndata: line2\n\n", # \n at start, completing the CRLF - ] - - event_source = create_mock_event_source(test_data) - - events = [event async for event in compliant_aiter_sse(event_source)] - - # Should get exactly one event with both data lines - assert len(events) == 1 - assert events[0].event == "test" - # The SSE decoder concatenates multiple data fields with \n - assert events[0].data == "line1\nline2" diff --git a/uv.lock b/uv.lock index 68abdcc4f5..47942c55b3 100644 --- a/uv.lock +++ b/uv.lock @@ -442,11 +442,11 @@ wheels = [ [[package]] name = "httpx-sse" -version = "0.4.1" +version = "0.4.2" source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/6e/fa/66bd985dd0b7c109a3bcb89272ee0bfb7e2b4d06309ad7b38ff866734b2a/httpx_sse-0.4.1.tar.gz", hash = "sha256:8f44d34414bc7b21bf3602713005c5df4917884f76072479b21f68befa4ea26e", size = 12998, upload-time = "2025-06-24T13:21:05.71Z" } +sdist = { url = "https://files.pythonhosted.org/packages/63/7a/280d644f906f077e4f4a6d327e9b6e5a936624395ad1bf6ee9165a9d9959/httpx_sse-0.4.2.tar.gz", hash = "sha256:5bb6a2771a51e6c7a5f5c645e40b8a5f57d8de708f46cb5f3868043c3c18124e", size = 13103, upload-time = "2025-10-07T14:15:23.000Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/25/0a/6269e3473b09aed2dab8aa1a600c70f31f00ae1349bee30658f7e358a159/httpx_sse-0.4.1-py3-none-any.whl", hash = "sha256:cba42174344c3a5b06f255ce65b350880f962d99ead85e776f23c6618a377a37", size = 8054, upload-time = "2025-06-24T13:21:04.772Z" }, + { url = "https://files.pythonhosted.org/packages/4f/e5/ec31165492ecc52426370b9005e0637d6da02f9579283298affcb1ab614d/httpx_sse-0.4.2-py3-none-any.whl", hash = "sha256:a9fa4afacb293fa50ef9bacb6cae8287ba5fd1f4b1c2d10a35bb981c41da31ab", size = 8149, upload-time = "2025-10-07T14:15:23.000Z" }, ] [[package]] From 98ce8a2eabdd505cc5fc95c10621f97c20090ae6 Mon Sep 17 00:00:00 2001 From: Peter Alexander Date: Wed, 8 Oct 2025 17:28:24 +0100 Subject: [PATCH 8/8] Update uv.lock --- uv.lock | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/uv.lock b/uv.lock index 47942c55b3..45f46a0f3e 100644 --- a/uv.lock +++ b/uv.lock @@ -1,5 +1,5 @@ version = 1 -revision = 2 +revision = 3 requires-python = ">=3.10" [manifest] @@ -444,9 +444,9 @@ wheels = [ name = "httpx-sse" version = 
"0.4.2" source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/63/7a/280d644f906f077e4f4a6d327e9b6e5a936624395ad1bf6ee9165a9d9959/httpx_sse-0.4.2.tar.gz", hash = "sha256:5bb6a2771a51e6c7a5f5c645e40b8a5f57d8de708f46cb5f3868043c3c18124e", size = 13103, upload-time = "2025-10-07T14:15:23.000Z" } +sdist = { url = "https://files.pythonhosted.org/packages/63/7a/280d644f906f077e4f4a6d327e9b6e5a936624395ad1bf6ee9165a9d9959/httpx_sse-0.4.2.tar.gz", hash = "sha256:5bb6a2771a51e6c7a5f5c645e40b8a5f57d8de708f46cb5f3868043c3c18124e", size = 16000, upload-time = "2025-10-07T08:10:05.219Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/4f/e5/ec31165492ecc52426370b9005e0637d6da02f9579283298affcb1ab614d/httpx_sse-0.4.2-py3-none-any.whl", hash = "sha256:a9fa4afacb293fa50ef9bacb6cae8287ba5fd1f4b1c2d10a35bb981c41da31ab", size = 8149, upload-time = "2025-10-07T14:15:23.000Z" }, + { url = "https://files.pythonhosted.org/packages/4f/e5/ec31165492ecc52426370b9005e0637d6da02f9579283298affcb1ab614d/httpx_sse-0.4.2-py3-none-any.whl", hash = "sha256:a9fa4afacb293fa50ef9bacb6cae8287ba5fd1f4b1c2d10a35bb981c41da31ab", size = 9018, upload-time = "2025-10-07T08:10:04.257Z" }, ] [[package]] @@ -654,7 +654,7 @@ docs = [ requires-dist = [ { name = "anyio", specifier = ">=4.5" }, { name = "httpx", specifier = ">=0.27.1" }, - { name = "httpx-sse", specifier = ">=0.4" }, + { name = "httpx-sse", specifier = ">=0.4.2" }, { name = "jsonschema", specifier = ">=4.20.0" }, { name = "pydantic", specifier = ">=2.11.0,<3.0.0" }, { name = "pydantic-settings", specifier = ">=2.5.2" },