124 changes: 118 additions & 6 deletions mcpgateway/admin.py
@@ -23,14 +23,15 @@
from datetime import datetime
from functools import wraps
import io
from io import StringIO
import json
from pathlib import Path
import time
from typing import Any, cast, Dict, List, Optional, Union
import uuid

# Third-Party
from fastapi import APIRouter, Depends, HTTPException, Request, Response
from fastapi import APIRouter, Depends, HTTPException, Query, Request, Response
from fastapi.responses import FileResponse, HTMLResponse, JSONResponse, RedirectResponse, StreamingResponse
import httpx
from pydantic import ValidationError
@@ -671,7 +672,6 @@ async def admin_edit_server(
update operation.

Expects form fields:
- id (optional): Updated UUID for the server
- name (optional): The updated name of the server
- description (optional): An updated description of the server's purpose
- icon (optional): Updated URL or path to the server's icon
@@ -780,7 +780,6 @@ async def admin_edit_server(
try:
LOGGER.debug(f"User {user} is editing server ID {server_id} with name: {form.get('name')}")
server = ServerUpdate(
id=form.get("id"),
name=form.get("name"),
description=form.get("description"),
icon=form.get("icon"),
@@ -1982,7 +1981,6 @@ async def admin_add_tool(

tool_data: dict[str, Any] = {
"name": form.get("name"),
"displayName": form.get("displayName"),
"url": form.get("url"),
"description": form.get("description"),
"request_type": request_type,
@@ -2048,7 +2046,6 @@ async def admin_edit_tool(

Expects form fields:
- name
- displayName (optional)
- url
- description (optional)
- requestType (to be mapped to request_type)
@@ -2223,7 +2220,6 @@ async def admin_edit_tool(

tool_data: dict[str, Any] = {
"name": form.get("name"),
"displayName": form.get("displayName"),
"custom_name": form.get("customName"),
"url": form.get("url"),
"description": form.get("description"),
@@ -4156,6 +4152,8 @@ async def admin_delete_root(uri: str, request: Request, user: str = Depends(requ
# Metrics
MetricsDict = Dict[str, Union[ToolMetrics, ResourceMetrics, ServerMetrics, PromptMetrics]]

# Import the response time formatting function
from mcpgateway.utils.metrics_common import format_response_time

# @admin_router.get("/metrics", response_model=MetricsDict)
# async def admin_get_metrics(
@@ -4234,6 +4232,120 @@ async def get_aggregated_metrics(
return metrics


@admin_router.get("/metrics/export", response_class=Response)
async def export_metrics_csv(
db: Session = Depends(get_db),
entity_type: str = Query(..., description="Entity type to export (tools, resources, prompts, servers)"),
limit: Optional[int] = Query(None, description="Maximum number of results to return. If not provided, all results are returned."),
user: str = Depends(require_auth),
) -> Response:
"""Export metrics for a specific entity type to CSV format.

This endpoint retrieves performance metrics for the specified entity type and
exports them as a downloadable CSV file. By default every row is exported, not
just the top 5; an optional limit caps the number of rows returned.
Response times are formatted to 3 decimal places.

Args:
db (Session): Database session dependency for querying metrics.
entity_type (str): Type of entity to export (tools, resources, prompts, servers).
limit (Optional[int]): Maximum number of results to return. If None, all results are returned.
user (str): Authenticated user.

Returns:
Response: CSV file download response containing the metrics data.

Raises:
HTTPException: If the entity type is invalid.
"""
LOGGER.debug(f"User {user} requested CSV export of {entity_type} metrics")

# Validate entity type
valid_types = ["tools", "resources", "prompts", "servers"]
if entity_type not in valid_types:
raise HTTPException(status_code=400, detail=f"Invalid entity type. Must be one of: {', '.join(valid_types)}")

# Fetch the top performers for the requested entity type (all rows unless a limit was given)
try:
if entity_type == "tools":
if limit:
performers = await tool_service.get_top_tools(db, limit=limit)
else:
performers = await tool_service.get_top_tools(db, limit=None)
elif entity_type == "resources":
if limit:
performers = await resource_service.get_top_resources(db, limit=limit)
else:
performers = await resource_service.get_top_resources(db, limit=None)
elif entity_type == "prompts":
if limit:
performers = await prompt_service.get_top_prompts(db, limit=limit)
else:
performers = await prompt_service.get_top_prompts(db, limit=None)
elif entity_type == "servers":
if limit:
performers = await server_service.get_top_servers(db, limit=limit)
else:
performers = await server_service.get_top_servers(db, limit=None)
except Exception as e:
LOGGER.error(f"Error exporting {entity_type} metrics to CSV: {str(e)}")
raise HTTPException(status_code=500, detail=f"Failed to export metrics: {str(e)}")

# Handle empty data case
if not performers:
# Return empty CSV with headers
csv_content = "ID,Name,Execution Count,Average Response Time (s),Success Rate (%),Last Execution\n"
return Response(
content=csv_content,
media_type="text/csv",
headers={"Content-Disposition": f"attachment; filename={entity_type}_metrics.csv"}
)

# Create CSV content
output = StringIO()
writer = csv.writer(output)

# Write header row
writer.writerow([
"ID",
"Name",
"Execution Count",
"Average Response Time (s)",
"Success Rate (%)",
"Last Execution"
])

# Write data rows with formatted values
for performer in performers:
# Format response time to 3 decimal places
formatted_response_time = format_response_time(performer.avg_response_time) if performer.avg_response_time is not None else "N/A"

# Format success rate
success_rate = f"{performer.success_rate:.1f}" if performer.success_rate is not None else "N/A"

# Format timestamp
last_execution = performer.last_execution.isoformat() if performer.last_execution else "N/A"

writer.writerow([
performer.id,
performer.name,
performer.execution_count,
formatted_response_time,
success_rate,
last_execution
])

# Get the CSV content as a string
csv_content = output.getvalue()
output.close()

# Return CSV response
return Response(
content=csv_content,
media_type="text/csv",
headers={"Content-Disposition": f"attachment; filename={entity_type}_metrics.csv"}
)
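
A quick client-side sketch of how the new export endpoint could be exercised; the base URL, the `/admin` prefix, and the basic-auth credentials are assumptions for illustration, not part of this diff.

```python
# Hypothetical client for GET /admin/metrics/export (base URL and credentials are assumed).
import httpx

resp = httpx.get(
    "http://localhost:4444/admin/metrics/export",
    params={"entity_type": "tools", "limit": 100},  # omit "limit" to export every row
    auth=("admin", "changeme"),  # assumed basic-auth credentials
    timeout=30.0,
)
resp.raise_for_status()

# The response is a CSV attachment with the columns the endpoint writes:
# ID, Name, Execution Count, Average Response Time (s), Success Rate (%), Last Execution
with open("tools_metrics.csv", "wb") as f:
    f.write(resp.content)
```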


@admin_router.post("/metrics/reset", response_model=Dict[str, object])
async def admin_reset_metrics(db: Session = Depends(get_db), user: str = Depends(require_auth)) -> Dict[str, object]:
"""
14 changes: 9 additions & 5 deletions mcpgateway/services/prompt_service.py
@@ -142,7 +142,7 @@ async def shutdown(self) -> None:
self._event_subscribers.clear()
logger.info("Prompt service shutdown complete")

async def get_top_prompts(self, db: Session, limit: int = 5) -> List[TopPerformer]:
async def get_top_prompts(self, db: Session, limit: Optional[int] = 5) -> List[TopPerformer]:
"""Retrieve the top-performing prompts based on execution count.

Queries the database to get prompts with their metrics, ordered by the number of executions
@@ -151,7 +151,8 @@ async def get_top_prompts(self, db: Session, limit: int = 5) -> List[TopPerforme

Args:
db (Session): Database session for querying prompt metrics.
limit (int): Maximum number of prompts to return. Defaults to 5.
limit (Optional[int]): Maximum number of prompts to return. Defaults to 5.
If None, returns all prompts.

Returns:
List[TopPerformer]: A list of TopPerformer objects, each containing:
@@ -162,7 +163,7 @@ async def get_top_prompts(self, db: Session, limit: int = 5) -> List[TopPerforme
- success_rate: Success rate percentage, or None if no metrics.
- last_execution: Timestamp of the last execution, or None if no metrics.
"""
results = (
query = (
db.query(
DbPrompt.id,
DbPrompt.name,
@@ -180,9 +181,12 @@ async def get_top_prompts(self, db: Session, limit: int = 5) -> List[TopPerforme
.outerjoin(PromptMetric)
.group_by(DbPrompt.id, DbPrompt.name)
.order_by(desc("execution_count"))
.limit(limit)
.all()
)

if limit is not None:
query = query.limit(limit)

results = query.all()

return build_top_performers(results)
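
A minimal usage sketch of the relaxed `limit` parameter (the same pattern applies to `get_top_resources` and `get_top_servers` below); the caller and session wiring are assumed, not shown in this diff.

```python
# Hypothetical caller: limit=None now means "no LIMIT clause", i.e. every prompt is returned.
from sqlalchemy.orm import Session

async def dump_prompt_metrics(prompt_service, db: Session):
    top_five = await prompt_service.get_top_prompts(db)                # default behaviour: at most 5 rows
    everything = await prompt_service.get_top_prompts(db, limit=None)  # all rows, as used by the CSV export
    return top_five, everything
```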

14 changes: 9 additions & 5 deletions mcpgateway/services/resource_service.py
@@ -137,7 +137,7 @@ async def shutdown(self) -> None:
self._event_subscribers.clear()
logger.info("Resource service shutdown complete")

async def get_top_resources(self, db: Session, limit: int = 5) -> List[TopPerformer]:
async def get_top_resources(self, db: Session, limit: Optional[int] = 5) -> List[TopPerformer]:
"""Retrieve the top-performing resources based on execution count.

Queries the database to get resources with their metrics, ordered by the number of executions
@@ -146,7 +146,8 @@ async def get_top_resources(self, db: Session, limit: int = 5) -> List[TopPerfor

Args:
db (Session): Database session for querying resource metrics.
limit (int): Maximum number of resources to return. Defaults to 5.
limit (Optional[int]): Maximum number of resources to return. Defaults to 5.
If None, returns all resources.

Returns:
List[TopPerformer]: A list of TopPerformer objects, each containing:
@@ -157,7 +158,7 @@ async def get_top_resources(self, db: Session, limit: int = 5) -> List[TopPerfor
- success_rate: Success rate percentage, or None if no metrics.
- last_execution: Timestamp of the last execution, or None if no metrics.
"""
results = (
query = (
db.query(
DbResource.id,
DbResource.uri.label("name"), # Using URI as the name field for TopPerformer
@@ -175,9 +176,12 @@ async def get_top_resources(self, db: Session, limit: int = 5) -> List[TopPerfor
.outerjoin(ResourceMetric)
.group_by(DbResource.id, DbResource.uri)
.order_by(desc("execution_count"))
.limit(limit)
.all()
)

if limit is not None:
query = query.limit(limit)

results = query.all()

return build_top_performers(results)

33 changes: 12 additions & 21 deletions mcpgateway/services/server_service.py
@@ -129,7 +129,7 @@ async def shutdown(self) -> None:
logger.info("Server service shutdown complete")

# get_top_server
async def get_top_servers(self, db: Session, limit: int = 5) -> List[TopPerformer]:
async def get_top_servers(self, db: Session, limit: Optional[int] = 5) -> List[TopPerformer]:
"""Retrieve the top-performing servers based on execution count.

Queries the database to get servers with their metrics, ordered by the number of executions
@@ -138,7 +138,8 @@ async def get_top_servers(self, db: Session, limit: int = 5) -> List[TopPerforme

Args:
db (Session): Database session for querying server metrics.
limit (int): Maximum number of servers to return. Defaults to 5.
limit (Optional[int]): Maximum number of servers to return. Defaults to 5.
If None, returns all servers.

Returns:
List[TopPerformer]: A list of TopPerformer objects, each containing:
@@ -149,7 +150,7 @@ async def get_top_servers(self, db: Session, limit: int = 5) -> List[TopPerforme
- success_rate: Success rate percentage, or None if no metrics.
- last_execution: Timestamp of the last execution, or None if no metrics.
"""
results = (
query = (
db.query(
DbServer.id,
DbServer.name,
@@ -167,9 +168,14 @@ async def get_top_servers(self, db: Session, limit: int = 5) -> List[TopPerforme
.outerjoin(ServerMetric)
.group_by(DbServer.id, DbServer.name)
.order_by(desc("execution_count"))
.limit(limit)
.all()
)

if limit is not None:
query = query.limit(limit)

results = query.all()

return build_top_performers(results)

@@ -315,12 +321,6 @@ async def register_server(self, db: Session, server_in: ServerCreate) -> ServerR
is_active=True,
tags=server_in.tags or [],
)

# Set custom UUID if provided
if server_in.id:
# Normalize UUID to hex format (no dashes) to match database storage
normalized_uuid = str(uuid_module.UUID(server_in.id)).replace("-", "")
db_server.id = normalized_uuid
db.add(db_server)

# Associate tools, verifying each exists.
@@ -505,17 +505,14 @@ async def update_server(self, db: Session, server_id: str, server_update: Server
>>> service = ServerService()
>>> db = MagicMock()
>>> server = MagicMock()
>>> server.id = 'server_id'
>>> db.get.return_value = server
>>> db.commit = MagicMock()
>>> db.refresh = MagicMock()
>>> db.execute.return_value.scalar_one_or_none.return_value = None
>>> service._convert_server_to_read = MagicMock(return_value='server_read')
>>> ServerRead.model_validate = MagicMock(return_value='server_read')
>>> server_update = MagicMock()
>>> server_update.id = None # No UUID change
>>> import asyncio
>>> asyncio.run(service.update_server(db, 'server_id', server_update))
>>> asyncio.run(service.update_server(db, 'server_id', MagicMock()))
'server_read'
"""
try:
@@ -534,12 +531,6 @@ async def update_server(self, db: Session, server_id: str, server_update: Server
)

# Update simple fields
if server_update.id is not None and server_update.id != server.id:
# Check if the new UUID is already in use
existing = db.get(DbServer, server_update.id)
if existing:
raise ServerError(f"Server with ID {server_update.id} already exists")
server.id = server_update.id
if server_update.name is not None:
server.name = server_update.name
if server_update.description is not None: