Skip to content

Commit f8b6883

Browse files
committed
Update TokenAwarePolicy.make_query_plan to schedule to replicas first
End result should be like this: 1. Replicas from the same RACK (if rack is specified) 2. Replicas from the same DC, but Remote RACK (if rack is specified) 3. Replicas from the same DC (if rack is not specified) 3. Replicas from the remote DC 5. Non-replicas in the same order
1 parent bc0317b commit f8b6883

File tree

4 files changed

+2456
-15
lines changed

4 files changed

+2456
-15
lines changed

cassandra/policies.py

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414
import random
15+
import threading
1516

1617
from collections import namedtuple
1718
from itertools import islice, cycle, groupby, repeat
@@ -526,14 +527,23 @@ def make_query_plan(self, working_keyspace=None, query=None):
526527
if self.shuffle_replicas:
527528
shuffle(replicas)
528529

529-
for replica in replicas:
530-
if replica.is_up and child.distance(replica) in [HostDistance.LOCAL, HostDistance.LOCAL_RACK]:
531-
yield replica
530+
def yield_in_order(hosts):
531+
for replica in hosts:
532+
if replica.is_up and child.distance(replica) == HostDistance.LOCAL_RACK:
533+
yield replica
532534

533-
for host in child.make_query_plan(keyspace, query):
534-
# skip if we've already listed this host
535-
if host not in replicas or child.distance(host) == HostDistance.REMOTE:
536-
yield host
535+
for replica in hosts:
536+
if replica.is_up and child.distance(replica) == HostDistance.LOCAL:
537+
yield replica
538+
539+
for replica in hosts:
540+
if replica.is_up and child.distance(replica) == HostDistance.REMOTE:
541+
yield replica
542+
543+
# yield replicas: local_rack, local, remote
544+
yield from yield_in_order(replicas)
545+
# yield rest of the cluster: local_rack, local, remote
546+
yield from yield_in_order([host for host in child.make_query_plan(keyspace, query) if host not in replicas])
537547

538548
def on_up(self, *args, **kwargs):
539549
return self._child_policy.on_up(*args, **kwargs)

cassandra/pool.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,8 @@ def __str__(self):
250250

251251
def __repr__(self):
252252
dc = (" %s" % (self._datacenter,)) if self._datacenter else ""
253-
return "<%s: %s%s>" % (self.__class__.__name__, self.endpoint, dc)
253+
rack = (" %s" % (self._rack,)) if self._rack else ""
254+
return "<%s: %s%s%s>" % (self.__class__.__name__, self.endpoint, dc, rack)
254255

255256

256257
class _ReconnectionHandler(object):

tests/unit/test_policies.py

Lines changed: 67 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -636,7 +636,7 @@ def get_replicas(keyspace, packed_key):
636636

637637
cluster.metadata.get_replicas.side_effect = get_replicas
638638

639-
policy = TokenAwarePolicy(DCAwareRoundRobinPolicy("dc1", used_hosts_per_remote_dc=1))
639+
policy = TokenAwarePolicy(DCAwareRoundRobinPolicy("dc1", used_hosts_per_remote_dc=2))
640640
policy.populate(cluster, hosts)
641641

642642
for i in range(4):
@@ -648,14 +648,74 @@ def get_replicas(keyspace, packed_key):
648648
assert qplan[0] in replicas
649649
assert qplan[0].datacenter == "dc1"
650650

651-
# then the local non-replica
652-
assert qplan[1] not in replicas
651+
# then the replica from remote DC
652+
assert qplan[1] in replicas
653+
assert qplan[1].datacenter == "dc2"
654+
655+
# then non-replica from local DC
656+
assert qplan[2] not in replicas
657+
assert qplan[2].datacenter == "dc1"
658+
659+
# and only then non-replica from remote DC
660+
assert qplan[3] not in replicas
661+
assert qplan[3].datacenter == "dc2"
662+
663+
assert 4 == len(qplan)
664+
665+
def test_wrap_rack_aware(self):
666+
cluster = Mock(spec=Cluster)
667+
cluster.metadata = Mock(spec=Metadata)
668+
cluster.control_connection._tablets_routing_v1 = False
669+
hosts = [Host(DefaultEndPoint(str(i)), SimpleConvictionPolicy) for i in range(8)]
670+
for host in hosts:
671+
host.set_up()
672+
hosts[0].set_location_info("dc1", "rack1")
673+
hosts[1].set_location_info("dc1", "rack2")
674+
hosts[2].set_location_info("dc2", "rack1")
675+
hosts[3].set_location_info("dc2", "rack2")
676+
hosts[4].set_location_info("dc1", "rack1")
677+
hosts[5].set_location_info("dc1", "rack2")
678+
hosts[6].set_location_info("dc2", "rack1")
679+
hosts[7].set_location_info("dc2", "rack2")
680+
681+
def get_replicas(keyspace, packed_key):
682+
index = struct.unpack('>i', packed_key)[0]
683+
# return one node from each DC
684+
if index % 2 == 0:
685+
return [hosts[0], hosts[1], hosts[2], hosts[3]]
686+
else:
687+
return [hosts[4], hosts[5], hosts[6], hosts[7]]
688+
689+
cluster.metadata.get_replicas.side_effect = get_replicas
690+
691+
policy = TokenAwarePolicy(RackAwareRoundRobinPolicy("dc1", "rack1", used_hosts_per_remote_dc=4))
692+
policy.populate(cluster, hosts)
693+
694+
for i in range(4):
695+
query = Statement(routing_key=struct.pack('>i', i), keyspace='keyspace_name')
696+
qplan = list(policy.make_query_plan(None, query))
697+
replicas = get_replicas(None, struct.pack('>i', i))
698+
699+
print(qplan)
700+
print(replicas)
701+
702+
# first should be replica from local rack local dc
703+
assert qplan[0] in replicas
704+
assert qplan[0].datacenter == "dc1"
705+
assert qplan[0].rack == "rack1"
706+
707+
# second should be replica from remote rack local dc
708+
assert qplan[1] in replicas
653709
assert qplan[1].datacenter == "dc1"
710+
assert qplan[1].rack == "rack2"
654711

655-
# then one of the remotes (used_hosts_per_remote_dc is 1, so we
656-
# shouldn't see two remotes)
712+
# third and forth should be replica from the remote dcs
713+
assert qplan[2] in replicas
657714
assert qplan[2].datacenter == "dc2"
658-
assert 3 == len(qplan)
715+
assert qplan[3] in replicas
716+
assert qplan[3].datacenter == "dc2"
717+
718+
assert 8 == len(qplan)
659719

660720
class FakeCluster:
661721
def __init__(self):
@@ -1403,7 +1463,7 @@ def setUp(self):
14031463

14041464
def _check_init(self, hfp):
14051465
assert hfp._child_policy is self.child_policy
1406-
assert isinstance(hfp._hosts_lock, LockType)
1466+
assert isinstance(hfp._known_hosts_lock, LockType)
14071467

14081468
# we can't use a simple assertIs because we wrap the function
14091469
arg0, arg1 = Mock(name='arg0'), Mock(name='arg1')

0 commit comments

Comments
 (0)