@@ -22,12 +22,20 @@ import (
22
22
23
23
var errRingShardsDown = errors .New ("redis: all ring shards are down" )
24
24
25
+ // defaultShardHealthCheckFn is the default function used to check the shard liveness
26
+ var defaultShardHealthCheckFn = func (ctx context.Context , client * Client ) bool {
27
+ err := client .Ping (ctx ).Err ()
28
+ return err == nil || err == pool .ErrPoolTimeout
29
+ }
30
+
25
31
//------------------------------------------------------------------------------
26
32
27
33
type ConsistentHash interface {
28
34
Get (string ) string
29
35
}
30
36
37
+ type ShardHealthCheckFn func (ctx context.Context , client * Client ) bool
38
+
31
39
type rendezvousWrapper struct {
32
40
* rendezvous.Rendezvous
33
41
}
@@ -54,10 +62,14 @@ type RingOptions struct {
54
62
// ClientName will execute the `CLIENT SETNAME ClientName` command for each conn.
55
63
ClientName string
56
64
57
- // Frequency of PING commands sent to check shards availability.
65
+ // Frequency of executing ShardHealthCheckFn to check shards availability.
58
66
// Shard is considered down after 3 subsequent failed checks.
59
67
HeartbeatFrequency time.Duration
60
68
69
+ // A function used to check the shard liveness
70
+ // if not set, defaults to defaultShardHealthCheckFn
71
+ ShardHealthCheckFn ShardHealthCheckFn
72
+
61
73
// NewConsistentHash returns a consistent hash that is used
62
74
// to distribute keys across the shards.
63
75
//
@@ -113,6 +125,10 @@ func (opt *RingOptions) init() {
113
125
opt .HeartbeatFrequency = 500 * time .Millisecond
114
126
}
115
127
128
+ if opt .ShardHealthCheckFn == nil {
129
+ opt .ShardHealthCheckFn = defaultShardHealthCheckFn
130
+ }
131
+
116
132
if opt .NewConsistentHash == nil {
117
133
opt .NewConsistentHash = newRendezvous
118
134
}
@@ -408,8 +424,7 @@ func (c *ringSharding) Heartbeat(ctx context.Context, frequency time.Duration) {
408
424
var rebalance bool
409
425
410
426
for _ , shard := range c .List () {
411
- err := shard .Client .Ping (ctx ).Err ()
412
- isUp := err == nil || err == pool .ErrPoolTimeout
427
+ isUp := c .opt .ShardHealthCheckFn (ctx , shard .Client )
413
428
if shard .Vote (isUp ) {
414
429
internal .Logger .Printf (ctx , "ring shard state changed: %s" , shard )
415
430
rebalance = true
0 commit comments