Skip to content

Commit 050e018

Browse files
committed
1
1 parent d5834c6 commit 050e018

File tree

5 files changed

+296
-42
lines changed

5 files changed

+296
-42
lines changed

cassandra/cluster.py

Lines changed: 192 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import atexit
2222
import datetime
23+
import threading
2324
from binascii import hexlify
2425
from collections import defaultdict
2526
from collections.abc import Mapping
@@ -72,7 +73,8 @@
7273
ExponentialReconnectionPolicy, HostDistance,
7374
RetryPolicy, IdentityTranslator, NoSpeculativeExecutionPlan,
7475
NoSpeculativeExecutionPolicy, DefaultLoadBalancingPolicy,
75-
NeverRetryPolicy)
76+
NeverRetryPolicy, ConstantReconnectionPolicy, ReconnectionPolicy,
77+
ShardReconnectionScope, ShardReconnectionScopeHost, NoDelayReconnectionPolicy)
7678
from cassandra.pool import (Host, _ReconnectionHandler, _HostReconnectionHandler,
7779
HostConnectionPool, HostConnection,
7880
NoConnectionsAvailable)
@@ -742,6 +744,32 @@ def auth_provider(self, value):
742744

743745
self._auth_provider = value
744746

747+
_shard_reconnection_policy = None
748+
@property
def shard_reconnection_policy(self):
    """
    The shard reconnection policy stored on this object.

    Reads the private ``_shard_reconnection_policy`` attribute, whose
    class-level default is ``None``.
    """
    current_policy = self._shard_reconnection_policy
    return current_policy
751+
752+
@shard_reconnection_policy.setter
def shard_reconnection_policy(self, srp):
    """
    Store ``srp`` as the shard reconnection policy.

    Raises :class:`ValueError` when the object is already in the
    ``_ConfigMode.PROFILES`` configuration mode; otherwise the value is
    stored and the configuration mode is set to ``_ConfigMode.LEGACY``.
    """
    profiles_in_use = self._config_mode == _ConfigMode.PROFILES
    if profiles_in_use:
        raise ValueError(
            "Cannot set Cluster.shard_reconnection_policy while using Configuration Profiles. Set this in a profile instead.")

    self._shard_reconnection_policy = srp
    # Assigning through this setter commits the object to legacy config mode.
    self._config_mode = _ConfigMode.LEGACY
759+
760+
_shard_reconnection_scope = None
761+
@property
def shard_reconnection_scope(self):
    """
    The shard reconnection scope stored on this object.

    Reads the private ``_shard_reconnection_scope`` attribute, whose
    class-level default is ``None``.
    """
    current_scope = self._shard_reconnection_scope
    return current_scope
764+
765+
@shard_reconnection_scope.setter
def shard_reconnection_scope(self, scope):
    """
    Store ``scope`` as the shard reconnection scope.

    Raises :class:`ValueError` when the object is already in the
    ``_ConfigMode.PROFILES`` configuration mode; otherwise the value is
    stored and the configuration mode is set to ``_ConfigMode.LEGACY``.
    """
    profiles_in_use = self._config_mode == _ConfigMode.PROFILES
    if profiles_in_use:
        raise ValueError(
            "Cannot set Cluster.shard_reconnection_scope while using Configuration Profiles. Set this in a profile instead.")

    self._shard_reconnection_scope = scope
    # Assigning through this setter commits the object to legacy config mode.
    self._config_mode = _ConfigMode.LEGACY
