Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/llm/src/preprocessor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -764,7 +764,7 @@ impl OpenAIPreprocessor {
let jail = JailedStream::builder()
.tool_call_parser(tool_call_parser)
.build();
jail.apply(stream)
jail.apply_with_finish_reason(stream)
}

// Motivation: Each transformation on the stream should be a separate step to allow for more flexibility
Expand Down
81 changes: 69 additions & 12 deletions lib/llm/src/protocols/openai/chat_completions/jail.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use dynamo_parsers::tool_calling::{
};
use dynamo_runtime::protocols::annotated::Annotated;
use futures::{Stream, StreamExt};
use std::collections::HashMap;

use crate::utils::{MarkerMatcher, MatchResult};

Expand Down Expand Up @@ -72,6 +73,8 @@ struct ChoiceJailState {
accumulated_content: String,
/// Buffer for partial marker matches across chunks
partial_match_buffer: String,
/// Stream finish reason
stream_finish_reason: Option<FinishReason>,
}

fn create_choice_stream(
Expand Down Expand Up @@ -106,6 +109,7 @@ impl ChoiceJailState {
is_jailed: false,
accumulated_content: String::new(),
partial_match_buffer: String::new(),
stream_finish_reason: None,
}
}

Expand All @@ -130,7 +134,6 @@ impl ChoiceJailState {
jail_stream: &JailedStream,
) -> Vec<ChoiceEmission> {
let mut emissions = Vec::new();

if !self.is_jailed {
// Use the marker matcher to detect complete/partial markers
let match_result = jail_stream
Expand All @@ -152,7 +155,7 @@ impl ChoiceJailState {
choice.delta.role,
&prefix,
None,
None,
choice.finish_reason,
choice.logprobs.clone(),
);
emissions.push(ChoiceEmission::PassThrough(prefix_choice));
Expand Down Expand Up @@ -192,7 +195,7 @@ impl ChoiceJailState {
choice.delta.role,
trailing_part,
None,
None,
choice.finish_reason,
choice.logprobs.clone(),
);
emissions.push(ChoiceEmission::Trailing(trailing_choice));
Expand Down Expand Up @@ -224,7 +227,7 @@ impl ChoiceJailState {
choice.delta.role,
&prefix,
None,
None,
choice.finish_reason,
choice.logprobs.clone(),
);
emissions.push(ChoiceEmission::PassThrough(prefix_choice));
Expand Down Expand Up @@ -267,7 +270,7 @@ impl ChoiceJailState {
choice.delta.role,
&content,
None,
None,
choice.finish_reason,
choice.logprobs.clone(),
);
emissions.push(ChoiceEmission::PassThrough(pass_through_choice));
Expand Down Expand Up @@ -312,7 +315,7 @@ impl ChoiceJailState {
choice.delta.role,
trailing_part,
None,
None,
choice.finish_reason,
choice.logprobs.clone(),
);
emissions.push(ChoiceEmission::Trailing(trailing_choice));
Expand All @@ -323,7 +326,6 @@ impl ChoiceJailState {
}
// If not unjailing, don't emit anything (still accumulating)
}

emissions
}

Expand All @@ -342,7 +344,7 @@ impl ChoiceJailState {
Some(Role::Assistant),
&self.accumulated_content,
None,
None,
self.stream_finish_reason, // For the accumulated content, assign the original stream finish reason, otherwise it will get lost
None,
);

Expand Down Expand Up @@ -428,6 +430,19 @@ impl JailedStream {
JailedStreamBuilder::new()
}

/// Apply jail stream transformation with finish_reason fix
/// This is a convenience method that applies both apply() and fix_finish_reason()
pub fn apply_with_finish_reason<S>(
self,
stream: S,
) -> impl Stream<Item = Annotated<NvCreateChatCompletionStreamResponse>> + Send
where
S: Stream<Item = Annotated<NvCreateChatCompletionStreamResponse>> + Send + 'static,
{
let jailed_stream = self.apply(stream);
JailedStream::fix_finish_reason(jailed_stream)
}

/// Apply the jail transformation to a stream of chat completion responses
/// Consumes self and returns the transformed stream
pub fn apply<S>(
Expand All @@ -449,6 +464,7 @@ impl JailedStream {
// Pin the stream for iteration (stack pinning is more efficient)
tokio::pin!(stream);


// Process each item in the stream
while let Some(response) = stream.next().await {
if let Some(chat_response) = response.data.as_ref() {
Expand All @@ -467,6 +483,9 @@ impl JailedStream {
last_annotated_comment = response.comment.clone();
}

// Track actual stream finish reason in the choice state
choice_state.stream_finish_reason = choice.finish_reason;

// Process this choice and get emissions
let emissions = choice_state.process_content(choice, content, &self).await;
all_emissions.extend(emissions);
Expand Down Expand Up @@ -707,16 +726,16 @@ impl JailedStream {
}),
})
.collect();

// Create choice with tool calls
return create_choice_stream(
let choice = create_choice_stream(
choice_index,
Some(Role::Assistant),
normal_text.as_deref().unwrap_or(""),
Some(tool_call_chunks),
Some(FinishReason::ToolCalls),
None,
None,
);
return choice;
}

// No tool calls found or parsing failed, return content choice
Expand All @@ -725,7 +744,7 @@ impl JailedStream {
Some(Role::Assistant),
accumulated_content,
None,
None,
base_choice.finish_reason,
base_choice.logprobs.clone(),
)
}
Expand All @@ -745,6 +764,44 @@ impl JailedStream {
}
false
}

