Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from llama_index.core.base.llms.generic_utils import get_from_param_or_env
from tenacity import (
RetryCallState,
before_sleep_log,
retry,
retry_if_exception_type,
Expand All @@ -13,6 +14,7 @@
wait_random_exponential,
)
from tenacity.stop import stop_base
from tenacity.wait import wait_base

import openai
from openai.types.chat import ChatCompletionMessageToolCall
Expand All @@ -32,6 +34,56 @@

OpenAIToolCall = Union[ChatCompletionMessageToolCall, ChoiceDeltaToolCall]

# Maximum wait time (seconds) to accept from a Retry-After header.
# Prevents a misbehaving server from stalling the client indefinitely.
_MAX_RETRY_AFTER_SECONDS = 120.0


class _WaitRetryAfter(wait_base):
    """
    Tenacity wait strategy that honors the ``Retry-After`` response header.

    If the failure being retried is an ``openai.RateLimitError`` and its HTTP
    response carries a parseable ``Retry-After`` header, sleep for that many
    seconds (never more than ``_MAX_RETRY_AFTER_SECONDS``). In every other
    situation, defer to the wrapped ``fallback`` strategy.
    """

    def __init__(self, fallback: wait_base) -> None:
        self.fallback = fallback

    def __call__(self, retry_state: RetryCallState) -> float:
        outcome = retry_state.outcome
        if outcome is not None:
            err = outcome.exception()
            if isinstance(err, openai.RateLimitError):
                header_wait = _parse_retry_after(err)
                if header_wait is not None:
                    # Cap the server-suggested wait so a bad header can't stall us.
                    return min(header_wait, _MAX_RETRY_AFTER_SECONDS)
        return self.fallback(retry_state)


def _parse_retry_after(exc: openai.RateLimitError) -> Optional[float]:
    """
    Read the ``Retry-After`` header (in seconds) off a ``RateLimitError``.

    Returns ``None`` for a missing response, missing header, non-numeric
    value, or a negative number.
    """
    headers = getattr(getattr(exc, "response", None), "headers", None)
    if headers is None:
        return None
    raw = headers.get("retry-after")  # httpx.Headers is case-insensitive
    if raw is None:
        return None
    try:
        seconds = float(raw)
    except (ValueError, TypeError):
        return None
    return seconds if seconds >= 0 else None


