Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
ccaf87f
Add run_stream_sync
ajac-zero Oct 12, 2025
64586b4
Merge branch 'pydantic:main' into sync_stream
ajac-zero Oct 13, 2025
8623cb9
add lazy implementation
ajac-zero Oct 13, 2025
bb5c7fe
Merge branch 'main' into sync_stream
ajac-zero Oct 13, 2025
6497f63
Merge branch 'pydantic:main' into sync_stream
ajac-zero Oct 16, 2025
6e74a2a
add _sync methods to StreamedRunResult
ajac-zero Oct 16, 2025
db860ed
fix doctest
ajac-zero Oct 17, 2025
1f1952c
Merge branch 'main' into sync_stream
ajac-zero Oct 24, 2025
e523987
Merge branch 'main' into sync_stream
ajac-zero Oct 25, 2025
5ec9b49
add disclaimers
ajac-zero Oct 25, 2025
e1765d3
add fixture override
ajac-zero Oct 27, 2025
4f0d2ad
update docs
ajac-zero Oct 27, 2025
842d0e9
Update tests/test_streaming.py
ajac-zero Oct 31, 2025
12e96a6
Update pydantic_ai_slim/pydantic_ai/result.py
ajac-zero Oct 31, 2025
00246b5
Merge branch 'main' into sync_stream
ajac-zero Oct 31, 2025
9896901
turn run_stream_sync into normal method
ajac-zero Oct 31, 2025
1d0b46f
add infer_name
ajac-zero Oct 31, 2025
344c510
update docstrings
ajac-zero Oct 31, 2025
584ae6a
Update pydantic_ai_slim/pydantic_ai/_utils.py
DouweM Oct 31, 2025
cf2b9b6
Tweaks, add validate_response_output_sync
DouweM Oct 31, 2025
b35a92a
Merge branch 'main' into pr/ajac-zero/3146
DouweM Oct 31, 2025
db271eb
StreamedRunResultSync
DouweM Oct 31, 2025
888ee18
test wrapper methods
ajac-zero Nov 1, 2025
83b46fd
fix strict-no-cover
ajac-zero Nov 1, 2025
4f533b9
Merge branch 'main' into sync_stream
DouweM Nov 3, 2025
cec5df6
Update docs/agents.md
DouweM Nov 3, 2025
1ebd165
Merge branch 'main' into sync_stream
DouweM Nov 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/agents.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ There are five ways to run an agent:

1. [`agent.run()`][pydantic_ai.agent.AbstractAgent.run] — an async function which returns a [`RunResult`][pydantic_ai.agent.AgentRunResult] containing a completed response.
2. [`agent.run_sync()`][pydantic_ai.agent.AbstractAgent.run_sync] — a plain, synchronous function which returns a [`RunResult`][pydantic_ai.agent.AgentRunResult] containing a completed response (internally, this just calls `loop.run_until_complete(self.run())`).
3. [`agent.run_stream()`][pydantic_ai.agent.AbstractAgent.run_stream] — an async context manager which returns a [`StreamedRunResult`][pydantic_ai.result.StreamedRunResult], which contains methods to stream text and structured output as an async iterable.
3. [`agent.run_stream()`][pydantic_ai.agent.AbstractAgent.run_stream] — an async context manager which returns a [`StreamedRunResult`][pydantic_ai.result.StreamedRunResult], which contains methods to stream text and structured output as an async iterable. [`agent.run_stream_sync()`][pydantic_ai.agent.AbstractAgent.run_stream_sync] is a synchronous context manager variation with the same return type.
4. [`agent.run_stream_events()`][pydantic_ai.agent.AbstractAgent.run_stream_events] — a function which returns an async iterable of [`AgentStreamEvent`s][pydantic_ai.messages.AgentStreamEvent] and an [`AgentRunResultEvent`][pydantic_ai.run.AgentRunResultEvent] containing the final run result.
5. [`agent.iter()`][pydantic_ai.Agent.iter] — a context manager which returns an [`AgentRun`][pydantic_ai.agent.AgentRun], an async iterable over the nodes of the agent's underlying [`Graph`][pydantic_graph.graph.Graph].

