Skip to content

Commit ab110bc

Browse files
committed
fix http transport to be stateful
1 parent 479bb0a commit ab110bc

File tree

1 file changed

+87
-53
lines changed

1 file changed

+87
-53
lines changed

fastapi_mcp/transport/http.py

Lines changed: 87 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -1,74 +1,98 @@
11
import logging
2+
import asyncio
23

34
from fastapi import Request, Response, HTTPException
45
from mcp.server.lowlevel.server import Server
5-
from mcp.server.streamable_http import StreamableHTTPServerTransport
6+
from mcp.server.streamable_http_manager import StreamableHTTPSessionManager, EventStore
67
from mcp.server.transport_security import TransportSecuritySettings
78

89
logger = logging.getLogger(__name__)
910

1011

11-
class FastApiStreamableHttpTransport(StreamableHTTPServerTransport):
12+
class FastApiHttpSessionManager:
13+
"""
14+
FastAPI-native wrapper around StreamableHTTPSessionManager
15+
"""
16+
1217
def __init__(
1318
self,
14-
mcp_session_id: str | None = None,
15-
is_json_response_enabled: bool = True, # Default to JSON for HTTP transport
16-
event_store=None,
19+
mcp_server: Server,
20+
event_store: EventStore | None = None,
21+
json_response: bool = True, # Default to JSON for HTTP transport
1722
security_settings: TransportSecuritySettings | None = None,
18-
mcp_server: Server | None = None,
1923
):
20-
super().__init__(
21-
mcp_session_id=mcp_session_id,
22-
is_json_response_enabled=is_json_response_enabled,
23-
event_store=event_store,
24-
security_settings=security_settings,
25-
)
26-
logger.debug(f"FastApiStreamableHttpTransport initialized with session_id: {mcp_session_id}")
27-
self._mcp_server = mcp_server
28-
self._server_running = False
29-
30-
async def handle_fastapi_request(self, request: Request, mcp_server: Server | None = None) -> Response:
24+
self.mcp_server = mcp_server
25+
self.event_store = event_store
26+
self.json_response = json_response
27+
self.security_settings = security_settings
28+
self._session_manager: StreamableHTTPSessionManager | None = None
29+
self._manager_task: asyncio.Task | None = None
30+
self._manager_started = False
31+
self._startup_lock = asyncio.Lock()
32+
33+
async def _ensure_session_manager_started(self) -> None:
3134
"""
32-
The approach here is different from FastApiSseTransport.
33-
In FastApiSseTransport, we reimplement the SSE transport logic to have a more FastAPI-native transport.
34-
It proved to be less bug-prone since it avoids deconstructing and reconstructing raw ASGI objects.
35-
36-
But, we took a different approach here because StreamableHTTPServerTransport handles more complexity,
37-
and multiple request methods (GET/POST/DELETE), so we want to leverage that logic and avoid reimplementing.
35+
Ensure the session manager is started.
3836
39-
We still ensure it works natively with FastAPI by capturing the ASGI response from the SDK and converting
40-
it to a FastAPI Response.
37+
This is called lazily on the first request to start the session manager
38+
if it hasn't been started yet.
4139
"""
42-
logger.debug(f"Handling FastAPI request: {request.method} {request.url.path}")
43-
44-
# Use the stored server if available, or the passed one
45-
server = self._mcp_server or mcp_server
46-
if not server:
47-
raise HTTPException(status_code=500, detail="No MCP server available")
48-
49-
# Initialize the transport if not already done
50-
if not self._server_running:
51-
import anyio
40+
if self._manager_started:
41+
return
42+
43+
async with self._startup_lock:
44+
if self._manager_started:
45+
return
46+
47+
logger.debug("Starting StreamableHTTP session manager")
48+
49+
# Create the session manager
50+
# Note: We don't use stateless=True because we want to support sessions
51+
# but sessions are optional as per the MCP spec
52+
self._session_manager = StreamableHTTPSessionManager(
53+
app=self.mcp_server,
54+
event_store=self.event_store,
55+
json_response=self.json_response,
56+
stateless=False, # Always support sessions, but they're optional
57+
security_settings=self.security_settings,
58+
)
5259

53-
async def start_server():
54-
self._server_running = True
55-
async with self.connect() as (reader, writer):
56-
await server.run(
57-
reader,
58-
writer,
59-
server.create_initialization_options(notification_options=None, experimental_capabilities={}),
60-
raise_exceptions=False,
61-
)
60+
# Start the session manager in a background task
61+
async def run_session_manager():
62+
try:
63+
async with self._session_manager.run():
64+
logger.info("StreamableHTTP session manager is running")
65+
# Keep running until cancelled
66+
await asyncio.Event().wait()
67+
except asyncio.CancelledError:
68+
logger.info("StreamableHTTP session manager is shutting down")
69+
raise
70+
except Exception:
71+
logger.exception("Error in StreamableHTTP session manager")
72+
raise
73+
74+
self._manager_task = asyncio.create_task(run_session_manager())
75+
self._manager_started = True
76+
77+
# Give the session manager a moment to initialize
78+
await asyncio.sleep(0.1)
79+
80+
async def handle_fastapi_request(self, request: Request) -> Response:
81+
"""
82+
Handle a FastAPI request by delegating to the session manager.
6283
63-
# Start the server in a background task
64-
import asyncio
84+
This converts FastAPI's Request/Response to ASGI scope/receive/send
85+
and then converts the result back to a FastAPI Response.
86+
"""
87+
# Ensure session manager is started
88+
await self._ensure_session_manager_started()
6589

66-
asyncio.create_task(start_server())
90+
if not self._session_manager:
91+
raise HTTPException(status_code=500, detail="Session manager not initialized")
6792

68-
# Give the server a moment to initialize
69-
await anyio.sleep(0.1)
93+
logger.debug(f"Handling FastAPI request: {request.method} {request.url.path}")
7094

71-
# Capture the response from the SDK's handle_request method
95+
# Capture the response from the session manager
7296
response_started = False
7397
response_status = 200
7498
response_headers = []
@@ -85,8 +109,8 @@ async def send_callback(message):
85109
response_body += message.get("body", b"")
86110

87111
try:
88-
# Delegate to the SDK's handle_request method with ASGI interface
89-
await self.handle_request(request.scope, request.receive, send_callback)
112+
# Delegate to the session manager's handle_request method
113+
await self._session_manager.handle_request(request.scope, request.receive, send_callback)
90114

91115
# Convert the captured ASGI response to a FastAPI Response
92116
headers_dict = {name.decode(): value.decode() for name, value in response_headers}
@@ -98,5 +122,15 @@ async def send_callback(message):
98122
)
99123

100124
except Exception:
101-
logger.exception("Error in StreamableHTTPServerTransport")
125+
logger.exception("Error in StreamableHTTPSessionManager")
102126
raise HTTPException(status_code=500, detail="Internal server error")
127+
128+
async def shutdown(self) -> None:
129+
"""Clean up the session manager and background task."""
130+
if self._manager_task and not self._manager_task.done():
131+
self._manager_task.cancel()
132+
try:
133+
await self._manager_task
134+
except asyncio.CancelledError:
135+
pass
136+
self._manager_started = False

0 commit comments

Comments
 (0)