From 23eb91d1952f8d4eb7e2af10538ef07177d8098b Mon Sep 17 00:00:00 2001 From: bogdan01m Date: Mon, 28 Jul 2025 21:22:46 +0500 Subject: [PATCH 1/9] add openai batch support --- .../pydantic_ai/batches/__init__.py | 10 + .../pydantic_ai/batches/openai.py | 444 ++++++++++++++++++ 2 files changed, 454 insertions(+) create mode 100644 pydantic_ai_slim/pydantic_ai/batches/__init__.py create mode 100644 pydantic_ai_slim/pydantic_ai/batches/openai.py diff --git a/pydantic_ai_slim/pydantic_ai/batches/__init__.py b/pydantic_ai_slim/pydantic_ai/batches/__init__.py new file mode 100644 index 000000000..8378520f2 --- /dev/null +++ b/pydantic_ai_slim/pydantic_ai/batches/__init__.py @@ -0,0 +1,10 @@ +# Re-export key components from submodules for convenience +from .openai import BatchJob, BatchRequest, BatchResult, OpenAIBatchModel, create_chat_request + +__all__ = ( + 'OpenAIBatchModel', + 'BatchRequest', + 'BatchJob', + 'BatchResult', + 'create_chat_request', +) diff --git a/pydantic_ai_slim/pydantic_ai/batches/openai.py b/pydantic_ai_slim/pydantic_ai/batches/openai.py new file mode 100644 index 000000000..5425a73f2 --- /dev/null +++ b/pydantic_ai_slim/pydantic_ai/batches/openai.py @@ -0,0 +1,444 @@ +from __future__ import annotations as _annotations + +import io +import json +from dataclasses import dataclass, field +from typing import Any, Literal, cast + +from .. import ModelHTTPError +from ..messages import UserPromptPart +from ..models import KnownModelName, Model, check_allow_model_requests +from ..models.openai import OpenAIModel +from ..models.wrapper import WrapperModel + +try: + from openai import APIStatusError, AsyncOpenAI +except ImportError as _import_error: + raise ImportError('Please install "pydantic-ai-slim[openai]"`') from _import_error + + +def create_chat_request( + custom_id: str, + prompt: str | UserPromptPart | list[UserPromptPart], + model: str, + max_tokens: int | None = None, + temperature: float | None = None, + system_prompt: str | None = None, +) -> BatchRequest: + """Create a chat completion batch request with pydantic-ai style parameters. 
+ + Args: + custom_id: Unique identifier for this request + prompt: User prompt (string or UserPromptPart) + model: Model name (e.g., "gpt-4o-mini") + max_tokens: Maximum tokens to generate + temperature: Sampling temperature + system_prompt: Optional system prompt + + Returns: + BatchRequest: Configured batch request + + Example: + ```python + from pydantic_ai.batches.openai import create_chat_request + + requests = [ + create_chat_request("req-1", "What is 2+2?", "gpt-4o-mini", max_tokens=50), + create_chat_request("req-2", "Write a haiku", "gpt-4o-mini", max_tokens=100), + ] + ``` + """ + # Build messages list + messages: list[dict[str, Any]] = [] + + if system_prompt: + messages.append({'role': 'system', 'content': system_prompt}) + + # Handle different prompt types + if isinstance(prompt, str): + messages.append({'role': 'user', 'content': prompt}) + elif isinstance(prompt, UserPromptPart): + # Convert UserPromptPart to message format (simplified) + messages.append({'role': 'user', 'content': str(prompt)}) + else: + # Handle list of UserPromptParts (prompt is list[UserPromptPart]) + content_parts: list[str] = [] + for part in prompt: + content_parts.append(str(part)) + messages.append({'role': 'user', 'content': ' '.join(content_parts)}) + + # Build request body + body: dict[str, Any] = { + 'model': model, + 'messages': messages, + 'max_tokens': max_tokens, + } + + if temperature is not None: + body['temperature'] = temperature + + return BatchRequest(custom_id=custom_id, body=body) + + +__all__ = ( + 'BatchRequest', + 'BatchJob', + 'BatchResult', + 'OpenAIBatchModel', + 'create_chat_request', +) + + +@dataclass +class BatchRequest: + """Single request for batch processing.""" + + custom_id: str + method: str = 'POST' + url: str = '/v1/chat/completions' + body: dict[str, Any] = field(default_factory=lambda: {}) + + +@dataclass +class BatchJob: + """Batch job information returned by OpenAI.""" + + id: str + object: str + endpoint: str + errors: dict[str, Any] | None + input_file_id: str + completion_window: str + status: Literal[ + 'validating', + 'failed', + 'in_progress', + 'finalizing', + 'completed', + 'expired', + 'cancelling', + 'cancelled', + ] + output_file_id: str | None + error_file_id: str | None + created_at: int + in_progress_at: int | None = None + expires_at: int | None = None + finalizing_at: int | None = None + completed_at: int | None = None + failed_at: int | None = None + expired_at: int | None = None + cancelling_at: int | None = None + cancelled_at: int | None = None + request_counts: dict[str, int] | None = None + metadata: dict[str, str] | None = None + + +@dataclass +class BatchResult: + """Single result from a batch job.""" + + id: str + custom_id: str | None + response: dict[str, Any] | None + error: dict[str, Any] | None + + +@dataclass(init=False) +class OpenAIBatchModel(WrapperModel): + """A wrapper that adds batch processing capabilities to OpenAI models. + + This model wraps any OpenAI model and adds batch processing methods while preserving + all the original functionality. Provides 50% cost savings compared to synchronous + API calls with a 24-hour processing window. 
+ + Example: + ```python + import asyncio + from pydantic_ai.batches.openai import OpenAIBatchModel, BatchRequest + from pydantic_ai.messages import UserPrompt + + async def main(): + # Create directly from model name + batch_model = OpenAIBatchModel('openai:gpt-4o-mini') + + # Use as regular model + messages = [UserPrompt("Hello")] + response = await batch_model.request(messages) + + # Or use batch functionality + requests = [BatchRequest(custom_id="1", body={"model": "gpt-4o-mini", "messages": []})] + batch_id = await batch_model.batch_create_job(requests) + ``` + """ + + def __init__(self, wrapped: Model | KnownModelName): + """Initialize OpenAI batch model. + + Args: + wrapped: OpenAI model to wrap, or model name string like 'openai:gpt-4o' + + Raises: + ValueError: If the wrapped model is not an OpenAI model + """ + super().__init__(wrapped) + + # Verify this is an OpenAI model that has a client + if not isinstance(self.wrapped, OpenAIModel): + raise ValueError( + f'OpenAIBatchModel requires an OpenAI model, got {type(self.wrapped).__name__}. ' + f"Use models from pydantic_ai.models.openai or model strings like 'openai:gpt-4o'." + ) + + @property + def client(self) -> AsyncOpenAI: + """Get the OpenAI client from the wrapped model.""" + return cast(OpenAIModel, self.wrapped).client + + async def batch_create_job( + self, + requests: list[BatchRequest], + endpoint: str = '/v1/chat/completions', + completion_window: str = '24h', + metadata: dict[str, str] | None = None, + ) -> str: + """Create a batch job with multiple requests. + + Args: + requests: List of batch requests to process + endpoint: OpenAI API endpoint (default: "/v1/chat/completions") + completion_window: Processing window (default: "24h") + metadata: Optional metadata for the batch + + Returns: + batch_id: The ID of the created batch job + + Raises: + ModelHTTPError: If the API request fails + """ + check_allow_model_requests() + + # Convert requests to JSONL format + jsonl_lines: list[str] = [] + for req in requests: + jsonl_lines.append( + json.dumps( + { + 'custom_id': req.custom_id, + 'method': req.method, + 'url': req.url, + 'body': req.body, + } + ) + ) + + jsonl_content = '\n'.join(jsonl_lines) + + try: + # Upload file + batch_file = await self.client.files.create(file=io.BytesIO(jsonl_content.encode('utf-8')), purpose='batch') + + # Create batch job + batch_job = await self.client.batches.create( + input_file_id=batch_file.id, + endpoint=cast(Any, endpoint), # OpenAI SDK has strict Literal types + completion_window=cast(Any, completion_window), + metadata=metadata or {}, + ) + + return batch_job.id + + except APIStatusError as e: + if (status_code := e.status_code) >= 400: + raise ModelHTTPError(status_code=status_code, model_name=self.model_name, body=e.body) from e + raise # pragma: no cover + + async def batch_retrieve_job(self, batch_id: str) -> BatchJob: + """Retrieve the status and details of a batch job. 
+ + Args: + batch_id: The ID of the batch job + + Returns: + BatchJob: Complete batch job information + + Raises: + ModelHTTPError: If the API request fails + """ + check_allow_model_requests() + + try: + batch = await self.client.batches.retrieve(batch_id) + + return BatchJob( + id=batch.id, + object=batch.object, + endpoint=batch.endpoint, + errors=batch.errors.model_dump() if batch.errors else None, + input_file_id=batch.input_file_id, + completion_window=batch.completion_window, + status=batch.status, + output_file_id=batch.output_file_id, + error_file_id=batch.error_file_id, + created_at=batch.created_at, + in_progress_at=batch.in_progress_at, + expires_at=batch.expires_at, + finalizing_at=batch.finalizing_at, + completed_at=batch.completed_at, + failed_at=batch.failed_at, + expired_at=batch.expired_at, + cancelling_at=batch.cancelling_at, + cancelled_at=batch.cancelled_at, + request_counts=(batch.request_counts.model_dump() if batch.request_counts else None), + metadata=batch.metadata, + ) + + except APIStatusError as e: + if (status_code := e.status_code) >= 400: + raise ModelHTTPError(status_code=status_code, model_name=self.model_name, body=e.body) from e + raise # pragma: no cover + + async def batch_get_results(self, batch_id: str) -> list[BatchResult]: + """Get the results of a completed batch job. + + Args: + batch_id: The ID of the batch job + + Returns: + list[BatchResult]: List of batch results + + Raises: + ValueError: If batch is not completed or has no output file + ModelHTTPError: If the API request fails + """ + check_allow_model_requests() + + # First check if batch is completed + batch_info = await self.batch_retrieve_job(batch_id) + + if batch_info.status != 'completed': + raise ValueError(f'Batch {batch_id} is not completed. Status: {batch_info.status}') + + if batch_info.output_file_id is None: + raise ValueError(f'Batch {batch_id} has no output file') + + try: + # Download and parse results + file_response = await self.client.files.content(batch_info.output_file_id) + file_content = file_response.read() + + results: list[BatchResult] = [] + for line in file_content.decode('utf-8').strip().split('\n'): + if line.strip(): + result_data = json.loads(line) + results.append( + BatchResult( + id=result_data.get('id', ''), + custom_id=result_data.get('custom_id'), + response=result_data.get('response'), + error=result_data.get('error'), + ) + ) + + return results + + except APIStatusError as e: + if (status_code := e.status_code) >= 400: + raise ModelHTTPError(status_code=status_code, model_name=self.model_name, body=e.body) from e + raise # pragma: no cover + + async def batch_cancel_job(self, batch_id: str) -> BatchJob: + """Cancel a batch job. 
+ + Args: + batch_id: The ID of the batch job to cancel + + Returns: + BatchJob: Updated batch job information + + Raises: + ModelHTTPError: If the API request fails + """ + check_allow_model_requests() + + try: + batch = await self.client.batches.cancel(batch_id) + + return BatchJob( + id=batch.id, + object=batch.object, + endpoint=batch.endpoint, + errors=batch.errors.model_dump() if batch.errors else None, + input_file_id=batch.input_file_id, + completion_window=batch.completion_window, + status=batch.status, + output_file_id=batch.output_file_id, + error_file_id=batch.error_file_id, + created_at=batch.created_at, + in_progress_at=batch.in_progress_at, + expires_at=batch.expires_at, + finalizing_at=batch.finalizing_at, + completed_at=batch.completed_at, + failed_at=batch.failed_at, + expired_at=batch.expired_at, + cancelling_at=batch.cancelling_at, + cancelled_at=batch.cancelled_at, + request_counts=(batch.request_counts.model_dump() if batch.request_counts else None), + metadata=batch.metadata, + ) + + except APIStatusError as e: + if (status_code := e.status_code) >= 400: + raise ModelHTTPError(status_code=status_code, model_name=self.model_name, body=e.body) from e + raise # pragma: no cover + + async def batch_list_jobs(self, limit: int = 20) -> list[BatchJob]: + """List all batch jobs. + + Args: + limit: Maximum number of jobs to return + + Returns: + list[BatchJob]: List of batch jobs + + Raises: + ModelHTTPError: If the API request fails + """ + check_allow_model_requests() + + try: + batches = await self.client.batches.list(limit=limit) + + jobs: list[BatchJob] = [] + for batch in batches.data: + jobs.append( + BatchJob( + id=batch.id, + object=batch.object, + endpoint=batch.endpoint, + errors=batch.errors.model_dump() if batch.errors else None, + input_file_id=batch.input_file_id, + completion_window=batch.completion_window, + status=batch.status, + output_file_id=batch.output_file_id, + error_file_id=batch.error_file_id, + created_at=batch.created_at, + in_progress_at=batch.in_progress_at, + expires_at=batch.expires_at, + finalizing_at=batch.finalizing_at, + completed_at=batch.completed_at, + failed_at=batch.failed_at, + expired_at=batch.expired_at, + cancelling_at=batch.cancelling_at, + cancelled_at=batch.cancelled_at, + request_counts=(batch.request_counts.model_dump() if batch.request_counts else None), + metadata=batch.metadata, + ) + ) + + return jobs + + except APIStatusError as e: + if (status_code := e.status_code) >= 400: + raise ModelHTTPError(status_code=status_code, model_name=self.model_name, body=e.body) from e + raise # pragma: no cover From 758bcfcc4debb7397e8b981df8c4c1e66b12b1b2 Mon Sep 17 00:00:00 2001 From: bogdan01m Date: Wed, 30 Jul 2025 00:33:23 +0500 Subject: [PATCH 2/9] add documentation, tests, refactor bathes/openai code --- docs/batches/index.md | 48 ++ docs/batches/openai.md | 433 ++++++++++++++++++ docs/models/openai.md | 6 + mkdocs.yml | 3 + .../pydantic_ai/batches/openai.py | 239 ++++++++-- tests/batches/test_openai_batch.py | 380 +++++++++++++++ 6 files changed, 1075 insertions(+), 34 deletions(-) create mode 100644 docs/batches/index.md create mode 100644 docs/batches/openai.md create mode 100644 tests/batches/test_openai_batch.py diff --git a/docs/batches/index.md b/docs/batches/index.md new file mode 100644 index 000000000..034892afa --- /dev/null +++ b/docs/batches/index.md @@ -0,0 +1,48 @@ +# Batch Processing + +Pydantic AI supports batch processing through provider-specific implementations that allow you to process multiple requests 
cost-effectively in a single batch job.
+
+!!! info "About batch processing"
+    *Batch processing is useful when you need to make many LLM calls over a large dataset. Instead of looping over synchronous requests, you create a batch, save the `batch_id` somewhere, and come back for the results once the job is ready. The trade-off is latency: results can take up to 24 hours to arrive.*
+
+## OpenAI Batch API
+
+The OpenAI Batch API provides 50% cost savings compared to synchronous API calls, with a 24-hour processing window. Batch jobs require at least 2 requests.
+
+[Learn more about OpenAI Batch API →](openai.md)
+
+## Key Benefits
+
+- **Cost Savings**: Up to 50% reduction in API costs
+- **Bulk Processing**: Handle hundreds or thousands of requests efficiently
+- **Async Processing**: Submit jobs and retrieve results when ready
+- **Tool Support**: Full support for tool calls in batch requests
+- **Structured Output**: All output modes (native, tool, prompted) supported
+
+!!! warning "Important: Tool Usage with Batch API"
+    When using tools with batch processing, the AI model returns tool call **requests** rather than executing tools automatically. You need to:
+
+    1. **Check batch results** for `tool_calls` instead of direct responses
+    2. **Execute the tools manually** in your application code
+    3. **Submit follow-up requests** with tool responses to get final AI answers
+
+    This differs from the regular Agent API, which handles tool execution automatically.
+
+## Use Cases
+
+Batch processing is ideal for:
+
+- **Data Analysis**: Processing large datasets with LLM analysis
+- **Content Generation**: Bulk content creation and transformation
+- **Evaluation**: Running evaluations across multiple test cases
+- **A/B Testing**: Comparing different prompts or models
+- **Bulk Translation**: Translating multiple documents
+- **Report Generation**: Creating reports from structured data
+
+## Available Providers
+
+Currently supported batch processing providers:
+
+- **OpenAI**: Batch API support with tools and structured outputs
+
+Additional providers may be added in future releases.
diff --git a/docs/batches/openai.md b/docs/batches/openai.md
new file mode 100644
index 000000000..ef2398b41
--- /dev/null
+++ b/docs/batches/openai.md
@@ -0,0 +1,433 @@
+# OpenAI Batch API
+
+The OpenAI Batch API allows you to process multiple requests efficiently, with 50% cost savings compared to synchronous API calls. Batch jobs are processed within a 24-hour window and require at least 2 requests.
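+
+The typical workflow is: build the requests, submit them as a single job, poll the job status, and download the results once the job completes. The sketch below ties these steps together; the prompts, custom IDs, and polling interval are only illustrative, and each step is covered in detail in the sections that follow.
+
+```python
+import asyncio
+
+from pydantic_ai.batches.openai import OpenAIBatchModel, create_chat_request
+
+async def run_batch():
+    batch_model = OpenAIBatchModel('openai:gpt-4o-mini')
+
+    # Build at least two requests
+    requests = [
+        create_chat_request('q-1', 'What is 2+2?', 'gpt-4o-mini', max_tokens=50),
+        create_chat_request('q-2', 'Name three prime numbers', 'gpt-4o-mini', max_tokens=50),
+    ]
+
+    # Submit the job and keep the batch id for later polling
+    batch_id = await batch_model.batch_create_job(requests)
+
+    # Poll until the job reaches a terminal state (can take up to 24 hours)
+    while True:
+        job = await batch_model.batch_get_status(batch_id)
+        if job.status in ('completed', 'failed', 'expired', 'cancelled'):
+            break
+        await asyncio.sleep(300)  # illustrative polling interval
+
+    # Download the results once the job has completed
+    if job.status == 'completed':
+        for result in await batch_model.batch_retrieve_job(batch_id):
+            print(result.custom_id, result.output)
+
+asyncio.run(run_batch())
+```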
+ +## Installation + +The batch API is included with the OpenAI integration: + +```bash +pip/uv-add "pydantic-ai-slim[openai]" +``` + +### Jupyter Notebook Setup + +If you're working in Jupyter notebooks, you'll need `nest_asyncio` to run async code in cells: + +```bash +pip/uv-add nest-asyncio +``` + +Then add this at the start of your notebook: + +```python +import nest_asyncio +nest_asyncio.apply() +``` +(**Recommendation:** Use dated model versions (e.g., `gpt-4.1-2025-04-14`) instead of aliases (`gpt-4.1`) for faster batch processing) + +## Basic Usage + +### Creating and Submitting a Batch Job + +```python +import asyncio +from pydantic_ai.batches.openai import OpenAIBatchModel, create_chat_request + +async def basic_batch_example(): + # Initialize batch model + batch_model = OpenAIBatchModel('openai:gpt-4o-mini') + + # Create batch requests (minimum 2 required) + requests = [ + create_chat_request( + custom_id='math-question', + prompt='What is 2+2?', + model='gpt-4o-mini', + max_tokens=50 + ), + create_chat_request( + custom_id='creative-writing', + prompt='Write a short poem about coding', + model='gpt-4o-mini', + max_tokens=100, + temperature=0.8 + ), + create_chat_request( + custom_id='explanation', + prompt='Explain quantum computing in one sentence', + model='gpt-4o-mini', + max_tokens=75 + ), + ] + + # Submit batch job + batch_id = await batch_model.batch_create_job( + requests=requests, + # provide metadata (optional, if supported) + metadata={'project': 'my-batch-job', 'version': '1.0'} + ) + print(f'Batch job created: {batch_id}') + + return batch_id + +asyncio.run(basic_batch_example()) +``` +_(This example is complete, it can be run "as is" )_ + +### Checking Job Status + +```python +async def check_batch_status(batch_id: str): + batch_model = OpenAIBatchModel('openai:gpt-4o-mini') + + # Get job information + job_info = await batch_model.batch_get_status(batch_id) + + print(f'Status: {job_info.status}') + print(f'Created at: {job_info.created_at}') + print(f'Completion window: {job_info.completion_window}') + + if job_info.request_counts: + print(f'Progress: {job_info.request_counts}') + + return job_info.status +``` +_(This example is complete, it can be run "as is" — you'll need to add `asyncio.run(main())` to run `main`)_ + +### Getting Results + +```python +from pydantic_ai.batches.openai import OpenAIBatchModel +import asyncio + +async def get_batch_results(batch_id: str): + batch_model = OpenAIBatchModel('openai:gpt-4.1-mini') + + # Check if completed + job_info = await batch_model.batch_get_status(batch_id) + if job_info.status != 'completed': + print(f'Job not ready. 
Status: {job_info.status}') + return + + # Get results + results = await batch_model.batch_retrieve_job(batch_id) + + for result in results: + print(f'\n{result.custom_id}:') + if result.error: + print(f' Error: {result.error}') + elif result.response: + # Extract content from nested response structure + content = result.output + print(f' Response: {content}') +``` +_(This example is complete, it can be run "as is" — you'll need to add `asyncio.run(get_batch_results(batch_id))` to run it with your `batch_id`)_ + +## Using Tools + +You can include tools in batch requests by extracting `ToolDefinition` objects from pydantic-ai `Tool` instances: + +```python +import asyncio +from pydantic_ai import RunContext +from pydantic_ai.batches.openai import OpenAIBatchModel, create_chat_request +from pydantic_ai.tools import Tool + +# Define tool functions +def get_weather(ctx: RunContext[None], location: str, units: str = "celsius") -> str: + """Get current weather information for a location.""" + # In real implementation, this would call a weather API + return f"Weather in {location}: 22°{units[0].upper()}, sunny, 60% humidity" + +def calculate(ctx: RunContext[None], expression: str) -> str: + """Perform mathematical calculations.""" + try: + # In real implementation, use safe evaluation + result = eval(expression) # Don't use eval in production! + return f"Result: {result}" + except Exception as e: + return f"Error: {str(e)}" + +async def batch_with_tools(): + batch_model = OpenAIBatchModel('openai:gpt-4o-mini') + + # Create tools and extract definitions + weather_tool = Tool(get_weather) + calc_tool = Tool(calculate) + tools = [weather_tool.tool_def, calc_tool.tool_def] + + # Create requests with tools + requests = [ + create_chat_request( + custom_id='weather-tokyo', + prompt="What's the weather like in Tokyo?", + model='gpt-4o-mini', + tools=tools, + max_tokens=150 + ), + create_chat_request( + custom_id='calculation', + prompt="Calculate 15 * 23 + 7", + model='gpt-4o-mini', + tools=tools, + max_tokens=100 + ), + create_chat_request( + custom_id='weather-london', + prompt="Get weather for London, UK in fahrenheit", + model='gpt-4o-mini', + tools=tools, + max_tokens=150 + ), + ] + + # Submit batch job + batch_id = await batch_model.batch_create_job(requests) + print(f'Batch with tools submitted: {batch_id}') + + return batch_id + +asyncio.run(batch_with_tools()) +``` +_(This example is complete, it can be run "as is" )_ + +## Structured Output + +The batch API supports all structured output modes available in pydantic-ai: + +### Native Mode (Recommended) + +```python +from pydantic import BaseModel + +class WeatherResult(BaseModel): + location: str + temperature: float + condition: str + humidity: int + +async def batch_with_structured_output(): + batch_model = OpenAIBatchModel('openai:gpt-4o-mini') + + requests = [ + create_chat_request( + custom_id='structured-paris', + prompt='Get weather information for Paris and format it properly', + model='gpt-4o-mini', + output_type=WeatherResult, + output_mode='native', # Uses OpenAI's structured output + max_tokens=200 + ), + create_chat_request( + custom_id='structured-tokyo', + prompt='Check weather in Tokyo and return structured data', + model='gpt-4o-mini', + output_type=WeatherResult, + output_mode='native', + max_tokens=200 + ), + ] + + batch_id = await batch_model.batch_create_job(requests) + return batch_id +``` + +### Tool Mode + +```python +async def batch_with_tool_mode(): + batch_model = OpenAIBatchModel('openai:gpt-4o-mini') + + requests = 
[ + create_chat_request( + custom_id='tool-mode-1', + prompt='Analyze the weather in Berlin', + model='gpt-4o-mini', + output_type=WeatherResult, + output_mode='tool', # Forces output through a tool call + max_tokens=200 + ), + create_chat_request( + custom_id='tool-mode-2', + prompt='Get weather data for Madrid', + model='gpt-4o-mini', + output_type=WeatherResult, + output_mode='tool', + max_tokens=200 + ), + ] + + batch_id = await batch_model.batch_create_job(requests) + return batch_id +``` + +### Prompted Mode + +```python +async def batch_with_prompted_mode(): + batch_model = OpenAIBatchModel('openai:gpt-4o-mini') + + requests = [ + create_chat_request( + custom_id='prompted-1', + prompt='Get weather for Miami and return as JSON', + model='gpt-4o-mini', + output_type=WeatherResult, + output_mode='prompted', # Adds schema to system prompt + max_tokens=250 + ), + create_chat_request( + custom_id='prompted-2', + prompt='Analyze Rome weather data', + model='gpt-4o-mini', + output_type=WeatherResult, + output_mode='prompted', + max_tokens=250 + ), + ] + + batch_id = await batch_model.batch_create_job(requests) + return batch_id +``` + +### Getting result for tools (tool requests by LLM) + +```python +async def get_batch_results_with_tools(batch_id: str): + batch_model = OpenAIBatchModel('openai:gpt-4o-mini') + + # Check if completed + job_info = await batch_model.batch_get_status(batch_id) + if job_info.status != 'completed': + print(f'Job not ready. Status: {job_info.status}') + return + + # Get results + results = await batch_model.batch_retrieve_job(batch_id) + + for result in results: + print(f'\n{result.custom_id}:') + if result.error: + print(f' Error: {result.error}') + else: + # Use convenience properties instead of deep nested access + if result.output: + print(f' Response: {result.output}') + elif result.tool_calls: + print(f' Made {len(result.tool_calls)} tool calls:') + for i, tool_call in enumerate(result.tool_calls): + print(f'{tool_call["function"]["name"]}') +``` +_(This example is complete, it can be run "as is" — you'll need to add `asyncio.run(get_batch_results_with_tools(batch_id))` to run it with your `batch_id`)_ + +## Batch Management Methods + +The `OpenAIBatchModel` class provides several methods for managing batch jobs: + +### Core Methods + +- **`batch_create_job(requests, endpoint='...', completion_window='24h', metadata=None)`** + - Submit a new batch job (requires ≥2 requests) + - Returns the batch ID for tracking + +- **`batch_get_status(batch_id)`** + - Get current job status and details + - Returns `BatchJob` object with status, timestamps, and counts + +- **`batch_retrieve_job(batch_id)`** + - Download results from completed jobs + - Returns list of `BatchResult` objects + +- **`batch_cancel_job(batch_id)`** + - Cancel a pending or in-progress job + - Returns updated `BatchJob` information + +- **`batch_list_jobs(limit=20)`** + - List recent batch jobs + - Returns list of `BatchJob` objects + +### Job Status Values + +- `validating`: Job is being validated +- `in_progress`: Job is being processed +- `finalizing`: Job is being finalized +- `completed`: Job completed successfully +- `failed`: Job failed due to errors +- `expired`: Job expired before completion +- `cancelled`: Job was cancelled + +## Best Practices + +### Request Design + +- **Minimum Requirements**: Always include at least 2 requests +- **Custom IDs**: Use descriptive custom IDs for easy result identification + +### Error Handling + +```python +async def robust_batch_processing(): + 
batch_model = OpenAIBatchModel('openai:gpt-4o-mini')
+
+    try:
+        # Submit batch
+        batch_id = await batch_model.batch_create_job(requests)
+
+        # Monitor with timeout
+        max_wait_time = 25 * 60 * 60  # 25 hours
+        start_time = time.time()
+
+        while time.time() - start_time < max_wait_time:
+            job_info = await batch_model.batch_get_status(batch_id)
+
+            if job_info.status == 'completed':
+                results = await batch_model.batch_retrieve_job(batch_id)
+                return results
+            elif job_info.status == 'failed':
+                print(f'Batch failed: {job_info.errors}')
+                break
+
+            await asyncio.sleep(300)  # Check every 5 minutes
+
+    except Exception as e:
+        print(f'Batch processing error: {e}')
+        # Handle cleanup if needed
+```
+## Troubleshooting
+
+### Common Issues
+
+1. **Minimum Request Error**: Ensure at least 2 requests in each batch
+2. **Tool Definition Errors**: Extract `tool_def` from `Tool` instances correctly
+3. **Response Structure**: Use correct nested path for response content
+4. **Timeout Handling**: Jobs can take up to 24 hours to complete
+
+### Response Structure
+
+Batch results have a nested structure:
+
+```python
+# Correct way to extract content
+content = result.output  # Use the convenience property
+
+# Alternative way to make it
+content =result.response['body']['choices'][0]['message']['content']  # Manual access
+```
+
+### Debugging Tools
+
+```python
+# Inspect request structure
+print(f"Request body: {request.body}")
+
+# Check tool definitions
+for tool in tools:
+    print(f"Tool: {tool.name} - {tool.description}")
+
+# Monitor job progress
+if job_info.request_counts:
+    completed = job_info.request_counts.get('completed', 0)
+    total = job_info.request_counts.get('total', 0)
+    print(f"Progress: {completed}/{total}")
+```
diff --git a/docs/models/openai.md b/docs/models/openai.md
index c26e42ab3..863f51c58 100644
--- a/docs/models/openai.md
+++ b/docs/models/openai.md
@@ -142,6 +142,12 @@ As of 7:48 AM on Wednesday, April 2, 2025, in Tokyo, Japan, the weather is cloud
 
 You can learn more about the differences between the Responses API and Chat Completions API in the [OpenAI API docs](https://platform.openai.com/docs/guides/responses-vs-chat-completions).
 
+## OpenAI Batch API
+
+Pydantic AI supports OpenAI's [Batch API](https://platform.openai.com/docs/guides/batch) for cost-effective bulk processing with 50% savings and 24-hour processing windows.
+
+[Learn more about Batch API →](../batches/openai.md)
+
 ## OpenAI-compatible Models
 
 Many providers and models are compatible with the OpenAI API, and can be used with `OpenAIModel` in Pydantic AI.
diff --git a/mkdocs.yml b/mkdocs.yml
index 1860a00c0..cde4be64d 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -29,6 +29,9 @@ nav:
     - models/groq.md
     - models/mistral.md
     - models/huggingface.md
+  - Batches:
+    - batches/index.md
+    - batches/openai.md
   - dependencies.md
   - tools.md
   - toolsets.md
diff --git a/pydantic_ai_slim/pydantic_ai/batches/openai.py b/pydantic_ai_slim/pydantic_ai/batches/openai.py
index 5425a73f2..a00d3e668 100644
--- a/pydantic_ai_slim/pydantic_ai/batches/openai.py
+++ b/pydantic_ai_slim/pydantic_ai/batches/openai.py
@@ -6,15 +6,121 @@ from typing import Any, Literal, cast
 
 from .. 
import ModelHTTPError +from .._output import OutputSchema from ..messages import UserPromptPart from ..models import KnownModelName, Model, check_allow_model_requests -from ..models.openai import OpenAIModel +from ..models.openai import APIStatusError, AsyncOpenAI, OpenAIModel from ..models.wrapper import WrapperModel +from ..output import OutputSpec, StructuredOutputMode +from ..tools import ToolDefinition + +__all__ = ( + 'BatchRequest', + 'BatchJob', + 'BatchResult', + 'OpenAIBatchModel', + 'create_chat_request', +) + + +def _map_tool_definition(tool_def: ToolDefinition) -> dict[str, Any]: + """Convert a ToolDefinition to OpenAI tool parameter format.""" + return { + 'type': 'function', + 'function': { + 'name': tool_def.name, + 'description': tool_def.description or '', + 'parameters': tool_def.parameters_json_schema, + }, + } + + +def _build_messages( + prompt: str | UserPromptPart | list[UserPromptPart], + system_prompt: str | None = None, +) -> list[dict[str, Any]]: + """Build messages list from prompt and system prompt.""" + messages: list[dict[str, Any]] = [] + + if system_prompt: + messages.append({'role': 'system', 'content': system_prompt}) + + # Handle different prompt types + if isinstance(prompt, str): + messages.append({'role': 'user', 'content': prompt}) + elif isinstance(prompt, UserPromptPart): + # Convert UserPromptPart to message format (simplified) + messages.append({'role': 'user', 'content': str(prompt)}) + else: + # Handle list of UserPromptParts (prompt is list[UserPromptPart]) + content_parts: list[str] = [] + for part in prompt: + content_parts.append(str(part)) + messages.append({'role': 'user', 'content': ' '.join(content_parts)}) + + return messages + + +def _handle_native_output(output_schema: OutputSchema, body: dict[str, Any]) -> None: + """Handle native structured output mode.""" + from .._output import StructuredTextOutputSchema + + if isinstance(output_schema, StructuredTextOutputSchema): + object_def = output_schema.object_def + json_schema_dict: dict[str, Any] = { + 'name': object_def.name or 'response', + 'schema': object_def.json_schema, + } + if object_def.description: + json_schema_dict['description'] = object_def.description + if object_def.strict: + json_schema_dict['strict'] = True + + response_format = { + 'type': 'json_schema', + 'json_schema': json_schema_dict, + } + + body['response_format'] = response_format -try: - from openai import APIStatusError, AsyncOpenAI -except ImportError as _import_error: - raise ImportError('Please install "pydantic-ai-slim[openai]"`') from _import_error + +def _handle_tool_output(output_schema: OutputSchema, body: dict[str, Any]) -> None: + """Handle tool-based structured output mode.""" + if output_schema.toolset: + # Access tool definitions through the internal attribute (needed for batch mode) + # This is safe since we're in the same package + tool_defs = getattr(output_schema.toolset, '_tool_defs', []) + output_tools = [_map_tool_definition(tool_def) for tool_def in tool_defs] + if 'tools' in body: + body['tools'].extend(output_tools) + else: + body['tools'] = output_tools + + # Force tool usage for output + if len(output_tools) == 1: + body['tool_choice'] = { + 'type': 'function', + 'function': {'name': output_tools[0]['function']['name']}, + } + + +def _handle_prompted_output(output_schema: OutputSchema, body: dict[str, Any], system_prompt: str | None) -> None: + """Handle prompted structured output mode.""" + from .._output import PromptedOutputSchema + + if isinstance(output_schema, 
PromptedOutputSchema): + schema_instructions = output_schema.instructions('Respond with JSON that matches this schema:\n{schema}') + + # Add to system prompt or create one + if system_prompt: + enhanced_system_prompt = f'{system_prompt}\n\n{schema_instructions}' + else: + enhanced_system_prompt = schema_instructions + + # Update messages with enhanced system prompt + messages = [msg for msg in body['messages'] if msg.get('role') != 'system'] + messages.insert(0, {'role': 'system', 'content': enhanced_system_prompt}) + body['messages'] = messages def create_chat_request( @@ -24,6 +130,9 @@ def create_chat_request( max_tokens: int | None = None, temperature: float | None = None, system_prompt: str | None = None, + output_type: OutputSpec[Any] | None = None, + output_mode: StructuredOutputMode | None = None, + tools: list[ToolDefinition] | None = None, ) -> BatchRequest: """Create a chat completion batch request with pydantic-ai style parameters. @@ -34,38 +143,41 @@ def create_chat_request( max_tokens: Maximum tokens to generate temperature: Sampling temperature system_prompt: Optional system prompt + output_type: Structured output specification (Pydantic models, functions, etc.) + output_mode: Mode for structured output ('tool', 'native', 'prompted') + tools: List of tool definitions for the model to use Returns: BatchRequest: Configured batch request Example: ```python + from pydantic import BaseModel from pydantic_ai.batches.openai import create_chat_request + class Response(BaseModel): + answer: int + explanation: str + + # Simple text request requests = [ create_chat_request("req-1", "What is 2+2?", "gpt-4o-mini", max_tokens=50), - create_chat_request("req-2", "Write a haiku", "gpt-4o-mini", max_tokens=100), + ] + + # Structured output request + requests = [ + create_chat_request( + "req-2", + "What is 2+2?", + "gpt-4o-mini", + output_type=Response, + output_mode='native' + ), ] ``` """ # Build messages list - messages: list[dict[str, Any]] = [] - - if system_prompt: - messages.append({'role': 'system', 'content': system_prompt}) - - # Handle different prompt types - if isinstance(prompt, str): - messages.append({'role': 'user', 'content': prompt}) - elif isinstance(prompt, UserPromptPart): - # Convert UserPromptPart to message format (simplified) - messages.append({'role': 'user', 'content': str(prompt)}) - else: - # Handle list of UserPromptParts (prompt is list[UserPromptPart]) - content_parts: list[str] = [] - for part in prompt: - content_parts.append(str(part)) - messages.append({'role': 'user', 'content': ' '.join(content_parts)}) + messages = _build_messages(prompt, system_prompt) # Build request body body: dict[str, Any] = { @@ -77,16 +189,25 @@ def create_chat_request( if temperature is not None: body['temperature'] = temperature - return BatchRequest(custom_id=custom_id, body=body) + # Handle tools + if tools: + body['tools'] = [_map_tool_definition(tool) for tool in tools] + # Handle structured output + if output_type is not None: + if output_mode is None: + output_mode = 'tool' # Default mode -__all__ = ( - 'BatchRequest', - 'BatchJob', - 'BatchResult', - 'OpenAIBatchModel', - 'create_chat_request', -) + output_schema = OutputSchema.build(output_type, default_mode=output_mode) + + if output_schema.mode == 'native': + _handle_native_output(output_schema, body) + elif output_schema.mode == 'tool': + _handle_tool_output(output_schema, body) + elif output_schema.mode == 'prompted': + _handle_prompted_output(output_schema, body, system_prompt) + + return 
BatchRequest(custom_id=custom_id, body=body) @dataclass @@ -143,6 +264,56 @@ class BatchResult: response: dict[str, Any] | None error: dict[str, Any] | None + @property + def output(self) -> str | None: + """Get the text content from the response message. + + Returns: + The message content as a string, or None if not available. + """ + if not self.response: + return None + + try: + return self.response['body']['choices'][0]['message']['content'] + except (KeyError, IndexError, TypeError): + return None + + @property + def tool_calls(self) -> list[dict[str, Any]]: + """Get tool calls from the response message. + + Returns: + List of tool call objects, or empty list if none available. + """ + if not self.response: + return [] + + try: + message = self.response['body']['choices'][0]['message'] + return message.get('tool_calls', []) + except (KeyError, IndexError, TypeError): + return [] + + def get_tool_call_arguments(self, index: int = 0) -> dict[str, Any] | None: + """Get parsed arguments from a specific tool call. + + Args: + index: Index of the tool call (default: 0 for first call) + + Returns: + Parsed arguments as a dictionary, or None if not available. + """ + tool_calls = self.tool_calls + if not tool_calls or index >= len(tool_calls): + return None + + try: + args_json = tool_calls[index]['function']['arguments'] + return json.loads(args_json) + except (KeyError, json.JSONDecodeError, TypeError): + return None + @dataclass(init=False) class OpenAIBatchModel(WrapperModel): @@ -253,7 +424,7 @@ async def batch_create_job( raise ModelHTTPError(status_code=status_code, model_name=self.model_name, body=e.body) from e raise # pragma: no cover - async def batch_retrieve_job(self, batch_id: str) -> BatchJob: + async def batch_get_status(self, batch_id: str) -> BatchJob: """Retrieve the status and details of a batch job. Args: @@ -298,7 +469,7 @@ async def batch_retrieve_job(self, batch_id: str) -> BatchJob: raise ModelHTTPError(status_code=status_code, model_name=self.model_name, body=e.body) from e raise # pragma: no cover - async def batch_get_results(self, batch_id: str) -> list[BatchResult]: + async def batch_retrieve_job(self, batch_id: str) -> list[BatchResult]: """Get the results of a completed batch job. Args: @@ -314,7 +485,7 @@ async def batch_get_results(self, batch_id: str) -> list[BatchResult]: check_allow_model_requests() # First check if batch is completed - batch_info = await self.batch_retrieve_job(batch_id) + batch_info = await self.batch_get_status(batch_id) if batch_info.status != 'completed': raise ValueError(f'Batch {batch_id} is not completed. 
Status: {batch_info.status}') diff --git a/tests/batches/test_openai_batch.py b/tests/batches/test_openai_batch.py new file mode 100644 index 000000000..2affd4a61 --- /dev/null +++ b/tests/batches/test_openai_batch.py @@ -0,0 +1,380 @@ +"""Tests for OpenAI batch functionality based on real examples.""" + +from __future__ import annotations as _annotations + +from typing import Any +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest +from pydantic import BaseModel + +from pydantic_ai import RunContext +from pydantic_ai.batches.openai import ( + BatchJob, + BatchResult, + OpenAIBatchModel, + create_chat_request, +) +from pydantic_ai.tools import Tool + + +class WeatherResult(BaseModel): + location: str + temperature: float + condition: str + humidity: int + + +def get_weather(ctx: RunContext[None], location: str, units: str = 'celsius') -> str: + """Get current weather information for a location.""" + return f'Weather in {location}: 22°{units[0].upper()}, sunny, 60% humidity' + + +def calculate(ctx: RunContext[None], expression: str) -> str: + """Perform mathematical calculations.""" + try: + result = eval(expression) # Don't use eval in production! + return f'Result: {result}' + except Exception as e: + return f'Error: {str(e)}' + + +class TestCreateChatRequest: + """Test create_chat_request function.""" + + def test_basic_chat_request(self): + """Test creating basic chat request like in batch_crt.py""" + request = create_chat_request( + custom_id='math-question', prompt='What is 2+2?', model='gpt-4o-mini', max_tokens=50 + ) + + assert request.custom_id == 'math-question' + assert request.body['model'] == 'gpt-4o-mini' + assert request.body['max_tokens'] == 50 + assert request.body['messages'] == [{'role': 'user', 'content': 'What is 2+2?'}] + + def test_creative_request_with_temperature(self): + """Test creative request with temperature like in batch_crt.py""" + request = create_chat_request( + custom_id='creative-writing', + prompt='Write a short poem about coding', + model='gpt-4o-mini', + max_tokens=100, + temperature=0.8, + ) + + assert request.custom_id == 'creative-writing' + assert request.body['temperature'] == 0.8 + assert request.body['max_tokens'] == 100 + + def test_request_with_tools(self): + """Test request with tools like in batch_tool.py""" + weather_tool = Tool(get_weather) + calc_tool = Tool(calculate) + tools = [weather_tool.tool_def, calc_tool.tool_def] + + request = create_chat_request( + custom_id='weather-tokyo', + prompt="What's the weather like in Tokyo?", + model='gpt-4o-mini', + tools=tools, + max_tokens=150, + ) + + assert request.custom_id == 'weather-tokyo' + assert 'tools' in request.body + assert len(request.body['tools']) == 2 + assert request.body['tools'][0]['function']['name'] == 'get_weather' + + def test_structured_output_native(self): + """Test structured output with native mode like in structured_output.py""" + request = create_chat_request( + custom_id='structured-paris', + prompt='Get weather information for Paris', + model='gpt-4o-mini', + output_type=WeatherResult, + output_mode='native', + max_tokens=200, + ) + + assert request.custom_id == 'structured-paris' + assert 'response_format' in request.body + assert request.body['response_format']['type'] == 'json_schema' + + def test_structured_output_tool_mode(self): + """Test structured output with tool mode like in structured_output.py""" + request = create_chat_request( + custom_id='tool-mode-1', + prompt='Analyze the weather in Berlin', + model='gpt-4o-mini', + 
output_type=WeatherResult, + output_mode='tool', + max_tokens=200, + ) + + assert request.custom_id == 'tool-mode-1' + assert 'tools' in request.body + assert 'tool_choice' in request.body + + +@pytest.fixture +def mock_openai_batch_model(monkeypatch: pytest.MonkeyPatch): + """Create a mocked OpenAI batch model.""" + monkeypatch.setattr('pydantic_ai.models.ALLOW_MODEL_REQUESTS', True) + + with patch('pydantic_ai.providers.openai.AsyncOpenAI') as mock_openai: + mock_client = MagicMock() + mock_openai.return_value = mock_client + + model = OpenAIBatchModel('openai:gpt-4o-mini') + return model, mock_client + + +class TestBasicBatchWorkflow: + """Test basic batch workflow like in batch_crt.py examples.""" + + async def test_create_basic_batch_job(self, mock_openai_batch_model: Any) -> None: + """Test creating basic batch job with metadata.""" + batch_model, mock_client = mock_openai_batch_model + + # Mock batch creation + mock_batch = MagicMock() + mock_batch.id = 'batch_test_123' + mock_client.batches.create = AsyncMock(return_value=mock_batch) + mock_client.files.create = AsyncMock(return_value=MagicMock(id='file_123')) + + # Create requests like in batch_crt.py + requests = [ + create_chat_request(custom_id='math-question', prompt='What is 2+2?', model='gpt-4o-mini', max_tokens=50), + create_chat_request( + custom_id='creative-writing', + prompt='Write a short poem about coding', + model='gpt-4o-mini', + max_tokens=100, + temperature=0.8, + ), + create_chat_request( + custom_id='explanation', + prompt='Explain quantum computing in one sentence', + model='gpt-4o-mini', + max_tokens=75, + ), + ] + + # Submit batch job with metadata + batch_id = await batch_model.batch_create_job( + requests=requests, metadata={'project': 'my-batch-job', 'version': '1.0'} + ) + + assert batch_id == 'batch_test_123' + mock_client.files.create.assert_called_once() + mock_client.batches.create.assert_called_once() + + # Check metadata was passed + call_args = mock_client.batches.create.call_args + assert call_args.kwargs['metadata'] == {'project': 'my-batch-job', 'version': '1.0'} + + async def test_batch_status_completed(self, mock_openai_batch_model: Any) -> None: + """Test getting batch status when completed like in check_status.py output.""" + batch_model, mock_client = mock_openai_batch_model + + # Mock completed batch status + mock_batch = MagicMock() + mock_batch.id = 'batch_6887c4d206c88190b4e06c4f3867e8ca' + mock_batch.object = 'batch' + mock_batch.endpoint = '/v1/chat/completions' + mock_batch.errors = None + mock_batch.input_file_id = 'file-Jfyv6WaoGG1i8ukdEL7cJz' + mock_batch.completion_window = '24h' + mock_batch.status = 'completed' + mock_batch.output_file_id = 'file-AiJT2XheqDiVYc5a1FA8Y8' + mock_batch.error_file_id = None + mock_batch.created_at = 1753728210 + mock_batch.in_progress_at = 1753728210 + mock_batch.expires_at = 1753814610 + mock_batch.finalizing_at = 1753728292 + mock_batch.completed_at = 1753728293 + mock_batch.failed_at = None + mock_batch.expired_at = None + mock_batch.cancelling_at = None + mock_batch.cancelled_at = None + mock_batch.metadata = {} + + # Mock request_counts + mock_request_counts = MagicMock() + mock_request_counts.model_dump.return_value = {'completed': 3, 'failed': 0, 'total': 3} + mock_batch.request_counts = mock_request_counts + + mock_client.batches.retrieve = AsyncMock(return_value=mock_batch) + + # Get status + job_info = await batch_model.batch_get_status('batch_6887c4d206c88190b4e06c4f3867e8ca') + + assert isinstance(job_info, BatchJob) + assert 
job_info.id == 'batch_6887c4d206c88190b4e06c4f3867e8ca' + assert job_info.status == 'completed' + assert job_info.completion_window == '24h' + assert job_info.request_counts == {'completed': 3, 'failed': 0, 'total': 3} + assert job_info.created_at == 1753728210 + + async def test_retrieve_completed_results(self, mock_openai_batch_model: Any) -> None: + """Test retrieving completed batch results like in get_res.py output.""" + batch_model, mock_client = mock_openai_batch_model + + # Mock batch status (completed) + mock_batch = MagicMock() + mock_batch.status = 'completed' + mock_batch.output_file_id = 'file-AiJT2XheqDiVYc5a1FA8Y8' + mock_client.batches.retrieve = AsyncMock(return_value=mock_batch) + + # Mock file content with real response format from get_res.py + mock_file_content = b"""{"id": "batch_req_1", "custom_id": "request-1", "response": {"body": {"choices": [{"message": {"content": "2 + 2 = 4"}}]}}, "error": null} +{"id": "batch_req_2", "custom_id": "request-2", "response": {"body": {"choices": [{"message": {"content": "In lines of code, a world takes flight,\\nFrom dawn's first thought to late at night."}}]}}, "error": null} +{"id": "batch_req_3", "custom_id": "request-3", "response": {"body": {"choices": [{"message": {"content": "Quantum computing is a type of computing that uses quantum bits, or qubits, leveraging principles like superposition and entanglement to perform complex calculations exponentially faster than classical computers for certain problems."}}]}}, "error": null}""" + + mock_file_response = MagicMock() + mock_file_response.read.return_value = mock_file_content + mock_client.files.content = AsyncMock(return_value=mock_file_response) + + # Retrieve results + results = await batch_model.batch_retrieve_job('batch_test_123') + + assert len(results) == 3 + assert all(isinstance(r, BatchResult) for r in results) + + # Check first result (math question) + assert results[0].custom_id == 'request-1' + assert results[0].output == '2 + 2 = 4' + assert results[0].error is None + + # Check second result (creative writing) + assert results[1].custom_id == 'request-2' + assert 'In lines of code, a world takes flight' in results[1].output + assert results[1].error is None + + # Check third result (explanation) + assert results[2].custom_id == 'request-3' + assert 'Quantum computing' in results[2].output + assert results[2].error is None + + +class TestBatchWithTools: + """Test batch functionality with tools like in batch_tool.py and get_tool.py.""" + + async def test_batch_with_tool_calls(self, mock_openai_batch_model: Any) -> None: + """Test batch job with tool calls like in get_tool.py output.""" + batch_model, mock_client = mock_openai_batch_model + + # Mock batch creation + mock_batch = MagicMock() + mock_batch.id = 'batch_tools_123' + mock_client.batches.create = AsyncMock(return_value=mock_batch) + mock_client.files.create = AsyncMock(return_value=MagicMock(id='file_123')) + + # Mock completed batch status + mock_batch_status = MagicMock() + mock_batch_status.status = 'completed' + mock_batch_status.output_file_id = 'file_456' + mock_client.batches.retrieve = AsyncMock(return_value=mock_batch_status) + + # Mock file content with tool calls like in get_tool.py output + mock_file_content = b"""{"id": "batch_req_1", "custom_id": "weather-tokyo", "response": {"body": {"choices": [{"message": {"tool_calls": [{"function": {"name": "get_weather", "arguments": "{\\"location\\": \\"Tokyo\\"}"}}]}}]}}, "error": null} +{"id": "batch_req_2", "custom_id": "calculation", "response": 
{"body": {"choices": [{"message": {"tool_calls": [{"function": {"name": "calculate", "arguments": "{\\"expression\\": \\"15 * 23 + 7\\"}"}}]}}]}}, "error": null} +{"id": "batch_req_3", "custom_id": "weather-london", "response": {"body": {"choices": [{"message": {"tool_calls": [{"function": {"name": "get_weather", "arguments": "{\\"location\\": \\"London\\"}"}}]}}]}}, "error": null}""" + + mock_file_response = MagicMock() + mock_file_response.read.return_value = mock_file_content + mock_client.files.content = AsyncMock(return_value=mock_file_response) + + # Create tools + weather_tool = Tool(get_weather) + calc_tool = Tool(calculate) + tools = [weather_tool.tool_def, calc_tool.tool_def] + + # Create requests like in batch_tool.py + requests = [ + create_chat_request( + custom_id='weather-tokyo', + prompt="What's the weather like in Tokyo?", + model='gpt-4o-mini', + tools=tools, + max_tokens=150, + ), + create_chat_request( + custom_id='calculation', + prompt='Calculate 15 * 23 + 7', + model='gpt-4o-mini', + tools=tools, + max_tokens=100, + ), + create_chat_request( + custom_id='weather-london', + prompt="What's the weather in London?", + model='gpt-4o-mini', + tools=tools, + max_tokens=150, + ), + ] + + # Submit batch + batch_id = await batch_model.batch_create_job(requests) + assert batch_id == 'batch_tools_123' + + # Retrieve results + results = await batch_model.batch_retrieve_job(batch_id) + assert len(results) == 3 + + # Check tool calls like in get_tool.py output + tokyo_result = next(r for r in results if r.custom_id == 'weather-tokyo') + assert len(tokyo_result.tool_calls) == 1 + assert tokyo_result.tool_calls[0]['function']['name'] == 'get_weather' + + calc_result = next(r for r in results if r.custom_id == 'calculation') + assert len(calc_result.tool_calls) == 1 + assert calc_result.tool_calls[0]['function']['name'] == 'calculate' + + london_result = next(r for r in results if r.custom_id == 'weather-london') + assert len(london_result.tool_calls) == 1 + assert london_result.tool_calls[0]['function']['name'] == 'get_weather' + + +class TestStructuredOutput: + """Test structured output like in structured_output.py.""" + + async def test_batch_with_structured_output(self, mock_openai_batch_model: Any) -> None: + """Test batch with structured output modes.""" + batch_model, mock_client = mock_openai_batch_model + + # Mock batch creation + mock_batch = MagicMock() + mock_batch.id = 'batch_struct_123' + mock_client.batches.create = AsyncMock(return_value=mock_batch) + mock_client.files.create = AsyncMock(return_value=MagicMock(id='file_123')) + + # Create requests like in structured_output.py + requests = [ + create_chat_request( + custom_id='structured-paris', + prompt='Get weather information for Paris and format it properly', + model='gpt-4o-mini', + output_type=WeatherResult, + output_mode='native', + max_tokens=200, + ), + create_chat_request( + custom_id='tool-mode-1', + prompt='Analyze the weather in Berlin', + model='gpt-4o-mini', + output_type=WeatherResult, + output_mode='tool', + max_tokens=200, + ), + ] + + # Submit batch + batch_id = await batch_model.batch_create_job(requests) + assert batch_id == 'batch_struct_123' + + # Verify requests were structured correctly + create_call = mock_client.batches.create.call_args + assert create_call is not None From 20eecc14b678cf72cd60297cf5113b3a10236bb6 Mon Sep 17 00:00:00 2001 From: bogdan01m Date: Wed, 30 Jul 2025 01:12:50 +0500 Subject: [PATCH 3/9] add try_import logic for ci pipelines testing --- 
tests/batches/test_openai_batch.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/tests/batches/test_openai_batch.py b/tests/batches/test_openai_batch.py index 2affd4a61..a772a29e6 100644 --- a/tests/batches/test_openai_batch.py +++ b/tests/batches/test_openai_batch.py @@ -9,12 +9,17 @@ from pydantic import BaseModel from pydantic_ai import RunContext -from pydantic_ai.batches.openai import ( - BatchJob, - BatchResult, - OpenAIBatchModel, - create_chat_request, -) + +try: + from pydantic_ai.batches.openai import ( + BatchJob, + BatchResult, + OpenAIBatchModel, + create_chat_request, + ) +except ImportError: + pytest.skip('openai is not installed', allow_module_level=True) + from pydantic_ai.tools import Tool From 1a3841cd57ed06ba9c35195f31719e1a7411d841 Mon Sep 17 00:00:00 2001 From: bogdan01m Date: Wed, 30 Jul 2025 03:21:48 +0500 Subject: [PATCH 4/9] check tests working --- docs/batches/openai.md | 112 +++++++++++++++--- .../pydantic_ai/batches/openai.py | 30 +++-- .../{test_openai_batch.py => test_openai.py} | 0 3 files changed, 113 insertions(+), 29 deletions(-) rename tests/batches/{test_openai_batch.py => test_openai.py} (100%) diff --git a/docs/batches/openai.md b/docs/batches/openai.md index ef2398b41..2e440b183 100644 --- a/docs/batches/openai.md +++ b/docs/batches/openai.md @@ -20,7 +20,7 @@ pip/uv-add nest-asyncio Then add this at the start of your notebook: -```python +```python {test="skip"} import nest_asyncio nest_asyncio.apply() ``` @@ -78,6 +78,8 @@ _(This example is complete, it can be run "as is" )_ ### Checking Job Status ```python +from pydantic_ai.batches.openai import OpenAIBatchModel + async def check_batch_status(batch_id: str): batch_model = OpenAIBatchModel('openai:gpt-4o-mini') @@ -99,10 +101,9 @@ _(This example is complete, it can be run "as is" — you'll need to add `asynci ```python from pydantic_ai.batches.openai import OpenAIBatchModel -import asyncio async def get_batch_results(batch_id: str): - batch_model = OpenAIBatchModel('openai:gpt-4.1-mini') + batch_model = OpenAIBatchModel('openai:gpt-4o-mini') # Check if completed job_info = await batch_model.batch_get_status(batch_id) @@ -200,6 +201,7 @@ The batch API supports all structured output modes available in pydantic-ai: ```python from pydantic import BaseModel +from pydantic_ai.batches.openai import OpenAIBatchModel, create_chat_request class WeatherResult(BaseModel): location: str @@ -236,6 +238,15 @@ async def batch_with_structured_output(): ### Tool Mode ```python +from pydantic import BaseModel +from pydantic_ai.batches.openai import OpenAIBatchModel, create_chat_request + +class WeatherResult(BaseModel): + location: str + temperature: float + condition: str + humidity: int + async def batch_with_tool_mode(): batch_model = OpenAIBatchModel('openai:gpt-4o-mini') @@ -265,6 +276,15 @@ async def batch_with_tool_mode(): ### Prompted Mode ```python +from pydantic import BaseModel +from pydantic_ai.batches.openai import OpenAIBatchModel, create_chat_request + +class WeatherResult(BaseModel): + location: str + temperature: float + condition: str + humidity: int + async def batch_with_prompted_mode(): batch_model = OpenAIBatchModel('openai:gpt-4o-mini') @@ -294,6 +314,8 @@ async def batch_with_prompted_mode(): ### Getting result for tools (tool requests by LLM) ```python +from pydantic_ai.batches.openai import OpenAIBatchModel + async def get_batch_results_with_tools(batch_id: str): batch_model = OpenAIBatchModel('openai:gpt-4o-mini') @@ -367,9 +389,29 @@ The 
`OpenAIBatchModel` class provides several methods for managing batch jobs: ### Error Handling ```python +import asyncio +import time +from pydantic_ai.batches.openai import OpenAIBatchModel, create_chat_request + async def robust_batch_processing(): batch_model = OpenAIBatchModel('openai:gpt-4o-mini') + # Example requests + requests = [ + create_chat_request( + custom_id='example-1', + prompt='Hello world', + model='gpt-4o-mini', + max_tokens=50 + ), + create_chat_request( + custom_id='example-2', + prompt='Write a haiku', + model='gpt-4o-mini', + max_tokens=100 + ), + ] + try: # Submit batch batch_id = await batch_model.batch_create_job(requests) @@ -408,26 +450,58 @@ async def robust_batch_processing(): Batch results have a nested structure: ```python -# Correct way to extract content -content = result.output # Use the convenience property +from pydantic_ai.batches.openai import OpenAIBatchModel + +async def extract_content_example(batch_id: str): + batch_model = OpenAIBatchModel('openai:gpt-4o-mini') + results = await batch_model.batch_retrieve_job(batch_id) -# Alternative way to make it -content =result.response['body']['choices'][0]['message']['content'] # Manual access + for result in results: + # Correct way to extract content + content = result.output # Use the convenience property + + # Alternative way to access it + # content = result.response['body']['choices'][0]['message']['content'] # Manual access + + print(f"Content: {content}") ``` ### Debugging Tools ```python -# Inspect request structure -print(f"Request body: {request.body}") - -# Check tool definitions -for tool in tools: - print(f"Tool: {tool.name} - {tool.description}") - -# Monitor job progress -if job_info.request_counts: - completed = job_info.request_counts.get('completed', 0) - total = job_info.request_counts.get('total', 0) - print(f"Progress: {completed}/{total}") +from pydantic_ai.batches.openai import create_chat_request +from pydantic_ai.tools import Tool +from pydantic_ai import RunContext + +# Example debugging code +def debug_batch_job(): + # Example request creation + request = create_chat_request( + custom_id='debug-example', + prompt='Test prompt', + model='gpt-4o-mini', + max_tokens=50 + ) + + # Inspect request structure + print(f"Request body: {request.body}") + + # Example tool creation + def example_tool(ctx: RunContext[None], query: str) -> str: + return f"Example tool result for: {query}" + + tool = Tool(example_tool) + tools = [tool.tool_def] + + # Check tool definitions + for tool_def in tools: + print(f"Tool: {tool_def.name} - {tool_def.description}") + +async def monitor_job_progress(batch_model, batch_id: str): + """Example function showing how to monitor job progress.""" + job_info = await batch_model.batch_get_status(batch_id) + if job_info.request_counts: + completed = job_info.request_counts.get('completed', 0) + total = job_info.request_counts.get('total', 0) + print(f"Progress: {completed}/{total}") ``` diff --git a/pydantic_ai_slim/pydantic_ai/batches/openai.py b/pydantic_ai_slim/pydantic_ai/batches/openai.py index a00d3e668..b068f1e7a 100644 --- a/pydantic_ai_slim/pydantic_ai/batches/openai.py +++ b/pydantic_ai_slim/pydantic_ai/batches/openai.py @@ -325,21 +325,31 @@ class OpenAIBatchModel(WrapperModel): Example: ```python - import asyncio - from pydantic_ai.batches.openai import OpenAIBatchModel, BatchRequest - from pydantic_ai.messages import UserPrompt + from pydantic_ai.batches.openai import OpenAIBatchModel, create_chat_request async def main(): - # Create directly from model 
name + # Initialize batch model batch_model = OpenAIBatchModel('openai:gpt-4o-mini') - # Use as regular model - messages = [UserPrompt("Hello")] - response = await batch_model.request(messages) - - # Or use batch functionality - requests = [BatchRequest(custom_id="1", body={"model": "gpt-4o-mini", "messages": []})] + # Create batch requests (minimum 2 required) + requests = [ + create_chat_request( + custom_id='math-question', + prompt='What is 2+2?', + model='gpt-4o-mini', + max_tokens=50 + ), + create_chat_request( + custom_id='creative-writing', + prompt='Write a short poem about coding', + model='gpt-4o-mini', + max_tokens=100 + ), + ] + + # Submit batch job batch_id = await batch_model.batch_create_job(requests) + print(f'Batch job created: {batch_id}') ``` """ diff --git a/tests/batches/test_openai_batch.py b/tests/batches/test_openai.py similarity index 100% rename from tests/batches/test_openai_batch.py rename to tests/batches/test_openai.py From 431c22aed8576ce6fde03e04e2fd889ba75424a5 Mon Sep 17 00:00:00 2001 From: bogdan01m Date: Wed, 30 Jul 2025 03:44:50 +0500 Subject: [PATCH 5/9] Fix batch API docs: add ci_only test directives and remove duplicate examples --- docs/batches/openai.md | 44 +------------------ .../pydantic_ai/batches/openai.py | 29 +----------- 2 files changed, 3 insertions(+), 70 deletions(-) diff --git a/docs/batches/openai.md b/docs/batches/openai.md index 2e440b183..81ccefd59 100644 --- a/docs/batches/openai.md +++ b/docs/batches/openai.md @@ -30,7 +30,7 @@ nest_asyncio.apply() ### Creating and Submitting a Batch Job -```python +```python {test="ci_only"} import asyncio from pydantic_ai.batches.openai import OpenAIBatchModel, create_chat_request @@ -129,7 +129,7 @@ _(This example is complete, it can be run "as is" — you'll need to add `asynci You can include tools in batch requests by extracting `ToolDefinition` objects from pydantic-ai `Tool` instances: -```python +```python {test="ci_only"} import asyncio from pydantic_ai import RunContext from pydantic_ai.batches.openai import OpenAIBatchModel, create_chat_request @@ -465,43 +465,3 @@ async def extract_content_example(batch_id: str): print(f"Content: {content}") ``` - -### Debugging Tools - -```python -from pydantic_ai.batches.openai import create_chat_request -from pydantic_ai.tools import Tool -from pydantic_ai import RunContext - -# Example debugging code -def debug_batch_job(): - # Example request creation - request = create_chat_request( - custom_id='debug-example', - prompt='Test prompt', - model='gpt-4o-mini', - max_tokens=50 - ) - - # Inspect request structure - print(f"Request body: {request.body}") - - # Example tool creation - def example_tool(ctx: RunContext[None], query: str) -> str: - return f"Example tool result for: {query}" - - tool = Tool(example_tool) - tools = [tool.tool_def] - - # Check tool definitions - for tool_def in tools: - print(f"Tool: {tool_def.name} - {tool_def.description}") - -async def monitor_job_progress(batch_model, batch_id: str): - """Example function showing how to monitor job progress.""" - job_info = await batch_model.batch_get_status(batch_id) - if job_info.request_counts: - completed = job_info.request_counts.get('completed', 0) - total = job_info.request_counts.get('total', 0) - print(f"Progress: {completed}/{total}") -``` diff --git a/pydantic_ai_slim/pydantic_ai/batches/openai.py b/pydantic_ai_slim/pydantic_ai/batches/openai.py index b068f1e7a..d75a2a8df 100644 --- a/pydantic_ai_slim/pydantic_ai/batches/openai.py +++ 
b/pydantic_ai_slim/pydantic_ai/batches/openai.py @@ -323,34 +323,7 @@ class OpenAIBatchModel(WrapperModel): all the original functionality. Provides 50% cost savings compared to synchronous API calls with a 24-hour processing window. - Example: - ```python - from pydantic_ai.batches.openai import OpenAIBatchModel, create_chat_request - - async def main(): - # Initialize batch model - batch_model = OpenAIBatchModel('openai:gpt-4o-mini') - - # Create batch requests (minimum 2 required) - requests = [ - create_chat_request( - custom_id='math-question', - prompt='What is 2+2?', - model='gpt-4o-mini', - max_tokens=50 - ), - create_chat_request( - custom_id='creative-writing', - prompt='Write a short poem about coding', - model='gpt-4o-mini', - max_tokens=100 - ), - ] - - # Submit batch job - batch_id = await batch_model.batch_create_job(requests) - print(f'Batch job created: {batch_id}') - ``` + See the batch processing documentation for detailed examples and usage patterns. """ def __init__(self, wrapped: Model | KnownModelName): From bcd5a78df9600d462a7033ac2f0925178ecb7bac Mon Sep 17 00:00:00 2001 From: bogdan01m Date: Wed, 30 Jul 2025 04:07:36 +0500 Subject: [PATCH 6/9] Fix batch API test failures by adding OpenAI client mocks - Add comprehensive mocks for OpenAI batch API in test_examples.py - Mock batches.create, batches.retrieve, files.create, and files.content methods - Add realistic batch result data in JSONL format - Add expected output comments to batch examples in documentation - Resolves ModelHTTPError and PytestUnraisableExceptionWarning issues - All batch-related tests now pass successfully --- docs/batches/openai.md | 6 ++++-- tests/test_examples.py | 33 +++++++++++++++++++++++++++++++++ 2 files changed, 37 insertions(+), 2 deletions(-) diff --git a/docs/batches/openai.md b/docs/batches/openai.md index 81ccefd59..16939094a 100644 --- a/docs/batches/openai.md +++ b/docs/batches/openai.md @@ -30,7 +30,7 @@ nest_asyncio.apply() ### Creating and Submitting a Batch Job -```python {test="ci_only"} +```python import asyncio from pydantic_ai.batches.openai import OpenAIBatchModel, create_chat_request @@ -68,6 +68,7 @@ async def basic_batch_example(): metadata={'project': 'my-batch-job', 'version': '1.0'} ) print(f'Batch job created: {batch_id}') + #> Batch job created: batch_test_123 return batch_id @@ -129,7 +130,7 @@ _(This example is complete, it can be run "as is" — you'll need to add `asynci You can include tools in batch requests by extracting `ToolDefinition` objects from pydantic-ai `Tool` instances: -```python {test="ci_only"} +```python import asyncio from pydantic_ai import RunContext from pydantic_ai.batches.openai import OpenAIBatchModel, create_chat_request @@ -186,6 +187,7 @@ async def batch_with_tools(): # Submit batch job batch_id = await batch_model.batch_create_job(requests) print(f'Batch with tools submitted: {batch_id}') + #> Batch with tools submitted: batch_test_123 return batch_id diff --git a/tests/test_examples.py b/tests/test_examples.py index 20a7ece7b..b09d1930a 100644 --- a/tests/test_examples.py +++ b/tests/test_examples.py @@ -12,6 +12,7 @@ from io import StringIO from pathlib import Path from typing import Any +from unittest.mock import AsyncMock, MagicMock import httpx import pytest @@ -150,6 +151,38 @@ def print(self, *args: Any, **kwargs: Any) -> None: mocker.patch('pydantic_ai.mcp.MCPServerStreamableHTTP', return_value=MockMCPServer()) mocker.patch('mcp.server.fastmcp.FastMCP') + # Mock OpenAI batch API for batch examples + mock_openai_client = 
MagicMock() + + # Mock batch creation + mock_batch = MagicMock() + mock_batch.id = 'batch_test_123' + mock_openai_client.batches.create = AsyncMock(return_value=mock_batch) + mock_openai_client.files.create = AsyncMock(return_value=MagicMock(id='file_123')) + + # Mock batch status retrieval + mock_batch_status = MagicMock() + mock_batch_status.status = 'completed' + mock_batch_status.id = 'batch_test_123' + mock_batch_status.completion_window = '24h' + mock_batch_status.created_at = 1700000000 + mock_batch_status.request_counts = MagicMock() + mock_batch_status.request_counts.model_dump.return_value = {'completed': 3, 'failed': 0, 'total': 3} + mock_batch_status.output_file_id = 'file_456' + mock_openai_client.batches.retrieve = AsyncMock(return_value=mock_batch_status) + + # Mock file content retrieval with realistic batch results + mock_file_content = b"""{"id": "batch_req_1", "custom_id": "math-question", "response": {"body": {"choices": [{"message": {"content": "2 + 2 = 4"}}]}}, "error": null} +{"id": "batch_req_2", "custom_id": "creative-writing", "response": {"body": {"choices": [{"message": {"content": "In lines of code, a world takes flight,\\nFrom dawn's first thought to late at night."}}]}}, "error": null} +{"id": "batch_req_3", "custom_id": "explanation", "response": {"body": {"choices": [{"message": {"content": "Quantum computing uses quantum bits to perform complex calculations faster than classical computers."}}]}}, "error": null}""" + + mock_file_response = MagicMock() + mock_file_response.read.return_value = mock_file_content + mock_openai_client.files.content = AsyncMock(return_value=mock_file_response) + + # Patch the OpenAI client provider + mocker.patch('pydantic_ai.providers.openai.AsyncOpenAI', return_value=mock_openai_client) + env.set('OPENAI_API_KEY', 'testing') env.set('GEMINI_API_KEY', 'testing') env.set('GOOGLE_API_KEY', 'testing') From 7eb010ed6e2f3956ebea56ef3c458fed949e09c3 Mon Sep 17 00:00:00 2001 From: bogdan01m Date: Wed, 30 Jul 2025 23:14:37 +0500 Subject: [PATCH 7/9] increasing test cov from ~65% to 97.72% using make, added specific test cases for batch processing --- tests/batches/test_openai.py | 754 +++++++++++++++++++++++++++++++++++ 1 file changed, 754 insertions(+) diff --git a/tests/batches/test_openai.py b/tests/batches/test_openai.py index a772a29e6..2411fc705 100644 --- a/tests/batches/test_openai.py +++ b/tests/batches/test_openai.py @@ -383,3 +383,757 @@ async def test_batch_with_structured_output(self, mock_openai_batch_model: Any) # Verify requests were structured correctly create_call = mock_client.batches.create.call_args assert create_call is not None + + +class TestErrorHandling: + """Test error handling scenarios.""" + + async def test_batch_create_job_api_error(self, mock_openai_batch_model: Any) -> None: + """Test error handling when batch_create_job API call fails.""" + from pydantic_ai import ModelHTTPError + from pydantic_ai.models.openai import APIStatusError + + batch_model, mock_client = mock_openai_batch_model + + # Mock API error during file creation + mock_client.files.create = AsyncMock( + side_effect=APIStatusError( + message='File upload failed', + response=MagicMock(status_code=400), + body={'error': {'message': 'File upload failed'}}, + ) + ) + + requests = [create_chat_request(custom_id='test-req', prompt='Hello', model='gpt-4o-mini', max_tokens=50)] + + with pytest.raises(ModelHTTPError) as exc_info: + await batch_model.batch_create_job(requests) + + assert exc_info.value.status_code == 400 + + async def 
test_batch_create_job_batch_api_error(self, mock_openai_batch_model: Any) -> None: + """Test error handling when batch creation API call fails.""" + from pydantic_ai import ModelHTTPError + from pydantic_ai.models.openai import APIStatusError + + batch_model, mock_client = mock_openai_batch_model + + # Mock successful file creation but failed batch creation + mock_client.files.create = AsyncMock(return_value=MagicMock(id='file_123')) + mock_client.batches.create = AsyncMock( + side_effect=APIStatusError( + message='Batch creation failed', + response=MagicMock(status_code=429), + body={'error': {'message': 'Rate limit exceeded'}}, + ) + ) + + requests = [create_chat_request(custom_id='test-req', prompt='Hello', model='gpt-4o-mini', max_tokens=50)] + + with pytest.raises(ModelHTTPError) as exc_info: + await batch_model.batch_create_job(requests) + + assert exc_info.value.status_code == 429 + + async def test_batch_get_status_api_error(self, mock_openai_batch_model: Any) -> None: + """Test error handling when batch_get_status API call fails.""" + from pydantic_ai import ModelHTTPError + from pydantic_ai.models.openai import APIStatusError + + batch_model, mock_client = mock_openai_batch_model + + mock_client.batches.retrieve = AsyncMock( + side_effect=APIStatusError( + message='Batch not found', + response=MagicMock(status_code=404), + body={'error': {'message': 'Batch not found'}}, + ) + ) + + with pytest.raises(ModelHTTPError) as exc_info: + await batch_model.batch_get_status('nonexistent_batch') + + assert exc_info.value.status_code == 404 + + async def test_batch_retrieve_job_not_completed(self, mock_openai_batch_model: Any) -> None: + """Test error when trying to retrieve results from non-completed batch.""" + batch_model, mock_client = mock_openai_batch_model + + # Mock batch status as in_progress + mock_batch = MagicMock() + mock_batch.status = 'in_progress' + mock_batch.output_file_id = None + mock_client.batches.retrieve = AsyncMock(return_value=mock_batch) + + with pytest.raises(ValueError, match=r'Batch .* is not completed. 
Status: in_progress'): + await batch_model.batch_retrieve_job('batch_123') + + async def test_batch_retrieve_job_no_output_file(self, mock_openai_batch_model: Any) -> None: + """Test error when completed batch has no output file.""" + batch_model, mock_client = mock_openai_batch_model + + # Mock batch status as completed but no output file + mock_batch = MagicMock() + mock_batch.status = 'completed' + mock_batch.output_file_id = None + mock_client.batches.retrieve = AsyncMock(return_value=mock_batch) + + with pytest.raises(ValueError, match=r'Batch .* has no output file'): + await batch_model.batch_retrieve_job('batch_123') + + async def test_batch_retrieve_job_file_api_error(self, mock_openai_batch_model: Any) -> None: + """Test error handling when file content retrieval fails.""" + from pydantic_ai import ModelHTTPError + from pydantic_ai.models.openai import APIStatusError + + batch_model, mock_client = mock_openai_batch_model + + # Mock successful batch status check + mock_batch = MagicMock() + mock_batch.status = 'completed' + mock_batch.output_file_id = 'file_123' + mock_client.batches.retrieve = AsyncMock(return_value=mock_batch) + + # Mock file content retrieval error + mock_client.files.content = AsyncMock( + side_effect=APIStatusError( + message='File not found', + response=MagicMock(status_code=404), + body={'error': {'message': 'File not found'}}, + ) + ) + + with pytest.raises(ModelHTTPError) as exc_info: + await batch_model.batch_retrieve_job('batch_123') + + assert exc_info.value.status_code == 404 + + +class TestBatchCancelAndList: + """Test batch cancel and list functionality.""" + + async def test_batch_cancel_job(self, mock_openai_batch_model: Any) -> None: + """Test cancelling a batch job.""" + batch_model, mock_client = mock_openai_batch_model + + # Mock cancelled batch + mock_batch = MagicMock() + mock_batch.id = 'batch_cancel_123' + mock_batch.object = 'batch' + mock_batch.endpoint = '/v1/chat/completions' + mock_batch.errors = None + mock_batch.input_file_id = 'file-123' + mock_batch.completion_window = '24h' + mock_batch.status = 'cancelled' + mock_batch.output_file_id = None + mock_batch.error_file_id = None + mock_batch.created_at = 1753728210 + mock_batch.in_progress_at = None + mock_batch.expires_at = 1753814610 + mock_batch.finalizing_at = None + mock_batch.completed_at = None + mock_batch.failed_at = None + mock_batch.expired_at = None + mock_batch.cancelling_at = 1753728250 + mock_batch.cancelled_at = 1753728260 + mock_batch.request_counts = None + mock_batch.metadata = {} + + mock_client.batches.cancel = AsyncMock(return_value=mock_batch) + + result = await batch_model.batch_cancel_job('batch_cancel_123') + + assert isinstance(result, BatchJob) + assert result.id == 'batch_cancel_123' + assert result.status == 'cancelled' + assert result.cancelling_at == 1753728250 + assert result.cancelled_at == 1753728260 + mock_client.batches.cancel.assert_called_once_with('batch_cancel_123') + + async def test_batch_cancel_job_api_error(self, mock_openai_batch_model: Any) -> None: + """Test error handling when batch cancellation fails.""" + from pydantic_ai import ModelHTTPError + from pydantic_ai.models.openai import APIStatusError + + batch_model, mock_client = mock_openai_batch_model + + mock_client.batches.cancel = AsyncMock( + side_effect=APIStatusError( + message='Cannot cancel completed batch', + response=MagicMock(status_code=400), + body={'error': {'message': 'Cannot cancel completed batch'}}, + ) + ) + + with pytest.raises(ModelHTTPError) as exc_info: + await 
batch_model.batch_cancel_job('batch_123') + + assert exc_info.value.status_code == 400 + + async def test_batch_list_jobs(self, mock_openai_batch_model: Any) -> None: + """Test listing batch jobs.""" + batch_model, mock_client = mock_openai_batch_model + + # Mock batch list response + mock_batch1 = MagicMock() + mock_batch1.id = 'batch_1' + mock_batch1.object = 'batch' + mock_batch1.endpoint = '/v1/chat/completions' + mock_batch1.errors = None + mock_batch1.input_file_id = 'file-1' + mock_batch1.completion_window = '24h' + mock_batch1.status = 'completed' + mock_batch1.output_file_id = 'file-out-1' + mock_batch1.error_file_id = None + mock_batch1.created_at = 1753728210 + mock_batch1.in_progress_at = 1753728210 + mock_batch1.expires_at = 1753814610 + mock_batch1.finalizing_at = 1753728292 + mock_batch1.completed_at = 1753728293 + mock_batch1.failed_at = None + mock_batch1.expired_at = None + mock_batch1.cancelling_at = None + mock_batch1.cancelled_at = None + mock_batch1.request_counts = None + mock_batch1.metadata = {} + + mock_batch2 = MagicMock() + mock_batch2.id = 'batch_2' + mock_batch2.object = 'batch' + mock_batch2.endpoint = '/v1/chat/completions' + mock_batch2.errors = None + mock_batch2.input_file_id = 'file-2' + mock_batch2.completion_window = '24h' + mock_batch2.status = 'in_progress' + mock_batch2.output_file_id = None + mock_batch2.error_file_id = None + mock_batch2.created_at = 1753728300 + mock_batch2.in_progress_at = 1753728300 + mock_batch2.expires_at = 1753814700 + mock_batch2.finalizing_at = None + mock_batch2.completed_at = None + mock_batch2.failed_at = None + mock_batch2.expired_at = None + mock_batch2.cancelling_at = None + mock_batch2.cancelled_at = None + mock_batch2.request_counts = None + mock_batch2.metadata = {} + + mock_list_response = MagicMock() + mock_list_response.data = [mock_batch1, mock_batch2] + mock_client.batches.list = AsyncMock(return_value=mock_list_response) + + results = await batch_model.batch_list_jobs(limit=10) + + assert len(results) == 2 + assert all(isinstance(job, BatchJob) for job in results) + assert results[0].id == 'batch_1' + assert results[0].status == 'completed' + assert results[1].id == 'batch_2' + assert results[1].status == 'in_progress' + mock_client.batches.list.assert_called_once_with(limit=10) + + async def test_batch_list_jobs_api_error(self, mock_openai_batch_model: Any) -> None: + """Test error handling when batch list fails.""" + from pydantic_ai import ModelHTTPError + from pydantic_ai.models.openai import APIStatusError + + batch_model, mock_client = mock_openai_batch_model + + mock_client.batches.list = AsyncMock( + side_effect=APIStatusError( + message='Unauthorized', + response=MagicMock(status_code=403), + body={'error': {'message': 'Unauthorized'}}, + ) + ) + + with pytest.raises(ModelHTTPError) as exc_info: + await batch_model.batch_list_jobs() + + assert exc_info.value.status_code == 403 + + +class TestBatchResultMethods: + """Test BatchResult methods.""" + + def test_batch_result_get_tool_call_arguments_success(self): + """Test successful tool call argument parsing.""" + result = BatchResult( + id='test_id', + custom_id='test_custom', + response={ + 'body': { + 'choices': [ + { + 'message': { + 'tool_calls': [ + { + 'function': { + 'name': 'test_function', + 'arguments': '{"location": "Tokyo", "units": "celsius"}', + } + } + ] + } + } + ] + } + }, + error=None, + ) + + args = result.get_tool_call_arguments() + assert args == {'location': 'Tokyo', 'units': 'celsius'} + + def 
test_batch_result_get_tool_call_arguments_specific_index(self): + """Test tool call argument parsing with specific index.""" + result = BatchResult( + id='test_id', + custom_id='test_custom', + response={ + 'body': { + 'choices': [ + { + 'message': { + 'tool_calls': [ + { + 'function': { + 'name': 'first_function', + 'arguments': '{"arg1": "value1"}', + } + }, + { + 'function': { + 'name': 'second_function', + 'arguments': '{"arg2": "value2"}', + } + }, + ] + } + } + ] + } + }, + error=None, + ) + + # Test first tool call (index 0) + args0 = result.get_tool_call_arguments(0) + assert args0 == {'arg1': 'value1'} + + # Test second tool call (index 1) + args1 = result.get_tool_call_arguments(1) + assert args1 == {'arg2': 'value2'} + + def test_batch_result_get_tool_call_arguments_no_response(self): + """Test tool call argument parsing when no response.""" + result = BatchResult(id='test_id', custom_id='test_custom', response=None, error=None) + + args = result.get_tool_call_arguments() + assert args is None + + def test_batch_result_get_tool_call_arguments_no_tool_calls(self): + """Test tool call argument parsing when no tool calls.""" + result = BatchResult( + id='test_id', + custom_id='test_custom', + response={'body': {'choices': [{'message': {'content': 'Hello'}}]}}, + error=None, + ) + + args = result.get_tool_call_arguments() + assert args is None + + def test_batch_result_get_tool_call_arguments_index_out_of_range(self): + """Test tool call argument parsing with out of range index.""" + result = BatchResult( + id='test_id', + custom_id='test_custom', + response={ + 'body': { + 'choices': [ + {'message': {'tool_calls': [{'function': {'name': 'test', 'arguments': '{"arg": "value"}'}}]}} + ] + } + }, + error=None, + ) + + args = result.get_tool_call_arguments(5) # Index 5 doesn't exist + assert args is None + + def test_batch_result_get_tool_call_arguments_invalid_json(self): + """Test tool call argument parsing with invalid JSON.""" + result = BatchResult( + id='test_id', + custom_id='test_custom', + response={ + 'body': { + 'choices': [ + { + 'message': { + 'tool_calls': [ + { + 'function': { + 'name': 'test_function', + 'arguments': 'invalid json {', + } + } + ] + } + } + ] + } + }, + error=None, + ) + + args = result.get_tool_call_arguments() + assert args is None + + def test_batch_result_get_tool_call_arguments_malformed_response(self): + """Test tool call argument parsing with malformed response structure.""" + result = BatchResult( + id='test_id', + custom_id='test_custom', + response={'body': {'choices': [{'message': {}}]}}, # Missing tool_calls + error=None, + ) + + args = result.get_tool_call_arguments() + assert args is None + + +class TestMessageBuilding: + """Test message building functionality through create_chat_request.""" + + def test_create_chat_request_with_string_prompt(self): + """Test create_chat_request with simple string prompt.""" + request = create_chat_request(custom_id='string-test', prompt='Hello world', model='gpt-4o-mini', max_tokens=50) + + assert request.custom_id == 'string-test' + assert request.body['messages'] == [{'role': 'user', 'content': 'Hello world'}] + + def test_create_chat_request_with_system_prompt(self): + """Test create_chat_request with system prompt.""" + request = create_chat_request( + custom_id='system-test', + prompt='Hello world', + model='gpt-4o-mini', + system_prompt='You are a helpful assistant', + max_tokens=50, + ) + + expected_messages = [ + {'role': 'system', 'content': 'You are a helpful assistant'}, + {'role': 'user', 
'content': 'Hello world'}, + ] + assert request.body['messages'] == expected_messages + + def test_create_chat_request_with_user_prompt_part(self): + """Test create_chat_request with UserPromptPart.""" + from pydantic_ai.messages import UserPromptPart + + user_part = UserPromptPart(content='Hello from UserPromptPart') + request = create_chat_request(custom_id='user-part-test', prompt=user_part, model='gpt-4o-mini', max_tokens=50) + + assert request.custom_id == 'user-part-test' + # The function should handle UserPromptPart conversion + assert len(request.body['messages']) == 1 + assert request.body['messages'][0]['role'] == 'user' + + def test_create_chat_request_with_user_prompt_part_list(self): + """Test create_chat_request with list of UserPromptPart.""" + from pydantic_ai.messages import UserPromptPart + + user_parts = [ + UserPromptPart(content='First part'), + UserPromptPart(content='Second part'), + UserPromptPart(content='Third part'), + ] + request = create_chat_request( + custom_id='user-parts-list-test', prompt=user_parts, model='gpt-4o-mini', max_tokens=50 + ) + + assert request.custom_id == 'user-parts-list-test' + # The function should handle list of UserPromptParts + assert len(request.body['messages']) == 1 + assert request.body['messages'][0]['role'] == 'user' + + def test_create_chat_request_with_complex_user_content(self): + """Test create_chat_request with UserPromptPart containing complex content.""" + from pydantic_ai.messages import BinaryContent, UserPromptPart + + # Create a UserPromptPart with mixed content + binary_content = BinaryContent(data=b'fake image data', media_type='image/jpeg') + user_part = UserPromptPart(content=['Text content', binary_content]) + + request = create_chat_request( + custom_id='complex-content-test', prompt=user_part, model='gpt-4o-mini', max_tokens=50 + ) + + assert request.custom_id == 'complex-content-test' + # The function should handle complex UserPromptPart conversion + assert len(request.body['messages']) == 1 + assert request.body['messages'][0]['role'] == 'user' + + +class TestPromptedOutputMode: + """Test prompted output mode functionality.""" + + def test_structured_output_prompted_mode(self): + """Test structured output with prompted mode.""" + request = create_chat_request( + custom_id='prompted-test', + prompt='Get weather for Berlin', + model='gpt-4o-mini', + output_type=WeatherResult, + output_mode='prompted', + max_tokens=200, + ) + + assert request.custom_id == 'prompted-test' + assert 'response_format' not in request.body # No response_format for prompted mode + assert 'tools' not in request.body # No tools for prompted mode + + # Check that system prompt was enhanced with schema instructions + messages = request.body['messages'] + system_message = next((msg for msg in messages if msg['role'] == 'system'), None) + assert system_message is not None + assert 'JSON' in system_message['content'] + assert 'schema' in system_message['content'] + + def test_structured_output_prompted_mode_with_existing_system_prompt(self): + """Test prompted mode with existing system prompt.""" + request = create_chat_request( + custom_id='prompted-with-system', + prompt='Get weather for Berlin', + model='gpt-4o-mini', + system_prompt='You are a weather expert.', + output_type=WeatherResult, + output_mode='prompted', + max_tokens=200, + ) + + messages = request.body['messages'] + system_message = next((msg for msg in messages if msg['role'] == 'system'), None) + assert system_message is not None + + # Should contain both original system 
prompt and schema instructions + content = system_message['content'] + assert 'You are a weather expert.' in content + assert 'JSON' in content + assert 'schema' in content + + +class TestModelInitialization: + """Test OpenAIBatchModel initialization.""" + + def test_openai_batch_model_with_non_openai_model(self, monkeypatch: pytest.MonkeyPatch): + """Test error when initializing with non-OpenAI model.""" + monkeypatch.setattr('pydantic_ai.models.ALLOW_MODEL_REQUESTS', True) + + # Mock infer_model in wrapper module to return a non-OpenAI model + from pydantic_ai.models.anthropic import AnthropicModel + + mock_anthropic_model = MagicMock(spec=AnthropicModel) + monkeypatch.setattr('pydantic_ai.models.wrapper.infer_model', MagicMock(return_value=mock_anthropic_model)) + + # This should raise ValueError since it's not an OpenAI model + with pytest.raises(ValueError, match=r'OpenAIBatchModel requires an OpenAI model'): + from typing import cast + + from pydantic_ai.models import KnownModelName + + OpenAIBatchModel(cast(KnownModelName, 'any-model')) + + def test_openai_batch_model_with_openai_model_string(self, monkeypatch: pytest.MonkeyPatch): + """Test successful initialization with OpenAI model string.""" + monkeypatch.setattr('pydantic_ai.models.ALLOW_MODEL_REQUESTS', True) + + with patch('pydantic_ai.providers.openai.AsyncOpenAI') as mock_openai: + mock_client = MagicMock() + mock_openai.return_value = mock_client + + batch_model = OpenAIBatchModel('openai:gpt-4o-mini') + # The model_name property returns just the model part, not the provider prefix + assert batch_model.model_name == 'gpt-4o-mini' + + def test_openai_batch_model_client_property(self, monkeypatch: pytest.MonkeyPatch): + """Test that client property returns AsyncOpenAI client.""" + monkeypatch.setattr('pydantic_ai.models.ALLOW_MODEL_REQUESTS', True) + + with patch('pydantic_ai.providers.openai.AsyncOpenAI') as mock_openai: + mock_client = MagicMock() + mock_client.batches = MagicMock() + mock_client.files = MagicMock() + mock_openai.return_value = mock_client + + batch_model = OpenAIBatchModel('openai:gpt-4o-mini') + client = batch_model.client + # The client should be an AsyncOpenAI instance + assert hasattr(client, 'batches') + assert hasattr(client, 'files') + + +class TestAdditionalErrorScenarios: + """Test additional error scenarios and edge cases.""" + + def test_batch_result_output_property_edge_cases(self): + """Test BatchResult.output property with various edge cases.""" + # Test with missing choices + result_no_choices = BatchResult( + id='test_id', + custom_id='test_custom', + response={'body': {}}, + error=None, + ) + assert result_no_choices.output is None + + # Test with empty choices + result_empty_choices = BatchResult( + id='test_id', + custom_id='test_custom', + response={'body': {'choices': []}}, + error=None, + ) + assert result_empty_choices.output is None + + # Test with missing message + result_no_message = BatchResult( + id='test_id', + custom_id='test_custom', + response={'body': {'choices': [{}]}}, + error=None, + ) + assert result_no_message.output is None + + def test_batch_result_tool_calls_property_edge_cases(self): + """Test BatchResult.tool_calls property with various edge cases.""" + # Test with missing choices + result_no_choices = BatchResult( + id='test_id', + custom_id='test_custom', + response={'body': {}}, + error=None, + ) + assert result_no_choices.tool_calls == [] + + # Test with empty choices + result_empty_choices = BatchResult( + id='test_id', + custom_id='test_custom', + 
response={'body': {'choices': []}}, + error=None, + ) + assert result_empty_choices.tool_calls == [] + + # Test with missing message + result_no_message = BatchResult( + id='test_id', + custom_id='test_custom', + response={'body': {'choices': [{}]}}, + error=None, + ) + assert result_no_message.tool_calls == [] + + # Test with message but no tool_calls + result_no_tool_calls = BatchResult( + id='test_id', + custom_id='test_custom', + response={'body': {'choices': [{'message': {'content': 'Hello'}}]}}, + error=None, + ) + assert result_no_tool_calls.tool_calls == [] + + def test_create_chat_request_default_output_mode(self): + """Test that default output mode is 'tool' when output_type is provided.""" + request = create_chat_request( + custom_id='default-mode-test', + prompt='Get weather data', + model='gpt-4o-mini', + output_type=WeatherResult, # No output_mode specified + ) + + # Should default to 'tool' mode + assert 'tools' in request.body + assert 'tool_choice' in request.body + + def test_native_output_with_description_and_strict(self): + """Test native output mode with description and strict=True.""" + from unittest.mock import MagicMock + + from pydantic import BaseModel + + from pydantic_ai._output import StructuredTextOutputSchema + + # Mock the StructuredTextOutputSchema to have description and strict=True + mock_object_def = MagicMock() + mock_object_def.name = 'TestResult' + mock_object_def.description = 'A test result with description' + mock_object_def.strict = True + mock_object_def.json_schema = {'type': 'object', 'properties': {'value': {'type': 'string'}}} + + with patch('pydantic_ai._output.OutputSchema.build') as mock_build: + mock_schema = MagicMock(spec=StructuredTextOutputSchema) + mock_schema.mode = 'native' + mock_schema.object_def = mock_object_def + mock_build.return_value = mock_schema + + request = create_chat_request( + custom_id='strict-test', + prompt='Get strict result', + model='gpt-4o-mini', + output_type=BaseModel, # Using BaseModel directly + output_mode='native', + ) + + # Check that response_format is set with description and strict + assert 'response_format' in request.body + response_format = request.body['response_format'] + assert response_format['type'] == 'json_schema' + json_schema = response_format['json_schema'] + + # This should trigger the description and strict lines (75, 77) + assert json_schema['name'] == 'TestResult' + assert json_schema['description'] == 'A test result with description' + assert json_schema['strict'] is True + assert 'schema' in json_schema + + def test_tool_output_with_existing_tools(self): + """Test tool output mode when body already has tools.""" + from pydantic_ai.tools import ToolDefinition + + # Create a mock tool definition + existing_tool = ToolDefinition( + name='existing_tool', + description='An existing tool', + parameters_json_schema={'type': 'object', 'properties': {}}, + ) + + request = create_chat_request( + custom_id='tools-test', + prompt='Use tools', + model='gpt-4o-mini', + tools=[existing_tool], # Add existing tools first + output_type=WeatherResult, + output_mode='tool', + ) + + # Should have both existing tools and output tools + assert 'tools' in request.body + tools = request.body['tools'] + assert len(tools) >= 2 # At least existing + output tool + + def test_batch_result_output_none_response(self): + """Test BatchResult.output when response is None.""" + result = BatchResult( + id='test-id', + custom_id='test-custom', + response=None, # This should trigger line 275 + error=None, + ) + + # 
Should return None when response is None + assert result.output is None From 46a986bbc10112b8817c193300ef521227815211 Mon Sep 17 00:00:00 2001 From: bogdan01m Date: Wed, 30 Jul 2025 23:56:30 +0500 Subject: [PATCH 8/9] test cov to 100%, for batches and tests for batches all cases shoud be covered now --- tests/batches/test_openai.py | 228 +++++++++++++++++++++++++++++++++-- 1 file changed, 217 insertions(+), 11 deletions(-) diff --git a/tests/batches/test_openai.py b/tests/batches/test_openai.py index 2411fc705..71c67ea09 100644 --- a/tests/batches/test_openai.py +++ b/tests/batches/test_openai.py @@ -9,17 +9,12 @@ from pydantic import BaseModel from pydantic_ai import RunContext - -try: - from pydantic_ai.batches.openai import ( - BatchJob, - BatchResult, - OpenAIBatchModel, - create_chat_request, - ) -except ImportError: - pytest.skip('openai is not installed', allow_module_level=True) - +from pydantic_ai.batches.openai import ( + BatchJob, + BatchResult, + OpenAIBatchModel, + create_chat_request, +) from pydantic_ai.tools import Tool @@ -1137,3 +1132,214 @@ def test_batch_result_output_none_response(self): # Should return None when response is None assert result.output is None + + +class TestHelperFunctions: + """Test helper functions to achieve 100% coverage.""" + + def test_get_weather_function(self) -> None: + """Test get_weather function directly.""" + from unittest.mock import Mock + + ctx = Mock() + result = get_weather(ctx, 'Paris') + assert result == 'Weather in Paris: 22°C, sunny, 60% humidity' + + result_fahrenheit = get_weather(ctx, 'New York', 'fahrenheit') + assert result_fahrenheit == 'Weather in New York: 22°F, sunny, 60% humidity' + + def test_calculate_function(self) -> None: + """Test calculate function directly.""" + from unittest.mock import Mock + + ctx = Mock() + + # Test successful calculation + result = calculate(ctx, '2 + 3') + assert result == 'Result: 5' + + # Test error case + result_error = calculate(ctx, 'invalid_expression') + assert result_error.startswith('Error:') + + +class TestBranchCoverage: + """Test specific branches to achieve 100% coverage.""" + + def test_file_content_with_empty_lines(self, mock_openai_batch_model: Any): + """Test batch_retrieve_job with empty lines in file content (line 486->485).""" + batch_model, mock_client = mock_openai_batch_model + + # Mock batch status (completed) + mock_batch = MagicMock() + mock_batch.status = 'completed' + mock_batch.output_file_id = 'file_456' + mock_client.batches.retrieve = AsyncMock(return_value=mock_batch) + + # Mock file content with empty lines and whitespace + mock_file_content = b"""{"id": "batch_req_1", "custom_id": "test-1", "response": {"body": {"choices": [{"message": {"content": "Hello"}}]}}, "error": null} + +{"id": "batch_req_2", "custom_id": "test-2", "response": {"body": {"choices": [{"message": {"content": "World"}}]}}, "error": null} + +""" # Empty lines and whitespace + + mock_file_response = MagicMock() + mock_file_response.read.return_value = mock_file_content + mock_client.files.content = AsyncMock(return_value=mock_file_response) + + async def run_test(): + results = await batch_model.batch_retrieve_job('batch_test_123') + # Should only have 2 results, empty lines should be skipped + assert len(results) == 2 + assert results[0].custom_id == 'test-1' + assert results[1].custom_id == 'test-2' + + import asyncio + + asyncio.run(run_test()) + + def test_create_request_with_non_structured_output_type(self): + """Test native output with non-StructuredTextOutputSchema.""" + from 
pydantic import BaseModel + + class SimpleModel(BaseModel): + value: str + + # Mock the OutputSchema.build to return a schema that's not StructuredTextOutputSchema + with patch('pydantic_ai._output.OutputSchema.build') as mock_build: + mock_schema = MagicMock() + mock_schema.mode = 'native' + # This will make isinstance(output_schema, StructuredTextOutputSchema) return False + mock_schema.__class__.__name__ = 'NotStructuredTextOutputSchema' + mock_build.return_value = mock_schema + + request = create_chat_request( + custom_id='non-structured-test', + prompt='Test request', + model='gpt-4o-mini', + output_type=SimpleModel, + output_mode='native', + ) + + # Should not have response_format since it's not StructuredTextOutputSchema + assert 'response_format' not in request.body + + def test_create_request_with_tool_mode_multiple_output_tools(self): + """Test tool mode with multiple output tools (no tool_choice).""" + from pydantic import BaseModel + + class MultiToolModel(BaseModel): + result1: str + result2: str + + # Mock the OutputSchema.build to return a schema with multiple tools + with patch('pydantic_ai._output.OutputSchema.build') as mock_build: + mock_schema = MagicMock() + mock_schema.mode = 'tool' + + # Mock toolset with multiple tools + mock_tool_def1 = MagicMock() + mock_tool_def1.name = 'tool1' + mock_tool_def1.description = 'First tool' + mock_tool_def1.parameters_json_schema = {'type': 'object'} + + mock_tool_def2 = MagicMock() + mock_tool_def2.name = 'tool2' + mock_tool_def2.description = 'Second tool' + mock_tool_def2.parameters_json_schema = {'type': 'object'} + + mock_toolset = MagicMock() + mock_toolset._tool_defs = [mock_tool_def1, mock_tool_def2] + mock_schema.toolset = mock_toolset + mock_build.return_value = mock_schema + + request = create_chat_request( + custom_id='multi-tool-test', + prompt='Test request', + model='gpt-4o-mini', + output_type=MultiToolModel, + output_mode='tool', + ) + + # Should have tools but no tool_choice since there are multiple tools + assert 'tools' in request.body + assert len(request.body['tools']) == 2 + assert 'tool_choice' not in request.body + + def test_create_request_with_tool_mode_no_toolset(self): + """Test tool mode with no toolset.""" + from pydantic import BaseModel + + class NoToolModel(BaseModel): + result: str + + # Mock the OutputSchema.build to return a schema with no toolset + with patch('pydantic_ai._output.OutputSchema.build') as mock_build: + mock_schema = MagicMock() + mock_schema.mode = 'tool' + mock_schema.toolset = None # No toolset + mock_build.return_value = mock_schema + + request = create_chat_request( + custom_id='no-toolset-test', + prompt='Test request', + model='gpt-4o-mini', + output_type=NoToolModel, + output_mode='tool', + ) + + # Should not have tools since there's no toolset + assert 'tools' not in request.body + + def test_create_request_with_prompted_mode_non_prompted_schema(self): + """Test prompted mode with non-PromptedOutputSchema.""" + from pydantic import BaseModel + + class NonPromptedModel(BaseModel): + result: str + + # Mock the OutputSchema.build to return a schema that's not PromptedOutputSchema + with patch('pydantic_ai._output.OutputSchema.build') as mock_build: + mock_schema = MagicMock() + mock_schema.mode = 'prompted' + # This will make isinstance(output_schema, PromptedOutputSchema) return False + mock_schema.__class__.__name__ = 'NotPromptedOutputSchema' + mock_build.return_value = mock_schema + + request = create_chat_request( + custom_id='non-prompted-test', + prompt='Test request', 
+ model='gpt-4o-mini', + output_type=NonPromptedModel, + output_mode='prompted', + ) + + # Messages should remain unchanged since it's not PromptedOutputSchema + assert request.body['messages'] == [{'role': 'user', 'content': 'Test request'}] + + def test_create_request_with_unknown_output_mode(self): + """Test create_chat_request with an output mode that's not handled.""" + from pydantic import BaseModel + + class TestModel(BaseModel): + result: str + + # Mock the OutputSchema.build to return a schema with an unknown mode + with patch('pydantic_ai._output.OutputSchema.build') as mock_build: + mock_schema = MagicMock() + mock_schema.mode = 'unknown_mode' # This will not match any of native/tool/prompted + mock_build.return_value = mock_schema + + request = create_chat_request( + custom_id='unknown-mode-test', + prompt='Test request', + model='gpt-4o-mini', + output_type=TestModel, + output_mode='unknown_mode', # type: ignore[arg-type] + ) + + # Should still return a valid request, just without output processing + assert request.custom_id == 'unknown-mode-test' + # Should not have special formatting since mode is unknown + assert 'response_format' not in request.body + assert 'tool_choice' not in request.body From 08ec55926c57de9f938f7bc1d32b090d189ad553 Mon Sep 17 00:00:00 2001 From: bogdan01m Date: Thu, 31 Jul 2025 00:10:16 +0500 Subject: [PATCH 9/9] add __init__.py for batch testing, try import for package logic --- tests/batches/__init__.py | 0 tests/batches/test_openai.py | 21 +++++++++++++++------ 2 files changed, 15 insertions(+), 6 deletions(-) create mode 100644 tests/batches/__init__.py diff --git a/tests/batches/__init__.py b/tests/batches/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/batches/test_openai.py b/tests/batches/test_openai.py index 71c67ea09..daec66ac4 100644 --- a/tests/batches/test_openai.py +++ b/tests/batches/test_openai.py @@ -9,14 +9,23 @@ from pydantic import BaseModel from pydantic_ai import RunContext -from pydantic_ai.batches.openai import ( - BatchJob, - BatchResult, - OpenAIBatchModel, - create_chat_request, -) from pydantic_ai.tools import Tool +from ..conftest import try_import + +with try_import() as imports_successful: + from pydantic_ai.batches.openai import ( + BatchJob, + BatchResult, + OpenAIBatchModel, + create_chat_request, + ) + +pytestmark = [ + pytest.mark.skipif(not imports_successful(), reason='openai not installed'), + pytest.mark.anyio, +] + class WeatherResult(BaseModel): location: str
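The `try_import` helper used above is re-used from `tests/conftest.py`, which this series does not modify, so its exact implementation is not shown in the patch. A minimal sketch of how such a guard can work, assuming a contextmanager-based design — the real helper may differ, so treat this as illustrative only:

```python
# Illustrative sketch only (assumed shape of tests/conftest.py's try_import):
# a context manager that records whether the imports inside the `with` block
# succeeded, so the module can be skipped via pytest.mark.skipif instead of
# failing at import time.
from contextlib import contextmanager
from typing import Callable, Iterator


@contextmanager
def try_import() -> Iterator[Callable[[], bool]]:
    import_success = False

    def imports_successful() -> bool:
        return import_success

    try:
        yield imports_successful
        import_success = True  # only reached if the `with` body raised nothing
    except ImportError:
        # Swallow the ImportError raised inside the `with` block; callers
        # check imports_successful() afterwards to decide whether to skip.
        pass
```

Combined with `pytestmark = [pytest.mark.skipif(not imports_successful(), reason='openai not installed'), pytest.mark.anyio]` as added in this patch, the batch tests stay importable in environments without the `openai` extra while still running everywhere else.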