
[Feature Request]: Enhance Metrics Tab UI with Virtual Servers and Top 5 Performance Tables #657

Merged: 11 commits (Aug 10, 2025)
.pylintrc (2 changes: 1 addition & 1 deletion)
@@ -575,7 +575,7 @@ contextmanager-decorators=contextlib.contextmanager
# List of members which are set dynamically and missed by pylint inference
# system, and so shouldn't trigger E1101 when accessed. Python regular
# expressions are accepted.
generated-members=
generated-members=sqlalchemy.func.*

# Tells whether to warn about missing members when the owner of the attribute
# is inferred to be None.
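
A note on the `.pylintrc` change above: `sqlalchemy.func` creates SQL function attributes (`count`, `avg`, `max`, ...) dynamically, so pylint's static inference cannot see them and would flag the aggregate queries this PR introduces with E1101 (no-member). A minimal illustration of the pattern being whitelisted; the `PromptMetric.id` column is assumed for the example:

```python
# Without generated-members=sqlalchemy.func.*, pylint reports E1101 (no-member)
# on func.count / func.avg because these attributes are generated at runtime.
from sqlalchemy import func, select

from mcpgateway.db import PromptMetric  # metric model also used later in this PR

stmt = select(
    func.count(PromptMetric.id),           # dynamically generated member
    func.avg(PromptMetric.response_time),  # dynamically generated member
)
print(stmt)
```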
MANIFEST.in (1 change: 1 addition & 0 deletions)
@@ -13,6 +13,7 @@ include Containerfile.lite
include __init__
include alembic.ini
include tox.ini
include alembic/README

# 2️⃣ Top-level config, examples and helper scripts
include *.py
mcpgateway/admin.py (171 changes: 67 additions & 104 deletions)
@@ -3975,118 +3975,81 @@ async def admin_delete_root(uri: str, request: Request, user: str = Depends(requ
MetricsDict = Dict[str, Union[ToolMetrics, ResourceMetrics, ServerMetrics, PromptMetrics]]


@admin_router.get("/metrics", response_model=MetricsDict)
async def admin_get_metrics(
# @admin_router.get("/metrics", response_model=MetricsDict)
# async def admin_get_metrics(
# db: Session = Depends(get_db),
# user: str = Depends(require_auth),
# ) -> MetricsDict:
# """
# Retrieve aggregate metrics for all entity types via the admin UI.

# This endpoint collects and returns usage metrics for tools, resources, servers,
# and prompts. The metrics are retrieved by calling the aggregate_metrics method
# on each respective service, which compiles statistics about usage patterns,
# success rates, and other relevant metrics for administrative monitoring
# and analysis purposes.

# Args:
# db (Session): Database session dependency.
# user (str): Authenticated user dependency.

# Returns:
# MetricsDict: A dictionary containing the aggregated metrics for tools,
# resources, servers, and prompts. Each value is a Pydantic model instance
# specific to the entity type.
# """
# logger.debug(f"User {user} requested aggregate metrics")
# tool_metrics = await tool_service.aggregate_metrics(db)
# resource_metrics = await resource_service.aggregate_metrics(db)
# server_metrics = await server_service.aggregate_metrics(db)
# prompt_metrics = await prompt_service.aggregate_metrics(db)

# # Return actual Pydantic model instances
# return {
# "tools": tool_metrics,
# "resources": resource_metrics,
# "servers": server_metrics,
# "prompts": prompt_metrics,
# }


@admin_router.get("/metrics")
async def get_aggregated_metrics(
db: Session = Depends(get_db),
user: str = Depends(require_auth),
) -> MetricsDict:
"""
Retrieve aggregate metrics for all entity types via the admin UI.
_user: str = Depends(require_auth),
) -> Dict[str, Any]:
"""Retrieve aggregated metrics and top performers for all entity types.

This endpoint collects and returns usage metrics for tools, resources, servers,
and prompts. The metrics are retrieved by calling the aggregate_metrics method
on each respective service, which compiles statistics about usage patterns,
success rates, and other relevant metrics for administrative monitoring
and analysis purposes.
This endpoint collects usage metrics and top-performing entities for tools,
resources, prompts, and servers by calling the respective service methods.
The results are compiled into a dictionary for administrative monitoring.

Args:
db (Session): Database session dependency.
user (str): Authenticated user dependency.
db (Session): Database session dependency for querying metrics.

Returns:
MetricsDict: A dictionary containing the aggregated metrics for tools,
resources, servers, and prompts. Each value is a Pydantic model instance
specific to the entity type.

Examples:
>>> import asyncio
>>> from unittest.mock import AsyncMock, MagicMock
>>> from mcpgateway.schemas import ToolMetrics, ResourceMetrics, ServerMetrics, PromptMetrics
>>>
>>> mock_db = MagicMock()
>>> mock_user = "test_user"
>>>
>>> mock_tool_metrics = ToolMetrics(
... total_executions=10,
... successful_executions=9,
... failed_executions=1,
... failure_rate=0.1,
... min_response_time=0.05,
... max_response_time=1.0,
... avg_response_time=0.3,
... last_execution_time=None
... )
>>> mock_resource_metrics = ResourceMetrics(
... total_executions=5,
... successful_executions=5,
... failed_executions=0,
... failure_rate=0.0,
... min_response_time=0.1,
... max_response_time=0.5,
... avg_response_time=0.2,
... last_execution_time=None
... )
>>> mock_server_metrics = ServerMetrics(
... total_executions=7,
... successful_executions=7,
... failed_executions=0,
... failure_rate=0.0,
... min_response_time=0.2,
... max_response_time=0.7,
... avg_response_time=0.4,
... last_execution_time=None
... )
>>> mock_prompt_metrics = PromptMetrics(
... total_executions=3,
... successful_executions=3,
... failed_executions=0,
... failure_rate=0.0,
... min_response_time=0.15,
... max_response_time=0.6,
... avg_response_time=0.35,
... last_execution_time=None
... )
>>>
>>> original_aggregate_metrics_tool = tool_service.aggregate_metrics
>>> original_aggregate_metrics_resource = resource_service.aggregate_metrics
>>> original_aggregate_metrics_server = server_service.aggregate_metrics
>>> original_aggregate_metrics_prompt = prompt_service.aggregate_metrics
>>>
>>> tool_service.aggregate_metrics = AsyncMock(return_value=mock_tool_metrics)
>>> resource_service.aggregate_metrics = AsyncMock(return_value=mock_resource_metrics)
>>> server_service.aggregate_metrics = AsyncMock(return_value=mock_server_metrics)
>>> prompt_service.aggregate_metrics = AsyncMock(return_value=mock_prompt_metrics)
>>>
>>> async def test_admin_get_metrics():
... result = await admin_get_metrics(mock_db, mock_user)
... return (
... isinstance(result, dict) and
... result.get("tools") == mock_tool_metrics and
... result.get("resources") == mock_resource_metrics and
... result.get("servers") == mock_server_metrics and
... result.get("prompts") == mock_prompt_metrics
... )
>>>
>>> import asyncio; asyncio.run(test_admin_get_metrics())
True
>>>
>>> tool_service.aggregate_metrics = original_aggregate_metrics_tool
>>> resource_service.aggregate_metrics = original_aggregate_metrics_resource
>>> server_service.aggregate_metrics = original_aggregate_metrics_server
>>> prompt_service.aggregate_metrics = original_aggregate_metrics_prompt
Dict[str, Any]: A dictionary containing aggregated metrics and top performers
for tools, resources, prompts, and servers. The structure includes:
- 'tools': Metrics for tools.
- 'resources': Metrics for resources.
- 'prompts': Metrics for prompts.
- 'servers': Metrics for servers.
- 'topPerformers': A nested dictionary with top 5 tools, resources, prompts,
and servers.
"""
logger.debug(f"User {user} requested aggregate metrics")
tool_metrics = await tool_service.aggregate_metrics(db)
resource_metrics = await resource_service.aggregate_metrics(db)
server_metrics = await server_service.aggregate_metrics(db)
prompt_metrics = await prompt_service.aggregate_metrics(db)

return {
"tools": tool_metrics,
"resources": resource_metrics,
"servers": server_metrics,
"prompts": prompt_metrics,
metrics = {
"tools": await tool_service.aggregate_metrics(db),
"resources": await resource_service.aggregate_metrics(db),
"prompts": await prompt_service.aggregate_metrics(db),
"servers": await server_service.aggregate_metrics(db),
"topPerformers": {
"tools": await tool_service.get_top_tools(db, limit=5),
"resources": await resource_service.get_top_resources(db, limit=5),
"prompts": await prompt_service.get_top_prompts(db, limit=5),
"servers": await server_service.get_top_servers(db, limit=5),
},
}
return metrics


@admin_router.post("/metrics/reset", response_model=Dict[str, object])
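
The `get_top_*` service methods that feed `topPerformers` are not shown in this excerpt. A rough sketch of what such a query could look like for prompts, using only names that appear elsewhere in this diff (`DbPrompt`, `PromptMetric`, `TopPerformer`) plus an assumed `PromptMetric.timestamp` column; the PR's actual implementation in the service layer may differ:

```python
# Illustrative sketch only, not the PR's get_top_prompts.
from typing import List

from sqlalchemy import case, func, select
from sqlalchemy.orm import Session

from mcpgateway.db import Prompt as DbPrompt
from mcpgateway.db import PromptMetric
from mcpgateway.schemas import TopPerformer


def get_top_prompts_sketch(db: Session, limit: int = 5) -> List[TopPerformer]:
    """Return the most-executed prompts with average latency and success rate."""
    execution_count = func.count(PromptMetric.id)
    stmt = (
        select(
            DbPrompt.id,
            DbPrompt.name,
            execution_count.label("execution_count"),
            func.avg(PromptMetric.response_time).label("avg_response_time"),
            (
                func.sum(case((PromptMetric.is_success.is_(True), 1), else_=0))
                * 100.0
                / execution_count
            ).label("success_rate"),
            func.max(PromptMetric.timestamp).label("last_execution"),  # assumed column
        )
        .join(PromptMetric, PromptMetric.prompt_id == DbPrompt.id)
        .group_by(DbPrompt.id, DbPrompt.name)
        .order_by(execution_count.desc())
        .limit(limit)
    )
    return [
        TopPerformer(
            id=row.id,
            name=row.name,
            execution_count=row.execution_count,
            avg_response_time=float(row.avg_response_time) if row.avg_response_time is not None else None,
            success_rate=float(row.success_rate) if row.success_rate is not None else None,
            last_execution=row.last_execution,
        )
        for row in db.execute(stmt).all()
    ]
```

Ordering by execution count and taking `limit` rows matches the Top 5 tables the Metrics tab renders; the inner join simply omits prompts that have never been executed.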
mcpgateway/main.py (80 changes: 73 additions & 7 deletions)
@@ -29,6 +29,7 @@
import asyncio
from contextlib import asynccontextmanager
import json
import time
from typing import Any, AsyncIterator, Dict, List, Optional, Union
from urllib.parse import urlparse, urlunparse

@@ -52,7 +53,7 @@
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from pydantic import ValidationError
from sqlalchemy import text
from sqlalchemy import select, text
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session
from starlette.middleware.base import BaseHTTPMiddleware
@@ -64,7 +65,8 @@
from mcpgateway.bootstrap_db import main as bootstrap_db
from mcpgateway.cache import ResourceCache, SessionRegistry
from mcpgateway.config import jsonpath_modifier, settings
from mcpgateway.db import refresh_slugs_on_startup, SessionLocal
from mcpgateway.db import Prompt as DbPrompt
from mcpgateway.db import PromptMetric, refresh_slugs_on_startup, SessionLocal
from mcpgateway.handlers.sampling import SamplingHandler
from mcpgateway.models import (
InitializeRequest,
@@ -1780,17 +1782,49 @@ async def get_prompt(

Returns:
Rendered prompt or metadata.

Raises:
Exception: Re-raised if not a handled exception type.
"""
logger.debug(f"User: {user} requested prompt: {name} with args={args}")
start_time = time.monotonic()
success = False
error_message = None
result = None

try:
PromptExecuteArgs(args=args)
return await prompt_service.get_prompt(db, name, args)
result = await prompt_service.get_prompt(db, name, args)
success = True
logger.debug(f"Prompt execution successful for '{name}'")
except Exception as ex:
error_message = str(ex)
logger.error(f"Could not retrieve prompt {name}: {ex}")
if isinstance(ex, (ValueError, PromptError)):
return JSONResponse(content={"message": "Prompt execution arguments contains HTML tags that may cause security issues"}, status_code=422)
if isinstance(ex, PluginViolationError):
return JSONResponse(content={"message": "Prompt execution arguments contains HTML tags that may cause security issues", "details": ex.message}, status_code=422)
result = JSONResponse(content={"message": "Prompt execution arguments contains HTML tags that may cause security issues"}, status_code=422)
elif isinstance(ex, PluginViolationError):
result = JSONResponse(content={"message": "Prompt execution arguments contains HTML tags that may cause security issues", "details": ex.message}, status_code=422)
else:
raise

# Record metrics (moved outside try/except/finally to ensure it runs)
end_time = time.monotonic()
response_time = end_time - start_time

# Get the prompt from database to get its ID
prompt = db.execute(select(DbPrompt).where(DbPrompt.name == name)).scalar_one_or_none()

if prompt:
metric = PromptMetric(
prompt_id=prompt.id,
response_time=response_time,
is_success=success,
error_message=error_message,
)
db.add(metric)
db.commit()

return result
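
The timing-and-persist block above is repeated almost verbatim in the no-argument handler that follows; a small helper along these lines could hold the shared logic (a sketch, not code from the PR):

```python
# Sketch of a shared helper; the PR itself inlines this logic in both handlers.
from typing import Optional

from sqlalchemy import select
from sqlalchemy.orm import Session

from mcpgateway.db import Prompt as DbPrompt
from mcpgateway.db import PromptMetric


def record_prompt_metric(db: Session, name: str, response_time: float, success: bool, error_message: Optional[str]) -> None:
    """Persist one PromptMetric row for the named prompt, mirroring the inline logic above."""
    prompt = db.execute(select(DbPrompt).where(DbPrompt.name == name)).scalar_one_or_none()
    if prompt is None:
        return  # unknown prompt name: nothing to attach the metric to
    db.add(
        PromptMetric(
            prompt_id=prompt.id,
            response_time=response_time,
            is_success=success,
            error_message=error_message,
        )
    )
    db.commit()
```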


@prompt_router.get("/{name}")
@@ -1810,9 +1844,41 @@ async def get_prompt_no_args(

Returns:
The prompt template information

Raises:
Exception: Re-raised from prompt service.
"""
logger.debug(f"User: {user} requested prompt: {name} with no arguments")
return await prompt_service.get_prompt(db, name, {})
start_time = time.monotonic()
success = False
error_message = None
result = None

try:
result = await prompt_service.get_prompt(db, name, {})
success = True
except Exception as ex:
error_message = str(ex)
raise

# Record metrics
end_time = time.monotonic()
response_time = end_time - start_time

# Get the prompt from database to get its ID
prompt = db.execute(select(DbPrompt).where(DbPrompt.name == name)).scalar_one_or_none()

if prompt:
metric = PromptMetric(
prompt_id=prompt.id,
response_time=response_time,
is_success=success,
error_message=error_message,
)
db.add(metric)
db.commit()

return result
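
To see the new instrumentation end to end, something like the following would exercise the handler and count the recorded rows. This is a sketch under several assumptions: the prompt router is mounted at `/prompts`, `require_auth` is importable from `mcpgateway.main`, the application can start against a configured database, and `example_prompt` already exists:

```python
# Rough integration-style sketch; paths, auth handling and fixture data are assumptions.
from fastapi.testclient import TestClient
from sqlalchemy import func, select

from mcpgateway.db import PromptMetric, SessionLocal
from mcpgateway.main import app, require_auth

app.dependency_overrides[require_auth] = lambda: "test-user"  # bypass auth for the sketch

with TestClient(app) as client:
    client.get("/prompts/example_prompt")  # assumed route; a successful call records one PromptMetric row

with SessionLocal() as session:
    total = session.execute(select(func.count(PromptMetric.id))).scalar_one()
    print(f"prompt metric rows recorded: {total}")
```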


@prompt_router.put("/{name}", response_model=PromptRead)
mcpgateway/schemas.py (25 changes: 24 additions & 1 deletion)
@@ -590,7 +590,7 @@ def prevent_manual_mcp_creation(cls, values: Dict[str, Any]) -> Dict[str, Any]:
"""
integration_type = values.get("integration_type")
if integration_type == "MCP":
raise ValueError("Cannot manually create MCP tools. Add MCP servers via the Gateways interface - " "tools will be auto-discovered and registered with integration_type='MCP'.")
raise ValueError("Cannot manually create MCP tools. Add MCP servers via the Gateways interface - tools will be auto-discovered and registered with integration_type='MCP'.")
return values


@@ -2936,3 +2936,26 @@ class TagInfo(BaseModelWithConfigDict):
name: str = Field(..., description="The tag name")
stats: TagStats = Field(..., description="Statistics for this tag")
entities: Optional[List[TaggedEntity]] = Field(default_factory=list, description="Entities that have this tag")


class TopPerformer(BaseModelWithConfigDict):
"""Schema for representing top-performing entities with performance metrics.

Used to encapsulate metrics for entities such as prompts, resources, servers, or tools,
including execution count, average response time, success rate, and last execution timestamp.

Attributes:
id (Union[str, int]): Unique identifier for the entity.
name (str): Name of the entity (e.g., prompt name, resource URI, server name, or tool name).
execution_count (int): Total number of executions for the entity.
avg_response_time (Optional[float]): Average response time in seconds, or None if no metrics.
success_rate (Optional[float]): Success rate percentage, or None if no metrics.
last_execution (Optional[datetime]): Timestamp of the last execution, or None if no metrics.
"""

id: Union[str, int] = Field(..., description="Entity ID")
name: str = Field(..., description="Entity name")
execution_count: int = Field(..., description="Number of executions")
avg_response_time: Optional[float] = Field(None, description="Average response time in seconds")
success_rate: Optional[float] = Field(None, description="Success rate percentage")
last_execution: Optional[datetime] = Field(None, description="Timestamp of last execution")
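
For reference, one entry of the `topPerformers` lists returned by the admin metrics endpoint would serialize roughly as follows (example values only; `model_dump()` assumes the Pydantic v2 API used by `BaseModelWithConfigDict`):

```python
from datetime import datetime, timezone

from mcpgateway.schemas import TopPerformer

row = TopPerformer(
    id="prompt-123",          # example values, not real data
    name="summarize_ticket",
    execution_count=42,
    avg_response_time=0.31,
    success_rate=97.6,
    last_execution=datetime(2025, 8, 9, 12, 0, tzinfo=timezone.utc),
)
print(row.model_dump())  # feeds the Top 5 tables in the admin Metrics tab
```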
mcpgateway/services/logging_service.py (2 changes: 1 addition & 1 deletion)
@@ -123,7 +123,7 @@ async def initialize(self) -> None:
try:
root_logger.addHandler(_get_file_handler())
if settings.log_rotation_enabled:
logging.info(f"File logging enabled with rotation: {settings.log_folder or '.'}/{settings.log_file} " f"(max: {settings.log_max_size_mb}MB, backups: {settings.log_backup_count})")
logging.info(f"File logging enabled with rotation: {settings.log_folder or '.'}/{settings.log_file} (max: {settings.log_max_size_mb}MB, backups: {settings.log_backup_count})")
else:
logging.info(f"File logging enabled (no rotation): {settings.log_folder or '.'}/{settings.log_file}")
except Exception as e: