Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions pydantic_ai_slim/pydantic_ai/fastapi/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from pydantic_ai.fastapi.agent_router import AgentAPIRouter
from pydantic_ai.fastapi.registry import AgentRegistry

__all__ = [
'AgentRegistry',
'AgentAPIRouter',
]
84 changes: 84 additions & 0 deletions pydantic_ai_slim/pydantic_ai/fastapi/agent_router.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
from typing import Any

try:
from fastapi import APIRouter, HTTPException
from fastapi.responses import StreamingResponse
from openai.types.chat.chat_completion import ChatCompletion
from openai.types.model import Model
from openai.types.responses import Response as OpenAIResponse
except ImportError as _import_error: # pragma: no cover
raise ImportError(
'Please install the `openai` and `fastapi` packages to enable the fastapi openai compatible endpoint, '
'you can use the `chat-completion` optional group — `pip install "pydantic-ai-slim[chat-completion]"`'
) from _import_error

from pydantic_ai.fastapi.api import AgentChatCompletionsAPI, AgentModelsAPI, AgentResponsesAPI
from pydantic_ai.fastapi.data_models import (
ChatCompletionRequest,
ModelsResponse,
ResponsesRequest,
)
from pydantic_ai.fastapi.registry import AgentRegistry


class AgentAPIRouter(APIRouter):
    """FastAPI router exposing OpenAI-compatible endpoints for Pydantic AI agents.

    Registers `/v1/chat/completions`, `/v1/responses`, and `/v1/models`
    routes, all backed by the agents held in the given `AgentRegistry`.
    Either of the two inference APIs can be disabled independently; the
    models endpoints are always registered.
    """

    def __init__(
        self,
        agent_registry: AgentRegistry,
        disable_response_api: bool = False,
        disable_completions_api: bool = False,
        *args: Any,
        **kwargs: Any,
    ):
        super().__init__(*args, **kwargs)
        self.registry = agent_registry
        self.responses_api = AgentResponsesAPI(self.registry)
        self.completions_api = AgentChatCompletionsAPI(self.registry)
        self.models_api = AgentModelsAPI(self.registry)
        self.enable_responses_api = not disable_response_api
        self.enable_completions_api = not disable_completions_api

        # Registers OpenAI/v1 API routes
        self._register_routes()

    def _register_routes(self) -> None:
        """Attach the enabled OpenAI-compatible routes to this router."""
        if self.enable_completions_api:

            @self.post('/v1/chat/completions', response_model=ChatCompletion)
            async def chat_completions(
                request: ChatCompletionRequest,
            ) -> ChatCompletion | StreamingResponse:
                # Stream via server-sent events when the client asked for it,
                # otherwise run the agent to completion and return one body.
                if getattr(request, 'stream', False):
                    return StreamingResponse(
                        self.completions_api.create_streaming_completion(request),
                        media_type='text/event-stream',
                        headers={
                            'Cache-Control': 'no-cache',
                            'Connection': 'keep-alive',
                            'Content-Type': 'text/plain; charset=utf-8',
                        },
                    )
                else:
                    return await self.completions_api.create_completion(request)

        if self.enable_responses_api:

            @self.post('/v1/responses', response_model=OpenAIResponse)
            async def responses(
                request: ResponsesRequest,
            ) -> OpenAIResponse:
                if getattr(request, 'stream', False):
                    # TODO: add streaming support for responses api
                    raise HTTPException(status_code=501)
                else:
                    return await self.responses_api.create_response(request)

        @self.get('/v1/models', response_model=ModelsResponse)
        async def get_models() -> ModelsResponse:
            return await self.models_api.list_models()

        @self.get('/v1/models/{model_id}', response_model=Model)
        async def get_model(model_id: str) -> Model:
            return await self.models_api.get_model(model_id)
9 changes: 9 additions & 0 deletions pydantic_ai_slim/pydantic_ai/fastapi/api/__init__.py
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This module shouldn't be called fastapi. The underlying package doesn't matter.

Copy link

@ion-elgreco ion-elgreco Nov 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are open to better name suggestions! Internally we called this differently

Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from pydantic_ai.fastapi.api.completions import AgentChatCompletionsAPI
from pydantic_ai.fastapi.api.models import AgentModelsAPI
from pydantic_ai.fastapi.api.responses import AgentResponsesAPI

__all__ = [
'AgentChatCompletionsAPI',
'AgentModelsAPI',
'AgentResponsesAPI',
]
121 changes: 121 additions & 0 deletions pydantic_ai_slim/pydantic_ai/fastapi/api/completions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
import json
import time
from collections.abc import AsyncGenerator
from typing import Any

