Skip to content

Commit bc0317b

Browse files
committed
token-aware-policy: drop _tablets_routing_v1 flag
This flag was introduced to check if server supports tablets. As result driver needs to sync it to policy when control connection is established. Which is an unwanted problem. Let's relay on presence of tablets for given table instead, which will not require any syncing.
1 parent b3c63c0 commit bc0317b

File tree

5 files changed

+81
-28
lines changed

5 files changed

+81
-28
lines changed

cassandra/cluster.py

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1734,14 +1734,6 @@ def connect(self, keyspace=None, wait_for_all_pools=False):
17341734
self.shutdown()
17351735
raise
17361736

1737-
# Update the information about tablet support after connection handshake.
1738-
self.load_balancing_policy._tablets_routing_v1 = self.control_connection._tablets_routing_v1
1739-
child_policy = self.load_balancing_policy.child_policy if hasattr(self.load_balancing_policy, 'child_policy') else None
1740-
while child_policy is not None:
1741-
if hasattr(child_policy, '_tablet_routing_v1'):
1742-
child_policy._tablet_routing_v1 = self.control_connection._tablets_routing_v1
1743-
child_policy = child_policy.child_policy if hasattr(child_policy, 'child_policy') else None
1744-
17451737
self.profile_manager.check_supported() # todo: rename this method
17461738

17471739
if self.idle_heartbeat_interval:

cassandra/policies.py

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -476,7 +476,6 @@ class TokenAwarePolicy(LoadBalancingPolicy):
476476

