Skip to content

Commit fc4a5c4

Browse files
committed
feat(core): add support for watch event zxids
ZooKeeper 3.9.0 began including the zxid responsible for a watch event in the response header. This adds support for that by plumbing the information through to the WatchedEvent. The documentation and field name is based on the text in the ZooKeeper java client.
1 parent 415dc93 commit fc4a5c4

File tree

3 files changed

+39
-4
lines changed

3 files changed

+39
-4
lines changed

kazoo/protocol/connection.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -379,7 +379,7 @@ def _find_persistent_recursive_watchers(self, path):
379379
)
380380
return watchers
381381

382-
def _read_watch_event(self, buffer, offset):
382+
def _read_watch_event(self, buffer, offset, zxid):
383383
client = self.client
384384
watch, offset = Watch.deserialize(buffer, offset)
385385
path = watch.path
@@ -405,7 +405,8 @@ def _read_watch_event(self, buffer, offset):
405405

406406
# Strip the chroot if needed
407407
path = client.unchroot(path)
408-
ev = WatchedEvent(EVENT_TYPE_MAP[watch.type], client._state, path)
408+
ev = WatchedEvent(EVENT_TYPE_MAP[watch.type], client._state, path,
409+
zxid)
409410

410411
# Last check to ignore watches if we've been stopped
411412
if client._stopped.is_set():
@@ -520,7 +521,7 @@ def _read_socket(self, read_timeout):
520521
else:
521522
async_object.set(True)
522523
elif header.xid == WATCH_XID:
523-
self._read_watch_event(buffer, offset)
524+
self._read_watch_event(buffer, offset, header.zxid)
524525
else:
525526
self.logger.log(BLATHER, "Reading for header %r", header)
526527

kazoo/protocol/states.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,8 @@ class EventType(object):
117117
}
118118

119119

120-
class WatchedEvent(namedtuple("WatchedEvent", ("type", "state", "path"))):
120+
class WatchedEvent(namedtuple("WatchedEvent",
121+
("type", "state", "path", "zxid"))):
121122
"""A change on ZooKeeper that a Watcher is able to respond to.
122123
123124
The :class:`WatchedEvent` includes exactly what happened, the
@@ -138,8 +139,23 @@ class WatchedEvent(namedtuple("WatchedEvent", ("type", "state", "path"))):
138139
139140
The path of the node for the watch event.
140141
142+
.. attribute:: zxid
143+
144+
The zxid of the transaction that triggered this watch if it is
145+
of one of the following types:
146+
147+
* EventType.CREATED
148+
* EventType.DELETED
149+
* EventType.CHANGED
150+
* EventType.CHILD
151+
152+
Otherwise, returns WatchedEvent.NO_ZXID. Note that NO_ZXID is also
153+
returned by old servers that do not support this feature.
154+
141155
"""
142156

157+
NO_ZXID = -1
158+
143159

144160
class Callback(namedtuple("Callback", ("type", "func", "args"))):
145161
"""A callback that is handed to a handler for dispatch

kazoo/tests/test_client.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1305,6 +1305,24 @@ def test_invalid_remove_all_watch_values(self):
13051305
with pytest.raises(ValueError):
13061306
client.remove_all_watches("/a", 42)
13071307

1308+
def test_watch_zxid(self):
1309+
self._require_zk_version(3, 9)
1310+
nodepath = "/" + uuid.uuid4().hex
1311+
event = self.client.handler.event_object()
1312+
1313+
def w(watch_event):
1314+
assert watch_event.path == nodepath
1315+
assert watch_event.zxid > -1
1316+
event.set()
1317+
1318+
exists = self.client.exists(nodepath, watch=w)
1319+
assert exists is None
1320+
1321+
self.client.create(nodepath, ephemeral=True)
1322+
1323+
event.wait(1)
1324+
assert event.is_set() is True
1325+
13081326

13091327
class TestSSLClient(KazooTestCase):
13101328
def setUp(self):

0 commit comments

Comments
 (0)