Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
ccaf87f
Add run_stream_sync
ajac-zero Oct 12, 2025
64586b4
Merge branch 'pydantic:main' into sync_stream
ajac-zero Oct 13, 2025
8623cb9
add lazy implementation
ajac-zero Oct 13, 2025
bb5c7fe
Merge branch 'main' into sync_stream
ajac-zero Oct 13, 2025
6497f63
Merge branch 'pydantic:main' into sync_stream
ajac-zero Oct 16, 2025
6e74a2a
add _sync methods to StreamedRunResult
ajac-zero Oct 16, 2025
db860ed
fix doctest
ajac-zero Oct 17, 2025
1f1952c
Merge branch 'main' into sync_stream
ajac-zero Oct 24, 2025
e523987
Merge branch 'main' into sync_stream
ajac-zero Oct 25, 2025
5ec9b49
add disclaimers
ajac-zero Oct 25, 2025
e1765d3
add fixture override
ajac-zero Oct 27, 2025
4f0d2ad
update docs
ajac-zero Oct 27, 2025
842d0e9
Update tests/test_streaming.py
ajac-zero Oct 31, 2025
12e96a6
Update pydantic_ai_slim/pydantic_ai/result.py
ajac-zero Oct 31, 2025
00246b5
Merge branch 'main' into sync_stream
ajac-zero Oct 31, 2025
9896901
turn run_stream_sync into normal method
ajac-zero Oct 31, 2025
1d0b46f
add infer_name
ajac-zero Oct 31, 2025
344c510
update docstrings
ajac-zero Oct 31, 2025
584ae6a
Update pydantic_ai_slim/pydantic_ai/_utils.py
DouweM Oct 31, 2025
cf2b9b6
Tweaks, add validate_response_output_sync
DouweM Oct 31, 2025
b35a92a
Merge branch 'main' into pr/ajac-zero/3146
DouweM Oct 31, 2025
db271eb
StreamedRunResultSync
DouweM Oct 31, 2025
888ee18
test wrapper methods
ajac-zero Nov 1, 2025
83b46fd
fix strict-no-cover
ajac-zero Nov 1, 2025
4f533b9
Merge branch 'main' into sync_stream
DouweM Nov 3, 2025
cec5df6
Update docs/agents.md
DouweM Nov 3, 2025
1ebd165
Merge branch 'main' into sync_stream
DouweM Nov 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 81 additions & 0 deletions pydantic_ai_slim/pydantic_ai/agent/abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,87 @@ async def on_complete() -> None:
if not yielded:
raise exceptions.AgentRunError('Agent run finished without producing a final result') # pragma: no cover

