From 7c430fe1324d4e352dc6887810ff6595a6317034 Mon Sep 17 00:00:00 2001 From: Parth Bansal Date: Mon, 6 Oct 2025 13:26:59 +0000 Subject: [PATCH 1/5] Add poll function for LRO which follows linear backoff with jitter. --- databricks/sdk/retries.py | 103 ++++++++++++++++++++++++++++++++++++- tests/test_retries.py | 104 +++++++++++++++++++++++++++++++++++++- 2 files changed, 204 insertions(+), 3 deletions(-) diff --git a/databricks/sdk/retries.py b/databricks/sdk/retries.py index 5528a8978..0fc04aa30 100644 --- a/databricks/sdk/retries.py +++ b/databricks/sdk/retries.py @@ -1,13 +1,15 @@ import functools import logging from datetime import timedelta -from random import random -from typing import Callable, Optional, Sequence, Type +from random import random, uniform +from typing import Callable, Optional, Sequence, Type, TypeVar from .clock import Clock, RealClock logger = logging.getLogger(__name__) +T = TypeVar("T") + def retried( *, @@ -67,3 +69,100 @@ def wrapper(*args, **kwargs): return wrapper return decorator + + +class RetryError(Exception): + """Error that can be returned from poll functions to control retry behavior.""" + + def __init__(self, err: Exception, halt: bool = False): + self.err = err + self.halt = halt + super().__init__(str(err)) + + @staticmethod + def continues(msg: str) -> "RetryError": + """Create a non-halting retry error with a message.""" + return RetryError(Exception(msg), halt=False) + + @staticmethod + def halt(err: Exception) -> "RetryError": + """Create a halting retry error.""" + return RetryError(err, halt=True) + + +def _backoff(attempt: int) -> float: + """Calculate backoff time with jitter. + + Linear backoff: attempt * 1 second, capped at 10 seconds + Plus random jitter between 50ms and 750ms. + """ + wait = min(10, attempt) + jitter = uniform(0.05, 0.75) + return wait + jitter + + +# This function is not meant to be used directly by users. +# It is used internally by the SDK to poll for the result of an operation. +# It can be changed in the future without any notice. +def poll( + fn: Callable[[], tuple[Optional[T], Optional[RetryError]]], + timeout: timedelta = timedelta(minutes=20), + clock: Optional[Clock] = None, +) -> T: + """Poll a function until it succeeds or times out. + + The backoff is linear backoff and jitter. + + :param fn: Function that returns (result, error). + Return (None, RetryError.continues("msg")) to continue polling. + Return (None, RetryError.halt(err)) to stop with error. + Return (result, None) on success. + :param timeout: Maximum time to poll (default: 20 minutes) + :param clock: Clock implementation for testing (default: RealClock) + :returns: The result of the successful function call + :raises TimeoutError: If the timeout is reached + :raises Exception: If a halting error is encountered + + Example: + def check_operation(): + op = get_operation() + if not op.done: + return None, RetryError.continues("operation still in progress") + if op.error: + return None, RetryError.halt(Exception(f"operation failed: {op.error}")) + return op.result, None + + result = poll(check_operation, timeout=timedelta(minutes=5)) + """ + if clock is None: + clock = RealClock() + + deadline = clock.time() + timeout.total_seconds() + attempt = 0 + last_err = None + + while clock.time() < deadline: + attempt += 1 + + try: + result, err = fn() + + if err is None: + return result + + if err.halt: + raise err.err + + # Continue polling. + last_err = err.err + wait = _backoff(attempt) + logger.debug(f"{str(err.err).rstrip('.')}. Sleeping {wait:.3f}s") + clock.sleep(wait) + + except RetryError: + raise + except Exception as e: + # Unexpected error, halt immediately. + raise e + + raise TimeoutError(f"Timed out after {timeout}") from last_err diff --git a/tests/test_retries.py b/tests/test_retries.py index 2ad6e4ef6..1ef85224b 100644 --- a/tests/test_retries.py +++ b/tests/test_retries.py @@ -1,9 +1,10 @@ from datetime import timedelta +from typing import Any, Literal, Optional, Type import pytest from databricks.sdk.errors import NotFound, ResourceDoesNotExist -from databricks.sdk.retries import retried +from databricks.sdk.retries import poll, retried, RetryError from tests.clock import FakeClock @@ -73,3 +74,104 @@ def foo(): raise KeyError(1) foo() + + +@pytest.mark.parametrize( + "scenario,attempts,result_value,exception_type,exception_msg,timeout,min_time,max_time", + [ + pytest.param("success", 1, "immediate", None, None, 60, 0.0, 0.0, + id="returns string immediately on first attempt with no sleep"), + pytest.param("success", 2, 42, None, None, 60, 1.05, 1.75, + id="returns integer after 1 retry with ~1s backoff"), + pytest.param("success", 3, {"key": "val"}, None, None, 60, 3.10, 3.90, + id="returns dict after 2 retries with linear backoff (1s+2s)"), + pytest.param("success", 5, [1, 2], None, None, 60, 10.25, 11.75, + id="returns list after 4 retries with linear backoff (1s+2s+3s+4s)"), + pytest.param("success", 1, None, None, None, 60, 0.0, 0.0, + id="returns None as valid result immediately (None is acceptable)"), + pytest.param("success", 5, "ok", None, None, 200, 10.2, 13.0, + id="verifies linear backoff increase over 4 retries"), + pytest.param("success", 11, "ok", None, None, 200, 55.5, 62.5, + id="verifies linear backoff approaching 10s cap over 10 retries"), + pytest.param("success", 15, "ok", None, None, 200, 95.7, 105.5, + id="verifies backoff is capped at 10s after 10th retry"), + pytest.param("timeout", None, None, TimeoutError, "Timed out after", 1, 1, None, + id="raises TimeoutError after 1 second of continuous retries"), + pytest.param("timeout", None, None, TimeoutError, "Timed out after", 5, 5, None, + id="raises TimeoutError after 5 seconds of continuous retries"), + pytest.param("timeout", None, None, TimeoutError, "Timed out after", 15, 15, None, + id="raises TimeoutError after 15 seconds of continuous retries"), + pytest.param("halt", 1, None, ValueError, "halt error", 60, None, None, + id="raises ValueError immediately when halt error on first attempt"), + pytest.param("halt", 2, None, ValueError, "halt error", 60, None, None, + id="raises ValueError after 1 retry when halt error on second attempt"), + pytest.param("halt", 3, None, ValueError, "halt error", 60, None, None, + id="raises ValueError after 2 retries when halt error on third attempt"), + pytest.param("unexpected", 1, None, RuntimeError, "unexpected", 60, None, None, + id="raises RuntimeError immediately on unexpected exception"), + pytest.param("unexpected", 3, None, RuntimeError, "unexpected", 60, None, None, + id="raises RuntimeError after 2 retries on unexpected exception"), + ], +) +def test_poll_behavior( + scenario: Literal["success", "timeout", "halt", "unexpected"], + attempts: Optional[int], + result_value: Any, + exception_type: Optional[Type[Exception]], + exception_msg: Optional[str], + timeout: int, + min_time: Optional[float], + max_time: Optional[float], +) -> None: + """ + Comprehensive test for poll function covering all scenarios: + - Success cases with various return types and retry counts + - Backoff timing behavior (linear increase, 10s cap) + - Timeout behavior + - Halting errors + - Unexpected exceptions + """ + clock: FakeClock = FakeClock() + call_count: int = 0 + + def fn() -> tuple[Any, Optional[RetryError]]: + nonlocal call_count + call_count += 1 + + if scenario == "success": + if call_count < attempts: + return None, RetryError.continues(f"attempt {call_count}") + return result_value, None + + elif scenario == "timeout": + return None, RetryError.continues("retrying") + + elif scenario == "halt": + if call_count < attempts: + return None, RetryError.continues("retrying") + return None, RetryError.halt(ValueError(exception_msg)) + + elif scenario == "unexpected": + if call_count < attempts: + return None, RetryError.continues("retrying") + raise RuntimeError(exception_msg) + + if scenario == "success": + result: Any = poll(fn, timeout=timedelta(seconds=timeout), clock=clock) + assert result == result_value + assert call_count == attempts + if min_time is not None: + assert clock.time() >= min_time + if max_time is not None: + assert clock.time() <= max_time + else: + with pytest.raises(exception_type) as exc_info: + poll(fn, timeout=timedelta(seconds=timeout), clock=clock) + + assert exception_msg in str(exc_info.value) + assert call_count >= 1 + + if scenario == "timeout": + assert clock.time() >= min_time - 1 + elif scenario in ("halt", "unexpected"): + assert call_count == attempts From 4cd016025125c842aa81c22edf9ebb6098a4e121 Mon Sep 17 00:00:00 2001 From: Parth Bansal Date: Mon, 6 Oct 2025 13:30:59 +0000 Subject: [PATCH 2/5] update --- tests/test_retries.py | 198 +++++++++++++++++++++++++++++++++--------- 1 file changed, 158 insertions(+), 40 deletions(-) diff --git a/tests/test_retries.py b/tests/test_retries.py index 1ef85224b..bfaa0099b 100644 --- a/tests/test_retries.py +++ b/tests/test_retries.py @@ -4,7 +4,7 @@ import pytest from databricks.sdk.errors import NotFound, ResourceDoesNotExist -from databricks.sdk.retries import poll, retried, RetryError +from databricks.sdk.retries import RetryError, poll, retried from tests.clock import FakeClock @@ -79,38 +79,156 @@ def foo(): @pytest.mark.parametrize( "scenario,attempts,result_value,exception_type,exception_msg,timeout,min_time,max_time", [ - pytest.param("success", 1, "immediate", None, None, 60, 0.0, 0.0, - id="returns string immediately on first attempt with no sleep"), - pytest.param("success", 2, 42, None, None, 60, 1.05, 1.75, - id="returns integer after 1 retry with ~1s backoff"), - pytest.param("success", 3, {"key": "val"}, None, None, 60, 3.10, 3.90, - id="returns dict after 2 retries with linear backoff (1s+2s)"), - pytest.param("success", 5, [1, 2], None, None, 60, 10.25, 11.75, - id="returns list after 4 retries with linear backoff (1s+2s+3s+4s)"), - pytest.param("success", 1, None, None, None, 60, 0.0, 0.0, - id="returns None as valid result immediately (None is acceptable)"), - pytest.param("success", 5, "ok", None, None, 200, 10.2, 13.0, - id="verifies linear backoff increase over 4 retries"), - pytest.param("success", 11, "ok", None, None, 200, 55.5, 62.5, - id="verifies linear backoff approaching 10s cap over 10 retries"), - pytest.param("success", 15, "ok", None, None, 200, 95.7, 105.5, - id="verifies backoff is capped at 10s after 10th retry"), - pytest.param("timeout", None, None, TimeoutError, "Timed out after", 1, 1, None, - id="raises TimeoutError after 1 second of continuous retries"), - pytest.param("timeout", None, None, TimeoutError, "Timed out after", 5, 5, None, - id="raises TimeoutError after 5 seconds of continuous retries"), - pytest.param("timeout", None, None, TimeoutError, "Timed out after", 15, 15, None, - id="raises TimeoutError after 15 seconds of continuous retries"), - pytest.param("halt", 1, None, ValueError, "halt error", 60, None, None, - id="raises ValueError immediately when halt error on first attempt"), - pytest.param("halt", 2, None, ValueError, "halt error", 60, None, None, - id="raises ValueError after 1 retry when halt error on second attempt"), - pytest.param("halt", 3, None, ValueError, "halt error", 60, None, None, - id="raises ValueError after 2 retries when halt error on third attempt"), - pytest.param("unexpected", 1, None, RuntimeError, "unexpected", 60, None, None, - id="raises RuntimeError immediately on unexpected exception"), - pytest.param("unexpected", 3, None, RuntimeError, "unexpected", 60, None, None, - id="raises RuntimeError after 2 retries on unexpected exception"), + pytest.param( + "success", + 1, + "immediate", + None, + None, + 60, + 0.0, + 0.0, + id="returns string immediately on first attempt with no sleep", + ), + pytest.param("success", 2, 42, None, None, 60, 1.05, 1.75, id="returns integer after 1 retry with ~1s backoff"), + pytest.param( + "success", + 3, + {"key": "val"}, + None, + None, + 60, + 3.10, + 3.90, + id="returns dict after 2 retries with linear backoff (1s+2s)", + ), + pytest.param( + "success", + 5, + [1, 2], + None, + None, + 60, + 10.25, + 11.75, + id="returns list after 4 retries with linear backoff (1s+2s+3s+4s)", + ), + pytest.param( + "success", + 1, + None, + None, + None, + 60, + 0.0, + 0.0, + id="returns None as valid result immediately (None is acceptable)", + ), + pytest.param( + "success", 5, "ok", None, None, 200, 10.2, 13.0, id="verifies linear backoff increase over 4 retries" + ), + pytest.param( + "success", + 11, + "ok", + None, + None, + 200, + 55.5, + 62.5, + id="verifies linear backoff approaching 10s cap over 10 retries", + ), + pytest.param( + "success", 15, "ok", None, None, 200, 95.7, 105.5, id="verifies backoff is capped at 10s after 10th retry" + ), + pytest.param( + "timeout", + None, + None, + TimeoutError, + "Timed out after", + 1, + 1, + None, + id="raises TimeoutError after 1 second of continuous retries", + ), + pytest.param( + "timeout", + None, + None, + TimeoutError, + "Timed out after", + 5, + 5, + None, + id="raises TimeoutError after 5 seconds of continuous retries", + ), + pytest.param( + "timeout", + None, + None, + TimeoutError, + "Timed out after", + 15, + 15, + None, + id="raises TimeoutError after 15 seconds of continuous retries", + ), + pytest.param( + "halt", + 1, + None, + ValueError, + "halt error", + 60, + None, + None, + id="raises ValueError immediately when halt error on first attempt", + ), + pytest.param( + "halt", + 2, + None, + ValueError, + "halt error", + 60, + None, + None, + id="raises ValueError after 1 retry when halt error on second attempt", + ), + pytest.param( + "halt", + 3, + None, + ValueError, + "halt error", + 60, + None, + None, + id="raises ValueError after 2 retries when halt error on third attempt", + ), + pytest.param( + "unexpected", + 1, + None, + RuntimeError, + "unexpected", + 60, + None, + None, + id="raises RuntimeError immediately on unexpected exception", + ), + pytest.param( + "unexpected", + 3, + None, + RuntimeError, + "unexpected", + 60, + None, + None, + id="raises RuntimeError after 2 retries on unexpected exception", + ), ], ) def test_poll_behavior( @@ -137,20 +255,20 @@ def test_poll_behavior( def fn() -> tuple[Any, Optional[RetryError]]: nonlocal call_count call_count += 1 - + if scenario == "success": if call_count < attempts: return None, RetryError.continues(f"attempt {call_count}") return result_value, None - + elif scenario == "timeout": return None, RetryError.continues("retrying") - + elif scenario == "halt": - if call_count < attempts: + if call_count < attempts: return None, RetryError.continues("retrying") return None, RetryError.halt(ValueError(exception_msg)) - + elif scenario == "unexpected": if call_count < attempts: return None, RetryError.continues("retrying") @@ -167,10 +285,10 @@ def fn() -> tuple[Any, Optional[RetryError]]: else: with pytest.raises(exception_type) as exc_info: poll(fn, timeout=timedelta(seconds=timeout), clock=clock) - + assert exception_msg in str(exc_info.value) assert call_count >= 1 - + if scenario == "timeout": assert clock.time() >= min_time - 1 elif scenario in ("halt", "unexpected"): From 0555e6d975fb898a2cedb71804bac3b436b20b2d Mon Sep 17 00:00:00 2001 From: Parth Bansal Date: Mon, 6 Oct 2025 13:40:46 +0000 Subject: [PATCH 3/5] update --- databricks/sdk/retries.py | 4 ++-- tests/test_retries.py | 12 ++++++------ 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/databricks/sdk/retries.py b/databricks/sdk/retries.py index 0fc04aa30..a478d7a9f 100644 --- a/databricks/sdk/retries.py +++ b/databricks/sdk/retries.py @@ -2,7 +2,7 @@ import logging from datetime import timedelta from random import random, uniform -from typing import Callable, Optional, Sequence, Type, TypeVar +from typing import Callable, Optional, Sequence, Tuple, Type, TypeVar from .clock import Clock, RealClock @@ -105,7 +105,7 @@ def _backoff(attempt: int) -> float: # It is used internally by the SDK to poll for the result of an operation. # It can be changed in the future without any notice. def poll( - fn: Callable[[], tuple[Optional[T], Optional[RetryError]]], + fn: Callable[[], Tuple[Optional[T], Optional[RetryError]]], timeout: timedelta = timedelta(minutes=20), clock: Optional[Clock] = None, ) -> T: diff --git a/tests/test_retries.py b/tests/test_retries.py index bfaa0099b..3fc97114d 100644 --- a/tests/test_retries.py +++ b/tests/test_retries.py @@ -1,5 +1,5 @@ from datetime import timedelta -from typing import Any, Literal, Optional, Type +from typing import Any, Literal, Optional, Tuple, Type import pytest @@ -99,7 +99,7 @@ def foo(): None, 60, 3.10, - 3.90, + 4.50, id="returns dict after 2 retries with linear backoff (1s+2s)", ), pytest.param( @@ -109,8 +109,8 @@ def foo(): None, None, 60, - 10.25, - 11.75, + 10.2, + 13.0, id="returns list after 4 retries with linear backoff (1s+2s+3s+4s)", ), pytest.param( @@ -252,7 +252,7 @@ def test_poll_behavior( clock: FakeClock = FakeClock() call_count: int = 0 - def fn() -> tuple[Any, Optional[RetryError]]: + def fn() -> Tuple[Any, Optional[RetryError]]: nonlocal call_count call_count += 1 @@ -290,6 +290,6 @@ def fn() -> tuple[Any, Optional[RetryError]]: assert call_count >= 1 if scenario == "timeout": - assert clock.time() >= min_time - 1 + assert clock.time() >= min_time elif scenario in ("halt", "unexpected"): assert call_count == attempts From 5ed83573ef71e45d6c130412dd22524eb40b3ba6 Mon Sep 17 00:00:00 2001 From: Parth Bansal Date: Mon, 6 Oct 2025 14:40:09 +0000 Subject: [PATCH 4/5] update --- databricks/sdk/retries.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/databricks/sdk/retries.py b/databricks/sdk/retries.py index a478d7a9f..6a98110b8 100644 --- a/databricks/sdk/retries.py +++ b/databricks/sdk/retries.py @@ -101,9 +101,6 @@ def _backoff(attempt: int) -> float: return wait + jitter -# This function is not meant to be used directly by users. -# It is used internally by the SDK to poll for the result of an operation. -# It can be changed in the future without any notice. def poll( fn: Callable[[], Tuple[Optional[T], Optional[RetryError]]], timeout: timedelta = timedelta(minutes=20), @@ -112,6 +109,10 @@ def poll( """Poll a function until it succeeds or times out. The backoff is linear backoff and jitter. + + This function is not meant to be used directly by users. + It is used internally by the SDK to poll for the result of an operation. + It can be changed in the future without any notice. :param fn: Function that returns (result, error). Return (None, RetryError.continues("msg")) to continue polling. From b633dc1731a3121345485a3b90404b7dfc855ad0 Mon Sep 17 00:00:00 2001 From: Parth Bansal Date: Mon, 6 Oct 2025 14:57:09 +0000 Subject: [PATCH 5/5] update --- databricks/sdk/retries.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/databricks/sdk/retries.py b/databricks/sdk/retries.py index 6a98110b8..a6cf5d8dc 100644 --- a/databricks/sdk/retries.py +++ b/databricks/sdk/retries.py @@ -109,7 +109,7 @@ def poll( """Poll a function until it succeeds or times out. The backoff is linear backoff and jitter. - + This function is not meant to be used directly by users. It is used internally by the SDK to poll for the result of an operation. It can be changed in the future without any notice.