Skip to content
Open
2 changes: 1 addition & 1 deletion lib/llm/src/preprocessor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -764,7 +764,7 @@ impl OpenAIPreprocessor {
let jail = JailedStream::builder()
.tool_call_parser(tool_call_parser)
.build();
jail.apply(stream)
jail.apply_with_finish_reason(stream)
}

// Motivation: Each transformation on the stream should be a separate step to allow for more flexibility
Expand Down
81 changes: 69 additions & 12 deletions lib/llm/src/protocols/openai/chat_completions/jail.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use dynamo_parsers::tool_calling::{
};
use dynamo_runtime::protocols::annotated::Annotated;
use futures::{Stream, StreamExt};
use std::collections::HashMap;

use crate::utils::{MarkerMatcher, MatchResult};

Expand Down Expand Up @@ -72,6 +73,8 @@ struct ChoiceJailState {
accumulated_content: String,
/// Buffer for partial marker matches across chunks
partial_match_buffer: String,
/// Stream finish reason
stream_finish_reason: Option<FinishReason>,
}

fn create_choice_stream(
Expand Down Expand Up @@ -106,6 +109,7 @@ impl ChoiceJailState {
is_jailed: false,
accumulated_content: String::new(),
partial_match_buffer: String::new(),
stream_finish_reason: None,
}
}

Expand All @@ -130,7 +134,6 @@ impl ChoiceJailState {
jail_stream: &JailedStream,
) -> Vec<ChoiceEmission> {
let mut emissions = Vec::new();

if !self.is_jailed {
// Use the marker matcher to detect complete/partial markers
let match_result = jail_stream
Expand All @@ -152,7 +155,7 @@ impl ChoiceJailState {
choice.delta.role,
&prefix,
None,
None,
choice.finish_reason,
choice.logprobs.clone(),
);
emissions.push(ChoiceEmission::PassThrough(prefix_choice));
Expand Down Expand Up @@ -192,7 +195,7 @@ impl ChoiceJailState {
choice.delta.role,
trailing_part,
None,
None,
choice.finish_reason,
choice.logprobs.clone(),
);
emissions.push(ChoiceEmission::Trailing(trailing_choice));
Expand Down Expand Up @@ -224,7 +227,7 @@ impl ChoiceJailState {
choice.delta.role,
&prefix,
None,
None,
choice.finish_reason,
choice.logprobs.clone(),
);
emissions.push(ChoiceEmission::PassThrough(prefix_choice));
Expand Down Expand Up @@ -267,7 +270,7 @@ impl ChoiceJailState {
choice.delta.role,
&content,
None,
None,
choice.finish_reason,
choice.logprobs.clone(),
);
emissions.push(ChoiceEmission::PassThrough(pass_through_choice));
Expand Down Expand Up @@ -312,7 +315,7 @@ impl ChoiceJailState {
choice.delta.role,
trailing_part,
None,
None,
choice.finish_reason,
choice.logprobs.clone(),
);
emissions.push(ChoiceEmission::Trailing(trailing_choice));
Expand All @@ -323,7 +326,6 @@ impl ChoiceJailState {
}
// If not unjailing, don't emit anything (still accumulating)
}

emissions
}

Expand All @@ -342,7 +344,7 @@ impl ChoiceJailState {
Some(Role::Assistant),
&self.accumulated_content,
None,
None,
self.stream_finish_reason, // For the accumulated content, assign the original stream finish reason, otherwise it will get lost
None,
);

Expand Down Expand Up @@ -428,6 +430,19 @@ impl JailedStream {
JailedStreamBuilder::new()
}

/// Apply jail stream transformation with finish_reason fix
/// This is a convenience method that applies both apply() and fix_finish_reason()
pub fn apply_with_finish_reason<S>(
self,
stream: S,
) -> impl Stream<Item = Annotated<NvCreateChatCompletionStreamResponse>> + Send
where
S: Stream<Item = Annotated<NvCreateChatCompletionStreamResponse>> + Send + 'static,
{
let jailed_stream = self.apply(stream);
JailedStream::fix_finish_reason(jailed_stream)
}

/// Apply the jail transformation to a stream of chat completion responses
/// Consumes self and returns the transformed stream
pub fn apply<S>(
Expand All @@ -449,6 +464,7 @@ impl JailedStream {
// Pin the stream for iteration (stack pinning is more efficient)
tokio::pin!(stream);


// Process each item in the stream
while let Some(response) = stream.next().await {
if let Some(chat_response) = response.data.as_ref() {
Expand All @@ -467,6 +483,9 @@ impl JailedStream {
last_annotated_comment = response.comment.clone();
}

// Track actual stream finish reason in the choice state
choice_state.stream_finish_reason = choice.finish_reason;

// Process this choice and get emissions
let emissions = choice_state.process_content(choice, content, &self).await;
all_emissions.extend(emissions);
Expand Down Expand Up @@ -707,16 +726,16 @@ impl JailedStream {
}),
})
.collect();

// Create choice with tool calls
return create_choice_stream(
let choice = create_choice_stream(
choice_index,
Some(Role::Assistant),
normal_text.as_deref().unwrap_or(""),
Some(tool_call_chunks),
Some(FinishReason::ToolCalls),
None,
None,
);
return choice;
}

// No tool calls found or parsing failed, return content choice
Expand All @@ -725,7 +744,7 @@ impl JailedStream {
Some(Role::Assistant),
accumulated_content,
None,
None,
base_choice.finish_reason,
base_choice.logprobs.clone(),
)
}
Expand All @@ -745,6 +764,44 @@ impl JailedStream {
}
false
}

