Skip to content

Commit b15f67e

Browse files
committed
1
1 parent c587c97 commit b15f67e

File tree

3 files changed

+186
-2
lines changed

3 files changed

+186
-2
lines changed

cassandra/cluster.py

Lines changed: 153 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import atexit
2222
import datetime
23+
import threading
2324
from binascii import hexlify
2425
from collections import defaultdict
2526
from collections.abc import Mapping
@@ -72,7 +73,8 @@
7273
ExponentialReconnectionPolicy, HostDistance,
7374
RetryPolicy, IdentityTranslator, NoSpeculativeExecutionPlan,
7475
NoSpeculativeExecutionPolicy, DefaultLoadBalancingPolicy,
75-
NeverRetryPolicy)
76+
NeverRetryPolicy, ConstantReconnectionPolicy, ReconnectionPolicy,
77+
ShardReconnectionScope, ShardReconnectionScopeHost, NoDelayReconnectionPolicy)
7678
from cassandra.pool import (Host, _ReconnectionHandler, _HostReconnectionHandler,
7779
HostConnectionPool, HostConnection,
7880
NoConnectionsAvailable)
@@ -742,6 +744,32 @@ def auth_provider(self, value):
742744

743745
self._auth_provider = value
744746

747+
_shard_reconnection_policy = None
748+
@property
749+
def shard_reconnection_policy(self):
750+
return self._shard_reconnection_policy
751+
752+
@shard_reconnection_policy.setter
753+
def shard_reconnection_policy(self, srp):
754+
if self._config_mode == _ConfigMode.PROFILES:
755+
raise ValueError(
756+
"Cannot set Cluster.shard_reconnection_policy while using Configuration Profiles. Set this in a profile instead.")
757+
self._shard_reconnection_policy = srp
758+
self._config_mode = _ConfigMode.LEGACY
759+
760+
_shard_reconnection_scope = None
761+
@property
762+
def shard_reconnection_scope(self):
763+
return self._shard_reconnection_scope
764+
765+
@shard_reconnection_scope.setter
766+
def shard_reconnection_scope(self, scope):
767+
if self._config_mode == _ConfigMode.PROFILES:
768+
raise ValueError(
769+
"Cannot set Cluster.shard_reconnection_scope while using Configuration Profiles. Set this in a profile instead.")
770+
self._shard_reconnection_scope = scope
771+
self._config_mode = _ConfigMode.LEGACY
772+
745773
_load_balancing_policy = None
746774
@property
747775
def load_balancing_policy(self):
@@ -1204,6 +1232,8 @@ def __init__(self,
12041232
shard_aware_options=None,
12051233
metadata_request_timeout=None,
12061234
column_encryption_policy=None,
1235+
shard_reconnection_policy=None,
1236+
shard_reconnection_scope=None,
12071237
):
12081238
"""
12091239
``executor_threads`` defines the number of threads in a pool for handling asynchronous tasks such as
@@ -1309,6 +1339,24 @@ def __init__(self,
13091339
else:
13101340
self._load_balancing_policy = default_lbp_factory() # set internal attribute to avoid committing to legacy config mode
13111341

1342+
if shard_reconnection_scope is not None:
1343+
if isinstance(shard_reconnection_scope, type):
1344+
raise TypeError("shard_reconnection_scope should not be a class, it should be an instance of that class")
1345+
if not isinstance(shard_reconnection_policy, ShardReconnectionScope):
1346+
raise TypeError("load_balancing_policy should be an instance of class derived from ReconnectionPolicy")
1347+
self.shard_reconnection_scope = shard_reconnection_scope
1348+
else:
1349+
self._shard_reconnection_scope = ShardReconnectionScopeHost() # set internal attribute to avoid committing to legacy config mode
1350+
1351+
if shard_reconnection_policy is not None:
1352+
if isinstance(shard_reconnection_policy, type):
1353+
raise TypeError("load_balancing_policy should not be a class, it should be an instance of that class")
1354+
if not isinstance(shard_reconnection_policy, ReconnectionPolicy):
1355+
raise TypeError("load_balancing_policy should be an instance of class derived from ReconnectionPolicy")
1356+
self.shard_reconnection_policy = shard_reconnection_policy
1357+
else:
1358+
self._shard_reconnection_policy = ConstantReconnectionPolicy(2, 0) # set internal attribute to avoid committing to legacy config mode
1359+
13121360
if reconnection_policy is not None:
13131361
if isinstance(reconnection_policy, type):
13141362
raise TypeError("reconnection_policy should not be a class, it should be an instance of that class")
@@ -2707,6 +2755,11 @@ def __init__(self, cluster, hosts, keyspace=None):
27072755
self._protocol_version = self.cluster.protocol_version
27082756

27092757
self.encoder = Encoder()
2758+
if isinstance(cluster.shard_reconnection_policy, NoDelayReconnectionPolicy):
2759+
self.shard_reconnection_scheduler = NoDelayShardReconnectionScheduler(self)
2760+
else:
2761+
self.shard_reconnection_scheduler = ShardReconnectionScheduler(
2762+
self, cluster, cluster.shard_reconnection_scope, cluster.shard_reconnection_policy)
27102763

27112764
# create connection pools in parallel
27122765
self._initial_connect_futures = set()
@@ -3546,6 +3599,105 @@ class UserTypeDoesNotExist(Exception):
35463599
pass
35473600

35483601

3602+
class ScopeBucket(object):
3603+
def __init__(self, session, shard_reconnection_policy):
3604+
self._items = []
3605+
self.last_run = None
3606+
self.session = weakref.proxy(session)
3607+
self.policy = shard_reconnection_policy
3608+
self.lock = threading.Lock()
3609+
self.running = False
3610+
self.schedule = self.policy.new_schedule()
3611+
3612+
def add(self, method, *args, **kwargs):
3613+
if self.session.is_shutdown:
3614+
return
3615+
3616+
with self.lock:
3617+
self._items.append([method, args, kwargs])
3618+
if not self.running:
3619+
if not self.session.is_shutdown:
3620+
self.session.submit(self.run)
3621+
3622+
def get_delay(self):
3623+
try:
3624+
return next(self.schedule)
3625+
except StopIteration:
3626+
self.schedule = self.policy.new_schedule()
3627+
return next(self.schedule)
3628+
3629+
def run(self):
3630+
if self.session.is_shutdown:
3631+
return
3632+
3633+
with self.lock:
3634+
try:
3635+
item = self._items.pop()
3636+
self.running = True
3637+
except IndexError:
3638+
self.running = False
3639+
return
3640+
3641+
method, args, kwargs = item
3642+
try:
3643+
method(*args, **kwargs)
3644+
finally:
3645+
if not self.session.is_shutdown:
3646+
delay = self.get_delay()
3647+
if delay:
3648+
self.session.cluster.scheduler.schedule(delay, self.run)
3649+
else:
3650+
self.session.submit(self.run)
3651+
3652+
3653+
class ShardReconnectionSchedulerBase(object):
3654+
def schedule(self, host_id, shard_id, method, *args, **kwargs):
3655+
raise NotImplementedError()
3656+
3657+
3658+
class NoDelayShardReconnectionScheduler(ShardReconnectionSchedulerBase):
3659+
def __init__(self, session):
3660+
self.session = weakref.proxy(session)
3661+
3662+
def schedule(self, host_id, shard_id, method, *args, **kwargs):
3663+
if not self.session.is_shutdown:
3664+
self.session.submit(method, *args, **kwargs)
3665+
3666+
3667+
class ShardReconnectionScheduler(ShardReconnectionSchedulerBase):
3668+
def __init__(self, session, cluster, shard_reconnection_scope, shard_reconnection_policy):
3669+
self.already_scheduled = {}
3670+
self.scopes = {}
3671+
self.session = weakref.proxy(session)
3672+
self.cluster = weakref.proxy(cluster)
3673+
self.shard_reconnection_scope = shard_reconnection_scope
3674+
self.shard_reconnection_policy = shard_reconnection_policy
3675+
self.lock = threading.Lock()
3676+
3677+
def _execute(self, scheduled_key, method, *args, **kwargs):
3678+
try:
3679+
method(*args, **kwargs)
3680+
finally:
3681+
with self.lock:
3682+
self.already_scheduled[scheduled_key] = False
3683+
3684+
def schedule(self, host_id, shard_id, method, *args, **kwargs):
3685+
scope_id = self.shard_reconnection_scope.get_hash(self.cluster, host_id, shard_id)
3686+
scheduled_key = f'{host_id}-{shard_id}'
3687+
3688+
with self.lock:
3689+
if self.already_scheduled.get(scheduled_key):
3690+
return False
3691+
self.already_scheduled[scheduled_key] = True
3692+
3693+
scope_info = self.scopes.get(scope_id, 0)
3694+
if not scope_info:
3695+
scope_info = ScopeBucket(self.session, self.shard_reconnection_policy)
3696+
self.scopes[scope_id] = scope_info
3697+
scope_info.add(self._execute, scheduled_key, method,*args, **kwargs)
3698+
return True
3699+
3700+
35493701
class _ControlReconnectionHandler(_ReconnectionHandler):
35503702
"""
35513703
Internal

