|
1 | | -from datetime import datetime, timedelta |
2 | | -import random |
| 1 | +import asyncio |
| 2 | +import math |
| 3 | +import time |
3 | 4 |
|
4 | 5 |
|
5 | | -class ModelBreaker: |
| 6 | +class ModelBreaker(object): |
6 | 7 | def __init__(self): |
7 | 8 | # 初始化 allow_time 为当前时间 |
8 | | - self.allow_time = datetime.now() |
| 9 | + self._allow_time = time.perf_counter() |
9 | 10 |
|
10 | | - def allow(self): |
11 | | - # 检查当前时间是否在 allow_time 之后 |
12 | | - return datetime.now() > self.allow_time |
| 11 | + self._waiters = 0 |
| 12 | + self._permits = 0 |
| 13 | + self._base = 0 |
13 | 14 |
|
14 | | - def reset(self, duration): |
15 | | - # 将 allow_time 重置为当前时间加上指定的持续时间 |
16 | | - self.allow_time = datetime.now() + timedelta(seconds=duration.total_seconds()) |
| 15 | + def _allow(self) -> bool: |
| 16 | + # 检查当前时间是否在 allow_time 之后 |
| 17 | + return time.perf_counter() > self._allow_time |
17 | 18 |
|
18 | | - def get_allowed_duration(self): |
| 19 | + def _get_allowed_duration(self) -> float: |
19 | 20 | # 计算当前时间与 allow_time 之间的持续时间 |
20 | | - allow_duration = self.allow_time - datetime.now() |
| 21 | + allow_duration = self._allow_time - time.perf_counter() |
| 22 | + |
| 23 | + # 如果持续时间为负,返回零 |
| 24 | + if allow_duration < 0: |
| 25 | + return 0 |
| 26 | + return allow_duration |
| 27 | + |
| 28 | + def _acquire(self) -> int: |
| 29 | + self._waiters += 1 |
| 30 | + return self._waiters |
| 31 | + |
| 32 | + def _release(self) -> None: |
| 33 | + self._waiters -= 1 |
| 34 | + self._permits += 1 |
| 35 | + |
| 36 | + def _jitter(self, i: int) -> float: |
| 37 | + if i <= self._base: |
| 38 | + return 0 |
| 39 | + return math.log2(i - self._base) |
| 40 | + |
| 41 | + def reset(self, duration: float) -> None: |
| 42 | + # 将 allow_time 重置为当前时间加上指定的持续时间 |
| 43 | + self._allow_time = time.perf_counter() + duration |
| 44 | + self._base = self._permits |
21 | 45 |
|
22 | | - # 添加一个随机抖动,该抖动的范围为 0 到 10 秒,概率密度函数为 f(x) = x/50 (0 <= x <= 10) |
23 | | - # 约有 1/100 的请求会在结束熔断的第一秒内发起重试 |
24 | | - jitter = timedelta(seconds=random.triangular(0, 10, 10)) |
| 46 | + def wait(self) -> None: |
| 47 | + i = self._acquire() |
| 48 | + while not self._allow(): |
| 49 | + time.sleep(self._get_allowed_duration() + self._jitter(i)) |
| 50 | + self._release() |
25 | 51 |
|
26 | | - # 如果持续时间为负,则返回一个零时长的 timedelta 对象 |
27 | | - if allow_duration.total_seconds() < 0: |
28 | | - return timedelta(0) + jitter |
29 | | - return allow_duration + jitter |
| 52 | + async def asyncwait(self) -> None: |
| 53 | + i = self._acquire() |
| 54 | + while not self._allow(): |
| 55 | + await asyncio.sleep(self._get_allowed_duration() + self._jitter(i)) |
| 56 | + self._release() |
0 commit comments