Skip to content

Commit 6bf8174

Browse files
Final hopefully
1 parent 90446e8 commit 6bf8174

File tree

1 file changed

+31
-46
lines changed
  • src/agentex/lib/core/services/adk/providers

1 file changed

+31
-46
lines changed

src/agentex/lib/core/services/adk/providers/openai.py

Lines changed: 31 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
# Standard library imports
22
import json
3-
from contextlib import AsyncExitStack, asynccontextmanager
43
from typing import Any, Literal
54
from contextlib import AsyncExitStack, asynccontextmanager
65

@@ -16,15 +15,12 @@
1615
ResponseTextDeltaEvent,
1716
ResponseFunctionWebSearch,
1817
ResponseCodeInterpreterToolCall,
19-
ResponseFunctionCallArgumentsDeltaEvent,
2018
ResponseFunctionToolCall,
21-
ResponseOutputItemAddedEvent,
2219
ResponseOutputItemDoneEvent,
2320
ResponseTextDeltaEvent,
2421
ResponseReasoningSummaryPartAddedEvent,
2522
ResponseReasoningSummaryTextDeltaEvent,
2623
ResponseReasoningSummaryPartDoneEvent,
27-
2824
)
2925

3026
# Local imports
@@ -768,8 +764,8 @@ async def run_agent_streamed_auto_send(
768764
try:
769765
# Process streaming events with TaskMessage creation
770766
async for event in result.stream_events():
771-
772767
heartbeat_if_in_workflow("processing stream event with auto send")
768+
773769
if event.type == "run_item_stream_event":
774770
if event.item.type == "tool_call_item":
775771
tool_call_item = event.item.raw_item
@@ -802,13 +798,16 @@ async def run_agent_streamed_auto_send(
802798
elif event.item.type == "tool_call_output_item":
803799
tool_output_item = event.item.raw_item
804800

801+
# Extract tool response information using the helper method
802+
call_id, tool_name, content = self._extract_tool_response_info(
803+
tool_call_map, tool_output_item
804+
)
805+
805806
tool_response_content = ToolResponseContent(
806807
author="agent",
807-
tool_call_id=tool_output_item["call_id"],
808-
name=tool_call_map[
809-
tool_output_item["call_id"]
810-
].name,
811-
content=tool_output_item["output"],
808+
tool_call_id=call_id,
809+
name=tool_name,
810+
content=content,
812811
)
813812

814813
# Create tool response using streaming context (immediate completion)
@@ -926,33 +925,25 @@ async def run_agent_streamed_auto_send(
926925
elif isinstance(event.data, ResponseOutputItemDoneEvent):
927926
# Handle item completion
928927
item_id = event.data.item.id
928+
929+
# Finish the streaming context (sends DONE event and updates message)
929930
if item_id in item_id_to_streaming_context:
930-
streaming_context = (
931-
item_id_to_streaming_context[
932-
item_id
933-
]
934-
)
931+
streaming_context = item_id_to_streaming_context[item_id]
932+
await streaming_context.close()
933+
if item_id in unclosed_item_ids:
934+
unclosed_item_ids.remove(item_id)
935935

936936
elif isinstance(event.data, ResponseCompletedEvent):
937-
# All items complete, finish all remaining
938-
# streaming contexts for this session
939-
# Create copy to avoid modifying set in iteration
937+
# All items complete, finish all remaining streaming contexts for this session
938+
# Create a copy to avoid modifying set during iteration
940939
remaining_items = list(unclosed_item_ids)
941940
for item_id in remaining_items:
942-
if (item_id in unclosed_item_ids and
943-
item_id in
944-
item_id_to_streaming_context):
945-
streaming_context = (
946-
item_id_to_streaming_context[
947-
item_id
948-
]
949-
)
950-
951-
# Don't close streaming contexts for text or reasoning
952-
# They should just end with their last delta
953-
if not isinstance(streaming_context.task_message.content, (TextContent, ReasoningContent)):
954-
await streaming_context.close()
955-
unclosed_item_ids.discard(item_id)
941+
if (
942+
item_id in unclosed_item_ids and item_id in item_id_to_streaming_context
943+
): # Check if still unclosed
944+
streaming_context = item_id_to_streaming_context[item_id]
945+
await streaming_context.close()
946+
unclosed_item_ids.discard(item_id)
956947

957948
except InputGuardrailTripwireTriggered as e:
958949
# Handle guardrail trigger by sending a rejection message
@@ -1031,22 +1022,16 @@ async def run_agent_streamed_auto_send(
10311022
raise
10321023

10331024
finally:
1034-
# Cleanup: ensure all streaming contexts for
1035-
# this session are properly finished
1036-
# Create copy to avoid modifying set in iteration
1025+
# Cleanup: ensure all streaming contexts for this session are properly finished
1026+
# Create a copy to avoid modifying set during iteration
10371027
remaining_items = list(unclosed_item_ids)
10381028
for item_id in remaining_items:
1039-
if (item_id in unclosed_item_ids and
1040-
item_id in item_id_to_streaming_context):
1041-
streaming_context = (
1042-
item_id_to_streaming_context[item_id]
1043-
)
1044-
1045-
# Don't close streaming contexts for text or reasoning
1046-
# They should just end with their last delta
1047-
if not isinstance(streaming_context.task_message.content, (TextContent, ReasoningContent)):
1048-
await streaming_context.close()
1049-
unclosed_item_ids.discard(item_id)
1029+
if (
1030+
item_id in unclosed_item_ids and item_id in item_id_to_streaming_context
1031+
): # Check if still unclosed
1032+
streaming_context = item_id_to_streaming_context[item_id]
1033+
await streaming_context.close()
1034+
unclosed_item_ids.discard(item_id)
10501035

10511036
if span:
10521037
span.output = {

0 commit comments

Comments
 (0)