Skip to content

Commit f7a16bb

Browse files
committed
fixed gateway healthcheck
1 parent e5e093a commit f7a16bb

File tree

7 files changed

+149
-30
lines changed

7 files changed

+149
-30
lines changed

.env.example

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,12 @@ HEALTH_CHECK_TIMEOUT=10
214214
# Number of failed checks before marking peer unhealthy
215215
UNHEALTHY_THRESHOLD=3
216216

217+
#####################################
218+
# Lock file Settings
219+
#####################################
220+
FILELOCK_PATH=/tmp/gateway_healthcheck_init.lock
221+
222+
217223
#####################################
218224
# Development Settings
219225
#####################################

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,4 +197,7 @@ downloads/
197197
# db_path
198198
db_path/
199199

200+
# filelock path
201+
tmp/
202+
200203
.continue

README.md

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -546,11 +546,12 @@ You can get started by copying the provided `.env.examples` to `.env` and making
546546
547547
### Health Checks
548548
549-
| Setting | Description | Default | Options |
550-
| ----------------------- | ----------------------------- | ------- | ------- |
551-
| `HEALTH_CHECK_INTERVAL` | Health poll interval (secs) | `60` | int > 0 |
552-
| `HEALTH_CHECK_TIMEOUT` | Health request timeout (secs) | `10` | int > 0 |
553-
| `UNHEALTHY_THRESHOLD` | Fail-count before unhealthy | `3` | int > 0 |
549+
| Setting | Description | Default | Options |
550+
| ----------------------- | ----------------------------------------- | ------- | ------- |
551+
| `HEALTH_CHECK_INTERVAL` | Health poll interval (secs) | `60` | int > 0 |
552+
| `HEALTH_CHECK_TIMEOUT` | Health request timeout (secs) | `10` | int > 0 |
553+
| `UNHEALTHY_THRESHOLD` | Fail-count before peer deactivation, | `3` | int > 0 |
554+
| | Set to -1 if deactivation is not needed. | | |
554555
555556
### Database
556557

mcpgateway/config.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,9 @@ def _parse_federation_peers(cls, v):
171171
# Health Checks
172172
health_check_interval: int = 60 # seconds
173173
health_check_timeout: int = 10 # seconds
174-
unhealthy_threshold: int = 3
174+
unhealthy_threshold: int = 10
175+
176+
filelock_path: str = "tmp/gateway_service_leader.lock"
175177

176178
# Default Roots
177179
default_roots: List[str] = []

mcpgateway/main.py

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -185,15 +185,7 @@ async def lifespan(_app: FastAPI) -> AsyncIterator[None]:
185185
await tool_service.initialize()
186186
await resource_service.initialize()
187187
await prompt_service.initialize()
188-
try:
189-
# Try to create the file exclusively
190-
fd = os.open(settings.lock_file_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o644)
191-
except FileExistsError:
192-
logger.info("Gateway already initialized by another worker")
193-
else:
194-
with os.fdopen(fd, "w") as lock_file:
195-
lock_file.write("initialized")
196-
await gateway_service.initialize()
188+
await gateway_service.initialize()
197189
await root_service.initialize()
198190
await completion_service.initialize()
199191
await logging_service.initialize()

mcpgateway/services/gateway_service.py

Lines changed: 129 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,12 @@
1616

1717
import asyncio
1818
import logging
19+
import uuid
1920
from datetime import datetime, timezone
2021
from typing import Any, AsyncGenerator, Dict, List, Optional, Set
2122

2223
import httpx
24+
from filelock import FileLock, Timeout
2325
from mcp import ClientSession
2426
from mcp.client.sse import sse_client
2527
from sqlalchemy import select
@@ -33,9 +35,21 @@
3335
from mcpgateway.services.tool_service import ToolService
3436
from mcpgateway.utils.services_auth import decode_auth
3537

38+
try:
39+
import redis
40+
REDIS_AVAILABLE = True
41+
except ImportError:
42+
REDIS_AVAILABLE = False
43+
logging.info("Redis is not utilized in this environment.")
44+
45+
# logging.getLogger("httpx").setLevel(logging.WARNING) # Disables httpx logs for regular health checks
3646
logger = logging.getLogger(__name__)
3747

