Skip to content

Commit db271eb

Browse files
committed
StreamedRunResultSync
1 parent b35a92a commit db271eb

File tree

3 files changed

+243
-92
lines changed

3 files changed

+243
-92
lines changed

pydantic_ai_slim/pydantic_ai/agent/abstract.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -597,7 +597,7 @@ def run_stream_sync(
597597
toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None,
598598
builtin_tools: Sequence[AbstractBuiltinTool] | None = None,
599599
event_stream_handler: EventStreamHandler[AgentDepsT] | None = None,
600-
) -> result.StreamedRunResult[AgentDepsT, OutputDataT]: ...
600+
) -> result.StreamedRunResultSync[AgentDepsT, OutputDataT]: ...
601601

602602
@overload
603603
def run_stream_sync(
@@ -616,7 +616,7 @@ def run_stream_sync(
616616
toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None,
617617
builtin_tools: Sequence[AbstractBuiltinTool] | None = None,
618618
event_stream_handler: EventStreamHandler[AgentDepsT] | None = None,
619-
) -> result.StreamedRunResult[AgentDepsT, RunOutputDataT]: ...
619+
) -> result.StreamedRunResultSync[AgentDepsT, RunOutputDataT]: ...
620620

621621
def run_stream_sync(
622622
self,
@@ -634,7 +634,7 @@ def run_stream_sync(
634634
toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None,
635635
builtin_tools: Sequence[AbstractBuiltinTool] | None = None,
636636
event_stream_handler: EventStreamHandler[AgentDepsT] | None = None,
637-
) -> result.StreamedRunResult[AgentDepsT, Any]:
637+
) -> result.StreamedRunResultSync[AgentDepsT, Any]:
638638
"""Run the agent with a user prompt in sync streaming mode.
639639
640640
This is a convenience method that wraps [`run_stream()`][pydantic_ai.agent.AbstractAgent.run_stream] with `loop.run_until_complete(...)`.
@@ -658,7 +658,7 @@ def run_stream_sync(
658658
659659
def main():
660660
response = agent.run_stream_sync('What is the capital of the UK?')
661-
print(response.get_output_sync())
661+
print(response.get_output())
662662
#> The capital of the UK is London.
663663
```
664664
@@ -704,7 +704,8 @@ async def _consume_stream():
704704
) as stream_result:
705705
yield stream_result
706706

707-
return _utils.get_event_loop().run_until_complete(anext(_consume_stream()))
707+
async_result = _utils.get_event_loop().run_until_complete(anext(_consume_stream()))
708+
return result.StreamedRunResultSync(async_result)
708709

709710
@overload
710711
def run_stream_events(

pydantic_ai_slim/pydantic_ai/result.py

Lines changed: 153 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
'OutputDataT_inv',
3636
'ToolOutput',
3737
'OutputValidatorFunc',
38+
'StreamedRunResultSync',
3839
)
3940

4041

@@ -420,26 +421,6 @@ async def stream_output(self, *, debounce_by: float | None = 0.1) -> AsyncIterat
420421
else:
421422
raise ValueError('No stream response or run result provided') # pragma: no cover
422423

423-
def stream_output_sync(self, *, debounce_by: float | None = 0.1) -> Iterator[OutputDataT]:
424-
"""Stream the output as an iterable.
425-
426-
This is a convenience method that wraps [`stream_output()`][pydantic_ai.result.StreamedRunResult.stream_output] with `loop.run_until_complete(...)`.
427-
You therefore can't use this method inside async code or if there's an active event loop.
428-
429-
The pydantic validator for structured data will be called in
430-
[partial mode](https://docs.pydantic.dev/dev/concepts/experimental/#partial-validation)
431-
on each iteration.
432-
433-
Args:
434-
debounce_by: by how much (if at all) to debounce/group the output chunks by. `None` means no debouncing.
435-
Debouncing is particularly important for long structured outputs to reduce the overhead of
436-
performing validation as each token is received.
437-
438-
Returns:
439-
An iterable of the response data.
440-
"""
441-
return _utils.sync_async_iterator(self.stream_output(debounce_by=debounce_by))
442-
443424
async def stream_text(self, *, delta: bool = False, debounce_by: float | None = 0.1) -> AsyncIterator[str]:
444425
"""Stream the text result as an async iterable.
445426
@@ -468,24 +449,6 @@ async def stream_text(self, *, delta: bool = False, debounce_by: float | None =
468449
else:
469450
raise ValueError('No stream response or run result provided') # pragma: no cover
470451

471-
def stream_text_sync(self, *, delta: bool = False, debounce_by: float | None = 0.1) -> Iterator[str]:
472-
"""Stream the text result as a sync iterable.
473-
474-
This is a convenience method that wraps [`stream_text()`][pydantic_ai.result.StreamedRunResult.stream_text] with `loop.run_until_complete(...)`.
475-
You therefore can't use this method inside async code or if there's an active event loop.
476-
477-
!!! note
478-
Result validators will NOT be called on the text result if `delta=True`.
479-
480-
Args:
481-
delta: if `True`, yield each chunk of text as it is received, if `False` (default), yield the full text
482-
up to the current point.
483-
debounce_by: by how much (if at all) to debounce/group the response chunks by. `None` means no debouncing.
484-
Debouncing is particularly important for long structured responses to reduce the overhead of
485-
performing validation as each token is received.
486-
"""
487-
return _utils.sync_async_iterator(self.stream_text(delta=delta, debounce_by=debounce_by))
488-
489452
@deprecated('`StreamedRunResult.stream_structured` is deprecated, use `stream_responses` instead.')
490453
async def stream_structured(
491454
self, *, debounce_by: float | None = 0.1
@@ -521,24 +484,6 @@ async def stream_responses(
521484
else:
522485
raise ValueError('No stream response or run result provided') # pragma: no cover
523486

524-
def stream_responses_sync(
525-
self, *, debounce_by: float | None = 0.1
526-
) -> Iterator[tuple[_messages.ModelResponse, bool]]:
527-
"""Stream the response as an iterable of Structured LLM Messages.
528-
529-
This is a convenience method that wraps [`stream_responses()`][pydantic_ai.result.StreamedRunResult.stream_responses] with `loop.run_until_complete(...)`.
530-
You therefore can't use this method inside async code or if there's an active event loop.
531-
532-
Args:
533-
debounce_by: by how much (if at all) to debounce/group the response chunks by. `None` means no debouncing.
534-
Debouncing is particularly important for long structured responses to reduce the overhead of
535-
performing validation as each token is received.
536-
537-
Returns:
538-
An iterable of the structured response message and whether that is the last message.
539-
"""
540-
return _utils.sync_async_iterator(self.stream_responses(debounce_by=debounce_by))
541-
542487
async def get_output(self) -> OutputDataT:
543488
"""Stream the whole response, validate and return it."""
544489
if self._run_result is not None:
@@ -552,14 +497,6 @@ async def get_output(self) -> OutputDataT:
552497
else:
553498
raise ValueError('No stream response or run result provided') # pragma: no cover
554499

555-
def get_output_sync(self) -> OutputDataT:
556-
"""Stream the whole response, validate and return it.
557-
558-
This is a convenience method that wraps [`get_output()`][pydantic_ai.result.StreamedRunResult.get_output] with `loop.run_until_complete(...)`.
559-
You therefore can't use this method inside async code or if there's an active event loop.
560-
"""
561-
return _utils.get_event_loop().run_until_complete(self.get_output())
562-
563500
@property
564501
def response(self) -> _messages.ModelResponse:
565502
"""Return the current state of the response."""
@@ -611,18 +548,6 @@ async def validate_response_output(
611548
else:
612549
raise ValueError('No stream response or run result provided') # pragma: no cover
613550

614-
def validate_response_output_sync(
615-
self, message: _messages.ModelResponse, *, allow_partial: bool = False
616-
) -> OutputDataT:
617-
"""Validate a structured result message.
618-
619-
This is a convenience method that wraps [`validate_response_output()`][pydantic_ai.result.StreamedRunResult.validate_response_output] with `loop.run_until_complete(...)`.
620-
You therefore can't use this method inside async code or if there's an active event loop.
621-
"""
622-
return _utils.get_event_loop().run_until_complete(
623-
self.validate_response_output(message, allow_partial=allow_partial)
624-
)
625-
626551
async def _marked_completed(self, message: _messages.ModelResponse | None = None) -> None:
627552
self.is_complete = True
628553
if message is not None:
@@ -631,6 +556,158 @@ async def _marked_completed(self, message: _messages.ModelResponse | None = None
631556
await self._on_complete()
632557

633558

559+
@dataclass(init=False)
560+
class StreamedRunResultSync(Generic[AgentDepsT, OutputDataT]):
561+
"""Synchronous wrapper for [`StreamedRunResult`][pydantic_ai.result.StreamedRunResult] that only exposes sync methods."""
562+
563+
_streamed_run_result: StreamedRunResult[AgentDepsT, OutputDataT]
564+
565+
def __init__(self, streamed_run_result: StreamedRunResult[AgentDepsT, OutputDataT]) -> None:
566+
self._streamed_run_result = streamed_run_result
567+
568+
def all_messages(self, *, output_tool_return_content: str | None = None) -> list[_messages.ModelMessage]:
569+
"""Return the history of messages.
570+
571+
Args:
572+
output_tool_return_content: The return content of the tool call to set in the last message.
573+
This provides a convenient way to modify the content of the output tool call if you want to continue
574+
the conversation and want to set the response to the output tool call. If `None`, the last message will
575+
not be modified.
576+
577+
Returns:
578+
List of messages.
579+
"""
580+
return self._streamed_run_result.all_messages(output_tool_return_content=output_tool_return_content)
581+
582+
def all_messages_json(self, *, output_tool_return_content: str | None = None) -> bytes: # pragma: no cover
583+
"""Return all messages from [`all_messages`][pydantic_ai.result.StreamedRunResultSync.all_messages] as JSON bytes.
584+
585+
Args:
586+
output_tool_return_content: The return content of the tool call to set in the last message.
587+
This provides a convenient way to modify the content of the output tool call if you want to continue
588+
the conversation and want to set the response to the output tool call. If `None`, the last message will
589+
not be modified.
590+
591+
Returns:
592+
JSON bytes representing the messages.
593+
"""
594+
return self._streamed_run_result.all_messages_json(output_tool_return_content=output_tool_return_content)
595+
596+
def new_messages(self, *, output_tool_return_content: str | None = None) -> list[_messages.ModelMessage]:
597+
"""Return new messages associated with this run.
598+
599+
Messages from older runs are excluded.
600+
601+
Args:
602+
output_tool_return_content: The return content of the tool call to set in the last message.
603+
This provides a convenient way to modify the content of the output tool call if you want to continue
604+
the conversation and want to set the response to the output tool call. If `None`, the last message will
605+
not be modified.
606+
607+
Returns:
608+
List of new messages.
609+
"""
610+
return self._streamed_run_result.new_messages(output_tool_return_content=output_tool_return_content)
611+
612+
def new_messages_json(self, *, output_tool_return_content: str | None = None) -> bytes: # pragma: no cover
613+
"""Return new messages from [`new_messages`][pydantic_ai.result.StreamedRunResultSync.new_messages] as JSON bytes.
614+
615+
Args:
616+
output_tool_return_content: The return content of the tool call to set in the last message.
617+
This provides a convenient way to modify the content of the output tool call if you want to continue
618+
the conversation and want to set the response to the output tool call. If `None`, the last message will
619+
not be modified.
620+
621+
Returns:
622+
JSON bytes representing the new messages.
623+
"""
624+
return self._streamed_run_result.new_messages_json(output_tool_return_content=output_tool_return_content)
625+
626+
def stream_output(self, *, debounce_by: float | None = 0.1) -> Iterator[OutputDataT]:
627+
"""Stream the output as an iterable.
628+
629+
The pydantic validator for structured data will be called in
630+
[partial mode](https://docs.pydantic.dev/dev/concepts/experimental/#partial-validation)
631+
on each iteration.
632+
633+
Args:
634+
debounce_by: by how much (if at all) to debounce/group the output chunks by. `None` means no debouncing.
635+
Debouncing is particularly important for long structured outputs to reduce the overhead of
636+
performing validation as each token is received.
637+
638+
Returns:
639+
An iterable of the response data.
640+
"""
641+
return _utils.sync_async_iterator(self._streamed_run_result.stream_output(debounce_by=debounce_by))
642+
643+
def stream_text(self, *, delta: bool = False, debounce_by: float | None = 0.1) -> Iterator[str]:
644+
"""Stream the text result as an iterable.
645+
646+
!!! note
647+
Result validators will NOT be called on the text result if `delta=True`.
648+
649+
Args:
650+
delta: if `True`, yield each chunk of text as it is received, if `False` (default), yield the full text
651+
up to the current point.
652+
debounce_by: by how much (if at all) to debounce/group the response chunks by. `None` means no debouncing.
653+
Debouncing is particularly important for long structured responses to reduce the overhead of
654+
performing validation as each token is received.
655+
"""
656+
return _utils.sync_async_iterator(self._streamed_run_result.stream_text(delta=delta, debounce_by=debounce_by))
657+
658+
def stream_responses(self, *, debounce_by: float | None = 0.1) -> Iterator[tuple[_messages.ModelResponse, bool]]:
659+
"""Stream the response as an iterable of Structured LLM Messages.
660+
661+
Args:
662+
debounce_by: by how much (if at all) to debounce/group the response chunks by. `None` means no debouncing.
663+
Debouncing is particularly important for long structured responses to reduce the overhead of
664+
performing validation as each token is received.
665+
666+
Returns:
667+
An iterable of the structured response message and whether that is the last message.
668+
"""
669+
return _utils.sync_async_iterator(self._streamed_run_result.stream_responses(debounce_by=debounce_by))
670+
671+
def get_output(self) -> OutputDataT:
672+
"""Stream the whole response, validate and return it."""
673+
return _utils.get_event_loop().run_until_complete(self._streamed_run_result.get_output())
674+
675+
@property
676+
def response(self) -> _messages.ModelResponse:
677+
"""Return the current state of the response."""
678+
return self._streamed_run_result.response
679+
680+
def usage(self) -> RunUsage:
681+
"""Return the usage of the whole run.
682+
683+
!!! note
684+
This won't return the full usage until the stream is finished.
685+
"""
686+
return self._streamed_run_result.usage()
687+
688+
def timestamp(self) -> datetime:
689+
"""Get the timestamp of the response."""
690+
return self._streamed_run_result.timestamp()
691+
692+
def validate_response_output(self, message: _messages.ModelResponse, *, allow_partial: bool = False) -> OutputDataT:
693+
"""Validate a structured result message."""
694+
return _utils.get_event_loop().run_until_complete(
695+
self._streamed_run_result.validate_response_output(message, allow_partial=allow_partial)
696+
)
697+
698+
@property
699+
def is_complete(self) -> bool:
700+
"""Whether the stream has all been received.
701+
702+
This is set to `True` when one of
703+
[`stream_output`][pydantic_ai.result.StreamedRunResultSync.stream_output],
704+
[`stream_text`][pydantic_ai.result.StreamedRunResultSync.stream_text],
705+
[`stream_responses`][pydantic_ai.result.StreamedRunResultSync.stream_responses] or
706+
[`get_output`][pydantic_ai.result.StreamedRunResultSync.get_output] completes.
707+
"""
708+
return self._streamed_run_result.is_complete
709+
710+
634711
@dataclass(repr=False)
635712
class FinalResult(Generic[OutputDataT]):
636713
"""Marker class storing the final output of an agent run and associated metadata."""

0 commit comments

Comments
 (0)