Skip to content

Commit 70cd971

Browse files
committed
token-aware-policy: drop _tablets_routing_v1 flag
This flag was introduced to check if server supports tablets. As result driver needs to sync it to policy when control connection is established. Which is an unwanted problem. Let's relay on presence of tablets for given table instead, which will not require any syncing.
1 parent b3c63c0 commit 70cd971

File tree

5 files changed

+37
-20
lines changed

5 files changed

+37
-20
lines changed

cassandra/cluster.py

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1734,14 +1734,6 @@ def connect(self, keyspace=None, wait_for_all_pools=False):
17341734
self.shutdown()
17351735
raise
17361736

1737-
# Update the information about tablet support after connection handshake.
1738-
self.load_balancing_policy._tablets_routing_v1 = self.control_connection._tablets_routing_v1
1739-
child_policy = self.load_balancing_policy.child_policy if hasattr(self.load_balancing_policy, 'child_policy') else None
1740-
while child_policy is not None:
1741-
if hasattr(child_policy, '_tablet_routing_v1'):
1742-
child_policy._tablet_routing_v1 = self.control_connection._tablets_routing_v1
1743-
child_policy = child_policy.child_policy if hasattr(child_policy, 'child_policy') else None
1744-
17451737
self.profile_manager.check_supported() # todo: rename this method
17461738

17471739
if self.idle_heartbeat_interval:

cassandra/policies.py

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -476,7 +476,6 @@ class TokenAwarePolicy(LoadBalancingPolicy):
476476

