Skip to content

Commit 161c30a

Browse files
authored
Add poll function for LRO which follows linear backoff with jitter. (#1065)
## 🥞 Stacked PR Use this [link](https://github.com/databricks/databricks-sdk-py/pull/1065/files) to review incremental changes. - [**stack/add-retry-logic**](#1065) [[Files changed](https://github.com/databricks/databricks-sdk-py/pull/1065/files)] --------- ## What changes are proposed in this pull request? This PR introduces a polling function for long-running operations (LRO) that uses a linear backoff strategy with random jitter and a maximum wait cap of 10 seconds. This function is similar to the one in the Go SDK. ## How is this tested? Added unit tests. NO_CHANGELOG=true
1 parent 156ec40 commit 161c30a

File tree

2 files changed

+323
-3
lines changed

2 files changed

+323
-3
lines changed

databricks/sdk/retries.py

Lines changed: 102 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
import functools
22
import logging
33
from datetime import timedelta
4-
from random import random
5-
from typing import Callable, Optional, Sequence, Type
4+
from random import random, uniform
5+
from typing import Callable, Optional, Sequence, Tuple, Type, TypeVar
66

77
from .clock import Clock, RealClock
88

99
logger = logging.getLogger(__name__)
1010

11+
T = TypeVar("T")
12+
1113

1214
def retried(
1315
*,
@@ -67,3 +69,101 @@ def wrapper(*args, **kwargs):
6769
return wrapper
6870

6971
return decorator
72+
73+
74+
class RetryError(Exception):
75+
"""Error that can be returned from poll functions to control retry behavior."""
76+
77+
def __init__(self, err: Exception, halt: bool = False):
78+
self.err = err
79+
self.halt = halt
80+
super().__init__(str(err))
81+
82+
@staticmethod
83+
def continues(msg: str) -> "RetryError":
84+
"""Create a non-halting retry error with a message."""
85+
return RetryError(Exception(msg), halt=False)
86+
87+
@staticmethod
88+
def halt(err: Exception) -> "RetryError":
89+
"""Create a halting retry error."""
90+
return RetryError(err, halt=True)
91+
92+
93+
def _backoff(attempt: int) -> float:
94+
"""Calculate backoff time with jitter.
95+
96+
Linear backoff: attempt * 1 second, capped at 10 seconds
97+
Plus random jitter between 50ms and 750ms.
98+
"""
99+
wait = min(10, attempt)
100+
jitter = uniform(0.05, 0.75)
101+
return wait + jitter
102+
103+
104+
def poll(
105+
fn: Callable[[], Tuple[Optional[T], Optional[RetryError]]],
106+
timeout: timedelta = timedelta(minutes=20),
107+
clock: Optional[Clock] = None,
108+
) -> T:
109+
"""Poll a function until it succeeds or times out.
110+
111+
The backoff is linear backoff and jitter.
112+
113+
This function is not meant to be used directly by users.
114+
It is used internally by the SDK to poll for the result of an operation.
115+
It can be changed in the future without any notice.
116+
117+
:param fn: Function that returns (result, error).
118+
Return (None, RetryError.continues("msg")) to continue polling.
119+
Return (None, RetryError.halt(err)) to stop with error.
120+
Return (result, None) on success.
121+
:param timeout: Maximum time to poll (default: 20 minutes)
122+
:param clock: Clock implementation for testing (default: RealClock)
123+
:returns: The result of the successful function call
124+
:raises TimeoutError: If the timeout is reached
125+
:raises Exception: If a halting error is encountered
126+
127+
Example:
128+
def check_operation():
129+
op = get_operation()
130+
if not op.done:
131+
return None, RetryError.continues("operation still in progress")
132+
if op.error:
133+
return None, RetryError.halt(Exception(f"operation failed: {op.error}"))
134+
return op.result, None
135+
136+
result = poll(check_operation, timeout=timedelta(minutes=5))
137+
"""
138+
if clock is None:
139+
clock = RealClock()
140+
141+
deadline = clock.time() + timeout.total_seconds()
142+
attempt = 0
143+
last_err = None
144+
145+
while clock.time() < deadline:
146+
attempt += 1
147+
148+
try:
149+
result, err = fn()
150+
151+
if err is None:
152+
return result
153+
154+
if err.halt:
155+
raise err.err
156+
157+
# Continue polling.
158+
last_err = err.err
159+
wait = _backoff(attempt)
160+
logger.debug(f"{str(err.err).rstrip('.')}. Sleeping {wait:.3f}s")
161+
clock.sleep(wait)
162+
163+
except RetryError:
164+
raise
165+
except Exception as e:
166+
# Unexpected error, halt immediately.
167+
raise e
168+
169+
raise TimeoutError(f"Timed out after {timeout}") from last_err

tests/test_retries.py

Lines changed: 221 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
from datetime import timedelta
2+
from typing import Any, Literal, Optional, Tuple, Type
23

34
import pytest
45

56
from databricks.sdk.errors import NotFound, ResourceDoesNotExist
6-
from databricks.sdk.retries import retried
7+
from databricks.sdk.retries import RetryError, poll, retried
78
from tests.clock import FakeClock
89

910

@@ -73,3 +74,222 @@ def foo():
7374
raise KeyError(1)
7475

7576
foo()
77+
78+
79+
@pytest.mark.parametrize(
80+
"scenario,attempts,result_value,exception_type,exception_msg,timeout,min_time,max_time",
81+
[
82+
pytest.param(
83+
"success",
84+
1,
85+
"immediate",
86+
None,
87+
None,
88+
60,
89+
0.0,
90+
0.0,
91+
id="returns string immediately on first attempt with no sleep",
92+
),
93+
pytest.param("success", 2, 42, None, None, 60, 1.05, 1.75, id="returns integer after 1 retry with ~1s backoff"),
94+
pytest.param(
95+
"success",
96+
3,
97+
{"key": "val"},
98+
None,
99+
None,
100+
60,
101+
3.10,
102+
4.50,
103+
id="returns dict after 2 retries with linear backoff (1s+2s)",
104+
),
105+
pytest.param(
106+
"success",
107+
5,
108+
[1, 2],
109+
None,
110+
None,
111+
60,
112+
10.2,
113+
13.0,
114+
id="returns list after 4 retries with linear backoff (1s+2s+3s+4s)",
115+
),
116+
pytest.param(
117+
"success",
118+
1,
119+
None,
120+
None,
121+
None,
122+
60,
123+
0.0,
124+
0.0,
125+
id="returns None as valid result immediately (None is acceptable)",
126+
),
127+
pytest.param(
128+
"success", 5, "ok", None, None, 200, 10.2, 13.0, id="verifies linear backoff increase over 4 retries"
129+
),
130+
pytest.param(
131+
"success",
132+
11,
133+
"ok",
134+
None,
135+
None,
136+
200,
137+
55.5,
138+
62.5,
139+
id="verifies linear backoff approaching 10s cap over 10 retries",
140+
),
141+
pytest.param(
142+
"success", 15, "ok", None, None, 200, 95.7, 105.5, id="verifies backoff is capped at 10s after 10th retry"
143+
),
144+
pytest.param(
145+
"timeout",
146+
None,
147+
None,
148+
TimeoutError,
149+
"Timed out after",
150+
1,
151+
1,
152+
None,
153+
id="raises TimeoutError after 1 second of continuous retries",
154+
),
155+
pytest.param(
156+
"timeout",
157+
None,
158+
None,
159+
TimeoutError,
160+
"Timed out after",
161+
5,
162+
5,
163+
None,
164+
id="raises TimeoutError after 5 seconds of continuous retries",
165+
),
166+
pytest.param(
167+
"timeout",
168+
None,
169+
None,
170+
TimeoutError,
171+
"Timed out after",
172+
15,
173+
15,
174+
None,
175+
id="raises TimeoutError after 15 seconds of continuous retries",
176+
),
177+
pytest.param(
178+
"halt",
179+
1,
180+
None,
181+
ValueError,
182+
"halt error",
183+
60,
184+
None,
185+
None,
186+
id="raises ValueError immediately when halt error on first attempt",
187+
),
188+
pytest.param(
189+
"halt",
190+
2,
191+
None,
192+
ValueError,
193+
"halt error",
194+
60,
195+
None,
196+
None,
197+
id="raises ValueError after 1 retry when halt error on second attempt",
198+
),
199+
pytest.param(
200+
"halt",
201+
3,
202+
None,
203+
ValueError,
204+
"halt error",
205+
60,
206+
None,
207+
None,
208+
id="raises ValueError after 2 retries when halt error on third attempt",
209+
),
210+
pytest.param(
211+
"unexpected",
212+
1,
213+
None,
214+
RuntimeError,
215+
"unexpected",
216+
60,
217+
None,
218+
None,
219+
id="raises RuntimeError immediately on unexpected exception",
220+
),
221+
pytest.param(
222+
"unexpected",
223+
3,
224+
None,
225+
RuntimeError,
226+
"unexpected",
227+
60,
228+
None,
229+
None,
230+
id="raises RuntimeError after 2 retries on unexpected exception",
231+
),
232+
],
233+
)
234+
def test_poll_behavior(
235+
scenario: Literal["success", "timeout", "halt", "unexpected"],
236+
attempts: Optional[int],
237+
result_value: Any,
238+
exception_type: Optional[Type[Exception]],
239+
exception_msg: Optional[str],
240+
timeout: int,
241+
min_time: Optional[float],
242+
max_time: Optional[float],
243+
) -> None:
244+
"""
245+
Comprehensive test for poll function covering all scenarios:
246+
- Success cases with various return types and retry counts
247+
- Backoff timing behavior (linear increase, 10s cap)
248+
- Timeout behavior
249+
- Halting errors
250+
- Unexpected exceptions
251+
"""
252+
clock: FakeClock = FakeClock()
253+
call_count: int = 0
254+
255+
def fn() -> Tuple[Any, Optional[RetryError]]:
256+
nonlocal call_count
257+
call_count += 1
258+
259+
if scenario == "success":
260+
if call_count < attempts:
261+
return None, RetryError.continues(f"attempt {call_count}")
262+
return result_value, None
263+
264+
elif scenario == "timeout":
265+
return None, RetryError.continues("retrying")
266+
267+
elif scenario == "halt":
268+
if call_count < attempts:
269+
return None, RetryError.continues("retrying")
270+
return None, RetryError.halt(ValueError(exception_msg))
271+
272+
elif scenario == "unexpected":
273+
if call_count < attempts:
274+
return None, RetryError.continues("retrying")
275+
raise RuntimeError(exception_msg)
276+
277+
if scenario == "success":
278+
result: Any = poll(fn, timeout=timedelta(seconds=timeout), clock=clock)
279+
assert result == result_value
280+
assert call_count == attempts
281+
if min_time is not None:
282+
assert clock.time() >= min_time
283+
if max_time is not None:
284+
assert clock.time() <= max_time
285+
else:
286+
with pytest.raises(exception_type) as exc_info:
287+
poll(fn, timeout=timedelta(seconds=timeout), clock=clock)
288+
289+
assert exception_msg in str(exc_info.value)
290+
assert call_count >= 1
291+
292+
if scenario == "timeout":
293+
assert clock.time() >= min_time
294+
elif scenario in ("halt", "unexpected"):
295+
assert call_count == attempts

0 commit comments

Comments
 (0)