From 1dc08fb693988decd546059bfba8643f94e1a89a Mon Sep 17 00:00:00 2001 From: vincenzodomina Date: Mon, 24 Nov 2025 21:36:29 +0100 Subject: [PATCH] feat: add emit_event method to RunContextWrapper --- src/agents/run.py | 9 +++++++++ src/agents/run_context.py | 24 +++++++++++++++++++++++- src/agents/stream_events.py | 2 ++ 3 files changed, 34 insertions(+), 1 deletion(-) diff --git a/src/agents/run.py b/src/agents/run.py index fce7b4840..837acac45 100644 --- a/src/agents/run.py +++ b/src/agents/run.py @@ -887,6 +887,13 @@ def run_streamed( context_wrapper=context_wrapper, ) + # Provide an emitter to the context so tools can stream custom events. + async def _emit(evt: StreamEvent) -> None: + streamed_result._event_queue.put_nowait(evt) + + context_wrapper._emit_fn = _emit + context_wrapper._current_agent = starting_agent + # Kick off the actual agent loop in the background and return the streamed result object. streamed_result._run_impl_task = asyncio.create_task( self._start_streaming( @@ -1059,6 +1066,8 @@ async def _start_streaming( await AgentRunner._save_result_to_session(session, starting_input, []) while True: + # Keep current agent reference up-to-date for tool event emission. + context_wrapper._current_agent = current_agent # Check for soft cancel before starting new turn if streamed_result._cancel_mode == "after_turn": streamed_result.is_complete = True diff --git a/src/agents/run_context.py b/src/agents/run_context.py index 579a215f2..def15b667 100644 --- a/src/agents/run_context.py +++ b/src/agents/run_context.py @@ -1,5 +1,5 @@ from dataclasses import dataclass, field -from typing import Any, Generic +from typing import Any, Awaitable, Callable, Generic, Mapping, Optional from typing_extensions import TypeVar @@ -24,3 +24,25 @@ class RunContextWrapper(Generic[TContext]): """The usage of the agent run so far. For streamed responses, the usage will be stale until the last chunk of the stream is processed. """ + + # Internal emitter for streaming custom tool events; set by the Runner in streaming mode. + _emit_fn: Optional[Callable[[Any], Awaitable[None]]] = field(default=None, repr=False) + # Current agent reference for constructing RunItem wrappers; set by the Runner. + _current_agent: Any = field(default=None, repr=False) + + async def emit_event(self, event: Mapping[str, Any]) -> None: + """ + Emit a developer-defined event dict via the run's main stream. + The dict should include at least a 'type' key. The event will be forwarded + as a RunItemStreamEvent(name='tool_event', item.raw_item=event). + + No-op if not in streaming mode. + """ + if not self._emit_fn or not isinstance(event, Mapping) or not event.get("type"): + return + # Lazy import to avoid circular dependencies at module import time + from .items import ToolCallItem + from .stream_events import RunItemStreamEvent + + item = ToolCallItem(raw_item=dict(event), agent=self._current_agent) + await self._emit_fn(RunItemStreamEvent(name="tool_event", item=item)) diff --git a/src/agents/stream_events.py b/src/agents/stream_events.py index c0e9807a1..e5d7f3a35 100644 --- a/src/agents/stream_events.py +++ b/src/agents/stream_events.py @@ -39,6 +39,8 @@ class RunItemStreamEvent: "mcp_approval_requested", "mcp_approval_response", "mcp_list_tools", + # Custom tool events emitted by tools via RunContextWrapper.emit_event + "tool_event", ] """The name of the event."""