Skip to content

Commit df7c611

Browse files
committed
feat(core): Introduce rate limiting to the client
Add a "semaphore_impl" attribute on the various handlers. Allow a new, optional, `concurrent_request_limit` argument to the client constructor. Change the client to bound the number of outstanding async requests with a semaphore limited to `concurrent_request_limit`. Fixes #664
1 parent 242d91e commit df7c611

File tree

4 files changed

+33
-1
lines changed

4 files changed

+33
-1
lines changed

kazoo/client.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ def __init__(
120120
ca=None,
121121
use_ssl=False,
122122
verify_certs=True,
123+
concurrent_request_limit=0,
123124
**kwargs,
124125
):
125126
"""Create a :class:`KazooClient` instance. All time arguments
@@ -241,6 +242,18 @@ def __init__(
241242
self.keyfile = keyfile
242243
self.keyfile_password = keyfile_password
243244
self.ca = ca
245+
if concurrent_request_limit > 0:
246+
self.logger.info(
247+
"Zookeeper client rate-limited to %d concurrent requests",
248+
concurrent_request_limit,
249+
)
250+
self.rate_limiting_sem = self.handler.semaphore_impl(
251+
concurrent_request_limit
252+
)
253+
254+
else:
255+
self.rate_limiting_sem = None
256+
244257
# Curator like simplified state tracking, and listeners for
245258
# state transitions
246259
self._state = KeeperState.CLOSED
@@ -635,6 +648,16 @@ def _call(self, request, async_object):
635648
async_object.set_exception(SessionExpiredError())
636649
return False
637650

651+
if self.rate_limiting_sem:
652+
if not self.rate_limiting_sem.acquire(blocking=False):
653+
self.logger.info(
654+
"Limiting concurrent requests. Waiting for completion."
655+
)
656+
# Actually block on the sempahore here
657+
self.rate_limiting_sem.acquire(blocking=True)
658+
# Register the release of the semaphore on async request completion
659+
async_object.rawlink(lambda _res: self.rate_limiting_sem.release())
660+
638661
self._queue.append((request, async_object))
639662

640663
# wake the connection, guarding against a race with close()

kazoo/handlers/eventlet.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from eventlet.green import threading as green_threading
1212
from eventlet.green import selectors as green_selectors
1313
from eventlet import queue as green_queue
14+
from eventlet import semaphore as green_semaphore
1415

1516
from kazoo.handlers import utils
1617
from kazoo.handlers.utils import selector_select
@@ -80,6 +81,7 @@ class SequentialEventletHandler(object):
8081
name = "sequential_eventlet_handler"
8182
queue_impl = green_queue.LightQueue
8283
queue_empty = green_queue.Empty
84+
semaphore_impl = green_semaphore.BoundedSemaphore
8385

8486
def __init__(self):
8587
"""Create a :class:`SequentialEventletHandler` instance"""

kazoo/handlers/gevent.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
"""A gevent based handler."""
2+
23
from __future__ import absolute_import
34

45
import atexit
@@ -14,7 +15,10 @@
1415

1516
from kazoo.handlers.utils import selector_select
1617

17-
from gevent.lock import Semaphore, RLock
18+
from gevent.lock import (
19+
BoundedSemaphore as Semaphore,
20+
RLock as RLock,
21+
)
1822

1923
from kazoo.handlers import utils
2024

@@ -52,6 +56,7 @@ class SequentialGeventHandler(object):
5256
queue_impl = gevent.queue.Queue
5357
queue_empty = gevent.queue.Empty
5458
sleep_func = staticmethod(gevent.sleep)
59+
semaphore_impl = Semaphore
5560

5661
def __init__(self):
5762
"""Create a :class:`SequentialGeventHandler` instance"""

kazoo/handlers/threading.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
:class:`~kazoo.handlers.gevent.SequentialGeventHandler` instead.
1111
1212
"""
13+
1314
from __future__ import absolute_import
1415

1516
import atexit
@@ -95,6 +96,7 @@ class SequentialThreadingHandler(object):
9596
sleep_func = staticmethod(time.sleep)
9697
queue_impl = queue.Queue
9798
queue_empty = queue.Empty
99+
semaphore_impl = threading.BoundedSemaphore
98100

99101
def __init__(self):
100102
"""Create a :class:`SequentialThreadingHandler` instance"""

0 commit comments

Comments
 (0)