diff --git a/vllm/v1/engine/output_processor.py b/vllm/v1/engine/output_processor.py
index 3be6c4821214..8bb25268c9cd 100644
--- a/vllm/v1/engine/output_processor.py
+++ b/vllm/v1/engine/output_processor.py
@@ -34,6 +34,7 @@ def __init__(self, output_kind: RequestOutputKind):
         self.aggregate = output_kind == RequestOutputKind.DELTA
         self.output: Optional[Union[RequestOutput, PoolingRequestOutput,
                                     Exception]] = None
+        self._loop = asyncio.get_running_loop()
         self.ready = asyncio.Event()
 
     def put(self, output: Union[RequestOutput, PoolingRequestOutput,
@@ -41,7 +42,7 @@ def put(self, output: Union[RequestOutput, PoolingRequestOutput,
         """Non-blocking put operation."""
         if self.output is None or isinstance(output, Exception):
             self.output = output
-            self.ready.set()
+            self._loop.call_soon_threadsafe(self.ready.set)
         elif isinstance(self.output, (RequestOutput, PoolingRequestOutput)):
             # This ensures that request outputs with different request indexes
             # (if n > 1) do not override each other.
@@ -353,17 +354,17 @@ def process_outputs(
         1) Compute stats for logging
         2) Detokenize
         3) Create and handle RequestOutput objects:
-            * If there is a queue (for usage with AsyncLLM), 
+            * If there is a queue (for usage with AsyncLLM),
               put the RequestOutput objects into the queue for
               handling by the per-request generate() tasks.
 
-            * If there is no queue (for usage with LLMEngine), 
+            * If there is no queue (for usage with LLMEngine),
               return a list of RequestOutput objects.
 
         NOTE FOR DEVELOPERS
 
         vLLM V1 minimizes the number of python loops over the full
-        batch to ensure system overheads are minimized. This is the 
+        batch to ensure system overheads are minimized. This is the
         only function that should loop over EngineCoreOutputs.
 
         If you need to touch every element of the batch, do it from
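
The first two hunks capture the running event loop in __init__ and route ready.set() through call_soon_threadsafe, which indicates that put() can be called from a thread other than the one running the asyncio loop. asyncio primitives such as asyncio.Event are not thread-safe, so setting the event directly from a foreign thread is not guaranteed to wake the coroutine awaiting it. Below is a minimal, self-contained sketch of the same pattern; the producer/ready/results names are illustrative stand-ins, not taken from vLLM.

import asyncio
import threading

async def main() -> None:
    # Capture the loop on its own thread, mirroring the diff's
    # asyncio.get_running_loop() call in __init__.
    loop = asyncio.get_running_loop()
    ready = asyncio.Event()
    results: list[str] = []

    def producer() -> None:
        # Runs on a plain thread, standing in for the non-asyncio
        # thread that invokes the collector's put().
        results.append("output")
        # Schedule the wakeup on the loop's thread; calling
        # ready.set() directly here would not be thread-safe and
        # could leave the awaiting coroutine blocked forever.
        loop.call_soon_threadsafe(ready.set)

    threading.Thread(target=producer).start()
    await ready.wait()
    print(results)  # ['output']

asyncio.run(main())

The remaining hunk only strips trailing whitespace from the process_outputs docstring; no behavior changes there.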