477477
_child_policy = None
478478
_cluster_metadata = None
479-
_tablets_routing_v1 = False
480479
shuffle_replicas = False
481480
"""
482481
Yield local replicas in a random order.
@@ -488,7 +487,6 @@ def __init__(self, child_policy, shuffle_replicas=False):
488487

489488
def populate(self, cluster, hosts):
490489
self._cluster_metadata = cluster.metadata
491-
self._tablets_routing_v1 = cluster.control_connection._tablets_routing_v1
492490
self._child_policy.populate(cluster, hosts)
493491

494492
def check_supported(self):
@@ -513,17 +511,16 @@ def make_query_plan(self, working_keyspace=None, query=None):
513511
return
514512

515513
replicas = []
516-
if self._tablets_routing_v1:
514+
if self._cluster_metadata._tablets.table_has_tablets(keyspace, query.table):
517515
tablet = self._cluster_metadata._tablets.get_tablet_for_key(
518-
keyspace, query.table, self._cluster_metadata.token_map.token_class.from_key(query.routing_key))
516+
keyspace, query.table, self._cluster_metadata.token_map.token_class.from_key(query.routing_key))
519517

520518
if tablet is not None:
521519
replicas_mapped = set(map(lambda r: r[0], tablet.replicas))
522520
child_plan = child.make_query_plan(keyspace, query)
523521

524522
replicas = [host for host in child_plan if host.host_id in replicas_mapped]
525-
526-
if not replicas:
523+
else:
527524
replicas = self._cluster_metadata.get_replicas(keyspace, query.routing_key)
528525

529526
if self.shuffle_replicas:

cassandra/tablets.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,9 @@ def __init__(self, tablets):
4949
self._tablets = tablets
5050
self._lock = Lock()
5151

52+
def table_has_tablets(self, keyspace, table) -> bool:
53+
return bool(self._tablets.get((keyspace, table), []))
54+
5255
def get_tablet_for_key(self, keyspace, table, t):
5356
tablet = self._tablets.get((keyspace, table), [])
5457
if not tablet:

tests/integration/standard/test_tablets.py

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import pytest
44

5-
from cassandra.cluster import Cluster
5+
from cassandra.cluster import Cluster, EXEC_PROFILE_DEFAULT, ExecutionProfile
66
from cassandra.policies import ConstantReconnectionPolicy, RoundRobinPolicy, TokenAwarePolicy
77

88
from tests.integration import PROTOCOL_VERSION, use_cluster, get_cluster
@@ -163,6 +163,36 @@ def test_tablets_shard_awareness(self):
163163
self.query_data_shard_select(self.session)
164164
self.query_data_shard_insert(self.session)
165165

166+
def test_tablets_lbp_in_profile(self):
167+
cluster = Cluster(contact_points=["127.0.0.1", "127.0.0.2", "127.0.0.3"], protocol_version=PROTOCOL_VERSION,
168+
execution_profiles={
169+
EXEC_PROFILE_DEFAULT: ExecutionProfile(
170+
load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy()),
171+
)},
172+
reconnection_policy=ConstantReconnectionPolicy(1))
173+
session = cluster.connect()
174+
try:
175+
self.query_data_host_select(self.session)
176+
self.query_data_host_insert(self.session)
177+
finally:
178+
session.shutdown()
179+
cluster.shutdown()
180+
181+
def test_tablets_shard_awareness_lbp_in_profile(self):
182+
cluster = Cluster(contact_points=["127.0.0.1", "127.0.0.2", "127.0.0.3"], protocol_version=PROTOCOL_VERSION,
183+
execution_profiles={
184+
EXEC_PROFILE_DEFAULT: ExecutionProfile(
185+
load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy()),
186+
)},
187+
reconnection_policy=ConstantReconnectionPolicy(1))
188+
session = cluster.connect()
189+
try:
190+
self.query_data_shard_select(self.session)
191+
self.query_data_shard_insert(self.session)
192+
finally:
193+
session.shutdown()
194+
cluster.shutdown()
195+
166196
def test_tablets_invalidation_drop_ks_while_reconnecting(self):
167197
def recreate_while_reconnecting(_):
168198
# Kill control connection

tests/unit/test_policies.py

Lines changed: 44 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
from cassandra.connection import DefaultEndPoint, UnixSocketEndPoint
3737
from cassandra.pool import Host
3838
from cassandra.query import Statement
39+
from cassandra.tablets import Tablets, Tablet
3940

4041

4142
class LoadBalancingPolicyTest(unittest.TestCase):
@@ -582,7 +583,8 @@ class TokenAwarePolicyTest(unittest.TestCase):
582583
def test_wrap_round_robin(self):
583584
cluster = Mock(spec=Cluster)
584585
cluster.metadata = Mock(spec=Metadata)
585-
cluster.control_connection._tablets_routing_v1 = False
586+
cluster.metadata._tablets = Mock(spec=Tablets)
587+
cluster.metadata._tablets.table_has_tablets.return_value = []
586588
hosts = [Host(DefaultEndPoint(str(i)), SimpleConvictionPolicy) for i in range(4)]
587589
for host in hosts:
588590
host.set_up()
@@ -614,7 +616,8 @@ def get_replicas(keyspace, packed_key):
614616
def test_wrap_dc_aware(self):
615617
cluster = Mock(spec=Cluster)
616618
cluster.metadata = Mock(spec=Metadata)
617-
cluster.control_connection._tablets_routing_v1 = False
619+
cluster.metadata._tablets = Mock(spec=Tablets)
620+
cluster.metadata._tablets.table_has_tablets.return_value = []
618621
hosts = [Host(DefaultEndPoint(str(i)), SimpleConvictionPolicy) for i in range(4)]
619622
for host in hosts:
620623
host.set_up()
@@ -744,9 +747,10 @@ def test_statement_keyspace(self):
744747

745748
cluster = Mock(spec=Cluster)
746749
cluster.metadata = Mock(spec=Metadata)
747-
cluster.control_connection._tablets_routing_v1 = False
750+
cluster.metadata._tablets = Mock(spec=Tablets)
748751
replicas = hosts[2:]
749752
cluster.metadata.get_replicas.return_value = replicas
753+
cluster.metadata._tablets.table_has_tablets.return_value = []
750754

751755
child_policy = Mock()
752756
child_policy.make_query_plan.return_value = hosts
@@ -803,7 +807,8 @@ def test_shuffles_if_given_keyspace_and_routing_key(self):
803807
804808
@test_category policy
805809
"""
806-
self._assert_shuffle(keyspace='keyspace', routing_key='routing_key')
810+
self._assert_shuffle(cluster=self._prepare_cluster_with_vnodes(), keyspace='keyspace', routing_key='routing_key')
811+
self._assert_shuffle(cluster=self._prepare_cluster_with_tablets(), keyspace='keyspace', routing_key='routing_key')
807812

808813
def test_no_shuffle_if_given_no_keyspace(self):
809814
"""
@@ -814,7 +819,8 @@ def test_no_shuffle_if_given_no_keyspace(self):
814819
815820
@test_category policy
816821
"""
817-
self._assert_shuffle(keyspace=None, routing_key='routing_key')
822+
self._assert_shuffle(cluster=self._prepare_cluster_with_vnodes(), keyspace=None, routing_key='routing_key')
823+
self._assert_shuffle(cluster=self._prepare_cluster_with_tablets(), keyspace=None, routing_key='routing_key')
818824

