@@ -455,28 +455,29 @@ async def forward_request(self, gateway: DbGateway, method: str, params: Optiona
455
455
except Exception as e :
456
456
raise GatewayConnectionError (f"Failed to forward request to { gateway .name } : { str (e )} " )
457
457
458
- async def check_gateway_health (self , gateway : DbGateway ) -> bool :
459
- """Check if a gateway is healthy.
458
+ async def check_health_of_gateways (self , gateways : List [ DbGateway ] ) -> bool :
459
+ """Health check for gateways
460
460
461
461
Args:
462
- gateway: Gateway to check
462
+ gateways: Gateways to check
463
463
464
464
Returns:
465
465
True if gateway is healthy
466
466
"""
467
- if not gateway .is_active :
468
- return False
467
+ for gateway in gateways :
468
+ if not gateway .is_active :
469
+ return False
469
470
470
- try :
471
- # Try to initialize connection
472
- await self ._initialize_gateway (gateway .url , gateway .auth_value )
471
+ try :
472
+ # Try to initialize connection
473
+ await self ._initialize_gateway (gateway .url , gateway .auth_value )
473
474
474
- # Update last seen
475
- gateway .last_seen = datetime .utcnow ()
476
- return True
475
+ # Update last seen
476
+ gateway .last_seen = datetime .utcnow ()
477
+ return True
477
478
478
- except Exception :
479
- return False
479
+ except Exception :
480
+ return False
480
481
481
482
async def aggregate_capabilities (self , db : Session ) -> Dict [str , Any ]:
482
483
"""Aggregate capabilities from all gateways.
@@ -576,29 +577,24 @@ async def connect_to_sse_server(server_url: str, authentication: Optional[Dict[s
576
577
except Exception as e :
577
578
raise GatewayConnectionError (f"Failed to initialize gateway at { url } : { str (e )} " )
578
579
580
+ def _get_active_gateways (self ) -> list [DbGateway ]:
581
+ """Sync function for database operations (runs in thread)."""
582
+ with Session () as db :
583
+ return db .execute (select (DbGateway ).where (DbGateway .is_active )).scalars ().all ()
584
+
579
585
async def _run_health_checks (self ) -> None :
580
- """Run periodic health checks on all gateways ."""
586
+ """Run health checks with sync Session in async code ."""
581
587
while True :
582
588
try :
583
- async with Session () as db :
584
- # Get active gateways
585
- gateways = db .execute (select (DbGateway ).where (DbGateway .is_active )).scalars ().all ()
589
+ # Run sync database code in a thread
590
+ gateways = await asyncio .to_thread (self ._get_active_gateways )
586
591
587
- # Check each gateway
588
- for gateway in gateways :
589
- try :
590
- is_healthy = await self .check_gateway_health (gateway )
591
- if not is_healthy :
592
- logger .warning (f"Gateway { gateway .name } is unhealthy" )
593
- except Exception as e :
594
- logger .error (f"Health check failed for { gateway .name } : { str (e )} " )
595
-
596
- db .commit ()
592
+ # Async health checks (non-blocking)
593
+ await self .check_health_of_gateways (gateways )
597
594
598
595
except Exception as e :
599
596
logger .error (f"Health check run failed: { str (e )} " )
600
-
601
- # Wait for next check
597
+
602
598
await asyncio .sleep (self ._health_check_interval )
603
599
604
600
def _get_auth_headers (self ) -> Dict [str , str ]:
0 commit comments