Skip to content
Merged
56 changes: 40 additions & 16 deletions sentinel.go
Original file line number Diff line number Diff line change
Expand Up @@ -566,28 +566,52 @@ func (c *sentinelFailover) MasterAddr(ctx context.Context) (string, error) {
}
}

var (
masterAddr string
wg sync.WaitGroup
once sync.Once
done = make(chan struct{})
)

for i, sentinelAddr := range c.sentinelAddrs {
sentinel := NewSentinelClient(c.opt.sentinelOptions(sentinelAddr))
wg.Add(1)
go func(i int, addr string) {
defer wg.Done()
select {
case <-done:
return
default:
sentinelCli := NewSentinelClient(c.opt.sentinelOptions(addr))
addrVal, err := sentinelCli.GetMasterAddrByName(ctx, c.opt.MasterName).Result()
if err != nil {
internal.Logger.Printf(ctx, "sentinel: GetMasterAddrByName addr=%s, master=%q failed: %s",
addr, c.opt.MasterName, err)
_ = sentinelCli.Close()
return
}

masterAddr, err := sentinel.GetMasterAddrByName(ctx, c.opt.MasterName).Result()
if err != nil {
_ = sentinel.Close()
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return "", err
once.Do(func() {
masterAddr = net.JoinHostPort(addrVal[0], addrVal[1])
// Push working sentinel to the top
c.sentinelAddrs[0], c.sentinelAddrs[i] = c.sentinelAddrs[i], c.sentinelAddrs[0]
c.setSentinel(ctx, sentinelCli)
internal.Logger.Printf(ctx, "sentinel: selected addr=%s masterAddr=%s", addr, masterAddr)
close(done)
})
}
internal.Logger.Printf(ctx, "sentinel: GetMasterAddrByName master=%q failed: %s",
c.opt.MasterName, err)
continue
}
}(i, sentinelAddr)
}

// Push working sentinel to the top.
c.sentinelAddrs[0], c.sentinelAddrs[i] = c.sentinelAddrs[i], c.sentinelAddrs[0]
c.setSentinel(ctx, sentinel)
go func() {
wg.Wait()
once.Do(func() { close(done) })
}()

addr := net.JoinHostPort(masterAddr[0], masterAddr[1])
return addr, nil
}
<-done

if masterAddr != "" {
return masterAddr, nil
}
return "", errors.New("redis: all sentinels specified in configuration are unreachable")
}

Expand Down
Loading