Skip to content

Commit 2f271be

Browse files
committed
Update TokenAwarePolicy.make_query_plan to schedule to replicas first
End result should be like this: 1. Replicas from the same RACK (if rack is specified) 2. Replicas from the same DC, but Remote RACK (if rack is specified) 3. Replicas from the same DC (if rack is not specified) 4. Replicas from the remote DC 5. Non-replicas in the same order
1 parent bc0317b commit 2f271be

File tree

2 files changed

+77
-14
lines changed

2 files changed

+77
-14
lines changed

cassandra/policies.py

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -526,14 +526,16 @@ def make_query_plan(self, working_keyspace=None, query=None):
526526
if self.shuffle_replicas:
527527
shuffle(replicas)
528528

529-
for replica in replicas:
530-
if replica.is_up and child.distance(replica) in [HostDistance.LOCAL, HostDistance.LOCAL_RACK]:
531-
yield replica
532-
533-
for host in child.make_query_plan(keyspace, query):
534-
# skip if we've already listed this host
535-
if host not in replicas or child.distance(host) == HostDistance.REMOTE:
536-
yield host
529+
def yield_in_order(hosts):
530+
for distance in [HostDistance.LOCAL_RACK, HostDistance.LOCAL, HostDistance.REMOTE]:
531+
for replica in hosts:
532+
if replica.is_up and child.distance(replica) == distance:
533+
yield replica
534+
535+
# yield replicas: local_rack, local, remote
536+
yield from yield_in_order(replicas)
537+
# yield rest of the cluster: local_rack, local, remote
538+
yield from yield_in_order([host for host in child.make_query_plan(keyspace, query) if host not in replicas])
537539

538540
def on_up(self, *args, **kwargs):
539541
return self._child_policy.on_up(*args, **kwargs)

tests/unit/test_policies.py

Lines changed: 67 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -636,7 +636,7 @@ def get_replicas(keyspace, packed_key):
636636

637637
cluster.metadata.get_replicas.side_effect = get_replicas
638638

639-
policy = TokenAwarePolicy(DCAwareRoundRobinPolicy("dc1", used_hosts_per_remote_dc=1))
639+
policy = TokenAwarePolicy(DCAwareRoundRobinPolicy("dc1", used_hosts_per_remote_dc=2))
640640
policy.populate(cluster, hosts)
641641

642642
for i in range(4):
@@ -648,14 +648,75 @@ def get_replicas(keyspace, packed_key):
648648
assert qplan[0] in replicas
649649
assert qplan[0].datacenter == "dc1"
650650

651-
# then the local non-replica
652-
assert qplan[1] not in replicas
651+
# then the replica from remote DC
652+
assert qplan[1] in replicas
653+
assert qplan[1].datacenter == "dc2"
654+
655+
# then non-replica from local DC
656+
assert qplan[2] not in replicas
657+
assert qplan[2].datacenter == "dc1"
658+
659+
# and only then non-replica from remote DC
660+
assert qplan[3] not in replicas
661+
assert qplan[3].datacenter == "dc2"
662+
663+
assert 4 == len(qplan)
664+
665+
def test_wrap_rack_aware(self):
666+
cluster = Mock(spec=Cluster)
667+
cluster.metadata = Mock(spec=Metadata)
668+
cluster.metadata._tablets = Mock(spec=Tablets)
669+
cluster.metadata._tablets.table_has_tablets.return_value = []
670+
hosts = [Host(DefaultEndPoint(str(i)), SimpleConvictionPolicy) for i in range(8)]
671+
for host in hosts:
672+
host.set_up()
673+
hosts[0].set_location_info("dc1", "rack1")
674+
hosts[1].set_location_info("dc1", "rack2")
675+
hosts[2].set_location_info("dc2", "rack1")
676+
hosts[3].set_location_info("dc2", "rack2")
677+
hosts[4].set_location_info("dc1", "rack1")
678+
hosts[5].set_location_info("dc1", "rack2")
679+
hosts[6].set_location_info("dc2", "rack1")
680+
hosts[7].set_location_info("dc2", "rack2")
681+
682+
def get_replicas(keyspace, packed_key):
683+
index = struct.unpack('>i', packed_key)[0]
684+
# return two nodes from each DC
685+
if index % 2 == 0:
686+
return [hosts[0], hosts[1], hosts[2], hosts[3]]
687+
else:
688+
return [hosts[4], hosts[5], hosts[6], hosts[7]]
689+
690+
cluster.metadata.get_replicas.side_effect = get_replicas
691+
692+
policy = TokenAwarePolicy(RackAwareRoundRobinPolicy("dc1", "rack1", used_hosts_per_remote_dc=4))
693+
policy.populate(cluster, hosts)
694+
695+
for i in range(4):
696+
query = Statement(routing_key=struct.pack('>i', i), keyspace='keyspace_name')
697+
qplan = list(policy.make_query_plan(None, query))
698+
replicas = get_replicas(None, struct.pack('>i', i))
699+
700+
print(qplan)
701+
print(replicas)
702+
703+
# first should be replica from local rack local dc
704+
assert qplan[0] in replicas
705+
assert qplan[0].datacenter == "dc1"
706+
assert qplan[0].rack == "rack1"
707+
708+
# second should be replica from remote rack local dc
709+
assert qplan[1] in replicas
653710
assert qplan[1].datacenter == "dc1"
711+
assert qplan[1].rack == "rack2"
654712

655-
# then one of the remotes (used_hosts_per_remote_dc is 1, so we
656-
# shouldn't see two remotes)
713+
# third and fourth should be replicas from the remote DC
714+
assert qplan[2] in replicas
657715
assert qplan[2].datacenter == "dc2"
658-
assert 3 == len(qplan)
716+
assert qplan[3] in replicas
717+
assert qplan[3].datacenter == "dc2"
718+
719+
assert 8 == len(qplan)
659720

660721
class FakeCluster:
661722
def __init__(self):

0 commit comments

Comments
 (0)