cassandra/policies.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414
import random
15+
import threading
16+
import time
17+
import weakref
1518

1619
from collections import namedtuple
1720
from functools import lru_cache
@@ -778,6 +781,14 @@ def new_schedule(self):
778781
raise NotImplementedError()
779782

780783

784+
class NoDelayReconnectionPolicy(ReconnectionPolicy):
785+
"""
786+
A :class:`.ReconnectionPolicy` subclass which does not sleep.
787+
"""
788+
def new_schedule(self):
789+
return repeat(0)
790+
791+
781792
class ConstantReconnectionPolicy(ReconnectionPolicy):
782793
"""
783794
A :class:`.ReconnectionPolicy` subclass which sleeps for a fixed delay
@@ -864,6 +875,26 @@ def _add_jitter(self, value):
864875
return min(max(self.base_delay, delay), self.max_delay)
865876

866877

878+
class ShardReconnectionScope(object):
879+
def get_hash(self, cluster, host_id, shard_id):
880+
raise NotImplementedError()
881+
882+
883+
class ShardReconnectionScopeCluster(ShardReconnectionScope):
884+
def get_hash(self, cluster, host_id, shard_id):
885+
return id(cluster)
886+
887+
888+
class ShardReconnectionScopeHost(ShardReconnectionScope):
889+
def get_hash(self, cluster, host_id, shard_id):
890+
return hash(host_id)
891+
892+
893+
class ShardReconnectionScopeShard(ShardReconnectionScope):
894+
def get_hash(self, cluster, host_id, shard_id):
895+
return hash((host_id, shard_id))
896+
897+
867898
class RetryPolicy(object):
868899
"""
869900
A policy that describes whether to retry, rethrow, or ignore coordinator

cassandra/pool.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -610,7 +610,8 @@ def _replace(self, connection):
610610
del self._connections[connection.features.shard_id]
611611
if self.host.sharding_info and not self._session.cluster.shard_aware_options.disable:
612612
self._connecting.add(connection.features.shard_id)
613-
self._session.submit(self._open_connection_to_missing_shard, connection.features.shard_id)
613+
self._session.shard_reconnection_scheduler.schedule(
614+
self.host.host_id, connection.features.shard_id, self._open_connection_to_missing_shard, connection.features.shard_id)
614615
else:
615616
connection = self._session.cluster.connection_factory(self.host.endpoint,
616617
on_orphaned_stream_released=self.on_orphaned_stream_released)

0 commit comments

Comments
 (0)