Skip to content

Commit 14a0931

Browse files
committed
Merge branch 'develop' into leakage
2 parents a9beaf1 + ad2e107 commit 14a0931

File tree

4 files changed

+153
-5
lines changed

4 files changed

+153
-5
lines changed

README.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2619,6 +2619,7 @@ usage: -m [-h] [--tunnel-hostname TUNNEL_HOSTNAME] [--tunnel-port TUNNEL_PORT]
26192619
[--tunnel-ssh-key-passphrase TUNNEL_SSH_KEY_PASSPHRASE]
26202620
[--tunnel-remote-port TUNNEL_REMOTE_PORT] [--threadless]
26212621
[--threaded] [--num-workers NUM_WORKERS] [--enable-events]
2622+
[--inactive-conn-cleanup-timeout INACTIVE_CONN_CLEANUP_TIMEOUT]
26222623
[--enable-proxy-protocol] [--enable-conn-pool] [--key-file KEY_FILE]
26232624
[--cert-file CERT_FILE] [--client-recvbuf-size CLIENT_RECVBUF_SIZE]
26242625
[--server-recvbuf-size SERVER_RECVBUF_SIZE]
@@ -2682,6 +2683,16 @@ options:
26822683
--enable-events Default: False. Enables core to dispatch lifecycle
26832684
events. Plugins can be used to subscribe for core
26842685
events.
2686+
--inactive-conn-cleanup-timeout INACTIVE_CONN_CLEANUP_TIMEOUT
2687+
Time after which inactive works must be cleaned up.
2688+
Increase this value if your backend services are slow
2689+
to respond or when proxy.py is handling a high
2690+
volume. When running proxy.py on Google Cloud (GCP)
2691+
you may see 'backend_connection_closed_before_data_sen
2692+
t_to_client', with curl clients you may see 'Empty
2693+
reply from server' error when '--inactive-conn-
2694+
cleanup-timeout' value is low for your use-case.
2695+
Default 1 second
26852696
--enable-proxy-protocol
26862697
Default: False. If used, will enable proxy protocol.
26872698
Only version 1 is currently supported.

proxy/common/leakage.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
# -*- coding: utf-8 -*-
2+
"""
3+
proxy.py
4+
~~~~~~~~
5+
⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on
6+
Network monitoring, controls & Application development, testing, debugging.
7+
8+
:copyright: (c) 2013-present by Abhinav Singh and contributors.
9+
:license: BSD, see LICENSE for more details.
10+
"""
11+
import time
12+
13+
14+
class Leakage:
    """Leaky Bucket algorithm.

    Tokens represent bytes allowed to flow per second. The bucket starts
    full and refills continuously at ``rate`` tokens per second, capped at
    ``rate`` so at most one second worth of burst can accumulate.
    """

    def __init__(self, rate: int) -> None:
        """Initialize the leaky bucket with a specified leak rate in bytes per second."""
        # Maximum number of tokens the bucket can hold (bytes per second)
        self.rate = rate
        # Bucket starts full, allowing an initial burst of up to ``rate`` bytes.
        self.tokens = rate
        # Use a monotonic clock: wall-clock (``time.time``) adjustments such
        # as NTP corrections would otherwise corrupt elapsed-time math.
        self.last_check: float = time.monotonic()

    def _refill(self) -> None:
        """Refill tokens based on the elapsed time since the last check."""
        now = time.monotonic()
        elapsed = now - self.last_check
        # Whole tokens earned since the last check; fractions stay pending.
        added = int(elapsed * self.rate)
        if self.tokens + added >= self.rate:
            # Bucket is full: discard any pending fraction and restart timing.
            self.tokens = self.rate
            self.last_check = now
        elif added > 0:
            self.tokens += added
            # Advance the clock only by the time actually converted into
            # tokens so sub-token elapsed time keeps accumulating.  The
            # previous implementation reset ``last_check`` on every call,
            # which permanently starved callers polling faster than one
            # token interval (int() truncated the credit to 0 each time).
            self.last_check += added / self.rate
        # When added == 0 and the bucket is not full, leave ``last_check``
        # untouched so the fractional elapsed time is not lost.

    def release(self, tokens: int) -> None:
        """When you are unable to consume amount units of token, release them into the bucket.

        E.g. say you wanted to read 1024 units, but only 24 units were read, then put
        back unconsumed 1000 tokens back in the bucket.

        Raises ValueError if ``tokens`` is negative.
        """
        if tokens < 0:
            raise ValueError('Cannot release a negative number of tokens')
        self.tokens += tokens
        # Never exceed bucket capacity.
        self.tokens = min(self.tokens, self.rate)

    def consume(self, amount: int) -> int:
        """Attempt to consume the amount from the bucket.

        Returns the amount allowed to be sent, up to the available tokens (rate).
        """
        self._refill()
        allowed = min(amount, self.tokens)
        self.tokens -= allowed
        return allowed

proxy/core/work/threadless.py

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
cast,
2121
)
2222

