Skip to content

Commit 70ff8b8

Browse files
calmininiechenJeremy DaiJoJoJoJoJoJoJo
authored
fulfill the router with profile (#33)
* router wip * Refactor clients * return warning for owner or repo name mismatch (#9) * return warning * fix * Router working for uvx servers npx server still has issues with env * fix: issues when starting server * Support SSE client and add resource templates support to router * Add file watcher and update router to support dynamic server config updates * Refactor client connection management and add server lifespan handling * Refactor router to support profile-based server management and add profile-aware SSE transport * Refactor router to use ProfileConfigManager and ServerConfig instead of ConnectionDetails * Update router README with new server config schema and namespace conventions --------- Co-authored-by: Chen Nie <[email protected]> Co-authored-by: Jeremy Dai <[email protected]> Co-authored-by: Jonathan Wang <[email protected]> Co-authored-by: calmini <[email protected]>
1 parent 82f3e48 commit 70ff8b8

File tree

10 files changed

+1816
-423
lines changed

10 files changed

+1816
-423
lines changed

pyproject.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ dependencies = [
2929
"boto3>=1.37.25",
3030
"loguru>=0.7.3",
3131
"ruamel-yaml>=0.18.10",
32+
"aiohttp>=3.11.16",
33+
"watchfiles>=1.0.4",
3234
]
3335

3436
[project.urls]

src/mcpm/router/README.md

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
# MCPM Router
2+
3+
The MCPM Router is a module that allows you to aggregate multiple MCP servers (both SSE and STDIO) and expose them as a single SSE server.
4+
5+
## Features
6+
7+
- Aggregate multiple MCP servers as a single server
8+
- Support both SSE and STDIO connections to underlying servers
9+
- Namespace capabilities from different servers
10+
- Expose a unified SSE server interface
11+
12+
## Usage
13+
14+
### Basic Usage
15+
16+
```python
17+
import asyncio
18+
from mcpm.router import MCPRouter
19+
from mcpm.schema.server_config import STDIOServerConfig, SSEServerConfig
20+
21+
async def main():
22+
# Create a router
23+
router = MCPRouter()
24+
25+
# Add a STDIO server
26+
await router.add_server(
27+
"example1",
28+
STDIOServerConfig(
29+
command="python",
30+
args=["-m", "mcp.examples.simple_server"]
31+
)
32+
)
33+
34+
# Add an SSE server
35+
await router.add_server(
36+
"example2",
37+
SSEServerConfig(
38+
url="http://localhost:3000/sse"
39+
)
40+
)
41+
42+
# Start the SSE server
43+
await router.start_sse_server(host="localhost", port=8080)
44+
45+
if __name__ == "__main__":
46+
asyncio.run(main())
47+
```
48+
### Configuration File
49+
by default we use the configure from file `~/.config/mcpm/profile.json` to manage the servers.
50+
51+
## Implementation Details
52+
53+
The router works by:
54+
55+
1. Connecting to each downstream server (SSE or STDIO)
56+
2. Collecting capabilities, tools, prompts, and resources from each server
57+
3. Namespacing these capabilities to avoid conflicts
58+
4. Creating an aggregated server that routes requests to the appropriate downstream server
59+
5. Exposing this aggregated server as an SSE server
60+
61+
## Namespacing
62+
63+
The router uses the following namespacing conventions:
64+
65+
- Tools: `{server_name}_t_{tool_name}`
66+
- Prompts: `{server_name}_p_{prompt_name}`
67+
- Resources: `{server_name}:{resource_uri}`
68+
69+
This allows the router to route requests to the appropriate server based on the namespaced identifier.

src/mcpm/router/__init__.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
"""MCP Router Package"""
2+
3+
from .router import MCPRouter
4+
5+
__all__ = [
6+
"MCPRouter",
7+
]
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
import asyncio
2+
import logging
3+
from typing import Optional, cast
4+
5+
from mcp import ClientSession, InitializeResult, StdioServerParameters, stdio_client
6+
from mcp.client.sse import sse_client
7+
8+
from mcpm.schemas.server_config import ServerConfig, SSEServerConfig, STDIOServerConfig
9+
10+
logger = logging.getLogger(__name__)
11+
12+
13+
def _stdio_transport_context(server_config: ServerConfig):
14+
server_config = cast(STDIOServerConfig, server_config)
15+
server_params = StdioServerParameters(command=server_config.command, args=server_config.args, env=server_config.env)
16+
return stdio_client(server_params)
17+
18+
19+
def _sse_transport_context(server_config: ServerConfig):
20+
server_config = cast(SSEServerConfig, server_config)
21+
return sse_client(server_config.url, headers=server_config.headers)
22+
23+
24+
class ServerConnection:
25+
def __init__(self, server_config: ServerConfig) -> None:
26+
self.session: Optional[ClientSession] = None
27+
self.session_initialized_response: Optional[InitializeResult] = None
28+
self._initialized = False
29+
self.server_config = server_config
30+
self._initialized_event = asyncio.Event()
31+
self._shutdown_event = asyncio.Event()
32+
33+
self._transport_context_factory = (
34+
_stdio_transport_context if isinstance(server_config, STDIOServerConfig) else _sse_transport_context
35+
)
36+
37+
self._server_task = asyncio.create_task(self._server_lifespan_cycle())
38+
39+
def healthy(self) -> bool:
40+
return self.session is not None and self._initialized
41+
42+
# block until client session is initialized
43+
async def wait_for_initialization(self):
44+
await self._initialized_event.wait()
45+
46+
# request for client session to gracefully close
47+
async def request_for_shutdown(self):
48+
self._shutdown_event.set()
49+
50+
# block until client session is shutdown
51+
async def wait_for_shutdown_request(self):
52+
await self._shutdown_event.wait()
53+
54+
async def _server_lifespan_cycle(self):
55+
try:
56+
async with self._transport_context_factory(self.server_config) as (read, write):
57+
async with ClientSession(read, write) as session:
58+
self.session_initialized_response = await session.initialize()
59+
60+
self.session = session
61+
self._initialized = True
62+
self._initialized_event.set()
63+
# block here so that the session will not be closed after exit scope
64+
# we could retrieve alive session through self.session
65+
await self.wait_for_shutdown_request()
66+
except Exception as e:
67+
logger.error(f"Failed to connect to server {self.server_config.name}: {e}")
68+
self._initialized_event.set()
69+
self._shutdown_event.set()

src/mcpm/router/design.md

Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
# MCPM Router Design
2+
3+
## Overview
4+
5+
The MCPM Router serves as an intermediary layer within the MCPM (Model Context Protocol Manager) project. The router acts in a dual role:
6+
7+
1. **As an MCP Client**: Connects to multiple downstream MCP servers
8+
2. **As an MCP Server**: Provides a unified interface to upstream MCP clients
9+
10+
This design allows for aggregation of capabilities from multiple MCP servers while providing a single, stable connection point for clients.
11+
12+
## Architecture
13+
14+
```mermaid
15+
flowchart TB
16+
subgraph clients[Upstream Clients]
17+
c1[MCP Client 1]
18+
c2[MCP Client 2]
19+
cn[MCP Client N]
20+
end
21+
22+
subgraph router[MCPM Router]
23+
r[MCPRouter]
24+
ch[ClientHandler]
25+
sh[ServerHandler]
26+
cm[ConnectionManager]
27+
r --> ch
28+
r --> sh
29+
ch --> cm
30+
sh --> cm
31+
end
32+
33+
subgraph servers[Downstream Servers]
34+
s1[MCP Server 1]
35+
s2[MCP Server 2]
36+
sm[MCP Server M]
37+
end
38+
39+
c1 <--SSE--> ch
40+
c2 <--SSE--> ch
41+
cn <--SSE--> ch
42+
43+
sh <--STDIO/SSE--> s1
44+
sh <--STDIO/SSE--> s2
45+
sh <--STDIO/SSE--> sm
46+
47+
classDef default fill:#f9f9f9,stroke:#333,stroke-width:1px;
48+
classDef routerClass fill:#e1f5fe,stroke:#0277bd,stroke-width:2px;
49+
class router routerClass;
50+
```
51+
52+
## Key Components
53+
54+
### `MCPRouter`
55+
56+
The main orchestrator class that provides a unified API for the application:
57+
- Initializes and coordinates all internal components
58+
- Provides methods for connecting to downstream servers
59+
- Handles server and client routing through appropriate handlers
60+
- Manages namespacing of capabilities across different servers
61+
- Offers a clean, high-level interface for application code
62+
63+
### `ConnectionManager`
64+
65+
Maintains a registry of connections to downstream servers, providing methods to:
66+
- Add, retrieve, and remove downstream server connections
67+
- Lookup downstream servers by ID
68+
69+
### `ServerHandler`
70+
71+
Manages connections to downstream MCP servers:
72+
- Establishes and maintains connections using stdio or SSE transports
73+
- Aggregates capabilities from all connected servers
74+
- Routes requests from upstream clients to the appropriate downstream server
75+
- Handles notifications from downstream servers
76+
77+
### `ClientHandler`
78+
79+
Serves upstream MCP clients:
80+
- Provides an SSE server endpoint for clients to connect
81+
- Handles client connections/disconnections transparently
82+
- Routes client requests to the `ServerHandler`
83+
- Delivers responses and notifications back to clients
84+
85+
### `ConnectionDetails` and `ConnectionType`
86+
87+
Defines the configuration for connecting to downstream servers:
88+
- Supports multiple transport protocols (STDIO, SSE)
89+
- Validates configuration based on the connection type
90+
- Stores necessary connection parameters (command, args, env, URL)
91+
92+
## Communication Flow
93+
94+
### Downstream Connections (Router as Client)
95+
96+
1. Router creates persistent connections to downstream MCP servers using STDIO or SSE
97+
2. Connections are maintained regardless of upstream client presence
98+
3. Server capabilities are fetched and aggregated with namespacing
99+
4. Notifications from servers are routed to appropriate upstream clients
100+
101+
### Upstream Connections (Router as Server)
102+
103+
1. Router provides an SSE server interface for upstream clients
104+
2. Clients can connect/disconnect at will without affecting downstream connections
105+
3. Client requests are routed to appropriate downstream servers
106+
4. Responses and notifications are delivered back to clients
107+
108+
## Request Routing
109+
110+
```mermaid
111+
sequenceDiagram
112+
participant Client as MCP Client
113+
participant CH as ClientHandler
114+
participant Router as MCPRouter
115+
participant SH as ServerHandler
116+
participant Server as MCP Server
117+
118+
Client->>CH: Request
119+
CH->>Router: Forward request
120+
Router->>Router: Parse namespaced ID
121+
Router->>SH: Route to appropriate server
122+
SH->>Server: Forward request with denormalized ID
123+
Server->>SH: Response
124+
SH->>Router: Forward response
125+
Router->>CH: Route to originating client
126+
CH->>Client: Deliver response
127+
```
128+
129+
## Capability Aggregation
130+
131+
1. Upon connection to a downstream server, all capabilities are fetched
132+
2. Capabilities (tools, resources, prompts) are namespaced with server ID
133+
3. Namespaced capabilities are added to aggregate pool
134+
4. Clients see all capabilities from all servers as a unified collection
135+
5. When a server disconnects, its capabilities are removed from the pool
136+
137+
## Error Handling
138+
139+
1. Connection errors are isolated to affected servers
140+
2. Standard JSON-RPC error responses for client requests
141+
3. Proper error propagation from downstream servers to clients
142+
4. Graceful handling of client and server disconnections
143+
144+
## Benefits of This Design
145+
146+
1. **Decoupling**: Upstream clients are decoupled from downstream servers
147+
2. **Resilience**: Client disconnections don't affect server connections
148+
3. **Aggregation**: Multiple capabilities from different servers appear as one
149+
4. **Flexibility**: Supports different transport protocols (STDIO, SSE)
150+
5. **Scalability**: Can manage multiple clients and servers simultaneously
151+
6. **Clean API**: The `MCPRouter` provides a simple, unified interface for applications
152+
153+
## Implementation Notes
154+
155+
- All communication follows the MCP protocol specification
156+
- Asynchronous operation using Python's asyncio
157+
- Type-safe interfaces using Python type hints
158+
- Clean separation of concerns between components

src/mcpm/router/example.py

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
"""
2+
Example script demonstrating how to use the MCPRouter to aggregate multiple MCP servers.
3+
"""
4+
5+
import argparse
6+
import asyncio
7+
import logging
8+
from typing import List
9+
10+
from .router import MCPRouter
11+
12+
# Configure logging
13+
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
14+
logger = logging.getLogger(__name__)
15+
16+
17+
async def main(host: str, port: int, allow_origins: List[str] = None):
18+
"""
19+
Main function to run the router example.
20+
21+
Args:
22+
host: Host to bind the SSE server to
23+
port: Port to bind the SSE server to
24+
allow_origins: List of allowed origins for CORS
25+
"""
26+
router = MCPRouter()
27+
28+
logger.info(f"Starting MCPRouter - will expose SSE server on http://{host}:{port}")
29+
30+
# Start the SSE server
31+
try:
32+
logger.info(f"Starting SSE server on http://{host}:{port}")
33+
if allow_origins:
34+
logger.info(f"CORS enabled for origins: {allow_origins}")
35+
await router.start_sse_server(host, port, allow_origins)
36+
except KeyboardInterrupt:
37+
logger.info("Shutting down...")
38+
except Exception as e:
39+
logger.error(f"Error starting SSE server: {e}")
40+
41+
42+
if __name__ == "__main__":
43+
parser = argparse.ArgumentParser(description="MCP Router Example")
44+
parser.add_argument("--host", type=str, default="localhost", help="Host to bind the SSE server to")
45+
parser.add_argument("--port", type=int, default=8080, help="Port to bind the SSE server to")
46+
parser.add_argument("--cors", type=str, help="Comma-separated list of allowed origins for CORS")
47+
48+
args = parser.parse_args()
49+
50+
# Parse CORS origins
51+
allow_origins = None
52+
if args.cors:
53+
allow_origins = [origin.strip() for origin in args.cors.split(",")]
54+
55+
asyncio.run(main(args.host, args.port, allow_origins))

0 commit comments

Comments
 (0)