Skip to content

Commit 8b19c31

Browse files
authored
Make FailoverClient(sentinel mode) more robust (#1655)
* add failover option AllowDisconnectedSlaves * checkout unrelated changes * Make NewFailoverClient (sentinel mode) robust. * fix lint issue * Fix bug * Fix lint issue * add comment * checkout unrelated changes * Refine code * checkout unrelated changes
1 parent f594401 commit 8b19c31

File tree

2 files changed

+53
-9
lines changed

2 files changed

+53
-9
lines changed

internal/rand/rand.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,3 +43,8 @@ func (s *source) Seed(seed int64) {
4343
s.src.Seed(seed)
4444
s.mu.Unlock()
4545
}
46+
47+
// Shuffle pseudo-randomizes the order of elements.
48+
// n is the number of elements.
49+
// swap swaps the elements with indexes i and j.
50+
func Shuffle(n int, swap func(i, j int)) { pseudo.Shuffle(n, swap) }

sentinel.go

Lines changed: 48 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,12 @@ type FailoverOptions struct {
3636
// Route all commands to slave read-only nodes.
3737
SlaveOnly bool
3838

39+
// Use slaves disconnected with master when cannot get connected slaves
40+
// Now, this option only works in RandomSlaveAddr function.
41+
UseDisconnectedSlaves bool
42+
43+
// Client queries sentinels in a random order
44+
QuerySentinelRandomly bool
3945
// Following options are copied from Options struct.
4046

4147
Dialer func(ctx context.Context, network, addr string) (net.Conn, error)
@@ -433,10 +439,22 @@ func (c *sentinelFailover) closeSentinel() error {
433439
}
434440

435441
func (c *sentinelFailover) RandomSlaveAddr(ctx context.Context) (string, error) {
436-
addresses, err := c.slaveAddrs(ctx)
442+
if c.opt == nil {
443+
return "", errors.New("opt is nil")
444+
}
445+
446+
addresses, err := c.slaveAddrs(ctx, false)
437447
if err != nil {
438448
return "", err
439449
}
450+
451+
if len(addresses) == 0 && c.opt.UseDisconnectedSlaves {
452+
addresses, err = c.slaveAddrs(ctx, true)
453+
if err != nil {
454+
return "", err
455+
}
456+
}
457+
440458
if len(addresses) == 0 {
441459
return c.MasterAddr(ctx)
442460
}
@@ -466,6 +484,11 @@ func (c *sentinelFailover) MasterAddr(ctx context.Context) (string, error) {
466484
_ = c.closeSentinel()
467485
}
468486

487+
if c.opt.QuerySentinelRandomly {
488+
rand.Shuffle(len(c.sentinelAddrs), func(i, j int) {
489+
c.sentinelAddrs[i], c.sentinelAddrs[j] = c.sentinelAddrs[j], c.sentinelAddrs[i]
490+
})
491+
}
469492
for i, sentinelAddr := range c.sentinelAddrs {
470493
sentinel := NewSentinelClient(c.opt.sentinelOptions(sentinelAddr))
471494

@@ -488,7 +511,7 @@ func (c *sentinelFailover) MasterAddr(ctx context.Context) (string, error) {
488511
return "", errors.New("redis: all sentinels specified in configuration are unreachable")
489512
}
490513

491-
func (c *sentinelFailover) slaveAddrs(ctx context.Context) ([]string, error) {
514+
func (c *sentinelFailover) slaveAddrs(ctx context.Context, useDisconnected bool) ([]string, error) {
492515
c.mu.RLock()
493516
sentinel := c.sentinel
494517
c.mu.RUnlock()
@@ -510,6 +533,13 @@ func (c *sentinelFailover) slaveAddrs(ctx context.Context) ([]string, error) {
510533
}
511534
_ = c.closeSentinel()
512535
}
536+
if c.opt.QuerySentinelRandomly {
537+
rand.Shuffle(len(c.sentinelAddrs), func(i, j int) {
538+
c.sentinelAddrs[i], c.sentinelAddrs[j] = c.sentinelAddrs[j], c.sentinelAddrs[i]
539+
})
540+
}
541+
542+
var sentinelReachable bool
513543

514544
for i, sentinelAddr := range c.sentinelAddrs {
515545
sentinel := NewSentinelClient(c.opt.sentinelOptions(sentinelAddr))
@@ -521,15 +551,21 @@ func (c *sentinelFailover) slaveAddrs(ctx context.Context) ([]string, error) {
521551
_ = sentinel.Close()
522552
continue
523553
}
524-
554+
sentinelReachable = true
555+
addrs := parseSlaveAddrs(slaves, useDisconnected)
556+
if len(addrs) == 0 {
557+
continue
558+
}
525559
// Push working sentinel to the top.
526560
c.sentinelAddrs[0], c.sentinelAddrs[i] = c.sentinelAddrs[i], c.sentinelAddrs[0]
527561
c.setSentinel(ctx, sentinel)
528562

529-
addrs := parseSlaveAddrs(slaves)
530563
return addrs, nil
531564
}
532565

566+
if sentinelReachable {
567+
return []string{}, nil
568+
}
533569
return []string{}, errors.New("redis: all sentinels specified in configuration are unreachable")
534570
}
535571

@@ -550,12 +586,11 @@ func (c *sentinelFailover) getSlaveAddrs(ctx context.Context, sentinel *Sentinel
550586
c.opt.MasterName, err)
551587
return []string{}
552588
}
553-
return parseSlaveAddrs(addrs)
589+
return parseSlaveAddrs(addrs, false)
554590
}
555591

556-
func parseSlaveAddrs(addrs []interface{}) []string {
592+
func parseSlaveAddrs(addrs []interface{}, keepDisconnected bool) []string {
557593
nodes := make([]string, 0, len(addrs))
558-
559594
for _, node := range addrs {
560595
ip := ""
561596
port := ""
@@ -577,8 +612,12 @@ func parseSlaveAddrs(addrs []interface{}) []string {
577612

578613
for _, flag := range flags {
579614
switch flag {
580-
case "s_down", "o_down", "disconnected":
615+
case "s_down", "o_down":
581616
isDown = true
617+
case "disconnected":
618+
if !keepDisconnected {
619+
isDown = true
620+
}
582621
}
583622
}
584623

@@ -705,7 +744,7 @@ func NewFailoverClusterClient(failoverOpt *FailoverOptions) *ClusterClient {
705744
Addr: masterAddr,
706745
}}
707746

708-
slaveAddrs, err := failover.slaveAddrs(ctx)
747+
slaveAddrs, err := failover.slaveAddrs(ctx, false)
709748
if err != nil {
710749
return nil, err
711750
}

0 commit comments

Comments
 (0)