Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
287 changes: 141 additions & 146 deletions libs/agno/agno/models/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,12 @@
from agno.run.requirement import RunRequirement
from agno.run.team import RunContentEvent as TeamRunContentEvent
from agno.run.team import TeamRunOutput, TeamRunOutputEvent
from agno.run.workflow import WorkflowRunOutputEvent
from agno.run.workflow import WorkflowCompletedEvent, WorkflowRunOutputEvent
from agno.tools.function import (
Function,
FunctionCall,
FunctionExecutionResult,
ToolResult,
UserFeedbackOption,
UserFeedbackQuestion,
UserInputField,
Expand Down Expand Up @@ -2065,6 +2066,93 @@ def create_tool_call_limit_error_result(self, function_call: FunctionCall) -> Me
tool_call_error=True,
)

@staticmethod
def _process_generator_item(
item: Any,
function_call: FunctionCall,
event_types: tuple,
) -> Tuple[str, Optional[ModelResponse], bool, bool]:
"""Process a single item from a generator tool result.

Returns:
(output_delta, show_result_response, skip_raw_event, set_tool_call_id)
- output_delta: text to append to function_call_output
- show_result_response: ModelResponse to yield/enqueue if not None
- skip_raw_event: if True, do NOT yield/enqueue the raw item
- set_tool_call_id: if True, caller sets item.tool_call_id = function_call.call_id
"""
output_delta = ""
show_result_response: Optional[ModelResponse] = None
skip_raw_event = False
set_tool_call_id = False

if isinstance(item, event_types):
# Capture content from RunContentEvent / TeamRunContentEvent
if isinstance(item, (RunContentEvent, TeamRunContentEvent)):
if item.content is not None and isinstance(item.content, BaseModel):
output_delta = item.content.model_dump_json()
else:
output_delta = item.content or ""

if function_call.function.show_result and item.content is not None:
show_result_response = ModelResponse(content=item.content)
skip_raw_event = True

if isinstance(item, CustomEvent):
output_delta = str(item)
set_tool_call_id = True

# Extract content from WorkflowCompletedEvent
if isinstance(item, WorkflowCompletedEvent):
if item.content is not None:
if isinstance(item.content, BaseModel):
output_delta = item.content.model_dump_json()
else:
output_delta = str(item.content)
else:
# Non-event item: stringify and optionally show
output_delta = str(item)
if function_call.function.show_result and item is not None:
show_result_response = ModelResponse(content=str(item))

return output_delta, show_result_response, skip_raw_event, set_tool_call_id

@staticmethod
def _format_non_generator_result(
function_execution_result: FunctionExecutionResult,
function_call: FunctionCall,
) -> Tuple[str, Optional[ModelResponse]]:
"""Format a non-generator tool result.

Returns:
(function_call_output, show_result_response)
"""
function_call_output = ""
show_result_response: Optional[ModelResponse] = None

if isinstance(function_execution_result.result, ToolResult):
tool_result = function_execution_result.result
function_call_output = tool_result.content

# Transfer media from ToolResult to FunctionExecutionResult
if tool_result.images:
function_execution_result.images = tool_result.images
if tool_result.videos:
function_execution_result.videos = tool_result.videos
if tool_result.audios:
function_execution_result.audios = tool_result.audios
if tool_result.files:
function_execution_result.files = tool_result.files
else:
function_call_output = (
str(function_execution_result.result) if function_execution_result.result is not None else ""
)

if function_call.function.show_result and function_call_output is not None:
show_result_response = ModelResponse(content=function_call_output)

return function_call_output, show_result_response

