-
Notifications
You must be signed in to change notification settings - Fork 101
Expand file tree
/
Copy pathmcp_audit_service.py
More file actions
75 lines (66 loc) · 2.67 KB
/
mcp_audit_service.py
File metadata and controls
75 lines (66 loc) · 2.67 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
"""
MCP Gateway audit service — logs every tool invocation for compliance and observability.
"""
import json
import logging
from typing import Optional
from sqlalchemy import text
from database.db import get_db
logger = logging.getLogger("uvicorn")
MAX_ARGUMENTS_SIZE = 10_240 # 10 KB
def _cap_arguments(arguments: dict | None) -> str:
"""Serialize arguments to JSON, truncating to MAX_ARGUMENTS_SIZE."""
if not arguments:
return "{}"
serialized = json.dumps(arguments)
if len(serialized) <= MAX_ARGUMENTS_SIZE:
return serialized
original_size = len(serialized)
return json.dumps({"_truncated": True, "_original_size": original_size})
async def log_tool_call(
organization_id: int,
agent_key_id: int,
server_id: Optional[int],
tool_name: str,
arguments: Optional[dict],
result_status: str, # "success", "error", "blocked", "rate_limited", "approval_required"
result_summary: Optional[str], # truncated to 500 chars
is_error: bool,
latency_ms: int,
session_id: Optional[str] = None,
metadata: Optional[dict] = None,
) -> None:
"""Insert an MCP audit log entry. Fire-and-forget with error catch."""
try:
# Truncate result_summary to 500 chars
if result_summary and len(result_summary) > 500:
result_summary = result_summary[:497] + "..."
async with get_db() as db:
await db.execute(
text("""
INSERT INTO ai_gateway_mcp_audit_logs
(organization_id, agent_key_id, server_id, tool_name,
arguments, result_status, result_summary, is_error,
latency_ms, session_id, metadata)
VALUES
(:org_id, :agent_key_id, :server_id, :tool_name,
CAST(:arguments AS jsonb), :result_status, :result_summary, :is_error,
:latency_ms, :session_id, CAST(:metadata AS jsonb))
"""),
{
"org_id": organization_id,
"agent_key_id": agent_key_id,
"server_id": server_id,
"tool_name": tool_name,
"arguments": _cap_arguments(arguments),
"result_status": result_status,
"result_summary": result_summary,
"is_error": is_error,
"latency_ms": latency_ms,
"session_id": session_id,
"metadata": json.dumps(metadata or {}),
},
)
await db.commit()
except Exception as e:
logger.error(f"Failed to log MCP tool call: {e}")