Commit 39c02d3

Stateful MCP python example with inferencing payloads
1 parent 36cc4df commit 39c02d3

10 files changed (+736, -0 lines)
Lines changed: 10 additions & 0 deletions
@@ -0,0 +1,10 @@
```dockerfile
FROM python:3.13-slim

WORKDIR /app
RUN pip install uv
COPY mcp_simple_streamablehttp mcp_simple_streamablehttp
COPY pyproject.toml pyproject.toml
COPY README.md README.md
RUN uv sync

CMD ["uv", "--offline", "run", "mcp_simple_streamablehttp", "--port", "8080"]
```
Lines changed: 60 additions & 0 deletions
@@ -0,0 +1,60 @@
# MCP Simple StreamableHttp Stateful Server Example

A simple MCP server example demonstrating the StreamableHttp transport, which enables HTTP-based communication with MCP servers using streaming.
This example shows how to enable session-ID-based communication in order to identify a particular user session, as sketched below.
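A rough sketch of that flow with plain `requests` (the endpoint URL is a placeholder; the full client shipped in this commit additionally signs requests with an OCI signer):

```python
import requests

MCP_URL = "https://<host>/mcp"  # hypothetical endpoint

# 1. initialize: the server creates a session and returns its id in a response header.
init = requests.post(
    MCP_URL,
    headers={"accept": "application/json, text/event-stream",
             "content-type": "application/json"},
    json={"jsonrpc": "2.0", "id": 0, "method": "initialize",
          "params": {"protocolVersion": "2025-03-26", "capabilities": {},
                     "clientInfo": {"name": "readme-example", "version": "0.0.1"}}},
)
session_id = init.headers["mcp-session-id"]

# 2. Every subsequent request echoes the id so the server can route it to the same session.
requests.post(
    MCP_URL,
    headers={"accept": "application/json, text/event-stream",
             "content-type": "application/json",
             "mcp-session-id": session_id},
    json={"jsonrpc": "2.0", "method": "notifications/initialized"},
)
```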

## Features

- Uses the StreamableHTTP transport for server-client communication
- Supports REST API operations (POST, GET, DELETE) on the `/mcp` endpoint
- Task management with anyio task groups
- Ability to send multiple notifications over time to the client
- Proper resource cleanup and lifespan management
- Resumability support via InMemoryEventStore

## Usage

Start the server on the default or a custom port:

```bash
# Using custom port
uv run mcp-simple-streamablehttp --port 8080

# Custom logging level
uv run mcp-simple-streamablehttp --log-level DEBUG

# Enable JSON responses instead of SSE streams
uv run mcp-simple-streamablehttp --json-response
```

The server exposes a tool named "start-notification-stream" that accepts three arguments (an example payload follows this list):

- `interval`: Time between notifications in seconds (e.g., 1.0)
- `count`: Number of notifications to send (e.g., 5)
- `caller`: Identifier string for the caller
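For reference, the JSON-RPC payload that the example client in this commit sends to invoke this tool looks like the following (argument values are illustrative):

```python
import json

# tools/call request for the notification-stream tool; mirrors the request body
# used by the example client further down in this commit.
payload = json.dumps({
    "jsonrpc": "2.0",
    "id": 2,
    "method": "tools/call",
    "params": {
        "name": "start-notification-stream",
        "arguments": {"interval": 2, "count": 5, "caller": "readme-example"},
        "_meta": {"progressToken": 2},
    },
})
```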

## Resumability Support

This server includes resumability support through the InMemoryEventStore. This enables clients to:

- Reconnect to the server after a disconnection
- Resume event streaming from where they left off using the Last-Event-ID header

The server will:

- Generate unique event IDs for each SSE message
- Store events in memory for later replay
- Replay missed events when a client reconnects with a Last-Event-ID header

Note: The InMemoryEventStore is designed for demonstration purposes only. For production use, consider implementing a persistent storage solution.
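On the client side, resuming amounts to reconnecting with the last event ID that was received. A minimal sketch with `requests` (header names follow the Streamable HTTP transport; the session and event IDs are placeholders):

```python
import requests

MCP_URL = "https://<host>/mcp"  # hypothetical endpoint

# Re-open the SSE stream; Last-Event-ID tells the server which events to replay
# from its event store before resuming live delivery.
with requests.get(
    MCP_URL,
    headers={
        "accept": "text/event-stream",
        "mcp-session-id": "<session id from initialize>",
        "last-event-id": "<id of the last SSE event received>",
    },
    stream=True,
) as response:
    for line in response.iter_lines():
        if line:
            print(line.decode())
```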

## Client

You can connect to this server using any HTTP client. For now, only the TypeScript SDK ships Streamable HTTP client examples, or you can use [Inspector](https://github.com/modelcontextprotocol/inspector). This commit also includes a raw `requests`-based Python client (shown further down).

## Limitations

As this example keeps session IDs in ephemeral, in-memory state, any update, restart, or patching will wipe out session instances.
Also, since this is not a distributed architecture, there is no guarantee of session persistence across more than one node. Users need to implement a custom, centralised session-management persistence layer; a rough sketch of one building block follows below.
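As a starting point, the resumability half of such a layer could swap the in-memory event store shipped in this commit for a shared one. The following is a minimal sketch, assuming a reachable Redis instance and the `redis-py` client constructed with `decode_responses=True` (blocking calls are used for brevity, and session-to-node routing would still need its own shared state):

```python
from uuid import uuid4

import redis  # assumption: redis-py is installed and a Redis instance is reachable

from mcp.server.streamable_http import (
    EventCallback,
    EventId,
    EventMessage,
    EventStore,
    StreamId,
)
from mcp.types import JSONRPCMessage


class RedisEventStore(EventStore):
    """Sketch of a shared event store; same interface as the in-memory one below."""

    def __init__(self, client: redis.Redis, max_events_per_stream: int = 100):
        # Expects redis.Redis(..., decode_responses=True) so values come back as str.
        self.client = client
        self.max_events_per_stream = max_events_per_stream

    async def store_event(self, stream_id: StreamId, message: JSONRPCMessage) -> EventId:
        event_id = str(uuid4())
        # Append "<event_id>|<json>" to a per-stream list, trim to the last N events,
        # and index event_id -> stream_id for replay lookups.
        self.client.rpush(f"stream:{stream_id}", f"{event_id}|{message.model_dump_json()}")
        self.client.ltrim(f"stream:{stream_id}", -self.max_events_per_stream, -1)
        self.client.hset("event_index", event_id, stream_id)
        return event_id

    async def replay_events_after(
        self, last_event_id: EventId, send_callback: EventCallback
    ) -> StreamId | None:
        stream_id = self.client.hget("event_index", last_event_id)
        if stream_id is None:
            return None
        found_last = False
        for raw in self.client.lrange(f"stream:{stream_id}", 0, -1):
            event_id, _, payload = raw.partition("|")
            if found_last:
                await send_callback(
                    EventMessage(JSONRPCMessage.model_validate_json(payload), event_id)
                )
            elif event_id == last_event_id:
                found_last = True
        return stream_id
```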
Lines changed: 119 additions & 0 deletions
@@ -0,0 +1,119 @@
```python
"""Example client that exercises the stateful StreamableHttp server through an
OCI Model Deployment endpoint, reusing one MCP session id across all calls."""

import json

import requests
import oci

MCP_SESSION_ID = "mcp-session-id"


def get_auth():
    """Build an OCI request signer from the local session-token config."""
    PROFILE_NAME = 'DEFAULT'
    SECURITY_TOKEN_FILE_KEY = 'security_token_file'
    KEY_FILE_KEY = 'key_file'
    config = oci.config.from_file(profile_name=PROFILE_NAME)
    token_file = config[SECURITY_TOKEN_FILE_KEY]
    with open(token_file, 'r') as f:
        token = f.read()
    private_key = oci.signer.load_private_key_from_file(config[KEY_FILE_KEY])
    return oci.auth.signers.SecurityTokenSigner(token, private_key)


def build_headers(mcp_session_id=None):
    """Common request headers; the MCP session id is echoed on every call after initialize."""
    headers = {
        "Accept-Encoding": "gzip, deflate, br, zstd",
        "Accept-Language": "en-GB,en-US;q=0.9,en;q=0.8",
        "Connection": "keep-alive",
        "accept": "application/json, text/event-stream",
        "content-type": "application/json",
    }
    if mcp_session_id is not None:
        headers[MCP_SESSION_ID] = mcp_session_id
    return headers


def initializeSession(url):
    """Send the initialize request and return the server-issued MCP session id."""
    request_body = json.dumps({
        "method": "initialize",
        "params": {
            "protocolVersion": "2025-03-26",
            "capabilities": {
                "sampling": {},
                "roots": {
                    "listChanged": True
                }
            },
            "clientInfo": {
                "name": "oci-cli",
                "version": "0.14.0"
            }
        },
        "jsonrpc": "2.0",
        "id": 0
    })
    response = requests.request("POST", url, headers=build_headers(), data=request_body, auth=get_auth(), verify=False)
    print("Initialized instance")
    print(response.content)
    print(response.headers)
    return response.headers[MCP_SESSION_ID]


def initializeGroup(url, mcpSessionId):
    """Send the initialized notification to complete the handshake for this session."""
    request_body = json.dumps({"method": "notifications/initialized", "jsonrpc": "2.0"})
    requests.request("POST", url, headers=build_headers(mcpSessionId), data=request_body, auth=get_auth(), verify=False)
    print("Initialized group")


def listTools(url, mcpSessionId):
    """List the tools exposed by the server for this session."""
    request_body = json.dumps({"method": "tools/list", "jsonrpc": "2.0", "id": 1})
    response = requests.request("POST", url, headers=build_headers(mcpSessionId), data=request_body, auth=get_auth(), verify=False)
    print("Listing available tools")
    print(response.content)


def callTools(url, mcpSessionId):
    """Invoke the streaming notification tool and print SSE chunks as they arrive."""
    request_body = json.dumps({
        "method": "tools/call",
        "params": {
            "name": "start-notification-stream",
            "arguments": {"interval": 2, "count": 5, "caller": "nipun"},
            "_meta": {"progressToken": 2}
        },
        "jsonrpc": "2.0",
        "id": 2
    })

    with requests.request("POST", url, headers=build_headers(mcpSessionId), data=request_body, auth=get_auth(), verify=False, stream=True) as response:
        response.raise_for_status()  # Raise HTTPError for bad responses (4xx or 5xx)
        for chunk in response.iter_content(chunk_size=8192):  # 8192 is the default chunk size
            if chunk:  # Filter out keep-alive chunks
                print(chunk)


if __name__ == "__main__":
    predict_url = '<MODEL_DEPLOYMENT_STREAMING_URL..../predictWithResponseStream>'

    # Generate MCP session id
    mcpSessionId = initializeSession(predict_url)

    # Initialise group (notifications/initialized)
    initializeGroup(predict_url, mcpSessionId)

    # List tools
    listTools(predict_url, mcpSessionId)

    # Call streaming tool
    callTools(predict_url, mcpSessionId)
```

model-deployment/mcp-servers/stateful-python-example-server/mcp_simple_streamablehttp/__init__.py

Whitespace-only changes.
Lines changed: 4 additions & 0 deletions
@@ -0,0 +1,4 @@
```python
from .server import main  # relative import so `python -m mcp_simple_streamablehttp` works

if __name__ == "__main__":
    main()  # type: ignore[call-arg]
```
Lines changed: 105 additions & 0 deletions
@@ -0,0 +1,105 @@
```python
"""
In-memory event store for demonstrating resumability functionality.

This is a simple implementation intended for examples and testing,
not for production use where a persistent storage solution would be more appropriate.
"""

import logging
from collections import deque
from dataclasses import dataclass
from uuid import uuid4

from mcp.server.streamable_http import (
    EventCallback,
    EventId,
    EventMessage,
    EventStore,
    StreamId,
)
from mcp.types import JSONRPCMessage

logger = logging.getLogger(__name__)


@dataclass
class EventEntry:
    """
    Represents an event entry in the event store.
    """

    event_id: EventId
    stream_id: StreamId
    message: JSONRPCMessage


class InMemoryEventStore(EventStore):
    """
    Simple in-memory implementation of the EventStore interface for resumability.
    This is primarily intended for examples and testing, not for production use
    where a persistent storage solution would be more appropriate.

    This implementation keeps only the last N events per stream for memory efficiency.
    """

    def __init__(self, max_events_per_stream: int = 100):
        """Initialize the event store.

        Args:
            max_events_per_stream: Maximum number of events to keep per stream
        """
        self.max_events_per_stream = max_events_per_stream
        # for maintaining last N events per stream
        self.streams: dict[StreamId, deque[EventEntry]] = {}
        # event_id -> EventEntry for quick lookup
        self.event_index: dict[EventId, EventEntry] = {}

    async def store_event(
        self, stream_id: StreamId, message: JSONRPCMessage
    ) -> EventId:
        """Stores an event with a generated event ID."""
        event_id = str(uuid4())
        event_entry = EventEntry(
            event_id=event_id, stream_id=stream_id, message=message
        )

        # Get or create deque for this stream
        if stream_id not in self.streams:
            self.streams[stream_id] = deque(maxlen=self.max_events_per_stream)

        # If deque is full, the oldest event will be automatically removed
        # We need to remove it from the event_index as well
        if len(self.streams[stream_id]) == self.max_events_per_stream:
            oldest_event = self.streams[stream_id][0]
            self.event_index.pop(oldest_event.event_id, None)

        # Add new event
        self.streams[stream_id].append(event_entry)
        self.event_index[event_id] = event_entry

        return event_id

    async def replay_events_after(
        self,
        last_event_id: EventId,
        send_callback: EventCallback,
    ) -> StreamId | None:
        """Replays events that occurred after the specified event ID."""
        if last_event_id not in self.event_index:
            logger.warning(f"Event ID {last_event_id} not found in store")
            return None

        # Get the stream and find events after the last one
        last_event = self.event_index[last_event_id]
        stream_id = last_event.stream_id
        stream_events = self.streams.get(last_event.stream_id, deque())

        # Events in deque are already in chronological order
        found_last = False
        for event in stream_events:
            if found_last:
                await send_callback(EventMessage(event.message, event.event_id))
            elif event.event_id == last_event_id:
                found_last = True

        return stream_id
```
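For orientation, a small usage sketch of this store (assuming the file above is importable as `event_store`, and that `JSONRPCMessage` accepts a plain JSON-RPC dict via pydantic's `model_validate`):

```python
import asyncio

from mcp.server.streamable_http import EventMessage
from mcp.types import JSONRPCMessage

from event_store import InMemoryEventStore  # assumed import path for the file above


async def main() -> None:
    store = InMemoryEventStore(max_events_per_stream=10)

    # Store two events on the same stream.
    first = await store.store_event(
        "stream-1",
        JSONRPCMessage.model_validate({"jsonrpc": "2.0", "id": 1, "method": "ping"}),
    )
    await store.store_event(
        "stream-1",
        JSONRPCMessage.model_validate({"jsonrpc": "2.0", "id": 2, "method": "ping"}),
    )

    async def on_replay(event: EventMessage) -> None:
        # Would normally re-send the event over SSE; here we just print its id.
        print("replaying", event.event_id)

    # Replays only the event stored after `first`.
    await store.replay_events_after(first, on_replay)


asyncio.run(main())
```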