772+
745773
_load_balancing_policy = None
746774
@property
747775
def load_balancing_policy(self):
@@ -1204,6 +1232,8 @@ def __init__(self,
12041232
shard_aware_options=None,
12051233
metadata_request_timeout=None,
12061234
column_encryption_policy=None,
1235+
shard_reconnection_policy=None,
1236+
shard_reconnection_scope=None,
12071237
):
12081238
"""
12091239
``executor_threads`` defines the number of threads in a pool for handling asynchronous tasks such as
@@ -1309,6 +1339,24 @@ def __init__(self,
13091339
else:
13101340
self._load_balancing_policy = default_lbp_factory() # set internal attribute to avoid committing to legacy config mode
13111341

1342+
if shard_reconnection_scope is not None:
1343+
if isinstance(shard_reconnection_scope, type):
1344+
raise TypeError("shard_reconnection_scope should not be a class, it should be an instance of that class")
1345+
if not isinstance(shard_reconnection_policy, ShardReconnectionScope):
1346+
raise TypeError("load_balancing_policy should be an instance of class derived from ReconnectionPolicy")
1347+
self.shard_reconnection_scope = shard_reconnection_scope
1348+
else:
1349+
self._shard_reconnection_scope = ShardReconnectionScopeHost() # set internal attribute to avoid committing to legacy config mode
1350+
1351+
if shard_reconnection_policy is not None:
1352+
if isinstance(shard_reconnection_policy, type):
1353+
raise TypeError("load_balancing_policy should not be a class, it should be an instance of that class")
1354+
if not isinstance(shard_reconnection_policy, ReconnectionPolicy):
1355+
raise TypeError("load_balancing_policy should be an instance of class derived from ReconnectionPolicy")
1356+
self.shard_reconnection_policy = shard_reconnection_policy
1357+
else:
1358+
self._shard_reconnection_policy = ConstantReconnectionPolicy(2, 0) # set internal attribute to avoid committing to legacy config mode
1359+
13121360
if reconnection_policy is not None:
13131361
if isinstance(reconnection_policy, type):
13141362
raise TypeError("reconnection_policy should not be a class, it should be an instance of that class")
@@ -2707,6 +2755,11 @@ def __init__(self, cluster, hosts, keyspace=None):
27072755
self._protocol_version = self.cluster.protocol_version
27082756

27092757
self.encoder = Encoder()
2758+
if isinstance(cluster.shard_reconnection_policy, NoDelayReconnectionPolicy):
2759+
self.shard_reconnection_scheduler = NoDelayShardReconnectionScheduler(self)
2760+
else:
2761+
self.shard_reconnection_scheduler = ShardReconnectionScheduler(
2762+
self, cluster, cluster.shard_reconnection_scope, cluster.shard_reconnection_policy)
27102763

27112764
# create connection pools in parallel
27122765
self._initial_connect_futures = set()
@@ -3546,6 +3599,141 @@ class UserTypeDoesNotExist(Exception):
35463599
pass
35473600

35483601

3602+
class ScopeBucket(object):
    """
    Runs the reconnection callbacks of one reconnection scope one at a time.

    Callbacks queued with :meth:`add` are executed in the order they were
    added (FIFO).  Before each execution a delay is taken from the schedule
    returned by ``shard_reconnection_policy.new_schedule()``: a truthy delay
    hands :meth:`run` to ``session.cluster.scheduler.schedule(delay, run)``,
    a falsy delay hands it to ``session.submit(run)``.
    """

    def __init__(self, session, shard_reconnection_policy):
        """
        :param session: object exposing ``is_shutdown``, ``submit(fn)`` and
            ``cluster.scheduler.schedule(delay, fn)``.
        :param shard_reconnection_policy: object whose ``new_schedule()``
            returns an iterator of delays.
        """
        # Imported locally so the block does not rely on a module-level import.
        from collections import deque

        # FIFO queue of (method, args, kwargs); deque gives O(1) pops from
        # the left, which a list does not.
        self._items = deque()
        self.last_run = None  # only ever assigned here; kept for compatibility
        self.session = session
        self.policy = shard_reconnection_policy
        self.lock = threading.Lock()
        # True while a run() call is pending or executing for this bucket.
        self.running = False
        self.schedule = self.policy.new_schedule()

    def add(self, method, *args, **kwargs):
        """
        Queue ``method(*args, **kwargs)`` and start the run chain when no
        run is currently pending.
        """
        with self.lock:
            self._items.append((method, args, kwargs))
            if not self.running:
                self.running = True
                self._schedule()

    def _get_delay(self):
        """
        Return the next delay, starting a fresh schedule when the current
        one is exhausted.
        """
        # NOTE(review): the schedule is shared by every callback of this
        # bucket and is only renewed on exhaustion, not when the queue
        # drains - confirm this is intended for growing schedules.
        try:
            return next(self.schedule)
        except StopIteration:
            self.schedule = self.policy.new_schedule()
            return next(self.schedule)

    def _schedule(self):
        """Arrange the next :meth:`run` call unless the session is shut down."""
        if self.session.is_shutdown:
            return
        delay = self._get_delay()
        if delay:
            self.session.cluster.scheduler.schedule(delay, self.run)
        else:
            self.session.submit(self.run)

    def run(self):
        """
        Execute the oldest queued callback, then arrange the next run.

        When the queue is empty the bucket is marked idle and nothing more is
        scheduled.  The callback itself runs outside the lock so that
        :meth:`add` is never blocked by a slow callback.
        """
        if self.session.is_shutdown:
            return

        with self.lock:
            if not self._items:
                self.running = False
                return
            # Oldest first, so a request cannot be starved by newer ones.
            method, args, kwargs = self._items.popleft()

        try:
            method(*args, **kwargs)
        finally:
            # Keep the chain going even if the callback raised.
            self._schedule()
3651+
3652+
3653+
class ShardReconnectionSchedulerBase(object):
    """
    Interface for objects that schedule shard reconnection callbacks.

    Both methods take the host and shard the callback belongs to, followed
    by the callable and its arguments.  This base class implements neither.
    """

    def schedule(self, host_id, shard_id, method, *args, **kwargs):
        """Must be overridden; this base implementation always raises."""
        raise NotImplementedError()

    def forced_schedule(self, host_id, shard_id, method, *args, **kwargs):
        """Must be overridden; this base implementation always raises."""
        raise NotImplementedError()
3659+
3660+
3661+
class NoDelayShardReconnectionScheduler(ShardReconnectionSchedulerBase):
    """
    Scheduler that hands shard reconnection callbacks to ``session.submit``
    immediately, without any delay.

    :meth:`schedule` keeps at most one callback per ``host_id``/``shard_id``
    pair in flight; :meth:`forced_schedule` submits regardless of that flag.
    Both return ``True`` when the callback was submitted and ``False``
    otherwise, matching :class:`ShardReconnectionScheduler`.
    """

    def __init__(self, session):
        """
        :param session: object exposing ``is_shutdown`` and ``submit``; only
            a weak proxy to it is stored.
        """
        self.session = weakref.proxy(session)
        # Maps '<host_id>-<shard_id>' to True while a callback is in flight.
        self.already_scheduled = {}
        # Guards already_scheduled: schedule() may be called from several
        # threads and _execute() runs on whatever thread submit() uses.
        self.lock = threading.Lock()

    def _execute(self, scheduled_key, method, *args, **kwargs):
        """Run the callback and clear its in-flight flag, even on error."""
        try:
            method(*args, **kwargs)
        finally:
            with self.lock:
                self.already_scheduled[scheduled_key] = False

    def _submit(self, forced, host_id, shard_id, method, args, kwargs):
        """
        Shared implementation of :meth:`schedule` and :meth:`forced_schedule`.

        Returns ``False`` without touching the in-flight flag when the
        session is shut down, or when ``forced`` is false and a callback for
        the same key is already in flight.
        """
        scheduled_key = f'{host_id}-{shard_id}'
        with self.lock:
            if not forced and self.already_scheduled.get(scheduled_key):
                return False
            if self.session.is_shutdown:
                # Nothing will run, so do not leave the key marked as busy.
                return False
            self.already_scheduled[scheduled_key] = True

        # Submit outside the lock: _execute() takes the same lock when the
        # callback finishes.
        try:
            self.session.submit(self._execute, scheduled_key, method, *args, **kwargs)
        except Exception:
            # The callback will never run, so release the flag before
            # propagating the failure.
            with self.lock:
                self.already_scheduled[scheduled_key] = False
            raise
        return True

    def forced_schedule(self, host_id, shard_id, method, *args, **kwargs):
        """
        Submit ``method(*args, **kwargs)`` even if a callback for the same
        host and shard is already in flight.

        :return: ``True`` if submitted, ``False`` if the session is shut down.
        """
        return self._submit(True, host_id, shard_id, method, args, kwargs)

    def schedule(self, host_id, shard_id, method, *args, **kwargs):
        """
        Submit ``method(*args, **kwargs)`` unless a callback for the same
        host and shard is already in flight.

        :return: ``True`` if submitted, ``False`` if skipped or the session
            is shut down.
        """
        return self._submit(False, host_id, shard_id, method, args, kwargs)
3687+
3688+
3689+
class ShardReconnectionScheduler(ShardReconnectionSchedulerBase):
    """
    Scheduler that groups shard reconnection callbacks into per-scope
    :class:`ScopeBucket` objects.

    The scope of a callback is whatever ``shard_reconnection_scope.get_hash``
    returns for its cluster, host and shard; every scope gets its own bucket
    built with ``shard_reconnection_policy``.
    """

    def __init__(self, session, cluster, shard_reconnection_scope, shard_reconnection_policy):
        """
        :param session: session passed on to every bucket (stored as a weak proxy).
        :param cluster: cluster handed to ``get_hash`` (stored as a weak proxy).
        :param shard_reconnection_scope: object providing ``get_hash``.
        :param shard_reconnection_policy: policy handed to every new bucket.
        """
        self.lock = threading.Lock()
        self.session = weakref.proxy(session)
        self.cluster = weakref.proxy(cluster)
        self.shard_reconnection_policy = shard_reconnection_policy
        self.shard_reconnection_scope = shard_reconnection_scope
        # scope id -> ScopeBucket
        self.scopes = {}
        # '<host_id>-<shard_id>' -> True while a callback is queued or running
        self.already_scheduled = {}

    def _execute(self, scheduled_key, method, *args, **kwargs):
        """Run the callback, then clear its in-flight flag even on error."""
        try:
            method(*args, **kwargs)
        finally:
            with self.lock:
                self.already_scheduled[scheduled_key] = False

    def _enqueue(self, forced, host_id, shard_id, method, args, kwargs):
        """
        Common body of :meth:`schedule` and :meth:`forced_schedule`.

        With ``forced`` false the call is a no-op returning ``False`` when
        the key is already flagged; otherwise the key is flagged and the
        callback is added to the bucket of its scope.
        """
        scope_id = self.shard_reconnection_scope.get_hash(self.cluster, host_id, shard_id)
        key = f'{host_id}-{shard_id}'

        with self.lock:
            if not forced and self.already_scheduled.get(key):
                return False
            self.already_scheduled[key] = True

            bucket = self.scopes.get(scope_id)
            if not bucket:
                bucket = ScopeBucket(self.session, self.shard_reconnection_policy)
                self.scopes[scope_id] = bucket
            bucket.add(self._execute, key, method, *args, **kwargs)
        return True

    def forced_schedule(self, host_id, shard_id, method, *args, **kwargs):
        """
        Queue the callback regardless of the in-flight flag.

        :return: always ``True``.
        """
        return self._enqueue(True, host_id, shard_id, method, args, kwargs)

    def schedule(self, host_id, shard_id, method, *args, **kwargs):
        """
        Queue the callback unless one for the same host and shard is already
        flagged as in flight.

        :return: ``True`` if queued, ``False`` if skipped.
        """
        return self._enqueue(False, host_id, shard_id, method, args, kwargs)
3735+
3736+
35493737
class _ControlReconnectionHandler(_ReconnectionHandler):
35503738
"""
35513739
Internal
@@ -4432,6 +4620,9 @@ def shutdown(self):
44324620
self._queue.put_nowait((0, 0, None))
44334621
self.join()
44344622

