Skip to content

Commit a31e941

Browse files
authored
fix: panic on cluster redirection to a node with stale role (#695)
* fix: panic on cluster redirection to a node with stale role
  Signed-off-by: Rueian <[email protected]>

* fix: panic on cluster redirection to a node with stale role
  Signed-off-by: Rueian <[email protected]>

---------

Signed-off-by: Rueian <[email protected]>
1 parent 723761f commit a31e941

File tree

3 files changed

+24
-115
lines changed

3 files changed

+24
-115
lines changed

cluster.go

Lines changed: 18 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,9 @@ type clusterClient struct {
3737

3838
// NOTE: connrole and conn must be initialized at the same time
3939
type connrole struct {
40-
conn conn
41-
replica bool
42-
hidden bool
40+
conn conn
41+
hidden bool
42+
//replica bool <- this field is removed because a server may have mixed roles at the same time in the future. https://github.com/valkey-io/valkey/issues/1372
4343
}
4444

4545
func newClusterClient(opt *ClientOption, connFn connFn, retryer retryHandler) (*clusterClient, error) {
@@ -192,12 +192,14 @@ func (c *clusterClient) _refresh() (err error) {
192192
groups := result.parse(c.opt.TLSConfig != nil)
193193
conns := make(map[string]connrole, len(groups))
194194
for master, g := range groups {
195-
conns[master] = connrole{conn: c.connFn(master, c.opt), replica: false}
196-
for _, addr := range g.nodes[1:] {
197-
if c.rOpt != nil {
198-
conns[addr] = connrole{conn: c.connFn(addr, c.rOpt), replica: true}
199-
} else {
200-
conns[addr] = connrole{conn: c.connFn(addr, c.opt), replica: true}
195+
conns[master] = connrole{conn: c.connFn(master, c.opt)}
196+
if c.rOpt != nil {
197+
for _, addr := range g.nodes[1:] {
198+
conns[addr] = connrole{conn: c.connFn(addr, c.rOpt)}
199+
}
200+
} else {
201+
for _, addr := range g.nodes[1:] {
202+
conns[addr] = connrole{conn: c.connFn(addr, c.opt)}
201203
}
202204
}
203205
}
@@ -215,13 +217,9 @@ func (c *clusterClient) _refresh() (err error) {
215217

216218
c.mu.RLock()
217219
for addr, cc := range c.conns {
218-
fresh, ok := conns[addr]
219-
if ok && (cc.replica == fresh.replica || c.rOpt == nil) {
220-
conns[addr] = connrole{
221-
conn: cc.conn,
222-
replica: fresh.replica,
223-
hidden: fresh.hidden,
224-
}
220+
if fresh, ok := conns[addr]; ok {
221+
fresh.conn = cc.conn
222+
conns[addr] = fresh
225223
} else {
226224
removes = append(removes, cc.conn)
227225
}
@@ -397,9 +395,6 @@ func (c *clusterClient) _pick(slot uint16, toReplica bool) (p conn) {
397395
c.mu.RLock()
398396
if slot == cmds.InitSlot {
399397
for _, cc := range c.conns {
400-
if cc.replica {
401-
continue
402-
}
403398
p = cc.conn
404399
break
405400
}
@@ -434,7 +429,7 @@ func (c *clusterClient) redirectOrNew(addr string, prev conn, slot uint16, mode
434429
c.mu.Lock()
435430
if cc = c.conns[addr]; cc.conn == nil {
436431
p := c.connFn(addr, c.opt)
437-
cc = connrole{conn: p, replica: false}
432+
cc = connrole{conn: p}
438433
c.conns[addr] = cc
439434
if mode == RedirectMove {
440435
c.pslots[slot] = p
@@ -448,14 +443,10 @@ func (c *clusterClient) redirectOrNew(addr string, prev conn, slot uint16, mode
448443
prev.Close()
449444
}(prev)
450445
p := c.connFn(addr, c.opt)
451-
cc = connrole{conn: p, replica: cc.replica}
446+
cc = connrole{conn: p}
452447
c.conns[addr] = cc
453-
if mode == RedirectMove {
454-
if cc.replica {
455-
c.rslots[slot] = p
456-
} else {
457-
c.pslots[slot] = p
458-
}
448+
if mode == RedirectMove { // MOVED should always point to the primary.
449+
c.pslots[slot] = p
459450
}
460451
}
461452
c.mu.Unlock()

cluster_test.go

Lines changed: 6 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -33,23 +33,6 @@ var slotsResp = newResult(RedisMessage{typ: '*', values: []RedisMessage{
3333
}},
3434
}}, nil)
3535

36-
var slotsRespWithChangedRole = newResult(RedisMessage{typ: '*', values: []RedisMessage{
37-
{typ: '*', values: []RedisMessage{
38-
{typ: ':', integer: 0},
39-
{typ: ':', integer: 16383},
40-
{typ: '*', values: []RedisMessage{ // master
41-
{typ: '+', string: "127.0.1.1"},
42-
{typ: ':', integer: 1},
43-
{typ: '+', string: ""},
44-
}},
45-
{typ: '*', values: []RedisMessage{ // replica
46-
{typ: '+', string: "127.0.0.1"},
47-
{typ: ':', integer: 0},
48-
{typ: '+', string: ""},
49-
}},
50-
}},
51-
}}, nil)
52-
5336
var slotsMultiResp = newResult(RedisMessage{typ: '*', values: []RedisMessage{
5437
{typ: '*', values: []RedisMessage{
5538
{typ: ':', integer: 0},
@@ -1937,13 +1920,6 @@ func TestClusterClient_SendToOnlyPrimaryNodes(t *testing.T) {
19371920
t.Fatalf("unexpected err %v", err)
19381921
}
19391922

1940-
t.Run("Do with no slot", func(t *testing.T) {
1941-
c := client.B().Info().Build()
1942-
if v, err := client.Do(context.Background(), c).ToString(); err != nil || v != "INFO" {
1943-
t.Fatalf("unexpected response %v %v", v, err)
1944-
}
1945-
})
1946-
19471923
t.Run("Do", func(t *testing.T) {
19481924
c := client.B().Get().Key("Do").Build()
19491925
if v, err := client.Do(context.Background(), c).ToString(); err != nil || v != "GET Do" {
@@ -2443,13 +2419,6 @@ func TestClusterClient_SendToOnlyReplicaNodes(t *testing.T) {
24432419
t.Fatalf("unexpected err %v", err)
24442420
}
24452421

2446-
t.Run("Do with no slot", func(t *testing.T) {
2447-
c := client.B().Info().Build()
2448-
if v, err := client.Do(context.Background(), c).ToString(); err != nil || v != "INFO" {
2449-
t.Fatalf("unexpected response %v %v", v, err)
2450-
}
2451-
})
2452-
24532422
t.Run("Do", func(t *testing.T) {
24542423
c := client.B().Get().Key("Do").Build()
24552424
if v, err := client.Do(context.Background(), c).ToString(); err != nil || v != "GET Do" {
@@ -2647,6 +2616,9 @@ func TestClusterClient_SendToOnlyReplicaNodes(t *testing.T) {
26472616
primaryNodeConn.AcquireFn = func() wire {
26482617
return w
26492618
}
2619+
replicaNodeConn.AcquireFn = func() wire {
2620+
return w
2621+
} // Subscribe can work on replicas
26502622
if err := client.Dedicated(func(c DedicatedClient) error {
26512623
return c.Receive(context.Background(), c.B().Subscribe().Channel("a").Build(), func(msg PubSubMessage) {})
26522624
}); err != e {
@@ -2988,13 +2960,6 @@ func TestClusterClient_SendReadOperationToReplicaNodesWriteOperationToPrimaryNod
29882960
t.Fatalf("unexpected err %v", err)
29892961
}
29902962

2991-
t.Run("Do with no slot", func(t *testing.T) {
2992-
c := client.B().Info().Build()
2993-
if v, err := client.Do(context.Background(), c).ToString(); err != nil || v != "INFO" {
2994-
t.Fatalf("unexpected response %v %v", v, err)
2995-
}
2996-
})
2997-
29982963
t.Run("Do read operation", func(t *testing.T) {
29992964
c := client.B().Get().Key("Do").Build()
30002965
if v, err := client.Do(context.Background(), c).ToString(); err != nil || v != "GET Do" {
@@ -3241,6 +3206,9 @@ func TestClusterClient_SendReadOperationToReplicaNodesWriteOperationToPrimaryNod
32413206
primaryNodeConn.AcquireFn = func() wire {
32423207
return w
32433208
}
3209+
replicaNodeConn.AcquireFn = func() wire {
3210+
return w
3211+
} // Subscribe can work on replicas
32443212
if err := client.Dedicated(func(c DedicatedClient) error {
32453213
return c.Receive(context.Background(), c.B().Subscribe().Channel("a").Build(), func(msg PubSubMessage) {})
32463214
}); err != e {
@@ -5090,55 +5058,6 @@ func TestClusterTopologyRefreshment(t *testing.T) {
50905058
}
50915059
}
50925060
})
5093-
5094-
t.Run("node role are changed", func(t *testing.T) {
5095-
var callCount int64
5096-
refreshWaitCh := make(chan struct{})
5097-
cli, err := newClusterClient(
5098-
&ClientOption{
5099-
InitAddress: []string{"127.0.0.1:0"},
5100-
ClusterOption: ClusterOption{
5101-
ShardsRefreshInterval: time.Second,
5102-
},
5103-
},
5104-
func(dst string, opt *ClientOption) conn {
5105-
return &mockConn{
5106-
DoFn: func(cmd Completed) RedisResult {
5107-
if c := atomic.AddInt64(&callCount, 1); c >= 6 {
5108-
defer func() { recover() }()
5109-
defer close(refreshWaitCh)
5110-
return slotsRespWithChangedRole
5111-
} else if c >= 3 {
5112-
return slotsRespWithChangedRole
5113-
}
5114-
return slotsResp
5115-
},
5116-
}
5117-
},
5118-
newRetryer(defaultRetryDelayFn),
5119-
)
5120-
if err != nil {
5121-
t.Fatalf("unexpected err %v", err)
5122-
}
5123-
5124-
select {
5125-
case <-refreshWaitCh:
5126-
cli.Close()
5127-
5128-
cli.mu.Lock()
5129-
conns := cli.conns
5130-
cli.mu.Unlock()
5131-
if len(conns) != 2 {
5132-
t.Fatalf("unexpected conns %v", conns)
5133-
}
5134-
if cc, ok := conns["127.0.0.1:0"]; !ok || !cc.replica {
5135-
t.Fatalf("unexpected conns %v", conns)
5136-
}
5137-
if cc, ok := conns["127.0.1.1:1"]; !ok || cc.replica {
5138-
t.Fatalf("unexpected conns %v", conns)
5139-
}
5140-
}
5141-
})
51425061
}
51435062

51445063
func TestClusterClientLoadingRetry(t *testing.T) {

internal/cmds/cmds.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ const (
1111
scrRoTag = uint16(1<<10) | readonly // make scrRoTag can also be retried
1212
unsubTag = uint16(1<<9) | noRetTag
1313
// InitSlot indicates that the command be sent to any redis node in cluster
14-
// When SendToReplicas is set, InitSlot command will be sent to primary node
1514
InitSlot = uint16(1 << 14)
1615
// NoSlot indicates that the command has no key slot specified
1716
NoSlot = uint16(1 << 15)

0 commit comments

Comments (0)