|
1 | | -from collections import namedtuple |
| 1 | +from collections import namedtuple, deque |
2 | 2 | import os |
3 | 3 | import threading |
4 | 4 | import time |
5 | 5 | import uuid |
6 | 6 | import struct |
| 7 | +import sys |
7 | 8 |
|
8 | 9 | from nose import SkipTest |
9 | 10 | from nose.tools import eq_ |
@@ -304,3 +305,73 @@ def testit(): |
304 | 305 | client.remove_listener(listen) |
305 | 306 | self.cluster[1].run() |
306 | 307 | self.cluster[2].run() |
| 308 | + |
| 309 | + |
| 310 | +class TestUnorderedXids(KazooTestCase): |
| 311 | + |
| 312 | + def setUp(self): |
| 313 | + super(TestUnorderedXids, self).setUp() |
| 314 | + |
| 315 | + self.connection = self.client._connection |
| 316 | + self.connection_routine = self.connection._connection_routine |
| 317 | + |
| 318 | + self._pending = self.client._pending |
| 319 | + self.client._pending = _naughty_deque() |
| 320 | + |
| 321 | + def tearDown(self): |
| 322 | + self.client._pending = self._pending |
| 323 | + super(TestUnorderedXids, self).tearDown() |
| 324 | + |
| 325 | + def _get_client(self, **kwargs): |
| 326 | + # overrides for patching zk_loop |
| 327 | + c = KazooTestCase._get_client(self, **kwargs) |
| 328 | + self._zk_loop = c._connection.zk_loop |
| 329 | + self._zk_loop_errors = [] |
| 330 | + c._connection.zk_loop = self._zk_loop_func |
| 331 | + return c |
| 332 | + |
| 333 | + def _zk_loop_func(self, *args, **kwargs): |
| 334 | + # patched zk_loop which will catch and collect all RuntimeError |
| 335 | + try: |
| 336 | + self._zk_loop(*args, **kwargs) |
| 337 | + except RuntimeError as e: |
| 338 | + self._zk_loop_errors.append(e) |
| 339 | + |
| 340 | + def test_xids_mismatch(self): |
| 341 | + from kazoo.protocol.states import KeeperState |
| 342 | + |
| 343 | + ev = threading.Event() |
| 344 | + error_stack = [] |
| 345 | + |
| 346 | + @self.client.add_listener |
| 347 | + def listen(state): |
| 348 | + if self.client.client_state == KeeperState.CLOSED: |
| 349 | + ev.set() |
| 350 | + |
| 351 | + def log_exception(*args): |
| 352 | + error_stack.append((args, sys.exc_info())) |
| 353 | + |
| 354 | + self.connection.logger.exception = log_exception |
| 355 | + |
| 356 | + ev.clear() |
| 357 | + self.assertRaises(RuntimeError, self.client.get_children, '/') |
| 358 | + |
| 359 | + ev.wait() |
| 360 | + eq_(self.client.connected, False) |
| 361 | + eq_(self.client.state, 'LOST') |
| 362 | + eq_(self.client.client_state, KeeperState.CLOSED) |
| 363 | + |
| 364 | + args, exc_info = error_stack[-1] |
| 365 | + eq_(args, ('Unhandled exception in connection loop',)) |
| 366 | + eq_(exc_info[0], RuntimeError) |
| 367 | + |
| 368 | + self.client.handler.sleep_func(0.2) |
| 369 | + assert not self.connection_routine.is_alive() |
| 370 | + assert len(self._zk_loop_errors) == 1 |
| 371 | + assert self._zk_loop_errors[0] == exc_info[1] |
| 372 | + |
| 373 | + |
| 374 | +class _naughty_deque(deque): |
| 375 | + def append(self, s): |
| 376 | + request, async_object, xid = s |
| 377 | + return deque.append(self, (request, async_object, xid + 1)) # +1s |
0 commit comments