Skip to content

Commit 34d04be

Browse files
committed
Leakage class
1 parent 71d796b commit 34d04be

File tree

2 files changed

+112
-0
lines changed

2 files changed

+112
-0
lines changed

proxy/common/leakage.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
# -*- coding: utf-8 -*-
2+
"""
3+
proxy.py
4+
~~~~~~~~
5+
⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on
6+
Network monitoring, controls & Application development, testing, debugging.
7+
8+
:copyright: (c) 2013-present by Abhinav Singh and contributors.
9+
:license: BSD, see LICENSE for more details.
10+
"""
11+
import time
12+
13+
14+
class Leakage:
15+
"""Leaky Bucket algorithm."""
16+
17+
def __init__(self, rate: int):
18+
"""Initialize the leaky bucket with a specified leak rate in bytes per second."""
19+
# Maximum number of tokens the bucket can hold (bytes per second)
20+
self.rate = rate
21+
self.tokens = rate
22+
self.last_check = time.time()
23+
24+
def _refill(self):
25+
"""Refill tokens based on the elapsed time since the last check."""
26+
now = time.time()
27+
elapsed = now - self.last_check
28+
# Add tokens proportional to elapsed time, up to the rate
29+
self.tokens += int(elapsed * self.rate)
30+
# Cap tokens at the maximum rate to enforce the rate limit
31+
self.tokens = min(self.tokens, self.rate)
32+
self.last_check = now
33+
34+
def release(self, tokens: int) -> None:
35+
"""When you are unable to consume amount units of token, release them into the bucket.
36+
37+
E.g. say you wanted to read 1024 units, but only 24 units were read, then put
38+
back unconsumed 1000 tokens back in the bucket."""
39+
if tokens < 0:
40+
raise ValueError("Cannot release a negative number of tokens")
41+
self.tokens += tokens
42+
self.tokens = min(self.tokens, self.rate)
43+
44+
def consume(self, amount: int) -> int:
45+
"""Attempt to consume the amount from the bucket.
46+
47+
Returns the amount allowed to be sent, up to the available tokens (rate).
48+
"""
49+
self._refill()
50+
allowed = min(amount, self.tokens)
51+
self.tokens -= allowed
52+
return allowed

tests/common/test_leakage.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
# -*- coding: utf-8 -*-
2+
"""
3+
proxy.py
4+
~~~~~~~~
5+
⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on
6+
Network monitoring, controls & Application development, testing, debugging.
7+
8+
:copyright: (c) 2013-present by Abhinav Singh and contributors.
9+
:license: BSD, see LICENSE for more details.
10+
"""
11+
import time
12+
13+
import unittest
14+
15+
from proxy.common.leakage import Leakage
16+
17+
18+
class TestLeakage(unittest.TestCase):
19+
def test_initial_consume_no_tokens(self):
20+
# Test consuming with no tokens available initially
21+
rate = 100 # bytes per second
22+
bucket = Leakage(rate)
23+
self.assertEqual(
24+
bucket.consume(150),
25+
100,
26+
) # No tokens yet, so expect 0 bytes to be sent
27+
28+
def test_consume_with_refill(self):
29+
# Test consuming with refill after waiting
30+
rate = 100 # bytes per second
31+
bucket = Leakage(rate)
32+
time.sleep(1) # Wait for a second to allow refill
33+
self.assertEqual(bucket.consume(50), 50) # 50 bytes should be available
34+
35+
def test_consume_above_leak_rate(self):
36+
# Test attempting to consume more than the leak rate after a refill
37+
rate = 100 # bytes per second
38+
bucket = Leakage(rate)
39+
time.sleep(1) # Wait for a second to allow refill
40+
self.assertEqual(bucket.consume(150), 100) # Only 100 bytes should be allowed
41+
42+
def test_repeated_consume_with_partial_refill(self):
43+
# Test repeated consumption with partial refill
44+
rate = 100 # bytes per second
45+
bucket = Leakage(rate)
46+
47+
time.sleep(1) # Allow tokens to accumulate
48+
bucket.consume(80) # Consume 80 bytes, should leave 20
49+
time.sleep(0.5) # Wait half a second to refill by 50 bytes
50+
51+
self.assertEqual(bucket.consume(50), 50) # 50 bytes should be available now
52+
53+
def test_negative_token_guard(self):
54+
# Ensure tokens do not go negative
55+
rate = 100 # bytes per second
56+
bucket = Leakage(rate)
57+
time.sleep(1) # Allow tokens to accumulate
58+
bucket.consume(150) # Consume all available tokens
59+
self.assertEqual(bucket.consume(10), 0) # Should return 0 as no tokens are left
60+
self.assertEqual(bucket.tokens, 0) # Tokens should not be negative

0 commit comments

Comments
 (0)