Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions AIGateway/src/crud/mcp_approvals.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,52 @@ async def get_approval_history(
return []


async def get_pending_request(org_id: int, agent_key_id: int, tool_name: str) -> Optional[dict]:
"""Return an existing pending, non-expired approval request for this agent+tool."""
async with get_db() as db:
result = await db.execute(
text("""
SELECT id, status, expires_at
FROM ai_gateway_mcp_approval_requests
WHERE organization_id = :org_id
AND agent_key_id = :agent_key_id
AND tool_name = :tool_name
AND status = 'pending'
AND expires_at > NOW()
ORDER BY created_at DESC
LIMIT 1
"""),
{"org_id": org_id, "agent_key_id": agent_key_id, "tool_name": tool_name},
)
row = result.mappings().first()
if row is None:
return None
return dict(row)


async def get_approved_request(org_id: int, agent_key_id: int, tool_name: str) -> Optional[dict]:
"""Check if an approved, non-expired approval request exists for this agent+tool."""
async with get_db() as db:
result = await db.execute(
text("""
SELECT id, status, expires_at
FROM ai_gateway_mcp_approval_requests
WHERE organization_id = :org_id
AND agent_key_id = :agent_key_id
AND tool_name = :tool_name
AND status = 'approved'
AND expires_at > NOW()
ORDER BY decided_at DESC
LIMIT 1
"""),
{"org_id": org_id, "agent_key_id": agent_key_id, "tool_name": tool_name},
)
row = result.mappings().first()
if row is None:
return None
return dict(row)


async def get_approval_status(org_id: int, request_id: int) -> Optional[dict]:
async with get_db() as db:
result = await db.execute(
Expand Down Expand Up @@ -202,3 +248,15 @@ async def decide_approval(
return None
return dict(row)
return None


async def delete_expired_approval_requests(retention_days: int = 30, batch_size: int = 5000) -> int:
"""Delete decided or expired approval requests older than retention_days in batches."""
from utils.batch_delete import batch_delete_expired

return await batch_delete_expired(
table="ai_gateway_mcp_approval_requests",
where_clause="(status != 'pending' OR expires_at < NOW())",
retention_days=retention_days,
batch_size=batch_size,
)
24 changes: 10 additions & 14 deletions AIGateway/src/crud/mcp_audit.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,17 +183,13 @@ async def get_audit_stats_by_agent(org_id: int, days: int = 7) -> list[dict]:
]


async def delete_expired_audit_logs(retention_days: int = 30) -> int:
"""Delete audit logs older than retention_days. Returns count of deleted rows."""
retention_days = int(retention_days)

async with get_db() as db:
result = await db.execute(
text(f"""
DELETE FROM ai_gateway_mcp_audit_logs
WHERE created_at < NOW() - INTERVAL '{retention_days} days'
RETURNING id
"""),
)
await db.commit()
return len(result.fetchall())
async def delete_expired_audit_logs(retention_days: int = 30, batch_size: int = 5000) -> int:
"""Delete audit logs older than retention_days in batches. Returns count of deleted rows."""
from utils.batch_delete import batch_delete_expired

return await batch_delete_expired(
table="ai_gateway_mcp_audit_logs",
where_clause="",
retention_days=retention_days,
batch_size=batch_size,
)
15 changes: 15 additions & 0 deletions AIGateway/src/routers/mcp_audit.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,18 @@ async def cleanup_audit_logs(request: Request):

deleted = await delete_expired_audit_logs(settings.mcp_audit_retention_days)
return {"status": "success", "deleted": deleted}


# ---------------------------------------------------------------------------
# POST /mcp/audit/cleanup-approvals
# ---------------------------------------------------------------------------

@router.post("/cleanup-approvals", status_code=status.HTTP_200_OK)
async def cleanup_approval_requests(request: Request):
"""Delete decided/expired approval requests older than retention period."""
verify_internal_key(request)
from config import settings
from crud.mcp_approvals import delete_expired_approval_requests

deleted = await delete_expired_approval_requests(settings.mcp_audit_retention_days)
return {"status": "success", "deleted": deleted}
52 changes: 49 additions & 3 deletions AIGateway/src/routers/mcp_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,13 @@
import asyncio
import logging
import time
from datetime import datetime, timezone, timedelta

from fastapi import APIRouter, Request, HTTPException
from fastapi.responses import JSONResponse, StreamingResponse

from config import settings
from crud.mcp_approvals import create_approval_request, get_approval_status, get_approved_request, get_pending_request
from crud.mcp_tools import get_all_tools
from services.mcp_audit_service import log_tool_call
from services.mcp_guardrail_service import scan_tool_input
Expand All @@ -34,8 +37,11 @@
router = APIRouter()


def _jsonrpc_error(id, code: int, message: str) -> dict:
return {"jsonrpc": "2.0", "id": id, "error": {"code": code, "message": message}}
def _jsonrpc_error(id, code: int, message: str, data: dict | None = None) -> dict:
error = {"code": code, "message": message}
if data:
error["data"] = data
return {"jsonrpc": "2.0", "id": id, "error": error}


def _jsonrpc_result(id, result: dict) -> dict:
Expand Down Expand Up @@ -135,7 +141,28 @@ async def _audit(status: str, summary: str | None, is_error: bool):
await enforce_mcp_rate_limits(agent_key, tool_name)

if tool.get("requires_approval"):
return JSONResponse(content=_jsonrpc_error(msg_id, -32001, "Tool requires approval"), status_code=200)
# Check if an approved request already exists for this agent+tool
approved = await get_approved_request(org_id, agent_key["id"], tool_name)
if not approved:
# Reuse an existing pending request if one exists, otherwise create a new one
pending = await get_pending_request(org_id, agent_key["id"], tool_name)
if pending:
approval = pending
else:
expires_at = datetime.now(timezone.utc) + timedelta(seconds=settings.mcp_approval_expiry_seconds)
approval = await create_approval_request(org_id, {
"agent_key_id": agent_key["id"],
"tool_id": tool.get("id"),
"tool_name": tool_name,
"arguments": arguments,
"expires_at": expires_at,
})
await _audit("approval_required", f"Approval request {approval.get('id')} created", False)
return JSONResponse(content=_jsonrpc_error(msg_id, -32001, "Tool requires approval", {
"approval_id": approval.get("id"),
"poll_endpoint": f"/v1/mcp/approvals/{approval.get('id')}/status",
"expires_at": approval["expires_at"].isoformat() if hasattr(approval["expires_at"], "isoformat") else str(approval["expires_at"]),
}), status_code=200)

scan_result = await scan_tool_input(org_id, tool_name, arguments)
if scan_result and scan_result.blocked:
Expand Down Expand Up @@ -182,6 +209,25 @@ async def _audit(status: str, summary: str | None, is_error: bool):
return JSONResponse(content=_jsonrpc_error(msg_id, -32601, f"Method not found: {method}"), status_code=200)


@router.get("/v1/mcp/approvals/{request_id}/status")
async def mcp_approval_status(request: Request, request_id: int):
"""Poll approval status — authenticated via agent key."""
agent_key = await _extract_agent_key(request)
org_id = agent_key["organization_id"]

approval = await get_approval_status(org_id, request_id)
if not approval:
raise HTTPException(status_code=404, detail="Approval request not found")

return {
"approval_id": approval["id"],
"status": approval["status"],
"decided_at": approval["decided_at"].isoformat() if approval.get("decided_at") else None,
"decision_reason": approval.get("decision_reason"),
"expires_at": approval["expires_at"].isoformat() if approval.get("expires_at") else None,
}


@router.get("/v1/mcp")
async def mcp_sse(request: Request):
"""SSE endpoint for server-initiated messages. Keep-alive for v1."""
Expand Down
15 changes: 14 additions & 1 deletion AIGateway/src/services/mcp_audit_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,19 @@

logger = logging.getLogger("uvicorn")

MAX_ARGUMENTS_SIZE = 10_240 # 10 KB


def _cap_arguments(arguments: dict | None) -> str:
"""Serialize arguments to JSON, truncating to MAX_ARGUMENTS_SIZE."""
if not arguments:
return "{}"
serialized = json.dumps(arguments)
if len(serialized) <= MAX_ARGUMENTS_SIZE:
return serialized
original_size = len(serialized)
return json.dumps({"_truncated": True, "_original_size": original_size})


async def log_tool_call(
organization_id: int,
Expand Down Expand Up @@ -48,7 +61,7 @@ async def log_tool_call(
"agent_key_id": agent_key_id,
"server_id": server_id,
"tool_name": tool_name,
"arguments": json.dumps(arguments) if arguments else "{}",
"arguments": _cap_arguments(arguments),
"result_status": result_status,
"result_summary": result_summary,
"is_error": is_error,
Expand Down
47 changes: 47 additions & 0 deletions AIGateway/src/utils/batch_delete.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
"""Shared batched-delete helper for retention cleanup jobs."""

from sqlalchemy import text

from database.db import get_db


async def batch_delete_expired(
table: str,
where_clause: str,
retention_days: int = 30,
batch_size: int = 5000,
) -> int:
"""Delete rows matching where_clause older than retention_days in batches.

``where_clause`` is appended after
``WHERE created_at < NOW() - INTERVAL '<retention_days> days'``.
Pass an empty string if no extra conditions are needed.

Returns total number of deleted rows.
"""
retention_days = int(retention_days)
extra = f" AND {where_clause}" if where_clause else ""
total_deleted = 0

async with get_db() as db:
while True:
result = await db.execute(
text(f"""
DELETE FROM {table}
WHERE id IN (
SELECT id FROM {table}
WHERE created_at < NOW() - INTERVAL '{retention_days} days'
{extra}
LIMIT :batch_size
)
RETURNING id
"""),
{"batch_size": batch_size},
)
deleted = len(result.fetchall())
await db.commit()
total_deleted += deleted
if deleted < batch_size:
break

return total_deleted
18 changes: 18 additions & 0 deletions Clients/src/presentation/components/breadcrumbs/routeMapping.ts
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,15 @@ export const routeMapping: Record<string, string> = {
"/ai-gateway/settings/guardrails": "Guardrail settings",
"/ai-gateway/settings/risks": "Suggested risks",

// MCP Gateway
"/ai-gateway/mcp": "MCP Gateway",
"/ai-gateway/mcp/agent-keys": "Agent keys",
"/ai-gateway/mcp/servers": "Servers",
"/ai-gateway/mcp/tools": "Tool catalog",
"/ai-gateway/mcp/audit": "Audit log",
"/ai-gateway/mcp/approvals": "Approvals",
"/ai-gateway/mcp/guardrails": "Guardrails",

// Shadow AI
"/shadow-ai": "Shadow AI",
"/shadow-ai/insights": "Insights",
Expand Down Expand Up @@ -301,6 +310,15 @@ export const routeIconMapping: Record<string, () => React.ReactNode> = {
"/ai-gateway/settings/guardrails": () => React.createElement(Settings, { size: 14, strokeWidth: 1.5 }),
"/ai-gateway/settings/risks": () => React.createElement(Settings, { size: 14, strokeWidth: 1.5 }),

// MCP Gateway
"/ai-gateway/mcp": () => React.createElement(Router, { size: 14, strokeWidth: 1.5 }),
"/ai-gateway/mcp/agent-keys": () => React.createElement(KeyRound, { size: 14, strokeWidth: 1.5 }),
"/ai-gateway/mcp/servers": () => React.createElement(Router, { size: 14, strokeWidth: 1.5 }),
"/ai-gateway/mcp/tools": () => React.createElement(Layers, { size: 14, strokeWidth: 1.5 }),
"/ai-gateway/mcp/audit": () => React.createElement(FileSearch, { size: 14, strokeWidth: 1.5 }),
"/ai-gateway/mcp/approvals": () => React.createElement(ShieldCheck, { size: 14, strokeWidth: 1.5 }),
"/ai-gateway/mcp/guardrails": () => React.createElement(ShieldAlert, { size: 14, strokeWidth: 1.5 }),

// Super Admin
"/super-admin": () => React.createElement(Shield, { size: 14, strokeWidth: 1.5 }),
"/super-admin/users": () => React.createElement(Users, { size: 14, strokeWidth: 1.5 }),
Expand Down
2 changes: 1 addition & 1 deletion Clients/src/presentation/types/interfaces/i.header.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ export interface PageHeaderExtendedProps {
tipBoxEntity?: string;
summaryCards?: ReactNode;
summaryCardsJoyrideId?: string;
children: ReactNode;
children?: ReactNode;
alert?: ReactNode;
loadingToast?: ReactNode;
titleFontFamily?: string;
Expand Down
3 changes: 2 additions & 1 deletion Servers/jobs/producer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ export * from "../services/slack/slackProducer";

import { scheduleDailyNotification } from "../services/slack/slackProducer";
import logger from "../utils/logger/fileLogger";
import { scheduleReportNotification, scheduleVendorReviewDateNotification, schedulePMMHourlyCheck, scheduleShadowAiJobs, schedulePolicyDueSoonNotification, scheduleAgentDiscoverySync, scheduleAiDetectionScanCheck, scheduleAiGatewayRiskDetection, scheduleAiGatewayCacheCleanup } from "../services/automations/automationProducer";
import { scheduleReportNotification, scheduleVendorReviewDateNotification, schedulePMMHourlyCheck, scheduleShadowAiJobs, schedulePolicyDueSoonNotification, scheduleAgentDiscoverySync, scheduleAiDetectionScanCheck, scheduleAiGatewayRiskDetection, scheduleAiGatewayCacheCleanup, scheduleMcpGatewayCleanup } from "../services/automations/automationProducer";

export async function addAllJobs(): Promise<void> {
await scheduleDailyNotification();
Expand All @@ -15,6 +15,7 @@ export async function addAllJobs(): Promise<void> {
await scheduleAiDetectionScanCheck();
await scheduleAiGatewayRiskDetection();
await scheduleAiGatewayCacheCleanup();
await scheduleMcpGatewayCleanup();
}

if (require.main === module) {
Expand Down
14 changes: 14 additions & 0 deletions Servers/services/automations/automationProducer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -192,3 +192,17 @@ export async function scheduleAiGatewayCacheCleanup() {
},
);
}

export async function scheduleMcpGatewayCleanup() {
logger.info("Adding MCP Gateway cleanup jobs to the queue...");
// Daily at 3 AM — purge expired audit logs and decided approval requests
await automationQueue.add(
"mcp_audit_cleanup",
{ type: "mcp_gateway" },
{
repeat: { pattern: "0 3 * * *" },
removeOnComplete: true,
removeOnFail: false,
},
);
}
24 changes: 12 additions & 12 deletions Servers/services/automations/automationWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -518,23 +518,23 @@ export const createAutomationWorker = () => {
} else if (name === "ai_gateway_risk_detection") {
await runRiskDetection();
} else if (name === "ai_gateway_cache_cleanup") {
const AI_GATEWAY_URL = process.env.AI_GATEWAY_URL || "http://127.0.0.1:8100";
try {
const response = await fetch(
`${AI_GATEWAY_URL}/internal/cache/purge-expired`,
{
method: "POST",
headers: {
"Content-Type": "application/json",
"x-internal-key": process.env.AI_GATEWAY_INTERNAL_KEY || "",
},
}
);
const result = await response.json() as { deleted?: number };
const result = await callAIGateway("POST", "/internal/cache/purge-expired");
console.log(`AI Gateway cache cleanup: ${result.deleted ?? 0} expired entries purged`);
} catch (err) {
console.error(`AI Gateway cache cleanup failed: ${err}`);
}
} else if (name === "mcp_audit_cleanup") {
try {
const [auditResult, approvalResult] = await Promise.all([
callAIGateway("POST", "/internal/mcp/audit/cleanup"),
callAIGateway("POST", "/internal/mcp/audit/cleanup-approvals"),
]);
console.log(`MCP audit cleanup: ${auditResult.deleted ?? 0} expired logs purged`);
console.log(`MCP approval cleanup: ${approvalResult.deleted ?? 0} expired requests purged`);
} catch (err) {
console.error(`MCP Gateway cleanup failed: ${err}`);
}
} else if (name === "send_pmm_notification") {
// PMM notification handling - send email using MJML templates
const { type, data } = job.data;
Expand Down
Loading