try:
from fastapi import HTTPException
from openai.types import ErrorObject
from openai.types.chat.chat_completion import ChatCompletion
from openai.types.chat.chat_completion_chunk import ChatCompletionChunk, Choice as Chunkhoice, ChoiceDelta
except ImportError as _import_error: # pragma: no cover
raise ImportError(
'Please install the `openai` and `fastapi` packages to enable the fastapi openai compatible endpoint, '
'you can use the `chat-completion` optional group — `pip install "pydantic-ai-slim[chat-completion]"`'
) from _import_error

from pydantic import TypeAdapter

from pydantic_ai import Agent, _utils
from pydantic_ai.fastapi.convert import (
openai_chat_completions_2pai,
pai_result_to_openai_completions,
)
from pydantic_ai.fastapi.data_models import ChatCompletionRequest, ErrorResponse
from pydantic_ai.fastapi.registry import AgentRegistry
from pydantic_ai.settings import ModelSettings


class AgentChatCompletionsAPI:
    """Chat completions API: OpenAI &lt;-&gt; pydantic-ai conversion."""

    def __init__(self, registry: AgentRegistry) -> None:
        self.registry = registry

    def get_agent(self, name: str) -> Agent:
        """Return the completions agent registered under *name*.

        Raises:
            HTTPException: 404 with an OpenAI-style error body when no
                completions agent is registered under *name*.
        """
        try:
            agent = self.registry.get_completions_agent(name)
        except KeyError:
            # `from None`: the KeyError is an implementation detail, not
            # useful context for the HTTP error surfaced to the client.
            raise HTTPException(
                status_code=404,
                detail=ErrorResponse(
                    error=ErrorObject(
                        message=f'Model {name} is not available as chat completions API',
                        type='not_found_error',
                    ),
                ).model_dump(),
            ) from None

        return agent

    async def create_completion(self, request: ChatCompletionRequest) -> ChatCompletion:
        """Create a non-streaming chat completion."""
        model_name = request.model
        agent = self.get_agent(model_name)

        model_settings_ta = TypeAdapter(ModelSettings)
        messages = openai_chat_completions_2pai(messages=request.messages)

        async with agent:
            result = await agent.run(
                message_history=messages,
                # Forward only the settings the client actually provided.
                model_settings=model_settings_ta.validate_python(
                    {k: v for k, v in request.model_dump().items() if v is not None},
                ),
            )

        return pai_result_to_openai_completions(
            result=result,
            model=model_name,
        )

    async def create_streaming_completion(self, request: ChatCompletionRequest) -> AsyncGenerator[str]:
        """Yield SSE events for a streaming chat completion.

        Every chunk of one stream shares a single completion id and
        `created` timestamp, and the stream is terminated with the
        `data: [DONE]` sentinel, as required by the OpenAI streaming
        protocol.
        """
        model_name = request.model
        agent = self.get_agent(model_name)
        messages = openai_chat_completions_2pai(messages=request.messages)

        # One id and timestamp for the whole stream: OpenAI clients expect
        # all chunks of a completion to carry the same values.
        completion_id = f'chatcmpl-{_utils.now_utc().isoformat()}'
        created = int(_utils.now_utc().timestamp())
        role_sent = False

        async with (
            agent,
            agent.run_stream(
                message_history=messages,
            ) as result,
        ):
            async for chunk in result.stream_text(delta=True):
                delta = ChoiceDelta(
                    # The assistant role is only sent on the first chunk.
                    role='assistant' if not role_sent else None,
                    content=chunk,
                )
                role_sent = True

                stream_response = ChatCompletionChunk(
                    id=completion_id,
                    created=created,
                    model=model_name,
                    object='chat.completion.chunk',
                    choices=[
                        Chunkhoice(
                            index=0,
                            delta=delta,
                        ),
                    ],
                )

                yield f'data: {stream_response.model_dump_json()}\n\n'

        # Final chunk carries the finish reason; it reuses the stream's id
        # and timestamp (the original built an ad-hoc dict with a different
        # id scheme and no `created` field).
        final_chunk = ChatCompletionChunk(
            id=completion_id,
            created=created,
            model=model_name,
            object='chat.completion.chunk',
            choices=[
                Chunkhoice(
                    index=0,
                    delta=ChoiceDelta(),
                    finish_reason='stop',
                ),
            ],
        )
        yield f'data: {final_chunk.model_dump_json()}\n\n'
        # OpenAI streams are terminated by a literal [DONE] sentinel.
        yield 'data: [DONE]\n\n'
54 changes: 54 additions & 0 deletions pydantic_ai_slim/pydantic_ai/fastapi/api/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import time

try:
from fastapi import HTTPException
from openai.types import ErrorObject
from openai.types.model import Model
except ImportError as _import_error: # pragma: no cover
raise ImportError(
'Please install the `openai` and `fastapi` packages to enable the fastapi openai compatible endpoint, '
'you can use the `chat-completion` optional group — `pip install "pydantic-ai-slim[chat-completion]"`'
) from _import_error