3848

49+
GW_FAILURE_THRESHOLD = settings.unhealthy_threshold
50+
GW_HEALTH_CHECK_INTERVAL = settings.health_check_interval
51+
52+
3953
class GatewayError(Exception):
4054
"""Base class for gateway-related errors."""
4155

@@ -83,17 +97,45 @@ def __init__(self):
8397
"""Initialize the gateway service."""
8498
self._event_subscribers: List[asyncio.Queue] = []
8599
self._http_client = httpx.AsyncClient(timeout=settings.federation_timeout, verify=not settings.skip_ssl_verify)
86-
self._health_check_interval = 60 # seconds
100+
self._health_check_interval = GW_HEALTH_CHECK_INTERVAL
87101
self._health_check_task: Optional[asyncio.Task] = None
88102
self._active_gateways: Set[str] = set() # Track active gateway URLs
89103
self._stream_response = None
90104
self._pending_responses = {}
91105
self.tool_service = ToolService()
106+
self._gateway_failure_counts: dict[str, int] = {}
107+
108+
# For health checks, we determine the leader instance.
109+
self.redis_url = settings.redis_url if settings.cache_type == "redis" else None
110+
111+
if self.redis_url and REDIS_AVAILABLE:
112+
self._redis_client = redis.from_url(self.redis_url)
113+
self._instance_id = str(uuid.uuid4()) # Unique ID for this process
114+
self._leader_key = "gateway_service_leader"
115+
self._leader_ttl = 40 # seconds
116+
elif settings.cache_type != "none":
117+
# Fallback: File-based lock
118+
self._redis_client = None
119+
self._lock_path = settings.filelock_path
120+
self._file_lock = FileLock(self._lock_path)
92121

93122
async def initialize(self) -> None:
94-
"""Initialize the service."""
123+
"""Initialize the service and start health check if this instance is the leader."""
95124
logger.info("Initializing gateway service")
96-
self._health_check_task = asyncio.create_task(self._run_health_checks())
125+
126+
if self._redis_client:
127+
# Check if Redis is available
128+
pong = self._redis_client.ping()
129+
if not pong:
130+
raise ConnectionError("Redis ping failed.")
131+
132+
is_leader = self._redis_client.set(self._leader_key, self._instance_id, ex=self._leader_ttl, nx=True)
133+
if is_leader:
134+
logger.info("Acquired Redis leadership. Starting health check task.")
135+
self._health_check_task = asyncio.create_task(self._run_health_checks())
136+
else:
137+
# Always create the health check task in filelock mode; leader check is handled inside.
138+
self._health_check_task = asyncio.create_task(self._run_health_checks())
97139

