|
10 | 10 |
|
11 | 11 | import duckdb |
12 | 12 |
|
13 | | -from mcpm.monitor.base import AccessEventType, AccessMonitor |
| 13 | +from mcpm.monitor.base import AccessEventType, AccessMonitor, MCPEvent, Pagination, QueryEventResponse |
14 | 14 | from mcpm.utils.config import ConfigManager |
15 | 15 |
|
16 | 16 |
|
@@ -234,6 +234,133 @@ def _track_event_impl( |
234 | 234 | print(f"Error tracking event: {e}") |
235 | 235 | return False |
236 | 236 |
|
| 237 | + async def query_events( |
| 238 | + self, offset: str, page: int, limit: int, event_type: Optional[str] = None |
| 239 | + ) -> QueryEventResponse: |
| 240 | + """ |
| 241 | + Query events from the database with pagination. |
| 242 | +
|
| 243 | + Args: |
| 244 | + offset: Time offset pattern like "3h" for past 3 hours, "1d" for past day, etc. |
| 245 | + page: Page number (1-based) |
| 246 | + limit: Number of events per page |
| 247 | + event_type: Type of events to filter by |
| 248 | +
|
| 249 | + Returns: |
| 250 | + Dict containing events, total count, page, and limit |
| 251 | + """ |
| 252 | + if not self._initialized: |
| 253 | + if not await self.initialize_storage(): |
| 254 | + return QueryEventResponse(pagination=Pagination(total=0, page=0, limit=0, total_pages=0), events=[]) |
| 255 | + |
| 256 | + async with self._lock: |
| 257 | + response = await asyncio.to_thread( |
| 258 | + self._query_events_impl, |
| 259 | + offset, |
| 260 | + page, |
| 261 | + limit, |
| 262 | + event_type, |
| 263 | + ) |
| 264 | + return response |
| 265 | + |
| 266 | + def _query_events_impl( |
| 267 | + self, |
| 268 | + offset: str, |
| 269 | + page: int, |
| 270 | + limit: int, |
| 271 | + event_type: Optional[str], |
| 272 | + ) -> QueryEventResponse: |
| 273 | + """ |
| 274 | + Query events from the storage backend |
| 275 | +
|
| 276 | + Args: |
| 277 | + offset: Time offset for the query |
| 278 | + page: Page number |
| 279 | + limit: Number of events per page |
| 280 | + event_type: Type of events to query (optional) |
| 281 | +
|
| 282 | + Returns: |
| 283 | + QueryEventResponse: List of events matching the query |
| 284 | + """ |
| 285 | + try: |
| 286 | + # Build the base query and conditions |
| 287 | + conditions = [] |
| 288 | + parameters = [] |
| 289 | + |
| 290 | + # handle time offset |
| 291 | + time_value = 0 |
| 292 | + time_unit = "" |
| 293 | + |
| 294 | + # Parse offset pattern like "3h", "1d", etc. |
| 295 | + for i, char in enumerate(offset): |
| 296 | + if char.isdigit(): |
| 297 | + time_value = time_value * 10 + int(char) |
| 298 | + else: |
| 299 | + time_unit = offset[i:] |
| 300 | + break |
| 301 | + |
| 302 | + if time_unit and time_value > 0: |
| 303 | + # Convert to SQL interval format |
| 304 | + interval_map = {"h": "HOUR", "d": "DAY", "w": "WEEK", "m": "MONTH"} |
| 305 | + |
| 306 | + if time_unit.lower() in interval_map: |
| 307 | + conditions.append( |
| 308 | + f"timestamp >= TIMESTAMP '{datetime.now()}' - INTERVAL {time_value} {interval_map.get(time_unit.lower())}" |
| 309 | + ) |
| 310 | + else: |
| 311 | + return QueryEventResponse(pagination=Pagination(total=0, page=0, limit=0, total_pages=0), events=[]) |
| 312 | + |
| 313 | + if event_type: |
| 314 | + conditions.append("event_type = ?") |
| 315 | + parameters.append(event_type) |
| 316 | + |
| 317 | + # Build the final query |
| 318 | + where_clause = " AND ".join(conditions) |
| 319 | + if where_clause: |
| 320 | + where_clause = f"WHERE {where_clause}" |
| 321 | + |
| 322 | + sql_offset = (page - 1) * limit |
| 323 | + # Get total count |
| 324 | + count_query = f"SELECT COUNT(*) FROM monitor_events {where_clause}" |
| 325 | + total_result = self.connection.execute(count_query, parameters).fetchone() |
| 326 | + total = total_result[0] if total_result else 0 |
| 327 | + |
| 328 | + # Get paginated results |
| 329 | + query = f""" |
| 330 | + SELECT * FROM monitor_events |
| 331 | + {where_clause} |
| 332 | + ORDER BY timestamp DESC |
| 333 | + LIMIT ? OFFSET ? |
| 334 | + """ |
| 335 | + cursor = self.connection.execute(query, parameters + [limit, sql_offset]) |
| 336 | + |
| 337 | + # Convert result to dictionary |
| 338 | + column_names = [desc[0] for desc in cursor.description] |
| 339 | + events = [] |
| 340 | + |
| 341 | + for row in cursor.fetchall(): |
| 342 | + event_dict = dict(zip(column_names, row)) |
| 343 | + |
| 344 | + for field in ["metadata", "raw_request", "raw_response"]: |
| 345 | + if event_dict[field] and isinstance(event_dict[field], str): |
| 346 | + try: |
| 347 | + event_dict[field] = json.loads(event_dict[field]) |
| 348 | + except Exception: |
| 349 | + pass |
| 350 | + |
| 351 | + event_dict["timestamp"] = datetime.strftime(event_dict["timestamp"], "%Y-%m-%d %H:%M:%S") |
| 352 | + events.append(MCPEvent.model_validate(event_dict)) |
| 353 | + |
| 354 | + return QueryEventResponse( |
| 355 | + pagination=Pagination( |
| 356 | + total=total, page=page, limit=limit, total_pages=1 if limit == 0 else (total + limit - 1) // limit |
| 357 | + ), |
| 358 | + events=events, |
| 359 | + ) |
| 360 | + except Exception as e: |
| 361 | + print(f"Error querying events: {e}") |
| 362 | + return QueryEventResponse(pagination=Pagination(total=0, page=0, limit=0, total_pages=0), events=[]) |
| 363 | + |
237 | 364 | async def close(self) -> None: |
238 | 365 | """Close the database connection asynchronously.""" |
239 | 366 | async with self._lock: |
|
0 commit comments