/// Post-processor that rewrites a `Stop` finish reason to `ToolCalls` for any
/// choice that has carried tool calls.
///
/// Meant to be chained after `apply()`. Every response from `input_stream` is
/// yielded exactly once, in order. For each response:
///
/// 1. every choice whose delta contains `tool_calls` has its index remembered;
/// 2. every choice whose `finish_reason` is `Some(FinishReason::Stop)` and whose
///    index has been remembered (in this or an earlier response) gets its
///    `finish_reason` replaced with `Some(FinishReason::ToolCalls)`.
///
/// Any other finish reason, and responses without `data`, pass through untouched.
pub fn fix_finish_reason<S>(
    input_stream: S,
) -> impl Stream<Item = Annotated<NvCreateChatCompletionStreamResponse>> + Send
where
    S: Stream<Item = Annotated<NvCreateChatCompletionStreamResponse>> + Send + 'static,
{
    stream! {
        tokio::pin!(input_stream);

        // Keyed by choice index; an entry exists only once that choice emitted tool calls.
        let mut emitted_tool_calls: HashMap<u32, bool> = HashMap::new();

        while let Some(mut chunk) = input_stream.next().await {
            if let Some(payload) = chunk.data.as_mut() {
                // First pass: record every choice in this chunk that carries tool calls,
                // so the rewrite below also sees tool calls arriving in the same chunk.
                let with_tool_calls = payload
                    .choices
                    .iter()
                    .filter(|c| c.delta.tool_calls.is_some())
                    .map(|c| c.index);
                for idx in with_tool_calls {
                    emitted_tool_calls.insert(idx, true);
                }

                // Second pass: only a plain `Stop` is upgraded to `ToolCalls`.
                for entry in payload.choices.iter_mut() {
                    let stopped = entry.finish_reason == Some(FinishReason::Stop);
                    if stopped && emitted_tool_calls.contains_key(&entry.index) {
                        entry.finish_reason = Some(FinishReason::ToolCalls);
                    }
                }
            }

            yield chunk;
        }
    }
}
}

