Skip to content

Commit 2df7ee7

Browse files
authored
Merge branch 'master' into dependabot/github_actions/rojopolis/spellcheck-github-actions-0.45.0
2 parents 7ba1f6a + f1ffb55 commit 2df7ee7

File tree

1 file changed

+24
-1
lines changed

1 file changed

+24
-1
lines changed

osscluster.go

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@ import (
2121
"github.com/redis/go-redis/v9/internal/rand"
2222
)
2323

24+
const (
25+
minLatencyMeasurementInterval = 10 * time.Second
26+
)
27+
2428
var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes")
2529

2630
// ClusterOptions are used to configure a cluster client and should be
@@ -316,6 +320,10 @@ type clusterNode struct {
316320
latency uint32 // atomic
317321
generation uint32 // atomic
318322
failing uint32 // atomic
323+
324+
// last time the latency measurement was performed for the node, stored in nanoseconds
325+
// from epoch
326+
lastLatencyMeasurement int64 // atomic
319327
}
320328

321329
func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
@@ -368,6 +376,7 @@ func (n *clusterNode) updateLatency() {
368376
latency = float64(dur) / float64(successes)
369377
}
370378
atomic.StoreUint32(&n.latency, uint32(latency+0.5))
379+
n.SetLastLatencyMeasurement(time.Now())
371380
}
372381

373382
func (n *clusterNode) Latency() time.Duration {
@@ -397,6 +406,10 @@ func (n *clusterNode) Generation() uint32 {
397406
return atomic.LoadUint32(&n.generation)
398407
}
399408

409+
func (n *clusterNode) LastLatencyMeasurement() int64 {
410+
return atomic.LoadInt64(&n.lastLatencyMeasurement)
411+
}
412+
400413
func (n *clusterNode) SetGeneration(gen uint32) {
401414
for {
402415
v := atomic.LoadUint32(&n.generation)
@@ -406,6 +419,15 @@ func (n *clusterNode) SetGeneration(gen uint32) {
406419
}
407420
}
408421

422+
func (n *clusterNode) SetLastLatencyMeasurement(t time.Time) {
423+
for {
424+
v := atomic.LoadInt64(&n.lastLatencyMeasurement)
425+
if t.UnixNano() < v || atomic.CompareAndSwapInt64(&n.lastLatencyMeasurement, v, t.UnixNano()) {
426+
break
427+
}
428+
}
429+
}
430+
409431
//------------------------------------------------------------------------------
410432

411433
type clusterNodes struct {
@@ -493,10 +515,11 @@ func (c *clusterNodes) GC(generation uint32) {
493515
c.mu.Lock()
494516

495517
c.activeAddrs = c.activeAddrs[:0]
518+
now := time.Now()
496519
for addr, node := range c.nodes {
497520
if node.Generation() >= generation {
498521
c.activeAddrs = append(c.activeAddrs, addr)
499-
if c.opt.RouteByLatency {
522+
if c.opt.RouteByLatency && node.LastLatencyMeasurement() < now.Add(-minLatencyMeasurementInterval).UnixNano() {
500523
go node.updateLatency()
501524
}
502525
continue

0 commit comments

Comments
 (0)