
Commit 1a73cb8

paultiq, Copilot, and vutran1710
authored
Keep Retrying until Max Delay Has Expired (#200)
* feat: implement retry_until_max_delay for limiter, to retry until the max delay has expired
* reduce log verbosity
* Update pyrate_limiter/limiter.py: add docstring (Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>)
* move delay increment outside loop and conditionally apply the raise-delay-exception
* condition to apply the right timeout
* fix fetch depth
* fetch-depth: was pulling wrong commit
* undo
* Merge and make typing happy again
* fix the delay: needs to be after we get waiting time
* Update pyrate_limiter/limiter.py (Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>)

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Vu Tran <me@vutr.io>
1 parent 26e9a4d commit 1a73cb8
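
For readers skimming the change, here is a minimal usage sketch of the new `retry_until_max_delay` flag, based on the `Limiter.__init__` signature in the diff below. The rate, the `max_delay` value, and the item name are illustrative, and `try_acquire` is the library's existing acquisition call rather than something this commit adds.

```python
from pyrate_limiter import Duration, Limiter, Rate

# Illustrative configuration: 5 requests per second, wait at most ~2 seconds in total.
rate = Rate(5, Duration.SECOND)

limiter = Limiter(
    rate,
    raise_when_fail=False,          # return False instead of raising once the wait budget is spent
    max_delay=Duration.SECOND * 2,  # upper bound on the delay, in milliseconds
    retry_until_max_delay=True,     # new in this commit: keep retrying until max_delay has expired
)

# Blocks (retrying) until the item is admitted or the accumulated delay exceeds max_delay.
ok = limiter.try_acquire("my-task")
```

With `retry_until_max_delay=False` (the default), a single failed re-acquire ends the attempt; with it set to True, the limiter keeps waiting and retrying until the accumulated delay passes `max_delay`.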

File tree

2 files changed: +82 -37 lines


benchmarks/stress_limiters.py

Lines changed: 5 additions & 3 deletions
@@ -10,11 +10,13 @@
 from typing import Literal
 from typing import Optional
 
+from pyrate_limiter import AbstractClock
 from pyrate_limiter import Duration
 from pyrate_limiter import Limiter
 from pyrate_limiter import Rate
 from pyrate_limiter import SQLiteBucket
 from pyrate_limiter import SQLiteClock
+from pyrate_limiter import TimeClock
 
 logger = logging.getLogger(__name__)
 
@@ -34,11 +36,11 @@ def create_sqlite_limiter(rate: Rate, use_fileLock: bool, max_delay: int):
 
     # retry_until_max_delay=True
     if use_fileLock:
-        kwargs = dict(clock=SQLiteClock(bucket))
+        clock: AbstractClock = SQLiteClock(bucket)
     else:
-        kwargs = {}
+        clock = TimeClock()
 
-    return Limiter(bucket, raise_when_fail=False, max_delay=max_delay, **kwargs)
+    return Limiter(bucket, raise_when_fail=False, max_delay=max_delay, clock=clock, retry_until_max_delay=True)
 
 
 def create_rate_limiter_factory(
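
A hedged sketch of how the updated benchmark factory above might be driven. `create_sqlite_limiter` and its parameters come from the diff; the rate value, the loop, and the `try_acquire` call are assumptions added here for illustration.

```python
from pyrate_limiter import Duration, Rate

# Hypothetical driver for the factory shown above.
rate = Rate(100, Duration.SECOND)  # illustrative rate
limiter = create_sqlite_limiter(rate, use_fileLock=False, max_delay=2000)

for i in range(10):
    # With retry_until_max_delay=True, each call keeps retrying until the accumulated
    # wait exceeds max_delay; raise_when_fail=False means it then returns False.
    acquired = limiter.try_acquire(f"request-{i}")
    print(i, acquired)
```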

pyrate_limiter/limiter.py

Lines changed: 77 additions & 34 deletions
@@ -64,6 +64,7 @@ class Limiter:
 
     bucket_factory: BucketFactory
     raise_when_fail: bool
+    retry_until_max_delay: bool
     max_delay: Optional[int] = None
     lock: RLock
 
@@ -73,13 +74,23 @@ def __init__(
         clock: AbstractClock = TimeClock(),
         raise_when_fail: bool = True,
         max_delay: Optional[Union[int, Duration]] = None,
+        retry_until_max_delay: bool = False
     ):
         """Init Limiter using either a single bucket / multiple-bucket factory
-        / single rate / rate list
+        / single rate / rate list.
+
+        Parameters:
+            argument (Union[BucketFactory, AbstractBucket, Rate, List[Rate]]): The bucket or rate configuration.
+            clock (AbstractClock, optional): The clock instance to use for rate limiting. Defaults to TimeClock().
+            raise_when_fail (bool, optional): Whether to raise an exception when rate limiting fails. Defaults to True.
+            max_delay (Optional[Union[int, Duration]], optional): The maximum delay allowed for rate limiting.
+                Defaults to None.
+            retry_until_max_delay (bool, optional): If True, retry operations until the maximum delay is reached.
+                Useful for ensuring operations eventually succeed within the allowed delay window. Defaults to False.
         """
         self.bucket_factory = self._init_bucket_factory(argument, clock=clock)
         self.raise_when_fail = raise_when_fail
-
+        self.retry_until_max_delay = retry_until_max_delay
         if max_delay is not None:
             if isinstance(max_delay, Duration):
                 max_delay = int(max_delay)
@@ -179,25 +190,40 @@ async def _handle_async():
             nonlocal delay
             delay = await delay
             assert isinstance(delay, int), "Delay not integer"
-            delay += 50
-
-            if delay > self.max_delay:
-                logger.error(
-                    "Required delay too large: actual=%s, expected=%s",
-                    delay,
-                    self.max_delay,
-                )
-                self._raise_delay_exception_if_necessary(bucket, item, delay)
-                return False
 
-            await asyncio.sleep(delay / 1000)
-            item.timestamp += delay
-            re_acquire = bucket.put(item)
+            total_delay = 0
+            delay += 50
 
-            if isawaitable(re_acquire):
-                re_acquire = await re_acquire
-
-            return _handle_reacquire(re_acquire)
+            while True:
+                total_delay += delay
+
+                if self.retry_until_max_delay:
+                    if self.max_delay is not None and total_delay > self.max_delay:
+                        logger.error("Total delay exceeded max_delay: total_delay=%s, max_delay=%s",
+                                     total_delay, self.max_delay)
+                        self._raise_delay_exception_if_necessary(bucket, item, total_delay)
+                        return False
+                else:
+                    if self.max_delay is not None and delay > self.max_delay:
+                        logger.error(
+                            "Required delay too large: actual=%s, expected=%s",
+                            delay,
+                            self.max_delay,
+                        )
+                        self._raise_delay_exception_if_necessary(bucket, item, delay)
+                        return False
+
+                await asyncio.sleep(delay / 1000)
+                item.timestamp += delay
+                re_acquire = bucket.put(item)
+
+                if isawaitable(re_acquire):
+                    re_acquire = await re_acquire
+
+                if not self.retry_until_max_delay:
+                    return _handle_reacquire(re_acquire)
+                elif re_acquire:
+                    return True
 
         return _handle_async()
 
@@ -213,23 +239,40 @@ async def _handle_async():
             self._raise_bucket_full_if_necessary(bucket, item)
             return False
 
-        delay += 50
+        total_delay = 0
 
-        if delay > self.max_delay:
-            logger.error(
-                "Required delay too large: actual=%s, expected=%s",
-                delay,
-                self.max_delay,
-            )
-            self._raise_delay_exception_if_necessary(bucket, item, delay)
-            return False
+        while True:
+            logger.debug("delay=%d, total_delay=%s", delay, total_delay)
+            delay = bucket.waiting(item)
+            assert isinstance(delay, int)
+
+            delay += 50
+            total_delay += delay
+
+            if self.max_delay is not None and total_delay > self.max_delay:
+                logger.error(
+                    "Required delay too large: actual=%s, expected=%s",
+                    delay,
+                    self.max_delay,
+                )
 
-        sleep(delay / 1000)
-        item.timestamp += delay
-        re_acquire = bucket.put(item)
-        # NOTE: if delay is not Awaitable, then `bucket.put` is not Awaitable
-        assert isinstance(re_acquire, bool)
-        return _handle_reacquire(re_acquire)
+                if self.retry_until_max_delay:
+                    self._raise_delay_exception_if_necessary(bucket, item, total_delay)
+                else:
+                    self._raise_delay_exception_if_necessary(bucket, item, delay)
+
+                return False
+
+            sleep(delay / 1000)
+            item.timestamp += delay
+            re_acquire = bucket.put(item)
+            # NOTE: if delay is not Awaitable, then `bucket.put` is not Awaitable
+            assert isinstance(re_acquire, bool)
+
+            if not self.retry_until_max_delay:
+                return _handle_reacquire(re_acquire)
+            elif re_acquire:
+                return True
 
     def handle_bucket_put(
         self,
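
To summarize the control flow introduced above, here is a simplified standalone sketch of the synchronous retry loop. It is not the library source: names mirror the diff, the bucket/item objects are assumed to provide `waiting`, `put`, and `timestamp`, and exception handling (`raise_when_fail`) is reduced to returning False.

```python
import time

def acquire_with_retry(bucket, item, max_delay, retry_until_max_delay):
    """Simplified mirror of the sync retry loop added in this commit."""
    total_delay = 0
    while True:
        delay = bucket.waiting(item) + 50  # limiter pads the computed wait by 50 ms
        total_delay += delay

        # Give up once the accumulated wait exceeds max_delay; the real code
        # may raise instead, depending on raise_when_fail.
        if max_delay is not None and total_delay > max_delay:
            return False

        time.sleep(delay / 1000)           # delays are expressed in milliseconds
        item.timestamp += delay

        if bucket.put(item):
            return True                    # slot re-acquired after waiting
        if not retry_until_max_delay:
            return False                   # legacy behaviour: stop after one attempt
```

In non-retry mode the loop runs at most once, so the `total_delay` check reduces to the old `delay > max_delay` comparison.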
