Skip to content

Commit 1dfc118

Browse files
committed
Change monitor functions to async
1 parent 4282020 commit 1dfc118

File tree

4 files changed

+208
-138
lines changed

4 files changed

+208
-138
lines changed

src/mcpm/monitor/__init__.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,19 +11,19 @@
1111
from .duckdb import DuckDBAccessMonitor
1212

1313

14-
# Convenience function to get a monitor instance
15-
def get_monitor(db_path: Optional[str] = None) -> AccessMonitor:
14+
# Convenience function
15+
async def get_monitor(db_path: Optional[str] = None) -> AccessMonitor:
1616
"""
1717
Get a configured access monitor instance
1818
1919
Args:
20-
db_path: Optional custom path to the database file
20+
db_path: Optional custom path to the database file. Defaults to ~/.mcpm/monitor.duckdb
2121
2222
Returns:
2323
Configured AccessMonitor instance
2424
"""
2525
monitor = DuckDBAccessMonitor(db_path) if db_path else DuckDBAccessMonitor()
26-
monitor.initialize_storage()
26+
await monitor.initialize_storage()
2727
return monitor
2828

2929

src/mcpm/monitor/base.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ class AccessMonitor(ABC):
2020
"""Abstract interface for monitoring MCP access events"""
2121

2222
@abstractmethod
23-
def track_event(
23+
async def track_event(
2424
self,
2525
event_type: AccessEventType,
2626
server_id: str,
@@ -35,7 +35,7 @@ def track_event(
3535
metadata: Optional[Dict[str, Any]] = None,
3636
raw_request: Optional[Union[Dict[str, Any], str]] = None,
3737
raw_response: Optional[Union[Dict[str, Any], str]] = None,
38-
) -> None:
38+
) -> bool:
3939
"""
4040
Track an MCP access event
4141
@@ -53,11 +53,14 @@ def track_event(
5353
metadata: Additional metadata about the event
5454
raw_request: Raw request data as JSON object or string
5555
raw_response: Raw response data as JSON object or string
56+
57+
Returns:
58+
bool: True if event was successfully tracked, False otherwise
5659
"""
5760
pass
5861

5962
@abstractmethod
60-
def initialize_storage(self) -> bool:
63+
async def initialize_storage(self) -> bool:
6164
"""
6265
Initialize the storage backend for tracking events
6366
@@ -67,7 +70,7 @@ def initialize_storage(self) -> bool:
6770
pass
6871

6972
@abstractmethod
70-
def close(self) -> None:
73+
async def close(self) -> None:
7174
"""
7275
Close any open connections to the storage backend
7376
"""

src/mcpm/monitor/duckdb.py

Lines changed: 150 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
"""
2-
DuckDB implementation of the access monitor
2+
Implementation of the access monitor using DuckDB
33
"""
44

5+
import asyncio
56
import json
67
import os
78
from datetime import datetime
@@ -13,83 +14,85 @@
1314

1415

1516
class DuckDBAccessMonitor(AccessMonitor):
16-
"""DuckDB implementation of MCP access monitoring"""
17+
"""
18+
Implementation of the access monitor using DuckDB.
19+
This uses a thread pool to execute DuckDB operations asynchronously.
20+
"""
1721

18-
def __init__(self, db_path: str = "~/.config/mcpm/monitor.duckdb"):
22+
def __init__(self, db_path: str = "~/.mcpm/monitor.duckdb"):
1923
"""
20-
Initialize the DuckDB access monitor
24+
Initialize the DuckDBAccessMonitor.
2125
2226
Args:
2327
db_path: Path to the DuckDB database file
2428
"""
2529
self.db_path = os.path.expanduser(db_path)
26-
self.db_dir = os.path.dirname(self.db_path)
2730
self.connection = None
31+
self._initialized = False
32+
self._lock = asyncio.Lock()
2833

29-
def initialize_storage(self) -> bool:
30-
"""Initialize the DuckDB database and tables"""
34+
async def initialize_storage(self) -> bool:
35+
"""
36+
Initialize the storage for the access monitor asynchronously.
37+
38+
Returns:
39+
bool: True if successful, False otherwise
40+
"""
41+
async with self._lock:
42+
if self._initialized:
43+
return True
44+
45+
try:
46+
# Run the initialization in a thread
47+
return await asyncio.to_thread(self._initialize_storage_impl)
48+
except Exception as e:
49+
print(f"Error initializing storage asynchronously: {e}")
50+
return False
51+
52+
def _initialize_storage_impl(self) -> bool:
53+
"""Internal implementation of storage initialization."""
3154
try:
32-
# Create directory if it doesn't exist
33-
os.makedirs(self.db_dir, exist_ok=True)
55+
# Create the directory if it doesn't exist
56+
os.makedirs(os.path.dirname(self.db_path), exist_ok=True)
3457

35-
# Connect to database
58+
# Connect to the database
3659
self.connection = duckdb.connect(self.db_path)
3760

38-
# Create a sequence for auto-incrementing IDs
39-
self.connection.execute("""
40-
CREATE SEQUENCE IF NOT EXISTS monitor_events_id_seq START 1;
41-
""")
42-
43-
# Create monitor_events table if it doesn't exist
61+
# Create the events table if it doesn't exist using identity column for auto-incrementing
4462
self.connection.execute("""
63+
CREATE SEQUENCE IF NOT EXISTS event_id_seq;
64+
4565
CREATE TABLE IF NOT EXISTS monitor_events (
46-
id INTEGER DEFAULT nextval('monitor_events_id_seq') PRIMARY KEY,
47-
event_type VARCHAR NOT NULL,
48-
server_id VARCHAR NOT NULL,
49-
resource_id VARCHAR NOT NULL,
66+
id INTEGER DEFAULT nextval('event_id_seq') PRIMARY KEY,
67+
event_type VARCHAR,
68+
server_id VARCHAR,
69+
resource_id VARCHAR,
5070
client_id VARCHAR,
51-
timestamp TIMESTAMP NOT NULL,
71+
timestamp TIMESTAMP,
5272
duration_ms INTEGER,
5373
request_size INTEGER,
5474
response_size INTEGER,
55-
success BOOLEAN NOT NULL,
75+
success BOOLEAN,
5676
error_message VARCHAR,
5777
metadata JSON,
5878
raw_request JSON,
5979
raw_response JSON
6080
)
6181
""")
6282

63-
# Create index on timestamp for efficient time-based queries
64-
self.connection.execute("""
65-
CREATE INDEX IF NOT EXISTS idx_monitor_events_timestamp
66-
ON monitor_events (timestamp)
67-
""")
68-
69-
# Create index on server_id for filtering by server
70-
self.connection.execute("""
71-
CREATE INDEX IF NOT EXISTS idx_monitor_events_server
72-
ON monitor_events (server_id)
73-
""")
74-
75-
# Create index on event_type for filtering by event type
76-
self.connection.execute("""
77-
CREATE INDEX IF NOT EXISTS idx_monitor_events_type
78-
ON monitor_events (event_type)
79-
""")
80-
81-
# For backward compatibility, create a view that maps to the old table name
83+
# Create a backward compatibility view
8284
self.connection.execute("""
8385
CREATE VIEW IF NOT EXISTS access_events AS
8486
SELECT * FROM monitor_events
8587
""")
8688

89+
self._initialized = True
8790
return True
8891
except Exception as e:
89-
print(f"Error initializing DuckDB storage: {e}")
92+
print(f"Error initializing storage: {e}")
9093
return False
9194

92-
def track_event(
95+
async def track_event(
9396
self,
9497
event_type: AccessEventType,
9598
server_id: str,
@@ -102,61 +105,108 @@ def track_event(
102105
success: bool = True,
103106
error_message: Optional[str] = None,
104107
metadata: Optional[Dict[str, Any]] = None,
105-
raw_request: Optional[Union[Dict[str, Any], str]] = None,
106-
raw_response: Optional[Union[Dict[str, Any], str]] = None,
107-
) -> None:
108-
"""Track an MCP access event"""
109-
# Initialize connection if needed
110-
if self.connection is None:
111-
self.initialize_storage()
112-
113-
# Use current time if no timestamp provided
114-
if timestamp is None:
115-
timestamp = datetime.now()
116-
117-
# Convert metadata to JSON string if provided
118-
metadata_json = json.dumps(metadata) if metadata else None
119-
120-
# Convert raw request and response to JSON strings
121-
# If they're already dictionaries, convert them to JSON strings
122-
# If they're strings, try to parse as JSON first, if that fails, store as JSON-encoded strings
123-
request_json = None
124-
if raw_request is not None:
125-
if isinstance(raw_request, dict):
126-
request_json = json.dumps(raw_request)
127-
else:
128-
try:
129-
# Try to parse as JSON first
130-
json.loads(raw_request)
131-
request_json = raw_request # It's already a valid JSON string
132-
except json.JSONDecodeError:
133-
# Not valid JSON, encode as a JSON string
108+
raw_request: Optional[Union[str, Dict]] = None,
109+
raw_response: Optional[Union[str, Dict]] = None,
110+
) -> bool:
111+
"""
112+
Track an access event asynchronously.
113+
114+
Args:
115+
event_type: Type of the event
116+
server_id: Identifier for the server handling the request
117+
resource_id: Identifier for the accessed resource
118+
client_id: Identifier for the client making the request
119+
timestamp: When the event occurred
120+
duration_ms: Duration of the event in milliseconds
121+
request_size: Size of the request in bytes
122+
response_size: Size of the response in bytes
123+
success: Whether the access was successful
124+
error_message: Error message if the access failed
125+
metadata: Additional metadata for the event
126+
raw_request: Raw request data (string or dict)
127+
raw_response: Raw response data (string or dict)
128+
129+
Returns:
130+
bool: True if event was successfully tracked, False otherwise
131+
"""
132+
if not self._initialized:
133+
if not await self.initialize_storage():
134+
return False
135+
136+
async with self._lock:
137+
try:
138+
# Use current time if timestamp is not provided
139+
if timestamp is None:
140+
timestamp = datetime.now()
141+
142+
# Run the tracking operation in a thread
143+
return await asyncio.to_thread(
144+
self._track_event_impl,
145+
event_type,
146+
server_id,
147+
resource_id,
148+
client_id,
149+
timestamp,
150+
duration_ms,
151+
request_size,
152+
response_size,
153+
success,
154+
error_message,
155+
metadata,
156+
raw_request,
157+
raw_response,
158+
)
159+
except Exception as e:
160+
print(f"Error tracking event asynchronously: {e}")
161+
return False
162+
163+
def _track_event_impl(
164+
self,
165+
event_type: AccessEventType,
166+
server_id: str,
167+
resource_id: str,
168+
client_id: Optional[str],
169+
timestamp: datetime,
170+
duration_ms: Optional[int],
171+
request_size: Optional[int],
172+
response_size: Optional[int],
173+
success: bool,
174+
error_message: Optional[str],
175+
metadata: Optional[Dict[str, Any]],
176+
raw_request: Optional[Union[str, Dict]],
177+
raw_response: Optional[Union[str, Dict]],
178+
) -> bool:
179+
"""Internal implementation of track_event."""
180+
try:
181+
# Convert metadata to JSON if provided
182+
metadata_json = json.dumps(metadata) if metadata else None
183+
184+
# Process raw request data
185+
request_json = None
186+
if raw_request is not None:
187+
if isinstance(raw_request, dict):
134188
request_json = json.dumps(raw_request)
189+
else:
190+
request_json = raw_request
135191

136-
response_json = None
137-
if raw_response is not None:
138-
if isinstance(raw_response, dict):
139-
response_json = json.dumps(raw_response)
140-
else:
141-
try:
142-
# Try to parse as JSON first
143-
json.loads(raw_response)
144-
response_json = raw_response # It's already a valid JSON string
145-
except json.JSONDecodeError:
146-
# Not valid JSON, encode as a JSON string
192+
# Process raw response data
193+
response_json = None
194+
if raw_response is not None:
195+
if isinstance(raw_response, dict):
147196
response_json = json.dumps(raw_response)
197+
else:
198+
response_json = raw_response
148199

149-
# Insert event into database
150-
try:
200+
# Insert the event into the database
151201
self.connection.execute(
152202
"""
153203
INSERT INTO monitor_events (
154204
event_type, server_id, resource_id, client_id, timestamp,
155-
duration_ms, request_size, response_size,
156-
success, error_message, metadata, raw_request, raw_response
205+
duration_ms, request_size, response_size, success, error_message,
206+
metadata, raw_request, raw_response
157207
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
158208
""",
159-
[
209+
(
160210
event_type.name,
161211
server_id,
162212
resource_id,
@@ -170,17 +220,22 @@ def track_event(
170220
metadata_json,
171221
request_json,
172222
response_json,
173-
],
223+
),
174224
)
225+
226+
return True
175227
except Exception as e:
176228
print(f"Error tracking event: {e}")
229+
return False
177230

178-
def close(self) -> None:
179-
"""Close the database connection"""
231+
async def close(self) -> None:
232+
"""Close the database connection asynchronously."""
233+
async with self._lock:
234+
if self.connection:
235+
await asyncio.to_thread(self._close_impl)
236+
237+
def _close_impl(self) -> None:
238+
"""Internal implementation of close."""
180239
if self.connection:
181240
self.connection.close()
182241
self.connection = None
183-
184-
def __del__(self):
185-
"""Ensure connection is closed when object is deleted"""
186-
self.close()

0 commit comments

Comments
 (0)