Skip to content

Commit b71b621

Browse files
Add tests to cover SSE polling reconnection code paths
Add tests for previously uncovered lines in streamable_http client and server to achieve full test coverage: - Client _get_next_reconnection_delay with server retry values - Client resume_stream early returns and error handling - Server _create_priming_event with various configurations - Server close_sse_stream including edge cases Github-Issue:#1654
1 parent 0fdccd2 commit b71b621

File tree

1 file changed

+251
-0
lines changed

1 file changed

+251
-0
lines changed

tests/shared/test_streamable_http.py

Lines changed: 251 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1606,3 +1606,254 @@ async def bad_client():
16061606
assert isinstance(result, InitializeResult)
16071607
tools = await session.list_tools()
16081608
assert tools.tools
1609+
1610+
1611+
# =============================================================================
1612+
# SEP-1699: SSE Polling Support Tests
1613+
# =============================================================================
1614+
1615+
1616+
@pytest.mark.anyio
1617+
async def test_reconnection_delay_with_server_retry():
1618+
"""Test _get_next_reconnection_delay uses server-provided retry value."""
1619+
from mcp.client.streamable_http import (
1620+
StreamableHTTPReconnectionOptions,
1621+
StreamableHTTPTransport,
1622+
)
1623+
1624+
transport = StreamableHTTPTransport(
1625+
"http://localhost:8000",
1626+
reconnection_options=StreamableHTTPReconnectionOptions(
1627+
initial_reconnection_delay=1.0,
1628+
max_reconnection_delay=30.0,
1629+
reconnection_delay_grow_factor=2.0,
1630+
max_retries=5,
1631+
),
1632+
)
1633+
1634+
# Without server retry, should use exponential backoff
1635+
delay_0 = transport._get_next_reconnection_delay(0)
1636+
assert delay_0 == 1.0 # initial_delay * 2^0 = 1.0
1637+
1638+
delay_1 = transport._get_next_reconnection_delay(1)
1639+
assert delay_1 == 2.0 # initial_delay * 2^1 = 2.0
1640+
1641+
delay_2 = transport._get_next_reconnection_delay(2)
1642+
assert delay_2 == 4.0 # initial_delay * 2^2 = 4.0
1643+
1644+
# Should cap at max_reconnection_delay
1645+
delay_large = transport._get_next_reconnection_delay(10)
1646+
assert delay_large == 30.0 # capped at max
1647+
1648+
# Set server-provided retry value
1649+
transport._server_retry_seconds = 5.0
1650+
1651+
# Should now use server-provided value regardless of attempt
1652+
assert transport._get_next_reconnection_delay(0) == 5.0
1653+
assert transport._get_next_reconnection_delay(5) == 5.0
1654+
assert transport._get_next_reconnection_delay(100) == 5.0
1655+
1656+
1657+
@pytest.mark.anyio
1658+
async def test_create_priming_event_with_event_store():
1659+
"""Test _create_priming_event generates correct event when event store configured."""
1660+
event_store = SimpleEventStore()
1661+
1662+
transport = StreamableHTTPServerTransport(
1663+
mcp_session_id="test-session",
1664+
event_store=event_store,
1665+
retry_interval=5000, # 5 seconds in ms
1666+
)
1667+
1668+
# Create priming event
1669+
priming_event = await transport._create_priming_event("stream-123")
1670+
1671+
assert priming_event is not None
1672+
assert "id" in priming_event
1673+
assert priming_event["id"] == "1" # First event ID from SimpleEventStore
1674+
assert priming_event["data"] == "" # Empty data for priming
1675+
assert priming_event["retry"] == "5000"
1676+
1677+
1678+
@pytest.mark.anyio
1679+
async def test_create_priming_event_without_retry_interval():
1680+
"""Test _create_priming_event without retry interval configured."""
1681+
event_store = SimpleEventStore()
1682+
1683+
transport = StreamableHTTPServerTransport(
1684+
mcp_session_id="test-session",
1685+
event_store=event_store,
1686+
# No retry_interval
1687+
)
1688+
1689+
priming_event = await transport._create_priming_event("stream-456")
1690+
1691+
assert priming_event is not None
1692+
assert "id" in priming_event
1693+
assert priming_event["data"] == ""
1694+
assert "retry" not in priming_event # No retry field
1695+
1696+
1697+
@pytest.mark.anyio
1698+
async def test_create_priming_event_without_event_store():
1699+
"""Test _create_priming_event returns None without event store."""
1700+
transport = StreamableHTTPServerTransport(
1701+
mcp_session_id="test-session",
1702+
# No event_store
1703+
)
1704+
1705+
priming_event = await transport._create_priming_event("stream-789")
1706+
1707+
assert priming_event is None
1708+
1709+
1710+
@pytest.mark.anyio
1711+
async def test_close_sse_stream():
1712+
"""Test close_sse_stream closes the stream and cleans up."""
1713+
transport = StreamableHTTPServerTransport(
1714+
mcp_session_id="test-session",
1715+
)
1716+
1717+
# Manually add a stream to _request_streams
1718+
send_stream, recv_stream = anyio.create_memory_object_stream[EventMessage](0)
1719+
transport._request_streams["request-123"] = (send_stream, recv_stream)
1720+
1721+
assert "request-123" in transport._request_streams
1722+
1723+
# Close the stream
1724+
await transport.close_sse_stream("request-123")
1725+
1726+
# Stream should be removed
1727+
assert "request-123" not in transport._request_streams
1728+
1729+
1730+
@pytest.mark.anyio
1731+
async def test_close_sse_stream_nonexistent():
1732+
"""Test close_sse_stream handles nonexistent stream gracefully."""
1733+
transport = StreamableHTTPServerTransport(
1734+
mcp_session_id="test-session",
1735+
)
1736+
1737+
# Should not raise even if stream doesn't exist
1738+
await transport.close_sse_stream("nonexistent-stream")
1739+
1740+
1741+
@pytest.mark.anyio
1742+
async def test_close_sse_stream_already_closed():
1743+
"""Test close_sse_stream handles already-closed streams gracefully."""
1744+
transport = StreamableHTTPServerTransport(
1745+
mcp_session_id="test-session",
1746+
)
1747+
1748+
# Manually add a stream and close it before calling close_sse_stream
1749+
send_stream, recv_stream = anyio.create_memory_object_stream[EventMessage](0)
1750+
transport._request_streams["request-456"] = (send_stream, recv_stream)
1751+
1752+
# Close streams manually first
1753+
await send_stream.aclose()
1754+
await recv_stream.aclose()
1755+
1756+
# Should handle gracefully without error
1757+
await transport.close_sse_stream("request-456")
1758+
1759+
# Stream should still be removed from dict
1760+
assert "request-456" not in transport._request_streams
1761+
1762+
1763+
@pytest.mark.anyio
1764+
async def test_resume_stream_without_session_id():
1765+
"""Test resume_stream returns early without session ID."""
1766+
from mcp.client.streamable_http import StreamableHTTPTransport
1767+
1768+
transport = StreamableHTTPTransport("http://localhost:8000")
1769+
assert transport.session_id is None
1770+
1771+
# Create a dummy stream writer
1772+
read_stream_writer, read_stream_reader = anyio.create_memory_object_stream(0)
1773+
1774+
async with httpx.AsyncClient() as client:
1775+
# Should return early without making request
1776+
await transport.resume_stream(
1777+
client,
1778+
read_stream_writer,
1779+
"event-123",
1780+
)
1781+
1782+
# Clean up streams to avoid resource warnings
1783+
await read_stream_writer.aclose()
1784+
await read_stream_reader.aclose()
1785+
1786+
# No errors should occur
1787+
1788+
1789+
@pytest.mark.anyio
1790+
async def test_resume_stream_with_405_response(basic_server: None, basic_server_url: str):
1791+
"""Test resume_stream handles 405 Method Not Allowed gracefully."""
1792+
from mcp.client.streamable_http import StreamableHTTPTransport
1793+
1794+
transport = StreamableHTTPTransport(f"{basic_server_url}/mcp")
1795+
1796+
# First establish a session via initialization
1797+
async with streamablehttp_client(f"{basic_server_url}/mcp") as (
1798+
read_stream,
1799+
write_stream,
1800+
get_session_id,
1801+
):
1802+
async with ClientSession(read_stream, write_stream) as session:
1803+
await session.initialize()
1804+
transport.session_id = get_session_id()
1805+
1806+
# Now try to resume with the session - server might return 405
1807+
read_stream_writer, read_stream_reader = anyio.create_memory_object_stream(0)
1808+
1809+
async with httpx.AsyncClient() as client:
1810+
# This should handle the 405 gracefully
1811+
await transport.resume_stream(
1812+
client,
1813+
read_stream_writer,
1814+
"nonexistent-event-id",
1815+
)
1816+
1817+
# Clean up streams to avoid resource warnings
1818+
await read_stream_writer.aclose()
1819+
await read_stream_reader.aclose()
1820+
1821+
1822+
@pytest.mark.anyio
1823+
async def test_reconnection_options_dataclass():
1824+
"""Test StreamableHTTPReconnectionOptions defaults."""
1825+
from mcp.client.streamable_http import StreamableHTTPReconnectionOptions
1826+
1827+
options = StreamableHTTPReconnectionOptions()
1828+
1829+
assert options.initial_reconnection_delay == 1.0
1830+
assert options.max_reconnection_delay == 30.0
1831+
assert options.reconnection_delay_grow_factor == 1.5
1832+
assert options.max_retries == 2
1833+
1834+
1835+
@pytest.mark.anyio
1836+
async def test_streamablehttp_client_with_reconnection_options(
1837+
basic_server: None, basic_server_url: str
1838+
):
1839+
"""Test streamablehttp_client accepts reconnection_options parameter."""
1840+
from mcp.client.streamable_http import StreamableHTTPReconnectionOptions
1841+
1842+
options = StreamableHTTPReconnectionOptions(
1843+
initial_reconnection_delay=0.5,
1844+
max_reconnection_delay=10.0,
1845+
reconnection_delay_grow_factor=1.2,
1846+
max_retries=3,
1847+
)
1848+
1849+
async with streamablehttp_client(
1850+
f"{basic_server_url}/mcp",
1851+
reconnection_options=options,
1852+
) as (
1853+
read_stream,
1854+
write_stream,
1855+
_,
1856+
):
1857+
async with ClientSession(read_stream, write_stream) as session:
1858+
result = await session.initialize()
1859+
assert isinstance(result, InitializeResult)

0 commit comments

Comments
 (0)