-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Closed
Labels
Description
Initial Checks
- I confirm that I'm using the latest version of Pydantic AI
- I confirm that I searched for my issue in https://github.com/pydantic/pydantic-ai/issues before opening this issue
Description
When sending a couple of requests concurrently where some fail and are being retried, AsyncTenacityTransport starts throwing RuntimeException("The retry controller did not make any attempts").
Note that the reproducer might need a couple of runs to show the issue.
Example Code
#!/usr/bin/env uv run --script
# /// script
# requires-python = ">=3.13"
# dependencies = [
# "pydantic-ai==0.7.5",
# "httpx==0.28.1",
# "tenacity",
# ]
# ///
import asyncio
import random
import threading
import time
from http.server import HTTPServer, BaseHTTPRequestHandler
import httpx
from pydantic_ai.retries import AsyncTenacityTransport, wait_retry_after
from tenacity import AsyncRetrying, stop_after_attempt, retry_if_exception_type, wait_exponential
class AlternatingHTTPHandler(BaseHTTPRequestHandler):
call_count = 0
def do_GET(self):
AlternatingHTTPHandler.call_count += 1
if AlternatingHTTPHandler.call_count % 2 == 1:
self.send_response(429)
self.send_header('Content-type', 'text/plain')
self.end_headers()
self.wfile.write(b'Too Many Requests')
else:
self.send_response(200)
self.send_header('Content-type', 'text/plain')
self.end_headers()
self.wfile.write(b'Success')
def start_test_server(port: int = 8429) -> HTTPServer:
server = HTTPServer(('localhost', port), AlternatingHTTPHandler)
def run_server():
server.serve_forever()
server_thread = threading.Thread(target=run_server, daemon=True)
server_thread.start()
time.sleep(0.1)
return server
def create_retrying_client() -> httpx.AsyncClient:
def validate_response(response: httpx.Response) -> None:
if response.status_code in (429, 502, 503, 504):
raise httpx.HTTPStatusError(f"HTTP Error {response.status_code}", request=httpx.Request("GET", "/dummy"), response=response)
transport = AsyncTenacityTransport(
controller=AsyncRetrying(
retry=retry_if_exception_type((httpx.HTTPStatusError, ConnectionError)),
wait=wait_retry_after(
fallback_strategy=wait_exponential(multiplier=1, max=60),
max_wait=300
),
stop=stop_after_attempt(3),
reraise=True
),
validate_response=validate_response,
wrapped=httpx.AsyncHTTPTransport(
limits=httpx.Limits(max_connections=50,max_keepalive_connections=5,keepalive_expiry=5)
)
)
return httpx.AsyncClient(transport=transport)
async def make_request(client: httpx.AsyncClient, url: str, request_id: int):
print(f"Starting request {request_id}")
try:
response = await client.get(url)
print(f"Request {request_id} completed with status: {response.status_code}")
return response
except Exception as e:
print(f"Request {request_id} failed: {e}")
raise
async def main():
server = start_test_server(8429)
test_url = "http://localhost:8429/test"
client = create_retrying_client()
tasks = [
make_request(client, test_url, i)
for i in range(1, 20)
]
responses = await asyncio.gather(*tasks, return_exceptions=True)
print(f"All requests completed. Results: {len(responses)} responses")
await client.aclose()
if __name__ == "__main__":
asyncio.run(main())Python, Pydantic AI & LLM client version
Python 3.13, pydantic-ai 0.7.5