@@ -350,7 +350,8 @@ def __init__(
350350 label_selector = None ,
351351 prefill_model_labels : List [str ] | None = None ,
352352 decode_model_labels : List [str ] | None = None ,
353- watcher_timeout_seconds : int = 30 ,
353+ watcher_timeout_seconds : int = 0 ,
354+ health_check_timeout_seconds : int = 10 ,
354355 ):
355356 """
356357 Initialize the Kubernetes service discovery module. This module
@@ -364,7 +365,7 @@ def __init__(
364365 namespace: the namespace of the engine pods
365366 port: the port of the engines
366367 label_selector: the label selector of the engines
367- watcher_timeout_seconds: timeout in seconds for Kubernetes watcher streams (default: 30 )
368+ watcher_timeout_seconds: timeout in seconds for Kubernetes watcher streams (default: 0 )
368369 """
369370 self .app = app
370371 self .namespace = namespace
@@ -373,6 +374,7 @@ def __init__(
373374 self .available_engines_lock = threading .Lock ()
374375 self .label_selector = label_selector
375376 self .watcher_timeout_seconds = watcher_timeout_seconds
377+ self .health_check_timeout_seconds = health_check_timeout_seconds
376378
377379 # Init kubernetes watcher
378380 try :
@@ -426,7 +428,9 @@ def _get_engine_sleep_status(self, pod_ip) -> Optional[bool]:
426428 if VLLM_API_KEY := os .getenv ("VLLM_API_KEY" ):
427429 logger .info ("Using vllm server authentication" )
428430 headers = {"Authorization" : f"Bearer { VLLM_API_KEY } " }
429- response = requests .get (url , headers = headers )
431+ response = requests .get (
432+ url , headers = headers , timeout = self .health_check_timeout_seconds
433+ )
430434 response .raise_for_status ()
431435 sleep = response .json ()["is_sleeping" ]
432436 return sleep
@@ -508,7 +512,9 @@ def _get_model_names(self, pod_ip) -> List[str]:
508512 if VLLM_API_KEY := os .getenv ("VLLM_API_KEY" ):
509513 logger .info ("Using vllm server authentication" )
510514 headers = {"Authorization" : f"Bearer { VLLM_API_KEY } " }
511- response = requests .get (url , headers = headers )
515+ response = requests .get (
516+ url , headers = headers , timeout = self .health_check_timeout_seconds
517+ )
512518 response .raise_for_status ()
513519 models = response .json ()["data" ]
514520
@@ -540,7 +546,9 @@ def _get_model_info(self, pod_ip) -> Dict[str, ModelInfo]:
540546 if VLLM_API_KEY := os .getenv ("VLLM_API_KEY" ):
541547 logger .info ("Using vllm server authentication" )
542548 headers = {"Authorization" : f"Bearer { VLLM_API_KEY } " }
543- response = requests .get (url , headers = headers )
549+ response = requests .get (
550+ url , headers = headers , timeout = self .health_check_timeout_seconds
551+ )
544552 response .raise_for_status ()
545553 models = response .json ()["data" ]
546554 # Create a dictionary of model information
@@ -582,6 +590,11 @@ def _watch_engines(self):
582590 pod_name = pod .metadata .name
583591 pod_ip = pod .status .pod_ip
584592
593+ if event_type == "DELETED" :
594+ if pod_name in self .available_engines :
595+ self ._delete_engine (pod_name )
596+ continue
597+
585598 # Check if pod is terminating
586599 is_pod_terminating = self ._is_pod_terminating (pod )
587600 is_container_ready = self ._check_pod_ready (
@@ -755,7 +768,8 @@ def __init__(
755768 label_selector = None ,
756769 prefill_model_labels : List [str ] | None = None ,
757770 decode_model_labels : List [str ] | None = None ,
758- watcher_timeout_seconds : int = 30 ,
771+ watcher_timeout_seconds : int = 0 ,
772+ health_check_timeout_seconds : int = 10 ,
759773 ):
760774 """
761775 Initialize the Kubernetes service discovery module. This module
@@ -784,7 +798,8 @@ def __init__(
784798 namespace: the namespace of the engine services
785799 port: the port of the engines
786800 label_selector: the label selector of the engines
787- watcher_timeout_seconds: timeout in seconds for Kubernetes watcher streams (default: 30)
801+ watcher_timeout_seconds: timeout in seconds for Kubernetes watcher streams (default: 0)
802+ health_check_timeout_seconds: timeout in seconds for health check requests (default: 10)
788803 """
789804 self .app = app
790805 self .namespace = namespace
@@ -793,6 +808,7 @@ def __init__(
793808 self .available_engines_lock = threading .Lock ()
794809 self .label_selector = label_selector
795810 self .watcher_timeout_seconds = watcher_timeout_seconds
811+ self .health_check_timeout_seconds = health_check_timeout_seconds
796812
797813 # Init kubernetes watcher
798814 try :
@@ -837,7 +853,9 @@ def _get_engine_sleep_status(self, service_name) -> Optional[bool]:
837853 if VLLM_API_KEY := os .getenv ("VLLM_API_KEY" ):
838854 logger .info ("Using vllm server authentication" )
839855 headers = {"Authorization" : f"Bearer { VLLM_API_KEY } " }
840- response = requests .get (url , headers = headers )
856+ response = requests .get (
857+ url , headers = headers , timeout = self .health_check_timeout_seconds
858+ )
841859 response .raise_for_status ()
842860 sleep = response .json ()["is_sleeping" ]
843861 return sleep
@@ -931,7 +949,9 @@ def _get_model_names(self, service_name) -> List[str]:
931949 if VLLM_API_KEY := os .getenv ("VLLM_API_KEY" ):
932950 logger .info ("Using vllm server authentication" )
933951 headers = {"Authorization" : f"Bearer { VLLM_API_KEY } " }
934- response = requests .get (url , headers = headers )
952+ response = requests .get (
953+ url , headers = headers , timeout = self .health_check_timeout_seconds
954+ )
935955 response .raise_for_status ()
936956 models = response .json ()["data" ]
937957
@@ -963,7 +983,9 @@ def _get_model_info(self, service_name) -> Dict[str, ModelInfo]:
963983 if VLLM_API_KEY := os .getenv ("VLLM_API_KEY" ):
964984 logger .info ("Using vllm server authentication" )
965985 headers = {"Authorization" : f"Bearer { VLLM_API_KEY } " }
966- response = requests .get (url , headers = headers )
986+ response = requests .get (
987+ url , headers = headers , timeout = self .health_check_timeout_seconds
988+ )
967989 response .raise_for_status ()
968990 models = response .json ()["data" ]
969991 # Create a dictionary of model information
@@ -1002,6 +1024,10 @@ def _watch_engines(self):
10021024 ):
10031025 service = event ["object" ]
10041026 event_type = event ["type" ]
1027+ if event_type == "DELETED" :
1028+ if service .metadata .name in self .available_engines :
1029+ self ._delete_engine (service .metadata .name )
1030+ continue
10051031 service_name = service .metadata .name
10061032 is_service_ready = self ._check_service_ready (
10071033 service_name , self .namespace
0 commit comments