Skip to content

Commit 7f1f541

Browse files
committed
feat: improve code
1 parent c2fb91e commit 7f1f541

File tree

4 files changed

+195
-118
lines changed

4 files changed

+195
-118
lines changed

README.md

Lines changed: 2 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,7 @@ LONG_TERM_MEMORY_EMBEDDER_MODEL=text-embedding-3-small
243243
# MCP Configuration
244244
MCP_ENABLED=true
245245
MCP_SERVER_PORT=7001
246-
MCP_HOSTNAMES_CSV=localhost:7001
246+
MCP_HOSTNAMES_CSV=http://localhost:7001
247247

248248
# Observability
249249
LANGFUSE_PUBLIC_KEY=your_public_key
@@ -321,24 +321,7 @@ The application includes comprehensive support for the Model Context Protocol (M
321321

322322
### MCP Server
323323

324-
The template includes a sample MCP server (`app/mcp/server.py`) with example tools:
325-
326-
```python
327-
from mcp.server.fastmcp import FastMCP
328-
329-
mcpServer = FastMCP("MCP Server", port=7001)
330-
331-
@mcpServer.tool()
332-
def add(a: int, b: int) -> int:
333-
"""Add two numbers"""
334-
return a + b
335-
336-
@mcpServer.resource("greeting://{name}")
337-
def get_greeting(name: str) -> str:
338-
"""Get a personalized greeting"""
339-
return f"Hello, {name}!"
340-
```
341-
324+
The template includes a sample MCP server (`app/mcp/server.py`) with example tools
342325
Start the MCP server:
343326

344327
```bash

app/mcp/dependencies.py

Lines changed: 150 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import uuid
12
from contextlib import AsyncExitStack
23
from contextlib import asynccontextmanager
34
from typing import AsyncGenerator, Optional
@@ -9,114 +10,174 @@
910
from pydantic import BaseModel
1011

1112
from app.core.config import settings
12-
from app.core.logging import logger
13+
from app.core.logging import bind_context, logger
14+
15+
16+
def generate_correlation_id() -> str:
17+
"""Generate a unique correlation ID for tracking MCP operations.
18+
19+
Returns:
20+
str: A unique correlation ID in UUID format.
21+
"""
22+
return str(uuid.uuid4())
1323

1424

1525
class Resource(BaseModel):
16-
tools: list[StructuredTool]
17-
sessions: list[ClientSession]
26+
tools: list[StructuredTool]
27+
sessions: list[ClientSession]
1828

19-
class Config:
20-
arbitrary_types_allowed = True
29+
class Config:
30+
arbitrary_types_allowed = True
2131

2232

2333
@asynccontextmanager
2434
async def mcp_sse_client(
25-
mcp_host: str = "localhost:7001",
35+
mcp_host: str = "http://localhost:7001",
36+
timeout: int = 2,
37+
correlation_id: Optional[str] = None,
2638
) -> AsyncGenerator[ClientSession]:
27-
"""
28-
Creates and initializes an MCP client session over SSE.
29-
30-
Establishes an SSE connection to the MCP server and yields an initialized
31-
`ClientSession` for communication.
32-
33-
Yields:
34-
ClientSession: An initialized MCP client session.
35-
"""
36-
async with sse_client(
37-
f"http://{mcp_host}/sse"
38-
) as (
39-
read_stream,
40-
write_stream,
41-
):
42-
async with ClientSession(read_stream, write_stream) as session:
43-
await session.initialize()
44-
yield session
39+
"""
40+
Creates and initializes an MCP client session over SSE.
4541
42+
Establishes an SSE connection to the MCP server and yields an initialized
43+
`ClientSession` for communication.
4644
47-
class MCPSessionManager:
48-
"""Manages MCP client sessions for the application lifetime."""
49-
50-
def __init__(self):
51-
self._exit_stack: Optional[AsyncExitStack] = None
52-
self._resource: Optional[Resource] = None
53-
self._initialized: bool = False
45+
Args:
46+
mcp_host: The MCP server host URL.
47+
timeout: Connection timeout in seconds.
48+
correlation_id: Optional correlation ID for tracking this operation.
5449
55-
async def initialize(self) -> Resource:
56-
"""Initialize MCP sessions and load tools."""
57-
if self._initialized:
58-
return self._resource
59-
60-
self._exit_stack = AsyncExitStack()
61-
await self._exit_stack.__aenter__()
50+
Yields:
51+
ClientSession: An initialized MCP client session.
52+
"""
53+
try:
54+
logger.info("mcp_connection_initiated", correlation_id=correlation_id, host=mcp_host)
55+
async with sse_client(
56+
f"{mcp_host}/sse",
57+
timeout=timeout # in seconds
58+
) as (
59+
read_stream,
60+
write_stream,
61+
):
62+
async with ClientSession(read_stream, write_stream) as session:
63+
await session.initialize()
64+
logger.info("mcp_session_initialized", correlation_id=correlation_id, host=mcp_host)
65+
yield session
66+
finally:
67+
logger.info("mcp_session_closed", correlation_id=correlation_id, host=mcp_host)
6268

63-
tools = []
64-
sessions = []
6569

66-
for hostname in settings.MCP_HOSTNAMES:
67-
try:
68-
session = await self._exit_stack.enter_async_context(
69-
mcp_sse_client(hostname)
70+
class MCPSessionManager:
71+
"""Manages MCP client sessions for the application lifetime."""
72+
73+
def __init__(self):
74+
self._exit_stack: Optional[AsyncExitStack] = None
75+
self._resource: Optional[Resource] = None
76+
self._initialized: bool = False
77+
78+
async def initialize(self) -> Resource:
79+
"""Initialize MCP sessions and load tools."""
80+
if self._initialized:
81+
return self._resource
82+
83+
init_correlation_id = generate_correlation_id()
84+
logger.info("mcp_initialization_started", correlation_id=init_correlation_id)
85+
86+
self._exit_stack = AsyncExitStack()
87+
await self._exit_stack.__aenter__()
88+
89+
tools = []
90+
sessions = []
91+
92+
for hostname in settings.MCP_HOSTNAMES:
93+
# Generate unique correlation ID for each server connection
94+
server_correlation_id = generate_correlation_id()
95+
try:
96+
bind_context(
97+
mcp_server_correlation_id=server_correlation_id,
98+
mcp_init_correlation_id=init_correlation_id,
99+
)
100+
logger.info(
101+
"mcp_server_connection_attempt",
102+
correlation_id=server_correlation_id,
103+
init_correlation_id=init_correlation_id,
104+
hostname=hostname,
105+
)
106+
session = await self._exit_stack.enter_async_context(
107+
mcp_sse_client(hostname, correlation_id=server_correlation_id)
108+
)
109+
session_tools = await load_mcp_tools(session)
110+
tools.extend(session_tools)
111+
sessions.append(session)
112+
logger.info(
113+
"connected_to_mcp_server",
114+
correlation_id=server_correlation_id,
115+
init_correlation_id=init_correlation_id,
116+
hostname=hostname,
117+
tool_count=len(session_tools),
118+
)
119+
except Exception as e:
120+
logger.error(
121+
"failed_to_connect_to_mcp_server",
122+
correlation_id=server_correlation_id,
123+
init_correlation_id=init_correlation_id,
124+
hostname=hostname,
125+
error=str(e),
126+
)
127+
128+
self._resource = Resource(tools=tools, sessions=sessions)
129+
self._initialized = True
130+
logger.info(
131+
"mcp_initialization_completed",
132+
correlation_id=init_correlation_id,
133+
total_tools=len(tools),
134+
total_sessions=len(sessions),
70135
)
71-
session_tools = await load_mcp_tools(session)
72-
tools.extend(session_tools)
73-
sessions.append(session)
74-
logger.info("connected_to_mcp_server", hostname=hostname, tool_count=len(session_tools))
75-
except Exception as e:
76-
logger.error("failed_to_connect_to_mcp_server", hostname=hostname, error=str(e))
77-
78-
self._resource = Resource(tools=tools, sessions=sessions)
79-
self._initialized = True
80-
return self._resource
81-
82-
async def reconnect(self) -> bool:
83-
"""Attempt to reconnect to MCP servers."""
84-
logger.info("reconnecting_to_mcp_servers")
85-
try:
86-
await self.cleanup()
87-
await self.initialize()
88-
return True
89-
except Exception as e:
90-
logger.error("mcp_reconnection_failed", error=str(e))
91-
return False
92-
93-
async def cleanup(self):
94-
"""Close all MCP sessions."""
95-
if self._exit_stack is not None:
96-
try:
97-
await self._exit_stack.__aexit__(None, None, None)
98-
logger.info("mcp_sessions_closed_successfully")
99-
except Exception as e:
100-
logger.error("mcp_sessions_cleanup_failed", error=str(e))
101-
finally:
102-
self._exit_stack = None
103-
self._resource = None
104-
self._initialized = False
105-
106-
def get_resource(self) -> Resource:
107-
"""Get the current MCP resource."""
108-
if not self._initialized or self._resource is None:
109-
raise RuntimeError("MCP session manager not initialized")
110-
return self._resource
136+
return self._resource
137+
138+
async def reconnect(self) -> bool:
139+
"""Attempt to reconnect to MCP servers."""
140+
reconnect_correlation_id = generate_correlation_id()
141+
logger.info("reconnecting_to_mcp_servers", correlation_id=reconnect_correlation_id)
142+
try:
143+
await self.cleanup()
144+
await self.initialize()
145+
logger.info("mcp_reconnection_successful", correlation_id=reconnect_correlation_id)
146+
return True
147+
except Exception as e:
148+
logger.error("mcp_reconnection_failed", correlation_id=reconnect_correlation_id, error=str(e))
149+
return False
150+
151+
async def cleanup(self):
152+
"""Close all MCP sessions."""
153+
if self._exit_stack is not None:
154+
cleanup_correlation_id = generate_correlation_id()
155+
logger.info("mcp_cleanup_started", correlation_id=cleanup_correlation_id)
156+
try:
157+
await self._exit_stack.__aexit__(None, None, None)
158+
logger.info("mcp_sessions_closed_successfully", correlation_id=cleanup_correlation_id)
159+
except Exception as e:
160+
logger.error("mcp_sessions_cleanup_failed", correlation_id=cleanup_correlation_id, error=str(e))
161+
finally:
162+
self._exit_stack = None
163+
self._resource = None
164+
self._initialized = False
165+
logger.info("mcp_cleanup_completed", correlation_id=cleanup_correlation_id)
166+
167+
def get_resource(self) -> Resource:
168+
"""Get the current MCP resource."""
169+
if not self._initialized or self._resource is None:
170+
raise RuntimeError("MCP session manager not initialized")
171+
return self._resource
111172

112173

113174
# Module-level singleton
114175
_mcp_session_manager: Optional[MCPSessionManager] = None
115176

116177

117178
def get_mcp_session_manager() -> MCPSessionManager:
118-
"""Get the global MCP session manager."""
119-
global _mcp_session_manager
120-
if _mcp_session_manager is None:
121-
_mcp_session_manager = MCPSessionManager()
122-
return _mcp_session_manager
179+
"""Get the global MCP session manager."""
180+
global _mcp_session_manager
181+
if _mcp_session_manager is None:
182+
_mcp_session_manager = MCPSessionManager()
183+
return _mcp_session_manager

0 commit comments

Comments
 (0)