65 changes: 63 additions & 2 deletions lib/llm/src/protocols/openai/chat_completions/jail.rs
@@ -370,12 +370,17 @@ impl ChoiceJailState {
struct ChoiceJailStateCollection {
/// Vec of states, always kept sorted by choice index for deterministic iteration
states: Vec<ChoiceJailState>,
/// Track if any choice has emitted a finish_reason (per choice index)
finish_reason_emitted: std::collections::HashMap<u32, bool>,
}

impl ChoiceJailStateCollection {
/// Create a new empty collection
fn new() -> Self {
Self { states: Vec::new() }
Self {
states: Vec::new(),
finish_reason_emitted: std::collections::HashMap::new(),
}
}

/// Get or create state for a choice index
@@ -394,6 +399,19 @@ impl ChoiceJailStateCollection {
}
}
}

/// Check if a finish_reason has already been emitted for this choice
fn has_emitted_finish_reason(&self, index: u32) -> bool {
self.finish_reason_emitted
.get(&index)
.copied()
.unwrap_or(false)
}

/// Mark that a finish_reason has been emitted for this choice
fn mark_finish_reason_emitted(&mut self, index: u32) {
self.finish_reason_emitted.insert(index, true);
}
}

/// Emission mode for handling multiple choices
@@ -456,6 +474,17 @@ impl JailedStream {

// Process each choice independently using the new architecture
for choice in &chat_response.choices {
// if we've already emitted a finish_reason for this choice,
// skip any subsequent chunks with finish_reason
if choice.finish_reason.is_some() && choice_states.has_emitted_finish_reason(choice.index) {
tracing::debug!(
"Skipping chunk with finish_reason {:?} for choice {} - already emitted finish_reason",
choice.finish_reason,
choice.index
);
continue;
}
Contributor:

At a glance, the logic in vllm seems to be that they actually emit finish_reason=stop, but they replace it with finish_reason=tool_calls if it came from a tool calls response: https://github.com/vllm-project/vllm/blob/a404e2c0f1bf100d28453a5a2ab7bd2a29d9aa31/vllm/entrypoints/openai/serving_chat.py#L1512-L1529

Here we seem to generate an extra chunk even though we've already "finished" the response - are we doing something wrong? Are we hiding a bug by doing this marking/filtering? Why are we emitting 2 "finish" responses in general?

Contributor @rmccorm4 commented on Nov 6, 2025:

TL;DR:

  • It seems like vllm emits 1 final response and contextually rewrites its finish_reason for tool calling.
  • It seems like we are emitting 2 or more final responses, and just skipping/hiding the extras here. This feels off to me, unless I'm missing something and other solutions do this too. Why don't we (aggregator, postprocessor, engine, etc.) know that we're done after emitting the finish_reason=tool_calls response?
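The vLLM behavior described above can be sketched in simplified form. The types below are hypothetical stand-ins (not the actual dynamo or vllm types); the point is only that vllm rewrites the single final finish_reason instead of emitting a second final chunk:

```rust
// Hypothetical sketch of vLLM's approach: the engine emits one final chunk
// with finish_reason = Stop, and the serving layer rewrites it to ToolCalls
// when the response produced tool calls. Simplified types for illustration.
#[derive(Debug, PartialEq, Clone, Copy)]
enum FinishReason {
    Stop,
    ToolCalls,
}

// Rewrite the engine's final finish_reason contextually, mirroring the
// linked serving_chat.py logic.
fn final_finish_reason(engine_reason: FinishReason, emitted_tool_calls: bool) -> FinishReason {
    if emitted_tool_calls && engine_reason == FinishReason::Stop {
        FinishReason::ToolCalls
    } else {
        engine_reason
    }
}

fn main() {
    // Tool-call stream: the single Stop chunk becomes ToolCalls.
    assert_eq!(
        final_finish_reason(FinishReason::Stop, true),
        FinishReason::ToolCalls
    );
    // Plain stream: Stop passes through unchanged.
    assert_eq!(
        final_finish_reason(FinishReason::Stop, false),
        FinishReason::Stop
    );
}
```

With this shape there is only ever one finish_reason chunk per choice, so no downstream filtering is needed.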

Contributor (PR author):

Actually, yes. There is always one empty chunk emitted at the end of the stream, and in some cases trailing leftover markers are emitted as well.

I haven't seen any stream continue after tool calls, so for now this is a safe fix to have.

Contributor:

Normally, SGLang/vLLM returns a chunk with empty content carrying finish_reason=stop at the end of a streaming request. (In special cases, such as when speculative sampling is enabled or when the output reaches max_tokens, the last chunk's content is not empty.)

When we enable tool calling and JailedStream, chunks with non-empty content are processed by JailedStream and merged into a single tool-call chunk with finish_reason == "tool_calls", but the empty-content chunk (carrying finish_reason == "stop") is passed through directly. This is why we observe two chunks with a finish_reason.
@rmccorm4 @ayushag-nv
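A minimal model of the marking/filtering the PR implements, using hypothetical simplified types (the real code tracks this in ChoiceJailStateCollection):

```rust
use std::collections::HashSet;

// Hypothetical minimal model of the PR's filtering: once a choice has
// emitted a finish_reason (e.g. the merged tool_calls chunk), any later
// chunk carrying a finish_reason for that choice (the engine's trailing
// empty stop chunk) is dropped.
#[derive(Default)]
struct FinishTracker {
    emitted: HashSet<u32>,
}

impl FinishTracker {
    /// Returns true if a chunk with a finish_reason for `choice_index`
    /// should be forwarded; the first call marks the choice as finished.
    /// (HashSet::insert returns true only on first insertion.)
    fn should_emit(&mut self, choice_index: u32) -> bool {
        self.emitted.insert(choice_index)
    }
}

fn main() {
    let mut tracker = FinishTracker::default();
    assert!(tracker.should_emit(0)); // first finish_reason: forwarded
    assert!(!tracker.should_emit(0)); // trailing stop chunk: filtered
    assert!(tracker.should_emit(1)); // tracked independently per choice
}
```

A HashSet suffices here because only the presence of an index matters; the PR's HashMap<u32, bool> stores an always-true value, which is equivalent.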

Contributor:

[screenshot attached] Is there a simpler way to solve this problem by directly skipping the empty packet?
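That suggestion might look roughly like the sketch below (hypothetical simplified types). Whether it is safe depends on the caveat raised earlier: with speculative sampling or a max_tokens cutoff, the final packet's content may be non-empty, so an empty-content check alone would miss it.

```rust
// Hypothetical sketch of the alternative: drop the trailing packet directly
// when its content is empty and it carries finish_reason = Stop, instead of
// tracking per-choice emission state. Simplified types for illustration.
#[derive(Debug, PartialEq, Clone, Copy)]
#[allow(dead_code)]
enum FinishReason {
    Stop,
    ToolCalls,
}

struct Choice {
    content: Option<String>,
    finish_reason: Option<FinishReason>,
}

// True for the engine's usual trailing packet: no content, finish_reason=stop.
// Caveat: a non-empty final packet (speculative sampling, max_tokens) would
// not match this check and its Stop finish_reason would still pass through.
fn is_trailing_empty_stop(choice: &Choice) -> bool {
    choice.content.as_deref().map_or(true, str::is_empty)
        && choice.finish_reason == Some(FinishReason::Stop)
}

fn main() {
    let trailing = Choice {
        content: None,
        finish_reason: Some(FinishReason::Stop),
    };
    let normal = Choice {
        content: Some("hi".into()),
        finish_reason: None,
    };
    assert!(is_trailing_empty_stop(&trailing)); // would be skipped
    assert!(!is_trailing_empty_stop(&normal)); // forwarded as usual
}
```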


if let Some(ref content) = choice.delta.content {
let choice_state = choice_states.get_or_create_state(choice.index);

@@ -509,8 +538,16 @@ impl JailedStream {
last_annotated_event.clone(),
last_annotated_comment.clone(),
);
let responses = self.emit_choice_emissions(tool_content_emissions, chat_response, preserved_metadata);
let responses = self.emit_choice_emissions(tool_content_emissions.clone(), chat_response, preserved_metadata);
for emitted_response in responses {
// Mark finish_reason as emitted for choices that have it
if let Some(ref data) = emitted_response.data {
for choice in &data.choices {
if choice.finish_reason.is_some() {
choice_states.mark_finish_reason_emitted(choice.index);
}
}
}
yield emitted_response;
}
}
@@ -524,6 +561,14 @@ impl JailedStream {
);
let responses = self.emit_choice_emissions(trailing_emissions, chat_response, preserved_metadata);
for emitted_response in responses {
// Mark finish_reason as emitted for choices that have it
if let Some(ref data) = emitted_response.data {
for choice in &data.choices {
if choice.finish_reason.is_some() {
choice_states.mark_finish_reason_emitted(choice.index);
}
}
}
yield emitted_response;
}
}
@@ -533,6 +578,14 @@ impl JailedStream {
let current_metadata = (response.id.clone(), response.event.clone(), response.comment.clone());
let responses = self.emit_choice_emissions(passthrough_emissions, chat_response, current_metadata);
for emitted_response in responses {
// Mark finish_reason as emitted for choices that have it
if let Some(ref data) = emitted_response.data {
for choice in &data.choices {
if choice.finish_reason.is_some() {
choice_states.mark_finish_reason_emitted(choice.index);
}
}
}
yield emitted_response;
}
}
@@ -568,6 +621,14 @@ impl JailedStream {
let final_metadata = (last_annotated_id, last_annotated_event, last_annotated_comment);
let responses = self.emit_choice_emissions(final_emissions, &dummy_response, final_metadata);
for emitted_response in responses {
// Mark finish_reason as emitted for choices that have it
if let Some(ref data) = emitted_response.data {
for choice in &data.choices {
if choice.finish_reason.is_some() {
choice_states.mark_finish_reason_emitted(choice.index);
}
}
}
yield emitted_response;
}
}
133 changes: 132 additions & 1 deletion lib/llm/tests/test_streaming_tool_parsers.rs
Original file line number Diff line number Diff line change
@@ -26,7 +26,7 @@ across backends.

*/

use dynamo_async_openai::types::ChatChoiceStream;
use dynamo_async_openai::types::{ChatChoiceStream, FinishReason};
use dynamo_llm::preprocessor::OpenAIPreprocessor;
use dynamo_llm::protocols::openai::chat_completions::NvCreateChatCompletionStreamResponse;
use dynamo_runtime::protocols::annotated::Annotated;
@@ -304,6 +304,18 @@ mod tests {
aggregated.has_tool_calls, expected_has_tool_calls,
"Tool calls presence should match expected value"
);

// Verify last chunk has Stop finish_reason for no-tool cases
let last_chunk = output_chunks.last().expect("Should have at least one chunk");
if let Some(data) = &last_chunk.data
&& let Some(choice) = data.choices.first()
{
assert_eq!(
choice.finish_reason,
Some(FinishReason::Stop),
"Last chunk should have Stop finish_reason for non-tool call case"
);
}
}

#[tokio::test]
@@ -360,6 +372,22 @@ mod tests {

// Verify tool calls
assert_tool_calls(&aggregated.tool_calls, &test_data.expected_tool_calls);

// Verify the last chunk has ToolCalls finish_reason (empty Stop chunks should be filtered)
let last_chunk = output_chunks.last().expect("Should have at least one chunk");
if let Some(data) = &last_chunk.data
&& let Some(choice) = data.choices.first()
{
assert_eq!(
choice.finish_reason,
Some(FinishReason::ToolCalls),
"Last chunk should have ToolCalls finish_reason (empty Stop chunks should be filtered)"
);
assert!(
choice.delta.tool_calls.is_some(),
"Last chunk with ToolCalls finish_reason must have tool_calls data"
);
}
}

#[tokio::test]
@@ -403,6 +431,18 @@ mod tests {
aggregated.has_tool_calls, expected_has_tool_calls,
"Tool calls presence should match expected value"
);

// Verify last chunk has Stop finish_reason for no-tool cases
let last_chunk = output_chunks.last().expect("Should have at least one chunk");
if let Some(data) = &last_chunk.data
&& let Some(choice) = data.choices.first()
{
assert_eq!(
choice.finish_reason,
Some(FinishReason::Stop),
"Last chunk should have Stop finish_reason for non-tool call case"
);
}
}

#[tokio::test]
@@ -455,6 +495,22 @@ mod tests {

// Verify tool calls
assert_tool_calls(&aggregated.tool_calls, &test_data.expected_tool_calls);

// Verify the last chunk has ToolCalls finish_reason (empty Stop chunks should be filtered)
let last_chunk = output_chunks.last().expect("Should have at least one chunk");
if let Some(data) = &last_chunk.data
&& let Some(choice) = data.choices.first()
{
assert_eq!(
choice.finish_reason,
Some(FinishReason::ToolCalls),
"Last chunk should have ToolCalls finish_reason (empty Stop chunks should be filtered)"
);
assert!(
choice.delta.tool_calls.is_some(),
"Last chunk with ToolCalls finish_reason must have tool_calls data"
);
}
}

#[tokio::test]
@@ -511,6 +567,18 @@ mod tests {
);

assert_tool_calls(&aggregated.tool_calls, &test_data.expected_tool_calls);

// Verify last chunk has Stop finish_reason for no-tool cases
let last_chunk = output_chunks.last().expect("Should have at least one chunk");
if let Some(data) = &last_chunk.data
&& let Some(choice) = data.choices.first()
{
assert_eq!(
choice.finish_reason,
Some(FinishReason::Stop),
"Last chunk should have Stop finish_reason for non-tool call case"
);
}
}

#[tokio::test]
@@ -567,6 +635,23 @@ mod tests {
);

assert_tool_calls(&aggregated.tool_calls, &test_data.expected_tool_calls);

// Verify there is a chunk with ToolCalls finish_reason and tool call data
let has_tool_calls_chunk = output_chunks.iter().any(|chunk| {
chunk
.data
.as_ref()
.and_then(|d| d.choices.first())
.map(|c| {
c.finish_reason == Some(FinishReason::ToolCalls)
&& c.delta.tool_calls.is_some()
})
.unwrap_or(false)
});
assert!(
has_tool_calls_chunk,
"Should have a chunk with ToolCalls finish_reason and tool_calls data"
);
}

#[tokio::test]
@@ -620,6 +705,18 @@ mod tests {
);

assert_tool_calls(&aggregated.tool_calls, &test_data.expected_tool_calls);

// Verify last chunk has Stop finish_reason for no-tool cases
let last_chunk = output_chunks.last().expect("Should have at least one chunk");
if let Some(data) = &last_chunk.data
&& let Some(choice) = data.choices.first()
{
assert_eq!(
choice.finish_reason,
Some(FinishReason::Stop),
"Last chunk should have Stop finish_reason for non-tool call case"
);
}
}

#[tokio::test]
@@ -674,6 +771,23 @@ mod tests {
"Tool calls presence should match expected value"
);
assert_tool_calls(&aggregated.tool_calls, &test_data.expected_tool_calls);

// Verify there is a chunk with ToolCalls finish_reason and tool call data
let has_tool_calls_chunk = output_chunks.iter().any(|chunk| {
chunk
.data
.as_ref()
.and_then(|d| d.choices.first())
.map(|c| {
c.finish_reason == Some(FinishReason::ToolCalls)
&& c.delta.tool_calls.is_some()
})
.unwrap_or(false)
});
assert!(
has_tool_calls_chunk,
"Should have a chunk with ToolCalls finish_reason and tool_calls data"
);
}

#[tokio::test]
@@ -726,5 +840,22 @@ mod tests {

// Verify tool calls
assert_tool_calls(&aggregated.tool_calls, &test_data.expected_tool_calls);

// Verify the last chunk has ToolCalls finish_reason (empty Stop chunks should be filtered)
let last_chunk = output_chunks.last().expect("Should have at least one chunk");
if let Some(data) = &last_chunk.data
&& let Some(choice) = data.choices.first()
{
assert_eq!(
choice.finish_reason,
Some(FinishReason::ToolCalls),
"Last chunk should have ToolCalls finish_reason (empty Stop chunks should be filtered)"
);
assert!(
choice.delta.tool_calls.is_some(),
"Last chunk with ToolCalls finish_reason must have tool_calls data"
);
}
}

}