@@ -632,15 +632,17 @@ def _maybe_update_last_active_time(self):
632632 async def health_check (self ) -> bool :
633633 elapsed = time .monotonic () - self ._last_active_time
634634 if elapsed > HEALTH_CHECK_MIN_INTERVAL :
635- task = self ._loop .create_task (
636- self .make_compilation_config_serializer ()
637- )
638- task .add_done_callback (
639- lambda _ : self ._maybe_update_last_active_time ()
640- )
641- await asyncio .wait ([task ], timeout = HEALTH_CHECK_TIMEOUT )
642- if not task .done ():
635+ try :
636+ with asyncio .timeout (HEALTH_CHECK_TIMEOUT ):
637+ await self .make_compilation_config_serializer ()
638+ except TimeoutError :
639+ logger .error ("health check timed out" )
643640 return False
641+ except Exception :
642+ logger .exception ("health check failed" )
643+ return False
644+ else :
645+ self ._maybe_update_last_active_time ()
644646 return True
645647
646648
@@ -822,14 +824,26 @@ def _report_worker(self, worker: Worker, *, action: str = "spawn"):
822824 async def _acquire_worker (
823825 self , * , condition = None , weighter = None , ** compiler_args
824826 ):
825- while (
826- worker := await self ._workers_queue .acquire (
827- condition = condition , weighter = weighter
827+ start_time = time .monotonic ()
828+ try :
829+ while (
830+ worker := await self ._workers_queue .acquire (
831+ condition = condition , weighter = weighter
832+ )
833+ ).get_pid () not in self ._workers :
834+ # The worker was disconnected; skip to the next one.
835+ pass
836+ except TimeoutError :
837+ metrics .compiler_pool_queue_errors .inc (1.0 , "timeout" )
838+ raise
839+ except Exception :
840+ metrics .compiler_pool_queue_errors .inc (1.0 , "ise" )
841+ raise
842+ else :
843+ metrics .compiler_pool_queue_wait_duration .observe (
844+ time .monotonic () - start_time
828845 )
829- ).get_pid () not in self ._workers :
830- # The worker was disconnected; skip to the next one.
831- pass
832- return worker
846+ return worker
833847
834848 def _release_worker (self , worker , * , put_in_front : bool = True ):
835849 # Skip disconnected workers
@@ -1201,8 +1215,21 @@ def _connection_lost(self, _pid):
12011215 async def _acquire_worker (
12021216 self , * , condition = None , cmp = None , ** compiler_args
12031217 ):
1204- await self ._semaphore .acquire ()
1205- return await self ._worker
1218+ start_time = time .monotonic ()
1219+ try :
1220+ await self ._semaphore .acquire ()
1221+ worker = await self ._worker
1222+ except TimeoutError :
1223+ metrics .compiler_pool_queue_errors .inc (1.0 , "timeout" )
1224+ raise
1225+ except Exception :
1226+ metrics .compiler_pool_queue_errors .inc (1.0 , "ise" )
1227+ raise
1228+ else :
1229+ metrics .compiler_pool_queue_wait_duration .observe (
1230+ time .monotonic () - start_time
1231+ )
1232+ return worker
12061233
12071234 def _release_worker (self , worker , * , put_in_front : bool = True ):
12081235 if self ._sync_lock .locked ():
0 commit comments