Expand Down
111 changes: 110 additions & 1 deletion pydantic_ai_slim/pydantic_ai/agent/abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import inspect
from abc import ABC, abstractmethod
from collections.abc import AsyncIterable, AsyncIterator, Awaitable, Callable, Iterator, Mapping, Sequence
from contextlib import AbstractAsyncContextManager, asynccontextmanager, contextmanager
from contextlib import AbstractAsyncContextManager, AbstractContextManager, asynccontextmanager, contextmanager
from types import FrameType
from typing import TYPE_CHECKING, Any, Generic, TypeAlias, cast, overload

Expand Down Expand Up @@ -581,6 +581,115 @@ async def on_complete() -> None:
if not yielded:
raise exceptions.AgentRunError('Agent run finished without producing a final result') # pragma: no cover

# Overload: no custom `output_type` — the run's output is typed with the agent's own `OutputDataT`.
@overload
def run_stream_sync(
    self,
    user_prompt: str | Sequence[_messages.UserContent] | None = None,
    *,
    output_type: None = None,
    message_history: Sequence[_messages.ModelMessage] | None = None,
    deferred_tool_results: DeferredToolResults | None = None,
    model: models.Model | models.KnownModelName | str | None = None,
    deps: AgentDepsT = None,
    model_settings: ModelSettings | None = None,
    usage_limits: _usage.UsageLimits | None = None,
    usage: _usage.RunUsage | None = None,
    infer_name: bool = True,
    toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None,
    builtin_tools: Sequence[AbstractBuiltinTool] | None = None,
    event_stream_handler: EventStreamHandler[AgentDepsT] | None = None,
) -> AbstractContextManager[result.StreamedRunResult[AgentDepsT, OutputDataT]]: ...

# Overload: an explicit `output_type` is given — the run's output is typed with `RunOutputDataT`.
@overload
def run_stream_sync(
    self,
    user_prompt: str | Sequence[_messages.UserContent] | None = None,
    *,
    output_type: OutputSpec[RunOutputDataT],
    message_history: Sequence[_messages.ModelMessage] | None = None,
    deferred_tool_results: DeferredToolResults | None = None,
    model: models.Model | models.KnownModelName | str | None = None,
    deps: AgentDepsT = None,
    model_settings: ModelSettings | None = None,
    usage_limits: _usage.UsageLimits | None = None,
    usage: _usage.RunUsage | None = None,
    infer_name: bool = True,
    toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None,
    builtin_tools: Sequence[AbstractBuiltinTool] | None = None,
    event_stream_handler: EventStreamHandler[AgentDepsT] | None = None,
) -> AbstractContextManager[result.StreamedRunResult[AgentDepsT, RunOutputDataT]]: ...