from pydantic_ai.fastapi.data_models import (
ErrorResponse,
ModelsResponse,
)
from pydantic_ai.fastapi.registry import AgentRegistry


class AgentModelsAPI:
    """Models API for pydantic-ai agents."""

    # Single owner string so both endpoints report consistent metadata
    # (the original returned 'model_owner' from the list endpoint but the
    # leaked internal name 'NDIA' from the get endpoint).
    _MODEL_OWNER = 'model_owner'

    def __init__(self, registry: AgentRegistry) -> None:
        self.registry = registry

    async def list_models(self) -> ModelsResponse:
        """List available models (OpenAI-compatible endpoint)."""
        # One timestamp for the whole listing rather than one per entry.
        now = int(time.time())
        models = [
            Model(
                id=name,
                object='model',
                created=now,
                owned_by=self._MODEL_OWNER,
            )
            for name in self.registry.all_agents
        ]
        return ModelsResponse(data=models)

    async def get_model(self, name: str) -> Model:
        """Get information about a specific model (OpenAI-compatible endpoint).

        Raises:
            HTTPException: 404 with an OpenAI-style error body when *name*
                is not a registered agent.
        """
        if name in self.registry.all_agents:
            return Model(
                id=name,
                object='model',
                created=int(time.time()),
                owned_by=self._MODEL_OWNER,
            )
        raise HTTPException(
            status_code=404,
            detail=ErrorResponse(
                error=ErrorObject(
                    type='not_found_error',
                    message=f"Model '{name}' not found",
                ),
            ).model_dump(),
        )
68 changes: 68 additions & 0 deletions pydantic_ai_slim/pydantic_ai/fastapi/api/responses.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
from collections.abc import AsyncGenerator

try:
from fastapi import HTTPException
from openai.types import ErrorObject
from openai.types.responses import Response
except ImportError as _import_error: # pragma: no cover
raise ImportError(
'Please install the `openai` and `fastapi` packages to enable the fastapi openai compatible endpoint, '
'you can use the `chat-completion` optional group — `pip install "pydantic-ai-slim[chat-completion]"`'
) from _import_error

from pydantic_ai import Agent
from pydantic_ai.fastapi.convert import (
openai_responses_input_to_pai,
pai_result_to_openai_responses,
)
from pydantic_ai.fastapi.data_models import ErrorResponse, ResponsesRequest
from pydantic_ai.fastapi.registry import AgentRegistry
from pydantic_ai.models.openai import (
OpenAIResponsesModelSettings,
)


class AgentResponsesAPI:
    """Responses API: OpenAI &lt;-&gt; pydantic-ai conversion."""

    def __init__(self, registry: AgentRegistry) -> None:
        self.registry = registry

    def get_agent(self, name: str) -> Agent:
        """Return the responses agent registered under *name*.

        Raises:
            HTTPException: 404 with an OpenAI-style error body when no
                responses agent is registered under *name*.
        """
        try:
            agent = self.registry.get_responses_agent(name)
        except KeyError:
            # `from None`: the KeyError is an implementation detail, not
            # useful context for the HTTP error surfaced to the client.
            raise HTTPException(
                status_code=404,
                detail=ErrorResponse(
                    error=ErrorObject(
                        message=f'Model {name} is not available as responses API',
                        type='not_found_error',
                    ),
                ).model_dump(),
            ) from None

        return agent

    async def create_response(self, request: ResponsesRequest) -> Response:
        """Create a non-streaming response (OpenAI Responses API)."""
        model_name = request.model
        agent = self.get_agent(model_name)

        model_settings = OpenAIResponsesModelSettings(openai_previous_response_id='auto')
        messages = openai_responses_input_to_pai(items=request.input)

        async with agent:
            result = await agent.run(
                message_history=messages,
                model_settings=model_settings,
            )
        return pai_result_to_openai_responses(
            result=result,
            model=model_name,
        )

    async def create_streaming_response(self, request: ResponsesRequest) -> AsyncGenerator[str]:
        """Create a streaming response.

        Not implemented yet; the router returns HTTP 501 before this is
        reached.
        """
        raise NotImplementedError
13 changes: 13 additions & 0 deletions pydantic_ai_slim/pydantic_ai/fastapi/convert/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from pydantic_ai.fastapi.convert.convert_messages import (
openai_chat_completions_2pai,
openai_responses_input_to_pai,
pai_result_to_openai_completions,
pai_result_to_openai_responses,
)

__all__ = [
'openai_chat_completions_2pai',
'openai_responses_input_to_pai',
'pai_result_to_openai_completions',
'pai_result_to_openai_responses',
]
Loading
Loading