def run_function_call(
self,
function_call: FunctionCall,
Expand Down Expand Up @@ -2112,46 +2200,23 @@ def run_function_call(
function_call_output: str = ""

if isinstance(function_execution_result.result, (GeneratorType, collections.abc.Iterator)):
event_types = (
tuple(get_args(RunOutputEvent))
+ tuple(get_args(TeamRunOutputEvent))
+ tuple(get_args(WorkflowRunOutputEvent))
)
try:
for item in function_execution_result.result:
# This function yields agent/team/workflow run events
if (
isinstance(item, tuple(get_args(RunOutputEvent)))
or isinstance(item, tuple(get_args(TeamRunOutputEvent)))
or isinstance(item, tuple(get_args(WorkflowRunOutputEvent)))
):
# We only capture content events for output accumulation
if isinstance(item, RunContentEvent) or isinstance(item, TeamRunContentEvent):
if item.content is not None and isinstance(item.content, BaseModel):
function_call_output += item.content.model_dump_json()
else:
# Capture output
function_call_output += item.content or ""

if function_call.function.show_result and item.content is not None:
yield ModelResponse(content=item.content)

if isinstance(item, CustomEvent):
function_call_output += str(item)
item.tool_call_id = function_call.call_id

# For WorkflowCompletedEvent, extract content for final output
from agno.run.workflow import WorkflowCompletedEvent

if isinstance(item, WorkflowCompletedEvent):
if item.content is not None:
if isinstance(item.content, BaseModel):
function_call_output += item.content.model_dump_json()
else:
function_call_output += str(item.content)

# Yield the event itself to bubble it up
output_delta, show_response, skip_raw, set_tcid = self._process_generator_item(
item, function_call, event_types
)
function_call_output += output_delta
if set_tcid:
item.tool_call_id = function_call.call_id
if show_response is not None:
yield show_response
if not skip_raw:
yield item

else:
function_call_output += str(item)
if function_call.function.show_result and item is not None:
yield ModelResponse(content=str(item))
except Exception as e:
log_error(f"Error while iterating function result generator for {function_call.function.name}: {e}")
function_call.error = str(e)
Expand All @@ -2166,27 +2231,11 @@ def run_function_call(
):
function_execution_result.updated_session_state = function_call.function._run_context.session_state
else:
from agno.tools.function import ToolResult

if isinstance(function_execution_result.result, ToolResult):
# Extract content and media from ToolResult
tool_result = function_execution_result.result
function_call_output = tool_result.content

# Transfer media from ToolResult to FunctionExecutionResult
if tool_result.images:
function_execution_result.images = tool_result.images
if tool_result.videos:
function_execution_result.videos = tool_result.videos
if tool_result.audios:
function_execution_result.audios = tool_result.audios
if tool_result.files:
function_execution_result.files = tool_result.files
else:
function_call_output = str(function_execution_result.result) if function_execution_result.result else ""

if function_call.function.show_result and function_call_output is not None:
yield ModelResponse(content=function_call_output)
function_call_output, show_response = self._format_non_generator_result(
function_execution_result, function_call
)
if show_response is not None:
yield show_response

# Create ToolCallMetrics for the tool execution
tool_metrics = None
Expand Down Expand Up @@ -2640,51 +2689,25 @@ async def arun_function_calls(
async def process_async_generator(result, generator_id):
function_call_success, function_call_timer, function_call, function_execution_result = result
function_call_output = ""
event_types = (
tuple(get_args(RunOutputEvent))
+ tuple(get_args(TeamRunOutputEvent))
+ tuple(get_args(WorkflowRunOutputEvent))
)

try:
async for item in function_call.result:
# This function yields agent/team/workflow run events
if isinstance(
item,
tuple(get_args(RunOutputEvent))
+ tuple(get_args(TeamRunOutputEvent))
+ tuple(get_args(WorkflowRunOutputEvent)),
):
# We only capture content events
if isinstance(item, RunContentEvent) or isinstance(item, TeamRunContentEvent):
if item.content is not None and isinstance(item.content, BaseModel):
function_call_output += item.content.model_dump_json()
else:
# Capture output
function_call_output += item.content or ""

if function_call.function.show_result and item.content is not None:
await event_queue.put(ModelResponse(content=item.content))
continue

if isinstance(item, CustomEvent):
function_call_output += str(item)
item.tool_call_id = function_call.call_id

# For WorkflowCompletedEvent, extract content for final output
from agno.run.workflow import WorkflowCompletedEvent

if isinstance(item, WorkflowCompletedEvent):
if item.content is not None:
if isinstance(item.content, BaseModel):
function_call_output += item.content.model_dump_json()
else:
function_call_output += str(item.content)

# Put the event into the queue to be yielded
output_delta, show_response, skip_raw, set_tcid = self._process_generator_item(
item, function_call, event_types
)
function_call_output += output_delta
if set_tcid:
item.tool_call_id = function_call.call_id
if show_response is not None:
await event_queue.put(show_response)
if not skip_raw:
await event_queue.put(item)

# Yield custom events emitted by the tool
else:
function_call_output += str(item)
if function_call.function.show_result and item is not None:
await event_queue.put(ModelResponse(content=str(item)))

# Store the final output for this generator
async_generator_outputs[generator_id] = (result, function_call_output, None)

Expand Down Expand Up @@ -2773,37 +2796,23 @@ async def process_async_generator(result, generator_id):
function_call_output = async_function_call_output
# Events from async generators were already yielded in real-time above
elif isinstance(function_call.result, (GeneratorType, collections.abc.Iterator)):
event_types = (
tuple(get_args(RunOutputEvent))
+ tuple(get_args(TeamRunOutputEvent))
+ tuple(get_args(WorkflowRunOutputEvent))
)
try:
for item in function_call.result:
# This function yields agent/team/workflow run events
if isinstance(
item,
tuple(get_args(RunOutputEvent))
+ tuple(get_args(TeamRunOutputEvent))
+ tuple(get_args(WorkflowRunOutputEvent)),
):
# We only capture content events
if isinstance(item, RunContentEvent) or isinstance(item, TeamRunContentEvent):
if item.content is not None and isinstance(item.content, BaseModel):
function_call_output += item.content.model_dump_json()
else:
# Capture output
function_call_output += item.content or ""

if function_call.function.show_result and item.content is not None:
yield ModelResponse(content=item.content)
continue

elif isinstance(item, CustomEvent):
function_call_output += str(item)
item.tool_call_id = function_call.call_id

# Yield the event itself to bubble it up
output_delta, show_response, skip_raw, set_tcid = self._process_generator_item(
item, function_call, event_types
)
function_call_output += output_delta
if set_tcid:
item.tool_call_id = function_call.call_id
if show_response is not None:
yield show_response
if not skip_raw:
yield item
else:
function_call_output += str(item)
if function_call.function.show_result and item is not None:
yield ModelResponse(content=str(item))
except Exception as e:
log_error(f"Error while iterating function result generator for {function_call.function.name}: {e}")
function_call.error = str(e)
Expand All @@ -2829,25 +2838,11 @@ async def process_async_generator(result, generator_id):
(GeneratorType, collections.abc.Iterator, AsyncGeneratorType, collections.abc.AsyncIterator),
)
):
from agno.tools.function import ToolResult

if isinstance(function_execution_result.result, ToolResult):
tool_result = function_execution_result.result
function_call_output = tool_result.content

if tool_result.images:
function_execution_result.images = tool_result.images
if tool_result.videos:
function_execution_result.videos = tool_result.videos
if tool_result.audios:
function_execution_result.audios = tool_result.audios
if tool_result.files:
function_execution_result.files = tool_result.files
else:
function_call_output = str(function_call.result)

if function_call.function.show_result and function_call_output is not None:
yield ModelResponse(content=function_call_output)
function_call_output, show_response = self._format_non_generator_result(
function_execution_result, function_call
)
if show_response is not None:
yield show_response

# Create ToolCallMetrics for the tool execution
tool_metrics = None
Expand Down
Loading
Loading