@@ -624,6 +624,9 @@ def __init__(self,
         self._streaming = streaming
 
     def _handle_response(self, response: "GenerationExecutor.Response"):
+        # Save token lengths before processing to detect which outputs received new tokens.
+        prev_token_lens = {o.index: len(o.token_ids) for o in self._outputs}
+
         GenerationResultBase._handle_response(self, response)
 
         # The postprocess has been performed, return directly
@@ -638,7 +641,15 @@ def _handle_response(self, response: "GenerationExecutor.Response"):
         }
         if self.sampling_params.detokenize and self.tokenizer is not None:
             for beam_output in self.outputs:
+                # Always update _last_text_len to prevent a stale text_diff.
                 beam_output._last_text_len = len(beam_output.text)
+                # For n > 1 streaming: only detokenize outputs that received new
+                # tokens, to avoid re-decoding the same tokens multiple times.
+                output_received_new_tokens = len(
+                    beam_output.token_ids) != prev_token_lens.get(
+                        beam_output.index, 0)
+                if not output_received_new_tokens:
+                    continue
                 if hasattr(
                         self.tokenizer, 'decode_incrementally'
                 ) and self._streaming and not self.sampling_params.use_beam_search:
@@ -653,10 +664,6 @@ def _handle_response(self, response: "GenerationExecutor.Response"):
                     beam_output.text = self.tokenizer.decode(
                         beam_output.token_ids, **kwargs)
 
-                # Update _last_token_ids_len after detokenization to prevent
-                # re-decoding the same tokens in subsequent responses when n > 1.
-                beam_output._last_token_ids_len = len(beam_output.token_ids)
-
                 is_generating = not self._done
                 is_finished_with_stop_or_length = (
                     beam_output.finish_reason == 'stop'
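
For context, here is a minimal, self-contained sketch of the skip pattern this diff introduces: snapshot each output's token count before the response is merged, then re-detokenize only the outputs whose token list actually grew. The `Output` dataclass, `fake_decode`, and `handle_response` below are hypothetical stand-ins for illustration, not TensorRT-LLM APIs.

```python
from dataclasses import dataclass, field


@dataclass
class Output:
    index: int
    token_ids: list = field(default_factory=list)
    text: str = ""


def fake_decode(token_ids):
    # Stand-in for tokenizer.decode(); real detokenization is the costly step.
    return " ".join(str(t) for t in token_ids)


def handle_response(outputs, new_tokens_by_index):
    # 1) Snapshot token counts before merging the new response.
    prev_token_lens = {o.index: len(o.token_ids) for o in outputs}

    # 2) Merge: in streaming with n > 1, a single response may carry new
    #    tokens for only a subset of the n outputs.
    for o in outputs:
        o.token_ids.extend(new_tokens_by_index.get(o.index, []))

    # 3) Detokenize only outputs whose token list actually grew, so the
    #    same tokens are never decoded twice.
    for o in outputs:
        if len(o.token_ids) == prev_token_lens.get(o.index, 0):
            continue
        o.text = fake_decode(o.token_ids)


outputs = [Output(0), Output(1)]
handle_response(outputs, {0: [5, 7]})  # only output 0 received new tokens
assert outputs[1].text == ""           # output 1 was not re-decoded
```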