/// Post-processor that rewrites a `Stop` finish reason to `ToolCalls` for any
/// choice that has emitted tool calls.
///
/// Meant to be chained after `apply()`. Every response is forwarded; the only
/// field ever modified is `finish_reason`, and only when it is exactly
/// `Some(FinishReason::Stop)` — any other finish reason is passed through
/// untouched.
pub fn fix_finish_reason<S>(
    input_stream: S,
) -> impl Stream<Item = Annotated<NvCreateChatCompletionStreamResponse>> + Send
where
    S: Stream<Item = Annotated<NvCreateChatCompletionStreamResponse>> + Send + 'static,
{
    stream! {
        tokio::pin!(input_stream);
        // Choice index -> `true` once that choice has produced a tool-call delta.
        let mut saw_tool_calls: HashMap<u32, bool> = HashMap::new();

        while let Some(mut item) = input_stream.next().await {
            if let Some(payload) = item.data.as_mut() {
                // Pass 1: record every choice in this chunk that carries tool calls.
                // This runs before the rewrite so that a chunk holding both the tool
                // calls and the finish reason is handled correctly.
                for emitted in payload.choices.iter().filter(|c| c.delta.tool_calls.is_some()) {
                    saw_tool_calls.insert(emitted.index, true);
                }

                // Pass 2: a `Stop` on a choice that emitted tool calls becomes `ToolCalls`.
                for finished in payload.choices.iter_mut() {
                    let stopped = matches!(finished.finish_reason, Some(FinishReason::Stop));
                    if stopped && saw_tool_calls.get(&finished.index) == Some(&true) {
                        finished.finish_reason = Some(FinishReason::ToolCalls);
                    }
                }
            }

            yield item;
        }
    }
}
}

