Skip to content

Commit faf8262

Browse files
calmininiechen
andauthored
add event tracking and query API for monitoring MCP operations (#67)
Co-authored-by: cnie <[email protected]>
1 parent 617eabe commit faf8262

File tree

7 files changed

+446
-36
lines changed

7 files changed

+446
-36
lines changed

src/mcpm/monitor/base.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,37 @@
77
from enum import Enum, auto
88
from typing import Any, Dict, Optional, Union
99

10+
from pydantic import BaseModel, Field
11+
12+
13+
class Pagination(BaseModel):
14+
total: int = Field(description="Total number of events")
15+
page: int = Field(description="Page number")
16+
limit: int = Field(description="Number of events per page")
17+
total_pages: int = Field(description="Total number of pages")
18+
19+
20+
class MCPEvent(BaseModel):
21+
id: int = Field(description="Event ID")
22+
event_type: str = Field(description="Event type")
23+
server_id: str = Field(description="Server ID")
24+
resource_id: str = Field(description="Resource ID")
25+
client_id: Optional[str] = Field(description="Client ID")
26+
timestamp: str = Field(description="Event timestamp")
27+
duration_ms: Optional[int] = Field(description="Event duration in milliseconds")
28+
request_size: Optional[int] = Field(description="Request size in bytes")
29+
response_size: Optional[int] = Field(description="Response size in bytes")
30+
success: bool = Field(description="Event success status")
31+
error_message: Optional[str] = Field(description="Error message")
32+
metadata: Optional[Dict[str, Any]] = Field(description="Event metadata")
33+
raw_request: Optional[Union[str, Dict]] = Field(description="Raw request data")
34+
raw_response: Optional[Union[str, Dict]] = Field(description="Raw response data")
35+
36+
37+
class QueryEventResponse(BaseModel):
38+
pagination: Pagination = Field(description="Pagination information")
39+
events: list[MCPEvent] = Field(description="List of events")
40+
1041

1142
class AccessEventType(Enum):
1243
"""Type of MCP access event"""
@@ -75,3 +106,21 @@ async def close(self) -> None:
75106
Close any open connections to the storage backend
76107
"""
77108
pass
109+
110+
@abstractmethod
111+
async def query_events(
112+
self, offset: str, page: int, limit: int, event_type: Optional[str] = None
113+
) -> QueryEventResponse:
114+
"""
115+
Query events from the storage backend
116+
117+
Args:
118+
offset: Time offset for the query
119+
page: Page number
120+
limit: Number of events per page
121+
event_type: Type of events to query (optional)
122+
123+
Returns:
124+
QueryEventResponse: List of events matching the query
125+
"""
126+
pass

src/mcpm/monitor/duckdb.py

Lines changed: 128 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
import duckdb
1212

13-
from mcpm.monitor.base import AccessEventType, AccessMonitor
13+
from mcpm.monitor.base import AccessEventType, AccessMonitor, MCPEvent, Pagination, QueryEventResponse
1414
from mcpm.utils.config import ConfigManager
1515

1616

@@ -234,6 +234,133 @@ def _track_event_impl(
234234
print(f"Error tracking event: {e}")
235235
return False
236236

237+
async def query_events(
238+
self, offset: str, page: int, limit: int, event_type: Optional[str] = None
239+
) -> QueryEventResponse:
240+
"""
241+
Query events from the database with pagination.
242+
243+
Args:
244+
offset: Time offset pattern like "3h" for past 3 hours, "1d" for past day, etc.
245+
page: Page number (1-based)
246+
limit: Number of events per page
247+
event_type: Type of events to filter by
248+
249+
Returns:
250+
Dict containing events, total count, page, and limit
251+
"""
252+
if not self._initialized:
253+
if not await self.initialize_storage():
254+
return QueryEventResponse(pagination=Pagination(total=0, page=0, limit=0, total_pages=0), events=[])
255+
256+
async with self._lock:
257+
response = await asyncio.to_thread(
258+
self._query_events_impl,
259+
offset,
260+
page,
261+
limit,
262+
event_type,
263+
)
264+
return response
265+
266+
def _query_events_impl(
267+
self,
268+
offset: str,
269+
page: int,
270+
limit: int,
271+
event_type: Optional[str],
272+
) -> QueryEventResponse:
273+
"""
274+
Query events from the storage backend
275+
276+
Args:
277+
offset: Time offset for the query
278+
page: Page number
279+
limit: Number of events per page
280+
event_type: Type of events to query (optional)
281+
282+
Returns:
283+
QueryEventResponse: List of events matching the query
284+
"""
285+
try:
286+
# Build the base query and conditions
287+
conditions = []
288+
parameters = []
289+
290+
# handle time offset
291+
time_value = 0
292+
time_unit = ""
293+
294+
# Parse offset pattern like "3h", "1d", etc.
295+
for i, char in enumerate(offset):
296+
if char.isdigit():
297+
time_value = time_value * 10 + int(char)
298+
else:
299+
time_unit = offset[i:]
300+
break
301+
302+
if time_unit and time_value > 0:
303+
# Convert to SQL interval format
304+
interval_map = {"h": "HOUR", "d": "DAY", "w": "WEEK", "m": "MONTH"}
305+
306+
if time_unit.lower() in interval_map:
307+
conditions.append(
308+
f"timestamp >= TIMESTAMP '{datetime.now()}' - INTERVAL {time_value} {interval_map.get(time_unit.lower())}"
309+
)
310+
else:
311+
return QueryEventResponse(pagination=Pagination(total=0, page=0, limit=0, total_pages=0), events=[])
312+
313+
if event_type:
314+
conditions.append("event_type = ?")
315+
parameters.append(event_type)
316+
317+
# Build the final query
318+
where_clause = " AND ".join(conditions)
319+
if where_clause:
320+
where_clause = f"WHERE {where_clause}"
321+
322+
sql_offset = (page - 1) * limit
323+
# Get total count
324+
count_query = f"SELECT COUNT(*) FROM monitor_events {where_clause}"
325+
total_result = self.connection.execute(count_query, parameters).fetchone()
326+
total = total_result[0] if total_result else 0
327+
328+
# Get paginated results
329+
query = f"""
330+
SELECT * FROM monitor_events
331+
{where_clause}
332+
ORDER BY timestamp DESC
333+
LIMIT ? OFFSET ?
334+
"""
335+
cursor = self.connection.execute(query, parameters + [limit, sql_offset])
336+
337+
# Convert result to dictionary
338+
column_names = [desc[0] for desc in cursor.description]
339+
events = []
340+
341+
for row in cursor.fetchall():
342+
event_dict = dict(zip(column_names, row))
343+
344+
for field in ["metadata", "raw_request", "raw_response"]:
345+
if event_dict[field] and isinstance(event_dict[field], str):
346+
try:
347+
event_dict[field] = json.loads(event_dict[field])
348+
except Exception:
349+
pass
350+
351+
event_dict["timestamp"] = datetime.strftime(event_dict["timestamp"], "%Y-%m-%d %H:%M:%S")
352+
events.append(MCPEvent.model_validate(event_dict))
353+
354+
return QueryEventResponse(
355+
pagination=Pagination(
356+
total=total, page=page, limit=limit, total_pages=1 if limit == 0 else (total + limit - 1) // limit
357+
),
358+
events=events,
359+
)
360+
except Exception as e:
361+
print(f"Error querying events: {e}")
362+
return QueryEventResponse(pagination=Pagination(total=0, page=0, limit=0, total_pages=0), events=[])
363+
237364
async def close(self) -> None:
238365
"""Close the database connection asynchronously."""
239366
async with self._lock:

src/mcpm/monitor/event.py

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
import datetime
2+
import time
3+
import typing
4+
from functools import wraps
5+
6+
from mcp.types import (
7+
CallToolRequest,
8+
CallToolResult,
9+
EmptyResult,
10+
GetPromptRequest,
11+
ReadResourceRequest,
12+
Request,
13+
ServerResult,
14+
TextContent,
15+
)
16+
17+
from mcpm.utils.config import PROMPT_SPLITOR, RESOURCE_SPLITOR, TOOL_SPLITOR
18+
19+
from .base import AccessEventType
20+
from .duckdb import DuckDBAccessMonitor
21+
22+
monitor = DuckDBAccessMonitor()
23+
24+
RequestT = typing.TypeVar("RequestT", bound=Request)
25+
MCPRequestHandler = typing.Callable[[RequestT], typing.Awaitable[ServerResult]]
26+
27+
28+
class TraceIdentifier(typing.TypedDict):
29+
client_id: str
30+
server_id: str
31+
resource_id: str
32+
33+
34+
class ResponseStatus(typing.TypedDict):
35+
success: bool
36+
error_message: str
37+
38+
39+
def get_trace_identifier(req: Request) -> TraceIdentifier:
40+
resource_id = ""
41+
if isinstance(req, CallToolRequest):
42+
server_id = req.params.name.split(TOOL_SPLITOR, 1)[0]
43+
elif isinstance(req, GetPromptRequest):
44+
server_id = req.params.name.split(PROMPT_SPLITOR, 1)[0]
45+
elif isinstance(req, ReadResourceRequest):
46+
# resource uri is formatted as {server_id}:{protocol}://{resource_path}
47+
server_id, resource_id = str(req.params.uri).split(RESOURCE_SPLITOR, 1)
48+
else:
49+
# currently only support call tool, get prompt and read resource
50+
server_id = ""
51+
resource_id = ""
52+
53+
return TraceIdentifier(client_id=req.params.meta.client_id, server_id=server_id, resource_id=resource_id) # type: ignore
54+
55+
56+
def get_response_status(server_result: ServerResult) -> ResponseStatus:
57+
result_root = server_result.root
58+
59+
if isinstance(result_root, EmptyResult):
60+
return ResponseStatus(success=False, error_message="empty result")
61+
62+
if isinstance(result_root, CallToolResult):
63+
if result_root.isError:
64+
return ResponseStatus(
65+
success=False,
66+
error_message=typing.cast(TextContent, result_root.content[0]).text,
67+
)
68+
else:
69+
return ResponseStatus(
70+
success=True,
71+
error_message="",
72+
)
73+
74+
return ResponseStatus(success=True, error_message="")
75+
76+
77+
def trace_event(event_type: AccessEventType):
78+
def decorator(func: MCPRequestHandler):
79+
@wraps(func)
80+
async def wrapper(request: Request):
81+
request_time = datetime.datetime.now().replace(microsecond=0)
82+
start_time = time.perf_counter()
83+
# parse client id, server id and resource id (optional) from request
84+
trace_identifier = get_trace_identifier(request)
85+
86+
response: ServerResult = await func(request)
87+
88+
# empty results and call tool failures are treated as not success
89+
response_status: ResponseStatus = get_response_status(response)
90+
91+
await monitor.track_event(
92+
event_type=event_type,
93+
server_id=trace_identifier["server_id"],
94+
client_id=trace_identifier["client_id"],
95+
resource_id=trace_identifier["resource_id"],
96+
timestamp=request_time,
97+
duration_ms=int((time.perf_counter() - start_time) * 1000),
98+
request_size=len(request.params.model_dump_json().encode("utf-8")),
99+
response_size=len(response.root.model_dump_json().encode("utf-8")),
100+
success=response_status["success"],
101+
error_message=response_status["error_message"],
102+
metadata=None,
103+
raw_request=request.model_dump_json(),
104+
raw_response=response.root.model_dump_json(),
105+
)
106+
return response
107+
108+
return wrapper
109+
110+
return decorator

0 commit comments

Comments
 (0)