Skip to content

Commit ca9f2a1

Browse files
Merge pull request #93 from amy-why-3459/bug_fix_v0.9.1
【bugfix】Support individual health check calls
2 parents 3432710 + d15cb4d commit ca9f2a1

File tree

2 files changed

+41
-5
lines changed

2 files changed

+41
-5
lines changed

examples/offline_inference/epd/chat_with_image.py

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from PIL import Image
99

1010
from vllm import SamplingParams
11+
from vllm.disaggregated.protocol import ServerType
1112
from vllm.disaggregated.proxy import Proxy
1213

1314
parser = argparse.ArgumentParser()
@@ -56,11 +57,40 @@ async def main():
5657
pd_addr_list=args.pd_addr_list,
5758
model_name=args.model_name,
5859
)
60+
retry_times = 20
61+
sleep_times = 10
62+
timeout_times = 3
63+
for i in range(retry_times):
64+
tasks_0 = [
65+
asyncio.create_task(
66+
asyncio.wait_for(
67+
p.check_health(ServerType.E_INSTANCE, iid), timeout=timeout_times
68+
)
69+
)
70+
for iid in range(len(args.encode_addr_list))
71+
]
72+
tasks_1 = [
73+
asyncio.create_task(
74+
asyncio.wait_for(
75+
p.check_health(ServerType.PD_INSTANCE, iid), timeout=timeout_times
76+
)
77+
)
78+
for iid in range(len(args.pd_addr_list))
79+
]
80+
tasks = tasks_0 + tasks_1
81+
results = await asyncio.gather(*tasks, return_exceptions=True)
82+
if all([isinstance(result, bool) and result for result in results]):
83+
print("All instances are ready")
84+
break
85+
else:
86+
print(f"retry_times:{i}, results:{results}")
87+
await asyncio.sleep(sleep_times)
88+
5989
prompt = (
6090
"<|im_start|> system\n"
6191
"You are a helpful assistant.<|im_end|> \n"
6292
"<|im_start|> user\n"
63-
"<image> \n"
93+
"<|image_pad|> \n"
6494
"What is the text in the illustrate?<|im_end|> \n"
6595
"<|im_start|> assistant\n"
6696
)
@@ -73,6 +103,7 @@ async def main():
73103
for i in range(10)
74104
]
75105
await asyncio.gather(*tasks)
106+
p.shutdown()
76107

77108

78109
if __name__ == "__main__":

vllm/disaggregated/proxy.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ def __init__(
4949
pd_addr_list: list[str],
5050
model_name: str,
5151
router: type[RoutingInterface] = RandomRouter,
52-
enable_health_monitor=True,
52+
enable_health_monitor=False,
5353
health_check_interval=10,
5454
health_threshold=3,
5555
):
@@ -369,9 +369,10 @@ async def _run_output_handler(self) -> None:
369369
f"Unknown response type from worker: {resp_type}")
370370

371371
if resp.request_id not in self.queues:
372-
logger.warning(
373-
"Request %s may have been aborted, ignore response.",
374-
resp.request_id)
372+
if resp_type != ResponseType.HEARTBEAT:
373+
logger.warning(
374+
"Request %s may have been aborted, "
375+
"ignore response.", resp.request_id)
375376
elif isinstance(resp, FailureResponse):
376377
self.queues[resp.request_id].put_nowait(
377378
RuntimeError(f"Request error: {resp.error_message}"))
@@ -429,6 +430,10 @@ async def do_log_stats(
429430
pass
430431

431432
async def check_health(self, server_type: ServerType, id: int):
433+
# lazy initialization
434+
if self.output_handler is None:
435+
self.output_handler = asyncio.create_task(
436+
self._run_output_handler())
432437
request_id = str(uuid.uuid4())
433438
request = HeartbeatRequest(request_id=request_id)
434439
q = asyncio.Queue()

0 commit comments

Comments
 (0)