def create_retry_decorator(
max_retries: int,
Expand All @@ -40,11 +92,12 @@ def create_retry_decorator(
min_seconds: float = 4,
max_seconds: float = 10,
) -> Callable[[Any], Any]:
wait_strategy = (
fallback = (
wait_random_exponential(min=min_seconds, max=max_seconds)
if random_exponential
else wait_exponential(multiplier=1, min=min_seconds, max=max_seconds)
)
wait_strategy = _WaitRetryAfter(fallback)

stop_strategy: stop_base = stop_after_attempt(max_retries)
if stop_after_delay_seconds is not None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ dev = [

[project]
name = "llama-index-embeddings-openai"
version = "0.5.1"
version = "0.5.2"
description = "llama-index embeddings openai integration"
authors = [{name = "Your Name", email = "you@example.com"}]
requires-python = ">=3.9,<4.0"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
from unittest.mock import MagicMock, patch

import httpx
import openai
import pytest
from tenacity import Future, RetryCallState, wait_exponential

from llama_index.embeddings.openai.utils import (
_MAX_RETRY_AFTER_SECONDS,
_WaitRetryAfter,
_parse_retry_after,
create_retry_decorator,
)


def _make_rate_limit_error(headers=None):
    """Create an openai.RateLimitError whose response carries *headers*."""
    request = httpx.Request("POST", "https://api.openai.com/v1/embeddings")
    response = httpx.Response(
        status_code=429,
        request=request,
        headers=headers or {},
    )
    return openai.RateLimitError(
        message="Rate limit exceeded",
        response=response,
        body=None,
    )


def _make_retry_state(exc):
    """Create a RetryCallState whose outcome failed with *exc*."""
    state = RetryCallState(
        retry_object=MagicMock(),
        fn=MagicMock(),
        args=(),
        kwargs={},
    )
    outcome = Future(attempt_number=1)
    outcome.set_exception(exc)
    state.outcome = outcome
    state.attempt_number = 1
    return state


# -- _parse_retry_after unit tests --


def test_parse_retry_after_integer():
    """An integer header value parses to the equivalent float."""
    err = _make_rate_limit_error(headers={"Retry-After": "30"})
    assert _parse_retry_after(err) == 30.0


def test_parse_retry_after_float():
    """A fractional header value is parsed as-is."""
    err = _make_rate_limit_error(headers={"Retry-After": "1.5"})
    assert _parse_retry_after(err) == 1.5


def test_parse_retry_after_zero():
    """Zero is a valid (non-negative) wait and is not rejected."""
    err = _make_rate_limit_error(headers={"Retry-After": "0"})
    assert _parse_retry_after(err) == 0.0


def test_parse_retry_after_missing_header():
    """No Retry-After header at all yields None."""
    assert _parse_retry_after(_make_rate_limit_error(headers={})) is None


def test_parse_retry_after_non_numeric():
    """An HTTP-date form of Retry-After is not numeric and yields None."""
    err = _make_rate_limit_error(
        headers={"Retry-After": "Wed, 21 Oct 2025 07:28:00 GMT"}
    )
    assert _parse_retry_after(err) is None


def test_parse_retry_after_negative():
    """Negative values are nonsensical waits and yield None."""
    err = _make_rate_limit_error(headers={"Retry-After": "-5"})
    assert _parse_retry_after(err) is None


def test_parse_retry_after_empty_string():
    """An empty header value cannot be parsed and yields None."""
    err = _make_rate_limit_error(headers={"Retry-After": ""})
    assert _parse_retry_after(err) is None


def test_parse_retry_after_no_response():
    """An error built without __init__ has no response attribute."""
    # __new__ skips __init__, so no .response is ever attached.
    err = openai.RateLimitError.__new__(openai.RateLimitError)
    assert _parse_retry_after(err) is None


def test_parse_retry_after_case_insensitive():
    """httpx.Headers is case-insensitive, so 'RETRY-AFTER' should work."""
    err = _make_rate_limit_error(headers={"RETRY-AFTER": "42"})
    assert _parse_retry_after(err) == 42.0


# -- _WaitRetryAfter unit tests --


def test_wait_retry_after_uses_header():
    """A parseable header value becomes the wait time verbatim."""
    strategy = _WaitRetryAfter(wait_exponential(multiplier=1, min=4, max=10))
    state = _make_retry_state(
        _make_rate_limit_error(headers={"Retry-After": "15"})
    )
    assert strategy(state) == 15.0


def test_wait_retry_after_caps_at_maximum():
    """Oversized header values are clamped to the module maximum."""
    strategy = _WaitRetryAfter(wait_exponential(multiplier=1, min=4, max=10))
    state = _make_retry_state(
        _make_rate_limit_error(headers={"Retry-After": "9999"})
    )
    assert strategy(state) == _MAX_RETRY_AFTER_SECONDS


def test_wait_retry_after_falls_back_when_no_header():
    """With no header, the fallback strategy decides the wait."""
    fallback = MagicMock(return_value=5.0)
    state = _make_retry_state(_make_rate_limit_error(headers={}))

    assert _WaitRetryAfter(fallback)(state) == 5.0
    fallback.assert_called_once_with(state)


def test_wait_retry_after_falls_back_for_non_rate_limit_error():
    """Exceptions other than RateLimitError always use the fallback."""
    fallback = MagicMock(return_value=7.0)
    err = openai.APITimeoutError(
        request=httpx.Request("POST", "https://api.openai.com")
    )
    state = _make_retry_state(err)

    assert _WaitRetryAfter(fallback)(state) == 7.0
    fallback.assert_called_once_with(state)


def test_wait_retry_after_falls_back_when_header_unparseable():
    """A garbage header value is ignored in favor of the fallback."""
    fallback = MagicMock(return_value=6.0)
    state = _make_retry_state(
        _make_rate_limit_error(headers={"Retry-After": "not-a-number"})
    )

    assert _WaitRetryAfter(fallback)(state) == 6.0
    fallback.assert_called_once_with(state)


def test_wait_retry_after_falls_back_when_outcome_is_none():
    """A retry state with no outcome at all still defers to the fallback."""
    fallback = MagicMock(return_value=4.0)
    state = RetryCallState(
        retry_object=MagicMock(),
        fn=MagicMock(),
        args=(),
        kwargs={},
    )
    state.outcome = None

    assert _WaitRetryAfter(fallback)(state) == 4.0
    fallback.assert_called_once_with(state)


# -- create_retry_decorator integration tests --


def test_create_retry_decorator_respects_retry_after():
    """Verify the full decorator stack uses Retry-After when available."""
    attempts = {"count": 0}

    @create_retry_decorator(max_retries=3)
    def flaky_function():
        attempts["count"] += 1
        if attempts["count"] < 3:
            raise _make_rate_limit_error(headers={"Retry-After": "0"})
        return "ok"

    with patch("llama_index.embeddings.openai.utils.logger"):
        result = flaky_function()

    assert result == "ok"
    assert attempts["count"] == 3


def test_create_retry_decorator_exhausts_retries():
    """Verify retries stop at max_retries even with Retry-After."""

    @create_retry_decorator(max_retries=2)
    def always_fails():
        raise _make_rate_limit_error(headers={"Retry-After": "0"})

    with patch("llama_index.embeddings.openai.utils.logger"):
        with pytest.raises(openai.RateLimitError):
            always_fails()
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from openai.types.chat.chat_completion_token_logprob import ChatCompletionTokenLogprob
from openai.types.completion_choice import Logprobs
from tenacity import (
RetryCallState,
before_sleep_log,
retry,
retry_if_exception_type,
Expand All @@ -19,6 +20,7 @@
wait_random_exponential,
)
from tenacity.stop import stop_base
from tenacity.wait import wait_base

from llama_index.core.base.llms.generic_utils import get_from_param_or_env
from llama_index.core.base.llms.types import (
Expand Down Expand Up @@ -251,6 +253,56 @@ def is_json_schema_supported(model: str) -> bool:

OpenAIToolCall = Union[ChatCompletionMessageToolCall, ChoiceDeltaToolCall]

# Maximum wait time (seconds) to accept from a Retry-After header.
# Prevents a misbehaving server from stalling the client indefinitely.
_MAX_RETRY_AFTER_SECONDS = 120.0


class _WaitRetryAfter(wait_base):
    """
    Tenacity wait strategy that honors the ``Retry-After`` response header.

    If the failure being retried is an ``openai.RateLimitError`` and its HTTP
    response carries a parseable ``Retry-After`` header, sleep for that many
    seconds (never more than ``_MAX_RETRY_AFTER_SECONDS``). In every other
    situation, defer to the wrapped ``fallback`` strategy.
    """

    def __init__(self, fallback: wait_base) -> None:
        self.fallback = fallback

    def __call__(self, retry_state: RetryCallState) -> float:
        outcome = retry_state.outcome
        if outcome is not None:
            err = outcome.exception()
            if isinstance(err, openai.RateLimitError):
                header_wait = _parse_retry_after(err)
                if header_wait is not None:
                    # Cap the server-suggested wait so a bad header can't stall us.
                    return min(header_wait, _MAX_RETRY_AFTER_SECONDS)
        return self.fallback(retry_state)


def _parse_retry_after(exc: openai.RateLimitError) -> Optional[float]:
    """
    Read the ``Retry-After`` header (in seconds) off a ``RateLimitError``.

    Returns ``None`` for a missing response, missing header, non-numeric
    value, or a negative number.
    """
    headers = getattr(getattr(exc, "response", None), "headers", None)
    if headers is None:
        return None
    raw = headers.get("retry-after")  # httpx.Headers is case-insensitive
    if raw is None:
        return None
    try:
        seconds = float(raw)
    except (ValueError, TypeError):
        return None
    return seconds if seconds >= 0 else None


def create_retry_decorator(
max_retries: int,
Expand All @@ -259,11 +311,12 @@ def create_retry_decorator(
min_seconds: float = 4,
max_seconds: float = 60,
) -> Callable[[Any], Any]:
wait_strategy = (
fallback = (
wait_random_exponential(min=min_seconds, max=max_seconds)
if random_exponential
else wait_exponential(multiplier=1, min=min_seconds, max=max_seconds)
)
wait_strategy = _WaitRetryAfter(fallback)

stop_strategy: stop_base = stop_after_attempt(max_retries)
if stop_after_delay_seconds is not None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ dev = [

[project]
name = "llama-index-llms-openai"
version = "0.6.23"
version = "0.6.24"
description = "llama-index llms openai integration"
authors = [{name = "llama-index"}]
requires-python = ">=3.9,<4.0"
Expand Down
Loading
Loading