Skip to content

Commit 8367d43

Browse files
[Bugfix] Fix routing to deleted endpoint (#668)
* [CI] remove docker image before building router image
  Signed-off-by: Rui Zhang <[email protected]>
* [Bugfix] Fix routing to deleted endpoint
  Signed-off-by: Rui Zhang <[email protected]>
* Revert "[CI] remove docker image before building router image"
  This reverts commit 8319e01.
  Signed-off-by: Rui Zhang <[email protected]>
* make health check timeout configurable
  Signed-off-by: Rui Zhang <[email protected]>
---------
Signed-off-by: Rui Zhang <[email protected]>
1 parent 3bec177 commit 8367d43

File tree

3 files changed

+45
-14
lines changed

3 files changed

+45
-14
lines changed

src/vllm_router/app.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,7 @@ def initialize_all(app: FastAPI, args):
165165
prefill_model_labels=args.prefill_model_labels,
166166
decode_model_labels=args.decode_model_labels,
167167
watcher_timeout_seconds=args.k8s_watcher_timeout_seconds,
168+
health_check_timeout_seconds=args.backend_health_check_timeout_seconds,
168169
)
169170

170171
else:

src/vllm_router/parsers/parser.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,6 @@ def validate_args(args):
9797
validate_static_model_types(args.static_model_types)
9898
if args.service_discovery == "k8s" and args.k8s_port is None:
9999
raise ValueError("K8s port must be provided when using K8s service discovery.")
100-
if args.k8s_watcher_timeout_seconds <= 0:
101-
raise ValueError("k8s-watcher-timeout-seconds must be greater than 0.")
102100
if args.routing_logic == "session" and args.session_key is None:
103101
raise ValueError(
104102
"Session key must be provided when using session routing logic."
@@ -193,8 +191,14 @@ def parse_args():
193191
parser.add_argument(
194192
"--k8s-watcher-timeout-seconds",
195193
type=int,
196-
default=30,
197-
help="Timeout in seconds for Kubernetes watcher streams (default: 30).",
194+
default=0,
195+
help="Timeout in seconds for Kubernetes watcher streams (default: 0).",
196+
)
197+
parser.add_argument(
198+
"--backend-health-check-timeout-seconds",
199+
type=int,
200+
default=10,
201+
help="Timeout in seconds for backend health check requests (default: 10).",
198202
)
199203
parser.add_argument(
200204
"--routing-logic",

src/vllm_router/service_discovery.py

Lines changed: 36 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -350,7 +350,8 @@ def __init__(
350350
label_selector=None,
351351
prefill_model_labels: List[str] | None = None,
352352
decode_model_labels: List[str] | None = None,
353-
watcher_timeout_seconds: int = 30,
353+
watcher_timeout_seconds: int = 0,
354+
health_check_timeout_seconds: int = 10,
354355
):
355356
"""
356357
Initialize the Kubernetes service discovery module. This module
@@ -364,7 +365,7 @@ def __init__(
364365
namespace: the namespace of the engine pods
365366
port: the port of the engines
366367
label_selector: the label selector of the engines
367-
watcher_timeout_seconds: timeout in seconds for Kubernetes watcher streams (default: 30)
368+
watcher_timeout_seconds: timeout in seconds for Kubernetes watcher streams (default: 0)
368369
"""
369370
self.app = app
370371
self.namespace = namespace
@@ -373,6 +374,7 @@ def __init__(
373374
self.available_engines_lock = threading.Lock()
374375
self.label_selector = label_selector
375376
self.watcher_timeout_seconds = watcher_timeout_seconds
377+
self.health_check_timeout_seconds = health_check_timeout_seconds
376378

377379
# Init kubernetes watcher
378380
try:
@@ -426,7 +428,9 @@ def _get_engine_sleep_status(self, pod_ip) -> Optional[bool]:
426428
if VLLM_API_KEY := os.getenv("VLLM_API_KEY"):
427429
logger.info("Using vllm server authentication")
428430
headers = {"Authorization": f"Bearer {VLLM_API_KEY}"}
429-
response = requests.get(url, headers=headers)
431+
response = requests.get(
432+
url, headers=headers, timeout=self.health_check_timeout_seconds
433+
)
430434
response.raise_for_status()
431435
sleep = response.json()["is_sleeping"]
432436
return sleep
@@ -508,7 +512,9 @@ def _get_model_names(self, pod_ip) -> List[str]:
508512
if VLLM_API_KEY := os.getenv("VLLM_API_KEY"):
509513
logger.info("Using vllm server authentication")
510514
headers = {"Authorization": f"Bearer {VLLM_API_KEY}"}
511-
response = requests.get(url, headers=headers)
515+
response = requests.get(
516+
url, headers=headers, timeout=self.health_check_timeout_seconds
517+
)
512518
response.raise_for_status()
513519
models = response.json()["data"]
514520

@@ -540,7 +546,9 @@ def _get_model_info(self, pod_ip) -> Dict[str, ModelInfo]:
540546
if VLLM_API_KEY := os.getenv("VLLM_API_KEY"):
541547
logger.info("Using vllm server authentication")
542548
headers = {"Authorization": f"Bearer {VLLM_API_KEY}"}
543-
response = requests.get(url, headers=headers)
549+
response = requests.get(
550+
url, headers=headers, timeout=self.health_check_timeout_seconds
551+
)
544552
response.raise_for_status()
545553
models = response.json()["data"]
546554
# Create a dictionary of model information
@@ -582,6 +590,11 @@ def _watch_engines(self):
582590
pod_name = pod.metadata.name
583591
pod_ip = pod.status.pod_ip
584592

593+
if event_type == "DELETED":
594+
if pod_name in self.available_engines:
595+
self._delete_engine(pod_name)
596+
continue
597+
585598
# Check if pod is terminating
586599
is_pod_terminating = self._is_pod_terminating(pod)
587600
is_container_ready = self._check_pod_ready(
@@ -755,7 +768,8 @@ def __init__(
755768
label_selector=None,
756769
prefill_model_labels: List[str] | None = None,
757770
decode_model_labels: List[str] | None = None,
758-
watcher_timeout_seconds: int = 30,
771+
watcher_timeout_seconds: int = 0,
772+
health_check_timeout_seconds: int = 10,
759773
):
760774
"""
761775
Initialize the Kubernetes service discovery module. This module
@@ -784,7 +798,8 @@ def __init__(
784798
namespace: the namespace of the engine services
785799
port: the port of the engines
786800
label_selector: the label selector of the engines
787-
watcher_timeout_seconds: timeout in seconds for Kubernetes watcher streams (default: 30)
801+
watcher_timeout_seconds: timeout in seconds for Kubernetes watcher streams (default: 0)
802+
health_check_timeout_seconds: timeout in seconds for health check requests (default: 10)
788803
"""
789804
self.app = app
790805
self.namespace = namespace
@@ -793,6 +808,7 @@ def __init__(
793808
self.available_engines_lock = threading.Lock()
794809
self.label_selector = label_selector
795810
self.watcher_timeout_seconds = watcher_timeout_seconds
811+
self.health_check_timeout_seconds = health_check_timeout_seconds
796812

797813
# Init kubernetes watcher
798814
try:
@@ -837,7 +853,9 @@ def _get_engine_sleep_status(self, service_name) -> Optional[bool]:
837853
if VLLM_API_KEY := os.getenv("VLLM_API_KEY"):
838854
logger.info("Using vllm server authentication")
839855
headers = {"Authorization": f"Bearer {VLLM_API_KEY}"}
840-
response = requests.get(url, headers=headers)
856+
response = requests.get(
857+
url, headers=headers, timeout=self.health_check_timeout_seconds
858+
)
841859
response.raise_for_status()
842860
sleep = response.json()["is_sleeping"]
843861
return sleep
@@ -931,7 +949,9 @@ def _get_model_names(self, service_name) -> List[str]:
931949
if VLLM_API_KEY := os.getenv("VLLM_API_KEY"):
932950
logger.info("Using vllm server authentication")
933951
headers = {"Authorization": f"Bearer {VLLM_API_KEY}"}
934-
response = requests.get(url, headers=headers)
952+
response = requests.get(
953+
url, headers=headers, timeout=self.health_check_timeout_seconds
954+
)
935955
response.raise_for_status()
936956
models = response.json()["data"]
937957

@@ -963,7 +983,9 @@ def _get_model_info(self, service_name) -> Dict[str, ModelInfo]:
963983
if VLLM_API_KEY := os.getenv("VLLM_API_KEY"):
964984
logger.info("Using vllm server authentication")
965985
headers = {"Authorization": f"Bearer {VLLM_API_KEY}"}
966-
response = requests.get(url, headers=headers)
986+
response = requests.get(
987+
url, headers=headers, timeout=self.health_check_timeout_seconds
988+
)
967989
response.raise_for_status()
968990
models = response.json()["data"]
969991
# Create a dictionary of model information
@@ -1002,6 +1024,10 @@ def _watch_engines(self):
10021024
):
10031025
service = event["object"]
10041026
event_type = event["type"]
1027+
if event_type == "DELETED":
1028+
if service.metadata.name in self.available_engines:
1029+
self._delete_engine(service.metadata.name)
1030+
continue
10051031
service_name = service.metadata.name
10061032
is_service_ready = self._check_service_ready(
10071033
service_name, self.namespace

0 commit comments

Comments
 (0)