/// Builder for configuring a JailedStream
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{
"request_id": "8f33c28b-cb52-4272-9ac5-0cb9f80386d3",
"expected_output": {
"normal_content": " the requested format.\n</think>\n\n<tool_call>\n\n{\"name\":\"get"
},
"input_stream": [
{"data":{"id":"chatcmpl-8f33c28b-cb52-4272-9ac5-0cb9f80386d3","choices":[{"index":0,"delta":{"content":" the","function_call":null,"tool_calls":null,"role":"assistant","refusal":null,"reasoning_content":null}}]}},
{"data":{"id":"chatcmpl-8f33c28b-cb52-4272-9ac5-0cb9f80386d3","choices":[{"index":0,"delta":{"content":" requested","function_call":null,"tool_calls":null,"role":"assistant","refusal":null,"reasoning_content":null}}]}},
{"data":{"id":"chatcmpl-8f33c28b-cb52-4272-9ac5-0cb9f80386d3","choices":[{"index":0,"delta":{"content":" format","function_call":null,"tool_calls":null,"role":"assistant","refusal":null,"reasoning_content":null}}]}},
{"data":{"id":"chatcmpl-8f33c28b-cb52-4272-9ac5-0cb9f80386d3","choices":[{"index":0,"delta":{"content":".\n","function_call":null,"tool_calls":null,"role":"assistant","refusal":null,"reasoning_content":null}}]}},
{"data":{"id":"chatcmpl-8f33c28b-cb52-4272-9ac5-0cb9f80386d3","choices":[{"index":0,"delta":{"content":"</think>","function_call":null,"tool_calls":null,"role":"assistant","refusal":null,"reasoning_content":null}}]}},
{"data":{"id":"chatcmpl-8f33c28b-cb52-4272-9ac5-0cb9f80386d3","choices":[{"index":0,"delta":{"content":"\n\n","function_call":null,"tool_calls":null,"role":"assistant","refusal":null,"reasoning_content":null}}]}},
{"data":{"id":"chatcmpl-8f33c28b-cb52-4272-9ac5-0cb9f80386d3","choices":[{"index":0,"delta":{"content":"<tool_call>","function_call":null,"tool_calls":null,"role":"assistant","refusal":null,"reasoning_content":null}}]}},
{"data":{"id":"chatcmpl-8f33c28b-cb52-4272-9ac5-0cb9f80386d3","choices":[{"index":0,"delta":{"content":"\n","function_call":null,"tool_calls":null,"role":"assistant","refusal":null,"reasoning_content":null}}]}},
{"data":{"id":"chatcmpl-8f33c28b-cb52-4272-9ac5-0cb9f80386d3","choices":[{"index":0,"delta":{"content":"{\"","function_call":null,"tool_calls":null,"role":"assistant","refusal":null,"reasoning_content":null}}]}},
{"data":{"id":"chatcmpl-8f33c28b-cb52-4272-9ac5-0cb9f80386d3","choices":[{"index":0,"delta":{"content":"name","function_call":null,"tool_calls":null,"role":"assistant","refusal":null,"reasoning_content":null}}]}},
{"data":{"id":"chatcmpl-8f33c28b-cb52-4272-9ac5-0cb9f80386d3","choices":[{"index":0,"delta":{"content":"\":","function_call":null,"tool_calls":null,"role":"assistant","refusal":null,"reasoning_content":null}}]}},
{"data":{"id":"chatcmpl-8f33c28b-cb52-4272-9ac5-0cb9f80386d3","choices":[{"index":0,"delta":{"content":" \"","function_call":null,"tool_calls":null,"role":"assistant","refusal":null,"reasoning_content":null}}]}},
{"data":{"id":"chatcmpl-8f33c28b-cb52-4272-9ac5-0cb9f80386d3","choices":[{"index":0,"delta":{"content":"get","function_call":null,"tool_calls":null,"role":"assistant","refusal":null,"reasoning_content":null}, "finish_reason":"length"}]}}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
{
"request_id": "8f33c28b-cb52-4272-9ac5-0cb9f80386d3",
"expected_output": {
"normal_content": "<think>\nOkay, the user is asking for the weather in San Francisco in"
},
"input_stream": [
{"data":{"id":"chatcmpl-8f33c28b-cb52-4272-9ac5-0cb9f80386d3","choices":[{"index":0,"delta":{"content":"<think>","function_call":null,"tool_calls":null,"role":"assistant","refusal":null,"reasoning_content":null}}]}},
{"data":{"id":"chatcmpl-8f33c28b-cb52-4272-9ac5-0cb9f80386d3","choices":[{"index":0,"delta":{"content":"\n","function_call":null,"tool_calls":null,"role":"assistant","refusal":null,"reasoning_content":null}}]}},
{"data":{"id":"chatcmpl-8f33c28b-cb52-4272-9ac5-0cb9f80386d3","choices":[{"index":0,"delta":{"content":"Okay","function_call":null,"tool_calls":null,"role":"assistant","refusal":null,"reasoning_content":null}}]}},
{"data":{"id":"chatcmpl-8f33c28b-cb52-4272-9ac5-0cb9f80386d3","choices":[{"index":0,"delta":{"content":",","function_call":null,"tool_calls":null,"role":"assistant","refusal":null,"reasoning_content":null}}]}},
{"data":{"id":"chatcmpl-8f33c28b-cb52-4272-9ac5-0cb9f80386d3","choices":[{"index":0,"delta":{"content":" the","function_call":null,"tool_calls":null,"role":"assistant","refusal":null,"reasoning_content":null}}]}},
{"data":{"id":"chatcmpl-8f33c28b-cb52-4272-9ac5-0cb9f80386d3","choices":[{"index":0,"delta":{"content":" user","function_call":null,"tool_calls":null,"role":"assistant","refusal":null,"reasoning_content":null}}]}},
{"data":{"id":"chatcmpl-8f33c28b-cb52-4272-9ac5-0cb9f80386d3","choices":[{"index":0,"delta":{"content":" is","function_call":null,"tool_calls":null,"role":"assistant","refusal":null,"reasoning_content":null}}]}},
{"data":{"id":"chatcmpl-8f33c28b-cb52-4272-9ac5-0cb9f80386d3","choices":[{"index":0,"delta":{"content":" asking","function_call":null,"tool_calls":null,"role":"assistant","refusal":null,"reasoning_content":null}}]}},
{"data":{"id":"chatcmpl-8f33c28b-cb52-4272-9ac5-0cb9f80386d3","choices":[{"index":0,"delta":{"content":" for","function_call":null,"tool_calls":null,"role":"assistant","refusal":null,"reasoning_content":null}}]}},
{"data":{"id":"chatcmpl-8f33c28b-cb52-4272-9ac5-0cb9f80386d3","choices":[{"index":0,"delta":{"content":" the","function_call":null,"tool_calls":null,"role":"assistant","refusal":null,"reasoning_content":null}}]}},
{"data":{"id":"chatcmpl-8f33c28b-cb52-4272-9ac5-0cb9f80386d3","choices":[{"index":0,"delta":{"content":" weather","function_call":null,"tool_calls":null,"role":"assistant","refusal":null,"reasoning_content":null}}]}},
{"data":{"id":"chatcmpl-8f33c28b-cb52-4272-9ac5-0cb9f80386d3","choices":[{"index":0,"delta":{"content":" in","function_call":null,"tool_calls":null,"role":"assistant","refusal":null,"reasoning_content":null},"finish_reason":"length"}]}}
]
}
Loading
Loading