98140
async def shutdown(self) -> None:
99141
"""Shutdown the service."""
@@ -474,6 +516,30 @@ async def forward_request(self, gateway: DbGateway, method: str, params: Optiona
474516
except Exception as e:
475517
raise GatewayConnectionError(f"Failed to forward request to {gateway.name}: {str(e)}")
476518

519+
async def _handle_gateway_failure(self, gateway: str) -> None:
520+
"""
521+
Tracks and handles gateway failures during health checks.
522+
If the failure count exceeds the threshold, the gateway is deactivated.
523+
524+
Args:
525+
gateway (str): The gateway object that failed its health check.
526+
527+
Returns:
528+
None
529+
"""
530+
if GW_FAILURE_THRESHOLD == -1:
531+
return # Gateway failure action disabled
532+
count = self._gateway_failure_counts.get(gateway.id, 0) + 1
533+
self._gateway_failure_counts[gateway.id] = count
534+
535+
logger.warning(f"Gateway {gateway.name} failed health check {count} time(s).")
536+
537+
if count >= GW_FAILURE_THRESHOLD:
538+
logger.error(f"Gateway {gateway.name} failed {GW_FAILURE_THRESHOLD} times. Deactivating...")
539+
with SessionLocal() as db:
540+
await self.toggle_gateway_status(db, gateway.id, False)
541+
self._gateway_failure_counts[gateway.id] = 0 # Reset after deactivation
542+
477543
async def check_health_of_gateways(self, gateways: List[DbGateway]) -> bool:
478544
"""Health check for a list of gateways.
479545
@@ -506,9 +572,8 @@ async def check_health_of_gateways(self, gateways: List[DbGateway]) -> bool:
506572
gateway.last_seen = datetime.utcnow()
507573

508574
except Exception:
509-
with SessionLocal() as db:
510-
await self.toggle_gateway_status(db=db, gateway_id=gateway.id, activate=False)
511-
575+
await self._handle_gateway_failure(gateway)
576+
512577
# All gateways passed
513578
return True
514579

@@ -620,19 +685,68 @@ def _get_active_gateways(self) -> list[DbGateway]:
620685
return db.execute(select(DbGateway).where(DbGateway.is_active)).scalars().all()
621686

622687
async def _run_health_checks(self) -> None:
623-
"""Run health checks with sync Session in async code."""
688+
"""Run health checks periodically,
689+
Uses Redis or FileLock - for multiple workers.
690+
Uses simple health check for single worker mode."""
691+
624692
while True:
625693
try:
626-
# Run sync database code in a thread
627-
gateways = await asyncio.to_thread(self._get_active_gateways)
694+
if self._redis_client and settings.cache_type == "redis":
695+
# Redis-based leader check
696+
current_leader = self._redis_client.get(self._leader_key)
697+
if current_leader != self._instance_id.encode():
698+
return
699+
self._redis_client.expire(self._leader_key, self._leader_ttl)
628700

629-
if len(gateways) > 0:
630-
# Async health checks (non-blocking)
631-
await self.check_health_of_gateways(gateways)
632-
except Exception as e:
633-
logger.error(f"Health check run failed: {str(e)}")
701+
# Run health checks
702+
gateways = await asyncio.to_thread(self._get_active_gateways)
703+
if gateways:
704+
await self.check_health_of_gateways(gateways)
705+
706+
await asyncio.sleep(self._health_check_interval)
707+
708+
elif settings.cache_type == "none":
709+
try:
710+
# For single worker mode, run health checks directly
711+
gateways = await asyncio.to_thread(self._get_active_gateways)
634712

635-
await asyncio.sleep(self._health_check_interval)
713+
if gateways:
714+
await self.check_health_of_gateways(gateways)
715+
except Exception as e:
716+
logger.error(f"Health check run failed: {str(e)}")
717+
718+
await asyncio.sleep(self._health_check_interval)
719+
720+
else:
721+
# FileLock-based leader fallback
722+
try:
723+
self._file_lock.acquire(timeout=0)
724+
logger.info("File lock acquired. Running health checks.")
725+
726+
while True:
727+
gateways = await asyncio.to_thread(self._get_active_gateways)
728+
if gateways:
729+
await self.check_health_of_gateways(gateways)
730+
await asyncio.sleep(self._health_check_interval)
731+
732+
except Timeout:
733+
logger.debug("File lock already held. Retrying later.")
734+
await asyncio.sleep(self._health_check_interval)
735+
736+
except Exception as e:
737+
logger.error(f"FileLock health check failed: {str(e)}")
738+
739+
finally:
740+
if self._file_lock.is_locked:
741+
try:
742+
self._file_lock.release()
743+
logger.info("Released file lock.")
744+
except Exception as e:
745+
logger.warning(f"Failed to release file lock: {str(e)}")
746+
747+
except Exception as e:
748+
logger.error(f"Unexpected error in health check loop: {str(e)}")
749+
await asyncio.sleep(self._health_check_interval)
636750

637751
def _get_auth_headers(self) -> Dict[str, str]:
638752
"""

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ dependencies = [
6565
"starlette>=0.46.2",
6666
"uvicorn>=0.34.3",
6767
"zeroconf>=0.147.0",
68+
"filelock>=3.18.0",
6869
]
6970

7071
# ----------------------------------------------------------------

0 commit comments

Comments
 (0)