Skip to content

Commit f874465

Browse files
committed
Add EventStore and SSE polling support (SEP-1699)
1 parent 07750ef commit f874465

File tree

5 files changed

+437
-3
lines changed

5 files changed

+437
-3
lines changed

src/fastmcp/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
from fastmcp.server.server import FastMCP
1616
from fastmcp.server.context import Context
17+
from fastmcp.server.event_store import EventStore
1718
import fastmcp.server
1819

1920
from fastmcp.client import Client
@@ -30,6 +31,7 @@
3031
__all__ = [
3132
"Client",
3233
"Context",
34+
"EventStore",
3335
"FastMCP",
3436
"client",
3537
"settings",

src/fastmcp/server/event_store.py

Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
"""EventStore implementation backed by AsyncKeyValue.
2+
3+
This module provides an EventStore implementation that enables SSE polling/resumability
4+
for Streamable HTTP transports. Events are stored using the key_value package's
5+
AsyncKeyValue protocol, allowing users to configure any compatible backend
6+
(in-memory, Redis, etc.) following the same pattern as ResponseCachingMiddleware.
7+
"""
8+
9+
from __future__ import annotations
10+
11+
from uuid import uuid4
12+
13+
from key_value.aio.adapters.pydantic import PydanticAdapter
14+
from key_value.aio.protocols import AsyncKeyValue
15+
from key_value.aio.stores.memory import MemoryStore
16+
from mcp.server.streamable_http import EventCallback, EventId, EventMessage, StreamId
17+
from mcp.server.streamable_http import EventStore as SDKEventStore
18+
from mcp.types import JSONRPCMessage
19+
from pydantic import BaseModel
20+
21+
from fastmcp.utilities.logging import get_logger
22+
23+
logger = get_logger(__name__)
24+
25+
26+
class EventEntry(BaseModel):
27+
"""Stored event entry."""
28+
29+
event_id: str
30+
stream_id: str
31+
message: dict | None # JSONRPCMessage serialized to dict
32+
33+
34+
class StreamEventList(BaseModel):
35+
"""List of event IDs for a stream."""
36+
37+
event_ids: list[str]
38+
39+
40+
class EventStore(SDKEventStore):
41+
"""EventStore implementation backed by AsyncKeyValue.
42+
43+
Enables SSE polling/resumability by storing events that can be replayed
44+
when clients reconnect. Works with any AsyncKeyValue backend (memory, Redis, etc.)
45+
following the same pattern as ResponseCachingMiddleware and OAuthProxy.
46+
47+
Example:
48+
```python
49+
from fastmcp import FastMCP
50+
from fastmcp.server.event_store import EventStore
51+
52+
# Default in-memory storage
53+
event_store = EventStore()
54+
55+
# Or with a custom backend
56+
from key_value.aio.stores.redis import RedisStore
57+
redis_backend = RedisStore(url="redis://localhost")
58+
event_store = EventStore(storage=redis_backend)
59+
60+
mcp = FastMCP("MyServer")
61+
app = mcp.http_app(event_store=event_store, retry_interval=2000)
62+
```
63+
64+
Args:
65+
storage: AsyncKeyValue backend. Defaults to MemoryStore.
66+
max_events_per_stream: Maximum events to retain per stream. Default 100.
67+
ttl: Event TTL in seconds. Default 3600 (1 hour). Set to None for no expiration.
68+
"""
69+
70+
def __init__(
71+
self,
72+
storage: AsyncKeyValue | None = None,
73+
max_events_per_stream: int = 100,
74+
ttl: int | None = 3600,
75+
):
76+
self._storage: AsyncKeyValue = storage or MemoryStore()
77+
self._max_events_per_stream = max_events_per_stream
78+
self._ttl = ttl
79+
80+
# PydanticAdapter for type-safe storage (following OAuth proxy pattern)
81+
self._event_store: PydanticAdapter[EventEntry] = PydanticAdapter[EventEntry](
82+
key_value=self._storage,
83+
pydantic_model=EventEntry,
84+
default_collection="fastmcp_events",
85+
)
86+
self._stream_store: PydanticAdapter[StreamEventList] = PydanticAdapter[
87+
StreamEventList
88+
](
89+
key_value=self._storage,
90+
pydantic_model=StreamEventList,
91+
default_collection="fastmcp_streams",
92+
)
93+
94+
async def store_event(
95+
self, stream_id: StreamId, message: JSONRPCMessage | None
96+
) -> EventId:
97+
"""Store an event and return its ID.
98+
99+
Args:
100+
stream_id: ID of the stream the event belongs to
101+
message: The JSON-RPC message to store, or None for priming events
102+
103+
Returns:
104+
The generated event ID for the stored event
105+
"""
106+
event_id = str(uuid4())
107+
108+
# Store the event entry
109+
entry = EventEntry(
110+
event_id=event_id,
111+
stream_id=stream_id,
112+
message=message.model_dump(mode="json") if message else None,
113+
)
114+
await self._event_store.put(key=event_id, value=entry, ttl=self._ttl)
115+
116+
# Update stream's event list
117+
stream_data = await self._stream_store.get(key=stream_id)
118+
event_ids = stream_data.event_ids if stream_data else []
119+
event_ids.append(event_id)
120+
121+
# Trim to max events (delete old events)
122+
if len(event_ids) > self._max_events_per_stream:
123+
for old_id in event_ids[: -self._max_events_per_stream]:
124+
await self._event_store.delete(key=old_id)
125+
event_ids = event_ids[-self._max_events_per_stream :]
126+
127+
await self._stream_store.put(
128+
key=stream_id,
129+
value=StreamEventList(event_ids=event_ids),
130+
ttl=self._ttl,
131+
)
132+
133+
return event_id
134+
135+
async def replay_events_after(
136+
self,
137+
last_event_id: EventId,
138+
send_callback: EventCallback,
139+
) -> StreamId | None:
140+
"""Replay events that occurred after the specified event ID.
141+
142+
Args:
143+
last_event_id: The ID of the last event the client received
144+
send_callback: A callback function to send events to the client
145+
146+
Returns:
147+
The stream ID of the replayed events, or None if the event ID was not found
148+
"""
149+
# Look up the event to find its stream
150+
entry = await self._event_store.get(key=last_event_id)
151+
if not entry:
152+
logger.warning(f"Event ID {last_event_id} not found in store")
153+
return None
154+
155+
stream_id = entry.stream_id
156+
stream_data = await self._stream_store.get(key=stream_id)
157+
if not stream_data:
158+
logger.warning(f"Stream {stream_id} not found in store")
159+
return None
160+
161+
event_ids = stream_data.event_ids
162+
163+
# Find events after last_event_id
164+
try:
165+
start_idx = event_ids.index(last_event_id) + 1
166+
except ValueError:
167+
logger.warning(f"Event ID {last_event_id} not found in stream {stream_id}")
168+
return None
169+
170+
# Replay events after the last one
171+
for event_id in event_ids[start_idx:]:
172+
event = await self._event_store.get(key=event_id)
173+
if event and event.message:
174+
msg = JSONRPCMessage.model_validate(event.message)
175+
await send_callback(EventMessage(msg, event.event_id))
176+
177+
return stream_id

src/fastmcp/server/http.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,7 @@ def create_streamable_http_app(
275275
server: FastMCP[LifespanResultT],
276276
streamable_http_path: str,
277277
event_store: EventStore | None = None,
278+
retry_interval: int | None = None,
278279
auth: AuthProvider | None = None,
279280
json_response: bool = False,
280281
stateless_http: bool = False,
@@ -287,7 +288,10 @@ def create_streamable_http_app(
287288
Args:
288289
server: The FastMCP server instance
289290
streamable_http_path: Path for StreamableHTTP connections
290-
event_store: Optional event store for session management
291+
event_store: Optional event store for SSE polling/resumability
292+
retry_interval: Optional retry interval in milliseconds for SSE polling.
293+
Controls how quickly clients should reconnect after server-initiated
294+
disconnections. Requires event_store to be set. Defaults to SDK default.
291295
auth: Optional authentication provider (AuthProvider)
292296
json_response: Whether to use JSON response format
293297
stateless_http: Whether to use stateless mode (new transport per request)
@@ -305,6 +309,7 @@ def create_streamable_http_app(
305309
session_manager = StreamableHTTPSessionManager(
306310
app=server._mcp_server,
307311
event_store=event_store,
312+
retry_interval=retry_interval,
308313
json_response=json_response,
309314
stateless=stateless_http,
310315
)

src/fastmcp/server/server.py

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
from fastmcp.resources.resource_manager import ResourceManager
6565
from fastmcp.resources.template import FunctionResourceTemplate, ResourceTemplate
6666
from fastmcp.server.auth import AuthProvider
67+
from fastmcp.server.event_store import EventStore
6768
from fastmcp.server.http import (
6869
StarletteWithLifespan,
6970
create_sse_app,
@@ -2346,13 +2347,24 @@ def http_app(
23462347
json_response: bool | None = None,
23472348
stateless_http: bool | None = None,
23482349
transport: Literal["http", "streamable-http", "sse"] = "http",
2350+
event_store: EventStore | None = None,
2351+
retry_interval: int | None = None,
23492352
) -> StarletteWithLifespan:
23502353
"""Create a Starlette app using the specified HTTP transport.
23512354
23522355
Args:
23532356
path: The path for the HTTP endpoint
23542357
middleware: A list of middleware to apply to the app
2355-
transport: Transport protocol to use - either "streamable-http" (default) or "sse"
2358+
json_response: Whether to use JSON response format
2359+
stateless_http: Whether to use stateless mode (new transport per request)
2360+
transport: Transport protocol to use - "http", "streamable-http", or "sse"
2361+
event_store: Optional event store for SSE polling/resumability. When set,
2362+
enables clients to reconnect and resume receiving events after
2363+
server-initiated disconnections. Only used with streamable-http transport.
2364+
retry_interval: Optional retry interval in milliseconds for SSE polling.
2365+
Controls how quickly clients should reconnect after server-initiated
2366+
disconnections. Requires event_store to be set. Only used with
2367+
streamable-http transport.
23562368
23572369
Returns:
23582370
A Starlette application configured with the specified transport
@@ -2363,7 +2375,8 @@ def http_app(
23632375
server=self,
23642376
streamable_http_path=path
23652377
or self._deprecated_settings.streamable_http_path,
2366-
event_store=None,
2378+
event_store=event_store,
2379+
retry_interval=retry_interval,
23672380
auth=self.auth,
23682381
json_response=(
23692382
json_response

0 commit comments

Comments
 (0)