Skip to content

Commit 659294c

Browse files
nits
1 parent 5dc3830 commit 659294c

File tree

2 files changed

+74
-20
lines changed

2 files changed

+74
-20
lines changed

backend/app/api/routes/mcp.py

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,15 @@
33
from fastapi import APIRouter, HTTPException, Depends
44
from sqlmodel import Session, select
55
from app.api.deps import get_current_active_user, get_current_active_superuser, get_db
6-
from app.models import User, MCPServer, MCPServerCreate, MCPServerUpdate, MCPServerPublic
6+
from app.models import User
7+
from app.models.mcp_server import (
8+
MCPServer,
9+
MCPServerStatus,
10+
MCPServerBase,
11+
MCPServerCreate,
12+
MCPServerUpdate,
13+
MCPServerPublic
14+
)
715
from app.services.mcp_manager import mcp_manager
816
import logging
917

@@ -19,7 +27,7 @@ async def list_mcp_servers(
1927
current_user: User = Depends(get_current_active_user),
2028
) -> List[MCPServerPublic]:
2129
"""List all MCP servers."""
22-
servers = session.exec(select(MCPServer)).all()
30+
servers = session.execute(select(MCPServer)).scalars().all()
2331

2432
# Update runtime data from manager
2533
result = []
@@ -41,12 +49,12 @@ async def list_mcp_servers(
4149
async def create_mcp_server(
4250
*,
4351
session: Session = Depends(get_db),
44-
current_user: User = Depends(get_current_active_superuser),
52+
current_user: User = Depends(get_current_active_user),
4553
server_in: MCPServerCreate,
4654
) -> MCPServerPublic:
4755
"""Create a new MCP server configuration."""
4856
# Check if server with same name exists
49-
existing = session.exec(select(MCPServer).where(MCPServer.name == server_in.name)).first()
57+
existing = session.execute(select(MCPServer).where(MCPServer.name == server_in.name)).scalar_one_or_none()
5058
if existing:
5159
raise HTTPException(status_code=400, detail="Server with this name already exists")
5260

@@ -94,7 +102,7 @@ async def get_mcp_server(
94102
async def update_mcp_server(
95103
*,
96104
session: Session = Depends(get_db),
97-
current_user: User = Depends(get_current_active_superuser),
105+
current_user: User = Depends(get_current_active_user),
98106
server_id: str,
99107
server_in: MCPServerUpdate,
100108
) -> MCPServerPublic:
@@ -130,7 +138,7 @@ async def update_mcp_server(
130138
async def delete_mcp_server(
131139
*,
132140
session: Session = Depends(get_db),
133-
current_user: User = Depends(get_current_active_superuser),
141+
current_user: User = Depends(get_current_active_user),
134142
server_id: str,
135143
) -> dict:
136144
"""Delete MCP server."""

backend/app/services/mcp_manager.py

Lines changed: 60 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,9 @@ async def connect(self) -> bool:
6161
else:
6262
raise ValueError(f"Unsupported transport: {self.server.transport}")
6363
except Exception as e:
64+
import traceback
6465
logger.error(f"Failed to connect to MCP server {self.server.name}: {e}")
66+
logger.error(f"Traceback: {traceback.format_exc()}")
6567
self.status = MCPServerStatus.ERROR
6668
self.error_message = str(e)
6769
return False
@@ -80,13 +82,15 @@ async def _connect_stdio(self) -> bool:
8082

8183
# Start the process
8284
try:
85+
logger.info(f"Starting MCP server process: {' '.join(args)}")
8386
self.process = await asyncio.create_subprocess_exec(
8487
*args,
8588
stdin=asyncio.subprocess.PIPE,
8689
stdout=asyncio.subprocess.PIPE,
8790
stderr=asyncio.subprocess.PIPE,
8891
env={**os.environ} # Pass current environment variables
8992
)
93+
logger.info(f"MCP server process started with PID: {self.process.pid}")
9094
except FileNotFoundError:
9195
raise RuntimeError(f"Command not found: {command}")
9296
except Exception as e:
@@ -101,18 +105,15 @@ async def _connect_stdio(self) -> bool:
101105
stderr = await self.process.stderr.read()
102106
raise RuntimeError(f"Process exited immediately with code {self.process.returncode}: {stderr.decode()}")
103107

108+
logger.info(f"MCP server process is running, starting message reader task")
109+
104110
# Start reading messages
105111
self._read_task = asyncio.create_task(self._read_messages())
106112

107113
# Send initialize request
108114
response = await self._send_request("initialize", {
109-
"protocolVersion": "0.1.0",
110-
"capabilities": {
111-
"roots": True,
112-
"tools": True,
113-
"prompts": True,
114-
"resources": True
115-
},
115+
"protocolVersion": "2025-03-26",
116+
"capabilities": {},
116117
"clientInfo": {
117118
"name": "copilot-mcp-host",
118119
"version": "0.1.0"
@@ -121,6 +122,10 @@ async def _connect_stdio(self) -> bool:
121122

122123
if response and "capabilities" in response:
123124
self.server.capabilities = response["capabilities"]
125+
126+
# Send initialized notification to complete handshake
127+
await self._send_notification("notifications/initialized", {})
128+
124129
self.status = MCPServerStatus.CONNECTED
125130

126131
# Fetch available tools, resources, and prompts
@@ -165,13 +170,8 @@ async def _connect_http_sse(self) -> bool:
165170

166171
# Send initialize request
167172
response = await self._send_http_request("initialize", {
168-
"protocolVersion": "0.1.0",
169-
"capabilities": {
170-
"roots": True,
171-
"tools": True,
172-
"prompts": True,
173-
"resources": True
174-
},
173+
"protocolVersion": "2025-03-26",
174+
"capabilities": {},
175175
"clientInfo": {
176176
"name": "copilot-mcp-host",
177177
"version": "0.1.0"
@@ -180,6 +180,10 @@ async def _connect_http_sse(self) -> bool:
180180

181181
if response and "capabilities" in response:
182182
self.server.capabilities = response["capabilities"]
183+
184+
# Send initialized notification to complete handshake
185+
await self._send_notification("notifications/initialized", {})
186+
183187
self.status = MCPServerStatus.CONNECTED
184188

185189
# Fetch available tools, resources, and prompts
@@ -215,6 +219,13 @@ async def _send_request(self, method: str, params: Dict[str, Any]) -> Optional[D
215219
elif self.server.transport == MCPTransportType.HTTP_SSE:
216220
return await self._send_http_request(method, params)
217221

222+
async def _send_notification(self, method: str, params: Dict[str, Any]) -> None:
223+
"""Send a JSON-RPC notification to the server."""
224+
if self.server.transport == MCPTransportType.STDIO:
225+
await self._send_stdio_notification(method, params)
226+
elif self.server.transport == MCPTransportType.HTTP_SSE:
227+
await self._send_http_notification(method, params)
228+
218229
async def _send_stdio_request(self, method: str, params: Dict[str, Any]) -> Optional[Dict[str, Any]]:
219230
"""Send a request via stdio."""
220231
if not self.writer:
@@ -234,17 +245,43 @@ async def _send_stdio_request(self, method: str, params: Dict[str, Any]) -> Opti
234245

235246
# Send the message
236247
message_str = json.dumps(message) + "\n"
248+
logger.info(f"Sending message to MCP server: {message_str.strip()}")
237249
self.writer.write(message_str.encode())
238250
await self.writer.drain()
251+
logger.info(f"Message sent and drained, waiting for response with ID {self._message_id}")
239252

240253
# Wait for response
241254
try:
242255
response = await asyncio.wait_for(future, timeout=30.0)
243256
return response
244257
except asyncio.TimeoutError:
258+
# Check if process is still alive
259+
if self.process and self.process.returncode is not None:
260+
stderr = await self.process.stderr.read()
261+
logger.error(f"MCP server process died during request: exit code {self.process.returncode}, stderr: {stderr.decode()}")
262+
else:
263+
logger.error(f"Timeout waiting for response from MCP server, process still alive")
245264
self._pending_requests.pop(self._message_id, None)
246265
raise
247266

267+
async def _send_stdio_notification(self, method: str, params: Dict[str, Any]) -> None:
268+
"""Send a notification via stdio (no response expected)."""
269+
if not self.writer:
270+
raise RuntimeError("Not connected")
271+
272+
message = {
273+
"jsonrpc": "2.0",
274+
"method": method,
275+
"params": params
276+
}
277+
278+
# Send the notification
279+
message_str = json.dumps(message) + "\n"
280+
logger.info(f"Sending notification to MCP server: {message_str.strip()}")
281+
self.writer.write(message_str.encode())
282+
await self.writer.drain()
283+
logger.info(f"Notification sent and drained")
284+
248285
async def _send_http_request(self, method: str, params: Dict[str, Any]) -> Optional[Dict[str, Any]]:
249286
"""Send a request via HTTP."""
250287
if not self.session:
@@ -275,22 +312,31 @@ async def _send_http_request(self, method: str, params: Dict[str, Any]) -> Optio
275312
raise RuntimeError("Request timed out")
276313
except aiohttp.ClientError as e:
277314
raise RuntimeError(f"Connection error: {str(e)}")
315+
316+
async def _send_http_notification(self, method: str, params: Dict[str, Any]) -> None:
317+
"""Send a notification via HTTP (not implemented yet)."""
318+
# TODO: Implement HTTP notification if needed
319+
pass
278320

279321
async def _read_messages(self):
280322
"""Read messages from stdio."""
323+
logger.info(f"Starting message reader for MCP server {self.server.name}")
281324
buffer = ""
282325
while self.reader and not self.reader.at_eof():
283326
try:
284327
data = await self.reader.read(1024)
285328
if not data:
329+
logger.info(f"No more data from MCP server {self.server.name}, ending reader")
286330
break
287331

332+
logger.info(f"Read {len(data)} bytes from MCP server: {data[:100]}...")
288333
buffer += data.decode()
289334

290335
# Process complete messages
291336
while "\n" in buffer:
292337
line, buffer = buffer.split("\n", 1)
293338
if line.strip():
339+
logger.debug(f"Received message from MCP server: {line.strip()}")
294340
try:
295341
message = json.loads(line)
296342
await self._handle_message(message)

0 commit comments

Comments
 (0)