 from vllm.inputs.preprocess import InputPreprocessor
 from vllm.logger import init_logger
 from vllm.lora.request import LoRARequest
-from vllm.outputs import PoolingRequestOutput, RequestOutput
+from vllm.outputs import RequestOutput
 from vllm.pooling_params import PoolingParams
 from vllm.prompt_adapter.request import PromptAdapterRequest
 from vllm.sampling_params import SamplingParams
 from vllm.transformers_utils.tokenizer import AnyTokenizer
 from vllm.transformers_utils.tokenizer_group import init_tokenizer_from_configs
 from vllm.usage.usage_lib import UsageContext
-from vllm.v1.engine.async_stream import AsyncStream
 from vllm.v1.engine.core_client import EngineCoreClient
 from vllm.v1.engine.detokenizer import Detokenizer
 from vllm.v1.engine.processor import Processor
@@ -54,10 +53,8 @@ def __init__(
             lora_config=vllm_config.lora_config)
         self.tokenizer.ping()

-        # Request streams (map of request_id -> AsyncStream).
-        self.request_streams: Dict[str, AsyncStream] = {}
-        # List of cancelled request ids to be aborted.
-        self.client_aborted_requests: List[str] = []
+        # Request streams (map of request_id -> queue).
+        self.rid_to_queue: Dict[str, asyncio.Queue] = {}

         # Processor (converts Inputs --> EngineCoreRequests).
         self.processor = Processor(
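For orientation, here is a minimal, self-contained sketch of the per-request queue pattern this change adopts. The names (demo, producer, consumer) are hypothetical and only rid_to_queue echoes the diff: a background producer fans outputs out to per-request asyncio.Queue objects, and each caller awaits only its own queue.

    import asyncio
    from typing import Dict


    async def demo() -> None:
        # Map of request_id -> per-request output queue, mirroring rid_to_queue.
        rid_to_queue: Dict[str, asyncio.Queue] = {
            "req-0": asyncio.Queue(),
            "req-1": asyncio.Queue(),
        }

        async def producer() -> None:
            # Stand-in for the background output handler: fan outputs out to
            # each request's queue without awaiting.
            for step in range(3):
                for rid, q in rid_to_queue.items():
                    q.put_nowait(f"{rid}: output {step}")
                await asyncio.sleep(0)

        async def consumer(rid: str) -> None:
            # Stand-in for a single generate() call: await only this
            # request's queue.
            q = rid_to_queue[rid]
            for _ in range(3):
                print(await q.get())

        await asyncio.gather(producer(), *(consumer(rid) for rid in rid_to_queue))


    asyncio.run(demo())

The design choice this illustrates: queues carry no reference back to the parent engine object, so the circular-reference bookkeeping the removed AsyncStream code needed goes away.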
@@ -153,14 +150,13 @@ async def add_request(
         trace_headers: Optional[Mapping[str, str]] = None,
         prompt_adapter_request: Optional[PromptAdapterRequest] = None,
         priority: int = 0,
-    ) -> AsyncGenerator[Union[RequestOutput, PoolingRequestOutput], None]:
+    ) -> asyncio.Queue[RequestOutput]:
         """Add new request to the AsyncLLM."""

-        if self.detokenizer.is_request_active(request_id):
-            raise ValueError(f"Request {request_id} already exists.")
-
-        # 1) Create a new AsyncStream for the request.
-        stream = self._add_request_to_streams(request_id)
+        # 1) Create a new output queue for the request.
+        if request_id in self.rid_to_queue:
+            raise ValueError(f"Request id {request_id} already running.")
+        self.rid_to_queue[request_id] = asyncio.Queue()

         # 2) Convert input --> DetokenizerRequest / EngineCoreRequest.
         detokenizer_req, engine_core_req = self.processor.process_inputs(
@@ -173,8 +169,10 @@ async def add_request(
         # 4) Add the EngineCoreRequest to EngineCore (separate process).
         await self.engine_core.add_request_async(engine_core_req)

-        # 5) Return the generator.
-        return stream.generator()
+        if self.log_requests:
+            logger.info("Added request %s.", request_id)
+
+        return self.rid_to_queue[request_id]

     # TODO: we should support multiple prompts in one call, as you
     # can do with LLM.generate. So that for multi-prompt completion
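A rough usage sketch of the new add_request contract, with hypothetical stand-ins (TinyTracker is not a vLLM class): the queue is registered before anything can produce into it, duplicate request ids are rejected, and outputs that arrive before the caller starts reading are simply buffered.

    import asyncio
    from typing import Dict


    class TinyTracker:
        """Toy stand-in for the duplicate-id check and queue registration."""

        def __init__(self) -> None:
            self.rid_to_queue: Dict[str, asyncio.Queue] = {}

        def add_request(self, request_id: str) -> asyncio.Queue:
            if request_id in self.rid_to_queue:
                raise ValueError(f"Request id {request_id} already running.")
            # Register the queue up front so outputs that arrive before the
            # caller starts reading are buffered rather than dropped.
            self.rid_to_queue[request_id] = asyncio.Queue()
            return self.rid_to_queue[request_id]


    async def main() -> None:
        tracker = TinyTracker()
        q = tracker.add_request("req-0")
        q.put_nowait("early output")       # buffered before any consumer reads
        try:
            tracker.add_request("req-0")   # duplicate id is rejected
        except ValueError as e:
            print(e)
        print(await q.get())


    asyncio.run(main())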
@@ -194,7 +192,7 @@ async def generate(
194192 """
195193 Main function called by the API server to kick off a request
196194 * 1) Making an AsyncStream corresponding to the Request.
197- # 2) Processing the Input.
195+ * 2) Processing the Input.
198196 * 3) Adding the Request to the Detokenizer.
199197 * 4) Adding the Request to the EngineCore (separate process).
200198
@@ -206,94 +204,58 @@ async def generate(
         returning the RequestOutput back to the caller.
         """

-        # We start the output_handler on the first call to generate() so that
-        # we can call __init__ before the event loop starts, which enables us
-        # to handle startup failure gracefully in the OpenAI server.
-        if self.output_handler is None:
-            self.output_handler = asyncio.create_task(
-                self._run_output_handler())
-
-        async for output in await self.add_request(
+        try:
+            # We start the output_handler on the first call to generate() so
+            # we can call __init__ before the event loop, which enables us
+            # to handle startup failure gracefully in the OpenAI server.
+            if self.output_handler is None:
+                self.output_handler = asyncio.create_task(
+                    self._run_output_handler())
+
+            q = await self.add_request(
                 request_id,
                 prompt,
                 sampling_params,
                 lora_request=lora_request,
                 trace_headers=trace_headers,
                 prompt_adapter_request=prompt_adapter_request,
                 priority=priority,
-        ):
-            yield output
-
-    def _finish_stream(self, request_id: str):
-        stream = self.request_streams.pop(request_id, None)
-        if stream is not None:
-            stream.finish()
-
-    def _add_request_to_streams(
-        self,
-        request_id: str,
-    ) -> AsyncStream:
-
-        if request_id in self.request_streams:
-            raise ValueError(f"Request id {request_id} already running.")
-
-        # Avoid streams having circular ref to parent AsyncLLM object.
-        aborted_reqs = self.client_aborted_requests
-        stream = AsyncStream(request_id, aborted_reqs.append)
-        self.request_streams[request_id] = stream
-
-        if self.log_requests:
-            logger.info("Added request %s.", request_id)
+            )

-        return stream
-
-    async def _process_cancellations(self) -> None:
-        """
-        Process requests cancelled from user disconnecting.
-
-        When a client disconnects, AsyncStream._cancel() is called.
-        We passed a callback to AsyncStream(), which appends to
-        self.client_aborted_requests.
-
-        As a result, if any requests are canceled from the user side
-        the request_id will show up in self.client_aborted_requests.
-        """
-
-        # Avoid streams having circular ref to parent AsyncLLM object.
-        if not self.client_aborted_requests:
-            return
-        reqs_to_abort = self.client_aborted_requests.copy()
-        self.client_aborted_requests.clear()
-
-        # Remove from Detokenizer.
-        self.detokenizer.abort_requests(reqs_to_abort)
-
-        # Remove from RequestStreams.
-        for request_id in reqs_to_abort:
-            if self.log_requests:
-                logger.info("User-cancelled request %s.", request_id)
-            self._finish_stream(request_id)
-
-        # Remove from EngineCore.
-        await self.engine_core.abort_requests_async(reqs_to_abort)
+            # The output_handler task pushes items into the queue.
+            # This task pulls from the queue and yields to caller.
+            while True:
+                # Note: drain queue without await if possible (avoids
+                # task switching under load which helps performance).
+                out = q.get_nowait() if q.qsize() > 0 else await q.get()
+
+                # Note: both Detokenizer and EngineCore handle their
+                # own request cleanup based on finished.
+                if out.finished:
+                    del self.rid_to_queue[request_id]
+                    yield out
+                    break
+
+                yield out
+
+        # If the request is disconnected by the client, the
+        # generate() task will be canceled. So, we abort the
+        # request if we end up here.
+        except asyncio.CancelledError:
+            await self.abort(request_id)
+            raise

     def _process_request_outputs(self, request_outputs: List[RequestOutput]):
-        """Process outputs by putting them into per-request AsyncStreams."""
+        """Process outputs by putting them into per-request queues."""

         for request_output in request_outputs:
             request_id = request_output.request_id
-            assert request_id in self.request_streams

-            # Each request in the API server pulls from the per-request stream.
-            stream = self.request_streams.get(request_id)
-            if stream is not None:
-                stream.put(request_output)
-
-            # If finished, remove from the tracker.
-            if request_output.finished:
-                if self.log_requests:
-                    logger.info("Finished request %s.", request_id)
-                self._finish_stream(request_id)
+            # Note: it is possible a request was aborted and removed from
+            # the state due to client cancellations, so if we encounter a
+            # request id not in the state, we skip.
+            if request_id in self.rid_to_queue:
+                self.rid_to_queue[request_id].put_nowait(request_output)

     async def _run_output_handler(self):
         """Background loop: pulls from EngineCore and pushes to AsyncStreams."""
@@ -306,24 +268,27 @@ async def _run_output_handler(self):
                 # 2) Detokenize based on the output.
                 request_outputs, reqs_to_abort = self.detokenizer.step(outputs)

-                # 3) Put the RequestOutputs into the per-request AsyncStreams.
+                # 3) Put the RequestOutputs into the per-request queues.
                 self._process_request_outputs(request_outputs)

                 # 4) Abort any requests that finished due to stop strings.
                 await self.engine_core.abort_requests_async(reqs_to_abort)

-                # 5) Abort any requests due to client cancellations.
-                await self._process_cancellations()
-
         except BaseException as e:
             logger.error(e)
             raise e

-    # TODO: can we eliminate these?
-
     async def abort(self, request_id: str) -> None:
-        # Note: Who Calls this? I dont think this is actually used.
-        raise ValueError("Not Supported on V1 yet.")
+        """Abort RequestId in self, detokenizer, and engine core."""
+
+        request_ids = [request_id]
+        await self.engine_core.abort_requests_async(request_ids)
+        self.detokenizer.abort_requests(request_ids)
+
+        # If a request finishes while we await then the request_id
+        # will be removed from the tracked queues before we get here.
+        if request_id in self.rid_to_queue:
+            del self.rid_to_queue[request_id]

     def encode(
         self,
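Finally, a sketch of the cancellation path generate() now relies on, with toy names (consume, aborted) in place of the real engine client: cancelling the caller's task raises asyncio.CancelledError at the queue await, the abort hook runs, and the exception is re-raised so the server sees the cancellation.

    import asyncio


    async def consume(q: asyncio.Queue, request_id: str, aborted: set) -> None:
        try:
            while True:
                out = await q.get()
                print(out)
        except asyncio.CancelledError:
            # Mirror of AsyncLLM.generate(): clean up server-side state for
            # the request before letting the cancellation propagate.
            aborted.add(request_id)
            raise


    async def main() -> None:
        q: asyncio.Queue = asyncio.Queue()
        aborted: set = set()

        task = asyncio.create_task(consume(q, "req-0", aborted))
        q.put_nowait("partial output")
        await asyncio.sleep(0.01)          # let the consumer run once

        task.cancel()                      # simulate a client disconnect
        try:
            await task
        except asyncio.CancelledError:
            pass
        print("aborted:", aborted)


    asyncio.run(main())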