-
Notifications
You must be signed in to change notification settings - Fork 2.3k
StreamableHttp -- resumability support for servers #587
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 43 commits
Commits
Show all changes
57 commits
Select commit
Hold shift + click to select a range
2b95598
initial streamable http server
ihrpr 3d790f8
add request validation and tests
ihrpr 27bc01e
session management
ihrpr 3c4cf10
terminations of a session
ihrpr bce74b3
fix cleaning up
ihrpr 2011579
add happy path test
ihrpr 2cebf08
tests
ihrpr 6c9c320
json mode
ihrpr ede8cde
clean up
ihrpr 2a3bed8
fix example server
ihrpr 0456b1b
return 405 for get stream
ihrpr 97ca48d
speed up tests
ihrpr f738cbf
stateless implemetation
ihrpr 92d4287
format
ihrpr aa9f6e5
uv lock
ihrpr 2fba7f3
Merge branch 'ihrpr/streamablehttp-server' into ihrpr/streamablehttp-…
ihrpr 45723ea
simplify readme
ihrpr 6b7a616
clean up
ihrpr b1be691
get sse
ihrpr 201ec99
uv lock
ihrpr 46ec72d
clean up
ihrpr 1902abb
Merge branch 'ihrpr/streamablehttp-server' into ihrpr/streamablehttp-…
ihrpr da1df74
Merge branch 'ihrpr/streamablehttp-stateless' into ihrpr/get-sse
ihrpr c2be5af
streamable http client
ihrpr 9b096dc
add comments to server example where we use related_request_id
ihrpr bbe79c2
Merge branch 'main' into ihrpr/streamablehttp-server
ihrpr a0a9c5b
small fixes
ihrpr a5ac2e0
use mta field for related_request_id
ihrpr 2e615f3
unrelated test and format
ihrpr 110526d
clean up
ihrpr 7ffd5ba
terminate session
ihrpr 029ec56
format
ihrpr cae32e2
Merge branch 'ihrpr/streamablehttp-server' into ihrpr/streamablehttp-…
ihrpr 58745c7
remove useless sleep
ihrpr 1387929
rename require_initialization to standalone_mode
ihrpr bccff75
Merge branch 'ihrpr/streamablehttp-stateless' into ihrpr/get-sse
ihrpr dd007d7
Merge branch 'ihrpr/get-sse' into ihrpr/client
ihrpr 6482120
remove redundant check for initialize and session
ihrpr 9a6da2e
ruff check
ihrpr b957fad
Merge branch 'ihrpr/get-sse' into ihrpr/client
ihrpr 3f5fd7e
support for resumability - server
ihrpr b193242
format
ihrpr 6110435
remove print
ihrpr e087283
rename files to follow python naming
ihrpr 08247c4
update to use time delta in client
ihrpr 0484dfb
refactor
ihrpr 88ff2ba
Merge branch 'ihrpr/client' into ihrpr/resumability-server
ihrpr 5757f6c
small fixes
ihrpr ee28ad8
improve event store example implementation
ihrpr 5dbddeb
refactor _create_event_data
ihrpr ff70bd6
Merge branch 'main' into ihrpr/streamablehttp-server
ihrpr 179fbc8
Merge branch 'ihrpr/streamablehttp-server' into ihrpr/streamablehttp-…
ihrpr a979864
Merge branch 'ihrpr/streamablehttp-stateless' into ihrpr/get-sse
ihrpr 11b7dd9
Merge branch 'ihrpr/get-sse' into ihrpr/client
ihrpr 67a899c
Merge branch 'ihrpr/client' into ihrpr/resumability-server
ihrpr 2dda87e
Merge branch 'main' into ihrpr/resumability-server
ihrpr 2697f14
apply suggested changes
ihrpr File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
41 changes: 41 additions & 0 deletions
41
examples/servers/simple-streamablehttp-stateless/README.md
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
# MCP Simple StreamableHttp Stateless Server Example | ||
|
||
A stateless MCP server example demonstrating the StreamableHttp transport without maintaining session state. This example is ideal for understanding how to deploy MCP servers in multi-node environments where requests can be routed to any instance. | ||
|
||
## Features | ||
|
||
- Uses the StreamableHTTP transport in stateless mode (mcp_session_id=None) | ||
- Each request creates a new ephemeral connection | ||
- No session state maintained between requests | ||
- Task lifecycle scoped to individual requests | ||
- Suitable for deployment in multi-node environments | ||
|
||
|
||
## Usage | ||
|
||
Start the server: | ||
|
||
```bash | ||
# Using default port 3000 | ||
uv run mcp-simple-streamablehttp-stateless | ||
|
||
# Using custom port | ||
uv run mcp-simple-streamablehttp-stateless --port 3000 | ||
|
||
# Custom logging level | ||
uv run mcp-simple-streamablehttp-stateless --log-level DEBUG | ||
|
||
# Enable JSON responses instead of SSE streams | ||
uv run mcp-simple-streamablehttp-stateless --json-response | ||
``` | ||
|
||
The server exposes a tool named "start-notification-stream" that accepts three arguments: | ||
|
||
- `interval`: Time between notifications in seconds (e.g., 1.0) | ||
- `count`: Number of notifications to send (e.g., 5) | ||
- `caller`: Identifier string for the caller | ||
|
||
|
||
## Client | ||
|
||
You can connect to this server using an HTTP client. For now, only the TypeScript SDK has streamable HTTP client examples, or you can use [Inspector](https://github.com/modelcontextprotocol/inspector) for testing. |
Empty file.
4 changes: 4 additions & 0 deletions
4
...s/servers/simple-streamablehttp-stateless/mcp_simple_streamablehttp_stateless/__main__.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
from .server import main | ||
|
||
if __name__ == "__main__": | ||
main() |
168 changes: 168 additions & 0 deletions
168
...les/servers/simple-streamablehttp-stateless/mcp_simple_streamablehttp_stateless/server.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,168 @@ | ||
import contextlib | ||
import logging | ||
|
||
import anyio | ||
import click | ||
import mcp.types as types | ||
from mcp.server.lowlevel import Server | ||
from mcp.server.streamableHttp import ( | ||
StreamableHTTPServerTransport, | ||
) | ||
from starlette.applications import Starlette | ||
from starlette.routing import Mount | ||
|
||
logger = logging.getLogger(__name__) | ||
# Global task group that will be initialized in the lifespan | ||
task_group = None | ||
|
||
|
||
@contextlib.asynccontextmanager | ||
async def lifespan(app): | ||
"""Application lifespan context manager for managing task group.""" | ||
global task_group | ||
|
||
async with anyio.create_task_group() as tg: | ||
task_group = tg | ||
logger.info("Application started, task group initialized!") | ||
try: | ||
yield | ||
finally: | ||
logger.info("Application shutting down, cleaning up resources...") | ||
if task_group: | ||
tg.cancel_scope.cancel() | ||
task_group = None | ||
logger.info("Resources cleaned up successfully.") | ||
|
||
|
||
@click.command() | ||
@click.option("--port", default=3000, help="Port to listen on for HTTP") | ||
@click.option( | ||
"--log-level", | ||
default="INFO", | ||
help="Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)", | ||
) | ||
@click.option( | ||
"--json-response", | ||
is_flag=True, | ||
default=False, | ||
help="Enable JSON responses instead of SSE streams", | ||
) | ||
def main( | ||
port: int, | ||
log_level: str, | ||
json_response: bool, | ||
) -> int: | ||
# Configure logging | ||
logging.basicConfig( | ||
level=getattr(logging, log_level.upper()), | ||
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", | ||
) | ||
|
||
app = Server("mcp-streamable-http-stateless-demo") | ||
|
||
@app.call_tool() | ||
async def call_tool( | ||
name: str, arguments: dict | ||
) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]: | ||
ctx = app.request_context | ||
interval = arguments.get("interval", 1.0) | ||
count = arguments.get("count", 5) | ||
caller = arguments.get("caller", "unknown") | ||
|
||
# Send the specified number of notifications with the given interval | ||
for i in range(count): | ||
await ctx.session.send_log_message( | ||
level="info", | ||
data=f"Notification {i+1}/{count} from caller: {caller}", | ||
logger="notification_stream", | ||
related_request_id=ctx.request_id, | ||
) | ||
if i < count - 1: # Don't wait after the last notification | ||
await anyio.sleep(interval) | ||
|
||
return [ | ||
types.TextContent( | ||
type="text", | ||
text=( | ||
f"Sent {count} notifications with {interval}s interval" | ||
f" for caller: {caller}" | ||
), | ||
) | ||
] | ||
|
||
@app.list_tools() | ||
async def list_tools() -> list[types.Tool]: | ||
return [ | ||
types.Tool( | ||
name="start-notification-stream", | ||
description=( | ||
"Sends a stream of notifications with configurable count" | ||
" and interval" | ||
), | ||
inputSchema={ | ||
"type": "object", | ||
"required": ["interval", "count", "caller"], | ||
"properties": { | ||
"interval": { | ||
"type": "number", | ||
"description": "Interval between notifications in seconds", | ||
}, | ||
"count": { | ||
"type": "number", | ||
"description": "Number of notifications to send", | ||
}, | ||
"caller": { | ||
"type": "string", | ||
"description": ( | ||
"Identifier of the caller to include in notifications" | ||
), | ||
}, | ||
}, | ||
}, | ||
) | ||
] | ||
|
||
# ASGI handler for stateless HTTP connections | ||
async def handle_streamable_http(scope, receive, send): | ||
logger.debug("Creating new transport") | ||
# Use lock to prevent race conditions when creating new sessions | ||
http_transport = StreamableHTTPServerTransport( | ||
mcp_session_id=None, | ||
is_json_response_enabled=json_response, | ||
) | ||
async with http_transport.connect() as streams: | ||
read_stream, write_stream = streams | ||
|
||
if not task_group: | ||
raise RuntimeError("Task group is not initialized") | ||
|
||
async def run_server(): | ||
await app.run( | ||
read_stream, | ||
write_stream, | ||
app.create_initialization_options(), | ||
# Runs in standalone mode for stateless deployments | ||
# where clients perform initialization with any node | ||
standalone_mode=True, | ||
) | ||
|
||
# Start server task | ||
task_group.start_soon(run_server) | ||
|
||
# Handle the HTTP request and return the response | ||
await http_transport.handle_request(scope, receive, send) | ||
|
||
# Create an ASGI application using the transport | ||
starlette_app = Starlette( | ||
debug=True, | ||
routes=[ | ||
Mount("/mcp", app=handle_streamable_http), | ||
], | ||
lifespan=lifespan, | ||
) | ||
|
||
import uvicorn | ||
|
||
uvicorn.run(starlette_app, host="0.0.0.0", port=port) | ||
|
||
return 0 |
36 changes: 36 additions & 0 deletions
36
examples/servers/simple-streamablehttp-stateless/pyproject.toml
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
[project] | ||
name = "mcp-simple-streamablehttp-stateless" | ||
version = "0.1.0" | ||
description = "A simple MCP server exposing a StreamableHttp transport in stateless mode" | ||
readme = "README.md" | ||
requires-python = ">=3.10" | ||
authors = [{ name = "Anthropic, PBC." }] | ||
keywords = ["mcp", "llm", "automation", "web", "fetch", "http", "streamable", "stateless"] | ||
license = { text = "MIT" } | ||
dependencies = ["anyio>=4.5", "click>=8.1.0", "httpx>=0.27", "mcp", "starlette", "uvicorn"] | ||
|
||
[project.scripts] | ||
mcp-simple-streamablehttp-stateless = "mcp_simple_streamablehttp_stateless.server:main" | ||
|
||
[build-system] | ||
requires = ["hatchling"] | ||
build-backend = "hatchling.build" | ||
|
||
[tool.hatch.build.targets.wheel] | ||
packages = ["mcp_simple_streamablehttp_stateless"] | ||
|
||
[tool.pyright] | ||
include = ["mcp_simple_streamablehttp_stateless"] | ||
venvPath = "." | ||
venv = ".venv" | ||
|
||
[tool.ruff.lint] | ||
select = ["E", "F", "I"] | ||
ignore = [] | ||
|
||
[tool.ruff] | ||
line-length = 88 | ||
target-version = "py310" | ||
|
||
[tool.uv] | ||
dev-dependencies = ["pyright>=1.1.378", "pytest>=8.3.3", "ruff>=0.6.9"] |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
# MCP Simple StreamableHttp Server Example | ||
|
||
A simple MCP server example demonstrating the StreamableHttp transport, which enables HTTP-based communication with MCP servers using streaming. | ||
|
||
## Features | ||
|
||
- Uses the StreamableHTTP transport for server-client communication | ||
- Supports REST API operations (POST, GET, DELETE) for `/mcp` endpoint | ||
- Task management with anyio task groups | ||
- Ability to send multiple notifications over time to the client | ||
- Proper resource cleanup and lifespan management | ||
- Resumability support via InMemoryEventStore | ||
|
||
## Usage | ||
|
||
Start the server on the default or custom port: | ||
|
||
```bash | ||
|
||
# Using custom port | ||
uv run mcp-simple-streamablehttp --port 3000 | ||
|
||
# Custom logging level | ||
uv run mcp-simple-streamablehttp --log-level DEBUG | ||
|
||
# Enable JSON responses instead of SSE streams | ||
uv run mcp-simple-streamablehttp --json-response | ||
``` | ||
|
||
The server exposes a tool named "start-notification-stream" that accepts three arguments: | ||
|
||
- `interval`: Time between notifications in seconds (e.g., 1.0) | ||
- `count`: Number of notifications to send (e.g., 5) | ||
- `caller`: Identifier string for the caller | ||
|
||
## Resumability Support | ||
|
||
This server includes resumability support through the InMemoryEventStore. This enables clients to: | ||
|
||
- Reconnect to the server after a disconnection | ||
- Resume event streaming from where they left off using the Last-Event-ID header | ||
|
||
|
||
The server will: | ||
- Generate unique event IDs for each SSE message | ||
- Store events in memory for later replay | ||
- Replay missed events when a client reconnects with a Last-Event-ID header | ||
|
||
Note: The InMemoryEventStore is designed for demonstration purposes only. For production use, consider implementing a persistent storage solution. | ||
|
||
|
||
|
||
## Client | ||
|
||
You can connect to this server using an HTTP client, for now only Typescript SDK has streamable HTTP client examples or you can use [Inspector](https://github.com/modelcontextprotocol/inspector) |
Empty file.
4 changes: 4 additions & 0 deletions
4
examples/servers/simple-streamablehttp/mcp_simple_streamablehttp/__main__.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
from .server import main | ||
|
||
if __name__ == "__main__": | ||
main() |
75 changes: 75 additions & 0 deletions
75
examples/servers/simple-streamablehttp/mcp_simple_streamablehttp/event_store.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,75 @@ | ||
""" | ||
In-memory event store for demonstrating resumability functionality. | ||
|
||
This is a simple implementation intended for examples and testing, | ||
not for production use where a persistent storage solution would be more appropriate. | ||
""" | ||
|
||
import logging | ||
import time | ||
from collections.abc import Awaitable, Callable | ||
from uuid import uuid4 | ||
|
||
from mcp.server.streamableHttp import EventId, EventStore, StreamId | ||
from mcp.types import JSONRPCMessage | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class InMemoryEventStore(EventStore): | ||
""" | ||
Simple in-memory implementation of the EventStore interface for resumability. | ||
This is primarily intended for examples and testing, not for production use | ||
where a persistent storage solution would be more appropriate. | ||
""" | ||
|
||
def __init__(self): | ||
self.events: dict[ | ||
str, tuple[str, JSONRPCMessage, float] | ||
] = {} # event_id -> (stream_id, message, timestamp) | ||
ihrpr marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
async def store_event( | ||
self, stream_id: StreamId, message: JSONRPCMessage | ||
) -> EventId: | ||
"""Stores an event with a generated event ID.""" | ||
event_id = str(uuid4()) | ||
self.events[event_id] = (stream_id, message, time.time()) | ||
ihrpr marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return event_id | ||
|
||
async def replay_events_after( | ||
self, | ||
last_event_id: EventId, | ||
send_callback: Callable[[EventId, JSONRPCMessage], Awaitable[None]], | ||
) -> StreamId: | ||
"""Replays events that occurred after the specified event ID.""" | ||
logger.debug(f"Attempting to replay events after {last_event_id}") | ||
logger.debug(f"Total events in store: {len(self.events)}") | ||
logger.debug(f"Event IDs in store: {list(self.events.keys())}") | ||
|
||
if not last_event_id or last_event_id not in self.events: | ||
logger.warning(f"Event ID {last_event_id} not found in store") | ||
return "" | ||
|
||
# Get the stream ID and timestamp from the last event | ||
stream_id, _, last_timestamp = self.events[last_event_id] | ||
|
||
# Find all events for this stream after the last event | ||
events_to_replay = [ | ||
(event_id, message) | ||
for event_id, (sid, message, timestamp) in self.events.items() | ||
if sid == stream_id and timestamp > last_timestamp | ||
] | ||
ihrpr marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
# Sort by timestamp to ensure chronological order | ||
events_to_replay.sort(key=lambda x: self.events[x[0]][2]) | ||
ihrpr marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
logger.debug(f"Found {len(events_to_replay)} events to replay") | ||
logger.debug( | ||
f"Events to replay: {[event_id for event_id, _ in events_to_replay]}" | ||
) | ||
|
||
# Send all events in order | ||
for event_id, message in events_to_replay: | ||
await send_callback(event_id, message) | ||
|
||
return stream_id |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.