819825
def test_no_shuffle_if_given_no_routing_key(self):
820826
"""
@@ -825,27 +831,47 @@ def test_no_shuffle_if_given_no_routing_key(self):
825831
826832
@test_category policy
827833
"""
828-
self._assert_shuffle(keyspace='keyspace', routing_key=None)
834+
self._assert_shuffle(cluster=self._prepare_cluster_with_vnodes(), keyspace='keyspace', routing_key=None)
835+
self._assert_shuffle(cluster=self._prepare_cluster_with_tablets(), keyspace='keyspace', routing_key=None)
829836

830-
@patch('cassandra.policies.shuffle')
831-
def _assert_shuffle(self, patched_shuffle, keyspace, routing_key):
837+
def _prepare_cluster_with_vnodes(self):
832838
hosts = [Host(DefaultEndPoint(str(i)), SimpleConvictionPolicy) for i in range(4)]
833839
for host in hosts:
834840
host.set_up()
841+
cluster = Mock(spec=Cluster)
842+
cluster.metadata = Mock(spec=Metadata)
843+
cluster.metadata._tablets = Mock(spec=Tablets)
844+
cluster.metadata.all_hosts.return_value = hosts
845+
cluster.metadata.get_replicas.return_value = hosts[2:]
846+
cluster.metadata._tablets.table_has_tablets.return_value = False
847+
return cluster
835848

849+
def _prepare_cluster_with_tablets(self):
850+
hosts = [Host(DefaultEndPoint(str(i)), SimpleConvictionPolicy) for i in range(4)]
851+
for host in hosts:
852+
host.set_up()
836853
cluster = Mock(spec=Cluster)
837854
cluster.metadata = Mock(spec=Metadata)
838-
cluster.control_connection._tablets_routing_v1 = False
839-
replicas = hosts[2:]
840-
cluster.metadata.get_replicas.return_value = replicas
855+
cluster.metadata._tablets = Mock(spec=Tablets)
856+
cluster.metadata.all_hosts.return_value = hosts
857+
cluster.metadata.get_replicas.return_value = hosts[2:]
858+
cluster.metadata._tablets.table_has_tablets.return_value = True
859+
cluster.metadata._tablets.get_tablet_for_key.return_value = Tablet(replicas=[(h.host_id, 0) for h in hosts[2:]])
860+
return cluster
841861

862+
@patch('cassandra.policies.shuffle')
863+
def _assert_shuffle(self, patched_shuffle, cluster, keyspace, routing_key):
864+
hosts = cluster.metadata.all_hosts()
865+
replicas = cluster.metadata.get_replicas()
842866
child_policy = Mock()
843867
child_policy.make_query_plan.return_value = hosts
844868
child_policy.distance.return_value = HostDistance.LOCAL
845869

846870
policy = TokenAwarePolicy(child_policy, shuffle_replicas=True)
847871
policy.populate(cluster, hosts)
848872

873+
is_tablets = cluster.metadata._tablets.table_has_tablets()
874+
849875
cluster.metadata.get_replicas.reset_mock()
850876
child_policy.make_query_plan.reset_mock()
851877
query = Statement(routing_key=routing_key)
@@ -858,7 +884,11 @@ def _assert_shuffle(self, patched_shuffle, keyspace, routing_key):
858884
else:
859885
assert set(replicas) == set(qplan[:2])
860886
assert hosts[:2] == qplan[2:]
861-
child_policy.make_query_plan.assert_called_once_with(keyspace, query)
887+
if is_tablets:
888+
child_policy.make_query_plan.assert_called_with(keyspace, query)
889+
assert child_policy.make_query_plan.call_count == 2
890+
else:
891+
child_policy.make_query_plan.assert_called_once_with(keyspace, query)
862892
assert patched_shuffle.call_count == 1
863893

864894

@@ -1538,7 +1568,6 @@ def test_query_plan_deferred_to_child(self):
15381568

15391569
def test_wrap_token_aware(self):
15401570
cluster = Mock(spec=Cluster)
1541-
cluster.control_connection._tablets_routing_v1 = False
15421571
hosts = [Host(DefaultEndPoint("127.0.0.{}".format(i)), SimpleConvictionPolicy) for i in range(1, 6)]
15431572
for host in hosts:
15441573
host.set_up()
@@ -1547,6 +1576,8 @@ def get_replicas(keyspace, packed_key):
15471576
return hosts[:2]
15481577

15491578
cluster.metadata.get_replicas.side_effect = get_replicas
1579+
cluster.metadata._tablets = Mock(spec=Tablets)
1580+
cluster.metadata._tablets.table_has_tablets.return_value = []
15501581

15511582
child_policy = TokenAwarePolicy(RoundRobinPolicy())
15521583

0 commit comments

Comments
 (0)