@contextmanager
def run_stream_sync(
    self,
    user_prompt: str | Sequence[_messages.UserContent] | None = None,
    *,
    output_type: OutputSpec[RunOutputDataT] | None = None,
    message_history: Sequence[_messages.ModelMessage] | None = None,
    deferred_tool_results: DeferredToolResults | None = None,
    model: models.Model | models.KnownModelName | str | None = None,
    deps: AgentDepsT = None,
    model_settings: ModelSettings | None = None,
    usage_limits: _usage.UsageLimits | None = None,
    usage: _usage.RunUsage | None = None,
    infer_name: bool = True,
    toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None,
    builtin_tools: Sequence[AbstractBuiltinTool] | None = None,
    event_stream_handler: EventStreamHandler[AgentDepsT] | None = None,
) -> Iterator[result.SyncStreamedRunResult[AgentDepsT, Any]]:
    """Run the agent with a user prompt in sync streaming mode.

    This method builds an internal agent graph (using system prompts, tools and output schemas) and then
    runs the graph until the model produces output matching the `output_type`, for example text or structured data.
    At this point, a streaming run result object is yielded from which you can stream the output as it comes in,
    and -- once this output has completed streaming -- get the complete output, message history, and usage.

    As this method will consider the first output matching the `output_type` to be the final output,
    it will stop running the agent graph and will not execute any tool calls made by the model after this "final" output.
    If you want to always run the agent graph to completion and stream events and output at the same time,
    use [`agent.run()`][pydantic_ai.agent.AbstractAgent.run] with an `event_stream_handler` or [`agent.iter()`][pydantic_ai.agent.AbstractAgent.iter] instead.

    Example:
    ```python
    from pydantic_ai import Agent

    agent = Agent('openai:gpt-4o')

    def main():
        with agent.run_stream_sync('What is the capital of the UK?') as response:
            print(response.get_output())
            #> The capital of the UK is London.
    ```

    Args:
        user_prompt: User input to start/continue the conversation.
        output_type: Custom output type to use for this run, `output_type` may only be used if the agent has no
            output validators since output validators would expect an argument that matches the agent's output type.
        message_history: History of the conversation so far.
        deferred_tool_results: Optional results for deferred tool calls in the message history.
        model: Optional model to use for this run, required if `model` was not set when creating the agent.
        deps: Optional dependencies to use for this run.
        model_settings: Optional settings to use for this model's request.
        usage_limits: Optional limits on model request count or token usage.
        usage: Optional usage to start with, useful for resuming a conversation or agents used in tools.
        infer_name: Whether to try to infer the agent name from the call frame if it's not set.
        toolsets: Optional additional toolsets for this run.
        builtin_tools: Optional additional builtin tools for this run.
        event_stream_handler: Optional handler for events from the model's streaming response and the agent's execution of tools to use for this run.
            It will receive all the events up until the final result is found, which you can then read or stream from inside the context manager.
            Note that it does _not_ receive any events after the final result is found.

    Returns:
        A context manager yielding the synchronous streamed result of the run.
    """
    # Delegate to the async implementation and drive it with a blocking event loop.
    async_cm = self.run_stream(
        user_prompt,
        output_type=output_type,
        message_history=message_history,
        deferred_tool_results=deferred_tool_results,
        model=model,
        deps=deps,
        model_settings=model_settings,
        usage_limits=usage_limits,
        usage=usage,
        infer_name=infer_name,
        toolsets=toolsets,
        builtin_tools=builtin_tools,
        event_stream_handler=event_stream_handler,
    )
    loop = get_event_loop()
    async_result = loop.run_until_complete(async_cm.__aenter__())
    # Mirror `async with` semantics: `__aexit__` must always run so the underlying
    # run is finalized (message history marked complete, on-complete callbacks fired),
    # and exceptions from the `with` body must be delivered to the async context manager.
    try:
        yield result.SyncStreamedRunResult.from_streamed_result(async_result)  # type: ignore[reportReturnType]
    except BaseException as exc:
        if not loop.run_until_complete(async_cm.__aexit__(type(exc), exc, exc.__traceback__)):
            raise
    else:
        loop.run_until_complete(async_cm.__aexit__(None, None, None))

