
Commit 29524d2

feat: replica selector (#692)
* feat: add reader node selector
* doc: add comments
* fix: wrong error message
* test: send a read-only command to the reader node
* refactor: use SendToReplicas and ReplicaSelector
* test: remove duplicated cases
* test: remove cases
* perf: introduce the nodes type
1 parent 383a2a1 commit 29524d2

File tree: 4 files changed (+1454, −33 lines)


cluster.go

Lines changed: 43 additions & 18 deletions
@@ -18,6 +18,8 @@ import (
 var ErrNoSlot = errors.New("the slot has no redis node")
 var ErrReplicaOnlyConflict = errors.New("ReplicaOnly conflicts with SendToReplicas option")
 var ErrInvalidShardsRefreshInterval = errors.New("ShardsRefreshInterval must be greater than or equal to 0")
+var ErrReplicaOnlyConflictWithReplicaSelector = errors.New("ReplicaOnly conflicts with ReplicaSelector option")
+var ErrSendToReplicasNotSet = errors.New("SendToReplicas must be set when ReplicaSelector is set")
 
 type clusterClient struct {
 	pslots [16384]conn
@@ -42,6 +44,10 @@ type connrole struct {
 	//replica bool <- this field is removed because a server may have mixed roles at the same time in the future. https://github.com/valkey-io/valkey/issues/1372
 }
 
+var replicaOnlySelector = func(_ uint16, replicas []ReplicaInfo) int {
+	return util.FastRand(len(replicas))
+}
+
 func newClusterClient(opt *ClientOption, connFn connFn, retryer retryHandler) (*clusterClient, error) {
 	client := &clusterClient{
 		cmd: cmds.NewBuilder(cmds.InitSlot),
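
The replicaOnlySelector added here is the built-in default: when SendToReplicas is set without a ReplicaSelector, each slot gets a replica chosen uniformly at random (util.FastRand(n) yields an index in [0, n)). A user-supplied selector has the same shape; as a sketch not taken from this diff, one that always prefers the first listed replica could look like this (per the _refresh hunk further down, the selector is only invoked when the shard has at least one replica):

var firstReplicaSelector = func(_ uint16, replicas []ReplicaInfo) int {
	return 0 // always read from the first listed replica
}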
@@ -56,6 +62,16 @@ func newClusterClient(opt *ClientOption, connFn connFn, retryer retryHandler) (*
 	if opt.ReplicaOnly && opt.SendToReplicas != nil {
 		return nil, ErrReplicaOnlyConflict
 	}
+	if opt.ReplicaOnly && opt.ReplicaSelector != nil {
+		return nil, ErrReplicaOnlyConflictWithReplicaSelector
+	}
+	if opt.ReplicaSelector != nil && opt.SendToReplicas == nil {
+		return nil, ErrSendToReplicasNotSet
+	}
+
+	if opt.SendToReplicas != nil && opt.ReplicaSelector == nil {
+		opt.ReplicaSelector = replicaOnlySelector
+	}
 
 	if opt.SendToReplicas != nil {
 		rOpt := *opt
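
Together these checks define the valid combinations: ReplicaOnly cannot be combined with either option, ReplicaSelector requires SendToReplicas, and SendToReplicas alone falls back to the random default above. A minimal opt-in might look like the sketch below; names outside this diff (InitAddress, Completed, IsReadOnly) are assumptions about the client's existing ClientOption API, not part of this commit.

opt := ClientOption{
	InitAddress: []string{"127.0.0.1:7001"}, // assumed existing field
	// Route read-only commands to replica connections.
	SendToReplicas: func(cmd Completed) bool {
		return cmd.IsReadOnly()
	},
	// Pin each slot to a deterministic replica; an out-of-range index
	// falls back to the master (see the _refresh hunk below).
	ReplicaSelector: func(slot uint16, replicas []ReplicaInfo) int {
		return int(slot) % len(replicas)
	},
}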
@@ -194,12 +210,12 @@ func (c *clusterClient) _refresh() (err error) {
 	for master, g := range groups {
 		conns[master] = connrole{conn: c.connFn(master, c.opt)}
 		if c.rOpt != nil {
-			for _, addr := range g.nodes[1:] {
-				conns[addr] = connrole{conn: c.connFn(addr, c.rOpt)}
+			for _, nodeInfo := range g.nodes[1:] {
+				conns[nodeInfo.Addr] = connrole{conn: c.connFn(nodeInfo.Addr, c.rOpt)}
 			}
 		} else {
-			for _, addr := range g.nodes[1:] {
-				conns[addr] = connrole{conn: c.connFn(addr, c.opt)}
+			for _, nodeInfo := range g.nodes[1:] {
+				conns[nodeInfo.Addr] = connrole{conn: c.connFn(nodeInfo.Addr, c.opt)}
 			}
 		}
 	}
@@ -234,18 +250,25 @@ func (c *clusterClient) _refresh() (err error) {
 			nodesCount := len(g.nodes)
 			for _, slot := range g.slots {
 				for i := slot[0]; i <= slot[1]; i++ {
-					pslots[i] = conns[g.nodes[1+util.FastRand(nodesCount-1)]].conn
+					pslots[i] = conns[g.nodes[1+util.FastRand(nodesCount-1)].Addr].conn
 				}
 			}
-		case c.rOpt != nil: // implies c.opt.SendToReplicas != nil
+		case c.rOpt != nil:
 			if len(rslots) == 0 { // lazy init
 				rslots = make([]conn, 16384)
 			}
 			if len(g.nodes) > 1 {
+				n := len(g.nodes) - 1
 				for _, slot := range g.slots {
 					for i := slot[0]; i <= slot[1]; i++ {
 						pslots[i] = conns[master].conn
-						rslots[i] = conns[g.nodes[1+util.FastRand(len(g.nodes)-1)]].conn
+
+						rIndex := c.opt.ReplicaSelector(uint16(i), g.nodes[1:])
+						if rIndex >= 0 && rIndex < n {
+							rslots[i] = conns[g.nodes[1+rIndex].Addr].conn
+						} else {
+							rslots[i] = conns[master].conn
+						}
 					}
 				}
 			} else {
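
The selector's contract is visible in this hunk: its result is honored only when it falls in [0, len(replicas)); any other value leaves the slot's read traffic on the master connection. A selector can rely on that deliberately, for example (hypothetical, not from this PR) to keep a hot slot range on the master:

var pinHotSlotsToMaster = func(slot uint16, replicas []ReplicaInfo) int {
	if slot < 100 {
		return -1 // out of range: _refresh keeps rslots[i] on the master connection
	}
	return 0 // otherwise read from the first listed replica
}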
@@ -297,8 +320,10 @@ func (c *clusterClient) nodes() []string {
 	return nodes
 }
 
+type nodes []ReplicaInfo
+
 type group struct {
-	nodes []string
+	nodes nodes
 	slots [][2]int64
 }
 
@@ -324,10 +349,10 @@ func parseSlots(slots RedisMessage, defaultAddr string) map[string]group {
 		g, ok := groups[master]
 		if !ok {
 			g.slots = make([][2]int64, 0)
-			g.nodes = make([]string, 0, len(v.values)-2)
+			g.nodes = make(nodes, 0, len(v.values)-2)
 			for i := 2; i < len(v.values); i++ {
 				if dst := parseEndpoint(defaultAddr, v.values[i].values[0].string, v.values[i].values[1].integer); dst != "" {
-					g.nodes = append(g.nodes, dst)
+					g.nodes = append(g.nodes, ReplicaInfo{Addr: dst})
 				}
 			}
 		}
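
ReplicaInfo itself is declared in one of the other changed files of this commit; the hunks here only show that it carries the node address, so a minimal shape consistent with this diff would be:

// Assumed minimal shape; only the Addr field is evidenced by this diff.
type ReplicaInfo struct {
	Addr string // host:port of the node
}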
@@ -345,16 +370,16 @@ func parseShards(shards RedisMessage, defaultAddr string, tls bool) map[string]g
 		m := -1
 		shard, _ := v.AsMap()
 		slots := shard["slots"].values
-		nodes := shard["nodes"].values
+		_nodes := shard["nodes"].values
 		g := group{
-			nodes: make([]string, 0, len(nodes)),
+			nodes: make(nodes, 0, len(_nodes)),
 			slots: make([][2]int64, len(slots)/2),
 		}
 		for i := range g.slots {
 			g.slots[i][0], _ = slots[i*2].AsInt64()
 			g.slots[i][1], _ = slots[i*2+1].AsInt64()
 		}
-		for _, n := range nodes {
+		for _, n := range _nodes {
 			dict, _ := n.AsMap()
 			if dict["health"].string != "online" {
 				continue
@@ -367,12 +392,12 @@ func parseShards(shards RedisMessage, defaultAddr string, tls bool) map[string]g
 				if dict["role"].string == "master" {
 					m = len(g.nodes)
 				}
-				g.nodes = append(g.nodes, dst)
+				g.nodes = append(g.nodes, ReplicaInfo{Addr: dst})
 			}
 		}
 		if m >= 0 {
 			g.nodes[0], g.nodes[m] = g.nodes[m], g.nodes[0]
-			groups[g.nodes[0]] = g
+			groups[g.nodes[0].Addr] = g
 		}
 	}
 	return groups
@@ -1078,15 +1103,15 @@ func (c *clusterClient) Dedicate() (DedicatedClient, func()) {
 
 func (c *clusterClient) Nodes() map[string]Client {
 	c.mu.RLock()
-	nodes := make(map[string]Client, len(c.conns))
+	_nodes := make(map[string]Client, len(c.conns))
 	disableCache := c.opt != nil && c.opt.DisableCache
 	for addr, cc := range c.conns {
 		if !cc.hidden {
-			nodes[addr] = newSingleClientWithConn(cc.conn, c.cmd, c.retry, disableCache, c.retryHandler)
+			_nodes[addr] = newSingleClientWithConn(cc.conn, c.cmd, c.retry, disableCache, c.retryHandler)
 		}
 	}
 	c.mu.RUnlock()
-	return nodes
+	return _nodes
 }
 
 func (c *clusterClient) Close() {