@contextmanager
def run_stream_sync(
    self,
    user_prompt: str | Sequence[_messages.UserContent] | None = None,
    *,
    output_type: OutputSpec[RunOutputDataT] | None = None,
    message_history: Sequence[_messages.ModelMessage] | None = None,
    deferred_tool_results: DeferredToolResults | None = None,
    model: models.Model | models.KnownModelName | str | None = None,
    deps: AgentDepsT = None,
    model_settings: ModelSettings | None = None,
    usage_limits: _usage.UsageLimits | None = None,
    usage: _usage.RunUsage | None = None,
    infer_name: bool = True,
    toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None,
    builtin_tools: Sequence[AbstractBuiltinTool] | None = None,
    event_stream_handler: EventStreamHandler[AgentDepsT] | None = None,
) -> Iterator[result.StreamedRunResult[AgentDepsT, Any]]:
    """Run the agent with a user prompt in sync streaming mode.

    This is a convenience method that wraps [`self.run_stream`][pydantic_ai.agent.AbstractAgent.run_stream] with `loop.run_until_complete(...)`.
    You therefore can't use this method inside async code or if there's an active event loop.

    This method builds an internal agent graph (using system prompts, tools and output schemas) and then
    runs the graph until the model produces output matching the `output_type`, for example text or structured data.
    At this point, a streaming run result object is yielded from which you can stream the output as it comes in,
    and -- once this output has completed streaming -- get the complete output, message history, and usage.

    As this method will consider the first output matching the `output_type` to be the final output,
    it will stop running the agent graph and will not execute any tool calls made by the model after this "final" output.
    If you want to always run the agent graph to completion and stream events and output at the same time,
    use [`agent.run()`][pydantic_ai.agent.AbstractAgent.run] with an `event_stream_handler` or [`agent.iter()`][pydantic_ai.agent.AbstractAgent.iter] instead.

    Args:
        user_prompt: User input to start/continue the conversation.
        output_type: Custom output type to use for this run, `output_type` may only be used if the agent has no
            output validators since output validators would expect an argument that matches the agent's output type.
        message_history: History of the conversation so far.
        deferred_tool_results: Optional results for deferred tool calls in the message history.
        model: Optional model to use for this run, required if `model` was not set when creating the agent.
        deps: Optional dependencies to use for this run.
        model_settings: Optional settings to use for this model's request.
        usage_limits: Optional limits on model request count or token usage.
        usage: Optional usage to start with, useful for resuming a conversation or agents used in tools.
        infer_name: Whether to try to infer the agent name from the call frame if it's not set.
        toolsets: Optional additional toolsets for this run.
        builtin_tools: Optional additional builtin tools for this run.
        event_stream_handler: Optional handler for events from the model's streaming response and the agent's execution of tools to use for this run.
            It will receive all the events up until the final result is found, which you can then read or stream from inside the context manager.
            Note that it does _not_ receive any events after the final result is found.

    Returns:
        The result of the run.
    """
    async_cm = self.run_stream(
        user_prompt,
        output_type=output_type,
        message_history=message_history,
        deferred_tool_results=deferred_tool_results,
        model=model,
        deps=deps,
        model_settings=model_settings,
        usage_limits=usage_limits,
        usage=usage,
        infer_name=infer_name,
        toolsets=toolsets,
        builtin_tools=builtin_tools,
        event_stream_handler=event_stream_handler,
    )
    loop = get_event_loop()
    # Drive the async context manager's full protocol on the loop: previously only
    # `__aenter__` was awaited and `__aexit__` was never called, so the streamed run
    # was never finalized (cleanup in `run_stream` never ran) and exceptions raised
    # inside the `with` body never reached the async context manager.
    streamed_result = loop.run_until_complete(async_cm.__aenter__())
    try:
        yield streamed_result
    except BaseException as exc:
        # Give the async CM a chance to handle/suppress the exception, mirroring
        # what `async with` would do; re-raise unless it returns a truthy value.
        if not loop.run_until_complete(async_cm.__aexit__(type(exc), exc, exc.__traceback__)):
            raise
    else:
        loop.run_until_complete(async_cm.__aexit__(None, None, None))