477477
_child_policy = None
478478
_cluster_metadata = None
479-
_tablets_routing_v1 = False
480479
shuffle_replicas = False
481480
"""
482481
Yield local replicas in a random order.
@@ -488,7 +487,6 @@ def __init__(self, child_policy, shuffle_replicas=False):
488487

489488
def populate(self, cluster, hosts):
490489
self._cluster_metadata = cluster.metadata
491-
self._tablets_routing_v1 = cluster.control_connection._tablets_routing_v1
492490
self._child_policy.populate(cluster, hosts)
493491

494492
def check_supported(self):
@@ -513,17 +511,16 @@ def make_query_plan(self, working_keyspace=None, query=None):
513511
return
514512

515513
replicas = []
516-
if self._tablets_routing_v1:
514+
if self._cluster_metadata._tablets.table_has_tablets(keyspace, query.table):
517515
tablet = self._cluster_metadata._tablets.get_tablet_for_key(
518-
keyspace, query.table, self._cluster_metadata.token_map.token_class.from_key(query.routing_key))
516+
keyspace, query.table, self._cluster_metadata.token_map.token_class.from_key(query.routing_key))
519517

520518
if tablet is not None:
521519
replicas_mapped = set(map(lambda r: r[0], tablet.replicas))
522520
child_plan = child.make_query_plan(keyspace, query)
523521

524522
replicas = [host for host in child_plan if host.host_id in replicas_mapped]
525-
526-
if not replicas:
523+
else:
527524
replicas = self._cluster_metadata.get_replicas(keyspace, query.routing_key)
528525

529526
if self.shuffle_replicas:

cassandra/tablets.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,9 @@ def __init__(self, tablets):
4949
self._tablets = tablets
5050
self._lock = Lock()
5151

52+
def table_has_tablets(self, keyspace, table) -> bool:
53+
return bool(self._tablets.get((keyspace, table), []))
54+
5255
def get_tablet_for_key(self, keyspace, table, t):
5356
tablet = self._tablets.get((keyspace, table), [])
5457
if not tablet:

tests/integration/standard/test_tablets.py

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import pytest
44

5-
from cassandra.cluster import Cluster
5+
from cassandra.cluster import Cluster, EXEC_PROFILE_DEFAULT, ExecutionProfile
66
from cassandra.policies import ConstantReconnectionPolicy, RoundRobinPolicy, TokenAwarePolicy
77

88
from tests.integration import PROTOCOL_VERSION, use_cluster, get_cluster
@@ -163,6 +163,36 @@ def test_tablets_shard_awareness(self):
163163
self.query_data_shard_select(self.session)
164164
self.query_data_shard_insert(self.session)
165165

166+
def test_tablets_lbp_in_profile(self):
167+
cluster = Cluster(contact_points=["127.0.0.1", "127.0.0.2", "127.0.0.3"], protocol_version=PROTOCOL_VERSION,
168+
execution_profiles={
169+
EXEC_PROFILE_DEFAULT: ExecutionProfile(
170+
load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy()),
171+
)},
172+
reconnection_policy=ConstantReconnectionPolicy(1))
173+
session = cluster.connect()
174+
try:
175+
self.query_data_host_select(self.session)
176+
self.query_data_host_insert(self.session)
177+
finally:
178+
session.shutdown()
179+
cluster.shutdown()
180+
181+
def test_tablets_shard_awareness_lbp_in_profile(self):
182+
cluster = Cluster(contact_points=["127.0.0.1", "127.0.0.2", "127.0.0.3"], protocol_version=PROTOCOL_VERSION,
183+
execution_profiles={
184+
EXEC_PROFILE_DEFAULT: ExecutionProfile(
185+
load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy()),
186+
)},
187+
reconnection_policy=ConstantReconnectionPolicy(1))
188+
session = cluster.connect()
189+
try:
190+
self.query_data_shard_select(self.session)
191+
self.query_data_shard_insert(self.session)
192+
finally:
193+
session.shutdown()
194+
cluster.shutdown()
195+
166196
def test_tablets_invalidation_drop_ks_while_reconnecting(self):
167197
def recreate_while_reconnecting(_):
168198
# Kill control connection

tests/unit/test_policies.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -582,7 +582,6 @@ class TokenAwarePolicyTest(unittest.TestCase):
582582
def test_wrap_round_robin(self):
583583
cluster = Mock(spec=Cluster)
584584
cluster.metadata = Mock(spec=Metadata)
585-
cluster.control_connection._tablets_routing_v1 = False
586585
hosts = [Host(DefaultEndPoint(str(i)), SimpleConvictionPolicy) for i in range(4)]
587586
for host in hosts:
588587
host.set_up()
@@ -614,7 +613,6 @@ def get_replicas(keyspace, packed_key):
614613
def test_wrap_dc_aware(self):
615614
cluster = Mock(spec=Cluster)
616615
cluster.metadata = Mock(spec=Metadata)
617-
cluster.control_connection._tablets_routing_v1 = False
618616
hosts = [Host(DefaultEndPoint(str(i)), SimpleConvictionPolicy) for i in range(4)]
619617
for host in hosts:
620618
host.set_up()
@@ -744,7 +742,6 @@ def test_statement_keyspace(self):
744742

745743
cluster = Mock(spec=Cluster)
746744
cluster.metadata = Mock(spec=Metadata)
747-
cluster.control_connection._tablets_routing_v1 = False
748745
replicas = hosts[2:]
749746
cluster.metadata.get_replicas.return_value = replicas
750747

@@ -835,7 +832,6 @@ def _assert_shuffle(self, patched_shuffle, keyspace, routing_key):
835832

836833
cluster = Mock(spec=Cluster)
837834
cluster.metadata = Mock(spec=Metadata)
838-
cluster.control_connection._tablets_routing_v1 = False
839835
replicas = hosts[2:]
840836
cluster.metadata.get_replicas.return_value = replicas
841837

@@ -1538,7 +1534,6 @@ def test_query_plan_deferred_to_child(self):
15381534

15391535
def test_wrap_token_aware(self):
15401536
cluster = Mock(spec=Cluster)
1541-
cluster.control_connection._tablets_routing_v1 = False
15421537
hosts = [Host(DefaultEndPoint("127.0.0.{}".format(i)), SimpleConvictionPolicy) for i in range(1, 6)]
15431538
for host in hosts:
15441539
host.set_up()

0 commit comments

Comments
 (0)