Skip to content

Commit ccdae73

Browse files
authored
[BugFix] Don't cancel asyncio tasks directly from destructors (#22476)
Signed-off-by: Nick Hill <[email protected]>
1 parent 9040639 commit ccdae73

File tree

3 files changed

+23
-14
lines changed

3 files changed

+23
-14
lines changed

vllm/utils/__init__.py

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -687,19 +687,30 @@ def _queue_key(self, op: str, kwargs: dict) -> tuple:
687687
max_length = kwargs.get("max_length")
688688

689689
if not truncation:
690-
return ("encode", add_special_tokens, False, None)
690+
return "encode", add_special_tokens, False, None
691691

692692
model_max = getattr(self.tokenizer, "model_max_length", None)
693693
if max_length is None or (model_max is not None
694694
and max_length == model_max):
695-
return ("encode", add_special_tokens, True, "model_max")
695+
return "encode", add_special_tokens, True, "model_max"
696696

697-
return ("encode", "other")
697+
return "encode", "other"
698698

699699
def __del__(self):
700-
for task in self._batcher_tasks:
701-
if not task.done():
702-
task.cancel()
700+
if ((tasks := getattr(self, "_batcher_tasks", None))
701+
and (loop := getattr(self, "_loop", None))
702+
and not loop.is_closed()):
703+
704+
def cancel_tasks():
705+
for task in tasks:
706+
task.cancel()
707+
708+
loop.call_soon_threadsafe(cancel_tasks)
709+
710+
711+
def cancel_task_threadsafe(task: Task):
712+
if task and not task.done() and not (loop := task.get_loop()).is_closed():
713+
loop.call_soon_threadsafe(task.cancel)
703714

704715

705716
def make_async(

vllm/v1/engine/async_llm.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
from vllm.transformers_utils.tokenizer import AnyTokenizer
2828
from vllm.transformers_utils.tokenizer_group import init_tokenizer_from_configs
2929
from vllm.usage.usage_lib import UsageContext
30-
from vllm.utils import Device, cdiv, deprecate_kwargs
30+
from vllm.utils import Device, cancel_task_threadsafe, cdiv, deprecate_kwargs
3131
from vllm.v1.engine import EngineCoreRequest
3232
from vllm.v1.engine.core_client import EngineCoreClient
3333
from vllm.v1.engine.exceptions import EngineDeadError, EngineGenerateError
@@ -219,8 +219,7 @@ def shutdown(self):
219219
if engine_core := getattr(self, "engine_core", None):
220220
engine_core.shutdown()
221221

222-
if handler := getattr(self, "output_handler", None):
223-
handler.cancel()
222+
cancel_task_threadsafe(getattr(self, "output_handler", None))
224223

225224
async def get_supported_tasks(self) -> tuple[SupportedTask, ...]:
226225
return await self.engine_core.get_supported_tasks_async()

vllm/v1/engine/core_client.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@
2323
from vllm.logger import init_logger
2424
from vllm.lora.request import LoRARequest
2525
from vllm.tasks import SupportedTask
26-
from vllm.utils import get_open_port, get_open_zmq_inproc_path, make_zmq_socket
26+
from vllm.utils import (cancel_task_threadsafe, get_open_port,
27+
get_open_zmq_inproc_path, make_zmq_socket)
2728
from vllm.v1.engine import (EngineCoreOutputs, EngineCoreRequest,
2829
EngineCoreRequestType,
2930
ReconfigureDistributedRequest, ReconfigureRankType,
@@ -342,10 +343,8 @@ def __call__(self):
342343
if self.coordinator is not None:
343344
self.coordinator.close()
344345

345-
if self.output_queue_task is not None:
346-
self.output_queue_task.cancel()
347-
if self.stats_update_task is not None:
348-
self.stats_update_task.cancel()
346+
cancel_task_threadsafe(self.output_queue_task)
347+
cancel_task_threadsafe(self.stats_update_task)
349348

350349
# ZMQ context termination can hang if the sockets
351350
# aren't explicitly closed first.

0 commit comments

Comments
 (0)