Skip to content

Commit e9815d1

Browse files
authored
Merge pull request #391 from michielbaird/watcher_changes
Ensure pending watches are not dropped when connection is lost
2 parents 4e0cad0 + f4d7ebc commit e9815d1

File tree

4 files changed

+40
-2
lines changed

4 files changed

+40
-2
lines changed

kazoo/client.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,11 @@
4444
Sync,
4545
Transaction
4646
)
47+
from kazoo.protocol.states import Callback
48+
from kazoo.protocol.states import EventType
4749
from kazoo.protocol.states import KazooState
4850
from kazoo.protocol.states import KeeperState
51+
from kazoo.protocol.states import WatchedEvent
4952
from kazoo.retry import KazooRetry
5053
from kazoo.security import ACL
5154
from kazoo.security import OPEN_ACL_UNSAFE
@@ -192,7 +195,8 @@ def __init__(self, hosts='127.0.0.1:2181',
192195
self._state = KeeperState.CLOSED
193196
self.state = KazooState.LOST
194197
self.state_listeners = set()
195-
198+
self._child_watchers = defaultdict(set)
199+
self._data_watchers = defaultdict(set)
196200
self._reset()
197201
self.read_only = read_only
198202

@@ -309,9 +313,20 @@ def _reset(self):
309313
self._protocol_version = None
310314

311315
def _reset_watchers(self):
316+
watchers = []
317+
for child_watchers in six.itervalues(self._child_watchers):
318+
watchers.extend(child_watchers)
319+
320+
for data_watchers in six.itervalues(self._data_watchers):
321+
watchers.extend(data_watchers)
322+
312323
self._child_watchers = defaultdict(set)
313324
self._data_watchers = defaultdict(set)
314325

326+
ev = WatchedEvent(EventType.NONE, self._state, None)
327+
for watch in watchers:
328+
self.handler.dispatch_callback(Callback("watch", watch, (ev,)))
329+
315330
def _reset_session(self):
316331
self._session_id = None
317332
self._session_passwd = b'\x00' * 16

kazoo/protocol/states.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,13 +93,19 @@ class EventType(object):
9393
removed). This event does not indicate the data for a child
9494
node has changed, which must have its own watch established.
9595
96+
.. attribute:: NONE
97+
98+
The connection state has been altered.
99+
96100
"""
97101
CREATED = 'CREATED'
98102
DELETED = 'DELETED'
99103
CHANGED = 'CHANGED'
100104
CHILD = 'CHILD'
105+
NONE = 'NONE'
101106

102107
EVENT_TYPE_MAP = {
108+
-1:EventType.NONE,
103109
1: EventType.CREATED,
104110
2: EventType.DELETED,
105111
3: EventType.CHANGED,

kazoo/recipe/watchers.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -345,7 +345,8 @@ def _get_children(self, event=None):
345345
raise
346346

347347
def _watcher(self, event):
348-
self._get_children(event)
348+
if event.type != "NONE":
349+
self._get_children(event)
349350

350351
def _session_watcher(self, state):
351352
if state in (KazooState.LOST, KazooState.SUSPENDED):

kazoo/tests/test_client.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -414,6 +414,22 @@ def test_closed(self):
414414
client._state = oldstate
415415
client._connection._write_sock = None
416416

417+
def test_watch_trigger_expire(self):
418+
client = self.client
419+
cv = self.make_event()
420+
421+
client.create("/test", b"")
422+
423+
def test_watch(event):
424+
cv.set()
425+
426+
client.get("/test/", watch=test_watch)
427+
self.expire_session(self.make_event)
428+
429+
430+
cv.wait(3)
431+
assert cv.is_set()
432+
417433

418434
class TestClient(KazooTestCase):
419435
def _makeOne(self, *args):

0 commit comments

Comments
 (0)