/// Builder for configuring a JailedStream
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{
"request_id": "8f33c28b-cb52-4272-9ac5-0cb9f80386d3",
"expected_output": {
"normal_content": " the requested format.\n</think>\n\n<tool_call>\n\n{\"name\":\"get"
},
"input_stream": [
{"data":{"id":"chatcmpl-8f33c28b-cb52-4272-9ac5-0cb9f80386d3","choices":[{"index":0,"delta":{"content":" the","function_call":null,"tool_calls":null,"role":"assistant","refusal":null,"reasoning_content":null}}]}},
{"data":{"id":"chatcmpl-8f33c28b-cb52-4272-9ac5-0cb9f80386d3","choices":[{"index":0,"delta":{"content":" requested","function_call":null,"tool_calls":null,"role":"assistant","refusal":null,"reasoning_content":null}}]}},
{"data":{"id":"chatcmpl-8f33c28b-cb52-4272-9ac5-0cb9f80386d3","choices":[{"index":0,"delta":{"content":" format","function_call":null,"tool_calls":null,"role":"assistant","refusal":null,"reasoning_content":null}}]}},
{"data":{"id":"chatcmpl-8f33c28b-cb52-4272-9ac5-0cb9f80386d3","choices":[{"index":0,"delta":{"content":".\n","function_call":null,"tool_calls":null,"role":"assistant","refusal":null,"reasoning_content":null}}]}},
{"data":{"id":"chatcmpl-8f33c28b-cb52-4272-9ac5-0cb9f80386d3","choices":[{"index":0,"delta":{"content":"</think>","function_call":null,"tool_calls":null,"role":"assistant","refusal":null,"reasoning_content":null}}]}},
{"data":{"id":"chatcmpl-8f33c28b-cb52-4272-9ac5-0cb9f80386d3","choices":[{"index":0,"delta":{"content":"\n\n","function_call":null,"tool_calls":null,"role":"assistant","refusal":null,"reasoning_content":null}}]}},
{"data":{"id":"chatcmpl-8f33c28b-cb52-4272-9ac5-0cb9f80386d3","choices":[{"index":0,"delta":{"content":"<tool_call>","function_call":null,"tool_calls":null,"role":"assistant","refusal":null,"reasoning_content":null}}]}},
{"data":{"id":"chatcmpl-8f33c28b-cb52-4272-9ac5-0cb9f80386d3","choices":[{"index":0,"delta":{"content":"\n","function_call":null,"tool_calls":null,"role":"assistant","refusal":null,"reasoning_content":null}}]}},
{"data":{"id":"chatcmpl-8f33c28b-cb52-4272-9ac5-0cb9f80386d3","choices":[{"index":0,"delta":{"content":"{\"","function_call":null,"tool_calls":null,"role":"assistant","refusal":null,"reasoning_content":null}}]}},
{"data":{"id":"chatcmpl-8f33c28b-cb52-4272-9ac5-0cb9f80386d3","choices":[{"index":0,"delta":{"content":"name","function_call":null,"tool_calls":null,"role":"assistant","refusal":null,"reasoning_content":null}}]}},
{"data":{"id":"chatcmpl-8f33c28b-cb52-4272-9ac5-0cb9f80386d3","choices":[{"index":0,"delta":{"content":"\":","function_call":null,"tool_calls":null,"role":"assistant","refusal":null,"reasoning_content":null}}]}},
{"data":{"id":"chatcmpl-8f33c28b-cb52-4272-9ac5-0cb9f80386d3","choices":[{"index":0,"delta":{"content":" \"","function_call":null,"tool_calls":null,"role":"assistant","refusal":null,"reasoning_content":null}}]}},
{"data":{"id":"chatcmpl-8f33c28b-cb52-4272-9ac5-0cb9f80386d3","choices":[{"index":0,"delta":{"content":"get","function_call":null,"tool_calls":null,"role":"assistant","refusal":null,"reasoning_content":null}, "finish_reason":"length"}]}}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
{
"request_id": "8f33c28b-cb52-4272-9ac5-0cb9f80386d3",
"expected_output": {
"normal_content": "<think>\nOkay, the user is asking for the weather in San Francisco in"
},
"input_stream": [
{"data":{"id":"chatcmpl-8f33c28b-cb52-4272-9ac5-0cb9f80386d3","choices":[{"index":0,"delta":{"content":"<think>","function_call":null,"tool_calls":null,"role":"assistant","refusal":null,"reasoning_content":null}}]}},
{"data":{"id":"chatcmpl-8f33c28b-cb52-4272-9ac5-0cb9f80386d3","choices":[{"index":0,"delta":{"content":"\n","function_call":null,"tool_calls":null,"role":"assistant","refusal":null,"reasoning_content":null}}]}},
{"data":{"id":"chatcmpl-8f33c28b-cb52-4272-9ac5-0cb9f80386d3","choices":[{"index":0,"delta":{"content":"Okay","function_call":null,"tool_calls":null,"role":"assistant","refusal":null,"reasoning_content":null}}]}},
{"data":{"id":"chatcmpl-8f33c28b-cb52-4272-9ac5-0cb9f80386d3","choices":[{"index":0,"delta":{"content":",","function_call":null,"tool_calls":null,"role":"assistant","refusal":null,"reasoning_content":null}}]}},
{"data":{"id":"chatcmpl-8f33c28b-cb52-4272-9ac5-0cb9f80386d3","choices":[{"index":0,"delta":{"content":" the","function_call":null,"tool_calls":null,"role":"assistant","refusal":null,"reasoning_content":null}}]}},
{"data":{"id":"chatcmpl-8f33c28b-cb52-4272-9ac5-0cb9f80386d3","choices":[{"index":0,"delta":{"content":" user","function_call":null,"tool_calls":null,"role":"assistant","refusal":null,"reasoning_content":null}}]}},
{"data":{"id":"chatcmpl-8f33c28b-cb52-4272-9ac5-0cb9f80386d3","choices":[{"index":0,"delta":{"content":" is","function_call":null,"tool_calls":null,"role":"assistant","refusal":null,"reasoning_content":null}}]}},
{"data":{"id":"chatcmpl-8f33c28b-cb52-4272-9ac5-0cb9f80386d3","choices":[{"index":0,"delta":{"content":" asking","function_call":null,"tool_calls":null,"role":"assistant","refusal":null,"reasoning_content":null}}]}},
{"data":{"id":"chatcmpl-8f33c28b-cb52-4272-9ac5-0cb9f80386d3","choices":[{"index":0,"delta":{"content":" for","function_call":null,"tool_calls":null,"role":"assistant","refusal":null,"reasoning_content":null}}]}},
{"data":{"id":"chatcmpl-8f33c28b-cb52-4272-9ac5-0cb9f80386d3","choices":[{"index":0,"delta":{"content":" the","function_call":null,"tool_calls":null,"role":"assistant","refusal":null,"reasoning_content":null}}]}},
{"data":{"id":"chatcmpl-8f33c28b-cb52-4272-9ac5-0cb9f80386d3","choices":[{"index":0,"delta":{"content":" weather","function_call":null,"tool_calls":null,"role":"assistant","refusal":null,"reasoning_content":null}}]}},
{"data":{"id":"chatcmpl-8f33c28b-cb52-4272-9ac5-0cb9f80386d3","choices":[{"index":0,"delta":{"content":" in","function_call":null,"tool_calls":null,"role":"assistant","refusal":null,"reasoning_content":null},"finish_reason":"length"}]}}
]
}
Loading
Loading