@overload
def run_stream_events(
self,
Expand Down
82 changes: 81 additions & 1 deletion pydantic_ai_slim/pydantic_ai/result.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from __future__ import annotations as _annotations

from collections.abc import AsyncIterator, Awaitable, Callable, Iterable
from collections.abc import AsyncIterator, Awaitable, Callable, Iterable, Iterator
from copy import deepcopy
from dataclasses import dataclass, field
from datetime import datetime
Expand All @@ -9,6 +9,8 @@
from pydantic import ValidationError
from typing_extensions import TypeVar, deprecated

from pydantic_graph._utils import get_event_loop

from . import _utils, exceptions, messages as _messages, models
from ._output import (
OutputDataT_inv,
Expand Down Expand Up @@ -408,6 +410,27 @@ async def stream_output(self, *, debounce_by: float | None = 0.1) -> AsyncIterat
else:
raise ValueError('No stream response or run result provided') # pragma: no cover

def stream_output_sync(self, *, debounce_by: float | None = 0.1) -> Iterator[OutputDataT]:
    """Stream the output as a sync iterable.

    This is a convenience method that wraps [`self.stream_output`][pydantic_ai.result.StreamedRunResult.stream_output] with `loop.run_until_complete(...)`.
    You therefore can't use this method inside async code or if there's an active event loop.

    The pydantic validator for structured data will be called in
    [partial mode](https://docs.pydantic.dev/dev/concepts/experimental/#partial-validation)
    on each iteration.

    Args:
        debounce_by: by how much (if at all) to debounce/group the output chunks by. `None` means no debouncing.
            Debouncing is particularly important for long structured outputs to reduce the overhead of
            performing validation as each token is received.

    Returns:
        An iterable of the response data.
    """
    # Drive the async stream to completion on the current event loop, one item at a time.
    stream = self.stream_output(debounce_by=debounce_by)
    for output in _blocking_async_iterator(stream):
        yield output

async def stream_text(self, *, delta: bool = False, debounce_by: float | None = 0.1) -> AsyncIterator[str]:
"""Stream the text result as an async iterable.

Expand Down Expand Up @@ -436,6 +459,25 @@ async def stream_text(self, *, delta: bool = False, debounce_by: float | None =
else:
raise ValueError('No stream response or run result provided') # pragma: no cover

def stream_text_sync(self, *, delta: bool = False, debounce_by: float | None = 0.1) -> Iterator[str]:
    """Stream the text result as a sync iterable.

    This is a convenience method that wraps [`self.stream_text`][pydantic_ai.result.StreamedRunResult.stream_text] with `loop.run_until_complete(...)`.
    You therefore can't use this method inside async code or if there's an active event loop.

    !!! note
        Result validators will NOT be called on the text result if `delta=True`.

    Args:
        delta: if `True`, yield each chunk of text as it is received, if `False` (default), yield the full text
            up to the current point.
        debounce_by: by how much (if at all) to debounce/group the response chunks by. `None` means no debouncing.
            Debouncing is particularly important for long structured responses to reduce the overhead of
            performing validation as each token is received.
    """
    # Drive the async text stream to completion on the current event loop.
    stream = self.stream_text(delta=delta, debounce_by=debounce_by)
    for text in _blocking_async_iterator(stream):
        yield text

@deprecated('`StreamedRunResult.stream_structured` is deprecated, use `stream_responses` instead.')
async def stream_structured(
self, *, debounce_by: float | None = 0.1
Expand Down Expand Up @@ -471,6 +513,25 @@ async def stream_responses(
else:
raise ValueError('No stream response or run result provided') # pragma: no cover

def stream_responses_sync(
    self, *, debounce_by: float | None = 0.1
) -> Iterator[tuple[_messages.ModelResponse, bool]]:
    """Stream the response as a sync iterable of structured LLM messages.

    This is a convenience method that wraps [`self.stream_responses`][pydantic_ai.result.StreamedRunResult.stream_responses] with `loop.run_until_complete(...)`.
    You therefore can't use this method inside async code or if there's an active event loop.

    Args:
        debounce_by: by how much (if at all) to debounce/group the response chunks by. `None` means no debouncing.
            Debouncing is particularly important for long structured responses to reduce the overhead of
            performing validation as each token is received.

    Returns:
        An iterable of the structured response message and whether that is the last message.
    """
    # Drive the async response stream to completion on the current event loop.
    stream = self.stream_responses(debounce_by=debounce_by)
    for response_and_is_last in _blocking_async_iterator(stream):
        yield response_and_is_last

async def get_output(self) -> OutputDataT:
"""Stream the whole response, validate and return it."""
if self._run_result is not None:
Expand All @@ -484,6 +545,14 @@ async def get_output(self) -> OutputDataT:
else:
raise ValueError('No stream response or run result provided') # pragma: no cover

def get_output_sync(self) -> OutputDataT:
    """Stream the whole response, validate and return it.

    This is a convenience method that wraps [`self.get_output`][pydantic_ai.result.StreamedRunResult.get_output] with `loop.run_until_complete(...)`.
    You therefore can't use this method inside async code or if there's an active event loop.
    """
    loop = get_event_loop()
    return loop.run_until_complete(self.get_output())

@property
def response(self) -> _messages.ModelResponse:
"""Return the current state of the response."""
Expand Down Expand Up @@ -559,6 +628,17 @@ class FinalResult(Generic[OutputDataT]):
__repr__ = _utils.dataclasses_no_defaults_repr


def _blocking_async_iterator(async_iter: AsyncIterator[T]) -> Iterator[T]:
    """Consume an async iterator synchronously by running each step on the event loop.

    Also ensures the wrapped async iterator is closed when the consumer stops
    iterating early (e.g. `break` out of a `for` loop, or garbage collection of
    this generator): previously `aclose()` was never called, so `finally` blocks
    and resource cleanup inside async generators never ran on early exit.
    """
    loop = get_event_loop()
    try:
        while True:
            try:
                item = loop.run_until_complete(async_iter.__anext__())
            except StopAsyncIteration:
                break
            # Yield outside the `try` so a stray StopAsyncIteration raised by
            # consumer code is not mistaken for exhaustion of the async iterator.
            yield item
    finally:
        # Async generators define `aclose()`; plain async iterators may not.
        aclose = getattr(async_iter, 'aclose', None)
        if aclose is not None:
            # Safe to run here: this generator is only used from sync code,
            # so no event loop is running at this point.
            loop.run_until_complete(aclose())


def _get_usage_checking_stream_response(
stream_response: models.StreamedResponse,
limits: UsageLimits | None,
Expand Down
Loading