26 changes: 24 additions & 2 deletions src/agents/extensions/models/litellm_model.py
@@ -53,10 +53,11 @@
 
 class InternalChatCompletionMessage(ChatCompletionMessage):
     """
-    An internal subclass to carry reasoning_content without modifying the original model.
-    """
+    An internal subclass to carry reasoning_content and thinking_blocks without modifying the original model.
+    """  # noqa: E501
 
     reasoning_content: str
+    thinking_blocks: list[dict[str, Any]] | None = None
 
 
 class LitellmModel(Model):
@@ -401,6 +402,26 @@ def convert_message_to_openai(
         if hasattr(message, "reasoning_content") and message.reasoning_content:
             reasoning_content = message.reasoning_content
 
+        # Extract full thinking blocks including signatures (for Anthropic)
+        thinking_blocks: list[dict[str, Any]] | None = None
+        if hasattr(message, "thinking_blocks") and message.thinking_blocks:
+            # Convert thinking blocks to dict format for compatibility
+            thinking_blocks = []
+            for block in message.thinking_blocks:
+                if isinstance(block, dict):
+                    thinking_blocks.append(cast(dict[str, Any], block))
+                else:
+                    # Convert object to dict by accessing its attributes
+                    block_dict: dict[str, Any] = {}
+                    if hasattr(block, '__dict__'):
+                        block_dict = dict(block.__dict__.items())
+                    elif hasattr(block, 'model_dump'):
+                        block_dict = block.model_dump()
+                    else:
+                        # Last resort: convert to string representation
+                        block_dict = {"thinking": str(block)}
+                    thinking_blocks.append(block_dict)
+
         return InternalChatCompletionMessage(
             content=message.content,
             refusal=refusal,
@@ -409,6 +430,7 @@ def convert_message_to_openai(
             audio=message.get("audio", None),  # litellm deletes audio if not present
             tool_calls=tool_calls,
             reasoning_content=reasoning_content,
+            thinking_blocks=thinking_blocks,
         )
 
     @classmethod
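The fallback chain in convert_message_to_openai can be read in isolation. Below is a minimal standalone sketch (not part of the diff) of the same normalization; FakeThinkingBlock is a hypothetical stand-in for whatever non-dict object litellm may hand back. Note that the __dict__ branch is checked before model_dump(), so for typical pydantic objects the attribute scrape wins and model_dump() only covers objects without a __dict__.

# Standalone sketch of the normalization above (not part of the diff).
from typing import Any


class FakeThinkingBlock:
    """Hypothetical non-dict block, standing in for a litellm/pydantic object."""

    def __init__(self) -> None:
        self.type = "thinking"
        self.thinking = "I should check the weather"
        self.signature = "sig-123"


def normalize_thinking_blocks(blocks: list[Any]) -> list[dict[str, Any]]:
    out: list[dict[str, Any]] = []
    for block in blocks:
        if isinstance(block, dict):
            out.append(block)  # already JSON-shaped, pass through
        elif hasattr(block, "__dict__"):
            out.append(dict(block.__dict__.items()))  # plain attribute scrape
        elif hasattr(block, "model_dump"):
            out.append(block.model_dump())  # pydantic objects without __dict__
        else:
            out.append({"thinking": str(block)})  # last resort
    return out


blocks = normalize_thinking_blocks(
    [{"type": "thinking", "thinking": "hi", "signature": "s1"}, FakeThinkingBlock()]
)
assert all("signature" in b for b in blocks)  # signatures survive, as Anthropic replay requires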
22 changes: 16 additions & 6 deletions src/agents/models/chatcmpl_converter.py
@@ -95,14 +95,24 @@ def message_to_output_items(cls, message: ChatCompletionMessage) -> list[TRespon
 
         # Handle reasoning content if available
         if hasattr(message, "reasoning_content") and message.reasoning_content:
-            items.append(
-                ResponseReasoningItem(
-                    id=FAKE_RESPONSES_ID,
-                    summary=[Summary(text=message.reasoning_content, type="summary_text")],
-                    type="reasoning",
-                )
+            reasoning_item = ResponseReasoningItem(
+                id=FAKE_RESPONSES_ID,
+                summary=[Summary(text=message.reasoning_content, type="summary_text")],
+                type="reasoning",
             )
+
+            # Store full thinking blocks for Anthropic compatibility
+            if hasattr(message, "thinking_blocks") and message.thinking_blocks:
+                # Store thinking blocks in the reasoning item's content
+                # Convert thinking blocks to Content objects
+                from openai.types.responses.response_reasoning_item import Content
+                reasoning_item.content = [
+                    Content(text=str(block.get("thinking", "")), type="reasoning_text")
+                    for block in message.thinking_blocks
+                ]
+
+            items.append(reasoning_item)
 
         message_item = ResponseOutputMessage(
             id=FAKE_RESPONSES_ID,
             content=[],
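Round-tripping a message through the converter shows the shape this hunk produces. A minimal sketch with illustrative values, mirroring the assertions in the new test file added below; it assumes the openai types version this PR builds against, where ResponseReasoningItem accepts a content list.

# Illustrative round-trip (not part of the PR); values are made up.
from agents.extensions.models.litellm_model import InternalChatCompletionMessage
from agents.models.chatcmpl_converter import Converter

msg = InternalChatCompletionMessage(
    role="assistant",
    content="Hello!",
    reasoning_content="summary of the reasoning",
    thinking_blocks=[
        {"type": "thinking", "thinking": "full reasoning text", "signature": "sig-123"}
    ],
)

items = Converter.message_to_output_items(msg)
reasoning = next(item for item in items if getattr(item, "type", None) == "reasoning")

# reasoning_content becomes the summary; thinking_blocks land in `content`.
assert reasoning.summary[0].text == "summary of the reasoning"
assert reasoning.content is not None
assert reasoning.content[0].type == "reasoning_text"
assert reasoning.content[0].text == "full reasoning text"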
49 changes: 36 additions & 13 deletions src/agents/run.py
@@ -45,6 +45,7 @@
 )
 from .handoffs import Handoff, HandoffInputFilter, handoff
 from .items import (
+    HandoffCallItem,
     ItemHelpers,
     ModelResponse,
     RunItem,
@@ -60,7 +61,12 @@
 from .models.multi_provider import MultiProvider
 from .result import RunResult, RunResultStreaming
 from .run_context import RunContextWrapper, TContext
-from .stream_events import AgentUpdatedStreamEvent, RawResponsesStreamEvent, RunItemStreamEvent
+from .stream_events import (
+    AgentUpdatedStreamEvent,
+    RawResponsesStreamEvent,
+    RunItemStreamEvent,
+    StreamEvent,
+)
 from .tool import Tool
 from .tracing import Span, SpanError, agent_span, get_current_trace, trace
 from .tracing.span_data import AgentSpanData
@@ -1095,14 +1101,19 @@ async def _run_single_turn_streamed(
                     context_wrapper=context_wrapper,
                     run_config=run_config,
                     tool_use_tracker=tool_use_tracker,
+                    event_queue=streamed_result._event_queue,
                 )
 
-            if emitted_tool_call_ids:
-                import dataclasses as _dc
+            import dataclasses as _dc
 
-                filtered_items = [
+            # Filter out items that have already been sent to avoid duplicates
+            items_to_filter = single_step_result.new_step_items
+
+            if emitted_tool_call_ids:
+                # Filter out tool call items that were already emitted during streaming
+                items_to_filter = [
                     item
-                    for item in single_step_result.new_step_items
+                    for item in items_to_filter
                     if not (
                         isinstance(item, ToolCallItem)
                         and (
@@ -1114,15 +1125,17 @@
                         )
                     )
                 ]
 
-                single_step_result_filtered = _dc.replace(
-                    single_step_result, new_step_items=filtered_items
-                )
+            # Filter out HandoffCallItem to avoid duplicates (already sent earlier)
+            items_to_filter = [
+                item for item in items_to_filter
+                if not isinstance(item, HandoffCallItem)
+            ]
 
-                RunImpl.stream_step_result_to_queue(
-                    single_step_result_filtered, streamed_result._event_queue
-                )
-            else:
-                RunImpl.stream_step_result_to_queue(single_step_result, streamed_result._event_queue)
+            # Create filtered result and send to queue
+            filtered_result = _dc.replace(
+                single_step_result, new_step_items=items_to_filter
+            )
+            RunImpl.stream_step_result_to_queue(filtered_result, streamed_result._event_queue)
         return single_step_result
 
     @classmethod
@@ -1207,6 +1220,7 @@ async def _get_single_step_result_from_response(
         context_wrapper: RunContextWrapper[TContext],
         run_config: RunConfig,
         tool_use_tracker: AgentToolUseTracker,
+        event_queue: asyncio.Queue[StreamEvent | QueueCompleteSentinel] | None = None,
     ) -> SingleStepResult:
         processed_response = RunImpl.process_model_response(
             agent=agent,
@@ -1218,6 +1232,15 @@
 
         tool_use_tracker.add_tool_use(agent, processed_response.tools_used)
 
+        # Send handoff items immediately for streaming, but avoid duplicates
+        if event_queue is not None and processed_response.new_items:
+            handoff_items = [
+                item for item in processed_response.new_items
+                if isinstance(item, HandoffCallItem)
+            ]
+            if handoff_items:
+                RunImpl.stream_step_items_to_queue(cast(list[RunItem], handoff_items), event_queue)
+
         return await RunImpl.execute_tools_and_side_effects(
             agent=agent,
             original_input=original_input,
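Taken together, the run.py changes mean a streaming consumer observes the handoff call as soon as the model response is processed, instead of after the whole step finishes. A hedged usage sketch (watch_handoffs is illustrative, not part of the diff; it assumes an agent already wired with handoffs, as in the updated test below):

# Consumer-side sketch (illustrative; not part of the diff).
from agents import Agent, HandoffCallItem, Runner


async def watch_handoffs(triage_agent: Agent) -> None:
    result = Runner.run_streamed(triage_agent, input="Start")
    async for event in result.stream_events():
        if event.type == "run_item_stream_event" and isinstance(event.item, HandoffCallItem):
            # Emitted as soon as the model response is processed; the item is
            # then filtered out of the step result so it is not streamed twice.
            print("handoff requested")


# asyncio.run(watch_handoffs(triage_agent))  # given an agent wired with handoffs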
101 changes: 101 additions & 0 deletions tests/test_anthropic_thinking_blocks.py
@@ -0,0 +1,101 @@
"""
Test for Anthropic thinking blocks in conversation history.

This test validates the fix for issue #1704:
- Thinking blocks are properly preserved from Anthropic responses
- Reasoning items are stored in session but not sent back in conversation history
- Non-reasoning models are unaffected
- Token usage is not increased for non-reasoning scenarios
"""

from __future__ import annotations

from typing import Any

from agents.extensions.models.litellm_model import InternalChatCompletionMessage
from agents.models.chatcmpl_converter import Converter


def create_mock_anthropic_response_with_thinking() -> InternalChatCompletionMessage:
"""Create a mock Anthropic response with thinking blocks (like real response)."""
message = InternalChatCompletionMessage(
role="assistant",
content="I'll check the weather in Paris for you.",
reasoning_content="I need to call the weather function for Paris",
thinking_blocks=[
{
"type": "thinking",
"thinking": "I need to call the weather function for Paris",
"signature": "EqMDCkYIBxgCKkBAFZO8EyZwN1hiLctq0YjZnP0KeKgprr+C0PzgDv4GSggnFwrPQHIZ9A5s+paH+DrQBI1+Vnfq3mLAU5lJnoetEgzUEWx/Cv1022ieAvcaDCXdmg1XkMK0tZ8uCCIwURYAAX0uf2wFdnWt9n8whkhmy8ARQD5G2za4R8X5vTqBq8jpJ15T3c1Jcf3noKMZKooCWFVf0/W5VQqpZTgwDkqyTau7XraS+u48YlmJGSfyWMPO8snFLMZLGaGmVJgHfEI5PILhOEuX/R2cEeLuC715f51LMVuxTNzlOUV/037JV6P2ten7D66FnWU9JJMMJJov+DjMb728yQFHwHz4roBJ5ePHaaFP6mDwpqYuG/hai6pVv2TAK1IdKUui/oXrYtU+0gxb6UF2kS1bspqDuN++R8JdL7CMSU5l28pQ8TsH1TpVF4jZpsFbp1Du4rQIULFsCFFg+Edf9tPgyKZOq6xcskIjT7oylAPO37/jhdNknDq2S82PaSKtke3ViOigtM5uJfG521ZscBJQ1K3kwoI/repIdV9PatjOYdsYAQ==", # noqa: E501
}
],
)
return message


def test_converter_skips_reasoning_items():
"""
Unit test to verify that reasoning items are skipped when converting items to messages.
"""
# Create test items including a reasoning item
test_items: list[dict[str, Any]] = [
{"role": "user", "content": "Hello"},
{
"id": "reasoning_123",
"type": "reasoning",
"summary": [{"text": "User said hello", "type": "summary_text"}],
},
{
"id": "msg_123",
"type": "message",
"role": "assistant",
"content": [{"type": "output_text", "text": "Hi there!"}],
"status": "completed",
},
]

# Convert to messages
messages = Converter.items_to_messages(test_items) # type: ignore[arg-type]

# Should have user message and assistant message, but no reasoning content
assert len(messages) == 2
assert messages[0]["role"] == "user"
assert messages[1]["role"] == "assistant"

# Verify no thinking blocks in assistant message
assistant_msg = messages[1]
content = assistant_msg.get("content")
if isinstance(content, list):
for part in content:
assert part.get("type") != "thinking"


def test_reasoning_items_preserved_in_message_conversion():
"""
Test that reasoning content and thinking blocks are properly extracted
from Anthropic responses and stored in reasoning items.
"""
# Create mock message with thinking blocks
mock_message = create_mock_anthropic_response_with_thinking()

# Convert to output items
output_items = Converter.message_to_output_items(mock_message)

# Should have reasoning item, message item, and tool call items
reasoning_items = [
item for item in output_items if hasattr(item, "type") and item.type == "reasoning"
]
assert len(reasoning_items) == 1

reasoning_item = reasoning_items[0]
assert reasoning_item.summary[0].text == "I need to call the weather function for Paris"

# Verify thinking blocks are stored if we preserve them
if (
hasattr(reasoning_item, "content")
and reasoning_item.content
and len(reasoning_item.content) > 0
):
thinking_block = reasoning_item.content[0]
assert thinking_block.type == "reasoning_text"
assert thinking_block.text == "I need to call the weather function for Paris"
60 changes: 58 additions & 2 deletions tests/test_stream_events.py
@@ -3,10 +3,12 @@
 
 import pytest
 
-from agents import Agent, Runner, function_tool
+from agents import Agent, HandoffCallItem, Runner, function_tool
+from agents.extensions.handoff_filters import remove_all_tools
+from agents.handoffs import handoff
 
 from .fake_model import FakeModel
-from .test_responses import get_function_tool_call, get_text_message
+from .test_responses import get_function_tool_call, get_handoff_tool_call, get_text_message
 
 
 @function_tool
@@ -52,3 +54,57 @@ async def test_stream_events_main():
     assert tool_call_start_time > 0, "tool_call_item was not observed"
     assert tool_call_end_time > 0, "tool_call_output_item was not observed"
     assert tool_call_start_time < tool_call_end_time, "Tool call ended before or equals it started?"
+
+
+@pytest.mark.asyncio
+async def test_stream_events_main_with_handoff():
+    @function_tool
+    async def foo(args: str) -> str:
+        return f"foo_result_{args}"
+
+    english_agent = Agent(
+        name="EnglishAgent",
+        instructions="You only speak English.",
+        model=FakeModel(),
+    )
+
+    model = FakeModel()
+    model.add_multiple_turn_outputs(
+        [
+            [
+                get_text_message("Hello"),
+                get_function_tool_call("foo", '{"args": "arg1"}'),
+                get_handoff_tool_call(english_agent),
+            ],
+            [get_text_message("Done")],
+        ]
+    )
+
+    triage_agent = Agent(
+        name="TriageAgent",
+        instructions="Handoff to the appropriate agent based on the language of the request.",
+        handoffs=[
+            handoff(english_agent, input_filter=remove_all_tools),
+        ],
+        tools=[foo],
+        model=model,
+    )
+
+    result = Runner.run_streamed(
+        triage_agent,
+        input="Start",
+    )
+
+    handoff_requested_seen = False
+    agent_switched_to_english = False
+
+    async for event in result.stream_events():
+        if event.type == "run_item_stream_event":
+            if isinstance(event.item, HandoffCallItem):
+                handoff_requested_seen = True
+        elif event.type == "agent_updated_stream_event":
+            if hasattr(event, "new_agent") and event.new_agent.name == "EnglishAgent":
+                agent_switched_to_english = True
+
+    assert handoff_requested_seen, "handoff_requested event not observed"
+    assert agent_switched_to_english, "Agent did not switch to EnglishAgent"