Skip to content

Commit 98e73a3

Browse files
committed
token-aware-policy: drop _tablets_routing_v1 flag
This flag was introduced to check if server supports tablets. As result driver needs to sync it to policy when control connection is established. Which is an unwanted problem. Let's relay on presence of tablets for given table instead, which will not require any syncing.
1 parent f52edbc commit 98e73a3

File tree

5 files changed

+37
-20
lines changed

5 files changed

+37
-20
lines changed

cassandra/cluster.py

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1734,14 +1734,6 @@ def connect(self, keyspace=None, wait_for_all_pools=False):
17341734
self.shutdown()
17351735
raise
17361736

1737-
# Update the information about tablet support after connection handshake.
1738-
self.load_balancing_policy._tablets_routing_v1 = self.control_connection._tablets_routing_v1
1739-
child_policy = self.load_balancing_policy.child_policy if hasattr(self.load_balancing_policy, 'child_policy') else None
1740-
while child_policy is not None:
1741-
if hasattr(child_policy, '_tablet_routing_v1'):
1742-
child_policy._tablet_routing_v1 = self.control_connection._tablets_routing_v1
1743-
child_policy = child_policy.child_policy if hasattr(child_policy, 'child_policy') else None
1744-
17451737
self.profile_manager.check_supported() # todo: rename this method
17461738

17471739
if self.idle_heartbeat_interval:

cassandra/policies.py

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -477,7 +477,6 @@ class TokenAwarePolicy(LoadBalancingPolicy):
477477

