Skip to content

Commit b4967d1

Browse files
authored
Merge pull request #450 from python-zk/feat/revert-pr-305
fix(core): revert PR #305 SetWatches which caused RuntimeError
2 parents 1956bab + a7b4539 commit b4967d1

File tree

4 files changed

+1
-84
lines changed

4 files changed

+1
-84
lines changed

kazoo/client.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -477,8 +477,7 @@ def _session_callback(self, state):
477477
self._live.clear()
478478
self._notify_pending(state)
479479
self._make_state_change(KazooState.SUSPENDED)
480-
if state != KeeperState.CONNECTING:
481-
self._reset_watchers()
480+
self._reset_watchers()
482481

483482
def _notify_pending(self, state):
484483
"""Used to clear a pending response queue and request queue

kazoo/protocol/connection.py

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
Ping,
2727
PingInstance,
2828
ReplyHeader,
29-
SetWatches,
3029
Transaction,
3130
Watch,
3231
int_struct
@@ -60,7 +59,6 @@
6059
WATCH_XID = -1
6160
PING_XID = -2
6261
AUTH_XID = -4
63-
SET_WATCHES_XID = -8
6462

6563
CLOSE_RESPONSE = Close.type
6664

@@ -410,8 +408,6 @@ def _read_socket(self, read_timeout):
410408
async_object.set(True)
411409
elif header.xid == WATCH_XID:
412410
self._read_watch_event(buffer, offset)
413-
elif header.xid == SET_WATCHES_XID:
414-
self.logger.log(BLATHER, 'Received SetWatches reply')
415411
else:
416412
self.logger.log(BLATHER, 'Reading for header %r', header)
417413

@@ -444,8 +440,6 @@ def _send_request(self, read_timeout, connect_timeout):
444440
# Special case for auth packets
445441
if request.type == Auth.type:
446442
xid = AUTH_XID
447-
elif request.type == SetWatches.type:
448-
xid = SET_WATCHES_XID
449443
else:
450444
self._xid = (self._xid % 2147483647) + 1
451445
xid = self._xid
@@ -619,11 +613,6 @@ def _connect(self, host, port):
619613
client._session_id or 0, client._session_passwd,
620614
client.read_only)
621615

622-
# save the client's last_zxid before it gets overwritten by the
623-
# server's.
624-
# we'll need this to reset watches via SetWatches further below.
625-
last_zxid = client.last_zxid
626-
627616
connect_result, zxid = self._invoke(
628617
client._session_timeout / 1000.0, connect)
629618

@@ -663,15 +652,4 @@ def _connect(self, host, port):
663652
if zxid:
664653
client.last_zxid = zxid
665654

666-
# TODO: separate exist from data watches
667-
if client._data_watchers or client._child_watchers.keys():
668-
sw = SetWatches(last_zxid,
669-
client._data_watchers.keys(),
670-
client._data_watchers.keys(),
671-
client._child_watchers.keys())
672-
zxid = self._invoke(connect_timeout / 1000.0, sw,
673-
xid=SET_WATCHES_XID)
674-
if zxid:
675-
client.last_zxid = zxid
676-
677655
return read_timeout, connect_timeout

kazoo/protocol/serialization.py

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
# Struct objects with formats compiled
1313
bool_struct = struct.Struct('B')
1414
int_struct = struct.Struct('!i')
15-
long_struct = struct.Struct('!q')
1615
int_int_struct = struct.Struct('!ii')
1716
int_int_long_struct = struct.Struct('!iiq')
1817

@@ -52,14 +51,6 @@ def write_string(bytes):
5251
return int_struct.pack(len(utf8_str)) + utf8_str
5352

5453

55-
def write_string_vector(v):
56-
b = bytearray()
57-
b.extend(int_struct.pack(len(v)))
58-
for s in v:
59-
b.extend(write_string(s))
60-
return b
61-
62-
6354
def write_buffer(bytes):
6455
if bytes is None:
6556
return int_struct.pack(-1)
@@ -386,20 +377,6 @@ def serialize(self):
386377
write_string(self.auth))
387378

388379

389-
class SetWatches(
390-
namedtuple('SetWatches',
391-
'relativeZxid, dataWatches, existWatches, childWatches')):
392-
type = 101
393-
394-
def serialize(self):
395-
b = bytearray()
396-
b.extend(long_struct.pack(self.relativeZxid))
397-
b.extend(write_string_vector(self.dataWatches))
398-
b.extend(write_string_vector(self.existWatches))
399-
b.extend(write_string_vector(self.childWatches))
400-
return b
401-
402-
403380
class Watch(namedtuple('Watch', 'type state path')):
404381
@classmethod
405382
def deserialize(cls, bytes, offset):

kazoo/tests/test_client.py

Lines changed: 0 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -963,43 +963,6 @@ def test_update_host_list(self):
963963
finally:
964964
self.cluster[0].run()
965965

966-
def test_set_watches_on_reconnect(self):
967-
client = self.client
968-
watch_event = client.handler.event_object()
969-
970-
client.create("/tacos")
971-
972-
# set the watch
973-
def w(we):
974-
eq_(we.path, "/tacos")
975-
watch_event.set()
976-
977-
client.get_children("/tacos", watch=w)
978-
979-
# force a reconnect
980-
states = []
981-
rc = client.handler.event_object()
982-
983-
@client.add_listener
984-
def listener(state):
985-
if state == KazooState.CONNECTED:
986-
states.append(state)
987-
rc.set()
988-
989-
client._connection._socket.shutdown(socket.SHUT_RDWR)
990-
991-
rc.wait(10)
992-
eq_(states, [KazooState.CONNECTED])
993-
994-
# watches should still be there
995-
self.assertTrue(len(client._child_watchers) == 1)
996-
997-
# ... and they should fire
998-
client.create("/tacos/hello_", b"", ephemeral=True, sequence=True)
999-
1000-
watch_event.wait(1)
1001-
self.assertTrue(watch_event.is_set())
1002-
1003966

1004967
dummy_dict = {
1005968
'aversion': 1, 'ctime': 0, 'cversion': 1,

0 commit comments

Comments
 (0)