diff --git a/llama-index-integrations/embeddings/llama-index-embeddings-openai/llama_index/embeddings/openai/utils.py b/llama-index-integrations/embeddings/llama-index-embeddings-openai/llama_index/embeddings/openai/utils.py index 14fa8fbc0da..7535254fada 100644 --- a/llama-index-integrations/embeddings/llama-index-embeddings-openai/llama_index/embeddings/openai/utils.py +++ b/llama-index-integrations/embeddings/llama-index-embeddings-openai/llama_index/embeddings/openai/utils.py @@ -4,6 +4,7 @@ from llama_index.core.base.llms.generic_utils import get_from_param_or_env from tenacity import ( + RetryCallState, before_sleep_log, retry, retry_if_exception_type, @@ -13,6 +14,7 @@ wait_random_exponential, ) from tenacity.stop import stop_base +from tenacity.wait import wait_base import openai from openai.types.chat import ChatCompletionMessageToolCall @@ -32,6 +34,56 @@ OpenAIToolCall = Union[ChatCompletionMessageToolCall, ChoiceDeltaToolCall] +# Maximum wait time (seconds) to accept from a Retry-After header. +# Prevents a misbehaving server from stalling the client indefinitely. +_MAX_RETRY_AFTER_SECONDS = 120.0 + + +class _WaitRetryAfter(wait_base): + """ + Wait strategy that respects the Retry-After header on RateLimitError. + + When the last exception is an ``openai.RateLimitError`` whose HTTP response + contains a ``Retry-After`` header, the wait time is taken from that header + (capped at ``_MAX_RETRY_AFTER_SECONDS``). For all other exceptions the + ``fallback`` strategy decides the sleep duration. + """ + + def __init__(self, fallback: wait_base) -> None: + self.fallback = fallback + + def __call__(self, retry_state: RetryCallState) -> float: + exc = retry_state.outcome.exception() if retry_state.outcome else None + if isinstance(exc, openai.RateLimitError): + retry_after = _parse_retry_after(exc) + if retry_after is not None: + return min(retry_after, _MAX_RETRY_AFTER_SECONDS) + return self.fallback(retry_state) + + +def _parse_retry_after(exc: openai.RateLimitError) -> Optional[float]: + """ + Extract the Retry-After value (in seconds) from a RateLimitError. + + Returns ``None`` when the header is missing or cannot be parsed. + """ + response = getattr(exc, "response", None) + if response is None: + return None + headers = getattr(response, "headers", None) + if headers is None: + return None + raw = headers.get("retry-after") # httpx.Headers is case-insensitive + if raw is None: + return None + try: + value = float(raw) + except (ValueError, TypeError): + return None + if value < 0: + return None + return value + def create_retry_decorator( max_retries: int, @@ -40,11 +92,12 @@ def create_retry_decorator( min_seconds: float = 4, max_seconds: float = 10, ) -> Callable[[Any], Any]: - wait_strategy = ( + fallback = ( wait_random_exponential(min=min_seconds, max=max_seconds) if random_exponential else wait_exponential(multiplier=1, min=min_seconds, max=max_seconds) ) + wait_strategy = _WaitRetryAfter(fallback) stop_strategy: stop_base = stop_after_attempt(max_retries) if stop_after_delay_seconds is not None: diff --git a/llama-index-integrations/embeddings/llama-index-embeddings-openai/pyproject.toml b/llama-index-integrations/embeddings/llama-index-embeddings-openai/pyproject.toml index 3626cfcca57..fccabbf3c1d 100644 --- a/llama-index-integrations/embeddings/llama-index-embeddings-openai/pyproject.toml +++ b/llama-index-integrations/embeddings/llama-index-embeddings-openai/pyproject.toml @@ -26,7 +26,7 @@ dev = [ [project] name = "llama-index-embeddings-openai" -version = "0.5.1" +version = "0.5.2" description = "llama-index embeddings openai integration" authors = [{name = "Your Name", email = "you@example.com"}] requires-python = ">=3.9,<4.0" diff --git a/llama-index-integrations/embeddings/llama-index-embeddings-openai/tests/test_retry_after.py b/llama-index-integrations/embeddings/llama-index-embeddings-openai/tests/test_retry_after.py new file mode 100644 index 00000000000..37f3e21d8e4 --- /dev/null +++ b/llama-index-integrations/embeddings/llama-index-embeddings-openai/tests/test_retry_after.py @@ -0,0 +1,198 @@ +from unittest.mock import MagicMock, patch + +import httpx +import openai +import pytest +from tenacity import Future, RetryCallState, wait_exponential + +from llama_index.embeddings.openai.utils import ( + _MAX_RETRY_AFTER_SECONDS, + _WaitRetryAfter, + _parse_retry_after, + create_retry_decorator, +) + + +def _make_rate_limit_error(headers=None): + """Build an openai.RateLimitError with the given response headers.""" + response = httpx.Response( + status_code=429, + headers=headers or {}, + request=httpx.Request("POST", "https://api.openai.com/v1/embeddings"), + ) + return openai.RateLimitError( + message="Rate limit exceeded", + response=response, + body=None, + ) + + +def _make_retry_state(exc): + """Build a RetryCallState whose outcome holds the given exception.""" + rs = RetryCallState( + retry_object=MagicMock(), + fn=MagicMock(), + args=(), + kwargs={}, + ) + fut = Future(attempt_number=1) + fut.set_exception(exc) + rs.outcome = fut + rs.attempt_number = 1 + return rs + + +# -- _parse_retry_after unit tests -- + + +def test_parse_retry_after_integer(): + exc = _make_rate_limit_error(headers={"Retry-After": "30"}) + assert _parse_retry_after(exc) == 30.0 + + +def test_parse_retry_after_float(): + exc = _make_rate_limit_error(headers={"Retry-After": "1.5"}) + assert _parse_retry_after(exc) == 1.5 + + +def test_parse_retry_after_zero(): + exc = _make_rate_limit_error(headers={"Retry-After": "0"}) + assert _parse_retry_after(exc) == 0.0 + + +def test_parse_retry_after_missing_header(): + exc = _make_rate_limit_error(headers={}) + assert _parse_retry_after(exc) is None + + +def test_parse_retry_after_non_numeric(): + exc = _make_rate_limit_error( + headers={"Retry-After": "Wed, 21 Oct 2025 07:28:00 GMT"} + ) + assert _parse_retry_after(exc) is None + + +def test_parse_retry_after_negative(): + exc = _make_rate_limit_error(headers={"Retry-After": "-5"}) + assert _parse_retry_after(exc) is None + + +def test_parse_retry_after_empty_string(): + exc = _make_rate_limit_error(headers={"Retry-After": ""}) + assert _parse_retry_after(exc) is None + + +def test_parse_retry_after_no_response(): + exc = openai.RateLimitError.__new__(openai.RateLimitError) + # Manually create without response attribute + assert _parse_retry_after(exc) is None + + +def test_parse_retry_after_case_insensitive(): + """httpx.Headers is case-insensitive, so 'RETRY-AFTER' should work.""" + exc = _make_rate_limit_error(headers={"RETRY-AFTER": "42"}) + assert _parse_retry_after(exc) == 42.0 + + +# -- _WaitRetryAfter unit tests -- + + +def test_wait_retry_after_uses_header(): + fallback = wait_exponential(multiplier=1, min=4, max=10) + strategy = _WaitRetryAfter(fallback) + + exc = _make_rate_limit_error(headers={"Retry-After": "15"}) + rs = _make_retry_state(exc) + assert strategy(rs) == 15.0 + + +def test_wait_retry_after_caps_at_maximum(): + fallback = wait_exponential(multiplier=1, min=4, max=10) + strategy = _WaitRetryAfter(fallback) + + exc = _make_rate_limit_error(headers={"Retry-After": "9999"}) + rs = _make_retry_state(exc) + assert strategy(rs) == _MAX_RETRY_AFTER_SECONDS + + +def test_wait_retry_after_falls_back_when_no_header(): + fallback = MagicMock(return_value=5.0) + strategy = _WaitRetryAfter(fallback) + + exc = _make_rate_limit_error(headers={}) + rs = _make_retry_state(exc) + assert strategy(rs) == 5.0 + fallback.assert_called_once_with(rs) + + +def test_wait_retry_after_falls_back_for_non_rate_limit_error(): + fallback = MagicMock(return_value=7.0) + strategy = _WaitRetryAfter(fallback) + + exc = openai.APITimeoutError( + request=httpx.Request("POST", "https://api.openai.com") + ) + rs = _make_retry_state(exc) + assert strategy(rs) == 7.0 + fallback.assert_called_once_with(rs) + + +def test_wait_retry_after_falls_back_when_header_unparseable(): + fallback = MagicMock(return_value=6.0) + strategy = _WaitRetryAfter(fallback) + + exc = _make_rate_limit_error(headers={"Retry-After": "not-a-number"}) + rs = _make_retry_state(exc) + assert strategy(rs) == 6.0 + fallback.assert_called_once_with(rs) + + +def test_wait_retry_after_falls_back_when_outcome_is_none(): + fallback = MagicMock(return_value=4.0) + strategy = _WaitRetryAfter(fallback) + + rs = RetryCallState( + retry_object=MagicMock(), + fn=MagicMock(), + args=(), + kwargs={}, + ) + rs.outcome = None + assert strategy(rs) == 4.0 + fallback.assert_called_once_with(rs) + + +# -- create_retry_decorator integration tests -- + + +def test_create_retry_decorator_respects_retry_after(): + """Verify the full decorator stack uses Retry-After when available.""" + call_count = 0 + + @create_retry_decorator(max_retries=3) + def flaky_function(): + nonlocal call_count + call_count += 1 + if call_count < 3: + raise _make_rate_limit_error(headers={"Retry-After": "0"}) + return "ok" + + with patch("llama_index.embeddings.openai.utils.logger"): + result = flaky_function() + + assert result == "ok" + assert call_count == 3 + + +def test_create_retry_decorator_exhausts_retries(): + """Verify retries stop at max_retries even with Retry-After.""" + + @create_retry_decorator(max_retries=2) + def always_fails(): + raise _make_rate_limit_error(headers={"Retry-After": "0"}) + + with ( + patch("llama_index.embeddings.openai.utils.logger"), + pytest.raises(openai.RateLimitError), + ): + always_fails() diff --git a/llama-index-integrations/llms/llama-index-llms-openai/llama_index/llms/openai/utils.py b/llama-index-integrations/llms/llama-index-llms-openai/llama_index/llms/openai/utils.py index d2f7e03da7f..3e906c30f6e 100644 --- a/llama-index-integrations/llms/llama-index-llms-openai/llama_index/llms/openai/utils.py +++ b/llama-index-integrations/llms/llama-index-llms-openai/llama_index/llms/openai/utils.py @@ -10,6 +10,7 @@ from openai.types.chat.chat_completion_token_logprob import ChatCompletionTokenLogprob from openai.types.completion_choice import Logprobs from tenacity import ( + RetryCallState, before_sleep_log, retry, retry_if_exception_type, @@ -19,6 +20,7 @@ wait_random_exponential, ) from tenacity.stop import stop_base +from tenacity.wait import wait_base from llama_index.core.base.llms.generic_utils import get_from_param_or_env from llama_index.core.base.llms.types import ( @@ -251,6 +253,56 @@ def is_json_schema_supported(model: str) -> bool: OpenAIToolCall = Union[ChatCompletionMessageToolCall, ChoiceDeltaToolCall] +# Maximum wait time (seconds) to accept from a Retry-After header. +# Prevents a misbehaving server from stalling the client indefinitely. +_MAX_RETRY_AFTER_SECONDS = 120.0 + + +class _WaitRetryAfter(wait_base): + """ + Wait strategy that respects the Retry-After header on RateLimitError. + + When the last exception is an ``openai.RateLimitError`` whose HTTP response + contains a ``Retry-After`` header, the wait time is taken from that header + (capped at ``_MAX_RETRY_AFTER_SECONDS``). For all other exceptions the + ``fallback`` strategy decides the sleep duration. + """ + + def __init__(self, fallback: wait_base) -> None: + self.fallback = fallback + + def __call__(self, retry_state: RetryCallState) -> float: + exc = retry_state.outcome.exception() if retry_state.outcome else None + if isinstance(exc, openai.RateLimitError): + retry_after = _parse_retry_after(exc) + if retry_after is not None: + return min(retry_after, _MAX_RETRY_AFTER_SECONDS) + return self.fallback(retry_state) + + +def _parse_retry_after(exc: openai.RateLimitError) -> Optional[float]: + """ + Extract the Retry-After value (in seconds) from a RateLimitError. + + Returns ``None`` when the header is missing or cannot be parsed. + """ + response = getattr(exc, "response", None) + if response is None: + return None + headers = getattr(response, "headers", None) + if headers is None: + return None + raw = headers.get("retry-after") # httpx.Headers is case-insensitive + if raw is None: + return None + try: + value = float(raw) + except (ValueError, TypeError): + return None + if value < 0: + return None + return value + def create_retry_decorator( max_retries: int, @@ -259,11 +311,12 @@ def create_retry_decorator( min_seconds: float = 4, max_seconds: float = 60, ) -> Callable[[Any], Any]: - wait_strategy = ( + fallback = ( wait_random_exponential(min=min_seconds, max=max_seconds) if random_exponential else wait_exponential(multiplier=1, min=min_seconds, max=max_seconds) ) + wait_strategy = _WaitRetryAfter(fallback) stop_strategy: stop_base = stop_after_attempt(max_retries) if stop_after_delay_seconds is not None: diff --git a/llama-index-integrations/llms/llama-index-llms-openai/pyproject.toml b/llama-index-integrations/llms/llama-index-llms-openai/pyproject.toml index ead7581c0fc..394a52dce54 100644 --- a/llama-index-integrations/llms/llama-index-llms-openai/pyproject.toml +++ b/llama-index-integrations/llms/llama-index-llms-openai/pyproject.toml @@ -27,7 +27,7 @@ dev = [ [project] name = "llama-index-llms-openai" -version = "0.6.24" +version = "0.6.25" description = "llama-index llms openai integration" authors = [{name = "llama-index"}] requires-python = ">=3.9,<4.0" diff --git a/llama-index-integrations/llms/llama-index-llms-openai/tests/test_retry_after.py b/llama-index-integrations/llms/llama-index-llms-openai/tests/test_retry_after.py new file mode 100644 index 00000000000..6ede74ef7b3 --- /dev/null +++ b/llama-index-integrations/llms/llama-index-llms-openai/tests/test_retry_after.py @@ -0,0 +1,218 @@ +from unittest.mock import MagicMock, patch + +import httpx +import openai +import pytest +from tenacity import Future, RetryCallState, wait_exponential + +from llama_index.llms.openai.utils import ( + _MAX_RETRY_AFTER_SECONDS, + _WaitRetryAfter, + _parse_retry_after, + create_retry_decorator, +) + + +def _make_rate_limit_error(headers=None): + """Build an openai.RateLimitError with the given response headers.""" + response = httpx.Response( + status_code=429, + headers=headers or {}, + request=httpx.Request("POST", "https://api.openai.com/v1/chat/completions"), + ) + return openai.RateLimitError( + message="Rate limit exceeded", + response=response, + body=None, + ) + + +def _make_retry_state(exc): + """Build a RetryCallState whose outcome holds the given exception.""" + rs = RetryCallState( + retry_object=MagicMock(), + fn=MagicMock(), + args=(), + kwargs={}, + ) + fut = Future(attempt_number=1) + fut.set_exception(exc) + rs.outcome = fut + rs.attempt_number = 1 + return rs + + +# -- _parse_retry_after unit tests -- + + +def test_parse_retry_after_integer(): + exc = _make_rate_limit_error(headers={"Retry-After": "30"}) + assert _parse_retry_after(exc) == 30.0 + + +def test_parse_retry_after_float(): + exc = _make_rate_limit_error(headers={"Retry-After": "1.5"}) + assert _parse_retry_after(exc) == 1.5 + + +def test_parse_retry_after_zero(): + exc = _make_rate_limit_error(headers={"Retry-After": "0"}) + assert _parse_retry_after(exc) == 0.0 + + +def test_parse_retry_after_missing_header(): + exc = _make_rate_limit_error(headers={}) + assert _parse_retry_after(exc) is None + + +def test_parse_retry_after_non_numeric(): + exc = _make_rate_limit_error( + headers={"Retry-After": "Wed, 21 Oct 2025 07:28:00 GMT"} + ) + assert _parse_retry_after(exc) is None + + +def test_parse_retry_after_negative(): + exc = _make_rate_limit_error(headers={"Retry-After": "-5"}) + assert _parse_retry_after(exc) is None + + +def test_parse_retry_after_empty_string(): + exc = _make_rate_limit_error(headers={"Retry-After": ""}) + assert _parse_retry_after(exc) is None + + +def test_parse_retry_after_no_response(): + exc = openai.RateLimitError.__new__(openai.RateLimitError) + assert _parse_retry_after(exc) is None + + +def test_parse_retry_after_case_insensitive(): + """httpx.Headers is case-insensitive, so 'RETRY-AFTER' should work.""" + exc = _make_rate_limit_error(headers={"RETRY-AFTER": "42"}) + assert _parse_retry_after(exc) == 42.0 + + +# -- _WaitRetryAfter unit tests -- + + +def test_wait_retry_after_uses_header(): + fallback = wait_exponential(multiplier=1, min=4, max=60) + strategy = _WaitRetryAfter(fallback) + + exc = _make_rate_limit_error(headers={"Retry-After": "15"}) + rs = _make_retry_state(exc) + assert strategy(rs) == 15.0 + + +def test_wait_retry_after_caps_at_maximum(): + fallback = wait_exponential(multiplier=1, min=4, max=60) + strategy = _WaitRetryAfter(fallback) + + exc = _make_rate_limit_error(headers={"Retry-After": "9999"}) + rs = _make_retry_state(exc) + assert strategy(rs) == _MAX_RETRY_AFTER_SECONDS + + +def test_wait_retry_after_falls_back_when_no_header(): + fallback = MagicMock(return_value=5.0) + strategy = _WaitRetryAfter(fallback) + + exc = _make_rate_limit_error(headers={}) + rs = _make_retry_state(exc) + assert strategy(rs) == 5.0 + fallback.assert_called_once_with(rs) + + +def test_wait_retry_after_falls_back_for_non_rate_limit_error(): + fallback = MagicMock(return_value=7.0) + strategy = _WaitRetryAfter(fallback) + + exc = openai.APITimeoutError( + request=httpx.Request("POST", "https://api.openai.com") + ) + rs = _make_retry_state(exc) + assert strategy(rs) == 7.0 + fallback.assert_called_once_with(rs) + + +def test_wait_retry_after_falls_back_when_header_unparseable(): + fallback = MagicMock(return_value=6.0) + strategy = _WaitRetryAfter(fallback) + + exc = _make_rate_limit_error(headers={"Retry-After": "not-a-number"}) + rs = _make_retry_state(exc) + assert strategy(rs) == 6.0 + fallback.assert_called_once_with(rs) + + +def test_wait_retry_after_falls_back_when_outcome_is_none(): + fallback = MagicMock(return_value=4.0) + strategy = _WaitRetryAfter(fallback) + + rs = RetryCallState( + retry_object=MagicMock(), + fn=MagicMock(), + args=(), + kwargs={}, + ) + rs.outcome = None + assert strategy(rs) == 4.0 + fallback.assert_called_once_with(rs) + + +# -- create_retry_decorator integration tests -- + + +def test_create_retry_decorator_respects_retry_after(): + """Verify the full decorator stack uses Retry-After when available.""" + call_count = 0 + + @create_retry_decorator(max_retries=3) + def flaky_function(): + nonlocal call_count + call_count += 1 + if call_count < 3: + raise _make_rate_limit_error(headers={"Retry-After": "0"}) + return "ok" + + with patch("llama_index.llms.openai.utils.logger"): + result = flaky_function() + + assert result == "ok" + assert call_count == 3 + + +def test_create_retry_decorator_exhausts_retries(): + """Verify retries stop at max_retries even with Retry-After.""" + + @create_retry_decorator(max_retries=2) + def always_fails(): + raise _make_rate_limit_error(headers={"Retry-After": "0"}) + + with ( + patch("llama_index.llms.openai.utils.logger"), + pytest.raises(openai.RateLimitError), + ): + always_fails() + + +def test_create_retry_decorator_non_rate_limit_still_retries(): + """Non-RateLimitError exceptions still retry with exponential backoff.""" + call_count = 0 + + @create_retry_decorator(max_retries=3) + def timeout_then_succeed(): + nonlocal call_count + call_count += 1 + if call_count < 2: + raise openai.APITimeoutError( + request=httpx.Request("POST", "https://api.openai.com") + ) + return "ok" + + with patch("llama_index.llms.openai.utils.logger"): + result = timeout_then_succeed() + + assert result == "ok" + assert call_count == 2