Skip to content

Commit 7d2b9c6

Browse files
committed
wip
1 parent 4b13135 commit 7d2b9c6

File tree

4 files changed

+22
-4
lines changed

4 files changed

+22
-4
lines changed

kazoo/client.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ def __init__(
125125
ca=None,
126126
use_ssl=False,
127127
verify_certs=True,
128+
max_async_requests=0,
128129
**kwargs,
129130
):
130131
"""Create a :class:`KazooClient` instance. All time arguments
@@ -246,6 +247,18 @@ def __init__(
246247
self.keyfile = keyfile
247248
self.keyfile_password = keyfile_password
248249
self.ca = ca
250+
if max_async_requests > 0:
251+
self.logger.info(
252+
"Zookeeper client rate-limited to %d concurent async requests",
253+
max_async_requests,
254+
)
255+
self.rate_limiting_sem = self.handler.semaphore_impl(
256+
max_async_requests
257+
)
258+
259+
else:
260+
self.rate_limiting_sem = None
261+
249262
# Curator like simplified state tracking, and listeners for
250263
# state transitions
251264
self._state = KeeperState.CLOSED
@@ -640,6 +653,10 @@ def _call(self, request, async_object):
640653
async_object.set_exception(SessionExpiredError())
641654
return False
642655

656+
if self.rate_limiting_sem:
657+
self.rate_limiting_sem.acquire(blocking=True)
658+
async_object.rawlink(lambda _res: self.rate_limiting_sem.release())
659+
643660
self._queue.append((request, async_object))
644661

645662
# wake the connection, guarding against a race with close()

kazoo/handlers/eventlet.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from eventlet.green import threading as green_threading
1111
from eventlet.green import selectors as green_selectors
1212
from eventlet import queue as green_queue
13+
from eventlet import semaphore as green_semaphore
1314

1415
from kazoo.handlers import utils
1516
import kazoo.python2atexit as python2atexit
@@ -80,6 +81,7 @@ class SequentialEventletHandler(object):
8081
name = "sequential_eventlet_handler"
8182
queue_impl = green_queue.LightQueue
8283
queue_empty = green_queue.Empty
84+
semaphore_impl = green_semaphore.BoundedSemaphore
8385

8486
def __init__(self):
8587
"""Create a :class:`SequentialEventletHandler` instance"""

kazoo/handlers/gevent.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,7 @@
1313

1414
from kazoo.handlers.utils import selector_select
1515

16-
try:
17-
from gevent.lock import Semaphore, RLock
18-
except ImportError:
19-
from gevent.coros import Semaphore, RLock
16+
from gevent.lock import BoundedSemaphore as Semaphore, RLock as RLock
2017

2118
from kazoo.handlers import utils
2219
from kazoo import python2atexit
@@ -55,6 +52,7 @@ class SequentialGeventHandler(object):
5552
queue_impl = gevent.queue.Queue
5653
queue_empty = gevent.queue.Empty
5754
sleep_func = staticmethod(gevent.sleep)
55+
semaphore_impl = Semaphore
5856

5957
def __init__(self):
6058
"""Create a :class:`SequentialGeventHandler` instance"""

kazoo/handlers/threading.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ class SequentialThreadingHandler(object):
100100
sleep_func = staticmethod(time.sleep)
101101
queue_impl = Queue.Queue
102102
queue_empty = Queue.Empty
103+
semaphore_impl = threading.BoundedSemaphore
103104

104105
def __init__(self):
105106
"""Create a :class:`SequentialThreadingHandler` instance"""

0 commit comments

Comments
 (0)