Skip to content

Commit 7545393

Browse files
committed
Only check latencies once every 10 seconds with routeByLatency
`routeByLatency` currently checks latencies any time a server returns a MOVED or READONLY reply. When a shard is down, the ClusterClient chooses to issue the request to a random server, which returns a MOVED reply. This causes a state refresh and a latency update on all servers, which can lead to significant ping load on clusters with a large number of clients. This commit introduces logic to ping each node at most once every 10 seconds: during the `GC` function, a node's latency is updated only if its last latency measurement was taken more than 10 seconds ago. Fixes #2782
1 parent 21bd40a commit 7545393

File tree

1 file changed

+22
-1
lines changed

1 file changed

+22
-1
lines changed

osscluster.go

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@ import (
2121
"github.com/redis/go-redis/v9/internal/rand"
2222
)
2323

24+
// minLatencyMeasurementInterval is the threshold checked in clusterNodes.GC
// when RouteByLatency is enabled: a node's latency is re-measured only if its
// last recorded measurement is older than this interval.
const (
25+
minLatencyMeasurementInterval = 10 * time.Second
26+
)
27+
2428
var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes")
2529

2630
// ClusterOptions are used to configure a cluster client and should be
@@ -309,6 +313,8 @@ type clusterNode struct {
309313
latency uint32 // atomic
310314
generation uint32 // atomic
311315
failing uint32 // atomic
316+
317+
lastLatencyMeasurement int64
312318
}
313319

314320
func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
@@ -359,6 +365,7 @@ func (n *clusterNode) updateLatency() {
359365
latency = float64(dur) / float64(successes)
360366
}
361367
atomic.StoreUint32(&n.latency, uint32(latency+0.5))
368+
n.SetLastLatencyMeasurement(time.Now())
362369
}
363370

364371
func (n *clusterNode) Latency() time.Duration {
@@ -388,6 +395,10 @@ func (n *clusterNode) Generation() uint32 {
388395
return atomic.LoadUint32(&n.generation)
389396
}
390397

398+
// LastLatencyMeasurement atomically loads and returns the node's
// lastLatencyMeasurement field. SetLastLatencyMeasurement stores that field
// as Unix seconds (t.Unix()).
func (n *clusterNode) LastLatencyMeasurement() int64 {
399+
return atomic.LoadInt64(&n.lastLatencyMeasurement)
400+
}
401+
391402
func (n *clusterNode) SetGeneration(gen uint32) {
392403
for {
393404
v := atomic.LoadUint32(&n.generation)
@@ -397,6 +408,15 @@ func (n *clusterNode) SetGeneration(gen uint32) {
397408
}
398409
}
399410

411+
// SetLastLatencyMeasurement records t, converted to Unix seconds, in
// lastLatencyMeasurement. The stored value is left untouched when it is
// already greater than t.Unix(), so the timestamp never moves backwards.
func (n *clusterNode) SetLastLatencyMeasurement(t time.Time) {
412+
for {
413+
v := atomic.LoadInt64(&n.lastLatencyMeasurement)
414+
// Stop when t is older than the stored value, or when the compare-and-swap
// installs t.Unix(). A failed swap means the field changed after the load
// above, so the loop reloads and tries again.
if t.Unix() < v || atomic.CompareAndSwapInt64(&n.lastLatencyMeasurement, v, t.Unix()) {
415+
break
416+
}
417+
}
418+
}
419+
400420
//------------------------------------------------------------------------------
401421

402422
type clusterNodes struct {
@@ -484,10 +504,11 @@ func (c *clusterNodes) GC(generation uint32) {
484504
c.mu.Lock()
485505

486506
c.activeAddrs = c.activeAddrs[:0]
507+
now := time.Now()
487508
for addr, node := range c.nodes {
488509
if node.Generation() >= generation {
489510
c.activeAddrs = append(c.activeAddrs, addr)
490-
if c.opt.RouteByLatency {
511+
if c.opt.RouteByLatency && node.LastLatencyMeasurement() < now.Add(-minLatencyMeasurementInterval).Unix() {
491512
go node.updateLatency()
492513
}
493514
continue

0 commit comments

Comments
 (0)