1
1
import logging
2
- from collections .abc import AsyncIterator
3
2
from contextlib import asynccontextmanager
4
3
from typing import Any
5
4
from urllib .parse import urljoin , urlparse
8
7
import httpx
9
8
from anyio .abc import TaskStatus
10
9
from anyio .streams .memory import MemoryObjectReceiveStream , MemoryObjectSendStream
11
- from httpx_sse import EventSource , ServerSentEvent , aconnect_sse
12
- from httpx_sse ._decoders import SSEDecoder
10
+ from httpx_sse import aconnect_sse
13
11
14
12
import mcp .types as types
15
13
from mcp .shared ._httpx_utils import McpHttpClientFactory , create_mcp_http_client
@@ -22,68 +20,6 @@ def remove_request_params(url: str) -> str:
22
20
return urljoin (url , urlparse (url ).path )
23
21
24
22
25
- async def compliant_aiter_sse (event_source : EventSource ) -> AsyncIterator [ServerSentEvent ]:
26
- """
27
- Safely iterate over SSE events, working around httpx issue where U+2028 and U+2029
28
- are incorrectly treated as newlines, breaking SSE stream parsing.
29
-
30
- This function replaces event_source.aiter_sse() to handle these Unicode characters
31
- correctly by processing the raw byte stream and only splitting on actual newlines.
32
-
33
- Args:
34
- event_source: The EventSource to iterate over
35
-
36
- Yields:
37
- ServerSentEvent objects parsed from the stream
38
- """
39
- decoder = SSEDecoder ()
40
- buffer = b""
41
-
42
- # Split on "\r\n", "\r", or "\n" only, no other new line characters.
43
- # https://html.spec.whatwg.org/multipage/server-sent-events.html#parsing-an-event-stream
44
-
45
- # Note: this is tricky, because we could have a "\r" at the end of a chunk and not yet
46
- # know if the next chunk starts with a "\n" or not.
47
- skip_leading_lf = False
48
-
49
- async for chunk in event_source .response .aiter_bytes ():
50
- buffer += chunk
51
-
52
- while len (buffer ) != 0 :
53
- if skip_leading_lf and buffer .startswith (b"\n " ):
54
- buffer = buffer [1 :]
55
- skip_leading_lf = False
56
-
57
- # Find first "\r" or "\n"
58
- cr = buffer .find (b"\r " )
59
- lf = buffer .find (b"\n " )
60
- pos = cr if lf == - 1 else lf if cr == - 1 else min (cr , lf )
61
-
62
- if pos == - 1 :
63
- # No lines, need another chunk
64
- break
65
-
66
- line_bytes = buffer [:pos ]
67
- buffer = buffer [pos + 1 :]
68
-
69
- # If we have a CR first, skip any LF immediately after (may be in next chunk)
70
- skip_leading_lf = pos == cr
71
-
72
- line = line_bytes .decode ("utf-8" , errors = "replace" )
73
- sse = decoder .decode (line )
74
- if sse is not None :
75
- yield sse
76
-
77
- # Process any remaining data in buffer
78
- if buffer :
79
- assert b"\n " not in buffer
80
- assert b"\r " not in buffer
81
- line = buffer .decode ("utf-8" , errors = "replace" )
82
- sse = decoder .decode (line )
83
- if sse is not None :
84
- yield sse
85
-
86
-
87
23
@asynccontextmanager
88
24
async def sse_client (
89
25
url : str ,
@@ -133,8 +69,7 @@ async def sse_reader(
133
69
task_status : TaskStatus [str ] = anyio .TASK_STATUS_IGNORED ,
134
70
):
135
71
try :
136
- # Use our compliant SSE iterator to handle Unicode correctly (issue #1356)
137
- async for sse in compliant_aiter_sse (event_source ):
72
+ async for sse in event_source .aiter_sse ():
138
73
logger .debug (f"Received SSE event: { sse .event } " )
139
74
match sse .event :
140
75
case "endpoint" :
0 commit comments