Skip to content

Commit d177881

Browse files
author
Unnikrishnan G
committed
Adds support for Round Robin Replica selection
1 parent 65c527c commit d177881

File tree

2 files changed

+108
-1
lines changed

2 files changed

+108
-1
lines changed

osscluster.go

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,9 @@ type ClusterOptions struct {
4848
// Allows routing read-only commands to the random master or slave node.
4949
// It automatically enables ReadOnly.
5050
RouteRandomly bool
51+
// Allows routing read-only commands to the replica nodes in ronund-robin.
52+
// It automatically enables ReadOnly
53+
RouteRoundRobinReplicas bool
5154

5255
// Optional function that returns cluster slots information.
5356
// It is useful to manually create cluster of standalone Redis servers
@@ -584,6 +587,9 @@ func (c *clusterNodes) Random() (*clusterNode, error) {
584587
type clusterSlot struct {
585588
start, end int
586589
nodes []*clusterNode
590+
591+
// Allows node selection to use round-robin selection strategy.
592+
next uint32
587593
}
588594

589595
type clusterSlotSlice []*clusterSlot
@@ -767,7 +773,44 @@ func (c *clusterState) slotRandomNode(slot int) (*clusterNode, error) {
767773
return nodes[randomNodes[0]], nil
768774
}
769775

776+
// slotRoundRobinReplicaNode tries to select a node from the list of replica nodes.
777+
// if no replica nodes are available, returns the primary node.
778+
func (c *clusterState) slotRoundRobinReplicaNode(slot int) (*clusterNode, error) {
779+
cs := c.slotCluster(slot)
780+
if cs == nil {
781+
return c.nodes.Random()
782+
}
783+
784+
switch len(cs.nodes) {
785+
case 0:
786+
return c.nodes.Random()
787+
case 1:
788+
return cs.nodes[0], nil
789+
case 2:
790+
if replica := cs.nodes[1]; !replica.Failing() {
791+
return replica, nil
792+
}
793+
return cs.nodes[0], nil
794+
default:
795+
var replica *clusterNode
796+
for i := 0; i < 10; i++ {
797+
next := atomic.AddUint32(&cs.next, 1)
798+
n := (int(next))%(len(cs.nodes)-1) + 1
799+
replica = cs.nodes[n]
800+
if !replica.Failing() {
801+
return replica, nil
802+
}
803+
}
804+
// All slaves are loading - use master.
805+
return cs.nodes[0], nil
806+
}
807+
}
808+
770809
func (c *clusterState) slotNodes(slot int) []*clusterNode {
810+
return c.slotCluster(slot).nodes
811+
}
812+
813+
func (c *clusterState) slotCluster(slot int) *clusterSlot {
771814
i := sort.Search(len(c.slots), func(i int) bool {
772815
return c.slots[i].end >= slot
773816
})
@@ -776,8 +819,9 @@ func (c *clusterState) slotNodes(slot int) []*clusterNode {
776819
}
777820
x := c.slots[i]
778821
if slot >= x.start && slot <= x.end {
779-
return x.nodes
822+
return x
780823
}
824+
781825
return nil
782826
}
783827

@@ -1824,6 +1868,9 @@ func (c *ClusterClient) slotReadOnlyNode(state *clusterState, slot int) (*cluste
18241868
if c.opt.RouteRandomly {
18251869
return state.slotRandomNode(slot)
18261870
}
1871+
if c.opt.RouteRoundRobinReplicas {
1872+
return state.slotRoundRobinReplicaNode(slot)
1873+
}
18271874
return state.slotSlaveNode(slot)
18281875
}
18291876

osscluster_test.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1282,6 +1282,66 @@ var _ = Describe("ClusterClient", func() {
12821282

12831283
assertClusterClient()
12841284
})
1285+
1286+
Describe("ClusterClient with RouteRoundRobinReplicas and ClusterSlots with multiple nodes per slot", func() {
1287+
BeforeEach(func() {
1288+
failover = true
1289+
1290+
opt = redisClusterOptions()
1291+
opt.RouteRoundRobinReplicas = true
1292+
opt.ClusterSlots = func(ctx context.Context) ([]redis.ClusterSlot, error) {
1293+
slots := []redis.ClusterSlot{{
1294+
Start: 0,
1295+
End: 4999,
1296+
Nodes: []redis.ClusterNode{{
1297+
Addr: ":8220",
1298+
}, {
1299+
Addr: ":8223",
1300+
}},
1301+
}, {
1302+
Start: 5000,
1303+
End: 9999,
1304+
Nodes: []redis.ClusterNode{{
1305+
Addr: ":8221",
1306+
}, {
1307+
Addr: ":8224",
1308+
}},
1309+
}, {
1310+
Start: 10000,
1311+
End: 16383,
1312+
Nodes: []redis.ClusterNode{{
1313+
Addr: ":8222",
1314+
}, {
1315+
Addr: ":8225",
1316+
}},
1317+
}}
1318+
return slots, nil
1319+
}
1320+
client = cluster.newClusterClient(ctx, opt)
1321+
1322+
err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
1323+
return master.FlushDB(ctx).Err()
1324+
})
1325+
Expect(err).NotTo(HaveOccurred())
1326+
1327+
err = client.ForEachSlave(ctx, func(ctx context.Context, slave *redis.Client) error {
1328+
Eventually(func() int64 {
1329+
return client.DBSize(ctx).Val()
1330+
}, 30*time.Second).Should(Equal(int64(0)))
1331+
return nil
1332+
})
1333+
Expect(err).NotTo(HaveOccurred())
1334+
})
1335+
1336+
AfterEach(func() {
1337+
failover = false
1338+
1339+
err := client.Close()
1340+
Expect(err).NotTo(HaveOccurred())
1341+
})
1342+
1343+
assertClusterClient()
1344+
})
12851345
})
12861346

12871347
var _ = Describe("ClusterClient without nodes", func() {

0 commit comments

Comments
 (0)