23+
from ...common.flag import flags as proxy_flags
2324
from ...common.types import Readables, Writables, SelectableEvents
2425
from ...common.logger import Logger
2526
from ...common.constants import (
@@ -37,6 +38,20 @@
3738
logger = logging.getLogger(__name__)
3839

3940

41+
# Register the inactive-connection cleanup timeout flag at import time so it
# is available before flags are parsed.  Consumed by Threadless.__init__ via
# float(self.flags.inactive_conn_cleanup_timeout).
proxy_flags.add_argument(
    '--inactive-conn-cleanup-timeout',
    default=DEFAULT_INACTIVE_CONN_CLEANUP_TIMEOUT,
    help='Time after which inactive works must be cleaned up. '
    + 'Increase this value if your backend services are slow to respond '
    + 'or when proxy.py is handling a high volume. When running proxy.py on Google Cloud (GCP) '
    + "you may see 'backend_connection_closed_before_data_sent_to_client', with curl clients "
    + "you may see 'Empty reply from server' error when '--inactive-conn-cleanup-timeout' "
    + 'value is low for your use-case. Default {0} seconds'.format(
        DEFAULT_INACTIVE_CONN_CLEANUP_TIMEOUT,
    ),
)
53+
54+
4055
class Threadless(ABC, Generic[T]):
4156
"""Work executor base class.
4257
@@ -87,7 +102,9 @@ def __init__(
87102
SelectableEvents,
88103
] = {}
89104
self.wait_timeout: float = DEFAULT_WAIT_FOR_TASKS_TIMEOUT
90-
self.cleanup_inactive_timeout: float = DEFAULT_INACTIVE_CONN_CLEANUP_TIMEOUT
105+
self.cleanup_inactive_timeout: float = float(
106+
self.flags.inactive_conn_cleanup_timeout,
107+
)
91108
self._total: int = 0
92109
# When put at the top, causes circular import error
93110
# since integrated ssh tunnel was introduced.
@@ -318,10 +335,17 @@ def _cleanup(self, work_id: int, reason: str) -> None:
318335
self.selector.unregister(fileno)
319336
self.registered_events_by_work_ids[work_id].clear()
320337
del self.registered_events_by_work_ids[work_id]
321-
self.works[work_id].shutdown()
322-
del self.works[work_id]
323-
if self.work_queue_fileno() is not None:
324-
os.close(work_id)
338+
try:
339+
self.works[work_id].shutdown()
340+
except Exception as exc:
341+
logger.exception(
342+
'Error when shutting down work#{0}'.format(work_id),
343+
exc_info=exc,
344+
)
345+
finally:
346+
del self.works[work_id]
347+
if self.work_queue_fileno() is not None:
348+
os.close(work_id)
325349

326350
def _create_tasks(
327351
self,

tests/common/test_leakage.py

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
# -*- coding: utf-8 -*-
2+
"""
3+
proxy.py
4+
~~~~~~~~
5+
⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on
6+
Network monitoring, controls & Application development, testing, debugging.
7+
8+
:copyright: (c) 2013-present by Abhinav Singh and contributors.
9+
:license: BSD, see LICENSE for more details.
10+
"""
11+
import time
12+
13+
import unittest
14+
15+
from proxy.common.leakage import Leakage
16+
17+
18+
class TestLeakage(unittest.TestCase):
    """Unit tests for the ``Leakage`` leaky bucket rate limiter."""

    def test_initial_consume_no_tokens(self) -> None:
        # The bucket starts full, so a request for 150 bytes is capped at
        # the bucket capacity (``rate``), i.e. 100 bytes are allowed.
        rate = 100  # bytes per second
        bucket = Leakage(rate)
        self.assertEqual(
            bucket.consume(150),
            100,
        )  # Capped at bucket capacity, not the requested 150

    def test_consume_with_refill(self) -> None:
        # Test consuming with refill after waiting
        rate = 100  # bytes per second
        bucket = Leakage(rate)
        time.sleep(1)  # Wait for a second to allow refill
        self.assertEqual(bucket.consume(50), 50)  # 50 bytes should be available

    def test_consume_above_leak_rate(self) -> None:
        # Test attempting to consume more than the leak rate after a refill
        rate = 100  # bytes per second
        bucket = Leakage(rate)
        time.sleep(1)  # Wait for a second to allow refill
        self.assertEqual(bucket.consume(150), 100)  # Only 100 bytes should be allowed

    def test_repeated_consume_with_partial_refill(self) -> None:
        # Test repeated consumption with partial refill
        rate = 100  # bytes per second
        bucket = Leakage(rate)

        time.sleep(1)  # Allow tokens to accumulate
        bucket.consume(80)  # Consume 80 bytes, should leave 20
        time.sleep(0.5)  # Wait half a second to refill by 50 bytes

        self.assertEqual(bucket.consume(50), 50)  # 50 bytes should be available now

    def test_negative_token_guard(self) -> None:
        # Ensure tokens do not go negative
        rate = 100  # bytes per second
        bucket = Leakage(rate)
        time.sleep(1)  # Allow tokens to accumulate
        bucket.consume(150)  # Consume all available tokens
        self.assertEqual(bucket.consume(10), 0)  # Should return 0 as no tokens are left
        self.assertEqual(bucket.tokens, 0)  # Tokens should not be negative

0 commit comments

Comments
 (0)