Skip to content

Commit 1dc08fb

Browse files
feat: add emit_event method to RunContextWrapper
1 parent a7c539f commit 1dc08fb

File tree

3 files changed

+34
-1
lines changed

3 files changed

+34
-1
lines changed

src/agents/run.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -887,6 +887,13 @@ def run_streamed(
887887
context_wrapper=context_wrapper,
888888
)
889889

890+
# Provide an emitter to the context so tools can stream custom events.
891+
async def _emit(evt: StreamEvent) -> None:
892+
streamed_result._event_queue.put_nowait(evt)
893+
894+
context_wrapper._emit_fn = _emit
895+
context_wrapper._current_agent = starting_agent
896+
890897
# Kick off the actual agent loop in the background and return the streamed result object.
891898
streamed_result._run_impl_task = asyncio.create_task(
892899
self._start_streaming(
@@ -1059,6 +1066,8 @@ async def _start_streaming(
10591066
await AgentRunner._save_result_to_session(session, starting_input, [])
10601067

10611068
while True:
1069+
# Keep current agent reference up-to-date for tool event emission.
1070+
context_wrapper._current_agent = current_agent
10621071
# Check for soft cancel before starting new turn
10631072
if streamed_result._cancel_mode == "after_turn":
10641073
streamed_result.is_complete = True

src/agents/run_context.py

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from dataclasses import dataclass, field
2-
from typing import Any, Generic
2+
from typing import Any, Awaitable, Callable, Generic, Mapping, Optional
33

44
from typing_extensions import TypeVar
55

@@ -24,3 +24,25 @@ class RunContextWrapper(Generic[TContext]):
2424
"""The usage of the agent run so far. For streamed responses, the usage will be stale until the
2525
last chunk of the stream is processed.
2626
"""
27+
28+
# Internal emitter for streaming custom tool events; set by the Runner in streaming mode.
29+
_emit_fn: Optional[Callable[[Any], Awaitable[None]]] = field(default=None, repr=False)
30+
# Current agent reference for constructing RunItem wrappers; set by the Runner.
31+
_current_agent: Any = field(default=None, repr=False)
32+
33+
async def emit_event(self, event: Mapping[str, Any]) -> None:
34+
"""
35+
Emit a developer-defined event dict via the run's main stream.
36+
The dict should include at least a 'type' key. The event will be forwarded
37+
as a RunItemStreamEvent(name='tool_event', item.raw_item=event).
38+
39+
No-op if not in streaming mode.
40+
"""
41+
if not self._emit_fn or not isinstance(event, Mapping) or not event.get("type"):
42+
return
43+
# Lazy import to avoid circular dependencies at module import time
44+
from .items import ToolCallItem
45+
from .stream_events import RunItemStreamEvent
46+
47+
item = ToolCallItem(raw_item=dict(event), agent=self._current_agent)
48+
await self._emit_fn(RunItemStreamEvent(name="tool_event", item=item))

src/agents/stream_events.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ class RunItemStreamEvent:
3939
"mcp_approval_requested",
4040
"mcp_approval_response",
4141
"mcp_list_tools",
42+
# Custom tool events emitted by tools via RunContextWrapper.emit_event
43+
"tool_event",
4244
]
4345
"""The name of the event."""
4446

0 commit comments

Comments
 (0)