Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/agents/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -887,6 +887,13 @@ def run_streamed(
context_wrapper=context_wrapper,
)

# Provide an emitter to the context so tools can stream custom events.
async def _emit(evt: StreamEvent) -> None:
streamed_result._event_queue.put_nowait(evt)

context_wrapper._emit_fn = _emit
context_wrapper._current_agent = starting_agent

# Kick off the actual agent loop in the background and return the streamed result object.
streamed_result._run_impl_task = asyncio.create_task(
self._start_streaming(
Expand Down Expand Up @@ -1059,6 +1066,8 @@ async def _start_streaming(
await AgentRunner._save_result_to_session(session, starting_input, [])

while True:
# Keep current agent reference up-to-date for tool event emission.
context_wrapper._current_agent = current_agent
# Check for soft cancel before starting new turn
if streamed_result._cancel_mode == "after_turn":
streamed_result.is_complete = True
Expand Down
24 changes: 23 additions & 1 deletion src/agents/run_context.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from dataclasses import dataclass, field
from typing import Any, Generic
from typing import Any, Awaitable, Callable, Generic, Mapping, Optional

from typing_extensions import TypeVar

Expand All @@ -24,3 +24,25 @@ class RunContextWrapper(Generic[TContext]):
"""The usage of the agent run so far. For streamed responses, the usage will be stale until the
last chunk of the stream is processed.
"""

# Internal emitter for streaming custom tool events; set by the Runner in streaming mode.
_emit_fn: Optional[Callable[[Any], Awaitable[None]]] = field(default=None, repr=False)
# Current agent reference for constructing RunItem wrappers; set by the Runner.
_current_agent: Any = field(default=None, repr=False)

async def emit_event(self, event: Mapping[str, Any]) -> None:
"""
Emit a developer-defined event dict via the run's main stream.
The dict should include at least a 'type' key. The event will be forwarded
as a RunItemStreamEvent(name='tool_event', item.raw_item=event).

No-op if not in streaming mode.
"""
if not self._emit_fn or not isinstance(event, Mapping) or not event.get("type"):
return
# Lazy import to avoid circular dependencies at module import time
from .items import ToolCallItem
from .stream_events import RunItemStreamEvent

item = ToolCallItem(raw_item=dict(event), agent=self._current_agent)
await self._emit_fn(RunItemStreamEvent(name="tool_event", item=item))
2 changes: 2 additions & 0 deletions src/agents/stream_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ class RunItemStreamEvent:
"mcp_approval_requested",
"mcp_approval_response",
"mcp_list_tools",
# Custom tool events emitted by tools via RunContextWrapper.emit_event
"tool_event",
]
"""The name of the event."""

Expand Down
Loading