Skip to content

Commit d9e0e72

Browse files
laura-surcelStephenSorriaux
authored andcommitted
fix(handlers): make AsyncResult call all registered callbacks instantly if the handler has stopped running (#549)
This avoids zombie thread to appear when creating and closing the client right after. A new unit case is added.
1 parent 1452a48 commit d9e0e72

File tree

4 files changed

+49
-16
lines changed

4 files changed

+49
-16
lines changed

kazoo/handlers/gevent.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,10 @@ def __init__(self):
6060
self._state_change = Semaphore()
6161
self._workers = []
6262

63+
@property
64+
def running(self):
65+
return self._running
66+
6367
class timeout_exception(gevent.Timeout):
6468
def __init__(self, msg):
6569
gevent.Timeout.__init__(self, exception=msg)

kazoo/handlers/threading.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,10 @@ def __init__(self):
113113
self._state_change = threading.Lock()
114114
self._workers = []
115115

116+
@property
117+
def running(self):
118+
return self._running
119+
116120
def _create_thread_worker(self, queue):
117121
def _thread_worker(): # pragma: nocover
118122
while True:

kazoo/handlers/utils.py

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -46,20 +46,14 @@ def set(self, value=None):
4646
with self._condition:
4747
self.value = value
4848
self._exception = None
49-
for callback in self._callbacks:
50-
self._handler.completion_queue.put(
51-
functools.partial(callback, self)
52-
)
49+
self._do_callbacks()
5350
self._condition.notify_all()
5451

5552
def set_exception(self, exception):
5653
"""Store the exception. Wake up the waiters."""
5754
with self._condition:
5855
self._exception = exception
59-
for callback in self._callbacks:
60-
self._handler.completion_queue.put(
61-
functools.partial(callback, self)
62-
)
56+
self._do_callbacks()
6357
self._condition.notify_all()
6458

6559
def get(self, block=True, timeout=None):
@@ -102,16 +96,13 @@ def rawlink(self, callback):
10296
"""Register a callback to call when a value or an exception is
10397
set"""
10498
with self._condition:
105-
# Are we already set? Dispatch it now
106-
if self.ready():
107-
self._handler.completion_queue.put(
108-
functools.partial(callback, self)
109-
)
110-
return
111-
11299
if callback not in self._callbacks:
113100
self._callbacks.append(callback)
114101

102+
# Are we already set? Dispatch it now
103+
if self.ready():
104+
self._do_callbacks()
105+
115106
def unlink(self, callback):
116107
"""Remove the callback set by :meth:`rawlink`"""
117108
with self._condition:
@@ -122,6 +113,18 @@ def unlink(self, callback):
122113
if callback in self._callbacks:
123114
self._callbacks.remove(callback)
124115

116+
def _do_callbacks(self):
117+
"""Execute the callbacks that were registered by :meth:`rawlink`.
118+
If the handler is in running state this method only schedules
119+
the calls to be performed by the handler. If it's stopped,
120+
the callbacks are called right away."""
121+
122+
for callback in self._callbacks:
123+
if self._handler.running:
124+
self._handler.completion_queue.put(
125+
functools.partial(callback, self))
126+
else:
127+
functools.partial(callback, self)()
125128

126129
def _set_fd_cloexec(fd):
127130
flags = fcntl.fcntl(fd, fcntl.F_GETFD)

kazoo/tests/test_client.py

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1154,7 +1154,7 @@ def test_context(self):
11541154
eq_(self.client.get('/smith')[0], b'32')
11551155

11561156

1157-
class TestCallbacks(unittest.TestCase):
1157+
class TestSessionCallbacks(unittest.TestCase):
11581158
def test_session_callback_states(self):
11591159
from kazoo.protocol.states import KazooState, KeeperState
11601160
from kazoo.client import KazooClient
@@ -1185,6 +1185,28 @@ def test_session_callback_states(self):
11851185
eq_(client.state, KazooState.SUSPENDED)
11861186

11871187

1188+
class TestCallbacks(KazooTestCase):
1189+
def test_async_result_callbacks_are_always_called(self):
1190+
# create a callback object
1191+
callback_mock = mock.Mock()
1192+
1193+
# simulate waiting for a response
1194+
async_result = self.client.handler.async_result()
1195+
async_result.rawlink(callback_mock)
1196+
1197+
# begin the procedure to stop the client
1198+
self.client.stop()
1199+
1200+
# the response has just been received;
1201+
# this should be on another thread,
1202+
# simultaneously with the stop procedure
1203+
async_result.set_exception(
1204+
Exception("Anything that throws an exception"))
1205+
1206+
# with the fix the callback should be called
1207+
self.assertGreater(callback_mock.call_count, 0)
1208+
1209+
11881210
class TestNonChrootClient(KazooTestCase):
11891211

11901212
def test_create(self):

0 commit comments

Comments
 (0)