Skip to content

Commit 18b0090

Browse files
Merge pull request scylladb#29 from riptano/long-python-1181
Cancel reconnections when node removed PYTHON-1181
2 parents df6e69e + d0241ad commit 18b0090

File tree

4 files changed

+55
-1
lines changed

4 files changed

+55
-1
lines changed

CHANGELOG.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ Bug Fixes
1919
---------
2020
* re-raising the CQLEngineException will fail on Python 3 (PYTHON-1166)
2121
* asyncio message chunks can be processed discontinuously (PYTHON-1185)
22+
* Reconnect attempts persist after downed node removed from peers (PYTHON-1181)
2223

2324
Others
2425
------

cassandra/cluster.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2029,6 +2029,10 @@ def on_remove(self, host):
20292029
listener.on_remove(host)
20302030
self.control_connection.on_remove(host)
20312031

2032+
reconnection_handler = host.get_and_set_reconnection_handler(None)
2033+
if reconnection_handler:
2034+
reconnection_handler.cancel()
2035+
20322036
def signal_connection_failure(self, host, connection_exc, is_host_addition, expect_host_to_be_down=False):
20332037
is_down = host.signal_connection_failure(connection_exc)
20342038
if is_down:
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
from unittest import TestCase
2+
3+
from cassandra.cluster import Cluster
4+
from cassandra.policies import HostStateListener
5+
from tests.integration import PROTOCOL_VERSION, get_node, use_cluster, local
6+
from tests.integration.long.utils import decommission
7+
from tests.integration.util import wait_until
8+
9+
10+
class StateListener(HostStateListener):
11+
def __init__(self):
12+
self.downed_host = None
13+
self.removed_host = None
14+
15+
def on_remove(self, host):
16+
self.removed_host = host
17+
18+
def on_up(self, host):
19+
pass
20+
21+
def on_down(self, host):
22+
self.downed_host = host
23+
24+
def on_add(self, host):
25+
pass
26+
27+
28+
class TopologyChangeTests(TestCase):
29+
@local
30+
def test_removed_node_stops_reconnecting(self):
31+
""" Ensure we stop reconnecting after a node is removed. PYTHON-1181 """
32+
use_cluster("test_down_then_removed", [3], start=True)
33+
34+
state_listener = StateListener()
35+
cluster = Cluster(protocol_version=PROTOCOL_VERSION)
36+
self.addCleanup(cluster.shutdown)
37+
cluster.register_listener(state_listener)
38+
session = cluster.connect(wait_for_all_pools=True)
39+
40+
get_node(3).nodetool("disablebinary")
41+
42+
wait_until(condition=lambda: state_listener.downed_host is not None, delay=2, max_attempts=50)
43+
self.assertTrue(state_listener.downed_host.is_currently_reconnecting())
44+
45+
decommission(3)
46+
47+
wait_until(condition=lambda: state_listener.removed_host is not None, delay=2, max_attempts=50)
48+
self.assertIs(state_listener.downed_host, state_listener.removed_host) # Just a sanity check
49+
self.assertFalse(state_listener.removed_host.is_currently_reconnecting())

tests/integration/long/utils.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ def force_stop(node):
9393

9494

9595
def decommission(node):
96-
if DSE_VERSION >= Version("5.1") or CASSANDRA_VERSION >= Version("4.0"):
96+
if (DSE_VERSION and DSE_VERSION >= Version("5.1")) or CASSANDRA_VERSION >= Version("4.0"):
9797
# CASSANDRA-12510
9898
get_node(node).decommission(force=True)
9999
else:

0 commit comments

Comments
 (0)