From fc7d331357c803bf1da320f539d7722583fafe71 Mon Sep 17 00:00:00 2001
From: Aleksandr Samarin
Date: Mon, 6 Oct 2025 11:03:07 +0000
Subject: [PATCH 1/7] Handle multiple channels in one decoding stage

Signed-off-by: Aleksandr Samarin
---
 vllm/entrypoints/openai/serving_chat.py | 155 ++++++++++++++------
 1 file changed, 92 insertions(+), 63 deletions(-)

diff --git a/vllm/entrypoints/openai/serving_chat.py b/vllm/entrypoints/openai/serving_chat.py
index 12dd474936db..e31a9c8587b6 100644
--- a/vllm/entrypoints/openai/serving_chat.py
+++ b/vllm/entrypoints/openai/serving_chat.py
@@ -748,12 +748,18 @@ async def chat_completion_stream_generator(
                 if self.use_harmony:
                     harmony_parser = harmony_parsers[i]
                     prev_recipient = harmony_parser.current_recipient
-                    delta_text = ""
+
+                    # Track accumulated content per token with their state
+                    token_states = []
                     for token_id in output.token_ids:
                         harmony_parser.process(token_id)
-                        delta_text += harmony_parser.last_content_delta or ""
-                        cur_channel = harmony_parser.current_channel
-                        cur_recipient = harmony_parser.current_recipient
+                        token_delta = harmony_parser.last_content_delta or ""
+                        token_states.append((
+                            harmony_parser.current_channel,
+                            harmony_parser.current_recipient,
+                            token_delta
+                        ))
+                    delta_text = "".join(state[2] for state in token_states)
                 else:
                     delta_text = output.text
 
@@ -783,61 +789,80 @@ async def chat_completion_stream_generator(
                 current_token_ids = as_list(output.token_ids)
 
                 if self.use_harmony:
-                    if cur_channel == "final":
-                        delta_message = DeltaMessage(content=delta_text)
-                    elif cur_channel == "analysis":
-                        if request.include_reasoning:
-                            delta_message = DeltaMessage(
-                                reasoning_content=delta_text
-                            )
-                        else:
-                            delta_message = None
-                    elif (
-                        cur_channel == "commentary"
-                        and cur_recipient
-                        and cur_recipient.startswith("functions.")
-                    ):
-                        # Count completed tool calls to determine index
-                        base_index = 0
-                        for msg in harmony_parser.messages:
-                            if (
-                                msg.channel == "commentary"
-                                and msg.recipient
-                                and msg.recipient.startswith("functions.")
-                            ):
-                                base_index += 1
-
-                        if prev_recipient != cur_recipient:
-                            tool_name = cur_recipient.split("functions.", 1)[1]
-                            delta_message = DeltaMessage(
-                                tool_calls=[
-                                    DeltaToolCall(
-                                        id=make_tool_call_id(),
-                                        type="function",
-                                        function=DeltaFunctionCall(
-                                            name=tool_name,
-                                            arguments="",
-                                        ),
-                                        index=base_index,
-                                    )
-                                ]
-                            )
-                        elif delta_text:
-                            delta_message = DeltaMessage(
-                                tool_calls=[
-                                    DeltaToolCall(
-                                        index=base_index,
-                                        function=DeltaFunctionCall(
-                                            arguments=delta_text
-                                        ),
-                                    )
-                                ]
-                            )
-                        else:
-                            delta_message = None
-
-                        if delta_message is not None:
+                    # Group consecutive tokens with same channel/recipient
+                    groups = []
+                    for channel, recipient, text in token_states:
+                        if not text:
+                            continue
+                        if groups and groups[-1]['channel'] == channel and groups[-1]['recipient'] == recipient:
+                            groups[-1]['text'] += text
+                        else:
+                            groups.append({
+                                'channel': channel,
+                                'recipient': recipient,
+                                'text': text
+                            })
+
+                    # Process each group and create delta messages
+                    delta_message = None
+                    combined_content = ""
+                    combined_reasoning = ""
+                    tool_messages = []
+
+                    for group in groups:
+                        group_channel = group['channel']
+                        group_recipient = group['recipient']
+                        group_text = group['text']
+
+                        if group_channel == "final":
+                            combined_content += group_text
+                        elif group_channel == "analysis":
+                            if request.include_reasoning:
+                                combined_reasoning += group_text
+                        elif (group_channel == "commentary" and group_recipient
+                              and group_recipient.startswith("functions.")):
+
+                            base_index = 0
+                            for msg in harmony_parser.messages:
+                                if (msg.channel == "commentary"
+                                        and msg.recipient
+                                        and msg.recipient.startswith(
+                                            "functions.")):
+                                    base_index += 1
+
+                            if prev_recipient != group_recipient:
+                                tool_name = group_recipient.split(
+                                    "functions.", 1)[1]
+                                tool_messages.append(DeltaToolCall(
+                                    id=make_tool_call_id(),
+                                    type="function",
+                                    function=DeltaFunctionCall(
+                                        name=tool_name,
+                                        arguments="",
+                                    ),
+                                    index=base_index,
+                                ))
+                                prev_recipient = group_recipient
+
+                            if group_text:
+                                tool_messages.append(DeltaToolCall(
+                                    index=base_index,
+                                    function=DeltaFunctionCall(
+                                        arguments=group_text),
+                                ))
+
+                    # Combine all non-empty fields into a single message
+                    if combined_content or combined_reasoning or tool_messages:
+                        delta_kwargs = {}
+                        if combined_content:
+                            delta_kwargs['content'] = combined_content
+                        if combined_reasoning:
+                            delta_kwargs['reasoning_content'] = combined_reasoning
+                        if tool_messages:
+                            delta_kwargs['tool_calls'] = tool_messages
                             harmony_tools_streamed[i] = True
+
+                        delta_message = DeltaMessage(**delta_kwargs)
                 else:
                     delta_message = None
                 # handle streaming deltas for tools with named tool_choice
@@ -1076,17 +1101,21 @@ async def chat_completion_stream_generator(
 
                     # Log streaming delta if output logging is enabled
                     if self.enable_log_outputs and self.request_logger:
-                        delta_content = ""
+                        delta_content_parts = []
                         if delta_message.content:
-                            delta_content = delta_message.content
-                        elif delta_message.tool_calls:
-                            delta_content = "".join(
+                            delta_content_parts.append(delta_message.content)
+                        if delta_message.reasoning_content:
+                            delta_content_parts.append(f"[reasoning: {delta_message.reasoning_content}]")
+                        if delta_message.tool_calls:
+                            tool_args = "".join(
                                 tc.function.arguments
                                 for tc in delta_message.tool_calls
-                                if tc.function and tc.function.arguments
-                            )
+                                if tc.function and tc.function.arguments)
+                            if tool_args:
+                                delta_content_parts.append(f"[tool_calls: {tool_args}]")
 
-                        if delta_content:
+                        if delta_content_parts:
+                            delta_content = " ".join(delta_content_parts)
                             self.request_logger.log_outputs(
                                 request_id=request_id,
                                 outputs=delta_content,

From 32f84ddf26fa9534c6739209200fe728206baa17 Mon Sep 17 00:00:00 2001
From: Aleksandr Samarin
Date: Mon, 6 Oct 2025 11:18:34 +0000
Subject: [PATCH 2/7] Fixed base_index calculation inefficiency

Signed-off-by: Aleksandr Samarin
---
 vllm/entrypoints/openai/serving_chat.py | 21 ++++++++++++---------
 1 file changed, 12 insertions(+), 9 deletions(-)

diff --git a/vllm/entrypoints/openai/serving_chat.py b/vllm/entrypoints/openai/serving_chat.py
index e31a9c8587b6..c383023dfc30 100644
--- a/vllm/entrypoints/openai/serving_chat.py
+++ b/vllm/entrypoints/openai/serving_chat.py
@@ -809,6 +809,15 @@ async def chat_completion_stream_generator(
                     combined_reasoning = ""
                     tool_messages = []
 
+                    # Calculate base_index once before the loop
+                    base_index = 0
+                    for msg in harmony_parser.messages:
+                        if (msg.channel == "commentary"
+                                and msg.recipient
+                                and msg.recipient.startswith(
+                                    "functions.")):
+                            base_index += 1
+
                     for group in groups:
                         group_channel = group['channel']
                         group_recipient = group['recipient']
@@ -822,14 +831,6 @@ async def chat_completion_stream_generator(
                             combined_reasoning += group_text
                         elif (group_channel == "commentary" and group_recipient
                               and group_recipient.startswith("functions.")):
 
-                            base_index = 0
-                            for msg in harmony_parser.messages:
-                                if (msg.channel == "commentary"
-                                        and msg.recipient
-                                        and msg.recipient.startswith(
-                                            "functions.")):
-                                    base_index += 1
-
                             if prev_recipient != group_recipient:
                                 tool_name = group_recipient.split(
                                     "functions.", 1)[1]
@@ -843,10 +844,12 @@ async def chat_completion_stream_generator(
                                     index=base_index,
                                 ))
                                 prev_recipient = group_recipient
+                                # Increment index for next tool call
+                                base_index += 1
 
                             if group_text:
                                 tool_messages.append(DeltaToolCall(
-                                    index=base_index,
+                                    index=base_index - 1,  # Use the index of the current tool call
                                     function=DeltaFunctionCall(
                                         arguments=group_text),
                                 ))

From 3e5b7344ec17333ef22431d48cc097b64b4fe650 Mon Sep 17 00:00:00 2001
From: Aleksandr Samarin
Date: Mon, 6 Oct 2025 11:32:41 +0000
Subject: [PATCH 3/7] Avoid off-by-one when streaming ongoing tool call
 arguments

Signed-off-by: Aleksandr Samarin
---
 vllm/entrypoints/openai/serving_chat.py | 17 +++++++++++++----
 1 file changed, 13 insertions(+), 4 deletions(-)

diff --git a/vllm/entrypoints/openai/serving_chat.py b/vllm/entrypoints/openai/serving_chat.py
index c383023dfc30..fe49d3ccf4fb 100644
--- a/vllm/entrypoints/openai/serving_chat.py
+++ b/vllm/entrypoints/openai/serving_chat.py
@@ -810,6 +810,7 @@ async def chat_completion_stream_generator(
                     tool_messages = []
 
                     # Calculate base_index once before the loop
+                    # This represents the number of completed tool calls
                     base_index = 0
                     for msg in harmony_parser.messages:
                         if (msg.channel == "commentary"
@@ -818,6 +819,9 @@ async def chat_completion_stream_generator(
                                     "functions.")):
                             base_index += 1
 
+                    # next_tool_index tracks the index for the next NEW tool call
+                    next_tool_index = base_index
+
                     for group in groups:
                         group_channel = group['channel']
                         group_recipient = group['recipient']
@@ -832,6 +836,7 @@ async def chat_completion_stream_generator(
                               and group_recipient.startswith("functions.")):
 
                             if prev_recipient != group_recipient:
+                                # New tool call - emit the opening message
                                 tool_name = group_recipient.split(
                                     "functions.", 1)[1]
                                 tool_messages.append(DeltaToolCall(
@@ -841,15 +846,19 @@ async def chat_completion_stream_generator(
                                         name=tool_name,
                                         arguments="",
                                     ),
-                                    index=base_index,
+                                    index=next_tool_index,
                                 ))
                                 prev_recipient = group_recipient
-                                # Increment index for next tool call
-                                base_index += 1
+                                # Increment for any subsequent new tool calls in this chunk
+                                next_tool_index += 1
 
                             if group_text:
+                                # Stream arguments for the ongoing tool call
+                                # The current call index is next_tool_index - 1 if we just
+                                # opened it, OR base_index if continuing from prev chunk
+                                tool_call_index = next_tool_index - 1 if next_tool_index > base_index else base_index
                                 tool_messages.append(DeltaToolCall(
-                                    index=base_index - 1,  # Use the index of the current tool call
+                                    index=tool_call_index,
                                     function=DeltaFunctionCall(
                                         arguments=group_text),
                                 ))

From c649c72d7ecab8104fd7616d6db1aaf7f5c38f00 Mon Sep 17 00:00:00 2001
From: Aleksandr Samarin
Date: Mon, 6 Oct 2025 11:34:03 +0000
Subject: [PATCH 4/7] Removed magic constant

Signed-off-by: Aleksandr Samarin
---
 vllm/entrypoints/openai/serving_chat.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/vllm/entrypoints/openai/serving_chat.py b/vllm/entrypoints/openai/serving_chat.py
index fe49d3ccf4fb..45f6de5e27b2 100644
--- a/vllm/entrypoints/openai/serving_chat.py
+++ b/vllm/entrypoints/openai/serving_chat.py
@@ -759,7 +759,7 @@ async def chat_completion_stream_generator(
                             harmony_parser.current_recipient,
                             token_delta
                         ))
-                    delta_text = "".join(state[2] for state in token_states)
+                    delta_text = "".join(state for _, _ state in token_states)
                 else:
                     delta_text = output.text
 

From a8c2ef50fb1bb9042876fb5afe68ac5be2406e47 Mon Sep 17 00:00:00 2001
From: Aleksandr Samarin
Date: Mon, 6 Oct 2025 11:40:15 +0000
Subject: [PATCH 5/7] Fixed invalid syntax

Signed-off-by: Aleksandr Samarin
---
 vllm/entrypoints/openai/serving_chat.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/vllm/entrypoints/openai/serving_chat.py b/vllm/entrypoints/openai/serving_chat.py
index 45f6de5e27b2..6b18d31b2cd8 100644
--- a/vllm/entrypoints/openai/serving_chat.py
+++ b/vllm/entrypoints/openai/serving_chat.py
@@ -759,7 +759,7 @@ async def chat_completion_stream_generator(
                             harmony_parser.current_recipient,
                             token_delta
                         ))
-                    delta_text = "".join(state for _, _ state in token_states)
+                    delta_text = "".join(state for _, _, state in token_states)
                 else:
                     delta_text = output.text
 

From c2dee6fda72b36e77fb1c4ebd38c8a56eb698a14 Mon Sep 17 00:00:00 2001
From: Aleksandr Samarin
Date: Mon, 6 Oct 2025 11:55:21 +0000
Subject: [PATCH 6/7] Fix ruff/codex errors

Signed-off-by: Aleksandr Samarin
---
 vllm/entrypoints/openai/serving_chat.py | 24 +++++++++++++-----------
 1 file changed, 13 insertions(+), 11 deletions(-)

diff --git a/vllm/entrypoints/openai/serving_chat.py b/vllm/entrypoints/openai/serving_chat.py
index 6b18d31b2cd8..c4af7c9786ad 100644
--- a/vllm/entrypoints/openai/serving_chat.py
+++ b/vllm/entrypoints/openai/serving_chat.py
@@ -6,7 +6,7 @@
 import time
 from collections.abc import AsyncGenerator, AsyncIterator
 from collections.abc import Sequence as GenericSequence
-from typing import Callable, Final, Optional, Union
+from typing import Any, Callable, Final, Optional, Union
 
 import jinja2
 import partial_json_parser
@@ -790,11 +790,10 @@ async def chat_completion_stream_generator(
 
                 if self.use_harmony:
                     # Group consecutive tokens with same channel/recipient
-                    groups = []
+                    groups: list[dict[str, str]] = []
                     for channel, recipient, text in token_states:
-                        if not text:
-                            continue
-                        if groups and groups[-1]['channel'] == channel and groups[-1]['recipient'] == recipient:
+                        if (groups and groups[-1]['channel'] == channel
+                                and groups[-1]['recipient'] == recipient):
                             groups[-1]['text'] += text
                         else:
                             groups.append({
@@ -849,14 +848,16 @@ async def chat_completion_stream_generator(
                                     index=next_tool_index,
                                 ))
                                 prev_recipient = group_recipient
-                                # Increment for any subsequent new tool calls in this chunk
+                                # Increment for subsequent new tool calls
                                 next_tool_index += 1
 
                             if group_text:
                                 # Stream arguments for the ongoing tool call
-                                # The current call index is next_tool_index - 1 if we just
-                                # opened it, OR base_index if continuing from prev chunk
-                                tool_call_index = next_tool_index - 1 if next_tool_index > base_index else base_index
+                                # Use next_tool_index - 1 if we opened a call
+                                # this chunk, else base_index for ongoing
+                                tool_call_index = (next_tool_index - 1
+                                                   if next_tool_index > base_index
+                                                   else base_index)
                                 tool_messages.append(DeltaToolCall(
                                     index=tool_call_index,
                                     function=DeltaFunctionCall(
@@ -865,7 +866,7 @@ async def chat_completion_stream_generator(
 
                     # Combine all non-empty fields into a single message
                     if combined_content or combined_reasoning or tool_messages:
-                        delta_kwargs = {}
+                        delta_kwargs: dict[str, Any] = {}
                         if combined_content:
                             delta_kwargs['content'] = combined_content
                         if combined_reasoning:
@@ -1117,7 +1118,8 @@ async def chat_completion_stream_generator(
                         if delta_message.content:
                             delta_content_parts.append(delta_message.content)
                         if delta_message.reasoning_content:
-                            delta_content_parts.append(f"[reasoning: {delta_message.reasoning_content}]")
+                            reasoning = delta_message.reasoning_content
+                            delta_content_parts.append(f"[reasoning: {reasoning}]")
                         if delta_message.tool_calls:
                             tool_args = "".join(
                                 tc.function.arguments

From 3ad1d7b2dd1ad87cb21a4f8ad220800aebf41446 Mon Sep 17 00:00:00 2001
From: Aleksandr Samarin
Date: Mon, 6 Oct 2025 12:17:24 +0000
Subject: [PATCH 7/7] Increment tool index when new call follows ongoing call

Signed-off-by: Aleksandr Samarin
---
 vllm/entrypoints/openai/serving_chat.py | 139 +++++++++++++---------
 1 file changed, 87 insertions(+), 52 deletions(-)

diff --git a/vllm/entrypoints/openai/serving_chat.py b/vllm/entrypoints/openai/serving_chat.py
index c4af7c9786ad..c8485dc00d80 100644
--- a/vllm/entrypoints/openai/serving_chat.py
+++ b/vllm/entrypoints/openai/serving_chat.py
@@ -754,12 +754,14 @@ async def chat_completion_stream_generator(
                     for token_id in output.token_ids:
                         harmony_parser.process(token_id)
                         token_delta = harmony_parser.last_content_delta or ""
-                        token_states.append((
-                            harmony_parser.current_channel,
-                            harmony_parser.current_recipient,
-                            token_delta
-                        ))
-                    delta_text = "".join(state for _, _, state in token_states)
+                        token_states.append(
+                            (
+                                harmony_parser.current_channel,
+                                harmony_parser.current_recipient,
+                                token_delta,
+                            )
+                        )
+                    delta_text = "".join(delta for _, _, delta in token_states)
                 else:
                     delta_text = output.text
 
@@ -792,15 +794,20 @@ async def chat_completion_stream_generator(
                     # Group consecutive tokens with same channel/recipient
                     groups: list[dict[str, str]] = []
                     for channel, recipient, text in token_states:
-                        if (groups and groups[-1]['channel'] == channel
-                                and groups[-1]['recipient'] == recipient):
-                            groups[-1]['text'] += text
+                        if (
+                            groups
+                            and groups[-1]["channel"] == channel
+                            and groups[-1]["recipient"] == recipient
+                        ):
+                            groups[-1]["text"] += text
                         else:
-                            groups.append({
-                                'channel': channel,
-                                'recipient': recipient,
-                                'text': text
-                            })
+                            groups.append(
+                                {
+                                    "channel": channel,
+                                    "recipient": recipient,
+                                    "text": text,
+                                }
+                            )
 
                     # Process each group and create delta messages
                     delta_message = None
@@ -809,70 +816,97 @@ async def chat_completion_stream_generator(
                     combined_content = ""
                     combined_reasoning = ""
                     tool_messages = []
 
                     # Calculate base_index once before the loop
-                    # This represents the number of completed tool calls
+                    # This counts completed tool calls in messages
                     base_index = 0
                     for msg in harmony_parser.messages:
-                        if (msg.channel == "commentary"
-                                and msg.recipient
-                                and msg.recipient.startswith(
-                                    "functions.")):
+                        if (
+                            msg.channel == "commentary"
+                            and msg.recipient
+                            and msg.recipient.startswith("functions.")
+                        ):
                             base_index += 1
 
-                    # next_tool_index tracks the index for the next NEW tool call
-                    next_tool_index = base_index
+                    # If there's an ongoing tool call from previous chunk,
+                    # the next new tool call starts at base_index + 1
+                    if prev_recipient and prev_recipient.startswith("functions."):
+                        next_tool_index = base_index + 1
+                        # Ongoing call is at base_index
+                        ongoing_tool_index = base_index
+                    else:
+                        # No ongoing call, next new call is at base_index
+                        next_tool_index = base_index
+                        ongoing_tool_index = None
 
                     for group in groups:
-                        group_channel = group['channel']
-                        group_recipient = group['recipient']
-                        group_text = group['text']
+                        group_channel = group["channel"]
+                        group_recipient = group["recipient"]
+                        group_text = group["text"]
 
                         if group_channel == "final":
                             combined_content += group_text
                         elif group_channel == "analysis":
                             if request.include_reasoning:
                                 combined_reasoning += group_text
-                        elif (group_channel == "commentary" and group_recipient
-                              and group_recipient.startswith("functions.")):
+                        elif (
+                            group_channel == "commentary"
+                            and group_recipient
+                            and group_recipient.startswith("functions.")
+                        ):
+                            opened_new_call = False
                             if prev_recipient != group_recipient:
                                 # New tool call - emit the opening message
-                                tool_name = group_recipient.split(
-                                    "functions.", 1)[1]
-                                tool_messages.append(DeltaToolCall(
-                                    id=make_tool_call_id(),
-                                    type="function",
-                                    function=DeltaFunctionCall(
-                                        name=tool_name,
-                                        arguments="",
-                                    ),
-                                    index=next_tool_index,
-                                ))
+                                tool_name = group_recipient.split("functions.", 1)[
+                                    1
+                                ]
+                                tool_messages.append(
+                                    DeltaToolCall(
+                                        id=make_tool_call_id(),
+                                        type="function",
+                                        function=DeltaFunctionCall(
+                                            name=tool_name,
+                                            arguments="",
+                                        ),
+                                        index=next_tool_index,
+                                    )
+                                )
+                                opened_new_call = True
                                 prev_recipient = group_recipient
                                 # Increment for subsequent new tool calls
                                 next_tool_index += 1
 
                             if group_text:
                                 # Stream arguments for the ongoing tool call
-                                # Use next_tool_index - 1 if we opened a call
-                                # this chunk, else base_index for ongoing
-                                tool_call_index = (next_tool_index - 1
-                                                   if next_tool_index > base_index
-                                                   else base_index)
-                                tool_messages.append(DeltaToolCall(
-                                    index=tool_call_index,
-                                    function=DeltaFunctionCall(
-                                        arguments=group_text),
-                                ))
+                                if opened_new_call:
+                                    # Just opened in this group
+                                    tool_call_index = next_tool_index - 1
+                                else:
+                                    # Continuing from previous chunk
+                                    # If ongoing_tool_index is None here, it means
+                                    # we're continuing a call but prev_recipient
+                                    # wasn't a function. Use base_index.
+                                    tool_call_index = (
+                                        ongoing_tool_index
+                                        if ongoing_tool_index is not None
+                                        else base_index
+                                    )
+                                tool_messages.append(
+                                    DeltaToolCall(
+                                        index=tool_call_index,
+                                        function=DeltaFunctionCall(
+                                            arguments=group_text
+                                        ),
+                                    )
+                                )
 
                     # Combine all non-empty fields into a single message
                     if combined_content or combined_reasoning or tool_messages:
                         delta_kwargs: dict[str, Any] = {}
                         if combined_content:
-                            delta_kwargs['content'] = combined_content
+                            delta_kwargs["content"] = combined_content
                         if combined_reasoning:
-                            delta_kwargs['reasoning_content'] = combined_reasoning
+                            delta_kwargs["reasoning_content"] = combined_reasoning
                         if tool_messages:
-                            delta_kwargs['tool_calls'] = tool_messages
+                            delta_kwargs["tool_calls"] = tool_messages
                             harmony_tools_streamed[i] = True
 
                         delta_message = DeltaMessage(**delta_kwargs)
@@ -1124,7 +1158,8 @@ async def chat_completion_stream_generator(
                             tool_args = "".join(
                                 tc.function.arguments
                                 for tc in delta_message.tool_calls
-                                if tc.function and tc.function.arguments)
+                                if tc.function and tc.function.arguments
+                            )
                             if tool_args:
                                 delta_content_parts.append(f"[tool_calls: {tool_args}]")
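
To make the indexing scheme the series converges on easier to follow, here is a minimal standalone sketch. It is illustrative code, not part of the patches: the token-state tuples, the `completed_calls` count, and the `prev_recipient` argument are hypothetical stand-ins for the HarmonyParser state used above; only the grouping and the index arithmetic mirror the final diff.

    # Sketch, assuming: token_states is a list of (channel, recipient, delta_text)
    # tuples, completed_calls counts finished "commentary"/functions.* messages
    # (base_index in the patches), and prev_recipient carries over from the
    # previous chunk. Not vLLM code.

    def group_token_states(token_states):
        """Coalesce consecutive token states that share channel and recipient."""
        groups = []
        for channel, recipient, text in token_states:
            if (groups and groups[-1]["channel"] == channel
                    and groups[-1]["recipient"] == recipient):
                groups[-1]["text"] += text
            else:
                groups.append(
                    {"channel": channel, "recipient": recipient, "text": text})
        return groups

    def tool_call_deltas(groups, completed_calls, prev_recipient):
        """Yield (index, name_or_None, argument_delta) for function-call groups."""
        base_index = completed_calls
        if prev_recipient and prev_recipient.startswith("functions."):
            # An unfinished call from the previous chunk sits at base_index,
            # so the next new call opens one slot later.
            next_tool_index, ongoing_index = base_index + 1, base_index
        else:
            next_tool_index, ongoing_index = base_index, None
        for group in groups:
            recipient = group["recipient"]
            if group["channel"] != "commentary" or not (
                    recipient or "").startswith("functions."):
                continue
            opened_new_call = False
            if prev_recipient != recipient:
                # New call: emit its name once, at the next free index.
                yield next_tool_index, recipient.split("functions.", 1)[1], ""
                prev_recipient = recipient
                next_tool_index += 1
                opened_new_call = True
            if group["text"]:
                # Argument deltas go to the call just opened, else to the
                # ongoing call carried over from the previous chunk.
                if opened_new_call:
                    index = next_tool_index - 1
                elif ongoing_index is not None:
                    index = ongoing_index
                else:
                    index = base_index
                yield index, None, group["text"]

    # One chunk that finishes an ongoing call and opens a second one
    # (one call already completed earlier, so indices start at 1):
    states = [
        ("commentary", "functions.get_time", '{"tz": '),
        ("commentary", "functions.get_time", '"UTC"}'),
        ("commentary", "functions.get_weather", '{"city"'),
    ]
    for delta in tool_call_deltas(
            group_token_states(states), 1, "functions.get_time"):
        print(delta)
    # (1, None, '{"tz": "UTC"}')  ongoing call keeps index 1
    # (2, 'get_weather', '')      new call opens at index 2
    # (2, None, '{"city"')        its arguments stream at the same index

This is exactly the case PATCH 7/7 addresses: when a new call follows an ongoing one inside a single chunk, the new call must not reuse the ongoing call's index.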