Skip to content

Commit ee6d141

Browse files
[MAIN][BUGFIX] BugFix: Resolve the issue of waiting queue accumulation when requests are canceled. (#2426)
### What this PR does / why we need it? Resolve the issue of waiting queue accumulation when requests are canceled. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? By CI - vLLM version: v0.10.1.1 - vLLM main: vllm-project/vllm@006477e --------- Signed-off-by: wangxiaoteng666 <[email protected]>
1 parent 52aff9e commit ee6d141

File tree

1 file changed

+31
-3
lines changed

1 file changed

+31
-3
lines changed

examples/disaggregated_prefill_v1/load_balance_proxy_server_example.py

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,9 +87,11 @@
8787

8888
import argparse
8989
import asyncio
90+
import functools
9091
import heapq
9192
import os
9293
import sys
94+
import uuid
9395
from contextlib import asynccontextmanager
9496
from typing import List
9597

@@ -137,7 +139,6 @@ def __init__(self, prefiller_instances, decoder_instances):
137139
]
138140
self.req_to_prefiller = {}
139141
self.req_id_lock = asyncio.Lock()
140-
self.req_id_counter = 0
141142
# Removed selection locks - no longer needed for synchronous methods
142143

143144
# Initialize priority queues for efficient server selection
@@ -193,8 +194,7 @@ def aquire_aborted_prefiller_requests(
193194

194195
async def next_req_id(self):
    """Return a fresh, globally unique request id as a string.

    Uses ``uuid.uuid4()``, which is safe to call concurrently without
    synchronization, so the old ``req_id_lock`` (which only protected
    the removed monotonic counter) is no longer acquired here — one
    less await point on every proxied request.
    """
    return str(uuid.uuid4())
198198

199199
def select_prefiller(self, token_count): # Changed to synchronous
200200
# No lock needed - entire function is atomic
@@ -313,6 +313,32 @@ async def lifespan(app: FastAPI):
313313
await d.client.aclose()
314314

315315

316+
async def listen_for_disconnect(request: Request) -> None:
    """Block until the client closes the connection, then return.

    Repeatedly awaits ASGI receive events and returns as soon as an
    ``http.disconnect`` message arrives.
    """
    while (await request.receive())["type"] != "http.disconnect":
        pass
322+
323+
324+
def with_cancellation(handler_func):
    """Decorator that races *handler_func* against a client disconnect.

    The wrapped coroutine runs the handler and a disconnect listener
    concurrently; whichever finishes first wins, and the loser is
    cancelled. On disconnect the handler task is cancelled (so aborted
    requests do not accumulate downstream) and ``None`` is returned.
    The wrapper expects the request object under ``kwargs["request"]``.
    """

    @functools.wraps(handler_func)
    async def wrapper(*args, **kwargs):
        http_request = kwargs["request"]
        work = asyncio.create_task(handler_func(*args, **kwargs))
        watchdog = asyncio.create_task(listen_for_disconnect(http_request))
        finished, unfinished = await asyncio.wait(
            [work, watchdog], return_when=asyncio.FIRST_COMPLETED)
        # Cancel whichever side lost the race.
        for leftover in unfinished:
            leftover.cancel()
        return work.result() if work in finished else None

    return wrapper
316342
app = FastAPI(lifespan=lifespan)
317343

318344

@@ -493,11 +519,13 @@ async def generate_stream():
493519

494520

495521
@app.post("/v1/completions")
# Race the proxy work against client disconnect so cancelled requests
# are aborted instead of accumulating in the backend waiting queues.
@with_cancellation
async def handle_completions(request: Request):
    """Proxy endpoint for OpenAI-style /v1/completions requests."""
    return await _handle_completions("/completions", request)
498525

499526

500527
@app.post("/v1/chat/completions")
# Race the proxy work against client disconnect so cancelled requests
# are aborted instead of accumulating in the backend waiting queues.
@with_cancellation
async def handle_chat_completions(request: Request):
    """Proxy endpoint for OpenAI-style /v1/chat/completions requests."""
    return await _handle_completions("/chat/completions", request)
503531

0 commit comments

Comments
 (0)