From ba8efb91505e7758ac90b3b9304dad7df9cf4421 Mon Sep 17 00:00:00 2001 From: phi Date: Sun, 23 Nov 2025 00:26:38 +0900 Subject: [PATCH 01/11] fix: AbstractAgent._run_stream_events --- pydantic_ai_slim/pydantic_ai/agent/abstract.py | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/pydantic_ai_slim/pydantic_ai/agent/abstract.py b/pydantic_ai_slim/pydantic_ai/agent/abstract.py index c7c1cb2b5c..e1a39698a3 100644 --- a/pydantic_ai_slim/pydantic_ai/agent/abstract.py +++ b/pydantic_ai_slim/pydantic_ai/agent/abstract.py @@ -867,16 +867,25 @@ async def _run_stream_events( usage: _usage.RunUsage | None = None, toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None, builtin_tools: Sequence[AbstractBuiltinTool] | None = None, + event_stream_handler: EventStreamHandler[AgentDepsT] | None = None, ) -> AsyncIterator[_messages.AgentStreamEvent | AgentRunResultEvent[Any]]: send_stream, receive_stream = anyio.create_memory_object_stream[ _messages.AgentStreamEvent | AgentRunResultEvent[Any] ]() - async def event_stream_handler( - _: RunContext[AgentDepsT], events: AsyncIterable[_messages.AgentStreamEvent] - ) -> None: + async def _yield_event_stream( + events: AsyncIterable[_messages.AgentStreamEvent], + ) -> AsyncIterator[_messages.AgentStreamEvent]: async for event in events: await send_stream.send(event) + yield event + + async def _event_stream_handler( + context: RunContext[AgentDepsT], events: AsyncIterable[_messages.AgentStreamEvent] + ) -> None: + events = _yield_event_stream(events) + if event_stream_handler is not None: + await event_stream_handler(context, events) async def run_agent() -> AgentRunResult[Any]: async with send_stream: @@ -894,7 +903,7 @@ async def run_agent() -> AgentRunResult[Any]: infer_name=False, toolsets=toolsets, builtin_tools=builtin_tools, - event_stream_handler=event_stream_handler, + event_stream_handler=_event_stream_handler, ) task = asyncio.create_task(run_agent()) From 146e4ccf219ace95005afa08debaa3b57d109007 Mon Sep 17 00:00:00 2001 From: phi Date: Sun, 23 Nov 2025 00:27:33 +0900 Subject: [PATCH 02/11] fix: run_stream_events --- pydantic_ai_slim/pydantic_ai/agent/abstract.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pydantic_ai_slim/pydantic_ai/agent/abstract.py b/pydantic_ai_slim/pydantic_ai/agent/abstract.py index e1a39698a3..f8862dd103 100644 --- a/pydantic_ai_slim/pydantic_ai/agent/abstract.py +++ b/pydantic_ai_slim/pydantic_ai/agent/abstract.py @@ -739,6 +739,7 @@ def run_stream_events( infer_name: bool = True, toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None, builtin_tools: Sequence[AbstractBuiltinTool] | None = None, + event_stream_handler: EventStreamHandler[AgentDepsT] | None = None, ) -> AsyncIterator[_messages.AgentStreamEvent | AgentRunResultEvent[OutputDataT]]: ... @overload @@ -758,6 +759,7 @@ def run_stream_events( infer_name: bool = True, toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None, builtin_tools: Sequence[AbstractBuiltinTool] | None = None, + event_stream_handler: EventStreamHandler[AgentDepsT] | None = None, ) -> AsyncIterator[_messages.AgentStreamEvent | AgentRunResultEvent[RunOutputDataT]]: ... def run_stream_events( @@ -776,6 +778,7 @@ def run_stream_events( infer_name: bool = True, toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None, builtin_tools: Sequence[AbstractBuiltinTool] | None = None, + event_stream_handler: EventStreamHandler[AgentDepsT] | None = None, ) -> AsyncIterator[_messages.AgentStreamEvent | AgentRunResultEvent[Any]]: """Run the agent with a user prompt in async mode and stream events from the run. @@ -826,6 +829,7 @@ async def main(): infer_name: Whether to try to infer the agent name from the call frame if it's not set. toolsets: Optional additional toolsets for this run. builtin_tools: Optional additional builtin tools for this run. + event_stream_handler: Optional handler for events from the model's streaming response and the agent's execution of tools to use for this run. Returns: An async iterable of stream events `AgentStreamEvent` and finally a `AgentRunResultEvent` with the final From 6f7c94654b680b5e3f4e0c0d8d3e3ae0886c28a9 Mon Sep 17 00:00:00 2001 From: phi Date: Sun, 23 Nov 2025 00:29:13 +0900 Subject: [PATCH 03/11] fix: run_stream_native --- pydantic_ai_slim/pydantic_ai/ui/_adapter.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pydantic_ai_slim/pydantic_ai/ui/_adapter.py b/pydantic_ai_slim/pydantic_ai/ui/_adapter.py index a1ca12cd6e..27d6202c97 100644 --- a/pydantic_ai_slim/pydantic_ai/ui/_adapter.py +++ b/pydantic_ai_slim/pydantic_ai/ui/_adapter.py @@ -20,7 +20,7 @@ from pydantic_ai import DeferredToolRequests, DeferredToolResults from pydantic_ai.agent import AbstractAgent -from pydantic_ai.agent.abstract import Instructions +from pydantic_ai.agent.abstract import EventStreamHandler, Instructions from pydantic_ai.builtin_tools import AbstractBuiltinTool from pydantic_ai.messages import ModelMessage from pydantic_ai.models import KnownModelName, Model @@ -209,6 +209,7 @@ def run_stream_native( infer_name: bool = True, toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None, builtin_tools: Sequence[AbstractBuiltinTool] | None = None, + event_stream_handler: EventStreamHandler[AgentDepsT] | None = None, ) -> AsyncIterator[NativeEvent]: """Run the agent with the protocol-specific run input and stream Pydantic AI events. @@ -226,6 +227,7 @@ def run_stream_native( infer_name: Whether to try to infer the agent name from the call frame if it's not set. toolsets: Optional additional toolsets for this run. builtin_tools: Optional additional builtin tools to use for this run. + event_stream_handler: Optional handler for events from the model's streaming response and the agent's execution of tools to use for this run. """ message_history = [*(message_history or []), *self.messages] @@ -262,6 +264,7 @@ def run_stream_native( infer_name=infer_name, toolsets=toolsets, builtin_tools=builtin_tools, + event_stream_handler=event_stream_handler, ) def run_stream( From fb5b59542531a822159537570a7a7f6124a4e249 Mon Sep 17 00:00:00 2001 From: phi Date: Sun, 23 Nov 2025 00:29:53 +0900 Subject: [PATCH 04/11] fix: run_stream --- pydantic_ai_slim/pydantic_ai/ui/_adapter.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pydantic_ai_slim/pydantic_ai/ui/_adapter.py b/pydantic_ai_slim/pydantic_ai/ui/_adapter.py index 27d6202c97..72db6fcc05 100644 --- a/pydantic_ai_slim/pydantic_ai/ui/_adapter.py +++ b/pydantic_ai_slim/pydantic_ai/ui/_adapter.py @@ -282,6 +282,7 @@ def run_stream( infer_name: bool = True, toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None, builtin_tools: Sequence[AbstractBuiltinTool] | None = None, + event_stream_handler: EventStreamHandler[AgentDepsT] | None = None, on_complete: OnCompleteFunc[EventT] | None = None, ) -> AsyncIterator[EventT]: """Run the agent with the protocol-specific run input and stream protocol-specific events. @@ -300,6 +301,7 @@ def run_stream( infer_name: Whether to try to infer the agent name from the call frame if it's not set. toolsets: Optional additional toolsets for this run. builtin_tools: Optional additional builtin tools to use for this run. + event_stream_handler: Optional handler for events from the model's streaming response and the agent's execution of tools to use for this run. on_complete: Optional callback function called when the agent run completes successfully. The callback receives the completed [`AgentRunResult`][pydantic_ai.agent.AgentRunResult] and can optionally yield additional protocol-specific events. """ @@ -317,6 +319,7 @@ def run_stream( infer_name=infer_name, toolsets=toolsets, builtin_tools=builtin_tools, + event_stream_handler=event_stream_handler, ), on_complete=on_complete, ) From 97cbab163c3111f4e55d08a73ee200e86c940947 Mon Sep 17 00:00:00 2001 From: phi Date: Sun, 23 Nov 2025 00:30:30 +0900 Subject: [PATCH 05/11] fix: dispatch_request --- pydantic_ai_slim/pydantic_ai/ui/_adapter.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pydantic_ai_slim/pydantic_ai/ui/_adapter.py b/pydantic_ai_slim/pydantic_ai/ui/_adapter.py index 72db6fcc05..8d5f6e3f31 100644 --- a/pydantic_ai_slim/pydantic_ai/ui/_adapter.py +++ b/pydantic_ai_slim/pydantic_ai/ui/_adapter.py @@ -342,6 +342,7 @@ async def dispatch_request( infer_name: bool = True, toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None, builtin_tools: Sequence[AbstractBuiltinTool] | None = None, + event_stream_handler: EventStreamHandler[AgentDepsT] | None = None, on_complete: OnCompleteFunc[EventT] | None = None, ) -> Response: """Handle a protocol-specific HTTP request by running the agent and returning a streaming response of protocol-specific events. @@ -362,6 +363,7 @@ async def dispatch_request( infer_name: Whether to try to infer the agent name from the call frame if it's not set. toolsets: Optional additional toolsets for this run. builtin_tools: Optional additional builtin tools to use for this run. + event_stream_handler: Optional handler for events from the model's streaming response and the agent's execution of tools to use for this run. on_complete: Optional callback function called when the agent run completes successfully. The callback receives the completed [`AgentRunResult`][pydantic_ai.agent.AgentRunResult] and can optionally yield additional protocol-specific events. @@ -399,6 +401,7 @@ async def dispatch_request( infer_name=infer_name, toolsets=toolsets, builtin_tools=builtin_tools, + event_stream_handler=event_stream_handler, on_complete=on_complete, ), ) From 4c8752f83f4210951cb25f1bf96b6abfccf1e64a Mon Sep 17 00:00:00 2001 From: phi Date: Sun, 23 Nov 2025 00:31:41 +0900 Subject: [PATCH 06/11] fix: run_stream_events --- pydantic_ai_slim/pydantic_ai/agent/abstract.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pydantic_ai_slim/pydantic_ai/agent/abstract.py b/pydantic_ai_slim/pydantic_ai/agent/abstract.py index f8862dd103..5221e16cfd 100644 --- a/pydantic_ai_slim/pydantic_ai/agent/abstract.py +++ b/pydantic_ai_slim/pydantic_ai/agent/abstract.py @@ -854,6 +854,7 @@ async def main(): usage=usage, toolsets=toolsets, builtin_tools=builtin_tools, + event_stream_handler=event_stream_handler, ) async def _run_stream_events( From b53686ca4a480f4d02d5d5e3c16ce8e01405fc79 Mon Sep 17 00:00:00 2001 From: phi Date: Sun, 23 Nov 2025 00:32:57 +0900 Subject: [PATCH 07/11] fix: order --- pydantic_ai_slim/pydantic_ai/agent/abstract.py | 8 ++++---- pydantic_ai_slim/pydantic_ai/ui/_adapter.py | 18 +++++++++--------- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/pydantic_ai_slim/pydantic_ai/agent/abstract.py b/pydantic_ai_slim/pydantic_ai/agent/abstract.py index 5221e16cfd..469199c929 100644 --- a/pydantic_ai_slim/pydantic_ai/agent/abstract.py +++ b/pydantic_ai_slim/pydantic_ai/agent/abstract.py @@ -777,8 +777,8 @@ def run_stream_events( usage: _usage.RunUsage | None = None, infer_name: bool = True, toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None, - builtin_tools: Sequence[AbstractBuiltinTool] | None = None, event_stream_handler: EventStreamHandler[AgentDepsT] | None = None, + builtin_tools: Sequence[AbstractBuiltinTool] | None = None, ) -> AsyncIterator[_messages.AgentStreamEvent | AgentRunResultEvent[Any]]: """Run the agent with a user prompt in async mode and stream events from the run. @@ -828,8 +828,8 @@ async def main(): usage: Optional usage to start with, useful for resuming a conversation or agents used in tools. infer_name: Whether to try to infer the agent name from the call frame if it's not set. toolsets: Optional additional toolsets for this run. - builtin_tools: Optional additional builtin tools for this run. event_stream_handler: Optional handler for events from the model's streaming response and the agent's execution of tools to use for this run. + builtin_tools: Optional additional builtin tools for this run. Returns: An async iterable of stream events `AgentStreamEvent` and finally a `AgentRunResultEvent` with the final @@ -853,8 +853,8 @@ async def main(): usage_limits=usage_limits, usage=usage, toolsets=toolsets, - builtin_tools=builtin_tools, event_stream_handler=event_stream_handler, + builtin_tools=builtin_tools, ) async def _run_stream_events( @@ -871,8 +871,8 @@ async def _run_stream_events( usage_limits: _usage.UsageLimits | None = None, usage: _usage.RunUsage | None = None, toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None, - builtin_tools: Sequence[AbstractBuiltinTool] | None = None, event_stream_handler: EventStreamHandler[AgentDepsT] | None = None, + builtin_tools: Sequence[AbstractBuiltinTool] | None = None, ) -> AsyncIterator[_messages.AgentStreamEvent | AgentRunResultEvent[Any]]: send_stream, receive_stream = anyio.create_memory_object_stream[ _messages.AgentStreamEvent | AgentRunResultEvent[Any] diff --git a/pydantic_ai_slim/pydantic_ai/ui/_adapter.py b/pydantic_ai_slim/pydantic_ai/ui/_adapter.py index 8d5f6e3f31..8dff1f806a 100644 --- a/pydantic_ai_slim/pydantic_ai/ui/_adapter.py +++ b/pydantic_ai_slim/pydantic_ai/ui/_adapter.py @@ -208,8 +208,8 @@ def run_stream_native( usage: RunUsage | None = None, infer_name: bool = True, toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None, - builtin_tools: Sequence[AbstractBuiltinTool] | None = None, event_stream_handler: EventStreamHandler[AgentDepsT] | None = None, + builtin_tools: Sequence[AbstractBuiltinTool] | None = None, ) -> AsyncIterator[NativeEvent]: """Run the agent with the protocol-specific run input and stream Pydantic AI events. @@ -226,8 +226,8 @@ def run_stream_native( usage: Optional usage to start with, useful for resuming a conversation or agents used in tools. infer_name: Whether to try to infer the agent name from the call frame if it's not set. toolsets: Optional additional toolsets for this run. - builtin_tools: Optional additional builtin tools to use for this run. event_stream_handler: Optional handler for events from the model's streaming response and the agent's execution of tools to use for this run. + builtin_tools: Optional additional builtin tools to use for this run. """ message_history = [*(message_history or []), *self.messages] @@ -263,8 +263,8 @@ def run_stream_native( usage=usage, infer_name=infer_name, toolsets=toolsets, - builtin_tools=builtin_tools, event_stream_handler=event_stream_handler, + builtin_tools=builtin_tools, ) def run_stream( @@ -281,8 +281,8 @@ def run_stream( usage: RunUsage | None = None, infer_name: bool = True, toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None, - builtin_tools: Sequence[AbstractBuiltinTool] | None = None, event_stream_handler: EventStreamHandler[AgentDepsT] | None = None, + builtin_tools: Sequence[AbstractBuiltinTool] | None = None, on_complete: OnCompleteFunc[EventT] | None = None, ) -> AsyncIterator[EventT]: """Run the agent with the protocol-specific run input and stream protocol-specific events. @@ -300,8 +300,8 @@ def run_stream( usage: Optional usage to start with, useful for resuming a conversation or agents used in tools. infer_name: Whether to try to infer the agent name from the call frame if it's not set. toolsets: Optional additional toolsets for this run. - builtin_tools: Optional additional builtin tools to use for this run. event_stream_handler: Optional handler for events from the model's streaming response and the agent's execution of tools to use for this run. + builtin_tools: Optional additional builtin tools to use for this run. on_complete: Optional callback function called when the agent run completes successfully. The callback receives the completed [`AgentRunResult`][pydantic_ai.agent.AgentRunResult] and can optionally yield additional protocol-specific events. """ @@ -318,8 +318,8 @@ def run_stream( usage=usage, infer_name=infer_name, toolsets=toolsets, - builtin_tools=builtin_tools, event_stream_handler=event_stream_handler, + builtin_tools=builtin_tools, ), on_complete=on_complete, ) @@ -341,8 +341,8 @@ async def dispatch_request( usage: RunUsage | None = None, infer_name: bool = True, toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None, - builtin_tools: Sequence[AbstractBuiltinTool] | None = None, event_stream_handler: EventStreamHandler[AgentDepsT] | None = None, + builtin_tools: Sequence[AbstractBuiltinTool] | None = None, on_complete: OnCompleteFunc[EventT] | None = None, ) -> Response: """Handle a protocol-specific HTTP request by running the agent and returning a streaming response of protocol-specific events. @@ -362,8 +362,8 @@ async def dispatch_request( usage: Optional usage to start with, useful for resuming a conversation or agents used in tools. infer_name: Whether to try to infer the agent name from the call frame if it's not set. toolsets: Optional additional toolsets for this run. - builtin_tools: Optional additional builtin tools to use for this run. event_stream_handler: Optional handler for events from the model's streaming response and the agent's execution of tools to use for this run. + builtin_tools: Optional additional builtin tools to use for this run. on_complete: Optional callback function called when the agent run completes successfully. The callback receives the completed [`AgentRunResult`][pydantic_ai.agent.AgentRunResult] and can optionally yield additional protocol-specific events. @@ -400,8 +400,8 @@ async def dispatch_request( usage=usage, infer_name=infer_name, toolsets=toolsets, - builtin_tools=builtin_tools, event_stream_handler=event_stream_handler, + builtin_tools=builtin_tools, on_complete=on_complete, ), ) From 1d08ab9d8cc7937058b2d5af9dc535ff18b76376 Mon Sep 17 00:00:00 2001 From: phi Date: Sun, 23 Nov 2025 00:38:27 +0900 Subject: [PATCH 08/11] fix: ensure stream --- pydantic_ai_slim/pydantic_ai/agent/abstract.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pydantic_ai_slim/pydantic_ai/agent/abstract.py b/pydantic_ai_slim/pydantic_ai/agent/abstract.py index 469199c929..d7492abdc6 100644 --- a/pydantic_ai_slim/pydantic_ai/agent/abstract.py +++ b/pydantic_ai_slim/pydantic_ai/agent/abstract.py @@ -891,6 +891,8 @@ async def _event_stream_handler( events = _yield_event_stream(events) if event_stream_handler is not None: await event_stream_handler(context, events) + async for _ in events: + pass async def run_agent() -> AgentRunResult[Any]: async with send_stream: From 7723089b05e9e3cb19e6ddefa65b6991ee407969 Mon Sep 17 00:00:00 2001 From: phi Date: Sun, 23 Nov 2025 00:45:54 +0900 Subject: [PATCH 09/11] fix: order --- pydantic_ai_slim/pydantic_ai/agent/abstract.py | 6 +++--- pydantic_ai_slim/pydantic_ai/ui/_adapter.py | 12 ++++++------ 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/pydantic_ai_slim/pydantic_ai/agent/abstract.py b/pydantic_ai_slim/pydantic_ai/agent/abstract.py index d7492abdc6..eae242b920 100644 --- a/pydantic_ai_slim/pydantic_ai/agent/abstract.py +++ b/pydantic_ai_slim/pydantic_ai/agent/abstract.py @@ -777,8 +777,8 @@ def run_stream_events( usage: _usage.RunUsage | None = None, infer_name: bool = True, toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None, - event_stream_handler: EventStreamHandler[AgentDepsT] | None = None, builtin_tools: Sequence[AbstractBuiltinTool] | None = None, + event_stream_handler: EventStreamHandler[AgentDepsT] | None = None, ) -> AsyncIterator[_messages.AgentStreamEvent | AgentRunResultEvent[Any]]: """Run the agent with a user prompt in async mode and stream events from the run. @@ -853,8 +853,8 @@ async def main(): usage_limits=usage_limits, usage=usage, toolsets=toolsets, - event_stream_handler=event_stream_handler, builtin_tools=builtin_tools, + event_stream_handler=event_stream_handler, ) async def _run_stream_events( @@ -871,8 +871,8 @@ async def _run_stream_events( usage_limits: _usage.UsageLimits | None = None, usage: _usage.RunUsage | None = None, toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None, - event_stream_handler: EventStreamHandler[AgentDepsT] | None = None, builtin_tools: Sequence[AbstractBuiltinTool] | None = None, + event_stream_handler: EventStreamHandler[AgentDepsT] | None = None, ) -> AsyncIterator[_messages.AgentStreamEvent | AgentRunResultEvent[Any]]: send_stream, receive_stream = anyio.create_memory_object_stream[ _messages.AgentStreamEvent | AgentRunResultEvent[Any] diff --git a/pydantic_ai_slim/pydantic_ai/ui/_adapter.py b/pydantic_ai_slim/pydantic_ai/ui/_adapter.py index 8dff1f806a..bdf44a08de 100644 --- a/pydantic_ai_slim/pydantic_ai/ui/_adapter.py +++ b/pydantic_ai_slim/pydantic_ai/ui/_adapter.py @@ -208,8 +208,8 @@ def run_stream_native( usage: RunUsage | None = None, infer_name: bool = True, toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None, - event_stream_handler: EventStreamHandler[AgentDepsT] | None = None, builtin_tools: Sequence[AbstractBuiltinTool] | None = None, + event_stream_handler: EventStreamHandler[AgentDepsT] | None = None, ) -> AsyncIterator[NativeEvent]: """Run the agent with the protocol-specific run input and stream Pydantic AI events. @@ -263,8 +263,8 @@ def run_stream_native( usage=usage, infer_name=infer_name, toolsets=toolsets, - event_stream_handler=event_stream_handler, builtin_tools=builtin_tools, + event_stream_handler=event_stream_handler, ) def run_stream( @@ -281,8 +281,8 @@ def run_stream( usage: RunUsage | None = None, infer_name: bool = True, toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None, - event_stream_handler: EventStreamHandler[AgentDepsT] | None = None, builtin_tools: Sequence[AbstractBuiltinTool] | None = None, + event_stream_handler: EventStreamHandler[AgentDepsT] | None = None, on_complete: OnCompleteFunc[EventT] | None = None, ) -> AsyncIterator[EventT]: """Run the agent with the protocol-specific run input and stream protocol-specific events. @@ -318,8 +318,8 @@ def run_stream( usage=usage, infer_name=infer_name, toolsets=toolsets, - event_stream_handler=event_stream_handler, builtin_tools=builtin_tools, + event_stream_handler=event_stream_handler, ), on_complete=on_complete, ) @@ -341,8 +341,8 @@ async def dispatch_request( usage: RunUsage | None = None, infer_name: bool = True, toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None, - event_stream_handler: EventStreamHandler[AgentDepsT] | None = None, builtin_tools: Sequence[AbstractBuiltinTool] | None = None, + event_stream_handler: EventStreamHandler[AgentDepsT] | None = None, on_complete: OnCompleteFunc[EventT] | None = None, ) -> Response: """Handle a protocol-specific HTTP request by running the agent and returning a streaming response of protocol-specific events. @@ -400,8 +400,8 @@ async def dispatch_request( usage=usage, infer_name=infer_name, toolsets=toolsets, - event_stream_handler=event_stream_handler, builtin_tools=builtin_tools, + event_stream_handler=event_stream_handler, on_complete=on_complete, ), ) From e6dc9d76648b789f9ded6733d5d13e827bd27dad Mon Sep 17 00:00:00 2001 From: phi Date: Sun, 23 Nov 2025 00:51:37 +0900 Subject: [PATCH 10/11] fix: lint errors --- pydantic_ai_slim/pydantic_ai/durable_exec/dbos/_agent.py | 4 ++++ pydantic_ai_slim/pydantic_ai/durable_exec/prefect/_agent.py | 5 +++++ pydantic_ai_slim/pydantic_ai/durable_exec/temporal/_agent.py | 5 +++++ 3 files changed, 14 insertions(+) diff --git a/pydantic_ai_slim/pydantic_ai/durable_exec/dbos/_agent.py b/pydantic_ai_slim/pydantic_ai/durable_exec/dbos/_agent.py index 9e1c8ee3c0..4e641f2c55 100644 --- a/pydantic_ai_slim/pydantic_ai/durable_exec/dbos/_agent.py +++ b/pydantic_ai_slim/pydantic_ai/durable_exec/dbos/_agent.py @@ -603,6 +603,7 @@ def run_stream_events( infer_name: bool = True, toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None, builtin_tools: Sequence[AbstractBuiltinTool] | None = None, + event_stream_handler: EventStreamHandler[AgentDepsT] | None = None, ) -> AsyncIterator[_messages.AgentStreamEvent | AgentRunResultEvent[OutputDataT]]: ... @overload @@ -622,6 +623,7 @@ def run_stream_events( infer_name: bool = True, toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None, builtin_tools: Sequence[AbstractBuiltinTool] | None = None, + event_stream_handler: EventStreamHandler[AgentDepsT] | None = None, ) -> AsyncIterator[_messages.AgentStreamEvent | AgentRunResultEvent[RunOutputDataT]]: ... def run_stream_events( @@ -640,6 +642,7 @@ def run_stream_events( infer_name: bool = True, toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None, builtin_tools: Sequence[AbstractBuiltinTool] | None = None, + event_stream_handler: EventStreamHandler[AgentDepsT] | None = None, ) -> AsyncIterator[_messages.AgentStreamEvent | AgentRunResultEvent[Any]]: """Run the agent with a user prompt in async mode and stream events from the run. @@ -689,6 +692,7 @@ async def main(): usage: Optional usage to start with, useful for resuming a conversation or agents used in tools. infer_name: Whether to try to infer the agent name from the call frame if it's not set. toolsets: Optional additional toolsets for this run. + event_stream_handler: Optional handler for events from the model's streaming response and the agent's execution of tools to use for this run. builtin_tools: Optional additional builtin tools for this run. Returns: diff --git a/pydantic_ai_slim/pydantic_ai/durable_exec/prefect/_agent.py b/pydantic_ai_slim/pydantic_ai/durable_exec/prefect/_agent.py index 8b1b6af44a..5fe4e9aaa6 100644 --- a/pydantic_ai_slim/pydantic_ai/durable_exec/prefect/_agent.py +++ b/pydantic_ai_slim/pydantic_ai/durable_exec/prefect/_agent.py @@ -557,6 +557,7 @@ def run_stream_events( infer_name: bool = True, toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None, builtin_tools: Sequence[AbstractBuiltinTool] | None = None, + event_stream_handler: EventStreamHandler[AgentDepsT] | None = None, ) -> AsyncIterator[_messages.AgentStreamEvent | AgentRunResultEvent[OutputDataT]]: ... @overload @@ -576,6 +577,7 @@ def run_stream_events( infer_name: bool = True, toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None, builtin_tools: Sequence[AbstractBuiltinTool] | None = None, + event_stream_handler: EventStreamHandler[AgentDepsT] | None = None, ) -> AsyncIterator[_messages.AgentStreamEvent | AgentRunResultEvent[RunOutputDataT]]: ... def run_stream_events( @@ -594,6 +596,7 @@ def run_stream_events( infer_name: bool = True, toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None, builtin_tools: Sequence[AbstractBuiltinTool] | None = None, + event_stream_handler: EventStreamHandler[AgentDepsT] | None = None, ) -> AsyncIterator[_messages.AgentStreamEvent | AgentRunResultEvent[Any]]: """Run the agent with a user prompt in async mode and stream events from the run. @@ -643,6 +646,7 @@ async def main(): usage: Optional usage to start with, useful for resuming a conversation or agents used in tools. infer_name: Whether to try to infer the agent name from the call frame if it's not set. toolsets: Optional additional toolsets for this run. + event_stream_handler: Optional handler for events from the model's streaming response and the agent's execution of tools to use for this run. builtin_tools: Optional additional builtin tools for this run. Returns: @@ -669,6 +673,7 @@ async def main(): infer_name=infer_name, toolsets=toolsets, builtin_tools=builtin_tools, + event_stream_handler=event_stream_handler, ) @overload diff --git a/pydantic_ai_slim/pydantic_ai/durable_exec/temporal/_agent.py b/pydantic_ai_slim/pydantic_ai/durable_exec/temporal/_agent.py index 6e964c8d08..369787458e 100644 --- a/pydantic_ai_slim/pydantic_ai/durable_exec/temporal/_agent.py +++ b/pydantic_ai_slim/pydantic_ai/durable_exec/temporal/_agent.py @@ -628,6 +628,7 @@ def run_stream_events( infer_name: bool = True, toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None, builtin_tools: Sequence[AbstractBuiltinTool] | None = None, + event_stream_handler: EventStreamHandler[AgentDepsT] | None = None, ) -> AsyncIterator[_messages.AgentStreamEvent | AgentRunResultEvent[OutputDataT]]: ... @overload @@ -647,6 +648,7 @@ def run_stream_events( infer_name: bool = True, toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None, builtin_tools: Sequence[AbstractBuiltinTool] | None = None, + event_stream_handler: EventStreamHandler[AgentDepsT] | None = None, ) -> AsyncIterator[_messages.AgentStreamEvent | AgentRunResultEvent[RunOutputDataT]]: ... def run_stream_events( @@ -665,6 +667,7 @@ def run_stream_events( infer_name: bool = True, toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None, builtin_tools: Sequence[AbstractBuiltinTool] | None = None, + event_stream_handler: EventStreamHandler[AgentDepsT] | None = None, ) -> AsyncIterator[_messages.AgentStreamEvent | AgentRunResultEvent[Any]]: """Run the agent with a user prompt in async mode and stream events from the run. @@ -714,6 +717,7 @@ async def main(): usage: Optional usage to start with, useful for resuming a conversation or agents used in tools. infer_name: Whether to try to infer the agent name from the call frame if it's not set. toolsets: Optional additional toolsets for this run. + event_stream_handler: Optional handler for events from the model's streaming response and the agent's execution of tools to use for this run. builtin_tools: Optional additional builtin tools for this run. Returns: @@ -740,6 +744,7 @@ async def main(): infer_name=infer_name, toolsets=toolsets, builtin_tools=builtin_tools, + event_stream_handler=event_stream_handler, ) @overload From 04f3cd33edb24b343dbf40ed54a7829f4dda7b31 Mon Sep 17 00:00:00 2001 From: phi Date: Sun, 23 Nov 2025 01:27:17 +0900 Subject: [PATCH 11/11] test: add test case --- tests/test_agent.py | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/tests/test_agent.py b/tests/test_agent.py index 8cc6b8b38c..3f9afcdfc3 100644 --- a/tests/test_agent.py +++ b/tests/test_agent.py @@ -19,6 +19,7 @@ from pydantic_ai import ( AbstractToolset, Agent, + AgentRunResultEvent, AgentStreamEvent, AudioUrl, BinaryContent, @@ -6225,3 +6226,36 @@ def llm(messages: list[ModelMessage], _info: AgentInfo) -> ModelResponse: ] ) assert run.all_messages_json().startswith(b'[{"parts":[{"content":"Hello",') + + +async def test_run_stream_events_with_event_stream_handler(): + async def llm(messages: list[ModelMessage], _info: AgentInfo) -> AsyncIterator[str]: + yield 'ok here is ' + yield 'text' + + messages: list[list[ModelMessage]] = [] + stream_events: list[Any] = [] + + async def event_stream_handler(context: RunContext[Any], events: AsyncIterable[Any]) -> None: + messages.append(context.messages) + async for event in events: + stream_events.append(event) + + agent = Agent(FunctionModel(stream_function=llm)) + agent_events = [ + event + async for event in agent.run_stream_events( + message_history=[ + ModelRequest(parts=[UserPromptPart(content='Hello')]), + ], + event_stream_handler=event_stream_handler, + ) + ] + + assert len(stream_events) == len(agent_events) - 1 + assert stream_events == agent_events[: len(stream_events)] + result_event = next((event for event in agent_events if isinstance(event, AgentRunResultEvent)), None) + assert result_event is not None + all_messages = result_event.result.all_messages() + assert messages + assert all_messages == messages[-1]