4623+
def empty(self):
    """
    Return ``True`` when ``_scheduled_tasks`` has no entries and the
    internal ``_queue`` reports itself empty.
    """
    if len(self._scheduled_tasks) != 0:
        return False
    return self._queue.empty()
4625+
44354626
def schedule(self, delay, fn, *args, **kwargs):
44364627
self._insert_task(delay, (fn, args, tuple(kwargs.items())))
44374628

cassandra/policies.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414
import random
15+
import threading
16+
import time
17+
import weakref
1518

1619
from collections import namedtuple
1720
from functools import lru_cache
@@ -778,6 +781,14 @@ def new_schedule(self):
778781
raise NotImplementedError()
779782

780783

784+
class NoDelayReconnectionPolicy(ReconnectionPolicy):
    """
    A :class:`.ReconnectionPolicy` subclass whose schedule asks for a delay
    of ``0`` before every attempt, i.e. it never sleeps.
    """

    def new_schedule(self):
        # presumably `repeat` is itertools.repeat, giving an unbounded
        # stream of zeros - confirm against the module imports
        zero_delay = 0
        return repeat(zero_delay)
790+
791+
781792
class ConstantReconnectionPolicy(ReconnectionPolicy):
782793
"""
783794
A :class:`.ReconnectionPolicy` subclass which sleeps for a fixed delay
@@ -864,6 +875,26 @@ def _add_jitter(self, value):
864875
return min(max(self.base_delay, delay), self.max_delay)
865876

866877

878+
class ShardReconnectionScope(object):
    """
    Base class for objects that map a ``(cluster, host_id, shard_id)``
    triple to a scope key.  Subclasses override :meth:`get_hash`.
    """

    def get_hash(self, cluster, host_id, shard_id):
        """Must be overridden; this base implementation always raises."""
        raise NotImplementedError()
881+
882+
883+
class ShardReconnectionScopeCluster(ShardReconnectionScope):
    """
    Scope keyed on the cluster object alone: ``host_id`` and ``shard_id``
    are ignored, so every host and shard seen with the same ``cluster``
    object shares one key.
    """

    def get_hash(self, cluster, host_id, shard_id):
        """Return ``id(cluster)``."""
        cluster_identity = id(cluster)
        return cluster_identity
886+
887+
888+
class ShardReconnectionScopeHost(ShardReconnectionScope):
    """
    Scope keyed on the host: ``cluster`` and ``shard_id`` are ignored, so
    all shards of one ``host_id`` share one key.
    """

    def get_hash(self, cluster, host_id, shard_id):
        """Return ``hash(host_id)``."""
        host_key = hash(host_id)
        return host_key
891+
892+
893+
class ShardReconnectionScopeShard(ShardReconnectionScope):
    """
    Scope keyed on the individual shard: the key combines ``host_id`` and
    ``shard_id``; ``cluster`` is ignored.
    """

    def get_hash(self, cluster, host_id, shard_id):
        """Return the hash of the ``(host_id, shard_id)`` tuple."""
        shard_key = (host_id, shard_id)
        return hash(shard_key)
896+
897+
867898
class RetryPolicy(object):
868899
"""
869900
A policy that describes whether to retry, rethrow, or ignore coordinator

