Skip to content

Commit fa60a7a

Browse files
authored
Migrate retry handler in task SDK API client to use tenacity instead of retryhttp (apache#56762)
1 parent b2f8b9f commit fa60a7a

File tree

3 files changed

+195
-208
lines changed

3 files changed

+195
-208
lines changed

task-sdk/pyproject.toml

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,11 +61,7 @@ dependencies = [
6161
"python-dateutil>=2.7.0",
6262
"psutil>=6.1.0",
6363
"structlog>=25.4.0",
64-
"retryhttp>=1.2.0,!=1.3.0",
6564
"greenback>=1.2.1",
66-
# Requests is known to introduce breaking changes, so we pin it to a specific range
67-
"requests>=2.31.0,<3",
68-
"types-requests>=2.31.0",
6965
"tenacity>=8.3.0",
7066
# Start of shared timezones dependencies
7167
"pendulum>=3.1.0",

task-sdk/src/airflow/sdk/api/client.py

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,13 @@
3030
import msgspec
3131
import structlog
3232
from pydantic import BaseModel
33-
from retryhttp import retry, wait_retry_after
34-
from tenacity import before_log, wait_random_exponential
33+
from tenacity import (
34+
before_log,
35+
retry,
36+
retry_if_exception,
37+
stop_after_attempt,
38+
wait_random_exponential,
39+
)
3540
from uuid6 import uuid7
3641

3742
from airflow.configuration import conf
@@ -832,6 +837,14 @@ def noop_handler(request: httpx.Request) -> httpx.Response:
832837
API_TIMEOUT = conf.getfloat("workers", "execution_api_timeout")
833838

834839

840+
def _should_retry_api_request(exception: BaseException) -> bool:
841+
"""Determine if an API request should be retried based on the exception type."""
842+
if isinstance(exception, httpx.HTTPStatusError):
843+
return exception.response.status_code >= 500
844+
845+
return isinstance(exception, httpx.RequestError)
846+
847+
835848
class Client(httpx.Client):
836849
def __init__(self, *, base_url: str | None, dry_run: bool = False, token: str, **kwargs: Any):
837850
if (not base_url) ^ dry_run:
@@ -864,21 +877,17 @@ def __init__(self, *, base_url: str | None, dry_run: bool = False, token: str, *
864877
**kwargs,
865878
)
866879

867-
_default_wait = wait_random_exponential(min=API_RETRY_WAIT_MIN, max=API_RETRY_WAIT_MAX)
868-
869880
def _update_auth(self, response: httpx.Response):
870881
if new_token := response.headers.get("Refreshed-API-Token"):
871882
log.debug("Execution API issued us a refreshed Task token")
872883
self.auth = BearerAuth(new_token)
873884

874885
@retry(
875-
reraise=True,
876-
max_attempt_number=API_RETRIES,
877-
wait_server_errors=_default_wait,
878-
wait_network_errors=_default_wait,
879-
wait_timeouts=_default_wait,
880-
wait_rate_limited=wait_retry_after(fallback=_default_wait), # No infinite timeout on HTTP 429
886+
retry=retry_if_exception(_should_retry_api_request),
887+
stop=stop_after_attempt(API_RETRIES),
888+
wait=wait_random_exponential(min=API_RETRY_WAIT_MIN, max=API_RETRY_WAIT_MAX),
881889
before_sleep=before_log(log, logging.WARNING),
890+
reraise=True,
882891
)
883892
def request(self, *args, **kwargs):
884893
"""Implement a convenience for httpx.Client.request with a retry layer."""

0 commit comments

Comments
 (0)