From 789575f27f02a6d0bc58f37ad9e14143638ed18d Mon Sep 17 00:00:00 2001
From: Jacin Woo <41990342+Wujiaxuan007@users.noreply.github.com>
Date: Fri, 12 Sep 2025 13:28:53 +0800
Subject: [PATCH 1/2] fix(streaming): #1712 push processed_response.new_items
 (including HandoffCallItem) to event_queue (#1703)

---
 src/agents/run.py           | 49 ++++++++++++++++++++++--------
 tests/test_stream_events.py | 60 +++++++++++++++++++++++++++++++++++--
 2 files changed, 94 insertions(+), 15 deletions(-)

diff --git a/src/agents/run.py b/src/agents/run.py
index ee08ad134..5056758fb 100644
--- a/src/agents/run.py
+++ b/src/agents/run.py
@@ -45,6 +45,7 @@
 )
 from .handoffs import Handoff, HandoffInputFilter, handoff
 from .items import (
+    HandoffCallItem,
     ItemHelpers,
     ModelResponse,
     RunItem,
@@ -60,7 +61,12 @@
 from .models.multi_provider import MultiProvider
 from .result import RunResult, RunResultStreaming
 from .run_context import RunContextWrapper, TContext
-from .stream_events import AgentUpdatedStreamEvent, RawResponsesStreamEvent, RunItemStreamEvent
+from .stream_events import (
+    AgentUpdatedStreamEvent,
+    RawResponsesStreamEvent,
+    RunItemStreamEvent,
+    StreamEvent,
+)
 from .tool import Tool
 from .tracing import Span, SpanError, agent_span, get_current_trace, trace
 from .tracing.span_data import AgentSpanData
@@ -1095,14 +1101,19 @@ async def _run_single_turn_streamed(
                 context_wrapper=context_wrapper,
                 run_config=run_config,
                 tool_use_tracker=tool_use_tracker,
+                event_queue=streamed_result._event_queue,
             )
 
-            if emitted_tool_call_ids:
-                import dataclasses as _dc
+            import dataclasses as _dc
+
+            # Filter out items that have already been sent to avoid duplicates
+            items_to_filter = single_step_result.new_step_items
 
-                filtered_items = [
+            if emitted_tool_call_ids:
+                # Filter out tool call items that were already emitted during streaming
+                items_to_filter = [
                     item
-                    for item in single_step_result.new_step_items
+                    for item in items_to_filter
                     if not (
                         isinstance(item, ToolCallItem)
                         and (
@@ -1114,15 +1125,17 @@
                     )
                 ]
 
-                single_step_result_filtered = _dc.replace(
-                    single_step_result, new_step_items=filtered_items
-                )
+            # Filter out HandoffCallItem to avoid duplicates (already sent earlier)
+            items_to_filter = [
+                item for item in items_to_filter
+                if not isinstance(item, HandoffCallItem)
+            ]
 
-                RunImpl.stream_step_result_to_queue(
-                    single_step_result_filtered, streamed_result._event_queue
-                )
-            else:
-                RunImpl.stream_step_result_to_queue(single_step_result, streamed_result._event_queue)
+            # Create filtered result and send to queue
+            filtered_result = _dc.replace(
+                single_step_result, new_step_items=items_to_filter
+            )
+            RunImpl.stream_step_result_to_queue(filtered_result, streamed_result._event_queue)
 
         return single_step_result
 
     @classmethod
@@ -1207,6 +1220,7 @@ async def _get_single_step_result_from_response(
         context_wrapper: RunContextWrapper[TContext],
         run_config: RunConfig,
         tool_use_tracker: AgentToolUseTracker,
+        event_queue: asyncio.Queue[StreamEvent | QueueCompleteSentinel] | None = None,
     ) -> SingleStepResult:
         processed_response = RunImpl.process_model_response(
             agent=agent,
@@ -1218,6 +1232,15 @@
 
         tool_use_tracker.add_tool_use(agent, processed_response.tools_used)
 
+        # Send handoff items immediately for streaming, but avoid duplicates
+        if event_queue is not None and processed_response.new_items:
+            handoff_items = [
+                item for item in processed_response.new_items
+                if isinstance(item, HandoffCallItem)
+            ]
+            if handoff_items:
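+                # Emit the handoff call items to the event queue before any tools or handoffs run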
+                RunImpl.stream_step_items_to_queue(cast(list[RunItem], handoff_items), event_queue)
+
     return await RunImpl.execute_tools_and_side_effects(
         agent=agent,
         original_input=original_input,
diff --git a/tests/test_stream_events.py b/tests/test_stream_events.py
index 0f85b63f8..a2f0338d6 100644
--- a/tests/test_stream_events.py
+++ b/tests/test_stream_events.py
@@ -3,10 +3,12 @@
 
 import pytest
 
-from agents import Agent, Runner, function_tool
+from agents import Agent, HandoffCallItem, Runner, function_tool
+from agents.extensions.handoff_filters import remove_all_tools
+from agents.handoffs import handoff
 
 from .fake_model import FakeModel
-from .test_responses import get_function_tool_call, get_text_message
+from .test_responses import get_function_tool_call, get_handoff_tool_call, get_text_message
 
 
 @function_tool
@@ -52,3 +54,57 @@ async def test_stream_events_main():
     assert tool_call_start_time > 0, "tool_call_item was not observed"
     assert tool_call_end_time > 0, "tool_call_output_item was not observed"
     assert tool_call_start_time < tool_call_end_time, "Tool call ended before or equals it started?"
+
+
+@pytest.mark.asyncio
+async def test_stream_events_main_with_handoff():
+    @function_tool
+    async def foo(args: str) -> str:
+        return f"foo_result_{args}"
+
+    english_agent = Agent(
+        name="EnglishAgent",
+        instructions="You only speak English.",
+        model=FakeModel(),
+    )
+
+    model = FakeModel()
+    model.add_multiple_turn_outputs(
+        [
+            [
+                get_text_message("Hello"),
+                get_function_tool_call("foo", '{"args": "arg1"}'),
+                get_handoff_tool_call(english_agent),
+            ],
+            [get_text_message("Done")],
+        ]
+    )
+
+    triage_agent = Agent(
+        name="TriageAgent",
+        instructions="Handoff to the appropriate agent based on the language of the request.",
+        handoffs=[
+            handoff(english_agent, input_filter=remove_all_tools),
+        ],
+        tools=[foo],
+        model=model,
+    )
+
+    result = Runner.run_streamed(
+        triage_agent,
+        input="Start",
+    )
+
+    handoff_requested_seen = False
+    agent_switched_to_english = False
+
+    async for event in result.stream_events():
+        if event.type == "run_item_stream_event":
+            if isinstance(event.item, HandoffCallItem):
+                handoff_requested_seen = True
+        elif event.type == "agent_updated_stream_event":
+            if hasattr(event, "new_agent") and event.new_agent.name == "EnglishAgent":
+                agent_switched_to_english = True
+
+    assert handoff_requested_seen, "handoff_requested event not observed"
+    assert agent_switched_to_english, "Agent did not switch to EnglishAgent"

From 581111c8914c5befe701bf083d96915e2e1c1d36 Mon Sep 17 00:00:00 2001
From: Hassan Abu Alhaj <136383052+habema@users.noreply.github.com>
Date: Fri, 12 Sep 2025 08:45:32 +0300
Subject: [PATCH 2/2] fix: #1704 Preserve thinking blocks in Anthropic
 conversations with tool calls (#1706)

Co-authored-by: Kazuhiro Sera
---
 src/agents/extensions/models/litellm_model.py |  26 ++++-
 src/agents/models/chatcmpl_converter.py       |  22 ++--
 tests/test_anthropic_thinking_blocks.py       | 101 ++++++++++++++++++
 3 files changed, 141 insertions(+), 8 deletions(-)
 create mode 100644 tests/test_anthropic_thinking_blocks.py

diff --git a/src/agents/extensions/models/litellm_model.py b/src/agents/extensions/models/litellm_model.py
index 5b2a6a104..f13793ac9 100644
--- a/src/agents/extensions/models/litellm_model.py
+++ b/src/agents/extensions/models/litellm_model.py
@@ -53,10 +53,11 @@
 
 class InternalChatCompletionMessage(ChatCompletionMessage):
     """
-    An internal subclass to carry reasoning_content without modifying the original model.
- """ + An internal subclass to carry reasoning_content and thinking_blocks without modifying the original model. + """ # noqa: E501 reasoning_content: str + thinking_blocks: list[dict[str, Any]] | None = None class LitellmModel(Model): @@ -401,6 +402,26 @@ def convert_message_to_openai( if hasattr(message, "reasoning_content") and message.reasoning_content: reasoning_content = message.reasoning_content + # Extract full thinking blocks including signatures (for Anthropic) + thinking_blocks: list[dict[str, Any]] | None = None + if hasattr(message, "thinking_blocks") and message.thinking_blocks: + # Convert thinking blocks to dict format for compatibility + thinking_blocks = [] + for block in message.thinking_blocks: + if isinstance(block, dict): + thinking_blocks.append(cast(dict[str, Any], block)) + else: + # Convert object to dict by accessing its attributes + block_dict: dict[str, Any] = {} + if hasattr(block, '__dict__'): + block_dict = dict(block.__dict__.items()) + elif hasattr(block, 'model_dump'): + block_dict = block.model_dump() + else: + # Last resort: convert to string representation + block_dict = {"thinking": str(block)} + thinking_blocks.append(block_dict) + return InternalChatCompletionMessage( content=message.content, refusal=refusal, @@ -409,6 +430,7 @@ def convert_message_to_openai( audio=message.get("audio", None), # litellm deletes audio if not present tool_calls=tool_calls, reasoning_content=reasoning_content, + thinking_blocks=thinking_blocks, ) @classmethod diff --git a/src/agents/models/chatcmpl_converter.py b/src/agents/models/chatcmpl_converter.py index 61bbbb30b..1c9f826de 100644 --- a/src/agents/models/chatcmpl_converter.py +++ b/src/agents/models/chatcmpl_converter.py @@ -95,14 +95,24 @@ def message_to_output_items(cls, message: ChatCompletionMessage) -> list[TRespon # Handle reasoning content if available if hasattr(message, "reasoning_content") and message.reasoning_content: - items.append( - ResponseReasoningItem( - id=FAKE_RESPONSES_ID, - summary=[Summary(text=message.reasoning_content, type="summary_text")], - type="reasoning", - ) + reasoning_item = ResponseReasoningItem( + id=FAKE_RESPONSES_ID, + summary=[Summary(text=message.reasoning_content, type="summary_text")], + type="reasoning", ) + # Store full thinking blocks for Anthropic compatibility + if hasattr(message, "thinking_blocks") and message.thinking_blocks: + # Store thinking blocks in the reasoning item's content + # Convert thinking blocks to Content objects + from openai.types.responses.response_reasoning_item import Content + reasoning_item.content = [ + Content(text=str(block.get("thinking", "")), type="reasoning_text") + for block in message.thinking_blocks + ] + + items.append(reasoning_item) + message_item = ResponseOutputMessage( id=FAKE_RESPONSES_ID, content=[], diff --git a/tests/test_anthropic_thinking_blocks.py b/tests/test_anthropic_thinking_blocks.py new file mode 100644 index 000000000..9513c7833 --- /dev/null +++ b/tests/test_anthropic_thinking_blocks.py @@ -0,0 +1,101 @@ +""" +Test for Anthropic thinking blocks in conversation history. 
+
+This test validates the fix for issue #1704:
+- Thinking blocks are properly preserved from Anthropic responses
+- Reasoning items are stored in session but not sent back in conversation history
+- Non-reasoning models are unaffected
+- Token usage is not increased for non-reasoning scenarios
+"""
+
+from __future__ import annotations
+
+from typing import Any
+
+from agents.extensions.models.litellm_model import InternalChatCompletionMessage
+from agents.models.chatcmpl_converter import Converter
+
+
+def create_mock_anthropic_response_with_thinking() -> InternalChatCompletionMessage:
+    """Create a mock Anthropic response with thinking blocks (like a real response)."""
+    message = InternalChatCompletionMessage(
+        role="assistant",
+        content="I'll check the weather in Paris for you.",
+        reasoning_content="I need to call the weather function for Paris",
+        thinking_blocks=[
+            {
+                "type": "thinking",
+                "thinking": "I need to call the weather function for Paris",
+                "signature": "EqMDCkYIBxgCKkBAFZO8EyZwN1hiLctq0YjZnP0KeKgprr+C0PzgDv4GSggnFwrPQHIZ9A5s+paH+DrQBI1+Vnfq3mLAU5lJnoetEgzUEWx/Cv1022ieAvcaDCXdmg1XkMK0tZ8uCCIwURYAAX0uf2wFdnWt9n8whkhmy8ARQD5G2za4R8X5vTqBq8jpJ15T3c1Jcf3noKMZKooCWFVf0/W5VQqpZTgwDkqyTau7XraS+u48YlmJGSfyWMPO8snFLMZLGaGmVJgHfEI5PILhOEuX/R2cEeLuC715f51LMVuxTNzlOUV/037JV6P2ten7D66FnWU9JJMMJJov+DjMb728yQFHwHz4roBJ5ePHaaFP6mDwpqYuG/hai6pVv2TAK1IdKUui/oXrYtU+0gxb6UF2kS1bspqDuN++R8JdL7CMSU5l28pQ8TsH1TpVF4jZpsFbp1Du4rQIULFsCFFg+Edf9tPgyKZOq6xcskIjT7oylAPO37/jhdNknDq2S82PaSKtke3ViOigtM5uJfG521ZscBJQ1K3kwoI/repIdV9PatjOYdsYAQ==",  # noqa: E501
+            }
+        ],
+    )
+    return message
+
+
+def test_converter_skips_reasoning_items():
+    """
+    Unit test to verify that reasoning items are skipped when converting items to messages.
+    """
+    # Create test items including a reasoning item
+    test_items: list[dict[str, Any]] = [
+        {"role": "user", "content": "Hello"},
+        {
+            "id": "reasoning_123",
+            "type": "reasoning",
+            "summary": [{"text": "User said hello", "type": "summary_text"}],
+        },
+        {
+            "id": "msg_123",
+            "type": "message",
+            "role": "assistant",
+            "content": [{"type": "output_text", "text": "Hi there!"}],
+            "status": "completed",
+        },
+    ]
+
+    # Convert to messages
+    messages = Converter.items_to_messages(test_items)  # type: ignore[arg-type]
+
+    # Should have a user message and an assistant message, but no reasoning content
+    assert len(messages) == 2
+    assert messages[0]["role"] == "user"
+    assert messages[1]["role"] == "assistant"
+
+    # Verify there are no thinking blocks in the assistant message
+    assistant_msg = messages[1]
+    content = assistant_msg.get("content")
+    if isinstance(content, list):
+        for part in content:
+            assert part.get("type") != "thinking"
+
+
+def test_reasoning_items_preserved_in_message_conversion():
+    """
+    Test that reasoning content and thinking blocks are properly extracted
+ """ + # Create mock message with thinking blocks + mock_message = create_mock_anthropic_response_with_thinking() + + # Convert to output items + output_items = Converter.message_to_output_items(mock_message) + + # Should have reasoning item, message item, and tool call items + reasoning_items = [ + item for item in output_items if hasattr(item, "type") and item.type == "reasoning" + ] + assert len(reasoning_items) == 1 + + reasoning_item = reasoning_items[0] + assert reasoning_item.summary[0].text == "I need to call the weather function for Paris" + + # Verify thinking blocks are stored if we preserve them + if ( + hasattr(reasoning_item, "content") + and reasoning_item.content + and len(reasoning_item.content) > 0 + ): + thinking_block = reasoning_item.content[0] + assert thinking_block.type == "reasoning_text" + assert thinking_block.text == "I need to call the weather function for Paris"