cassandra/pool.py

Lines changed: 12 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -402,7 +402,6 @@ def __init__(self, host, host_distance, session):
402402
# this is used in conjunction with the connection streams. Not using the connection lock because the connection can be replaced in the lifetime of the pool.
403403
self._stream_available_condition = Condition(Lock())
404404
self._is_replacing = False
405-
self._connecting = set()
406405
self._connections = {}
407406
self._pending_connections = []
408407
# A pool of additional connections which are not used but affect how Scylla
@@ -418,7 +417,6 @@ def __init__(self, host, host_distance, session):
418417
# and are waiting until all requests time out or complete
419418
# so that we can dispose of them.
420419
self._trash = set()
421-
self._shard_connections_futures = []
422420
self.advanced_shardaware_block_until = 0
423421

424422
if host_distance == HostDistance.IGNORED:
@@ -483,25 +481,25 @@ def _get_connection_for_routing_key(self, routing_key=None, keyspace=None, table
483481
self.host,
484482
routing_key
485483
)
486-
if conn.orphaned_threshold_reached and shard_id not in self._connecting:
484+
if conn.orphaned_threshold_reached:
487485
# The connection has met its orphaned stream ID limit
488486
# and needs to be replaced. Start opening a connection
489487
# to the same shard and replace when it is opened.
490-
self._connecting.add(shard_id)
491-
self._session.submit(self._open_connection_to_missing_shard, shard_id)
488+
self._session.shard_reconnection_scheduler.schedule(
489+
self.host.host_id, shard_id, self._open_connection_to_missing_shard, shard_id)
492490
log.debug(
493-
"Connection to shard_id=%i reached orphaned stream limit, replacing on host %s (%s/%i)",
491+
"Scheduling Connection to shard_id=%i reached orphaned stream limit, replacing on host %s (%s/%i)",
494492
shard_id,
495493
self.host,
496494
len(self._connections.keys()),
497495
self.host.sharding_info.shards_count
498496
)
499-
elif shard_id not in self._connecting:
497+
else:
500498
# rate controlled optimistic attempt to connect to a missing shard
501-
self._connecting.add(shard_id)
502-
self._session.submit(self._open_connection_to_missing_shard, shard_id)
499+
self._session.shard_reconnection_scheduler.schedule(
500+
self.host.host_id, shard_id, self._open_connection_to_missing_shard, shard_id)
503501
log.debug(
504-
"Trying to connect to missing shard_id=%i on host %s (%s/%i)",
502+
"Scheduling connection to missing shard_id=%i on host %s (%s/%i)",
505503
shard_id,
506504
self.host,
507505
len(self._connections.keys()),
@@ -609,8 +607,8 @@ def _replace(self, connection):
609607
if connection.features.shard_id in self._connections.keys():
610608
del self._connections[connection.features.shard_id]
611609
if self.host.sharding_info and not self._session.cluster.shard_aware_options.disable:
612-
self._connecting.add(connection.features.shard_id)
613-
self._session.submit(self._open_connection_to_missing_shard, connection.features.shard_id)
610+
self._session.shard_reconnection_scheduler.schedule(
611+
self.host.host_id, connection.features.shard_id, self._open_connection_to_missing_shard, connection.features.shard_id)
614612
else:
615613
connection = self._session.cluster.connection_factory(self.host.endpoint,
616614
on_orphaned_stream_released=self.on_orphaned_stream_released)
@@ -635,9 +633,6 @@ def shutdown(self):
635633
with self._stream_available_condition:
636634
self._stream_available_condition.notify_all()
637635

638-
for future in self._shard_connections_futures:
639-
future.cancel()
640-
641636
connections_to_close = self._connections.copy()
642637
pending_connections_to_close = self._pending_connections.copy()
643638
self._connections.clear()
@@ -843,7 +838,6 @@ def _open_connection_to_missing_shard(self, shard_id):
843838
self._excess_connections.add(conn)
844839
if close_connection:
845840
conn.close()
846-
self._connecting.discard(shard_id)
847841

848842
def _open_connections_for_all_shards(self, skip_shard_id=None):
849843
"""
@@ -856,10 +850,8 @@ def _open_connections_for_all_shards(self, skip_shard_id=None):
856850
for shard_id in range(self.host.sharding_info.shards_count):
857851
if skip_shard_id is not None and skip_shard_id == shard_id:
858852
continue
859-
future = self._session.submit(self._open_connection_to_missing_shard, shard_id)
860-
if isinstance(future, Future):
861-
self._connecting.add(shard_id)
862-
self._shard_connections_futures.append(future)
853+
self._session.shard_reconnection_scheduler.schedule(
854+
self.host.host_id, shard_id, self._open_connection_to_missing_shard, shard_id)
863855

864856
trash_conns = None
865857
with self._lock:

0 commit comments

Comments
 (0)