@@ -179,7 +179,7 @@ def __init__(
179179 # to eliminate pipeline bubbles.
180180 self .batch_queue_size = self .model_executor .max_concurrent_batches
181181 self .batch_queue : (
182- deque [tuple [Future [ModelRunnerOutput ], SchedulerOutput ]] | None
182+ deque [tuple [Future [ModelRunnerOutput ], SchedulerOutput , Future [ Any ] ]] | None
183183 ) = None
184184 if self .batch_queue_size > 1 :
185185 logger .info ("Batch queue is enabled with size %d" , self .batch_queue_size )
@@ -337,16 +337,6 @@ def log_error_detail(self, scheduler_output: SchedulerOutput):
337337 )
338338 raise err
339339
340- def _log_err_callback (self , scheduler_output : SchedulerOutput ):
341- """Log error details of a future that's not expected to return a result."""
342-
343- def callback (f , sched_output = scheduler_output ):
344- with self .log_error_detail (sched_output ):
345- result = f .result ()
346- assert result is None
347-
348- return callback
349-
350340 def step (self ) -> tuple [dict [int , EngineCoreOutputs ], bool ]:
351341 """Schedule, execute, and make output.
352342
@@ -423,8 +413,6 @@ def step_with_batch_queue(
423413 # No sampling required (no requests scheduled).
424414 future = cast (Future [ModelRunnerOutput ], exec_future )
425415 else :
426- exec_future .add_done_callback (self ._log_err_callback (scheduler_output ))
427-
428416 if not scheduler_output .pending_structured_output_tokens :
429417 # We aren't waiting for any tokens, get any grammar output
430418 # and sample immediately.
@@ -441,7 +429,7 @@ def step_with_batch_queue(
441429
442430 if not deferred_scheduler_output :
443431 # Add this step's future to the queue.
444- batch_queue .appendleft ((future , scheduler_output ))
432+ batch_queue .appendleft ((future , scheduler_output , exec_future ))
445433 if (
446434 model_executed
447435 and len (batch_queue ) < self .batch_queue_size
@@ -458,9 +446,14 @@ def step_with_batch_queue(
458446 return None , False
459447
460448 # Block until the next result is available.
461- future , scheduler_output = batch_queue .pop ()
449+ future , scheduler_output , exec_model_fut = batch_queue .pop ()
462450 with self .log_error_detail (scheduler_output ):
463451 model_output = future .result ()
452+ if model_output is None :
453+ # None from sample_tokens() implies that the original execute_model()
454+ # call failed - raise that exception.
455+ exec_model_fut .result ()
456+ raise RuntimeError ("unexpected error" )
464457
465458 # Before processing the model output, process any aborts that happened
466459 # during the model execution.
@@ -479,7 +472,7 @@ def step_with_batch_queue(
479472 deferred_scheduler_output
480473 )
481474 future = self .model_executor .sample_tokens (grammar_output , non_block = True )
482- batch_queue .appendleft ((future , deferred_scheduler_output ))
475+ batch_queue .appendleft ((future , deferred_scheduler_output , exec_future ))
483476
484477 return engine_core_outputs , model_executed
485478
0 commit comments