478478
_child_policy = None
479479
_cluster_metadata = None
480-
_tablets_routing_v1 = False
481480
shuffle_replicas = False
482481
"""
483482
Yield local replicas in a random order.
@@ -489,7 +488,6 @@ def __init__(self, child_policy, shuffle_replicas=False):
489488

490489
def populate(self, cluster, hosts):
491490
self._cluster_metadata = cluster.metadata
492-
self._tablets_routing_v1 = cluster.control_connection._tablets_routing_v1
493491
self._child_policy.populate(cluster, hosts)
494492

495493
def check_supported(self):
@@ -514,17 +512,16 @@ def make_query_plan(self, working_keyspace=None, query=None):
514512
return
515513

516514
replicas = []
517-
if self._tablets_routing_v1:
515+
if self._cluster_metadata._tablets.table_has_tablets(keyspace, query.table):
518516
tablet = self._cluster_metadata._tablets.get_tablet_for_key(
519-
keyspace, query.table, self._cluster_metadata.token_map.token_class.from_key(query.routing_key))
517+
keyspace, query.table, self._cluster_metadata.token_map.token_class.from_key(query.routing_key))
520518

521519
if tablet is not None:
522520
replicas_mapped = set(map(lambda r: r[0], tablet.replicas))
523521
child_plan = child.make_query_plan(keyspace, query)
524522

525523
replicas = [host for host in child_plan if host.host_id in replicas_mapped]
526-
527-
if not replicas:
524+
else:
528525
replicas = self._cluster_metadata.get_replicas(keyspace, query.routing_key)
529526

530527
if self.shuffle_replicas:

cassandra/tablets.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,9 @@ def __init__(self, tablets):
4949
self._tablets = tablets
5050
self._lock = Lock()
5151

52+
def table_has_tablets(self, keyspace, table) -> bool:
53+
return bool(self._tablets.get((keyspace, table), []))
54+
5255
def get_tablet_for_key(self, keyspace, table, t):
5356
tablet = self._tablets.get((keyspace, table), [])
5457
if not tablet:

tests/integration/standard/test_tablets.py

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import pytest
44

5-
from cassandra.cluster import Cluster
5+
from cassandra.cluster import Cluster, EXEC_PROFILE_DEFAULT, ExecutionProfile
66
from cassandra.policies import ConstantReconnectionPolicy, RoundRobinPolicy, TokenAwarePolicy
77

88
from tests.integration import PROTOCOL_VERSION, use_cluster, get_cluster
@@ -163,6 +163,36 @@ def test_tablets_shard_awareness(self):
163163
self.query_data_shard_select(self.session)
164164
self.query_data_shard_insert(self.session)
165165

166+
def test_tablets_lbp_in_profile(self):
167+
cluster = Cluster(contact_points=["127.0.0.1", "127.0.0.2", "127.0.0.3"], protocol_version=PROTOCOL_VERSION,
168+
execution_profiles={
169+
EXEC_PROFILE_DEFAULT: ExecutionProfile(
170+
load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy()),
171+
)},
172+
reconnection_policy=ConstantReconnectionPolicy(1))
173+
session = cluster.connect()
174+
try:
175+
self.query_data_host_select(self.session)
176+
self.query_data_host_insert(self.session)
177+
finally:
178+
session.shutdown()
179+
cluster.shutdown()
180+
181+
def test_tablets_shard_awareness_lbp_in_profile(self):
182+
cluster = Cluster(contact_points=["127.0.0.1", "127.0.0.2", "127.0.0.3"], protocol_version=PROTOCOL_VERSION,
183+
execution_profiles={
184+
EXEC_PROFILE_DEFAULT: ExecutionProfile(
185+
load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy()),
186+
)},
187+
reconnection_policy=ConstantReconnectionPolicy(1))
188+
session = cluster.connect()
189+
try:
190+
self.query_data_shard_select(self.session)
191+
self.query_data_shard_insert(self.session)
192+
finally:
193+
session.shutdown()
194+
cluster.shutdown()
195+
166196
def test_tablets_invalidation_drop_ks_while_reconnecting(self):
167197
def recreate_while_reconnecting(_):
168198
# Kill control connection

tests/unit/test_policies.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -578,7 +578,6 @@ class TokenAwarePolicyTest(unittest.TestCase):
578578
def test_wrap_round_robin(self):
579579
cluster = Mock(spec=Cluster)
580580
cluster.metadata = Mock(spec=Metadata)
581-
cluster.control_connection._tablets_routing_v1 = False
582581
hosts = [Host(DefaultEndPoint(str(i)), SimpleConvictionPolicy) for i in range(4)]
583582
for host in hosts:
584583
host.set_up()
@@ -610,7 +609,6 @@ def get_replicas(keyspace, packed_key):
610609
def test_wrap_dc_aware(self):
611610
cluster = Mock(spec=Cluster)
612611
cluster.metadata = Mock(spec=Metadata)
613-
cluster.control_connection._tablets_routing_v1 = False
614612
hosts = [Host(DefaultEndPoint(str(i)), SimpleConvictionPolicy) for i in range(4)]
615613
for host in hosts:
616614
host.set_up()
@@ -740,7 +738,6 @@ def test_statement_keyspace(self):
740738

741739
cluster = Mock(spec=Cluster)
742740
cluster.metadata = Mock(spec=Metadata)
743-
cluster.control_connection._tablets_routing_v1 = False
744741
replicas = hosts[2:]
745742
cluster.metadata.get_replicas.return_value = replicas
746743

@@ -831,7 +828,6 @@ def _assert_shuffle(self, patched_shuffle, keyspace, routing_key):
831828

832829
cluster = Mock(spec=Cluster)
833830
cluster.metadata = Mock(spec=Metadata)
834-
cluster.control_connection._tablets_routing_v1 = False
835831
replicas = hosts[2:]
836832
cluster.metadata.get_replicas.return_value = replicas
837833

@@ -1534,7 +1530,6 @@ def test_query_plan_deferred_to_child(self):
15341530

15351531
def test_wrap_token_aware(self):
15361532
cluster = Mock(spec=Cluster)
1537-
cluster.control_connection._tablets_routing_v1 = False
15381533
hosts = [Host(DefaultEndPoint("127.0.0.{}".format(i)), SimpleConvictionPolicy) for i in range(1, 6)]
15391534
for host in hosts:
15401535
host.set_up()

0 commit comments

Comments
 (0)