Do output_type functions support streaming input? #3473

@Danipulok

Question

Start

Hey there!
Thank you very much for such a good library!

I've read the docs but couldn't find any info on streaming inside output_type functions.
Is it supported? If not, what's the intended way to stream the output result inside a function?

Our use case:

We always use output types (we call them output functions) to stream to the UI.

Example:

from pydantic import BaseModel, HttpUrl

class CarouselItem(BaseModel):
    text: str | None = None
    url: HttpUrl | None = None

class CarouselMessage(BaseModel):
    text: str | None = None  # optional, so streaming can start before the field is filled
    items: list[CarouselItem] | None = None  # optional, so streaming can start before the field is filled

# Ctx is our own application context type (definition not shown)
async def send_carousel_message(message: CarouselMessage, ctx: Ctx) -> None:
    await ctx.ui.send_message(message)

Is there a way to do something similar with PydanticAI?

MRE

Here's an MRE I wrote to show what I would like to achieve:

import asyncio

from pydantic import BaseModel
from pydantic_ai import Agent
from pydantic_ai.models.openai import OpenAIChatModel
from pydantic_ai.providers.openai import OpenAIProvider


class CityLocation(BaseModel):
    city: str | None = None
    country: str | None = None


async def show_message(
    data: CityLocation,
) -> None:
    """Display the city location information."""
    # Currently this prints only once; partial results are not streamed here.
    print(f"City: {data.city}, Country: {data.country}")


async def main() -> None:
    """Run the agent."""
    api_key = "..."
    model = OpenAIChatModel(
        "gpt-4o",
        provider=OpenAIProvider(api_key=api_key),
    )
    agent = Agent(model, output_type=show_message)
    await agent.run("Where were the olympics held in 2012?")


if __name__ == "__main__":
    asyncio.run(main())

Motivation

IMHO it would be really useful to encapsulate this logic inside functions, instead of writing something like:

output_objects = [Function1, Function2, Function3]
output_type = Union[*output_objects]
async with agent.run_stream(...) as result:
    output_message = TypeAdapter(output_type).validate_python(result.output)
    await channel.send(output_message)

Additional Context

No response
