Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 28 additions & 26 deletions src/agentex/lib/core/services/adk/providers/openai.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from openai.types.responses import (
ResponseCompletedEvent,
ResponseFunctionToolCall,
ResponseOutputItemAddedEvent,
ResponseOutputItemDoneEvent,
ResponseTextDeltaEvent,
)
Expand Down Expand Up @@ -593,34 +594,11 @@ async def run_agent_streamed_auto_send(
heartbeat_if_in_workflow(
"processing stream event with auto send"
)

if event.type == "run_item_stream_event":
if event.item.type == "tool_call_item":
tool_call_item = event.item.raw_item
tool_call_map[tool_call_item.call_id] = tool_call_item

tool_request_content = ToolRequestContent(
author="agent",
tool_call_id=tool_call_item.call_id,
name=tool_call_item.name,
arguments=json.loads(tool_call_item.arguments),
)

# Create tool request using streaming context (immediate completion)
async with (
self.streaming_service.streaming_task_message_context(
task_id=task_id,
initial_content=tool_request_content,
) as streaming_context
):
# The message has already been persisted, but we still need to send an update
await streaming_context.stream_update(
update=StreamTaskMessageFull(
parent_task_message=streaming_context.task_message,
content=tool_request_content,
),
)
Comment on lines -602 to -622
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We used to stream back a ToolRequestContent when event.item.type=="tool_call_item". This caused unexpected behavior when using MCP servers as a tool provider.

The unexpected behavior was that tool requests and tool responses would be streamed back at the same time, after the tool had been completed. I.e., if a tool request was initiated, you would see nothing streamed back until it was completed, at which point both the request and response would be streamed back nearly simultaneously.


elif event.item.type == "tool_call_output_item":
tool_output_item = event.item.raw_item

Expand Down Expand Up @@ -649,8 +627,33 @@ async def run_agent_streamed_auto_send(
)

elif event.type == "raw_response_event":
if isinstance(event.data, ResponseTextDeltaEvent):
# Handle text delta
if isinstance(event.data, ResponseOutputItemAddedEvent):
# Handle tool call initiation - stream tool request immediately when tool call starts
if (event.data.item.type == "function_call" and
hasattr(event.data.item, 'call_id') and
hasattr(event.data.item, 'name')):

tool_request_content = ToolRequestContent(
author="agent",
tool_call_id=event.data.item.call_id,
name=event.data.item.name,
arguments={}, # Empty arguments at initiation time
)

# Create tool request using streaming context (immediate completion)
async with self.streaming_service.streaming_task_message_context(
task_id=task_id,
initial_content=tool_request_content,
) as streaming_context:
await streaming_context.stream_update(
update=StreamTaskMessageFull(
parent_task_message=streaming_context.task_message,
content=tool_request_content,
),
)
Comment on lines +630 to +653
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We now emit the ToolRequestContent event when event.data.item.type == "function_call". The events we receive from OpenAI when a tool request is made seem to be:

  1. function call event: The agent decides to use this tool, and the tool request is initiated
  2. argument delta events: The agent streams back the arguments it wants to pass to the tool
  3. argument done event: The agent finishes streaming the argument, and hands it off to the MCP server to execute
  4. tool_call_item event: The MCP server completes execution and sends this event before the actual output.
  5. tool_call_output_item event: The MCP server sends back the tool call output.

In a nutshell, the previous logic sent the tool call request event at step 4, after the tool had already executed and completed. Whereas this new logic sends the request at step 1, before the tool call is executed.

Here are the potential concerns with making this change:

  • I'm not exactly sure how this would affect OpenAI tool call events, since the events sent during the tool execution cycle are handled differently between it and MCP servers.
  • Now, the tool call request contains none of the arguments. That might be okay for some applications, but we probably should handle the argument delta events in the future if we want to make this change.


elif isinstance(event.data, ResponseTextDeltaEvent):
# Handle text delta - skip tool argument deltas, only handle actual text responses
item_id = event.data.item_id

# Check if we already have a streaming context for this item
Expand Down Expand Up @@ -701,7 +704,6 @@ async def run_agent_streamed_auto_send(
]
await streaming_context.close()
unclosed_item_ids.remove(item_id)

finally:
# Cleanup: ensure all streaming contexts for this session are properly finished
for item_id in unclosed_item_ids:
Expand Down
Loading