Skip to content

Commit f051fea

Browse files
committed
Add a guard so that only one reconnect is pending at a time
ControlConnection.reconnect can be called from many places in parallel. When that happens, you see a streak of reconnects happening one after another. This commit adds a guard that makes sure there is only one pending reconnect.
1 parent 23a6e85 commit f051fea

File tree

3 files changed

+46
-0
lines changed

3 files changed

+46
-0
lines changed

cassandra/cluster.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3561,6 +3561,9 @@ def try_reconnect(self):
35613561
def on_reconnection(self, connection):
35623562
self.control_connection._set_new_connection(connection)
35633563

3564+
def on_done(self, success: bool):
3565+
self.control_connection._reconnection_pending = False
3566+
35643567
def on_exception(self, exc, next_delay):
35653568
# TODO only overridden to add logging, so add logging
35663569
if isinstance(exc, AuthenticationFailed):
@@ -3662,6 +3665,7 @@ def __init__(self, cluster, timeout,
36623665

36633666
self._reconnection_handler = None
36643667
self._reconnection_lock = RLock()
3668+
self._reconnection_pending = False
36653669

36663670
self._event_schedule_times = {}
36673671

@@ -3681,6 +3685,7 @@ def _set_new_connection(self, conn):
36813685
with self._lock:
36823686
old = self._connection
36833687
self._connection = conn
3688+
self._reconnection_pending = False
36843689

36853690
if old:
36863691
log.debug("[control connection] Closing old connection %r, replacing with %r", old, conn)
@@ -3818,6 +3823,10 @@ def reconnect(self):
38183823
if self._is_shutdown:
38193824
return
38203825

3826+
if self._reconnection_pending:
3827+
return
3828+
self._reconnection_pending = True
3829+
38213830
self._submit(self._reconnect)
38223831

38233832
def _reconnect(self):
@@ -3855,6 +3864,7 @@ def _get_and_set_reconnection_handler(self, new_handler):
38553864
with self._reconnection_lock:
38563865
old = self._reconnection_handler
38573866
self._reconnection_handler = new_handler
3867+
self._reconnection_pending = False
38583868
return old
38593869

38603870
def _submit(self, *args, **kwargs):
@@ -4216,6 +4226,9 @@ def wait_for_schema_agreement(self, target_connection=None, preloaded_results=No
42164226
# from the response type and one from the pushed notification. Holding
42174227
# a lock is just a simple way to cut down on the number of schema queries
42184228
# we'll make.
4229+
4230+
4231+
42194232
with self._schema_agreement_lock:
42204233
if self._is_shutdown:
42214234
return

cassandra/pool.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,7 @@ def run(self):
293293
# call on_exception for logging purposes even if next_delay is None
294294
if self.on_exception(exc, next_delay):
295295
if next_delay is None:
296+
self.on_done(False)
296297
log.warning(
297298
"Will not continue to retry reconnection attempts "
298299
"due to an exhausted retry schedule")
@@ -301,6 +302,7 @@ def run(self):
301302
else:
302303
if not self._cancelled:
303304
self.on_reconnection(conn)
305+
self.on_done(False)
304306
self.callback(*(self.callback_args), **(self.callback_kwargs))
305307
finally:
306308
if conn:
@@ -324,6 +326,13 @@ def on_reconnection(self, connection):
324326
"""
325327
pass
326328

329+
def on_done(self, success: bool):
330+
"""
331+
Called either when a new connection is successfully opened or
332+
when reconnecting has failed and no further attempts are scheduled.
333+
"""
334+
pass
335+
327336
def on_exception(self, exc, next_delay):
328337
"""
329338
Called when an Exception is raised when trying to connect.

tests/integration/standard/test_cluster.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -326,6 +326,30 @@ def test_invalid_protocol_negotation(self):
326326
cluster.connect()
327327
cluster.shutdown()
328328

329+
def test_control_connection_reconnect(self):
330+
"""
331+
Ensure that repeated reconnect() calls on the control connection trigger only one reconnection attempt
332+
"""
333+
cassandra.cluster.log.setLevel(logging.DEBUG)
334+
335+
cluster = TestCluster()
336+
_ = cluster.connect()
337+
338+
cluster.control_connection._reconnect_internal = Mock(wraps=cluster.control_connection._reconnect_internal)
339+
340+
cluster.control_connection.reconnect()
341+
cluster.control_connection.reconnect()
342+
cluster.control_connection.reconnect()
343+
cluster.control_connection.reconnect()
344+
345+
while cluster.control_connection._reconnection_pending:
346+
time.sleep(0.1)
347+
348+
self.assertFalse(cluster.control_connection._connection.is_closed)
349+
self.assertFalse(cluster.control_connection._connection.is_defunct)
350+
self.assertTrue(cluster.control_connection.refresh_schema())
351+
self.assertEqual(1, len(cluster.control_connection._reconnect_internal.mock_calls))
352+
329353
def test_connect_on_keyspace(self):
330354
"""
331355
Ensure clusters that connect on a keyspace, do

0 commit comments

Comments
 (0)