@overload
def run_stream_events(
self,
Expand Down
88 changes: 87 additions & 1 deletion pydantic_ai_slim/pydantic_ai/result.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from __future__ import annotations as _annotations

from collections.abc import AsyncIterator, Awaitable, Callable, Iterable
from collections.abc import AsyncIterator, Awaitable, Callable, Iterable, Iterator
from copy import deepcopy
from dataclasses import dataclass, field
from datetime import datetime
Expand All @@ -9,6 +9,8 @@
from pydantic import ValidationError
from typing_extensions import TypeVar, deprecated

from pydantic_graph._utils import get_event_loop

from . import _utils, exceptions, messages as _messages, models
from ._output import (
OutputDataT_inv,
Expand Down Expand Up @@ -543,6 +545,90 @@ async def _marked_completed(self, message: _messages.ModelResponse | None = None
await self._on_complete()


@dataclass(init=False)
class SyncStreamedRunResult(StreamedRunResult[AgentDepsT, OutputDataT]):
    """Provides a synchronous API over 'StreamedRunResult'.

    Each `stream_*` method wraps the corresponding async method of the parent class and
    drives it to completion on a blocking event loop, so it can be used from ordinary
    (non-async) code.
    """

    @classmethod
    def from_streamed_result(
        cls, streamed_run_result: StreamedRunResult[AgentDepsT, OutputDataT]
    ) -> SyncStreamedRunResult[AgentDepsT, OutputDataT]:
        """Create a 'SyncStreamedRunResult' from an existing 'StreamedRunResult'.

        Uses `__new__` to bypass `__init__` and shares the underlying state (messages,
        stream response, callbacks) with the wrapped async result rather than copying it.
        """
        instance = cls.__new__(cls)

        instance._all_messages = streamed_run_result._all_messages
        instance._new_message_index = streamed_run_result._new_message_index
        instance._stream_response = streamed_run_result._stream_response
        instance._on_complete = streamed_run_result._on_complete
        instance._run_result = streamed_run_result._run_result
        instance.is_complete = streamed_run_result.is_complete

        return instance

    def _lazy_async_iterator(self, async_iter: AsyncIterator[T]) -> Iterator[T]:
        """Lazily yield items from async iterator as they're requested.

        Guarantees the async iterator is closed even if the consumer stops early
        (e.g. `break`s out of the loop), so any pending async-generator cleanup runs.
        """
        loop = get_event_loop()

        try:
            while True:
                try:
                    item = loop.run_until_complete(async_iter.__anext__())
                except StopAsyncIteration:
                    break
                # Yield outside the `try` so a StopAsyncIteration thrown into this
                # generator at the yield point isn't mistaken for normal exhaustion.
                yield item
        finally:
            aclose = getattr(async_iter, 'aclose', None)
            if aclose is not None:
                loop.run_until_complete(aclose())

    def stream_output(self, *, debounce_by: float | None = 0.1) -> Iterator[OutputDataT]:  # type: ignore[reportIncompatibleMethodOverride]
        """Stream the output as an iterable.

        The pydantic validator for structured data will be called in
        [partial mode](https://docs.pydantic.dev/dev/concepts/experimental/#partial-validation)
        on each iteration.

        Args:
            debounce_by: by how much (if at all) to debounce/group the output chunks by. `None` means no debouncing.
                Debouncing is particularly important for long structured outputs to reduce the overhead of
                performing validation as each token is received.

        Returns:
            An iterable of the response data.
        """
        async_stream = super().stream_output(debounce_by=debounce_by)
        yield from self._lazy_async_iterator(async_stream)

    def stream_text(self, *, delta: bool = False, debounce_by: float | None = 0.1) -> Iterator[str]:  # type: ignore[reportIncompatibleMethodOverride]
        """Stream the text result as an iterable.

        !!! note
            Result validators will NOT be called on the text result if `delta=True`.

        Args:
            delta: if `True`, yield each chunk of text as it is received, if `False` (default), yield the full text
                up to the current point.
            debounce_by: by how much (if at all) to debounce/group the response chunks by. `None` means no debouncing.
                Debouncing is particularly important for long structured responses to reduce the overhead of
                performing validation as each token is received.
        """
        async_stream = super().stream_text(delta=delta, debounce_by=debounce_by)
        yield from self._lazy_async_iterator(async_stream)

    def stream_responses(self, *, debounce_by: float | None = 0.1) -> Iterator[tuple[_messages.ModelResponse, bool]]:  # type: ignore[reportIncompatibleMethodOverride]
        """Stream the response as an iterable of Structured LLM Messages.

        Args:
            debounce_by: by how much (if at all) to debounce/group the response chunks by. `None` means no debouncing.
                Debouncing is particularly important for long structured responses to reduce the overhead of
                performing validation as each token is received.

        Returns:
            An iterable of the structured response message and whether that is the last message.
        """
        async_stream = super().stream_responses(debounce_by=debounce_by)
        yield from self._lazy_async_iterator(async_stream)

    def get_output(self) -> OutputDataT:  # type: ignore[reportIncompatibleMethodOverride]
        """Stream the whole response, validate and return it."""
        return get_event_loop().run_until_complete(super().get_output())


@dataclass(repr=False)
class FinalResult(Generic[OutputDataT]):
"""Marker class storing the final output of an agent run and associated metadata."""
Expand Down
Loading