Epic - Circuit Breakers for Unstable MCP Server Backends
Title: Circuit Breakers for Unstable MCP Server Backends
Goal: Implement a comprehensive circuit breaker pattern for MCP server backends to prevent cascading failures, reduce resource waste, and provide fast failure detection with intelligent recovery mechanisms.
Why now: The current health check system lacks a proper circuit breaker implementation, leading to wasted resources on requests that are bound to fail, slow failure detection, and potential cascading failures across the system. Proper circuit breakers will improve system resilience and user experience.
Current Implementation Assessment
What We Already Have:
Basic Failure Tracking:
# In gateway_service.py
self._gateway_failure_counts: dict[str, int] = {}
count = self._gateway_failure_counts.get(gateway.id, 0) + 1
self._gateway_failure_counts[gateway.id] = count
Failure Threshold Detection:
GW_FAILURE_THRESHOLD = settings.unhealthy_threshold # Default: 3
if count >= GW_FAILURE_THRESHOLD:
logger.error(f"Gateway {gateway.name} failed {GW_FAILURE_THRESHOLD} times. Deactivating...")
await self.toggle_gateway_status(db, gateway.id, activate=True, reachable=False, only_update_reachable=True)
Basic State Management:
# Two-state system: enabled/disabled + reachable/unreachable
gateway.enabled = activate
gateway.reachable = reachable
Recovery Logic:
# Reactivate gateway if it was previously inactive and health check passed now
if gateway.enabled and not gateway.reachable:
logger.info(f"Reactivating gateway: {gateway.name}, as it is healthy now")
await self.toggle_gateway_status(db, gateway.id, activate=True, reachable=True, only_update_reachable=True)
Critical Missing Elements:
- No Three-State Circuit Breaker Model (Closed → Open → Half-Open)
- No Fast Failure Protection - requests still attempted to unreachable gateways
- No Timeout-Based Recovery - no automatic transition periods
- No Half-Open Trial Requests - no limited testing during recovery
- No Circuit Breaker Metrics - missing detailed observability
- No Manual Circuit Control - no admin interface for operations
- No Request-Level Protection - circuit logic only in health checks
User Stories & Acceptance Criteria
Story 1 – Three-State Circuit Breaker Implementation
As a platform engineer
I want proper circuit breaker states (Closed/Open/Half-Open) for MCP servers
So that the system follows industry-standard resilience patterns and provides predictable behavior.
Acceptance Criteria
Scenario: Circuit breaker state transitions
Given an MCP server starts in "CLOSED" state
And failure threshold is configured to 3 failures
When the server fails 3 consecutive health checks
Then the circuit breaker transitions to "OPEN" state
And all new requests to that server are immediately rejected with CircuitOpenError
And the circuit_breaker_state metric shows "open"
When 60 seconds pass since opening
Then the circuit breaker transitions to "HALF_OPEN" state
And the next request is allowed as a trial
If the trial request succeeds
Then the circuit breaker transitions back to "CLOSED" state
And normal operation resumes
If the trial request fails
Then the circuit breaker returns to "OPEN" state
And the 60-second timeout resets
Story 2 – Fast Failure Protection
As a system administrator
I want requests to fail immediately when circuit breakers are open
So that resources aren't wasted on requests destined to fail and users get faster error responses.
Acceptance Criteria
Scenario: Fast failure when circuit is open
Given an MCP server circuit breaker is in "OPEN" state
When a client makes a tool request to that server
Then the request fails immediately with CircuitOpenError
And no network request is made to the backend server
And the response time is < 10ms
And the fast_failures_total metric increments
And the error message indicates "Circuit breaker is open for server X"
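A minimal sketch of the caller-side behaviour this story describes, assuming the CircuitOpenError exception and the forward_request() integration proposed later in this epic; the "tools/call" payload and function names are illustrative:
import logging
import time

logger = logging.getLogger(__name__)

async def call_tool(gateway_service, gateway, name: str, args: dict):
    """Illustrative only: an open circuit surfaces as an immediate error, with no backend I/O."""
    start = time.perf_counter()
    try:
        return await gateway_service.forward_request(gateway, "tools/call", {"name": name, "arguments": args})
    except CircuitOpenError as exc:  # exception class defined in the design section of this epic
        elapsed_ms = (time.perf_counter() - start) * 1000  # expected to be well under 10 ms: no network request was made
        logger.warning("Fast failure for %s after %.1f ms: %s", exc.server_id, elapsed_ms, exc)
        raise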
Story 3 – Half-Open Trial Requests
As a reliability engineer
I want limited trial requests during recovery testing
So that failed servers can be gradually brought back online without overwhelming them.
Acceptance Criteria
Scenario: Half-open state trial requests
Given an MCP server circuit breaker is in "HALF_OPEN" state
And max_trial_requests is configured to 3
When the first request arrives for that server
Then the request is allowed and forwarded to the server
And the trial_requests_count increments to 1
When 2 more requests arrive
Then they are also allowed (total: 3 trial requests)
When a 4th request arrives before trial period ends
Then it is rejected with CircuitHalfOpenLimitError
If all 3 trial requests succeed
Then the circuit transitions to "CLOSED" state
And trial_requests_count resets to 0
If any trial request fails
Then the circuit returns to "OPEN" state immediately
And remaining trial requests are cancelled
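A small, self-contained sketch of the half-open trial behaviour above, assuming the MCPCircuitBreaker, CircuitBreakerConfig and CircuitState definitions proposed in the design section below; the timeouts are shortened only to keep the demo fast:
import asyncio

async def demo_half_open_trials():
    cfg = CircuitBreakerConfig(failure_threshold=3, reset_timeout=0.2, half_open_max_calls=3)
    breaker = MCPCircuitBreaker("mcp-server-x", cfg)

    for _ in range(3):                                  # three failures trip the breaker
        await breaker.record_failure("connect error")
    assert breaker.state.state is CircuitState.OPEN
    assert not await breaker.can_execute()              # fast failure while OPEN

    await asyncio.sleep(0.3)                            # let the (shortened) reset timeout elapse
    assert await breaker.can_execute()                  # first call transitions the breaker to HALF_OPEN
    assert breaker.state.state is CircuitState.HALF_OPEN

    # Further calls are admitted only up to half_open_max_calls trial requests
    admitted = [await breaker.can_execute() for _ in range(cfg.half_open_max_calls + 1)]
    assert admitted.count(False) >= 1                   # the trial cap eventually rejects extra requests

    await breaker.record_success()                      # success_threshold=1 -> back to CLOSED
    assert breaker.state.state is CircuitState.CLOSED

asyncio.run(demo_half_open_trials())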
Story 4 – Configurable Circuit Breaker Parameters
As a DevOps engineer
I want configurable circuit breaker parameters per server or server group
So that different services can have appropriate resilience settings based on their characteristics.
Acceptance Criteria
Scenario: Per-server circuit breaker configuration
Given I configure server "mcp-db-server" with failure_threshold=5, reset_timeout=120s
And I configure server "mcp-api-server" with failure_threshold=2, reset_timeout=30s
When both servers experience failures
Then "mcp-db-server" opens after 5 failures and stays open for 120 seconds
And "mcp-api-server" opens after 2 failures and stays open for 30 seconds
And the circuit_breaker_config endpoint shows these individual settings
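A sketch of how the two server profiles from this scenario could be registered through the CircuitBreakerManager API proposed in the design section below (server names are the ones used in the scenario):
manager = CircuitBreakerManager(default_config=CircuitBreakerConfig())

manager.configure_server("mcp-db-server", CircuitBreakerConfig(failure_threshold=5, reset_timeout=120.0))
manager.configure_server("mcp-api-server", CircuitBreakerConfig(failure_threshold=2, reset_timeout=30.0))

# get_all_states() includes each active breaker's effective config, which a
# circuit_breaker_config endpoint could expose to satisfy the last assertion above.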
Story 5 – Circuit Breaker Monitoring & Observability
As a monitoring engineer
I want comprehensive metrics and alerting for circuit breaker states
So that I can track system health and receive alerts when services become unavailable.
Acceptance Criteria
Scenario: Circuit breaker metrics and monitoring
Given circuit breakers are enabled for all MCP servers
When circuit state changes occur
Then the following metrics are exported:
- circuit_breaker_state{server_id, state} gauge (0=closed, 1=open, 2=half_open)
- circuit_breaker_transitions_total{server_id, from_state, to_state} counter
- circuit_breaker_failures_total{server_id} counter
- circuit_breaker_fast_failures_total{server_id} counter
- circuit_breaker_trial_requests_total{server_id, result} counter
- circuit_breaker_open_duration_seconds{server_id} histogram
And Prometheus can scrape these metrics from /metrics endpoint
And alerts fire when circuit_breaker_state shows servers in open state
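One way to export these series is with the prometheus_client library instead of hand-formatting the exposition text; a hedged sketch, wired in as an MCPCircuitBreaker metrics callback (the remaining series from the list above follow the same pattern):
from prometheus_client import Counter, Gauge

STATE_VALUES = {"closed": 0, "open": 1, "half_open": 2}

CB_STATE = Gauge("circuit_breaker_state", "Circuit state (0=closed, 1=open, 2=half_open)", ["server_id"])
CB_TRANSITIONS = Counter("circuit_breaker_transitions_total", "State transitions", ["server_id", "from_state", "to_state"])
CB_FAILURES = Counter("circuit_breaker_failures_total", "Recorded failures", ["server_id"])
CB_FAST_FAILURES = Counter("circuit_breaker_fast_failures_total", "Requests rejected while the circuit is open", ["server_id"])

async def prometheus_metrics_callback(event: dict) -> None:
    """Translate circuit breaker events (see _emit_metric in the design below) into Prometheus series."""
    server_id = event["server_id"]
    CB_STATE.labels(server_id=server_id).set(STATE_VALUES.get(event.get("state", "closed"), 0))
    if event["event_type"] == "state_transition":
        CB_TRANSITIONS.labels(server_id=server_id, from_state=event["from_state"], to_state=event["to_state"]).inc()
    elif event["event_type"] == "failure_recorded":
        CB_FAILURES.labels(server_id=server_id).inc()
    elif event["event_type"] == "request_rejected":
        CB_FAST_FAILURES.labels(server_id=server_id).inc()

# Registration (per breaker): circuit_breaker.add_metrics_callback(prometheus_metrics_callback)
# The /metrics endpoint can then serve prometheus_client.generate_latest().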
Story 6 – Admin Interface for Circuit Breaker Management
As a system operator
I want an admin interface to view and control circuit breaker states
So that I can manually intervene during incidents and troubleshoot issues.
Acceptance Criteria
Scenario: Admin interface for circuit breaker control
Given I access the MCP Gateway admin interface at /admin/circuit-breakers
Then I see a dashboard showing:
- Current state of all circuit breakers (visual indicators)
- Recent state transition history
- Current configuration for each circuit breaker
- Real-time metrics (failure counts, response times)
When I click "Force Open" on a circuit breaker
Then the circuit transitions to OPEN state immediately
And a manual_override flag is set
And the action is logged with my user ID
When I click "Reset" on an open circuit breaker
Then the circuit transitions to CLOSED state immediately
And failure counts are reset to zero
And the manual_override flag is cleared
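A hedged sketch of the management endpoints behind this dashboard, assuming a FastAPI-style router; require_admin and the module-level manager instance are placeholders, with real auth and wiring coming from the gateway itself:
from fastapi import APIRouter, Depends

async def require_admin() -> None:  # placeholder admin-auth dependency
    return None

circuit_breaker_manager = CircuitBreakerManager()  # manager class from the design section below

router = APIRouter(prefix="/admin/api/circuit-breakers", dependencies=[Depends(require_admin)])

@router.get("/")
async def list_circuit_breakers():
    return await circuit_breaker_manager.get_all_states()

@router.post("/{server_id}/force-open")
async def force_open(server_id: str, reason: str = "manual"):
    await circuit_breaker_manager.force_open_circuit(server_id, reason)
    return {"server_id": server_id, "state": "open", "manual_override": True}

@router.post("/{server_id}/reset")
async def reset_breaker(server_id: str, reason: str = "manual"):
    await circuit_breaker_manager.reset_circuit(server_id, reason)
    return {"server_id": server_id, "state": "closed", "manual_override": False}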
Architecture
1 – Circuit Breaker Component Architecture
flowchart TD
subgraph "MCP Gateway Instance"
CB[Circuit Breaker Manager]
CBM[Circuit Breaker Metrics]
HM[Health Monitor]
REQ[Request Router]
ADMIN[Admin Interface]
end
subgraph "Circuit Breaker States"
CLOSED[CLOSED State<br/>Normal Operation]
OPEN[OPEN State<br/>Fast Failure]
HALF_OPEN[HALF_OPEN State<br/>Trial Requests]
end
subgraph "MCP Servers"
S1[MCP Server 1<br/>State: CLOSED]
S2[MCP Server 2<br/>State: OPEN]
S3[MCP Server 3<br/>State: HALF_OPEN]
end
CLIENT[MCP Client] --> REQ
REQ --> CB
CB --> |Check State| CLOSED
CB --> |Fast Fail| OPEN
CB --> |Trial Request| HALF_OPEN
CB --> |Forward Request| S1
CB -.-> |Reject Request| S2
CB --> |Limited Requests| S3
HM --> |State Updates| CB
CB --> |Metrics| CBM
ADMIN --> |Manual Control| CB
CBM -. "metrics β Prometheus" .-> MON[Monitoring]
classDef server fill:#e1f5fe,stroke:#01579b;
classDef breaker fill:#fff3e0,stroke:#e65100;
classDef state fill:#e8f5e8,stroke:#2e7d32;
class S1,S2,S3 server
class CB,CBM breaker
class CLOSED,OPEN,HALF_OPEN state
2 – Circuit Breaker State Machine
stateDiagram-v2
[*] --> CLOSED : Initialize
CLOSED --> OPEN : failure_count >= threshold
CLOSED --> CLOSED : success (reset failure_count)
OPEN --> HALF_OPEN : timeout_elapsed
OPEN --> OPEN : reject_requests (fast_fail)
HALF_OPEN --> CLOSED : trial_success
HALF_OPEN --> OPEN : trial_failure
HALF_OPEN --> HALF_OPEN : trial_in_progress
note right of CLOSED
Normal operation
Track failures
Forward all requests
end note
note right of OPEN
Fast failure mode
Reject all requests
Wait for timeout
end note
note right of HALF_OPEN
Recovery testing
Limited trial requests
Quick state transition
end note
3 – Request Flow with Circuit Breaker
sequenceDiagram
participant CLIENT as MCP Client
participant ROUTER as Request Router
participant CB as Circuit Breaker
participant SERVER as MCP Server
participant METRICS as Metrics
CLIENT ->> ROUTER: tool_call("extract_data")
ROUTER ->> CB: check_circuit_state(server_id)
alt Circuit is CLOSED
CB -->> ROUTER: ALLOW_REQUEST
ROUTER ->> SERVER: forward_request()
alt Request succeeds
SERVER -->> ROUTER: success_response
ROUTER ->> CB: record_success()
CB ->> METRICS: reset_failure_count
ROUTER -->> CLIENT: success_response
else Request fails
SERVER -->> ROUTER: error_response
ROUTER ->> CB: record_failure()
CB ->> METRICS: increment_failure_count
alt failure_count >= threshold
CB ->> CB: transition_to_OPEN()
CB ->> METRICS: circuit_opened
end
ROUTER -->> CLIENT: error_response
end
else Circuit is OPEN
CB -->> ROUTER: REJECT_REQUEST (CircuitOpenError)
CB ->> METRICS: fast_failure_count++
ROUTER -->> CLIENT: CircuitOpenError
else Circuit is HALF_OPEN
alt trial_requests < max_trials
CB -->> ROUTER: ALLOW_TRIAL_REQUEST
CB ->> CB: increment_trial_count()
ROUTER ->> SERVER: forward_request()
alt Trial succeeds
SERVER -->> ROUTER: success_response
ROUTER ->> CB: record_trial_success()
CB ->> CB: transition_to_CLOSED()
CB ->> METRICS: circuit_closed
ROUTER -->> CLIENT: success_response
else Trial fails
SERVER -->> ROUTER: error_response
ROUTER ->> CB: record_trial_failure()
CB ->> CB: transition_to_OPEN()
CB ->> METRICS: circuit_opened
ROUTER -->> CLIENT: error_response
end
else trial_requests >= max_trials
CB -->> ROUTER: REJECT_REQUEST (CircuitHalfOpenLimitError)
CB ->> METRICS: trial_limit_exceeded++
ROUTER -->> CLIENT: CircuitHalfOpenLimitError
end
end
Enhanced Design Implementation
Core Circuit Breaker Classes:
from enum import Enum
from dataclasses import dataclass, field
from datetime import datetime, timezone, timedelta
from typing import Dict, List, Optional, Callable, Any
import asyncio
import logging
import time
logger = logging.getLogger(__name__)
class CircuitState(Enum):
"""Circuit breaker states following the classic pattern."""
CLOSED = "closed" # Normal operation
OPEN = "open" # Failing, reject all requests
HALF_OPEN = "half_open" # Testing recovery with limited requests
@dataclass
class CircuitBreakerConfig:
"""Configuration for a circuit breaker instance."""
failure_threshold: int = 3
reset_timeout: float = 60.0 # seconds
half_open_max_calls: int = 3
half_open_timeout: float = 30.0 # seconds
success_threshold: int = 1 # successes needed to close from half-open
# Advanced configuration
failure_rate_threshold: float = 0.5 # 50% failure rate triggers opening
minimum_requests: int = 10 # minimum requests before failure rate calculation
sliding_window_size: int = 100 # requests to track for failure rate
@dataclass
class CircuitBreakerState:
"""Current state of a circuit breaker."""
state: CircuitState = CircuitState.CLOSED
failure_count: int = 0
consecutive_successes: int = 0
last_failure_time: Optional[datetime] = None
state_changed_time: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
trial_requests_count: int = 0
manual_override: bool = False
# Sliding window for failure rate calculation
recent_requests: List[bool] = field(default_factory=list) # True=success, False=failure
class CircuitBreakerError(Exception):
"""Base exception for circuit breaker errors."""
pass
class CircuitOpenError(CircuitBreakerError):
"""Raised when circuit is open and requests are rejected."""
def __init__(self, server_id: str, next_attempt_time: datetime):
self.server_id = server_id
self.next_attempt_time = next_attempt_time
super().__init__(f"Circuit breaker is OPEN for server {server_id}. Next attempt at {next_attempt_time}")
class CircuitHalfOpenLimitError(CircuitBreakerError):
"""Raised when half-open circuit has reached trial request limit."""
def __init__(self, server_id: str, current_trials: int, max_trials: int):
self.server_id = server_id
self.current_trials = current_trials
self.max_trials = max_trials
super().__init__(f"Circuit breaker HALF_OPEN limit reached for server {server_id} ({current_trials}/{max_trials})")
class MCPCircuitBreaker:
"""Circuit breaker implementation for MCP servers."""
def __init__(self, server_id: str, config: CircuitBreakerConfig):
self.server_id = server_id
self.config = config
self.state = CircuitBreakerState()
self._lock = asyncio.Lock()
# Metrics callbacks
self._metrics_callbacks: List[Callable] = []
async def can_execute(self) -> bool:
"""Check if request can be executed based on current circuit state."""
async with self._lock:
current_time = datetime.now(timezone.utc)
if self.state.state == CircuitState.CLOSED:
return True
elif self.state.state == CircuitState.OPEN:
# Check if timeout has elapsed to transition to half-open
time_since_open = current_time - self.state.state_changed_time
if time_since_open.total_seconds() >= self.config.reset_timeout:
await self._transition_to_half_open()
return True
return False
elif self.state.state == CircuitState.HALF_OPEN:
# Allow limited trial requests
if self.state.trial_requests_count < self.config.half_open_max_calls:
self.state.trial_requests_count += 1
return True
return False
return False
async def record_success(self) -> None:
"""Record successful operation and update circuit state."""
async with self._lock:
current_time = datetime.now(timezone.utc)
# Add to sliding window
self.state.recent_requests.append(True)
if len(self.state.recent_requests) > self.config.sliding_window_size:
self.state.recent_requests.pop(0)
if self.state.state == CircuitState.HALF_OPEN:
self.state.consecutive_successes += 1
if self.state.consecutive_successes >= self.config.success_threshold:
await self._transition_to_closed()
elif self.state.state == CircuitState.CLOSED:
# Reset failure count on success
self.state.failure_count = 0
self.state.consecutive_successes += 1
await self._emit_metric("success_recorded")
logger.debug(f"Circuit breaker {self.server_id}: Success recorded, state={self.state.state.value}")
async def record_failure(self, error: str = "") -> None:
"""Record failed operation and update circuit state."""
async with self._lock:
current_time = datetime.now(timezone.utc)
# Add to sliding window
self.state.recent_requests.append(False)
if len(self.state.recent_requests) > self.config.sliding_window_size:
self.state.recent_requests.pop(0)
self.state.failure_count += 1
self.state.consecutive_successes = 0
self.state.last_failure_time = current_time
if self.state.state == CircuitState.CLOSED:
# Check if we should open the circuit
if await self._should_open_circuit():
await self._transition_to_open()
elif self.state.state == CircuitState.HALF_OPEN:
# Any failure in half-open immediately returns to open
await self._transition_to_open()
await self._emit_metric("failure_recorded", {"error": error})
logger.warning(f"Circuit breaker {self.server_id}: Failure recorded ({self.state.failure_count}), state={self.state.state.value}")
async def _should_open_circuit(self) -> bool:
"""Determine if circuit should be opened based on failure criteria."""
# Simple threshold-based
if self.state.failure_count >= self.config.failure_threshold:
return True
# Failure rate-based (if we have enough samples)
if len(self.state.recent_requests) >= self.config.minimum_requests:
failure_rate = 1 - (sum(self.state.recent_requests) / len(self.state.recent_requests))
if failure_rate >= self.config.failure_rate_threshold:
return True
return False
async def _transition_to_open(self) -> None:
"""Transition circuit to OPEN state."""
old_state = self.state.state
self.state.state = CircuitState.OPEN
self.state.state_changed_time = datetime.now(timezone.utc)
self.state.trial_requests_count = 0
next_attempt = self.state.state_changed_time + timedelta(seconds=self.config.reset_timeout)
await self._emit_metric("state_transition", {
"from_state": old_state.value,
"to_state": "open",
"next_attempt_time": next_attempt.isoformat()
})
logger.error(f"Circuit breaker {self.server_id}: OPENED - rejecting requests until {next_attempt}")
async def _transition_to_half_open(self) -> None:
"""Transition circuit to HALF_OPEN state."""
old_state = self.state.state
self.state.state = CircuitState.HALF_OPEN
self.state.state_changed_time = datetime.now(timezone.utc)
self.state.trial_requests_count = 0
self.state.consecutive_successes = 0
await self._emit_metric("state_transition", {
"from_state": old_state.value,
"to_state": "half_open"
})
logger.info(f"Circuit breaker {self.server_id}: HALF_OPEN - testing recovery with max {self.config.half_open_max_calls} trials")
async def _transition_to_closed(self) -> None:
"""Transition circuit to CLOSED state."""
old_state = self.state.state
self.state.state = CircuitState.CLOSED
self.state.state_changed_time = datetime.now(timezone.utc)
self.state.failure_count = 0
self.state.trial_requests_count = 0
self.state.consecutive_successes = 0
self.state.manual_override = False
await self._emit_metric("state_transition", {
"from_state": old_state.value,
"to_state": "closed"
})
logger.info(f"Circuit breaker {self.server_id}: CLOSED - normal operation resumed")
async def force_open(self, reason: str = "manual_override") -> None:
"""Manually force circuit to OPEN state."""
async with self._lock:
self.state.manual_override = True
await self._transition_to_open()
await self._emit_metric("manual_override", {"action": "force_open", "reason": reason})
async def reset(self, reason: str = "manual_reset") -> None:
"""Manually reset circuit to CLOSED state."""
async with self._lock:
await self._transition_to_closed()
await self._emit_metric("manual_override", {"action": "reset", "reason": reason})
def get_state_info(self) -> Dict[str, Any]:
"""Get comprehensive state information for monitoring."""
current_time = datetime.now(timezone.utc)
time_in_state = current_time - self.state.state_changed_time
failure_rate = None
if len(self.state.recent_requests) > 0:
failure_rate = 1 - (sum(self.state.recent_requests) / len(self.state.recent_requests))
next_attempt_time = None
if self.state.state == CircuitState.OPEN:
next_attempt_time = self.state.state_changed_time + timedelta(seconds=self.config.reset_timeout)
return {
"server_id": self.server_id,
"state": self.state.state.value,
"failure_count": self.state.failure_count,
"consecutive_successes": self.state.consecutive_successes,
"trial_requests_count": self.state.trial_requests_count,
"time_in_current_state_seconds": time_in_state.total_seconds(),
"last_failure_time": self.state.last_failure_time.isoformat() if self.state.last_failure_time else None,
"next_attempt_time": next_attempt_time.isoformat() if next_attempt_time else None,
"failure_rate": failure_rate,
"manual_override": self.state.manual_override,
"config": {
"failure_threshold": self.config.failure_threshold,
"reset_timeout": self.config.reset_timeout,
"half_open_max_calls": self.config.half_open_max_calls,
"success_threshold": self.config.success_threshold,
"failure_rate_threshold": self.config.failure_rate_threshold
}
}
def add_metrics_callback(self, callback: Callable) -> None:
"""Add callback for metrics emission."""
self._metrics_callbacks.append(callback)
async def _emit_metric(self, event_type: str, data: Dict[str, Any] = None) -> None:
"""Emit metric event to registered callbacks."""
metric_data = {
"server_id": self.server_id,
"event_type": event_type,
"timestamp": datetime.now(timezone.utc).isoformat(),
"state": self.state.state.value,
**(data or {})
}
for callback in self._metrics_callbacks:
try:
await callback(metric_data)
except Exception as e:
logger.error(f"Error in metrics callback: {e}")
class CircuitBreakerManager:
"""Manages circuit breakers for all MCP servers."""
def __init__(self, default_config: CircuitBreakerConfig = None):
self.default_config = default_config or CircuitBreakerConfig()
self._circuit_breakers: Dict[str, MCPCircuitBreaker] = {}
self._server_configs: Dict[str, CircuitBreakerConfig] = {}
self._lock = asyncio.Lock()
# Metrics tracking
self.metrics = CircuitBreakerMetrics()
def configure_server(self, server_id: str, config: CircuitBreakerConfig) -> None:
"""Configure circuit breaker for specific server."""
self._server_configs[server_id] = config
# Update existing circuit breaker if it exists
if server_id in self._circuit_breakers:
self._circuit_breakers[server_id].config = config
async def get_circuit_breaker(self, server_id: str) -> MCPCircuitBreaker:
"""Get or create circuit breaker for server."""
if server_id not in self._circuit_breakers:
async with self._lock:
if server_id not in self._circuit_breakers:
config = self._server_configs.get(server_id, self.default_config)
circuit_breaker = MCPCircuitBreaker(server_id, config)
# Add metrics callback
circuit_breaker.add_metrics_callback(self.metrics.record_event)
self._circuit_breakers[server_id] = circuit_breaker
return self._circuit_breakers[server_id]
async def can_execute_request(self, server_id: str) -> bool:
"""Check if request can be executed for server."""
circuit_breaker = await self.get_circuit_breaker(server_id)
can_execute = await circuit_breaker.can_execute()
if not can_execute:
await self.metrics.record_event({
"server_id": server_id,
"event_type": "request_rejected",
"state": circuit_breaker.state.state.value,
"timestamp": datetime.now(timezone.utc).isoformat()
})
return can_execute
async def record_request_result(self, server_id: str, success: bool, error: str = "") -> None:
"""Record the result of a request."""
circuit_breaker = await self.get_circuit_breaker(server_id)
if success:
await circuit_breaker.record_success()
else:
await circuit_breaker.record_failure(error)
async def get_all_states(self) -> Dict[str, Dict[str, Any]]:
"""Get state information for all circuit breakers."""
states = {}
for server_id, circuit_breaker in self._circuit_breakers.items():
states[server_id] = circuit_breaker.get_state_info()
return states
async def force_open_circuit(self, server_id: str, reason: str = "manual") -> None:
"""Manually force a circuit breaker to OPEN state."""
circuit_breaker = await self.get_circuit_breaker(server_id)
await circuit_breaker.force_open(reason)
logger.info(f"Circuit breaker {server_id} manually forced OPEN: {reason}")
async def reset_circuit(self, server_id: str, reason: str = "manual") -> None:
"""Manually reset a circuit breaker to CLOSED state."""
circuit_breaker = await self.get_circuit_breaker(server_id)
await circuit_breaker.reset(reason)
logger.info(f"Circuit breaker {server_id} manually reset to CLOSED: {reason}")
async def get_metrics_summary(self) -> Dict[str, Any]:
"""Get comprehensive metrics summary."""
return await self.metrics.get_summary()
class CircuitBreakerMetrics:
"""Metrics collection and aggregation for circuit breakers."""
def __init__(self):
self.events: List[Dict[str, Any]] = []
self._lock = asyncio.Lock()
# Prometheus-style metrics (counters, gauges, histograms)
self.state_transitions = {} # server_id -> {from_state -> to_state -> count}
self.failure_counts = {} # server_id -> count
self.fast_failures = {} # server_id -> count
self.trial_requests = {} # server_id -> {success -> count, failure -> count}
self.current_states = {} # server_id -> latest observed state (backs the state gauge)
async def record_event(self, event_data: Dict[str, Any]) -> None:
"""Record a circuit breaker event."""
async with self._lock:
self.events.append(event_data)
# Update aggregated metrics
server_id = event_data["server_id"]
event_type = event_data["event_type"]
# Remember the latest observed state so the Prometheus state gauge can be emitted later
self.current_states[server_id] = event_data.get("state", self.current_states.get(server_id, "closed"))
if event_type == "state_transition":
if server_id not in self.state_transitions:
self.state_transitions[server_id] = {}
from_state = event_data["from_state"]
to_state = event_data["to_state"]
key = f"{from_state}->{to_state}"
self.state_transitions[server_id][key] = self.state_transitions[server_id].get(key, 0) + 1
elif event_type == "failure_recorded":
self.failure_counts[server_id] = self.failure_counts.get(server_id, 0) + 1
elif event_type == "request_rejected":
self.fast_failures[server_id] = self.fast_failures.get(server_id, 0) + 1
async def get_summary(self) -> Dict[str, Any]:
"""Get metrics summary for monitoring."""
async with self._lock:
return {
"total_events": len(self.events),
"state_transitions": self.state_transitions,
"failure_counts": self.failure_counts,
"fast_failures": self.fast_failures,
"trial_requests": self.trial_requests,
"recent_events": self.events[-10:] if self.events else []
}
def get_prometheus_metrics(self) -> str:
"""Generate Prometheus-formatted metrics."""
metrics = []
# Circuit breaker state gauge (derived from the latest observed state per server)
for server_id, state_name in self.current_states.items():
state_value = {"closed": 0, "open": 1, "half_open": 2}.get(state_name, 0)
metrics.append(f'circuit_breaker_state{{server_id="{server_id}"}} {state_value}')
# State transition counters
for server_id, transitions in self.state_transitions.items():
for transition, count in transitions.items():
from_state, to_state = transition.split('->')
metrics.append(f'circuit_breaker_transitions_total{{server_id="{server_id}",from_state="{from_state}",to_state="{to_state}"}} {count}')
# Failure counters
for server_id, count in self.failure_counts.items():
metrics.append(f'circuit_breaker_failures_total{{server_id="{server_id}"}} {count}')
# Fast failure counters
for server_id, count in self.fast_failures.items():
metrics.append(f'circuit_breaker_fast_failures_total{{server_id="{server_id}"}} {count}')
return '\n'.join(metrics)
Integration with Existing Gateway Service:
# Enhanced gateway_service.py with full circuit breaker integration
class EnhancedGatewayService:
"""Enhanced gateway service with comprehensive circuit breaker pattern."""
def __init__(self) -> None:
# ... existing initialization ...
# Circuit breaker components
self.circuit_breaker_manager = CircuitBreakerManager()
self._circuit_breaker_config = self._load_circuit_breaker_config()
# Configure circuit breakers for existing servers
self._configure_circuit_breakers()
def _load_circuit_breaker_config(self) -> Dict[str, CircuitBreakerConfig]:
"""Load circuit breaker configuration from settings."""
return {
"default": CircuitBreakerConfig(
failure_threshold=getattr(settings, 'circuit_breaker_failure_threshold', 3),
reset_timeout=getattr(settings, 'circuit_breaker_reset_timeout', 60.0),
half_open_max_calls=getattr(settings, 'circuit_breaker_half_open_max_calls', 3),
success_threshold=getattr(settings, 'circuit_breaker_success_threshold', 1),
failure_rate_threshold=getattr(settings, 'circuit_breaker_failure_rate_threshold', 0.5),
minimum_requests=getattr(settings, 'circuit_breaker_minimum_requests', 10)
)
}
async def forward_request(self, gateway: DbGateway, method: str, params: dict = None):
"""Forward request with comprehensive circuit breaker protection."""
server_id = str(gateway.id)
# 1. Check circuit breaker state
can_execute = await self.circuit_breaker_manager.can_execute_request(server_id)
if not can_execute:
circuit_breaker = await self.circuit_breaker_manager.get_circuit_breaker(server_id)
state_info = circuit_breaker.get_state_info()
if state_info["state"] == "open":
next_attempt = state_info.get("next_attempt_time")
raise CircuitOpenError(server_id, datetime.fromisoformat(next_attempt) if next_attempt else datetime.now(timezone.utc))
elif state_info["state"] == "half_open":
current_trials = state_info["trial_requests_count"]
max_trials = circuit_breaker.config.half_open_max_calls
raise CircuitHalfOpenLimitError(server_id, current_trials, max_trials)
# 2. Check traditional gateway state
if not gateway.enabled:
raise GatewayConnectionError(f"Cannot forward request to inactive gateway: {gateway.name}")
# 3. Execute request with timeout and error handling
success = False
error_msg = ""
start_time = time.time()
try:
# Build RPC request
request = {"jsonrpc": "2.0", "id": 1, "method": method}
if params:
request["params"] = params
# Execute with timeout
async with asyncio.timeout(settings.request_timeout):
response = await self._http_client.post(
f"{gateway.url}/rpc",
json=request,
headers=self._get_auth_headers()
)
response.raise_for_status()
result = response.json()
# Update last seen timestamp
gateway.last_seen = datetime.now(timezone.utc)
if "error" in result:
error_msg = f"Gateway error: {result['error'].get('message')}"
raise GatewayError(error_msg)
success = True
return result.get("result")
except asyncio.TimeoutError:
error_msg = f"Request timeout after {settings.request_timeout}s"
raise GatewayConnectionError(f"Request timeout to {gateway.name}: {error_msg}")
except Exception as e:
error_msg = str(e)
raise GatewayConnectionError(f"Failed to forward request to {gateway.name}: {error_msg}")
finally:
# 4. Always record result with circuit breaker
await self.circuit_breaker_manager.record_request_result(server_id, success, error_msg)
# 5. Record metrics
response_time = time.time() - start_time
await self._record_request_metrics(server_id, method, success, response_time, error_msg)
async def _check_gateway_health(self, gateway: DbGateway) -> bool:
"""Enhanced health check with circuit breaker integration."""
server_id = str(gateway.id)
# Get circuit breaker for this gateway
circuit_breaker = await self.circuit_breaker_manager.get_circuit_breaker(server_id)
# Check if we should perform health check based on circuit state
can_check = await circuit_breaker.can_execute()
if not can_check and circuit_breaker.state.state == CircuitState.OPEN:
logger.debug(f"Skipping health check for {gateway.name} - circuit breaker is OPEN")
return False
success = False
error_msg = ""
try:
# Perform health check with timeout
async with asyncio.timeout(settings.health_check_timeout):
await self._initialize_gateway(gateway.url)
success = True
return True
except Exception as e:
error_msg = str(e)
logger.warning(f"Health check failed for {gateway.name}: {e}")
return False
finally:
# Record health check result with circuit breaker
await self.circuit_breaker_manager.record_request_result(server_id, success, error_msg)
async def get_circuit_breaker_status(self) -> Dict[str, Dict]:
"""Get comprehensive circuit breaker status for monitoring."""
return await self.circuit_breaker_manager.get_all_states()
async def force_open_circuit_breaker(self, gateway_id: str, reason: str = "manual") -> None:
"""Manually force a circuit breaker to OPEN state."""
await self.circuit_breaker_manager.force_open_circuit(gateway_id, reason)
async def reset_circuit_breaker(self, gateway_id: str, reason: str = "manual") -> None:
"""Manually reset a circuit breaker to CLOSED state."""
await self.circuit_breaker_manager.reset_circuit(gateway_id, reason)
async def configure_circuit_breaker(self, gateway_id: str, config: Dict[str, Any]) -> None:
"""Configure circuit breaker parameters for a specific gateway."""
cb_config = CircuitBreakerConfig(**config)
self.circuit_breaker_manager.configure_server(gateway_id, cb_config)
logger.info(f"Circuit breaker configured for gateway {gateway_id}: {config}")
async def get_circuit_breaker_metrics(self) -> Dict[str, Any]:
"""Get circuit breaker metrics for monitoring."""
return await self.circuit_breaker_manager.get_metrics_summary()
def get_prometheus_metrics(self) -> str:
"""Get Prometheus-formatted metrics."""
return self.circuit_breaker_manager.metrics.get_prometheus_metrics()
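A short sketch of exposing the text produced by get_prometheus_metrics() above, assuming a FastAPI router and a module-level gateway_service instance (both names are placeholders, not final wiring):
from fastapi import APIRouter
from fastapi.responses import PlainTextResponse

metrics_router = APIRouter()

@metrics_router.get("/metrics", response_class=PlainTextResponse)
async def prometheus_metrics() -> str:
    # Prometheus text exposition format generated by CircuitBreakerMetrics
    return gateway_service.get_prometheus_metrics()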
Enhanced Configuration
# mcpgateway/config.py - Enhanced configuration
class CircuitBreakerSettings:
"""Circuit breaker configuration settings."""
# === Global Circuit Breaker Settings ===
circuit_breaker_enabled: bool = True
circuit_breaker_failure_threshold: int = 3
circuit_breaker_reset_timeout: float = 60.0
circuit_breaker_half_open_max_calls: int = 3
circuit_breaker_success_threshold: int = 1
# === Advanced Circuit Breaker Settings ===
circuit_breaker_failure_rate_threshold: float = 0.5 # 50% failure rate
circuit_breaker_minimum_requests: int = 10
circuit_breaker_sliding_window_size: int = 100
circuit_breaker_half_open_timeout: float = 30.0
# === Request Handling ===
request_timeout: float = 30.0
max_concurrent_requests_per_server: int = 50
retry_attempts: int = 2
retry_delay: float = 1.0
# === Metrics and Monitoring ===
circuit_breaker_metrics_enabled: bool = True
circuit_breaker_detailed_metrics: bool = False
metrics_retention_period: int = 3600 # 1 hour
# === Per-Server Configuration ===
# Format: server_name.setting or server_id.setting
circuit_breaker_server_configs: Dict[str, Dict[str, Any]] = {
"critical_service": {
"failure_threshold": 2,
"reset_timeout": 30.0,
"failure_rate_threshold": 0.3
},
"batch_processor": {
"failure_threshold": 5,
"reset_timeout": 120.0,
"minimum_requests": 20
}
}
# .env configuration
"""
# Circuit Breaker Global Settings
CIRCUIT_BREAKER_ENABLED=true
CIRCUIT_BREAKER_FAILURE_THRESHOLD=3
CIRCUIT_BREAKER_RESET_TIMEOUT=60.0
CIRCUIT_BREAKER_HALF_OPEN_MAX_CALLS=3
CIRCUIT_BREAKER_SUCCESS_THRESHOLD=1
# Advanced Circuit Breaker Settings
CIRCUIT_BREAKER_FAILURE_RATE_THRESHOLD=0.5
CIRCUIT_BREAKER_MINIMUM_REQUESTS=10
CIRCUIT_BREAKER_SLIDING_WINDOW_SIZE=100
CIRCUIT_BREAKER_HALF_OPEN_TIMEOUT=30.0
# Request Handling
REQUEST_TIMEOUT=30.0
MAX_CONCURRENT_REQUESTS_PER_SERVER=50
RETRY_ATTEMPTS=2
RETRY_DELAY=1.0
# Metrics
CIRCUIT_BREAKER_METRICS_ENABLED=true
CIRCUIT_BREAKER_DETAILED_METRICS=false
METRICS_RETENTION_PERIOD=3600
"""
Implementation Tasks
Phase 1: Core Circuit Breaker Implementation
- Implement CircuitState enum and core data structures
  # mcpgateway/circuit_breaker/core.py
  class CircuitState(Enum):               # CLOSED, OPEN, HALF_OPEN
  class CircuitBreakerConfig(dataclass):  # configuration
  class CircuitBreakerState(dataclass):   # runtime state
  class MCPCircuitBreaker:                # main circuit breaker logic
- Create CircuitBreakerManager for multi-server management
  # mcpgateway/circuit_breaker/manager.py
  class CircuitBreakerManager:
      async def get_circuit_breaker(server_id: str) -> MCPCircuitBreaker
      async def can_execute_request(server_id: str) -> bool
      async def record_request_result(server_id: str, success: bool, error: str)
- Implement state transition logic with proper timing
  async def _transition_to_open(self) -> None:
  async def _transition_to_half_open(self) -> None:
  async def _transition_to_closed(self) -> None:
  async def _should_open_circuit(self) -> bool:
- Add circuit breaker exceptions
  # mcpgateway/circuit_breaker/exceptions.py
  class CircuitOpenError(Exception):
  class CircuitHalfOpenLimitError(Exception):
Phase 2: Integration with Gateway Service
- Update GatewayService.forward_request() with circuit breaker checks
  async def forward_request(self, gateway: DbGateway, method: str, params: dict = None):
      # 1. Check circuit breaker state
      can_execute = await self.circuit_breaker_manager.can_execute_request(server_id)
      if not can_execute:
          # Raise appropriate circuit breaker exception
      # 2. Execute request with timeout
      # 3. Record result with circuit breaker
- Enhance health check system with circuit breaker integration
  async def _check_gateway_health(self, gateway: DbGateway) -> bool:
      # Skip health checks for OPEN circuits (unless testing recovery)
      # Record health check results with circuit breaker
- Add circuit breaker configuration loading
  def _load_circuit_breaker_config(self) -> Dict[str, CircuitBreakerConfig]:
      # Load global and per-server configurations
- Implement manual circuit breaker controls
  async def force_open_circuit_breaker(gateway_id: str, reason: str)
  async def reset_circuit_breaker(gateway_id: str, reason: str)
  async def configure_circuit_breaker(gateway_id: str, config: Dict[str, Any])
Phase 3: Metrics and Observability
- Create comprehensive metrics collection
  # mcpgateway/circuit_breaker/metrics.py
  class CircuitBreakerMetrics:
      async def record_event(event_data: Dict[str, Any])
      async def get_summary() -> Dict[str, Any]
      def get_prometheus_metrics() -> str
- Add Prometheus metrics endpoint
  # Metrics to export:
  # - circuit_breaker_state{server_id, state} gauge
  # - circuit_breaker_transitions_total{server_id, from_state, to_state} counter
  # - circuit_breaker_failures_total{server_id} counter
  # - circuit_breaker_fast_failures_total{server_id} counter
  # - circuit_breaker_trial_requests_total{server_id, result} counter
  # - circuit_breaker_open_duration_seconds{server_id} histogram
- Implement circuit breaker event logging
  async def _emit_metric(self, event_type: str, data: Dict[str, Any])
  # Log state transitions, failures, manual interventions
- Add performance tracking and analysis
  async def _record_request_metrics(server_id: str, method: str, success: bool, response_time: float, error: str)
Phase 4: Admin Interface Integration
- Create circuit breaker admin pages
  <!-- templates/admin/circuit_breakers.html -->
  <!-- Real-time dashboard showing all circuit breaker states -->
  <!-- Manual controls for force open/reset -->
  <!-- Configuration interface -->
- Add REST API endpoints for circuit breaker management
  # /admin/api/circuit-breakers/
  # GET - list all circuit breakers with states
  # POST /{server_id}/force-open - manually open circuit
  # POST /{server_id}/reset - manually close circuit
  # PUT /{server_id}/config - update configuration
- Implement real-time status updates (see the SSE sketch after this list)
  # WebSocket or SSE for real-time circuit breaker state updates
  # Integration with existing admin interface
- Add circuit breaker visualization components
  // Real-time state indicators (green/yellow/red)
  // State transition timeline
  // Metrics charts (failure rates, response times)
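A hedged sketch of the real-time status feed mentioned in the list above, using server-sent events via FastAPI's StreamingResponse; the route, polling interval and the module-level circuit_breaker_manager are illustrative:
import asyncio
import json

from fastapi import APIRouter
from fastapi.responses import StreamingResponse

sse_router = APIRouter()

@sse_router.get("/admin/api/circuit-breakers/stream")
async def stream_circuit_breaker_states():
    async def event_stream():
        while True:
            states = await circuit_breaker_manager.get_all_states()  # manager from the design section above
            yield f"data: {json.dumps(states)}\n\n"
            await asyncio.sleep(2)  # simple polling push; a real implementation could push on state transitions instead
    return StreamingResponse(event_stream(), media_type="text/event-stream")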
Phase 5: Advanced Features
- Implement failure rate-based circuit opening
  async def _should_open_circuit(self) -> bool:
      # Consider both absolute failure count and failure rate
      # Use sliding window for failure rate calculation
- Add adaptive timeout and threshold adjustment
  class AdaptiveCircuitBreaker(MCPCircuitBreaker):
      # Automatically adjust thresholds based on historical performance
      # Machine learning-based optimization
- Implement circuit breaker bulkhead pattern
  # Separate circuit breakers for different request types
  # Resource isolation and protection
- Add integration with health check correlation
  # Correlate circuit breaker state with health check results
  # Intelligent health check scheduling based on circuit state
Phase 6: Testing and Validation
- Unit tests for circuit breaker state machine (a pytest sketch follows this list)
  # test_circuit_breaker.py
  # Test all state transitions
  # Test timeout behavior
  # Test configuration variations
- Integration tests with gateway service
  # test_gateway_circuit_breaker.py
  # Test request forwarding with circuit breaker
  # Test health check integration
  # Test metrics collection
- Load testing and performance validation
  # Verify circuit breaker performance impact < 5ms
  # Test with high request volumes
  # Validate memory usage and resource efficiency
- Chaos engineering tests
  # Simulate various failure scenarios
  # Test recovery behavior
  # Validate circuit breaker effectiveness
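A starting-point pytest sketch for the state machine tests above, assuming pytest-asyncio and the CircuitBreakerManager / CircuitBreakerConfig classes proposed in this epic:
import pytest

@pytest.mark.asyncio
async def test_threshold_opens_circuit_and_manual_reset_closes_it():
    manager = CircuitBreakerManager(CircuitBreakerConfig(failure_threshold=2, reset_timeout=60.0))

    # Two failures should trip the circuit...
    for _ in range(2):
        await manager.record_request_result("srv-1", success=False, error="boom")
    assert not await manager.can_execute_request("srv-1")

    # ...and a manual reset should restore normal operation.
    await manager.reset_circuit("srv-1", reason="test")
    assert await manager.can_execute_request("srv-1")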
Success Criteria
Functional Requirements:
- Three-state circuit breaker (Closed/Open/Half-Open) implemented correctly
- Fast failure response time < 10ms for requests to open circuits
- Automatic recovery testing in half-open state with configurable limits
- Manual circuit control via admin interface with proper authorization
- Per-server configuration with global defaults and overrides
- Comprehensive metrics exported to Prometheus with proper labels
Performance Requirements:
- Circuit breaker overhead < 5ms per request evaluation
- State transition time < 100ms for all transitions
- Memory usage < 1MB per 1000 circuit breakers
- Metrics collection overhead < 1ms per recorded event
Reliability Requirements:
- Zero false positives in circuit opening (no healthy servers marked as failed)
- Time from failure detection to circuit opening < 60 seconds
- Manual override reliability - 100% success rate for admin operations
- Configuration hot-reload without service restart
Observability Requirements:
- Real-time state visibility in admin dashboard
- Historical metrics retention for analysis and alerting
- Structured logging for all circuit breaker events
- Integration with monitoring systems (Prometheus, Grafana)
Migration Strategy
Phase 1: Parallel Implementation (No Breaking Changes)
# Add circuit breaker alongside existing health check system
# Both systems run in parallel, circuit breaker in "observe-only" mode
# Collect metrics and validate behavior
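A minimal sketch of the observe-only mode described above: the breaker still records outcomes and emits metrics, but an enforcement flag (illustrative name) decides whether open circuits actually reject traffic:
import logging

logger = logging.getLogger(__name__)

async def check_circuit(manager, server_id: str, enforce: bool) -> bool:
    """Return True if the request should proceed; in observe-only mode it always proceeds."""
    allowed = await manager.can_execute_request(server_id)
    if not allowed and not enforce:
        logger.info("Observe-only: circuit breaker for %s would have rejected this request", server_id)
        return True  # shadow mode: let the request through, but the rejection was still counted in metrics
    return allowed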
Phase 2: Gradual Enablement (Feature Flags)
# Enable circuit breaker for non-critical servers first
# Use feature flags to control rollout
# Monitor system behavior and performance
Phase 3: Full Migration (Deprecate Old System)
# Replace existing failure tracking with circuit breaker
# Remove redundant health check logic
# Optimize performance and reduce complexity
Rollback Plan:
# Feature flags allow immediate rollback to old system
# Database schema changes are additive only
# Configuration compatibility maintained
Additional Considerations
- Performance: Circuit breaker adds minimal latency (<5ms) while preventing expensive failed requests
- Reliability: Prevents cascading failures and resource exhaustion during outages
- Resilience: Fast failure detection and automatic recovery improve system stability
- Observability: Comprehensive metrics enable better monitoring and alerting
- Maintainability: Standard circuit breaker pattern improves system understanding
- Scalability: Reduces load on failing servers, improving overall system capacity
- User Experience: Faster error responses and fewer timeouts improve perceived performance
- Operations: Manual controls enable rapid incident response and system management