Skip to content

Commit a636d7a

Browse files
authored
fix(core): allow requests to be queued in CONNECTING state (#374) (#588)
With this patch, requests issued while the client is in the 'CONNECTING' state get queued instead of raising a misleading 'SessionExpiredError'. This fixes #374, and brings Kazoo more in line with the Java and C clients. See the 'kazoo.client.KazooClient.state' documentation as well as these discussions for more details: #570 (comment) #583 (comment)
1 parent 933b38b commit a636d7a

File tree

3 files changed

+127
-9
lines changed

3 files changed

+127
-9
lines changed

docs/api/client.rst

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,27 @@ Public API
3030
A :class:`~kazoo.protocol.states.KazooState` attribute indicating
3131
the current higher-level connection state.
3232

33+
.. note::
34+
35+
Up to version 2.6.1, requests could only be submitted
36+
in the CONNECTED state. Requests submitted while
37+
SUSPENDED would immediately raise a
38+
:exc:`~kazoo.exceptions.SessionExpiredError`. This
39+
was problematic, as sessions are usually recovered on
40+
reconnect.
41+
42+
Kazoo now simply queues requests submitted in the
43+
SUSPENDED state, expecting a recovery. This matches
44+
the behavior of the Java and C clients.
45+
46+
Requests submitted in a LOST state still fail
47+
immediately with the corresponding exception.
48+
49+
See:
50+
51+
* https://github.com/python-zk/kazoo/issues/374 and
52+
* https://github.com/python-zk/kazoo/pull/570
53+
3354
.. autoclass:: TransactionRequest
3455
:members:
3556
:member-order: bysource

kazoo/client.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -584,11 +584,11 @@ def _safe_close(self):
584584
"and wouldn't close after %s seconds" % timeout)
585585

586586
def _call(self, request, async_object):
587-
"""Ensure there's an active connection and put the request in
588-
the queue if there is.
587+
"""Ensure the client is in CONNECTED or SUSPENDED state and put the
588+
request in the queue if it is.
589589
590590
Returns False if the call short circuits due to AUTH_FAILED,
591-
CLOSED, EXPIRED_SESSION or CONNECTING state.
591+
CLOSED, or EXPIRED_SESSION state.
592592
593593
"""
594594

@@ -599,8 +599,7 @@ def _call(self, request, async_object):
599599
async_object.set_exception(ConnectionClosedError(
600600
"Connection has been closed"))
601601
return False
602-
elif self._state in (KeeperState.EXPIRED_SESSION,
603-
KeeperState.CONNECTING):
602+
elif self._state == KeeperState.EXPIRED_SESSION:
604603
async_object.set_exception(SessionExpiredError())
605604
return False
606605

kazoo/tests/test_client.py

Lines changed: 102 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
import mock
99
from mock import patch
1010
from nose import SkipTest
11-
from nose.tools import eq_
11+
from nose.tools import eq_, ok_, assert_not_equal
1212
from nose.tools import raises
1313

1414
from kazoo.testing import KazooTestCase
@@ -492,9 +492,6 @@ def test_create_on_broken_connection(self):
492492
self.assertRaises(AuthFailedError, client.create,
493493
'/closedpath', b'bar')
494494

495-
client._state = KeeperState.CONNECTING
496-
self.assertRaises(SessionExpiredError, client.create,
497-
'/closedpath', b'bar')
498495
client.stop()
499496
client.close()
500497

@@ -982,6 +979,107 @@ def test_update_host_list(self):
982979
finally:
983980
self.cluster[0].run()
984981

982+
# utility for test_request_queuing*
983+
def _make_request_queuing_client(self):
984+
from kazoo.client import KazooClient
985+
server = self.cluster[0]
986+
handler = self._makeOne()
987+
# create a client with only one server in its list, and
988+
# infinite retries
989+
client = KazooClient(
990+
hosts=server.address + self.client.chroot,
991+
handler=handler,
992+
connection_retry=dict(
993+
max_tries=-1,
994+
delay=0.1,
995+
backoff=1,
996+
max_jitter=0.0,
997+
sleep_func=handler.sleep_func
998+
)
999+
)
1000+
1001+
return client, server
1002+
1003+
# utility for test_request_queuing*
1004+
def _request_queuing_common(self, client, server, path, expire_session):
1005+
ev_suspended = client.handler.event_object()
1006+
ev_connected = client.handler.event_object()
1007+
1008+
def listener(state):
1009+
if state == KazooState.SUSPENDED:
1010+
ev_suspended.set()
1011+
elif state == KazooState.CONNECTED:
1012+
ev_connected.set()
1013+
client.add_listener(listener)
1014+
1015+
# wait for the client to connect
1016+
client.start()
1017+
1018+
try:
1019+
# force the client to suspend
1020+
server.stop()
1021+
1022+
ev_suspended.wait(5)
1023+
ok_(ev_suspended.is_set())
1024+
ev_connected.clear()
1025+
1026+
# submit a request, expecting it to be queued
1027+
result = client.create_async(path)
1028+
assert_not_equal(len(client._queue), 0)
1029+
eq_(result.ready(), False)
1030+
eq_(client.state, KazooState.SUSPENDED)
1031+
1032+
# optionally cause a SessionExpiredError to occur by
1033+
# mangling the first byte of the session password.
1034+
if expire_session:
1035+
b0 = b'\x00'
1036+
if client._session_passwd[0] == 0:
1037+
b0 = b'\xff'
1038+
client._session_passwd = b0 + client._session_passwd[1:]
1039+
finally:
1040+
server.run()
1041+
1042+
# wait for the client to reconnect (either with a recovered
1043+
# session, or with a new one if expire_session was set)
1044+
ev_connected.wait(5)
1045+
ok_(ev_connected.is_set())
1046+
1047+
return result
1048+
1049+
def test_request_queuing_session_recovered(self):
1050+
path = "/" + uuid.uuid4().hex
1051+
client, server = self._make_request_queuing_client()
1052+
1053+
try:
1054+
result = self._request_queuing_common(
1055+
client=client,
1056+
server=server,
1057+
path=path,
1058+
expire_session=False
1059+
)
1060+
1061+
eq_(result.get(), path)
1062+
assert_not_equal(client.exists(path), None)
1063+
finally:
1064+
client.stop()
1065+
1066+
def test_request_queuing_session_expired(self):
1067+
path = "/" + uuid.uuid4().hex
1068+
client, server = self._make_request_queuing_client()
1069+
1070+
try:
1071+
result = self._request_queuing_common(
1072+
client=client,
1073+
server=server,
1074+
path=path,
1075+
expire_session=True
1076+
)
1077+
1078+
eq_(len(client._queue), 0)
1079+
self.assertRaises(SessionExpiredError, result.get)
1080+
finally:
1081+
client.stop()
1082+
9851083

9861084
dummy_dict = {
9871085
'aversion': 1, 'ctime': 0, 'cversion': 1,

0 commit comments

Comments
 (0)