97 changes: 97 additions & 0 deletions other/token_bucket.py
@@ -0,0 +1,97 @@
# Implementation of the Token Bucket algorithm.
# `rate` tokens are added to the bucket every `frequency` seconds.
# The bucket can hold at most `capacity` tokens and starts full.
# Each request consumes one token.
# If a token arrives when the bucket is full, the token is discarded.
# If a request arrives when the bucket is empty, the request is discarded.
# If the bucket has tokens available, the request passes.
# https://en.wikipedia.org/wiki/Token_bucket
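# Worked example (illustrative numbers): with rate=4, capacity=10 and
# frequency=60, a request arriving 130 seconds after the last refill check
# sees two full periods elapsed, so 2 * 4 = 8 tokens are added (capped at
# capacity) and the leftover 10 seconds carry over toward the next refill.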
import threading
import time


class TokenBucketRateLimiter:
def __init__(self, rate: int, capacity: int, frequency: int) -> None:
"""
Initialize a Token Bucket rate limiter.

        :param rate: Number of tokens added to the bucket per refill.
        :param capacity: Maximum number of tokens the bucket can hold.
        :param frequency: Refill interval in seconds.
>>> bucket = TokenBucketRateLimiter(4, 4, 60)
>>> bucket.tokens
4
>>> bucket.capacity
4
>>> bucket.frequency
60
"""
self.rate = rate # Tokens added per refill
self.capacity = capacity # Maximum capacity of the bucket
        self.frequency = frequency  # Refill interval in seconds
self.tokens = capacity # Current tokens in the bucket
self.last_checked = time.time() # Time when tokens were last checked
self.lock = threading.Lock() # To make the rate limiter thread-safe

def _add_tokens(self) -> None:
"""
        Refill tokens only when at least one full refill period has passed.
        >>> bucket = TokenBucketRateLimiter(1, 4, 60)
        >>> bucket.tokens  # Starts full at capacity
        4
        >>> bucket._add_tokens()
        >>> bucket.tokens  # No full period has elapsed; bucket stays full
        4
"""
current_time = time.time()
elapsed_time = current_time - self.last_checked

        if elapsed_time >= self.frequency:
            periods_passed = int(elapsed_time // self.frequency)

            # Add `rate` tokens for each full period, capped at capacity
            added_tokens = periods_passed * self.rate
            self.tokens = min(self.capacity, self.tokens + added_tokens)

            # Advance the last checked time by the whole periods consumed,
            # keeping any remainder for the next refill
            self.last_checked += periods_passed * self.frequency

def allow_request(self) -> bool:
"""
Check if a request is allowed.
If there are enough tokens, it consumes one token.
:return: True if the request is allowed, False otherwise.
>>> bucket = TokenBucketRateLimiter(1, 2, 60)
>>> bucket.allow_request() # Token is available, request passes
True
>>> bucket.allow_request() # Token is available, request passes
True
>>> bucket.allow_request() # No token left, request is dropped
False
"""
with self.lock:
self._add_tokens()
if self.tokens >= 1:
self.tokens -= 1
return True
return False


if __name__ == "__main__":
import doctest

doctest.testmod()

print("Allow 4 requests per minute, capacity of 4")
bucket = TokenBucketRateLimiter(4, 4, 60)
total_requests = 10
delay_in_seconds = 10
print("Simulate 1 request per 10 seconds...")
for i in range(total_requests):
result = "pass" if bucket.allow_request() else "dropped"
print(
f"Request {i+1}/{total_requests} \
timeline: {i*delay_in_seconds} seconds = {result}"
)
time.sleep(delay_in_seconds)
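
    # Illustrative sketch: because allow_request() acquires self.lock, it can
    # be called safely from multiple threads; with a fresh bucket of capacity
    # 4, exactly four of the eight concurrent requests below should pass.
    print("Simulate 8 concurrent requests against a fresh bucket...")
    fresh_bucket = TokenBucketRateLimiter(4, 4, 60)

    def make_request(worker_id: int) -> None:
        outcome = "pass" if fresh_bucket.allow_request() else "dropped"
        print(f"Thread {worker_id}: {outcome}")

    workers = [threading.Thread(target=make_request, args=(n,)) for n in range(8)]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()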