|
| 1 | +# streamablehttptransport.py |
| 2 | + |
| 3 | +import logging |
| 4 | +import contextlib |
| 5 | +from collections.abc import AsyncIterator |
| 6 | +import anyio |
| 7 | +import mcp.types as types |
| 8 | +from mcp.server.lowlevel import Server |
| 9 | +from mcp.server.streamable_http_manager import StreamableHTTPSessionManager |
| 10 | +from starlette.types import Receive, Scope, Send |
| 11 | +from typing import Any, Dict, List, Optional, Union |
| 12 | + |
| 13 | +from mcpgateway.services.tool_service import ToolService |
| 14 | +from mcpgateway.db import SessionLocal |
| 15 | + |
| 16 | + |
| 17 | +logger = logging.getLogger(__name__) |
| 18 | + |
| 19 | +# This is the MCP configuration from your script's CLI options. |
| 20 | +# You can get these from environment variables, a config file, or hardcode them. |
| 21 | +JSON_RESPONSE_ENABLED = False |
| 22 | + |
| 23 | + |
| 24 | +tool_service = ToolService() |
| 25 | + |
| 26 | + |
| 27 | +# Initialize MCP app |
| 28 | +mcp_app = Server("mcp-streamable-http-stateless-demo") |
| 29 | + |
| 30 | +@mcp_app.call_tool() |
| 31 | +async def call_tool(name: str, arguments: dict) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]: |
| 32 | + with SessionLocal() as db: |
| 33 | + result = await tool_service.invoke_tool(db, name, arguments) |
| 34 | + |
| 35 | + tool_result = [ |
| 36 | + types.TextContent( |
| 37 | + type=result.content[0].type, |
| 38 | + text=( |
| 39 | + result.content[0].text |
| 40 | + ), |
| 41 | + ) |
| 42 | + ] |
| 43 | + return tool_result |
| 44 | + |
| 45 | +@mcp_app.list_tools() |
| 46 | +async def list_tools() -> list[types.Tool]: |
| 47 | + |
| 48 | + with SessionLocal() as db: |
| 49 | + tools = await tool_service.list_tools(db) |
| 50 | + listed_tools = [] |
| 51 | + for tool in tools: |
| 52 | + listed_tools.append(types.Tool( |
| 53 | + name=tool.name, |
| 54 | + description=tool.description, |
| 55 | + inputSchema=tool.input_schema |
| 56 | + )) |
| 57 | + |
| 58 | + return listed_tools |
| 59 | + |
| 60 | + |
| 61 | +# --- 2. Create and Configure MCP Session Manager --- |
| 62 | +session_manager = StreamableHTTPSessionManager( |
| 63 | + app=mcp_app, |
| 64 | + event_store=None, |
| 65 | + json_response=JSON_RESPONSE_ENABLED, |
| 66 | + stateless=True, |
| 67 | +) |
| 68 | + |
| 69 | +# This is the handler that will process requests for the mounted path. |
| 70 | +async def handle_streamable_http(scope: Scope, receive: Receive, send: Send) -> None: |
| 71 | + await session_manager.handle_request(scope, receive, send) |
| 72 | + |
| 73 | +# --- 3. Lifespan Management to Start/Stop the Session Manager --- |
| 74 | +# @contextlib.asynccontextmanager |
| 75 | +# async def lifespan(app: FastAPI) -> AsyncIterator[None]: |
| 76 | +# """Manages the startup and shutdown of the MCP session manager.""" |
| 77 | +# async with session_manager.run(): |
| 78 | +# logger.info("Application starting with MCP StreamableHTTPSessionManager!") |
| 79 | +# yield |
| 80 | +# logger.info("Application shutting down...") |
| 81 | + |
| 82 | +async def start_streamablehttp(): |
| 83 | + # await session_manager.run() |
| 84 | + # session = await session_manager.run().__aenter__() |
| 85 | + # logger.info("Application starting with MCP StreamableHTTPSessionManager!") |
| 86 | + async with session_manager.run(): |
| 87 | + logger.info("Application starting with MCP StreamableHTTPSessionManager!") |
| 88 | + # yield |
| 89 | + |
| 90 | + |
| 91 | +async def stop_streamablehttp(): |
| 92 | + # await session_manager.stop() |
| 93 | + # await session_manager.run().__aexit__(None, None, None) |
| 94 | + logger.info("StreamableHTTPSessionManager shutting down...") |
0 commit comments