Skip to content

Commit a39b353

Browse files
authored
SEP-1699: Add SSE polling support with EventStore (#2564)
* Add EventStore and SSE polling support (SEP-1699) * Add close_sse_stream() method to Context * Add SSE polling documentation * Fix missing Context import in docs example * Remove EventStore from root __init__.py, update docs imports - Removed EventStore import and export from src/fastmcp/__init__.py - Updated docs to import EventStore from fastmcp.server.event_store - Resolves merge conflict by not exporting EventStore from root package
1 parent cf67152 commit a39b353

File tree

6 files changed

+547
-3
lines changed

6 files changed

+547
-3
lines changed

docs/deployment/http.mdx

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,79 @@ Without `expose_headers=["mcp-session-id"]`, browsers will receive the session I
198198
**Production Security**: Never use `allow_origins=["*"]` in production. Specify the exact origins of your browser-based clients. Using wildcards exposes your server to unauthorized access from any website.
199199
</Warning>
200200

201+
### SSE Polling for Long-Running Operations
202+
203+
<VersionBadge version="2.14.0" />
204+
205+
<Note>
206+
This feature only applies to the **StreamableHTTP transport** (the default for `http_app()`). It does not apply to the legacy SSE transport (`transport="sse"`).
207+
</Note>
208+
209+
When running tools that take a long time to complete, you may encounter issues with load balancers or proxies terminating connections that stay idle too long. [SEP-1699](https://github.com/modelcontextprotocol/modelcontextprotocol/issues/1699) introduces SSE polling to solve this by allowing the server to gracefully close connections and have clients automatically reconnect.
210+
211+
To enable SSE polling, configure an `EventStore` when creating your HTTP application:
212+
213+
```python
214+
from fastmcp import FastMCP, Context
215+
from fastmcp.server.event_store import EventStore
216+
217+
mcp = FastMCP("My Server")
218+
219+
@mcp.tool
220+
async def long_running_task(ctx: Context) -> str:
221+
"""A task that takes several minutes to complete."""
222+
for i in range(100):
223+
await ctx.report_progress(i, 100)
224+
225+
# Periodically close the connection to avoid load balancer timeouts
226+
# Client will automatically reconnect and resume receiving progress
227+
if i % 30 == 0 and i > 0:
228+
await ctx.close_sse_stream()
229+
230+
await do_expensive_work()
231+
232+
return "Done!"
233+
234+
# Configure with EventStore for resumability
235+
event_store = EventStore()
236+
app = mcp.http_app(
237+
event_store=event_store,
238+
retry_interval=2000, # Client reconnects after 2 seconds
239+
)
240+
```
241+
242+
**How it works:**
243+
244+
1. When `event_store` is configured, the server stores all events (progress updates, results) with unique IDs
245+
2. Calling `ctx.close_sse_stream()` gracefully closes the HTTP connection
246+
3. The client automatically reconnects with a `Last-Event-ID` header
247+
4. The server replays any events the client missed during the disconnection
248+
249+
The `retry_interval` parameter (in milliseconds) controls how long clients wait before reconnecting. Choose a value that balances responsiveness with server load.
250+
251+
<Note>
252+
`close_sse_stream()` is a no-op if called without an `EventStore` configured, so you can safely include it in tools that may run in different deployment configurations.
253+
</Note>
254+
255+
#### Custom Storage Backends
256+
257+
By default, `EventStore` uses in-memory storage. For production deployments with multiple server instances, you can provide a custom storage backend using the `key_value` package:
258+
259+
```python
260+
from fastmcp.server.event_store import EventStore
261+
from key_value.aio.stores.redis import RedisStore
262+
263+
# Use Redis for distributed deployments
264+
redis_store = RedisStore(url="redis://localhost:6379")
265+
event_store = EventStore(
266+
storage=redis_store,
267+
max_events_per_stream=100, # Keep last 100 events per stream
268+
ttl=3600, # Events expire after 1 hour
269+
)
270+
271+
app = mcp.http_app(event_store=event_store)
272+
```
273+
201274
## Integration with Web Frameworks
202275

203276
If you already have a web application running, you can add MCP capabilities by mounting a FastMCP server as a sub-application. This allows you to expose MCP tools alongside your existing API endpoints, sharing the same domain and infrastructure. The MCP server becomes just another route in your application, making it easy to manage and deploy.

src/fastmcp/server/context.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -486,6 +486,45 @@ async def send_prompt_list_changed(self) -> None:
486486
"""Send a prompt list changed notification to the client."""
487487
await self.session.send_prompt_list_changed()
488488

489+
async def close_sse_stream(self) -> None:
490+
"""Close the current response stream to trigger client reconnection.
491+
492+
When using StreamableHTTP transport with an EventStore configured, this
493+
method gracefully closes the HTTP connection for the current request.
494+
The client will automatically reconnect (after `retry_interval` milliseconds)
495+
and resume receiving events from where it left off via the EventStore.
496+
497+
This is useful for long-running operations to avoid load balancer timeouts.
498+
Instead of holding a connection open for minutes, you can periodically close
499+
and let the client reconnect.
500+
501+
Example:
502+
```python
503+
@mcp.tool
504+
async def long_running_task(ctx: Context) -> str:
505+
for i in range(100):
506+
await ctx.report_progress(i, 100)
507+
508+
# Close connection every 30 iterations to avoid LB timeouts
509+
if i % 30 == 0 and i > 0:
510+
await ctx.close_sse_stream()
511+
512+
await do_work()
513+
return "Done"
514+
```
515+
516+
Note:
517+
This is a no-op (with a debug log) if not using StreamableHTTP
518+
transport with an EventStore configured.
519+
"""
520+
if not self.request_context or not self.request_context.close_sse_stream:
521+
logger.debug(
522+
"close_sse_stream() called but not applicable "
523+
"(requires StreamableHTTP transport with event_store)"
524+
)
525+
return
526+
await self.request_context.close_sse_stream()
527+
489528
async def sample(
490529
self,
491530
messages: str | Sequence[str | SamplingMessage],

src/fastmcp/server/event_store.py

Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
"""EventStore implementation backed by AsyncKeyValue.
2+
3+
This module provides an EventStore implementation that enables SSE polling/resumability
4+
for Streamable HTTP transports. Events are stored using the key_value package's
5+
AsyncKeyValue protocol, allowing users to configure any compatible backend
6+
(in-memory, Redis, etc.) following the same pattern as ResponseCachingMiddleware.
7+
"""
8+
9+
from __future__ import annotations
10+
11+
from uuid import uuid4
12+
13+
from key_value.aio.adapters.pydantic import PydanticAdapter
14+
from key_value.aio.protocols import AsyncKeyValue
15+
from key_value.aio.stores.memory import MemoryStore
16+
from mcp.server.streamable_http import EventCallback, EventId, EventMessage, StreamId
17+
from mcp.server.streamable_http import EventStore as SDKEventStore
18+
from mcp.types import JSONRPCMessage
19+
from pydantic import BaseModel
20+
21+
from fastmcp.utilities.logging import get_logger
22+
23+
logger = get_logger(__name__)
24+
25+
26+
class EventEntry(BaseModel):
27+
"""Stored event entry."""
28+
29+
event_id: str
30+
stream_id: str
31+
message: dict | None # JSONRPCMessage serialized to dict
32+
33+
34+
class StreamEventList(BaseModel):
35+
"""List of event IDs for a stream."""
36+
37+
event_ids: list[str]
38+
39+
40+
class EventStore(SDKEventStore):
41+
"""EventStore implementation backed by AsyncKeyValue.
42+
43+
Enables SSE polling/resumability by storing events that can be replayed
44+
when clients reconnect. Works with any AsyncKeyValue backend (memory, Redis, etc.)
45+
following the same pattern as ResponseCachingMiddleware and OAuthProxy.
46+
47+
Example:
48+
```python
49+
from fastmcp import FastMCP
50+
from fastmcp.server.event_store import EventStore
51+
52+
# Default in-memory storage
53+
event_store = EventStore()
54+
55+
# Or with a custom backend
56+
from key_value.aio.stores.redis import RedisStore
57+
redis_backend = RedisStore(url="redis://localhost")
58+
event_store = EventStore(storage=redis_backend)
59+
60+
mcp = FastMCP("MyServer")
61+
app = mcp.http_app(event_store=event_store, retry_interval=2000)
62+
```
63+
64+
Args:
65+
storage: AsyncKeyValue backend. Defaults to MemoryStore.
66+
max_events_per_stream: Maximum events to retain per stream. Default 100.
67+
ttl: Event TTL in seconds. Default 3600 (1 hour). Set to None for no expiration.
68+
"""
69+
70+
def __init__(
71+
self,
72+
storage: AsyncKeyValue | None = None,
73+
max_events_per_stream: int = 100,
74+
ttl: int | None = 3600,
75+
):
76+
self._storage: AsyncKeyValue = storage or MemoryStore()
77+
self._max_events_per_stream = max_events_per_stream
78+
self._ttl = ttl
79+
80+
# PydanticAdapter for type-safe storage (following OAuth proxy pattern)
81+
self._event_store: PydanticAdapter[EventEntry] = PydanticAdapter[EventEntry](
82+
key_value=self._storage,
83+
pydantic_model=EventEntry,
84+
default_collection="fastmcp_events",
85+
)
86+
self._stream_store: PydanticAdapter[StreamEventList] = PydanticAdapter[
87+
StreamEventList
88+
](
89+
key_value=self._storage,
90+
pydantic_model=StreamEventList,
91+
default_collection="fastmcp_streams",
92+
)
93+
94+
async def store_event(
95+
self, stream_id: StreamId, message: JSONRPCMessage | None
96+
) -> EventId:
97+
"""Store an event and return its ID.
98+
99+
Args:
100+
stream_id: ID of the stream the event belongs to
101+
message: The JSON-RPC message to store, or None for priming events
102+
103+
Returns:
104+
The generated event ID for the stored event
105+
"""
106+
event_id = str(uuid4())
107+
108+
# Store the event entry
109+
entry = EventEntry(
110+
event_id=event_id,
111+
stream_id=stream_id,
112+
message=message.model_dump(mode="json") if message else None,
113+
)
114+
await self._event_store.put(key=event_id, value=entry, ttl=self._ttl)
115+
116+
# Update stream's event list
117+
stream_data = await self._stream_store.get(key=stream_id)
118+
event_ids = stream_data.event_ids if stream_data else []
119+
event_ids.append(event_id)
120+
121+
# Trim to max events (delete old events)
122+
if len(event_ids) > self._max_events_per_stream:
123+
for old_id in event_ids[: -self._max_events_per_stream]:
124+
await self._event_store.delete(key=old_id)
125+
event_ids = event_ids[-self._max_events_per_stream :]
126+
127+
await self._stream_store.put(
128+
key=stream_id,
129+
value=StreamEventList(event_ids=event_ids),
130+
ttl=self._ttl,
131+
)
132+
133+
return event_id
134+
135+
async def replay_events_after(
136+
self,
137+
last_event_id: EventId,
138+
send_callback: EventCallback,
139+
) -> StreamId | None:
140+
"""Replay events that occurred after the specified event ID.
141+
142+
Args:
143+
last_event_id: The ID of the last event the client received
144+
send_callback: A callback function to send events to the client
145+
146+
Returns:
147+
The stream ID of the replayed events, or None if the event ID was not found
148+
"""
149+
# Look up the event to find its stream
150+
entry = await self._event_store.get(key=last_event_id)
151+
if not entry:
152+
logger.warning(f"Event ID {last_event_id} not found in store")
153+
return None
154+
155+
stream_id = entry.stream_id
156+
stream_data = await self._stream_store.get(key=stream_id)
157+
if not stream_data:
158+
logger.warning(f"Stream {stream_id} not found in store")
159+
return None
160+
161+
event_ids = stream_data.event_ids
162+
163+
# Find events after last_event_id
164+
try:
165+
start_idx = event_ids.index(last_event_id) + 1
166+
except ValueError:
167+
logger.warning(f"Event ID {last_event_id} not found in stream {stream_id}")
168+
return None
169+
170+
# Replay events after the last one
171+
for event_id in event_ids[start_idx:]:
172+
event = await self._event_store.get(key=event_id)
173+
if event and event.message:
174+
msg = JSONRPCMessage.model_validate(event.message)
175+
await send_callback(EventMessage(msg, event.event_id))
176+
177+
return stream_id

src/fastmcp/server/http.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,7 @@ def create_streamable_http_app(
275275
server: FastMCP[LifespanResultT],
276276
streamable_http_path: str,
277277
event_store: EventStore | None = None,
278+
retry_interval: int | None = None,
278279
auth: AuthProvider | None = None,
279280
json_response: bool = False,
280281
stateless_http: bool = False,
@@ -287,7 +288,10 @@ def create_streamable_http_app(
287288
Args:
288289
server: The FastMCP server instance
289290
streamable_http_path: Path for StreamableHTTP connections
290-
event_store: Optional event store for session management
291+
event_store: Optional event store for SSE polling/resumability
292+
retry_interval: Optional retry interval in milliseconds for SSE polling.
293+
Controls how quickly clients should reconnect after server-initiated
294+
disconnections. Requires event_store to be set. Defaults to SDK default.
291295
auth: Optional authentication provider (AuthProvider)
292296
json_response: Whether to use JSON response format
293297
stateless_http: Whether to use stateless mode (new transport per request)
@@ -305,6 +309,7 @@ def create_streamable_http_app(
305309
session_manager = StreamableHTTPSessionManager(
306310
app=server._mcp_server,
307311
event_store=event_store,
312+
retry_interval=retry_interval,
308313
json_response=json_response,
309314
stateless=stateless_http,
310315
)

src/fastmcp/server/server.py

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
from fastmcp.resources.resource_manager import ResourceManager
6868
from fastmcp.resources.template import FunctionResourceTemplate, ResourceTemplate
6969
from fastmcp.server.auth import AuthProvider
70+
from fastmcp.server.event_store import EventStore
7071
from fastmcp.server.http import (
7172
StarletteWithLifespan,
7273
create_sse_app,
@@ -2595,13 +2596,24 @@ def http_app(
25952596
json_response: bool | None = None,
25962597
stateless_http: bool | None = None,
25972598
transport: Literal["http", "streamable-http", "sse"] = "http",
2599+
event_store: EventStore | None = None,
2600+
retry_interval: int | None = None,
25982601
) -> StarletteWithLifespan:
25992602
"""Create a Starlette app using the specified HTTP transport.
26002603
26012604
Args:
26022605
path: The path for the HTTP endpoint
26032606
middleware: A list of middleware to apply to the app
2604-
transport: Transport protocol to use - either "streamable-http" (default) or "sse"
2607+
json_response: Whether to use JSON response format
2608+
stateless_http: Whether to use stateless mode (new transport per request)
2609+
transport: Transport protocol to use - "http", "streamable-http", or "sse"
2610+
event_store: Optional event store for SSE polling/resumability. When set,
2611+
enables clients to reconnect and resume receiving events after
2612+
server-initiated disconnections. Only used with streamable-http transport.
2613+
retry_interval: Optional retry interval in milliseconds for SSE polling.
2614+
Controls how quickly clients should reconnect after server-initiated
2615+
disconnections. Requires event_store to be set. Only used with
2616+
streamable-http transport.
26052617
26062618
Returns:
26072619
A Starlette application configured with the specified transport
@@ -2612,7 +2624,8 @@ def http_app(
26122624
server=self,
26132625
streamable_http_path=path
26142626
or self._deprecated_settings.streamable_http_path,
2615-
event_store=None,
2627+
event_store=event_store,
2628+
retry_interval=retry_interval,
26162629
auth=self.auth,
26172630
json_response=(
26182631
json_response

0 commit comments

Comments
 (0)