Skip to content

Commit 7158a8d

Browse files
strobilMykhailo Alipaofekshenawandyakov
authored
feat(ring): specify custom health check func via HeartbeatFn option (#2940)
* specify custom health check func via ShardHealthCheckFn option * ShardHealthCheckFn renamed to HeartbeatFn --------- Co-authored-by: Mykhailo Alipa <[email protected]> Co-authored-by: ofekshenawa <[email protected]> Co-authored-by: Nedyalko Dyakov <[email protected]> Co-authored-by: Nedyalko Dyakov <[email protected]>
1 parent 8d15d03 commit 7158a8d

File tree

1 file changed

+16
-3
lines changed

1 file changed

+16
-3
lines changed

ring.go

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,12 @@ import (
2424

2525
var errRingShardsDown = errors.New("redis: all ring shards are down")
2626

27+
// defaultHeartbeatFn is the default function used to check the shard liveness
28+
var defaultHeartbeatFn = func(ctx context.Context, client *Client) bool {
29+
err := client.Ping(ctx).Err()
30+
return err == nil || err == pool.ErrPoolTimeout
31+
}
32+
2733
//------------------------------------------------------------------------------
2834

2935
type ConsistentHash interface {
@@ -56,10 +62,14 @@ type RingOptions struct {
5662
// ClientName will execute the `CLIENT SETNAME ClientName` command for each conn.
5763
ClientName string
5864

59-
// Frequency of PING commands sent to check shards availability.
65+
// Frequency of executing HeartbeatFn to check shards availability.
6066
// Shard is considered down after 3 subsequent failed checks.
6167
HeartbeatFrequency time.Duration
6268

69+
// A function used to check the shard liveness
70+
// if not set, defaults to defaultHeartbeatFn
71+
HeartbeatFn func(ctx context.Context, client *Client) bool
72+
6373
// NewConsistentHash returns a consistent hash that is used
6474
// to distribute keys across the shards.
6575
//
@@ -157,6 +167,10 @@ func (opt *RingOptions) init() {
157167
opt.HeartbeatFrequency = 500 * time.Millisecond
158168
}
159169

170+
if opt.HeartbeatFn == nil {
171+
opt.HeartbeatFn = defaultHeartbeatFn
172+
}
173+
160174
if opt.NewConsistentHash == nil {
161175
opt.NewConsistentHash = newRendezvous
162176
}
@@ -474,8 +488,7 @@ func (c *ringSharding) Heartbeat(ctx context.Context, frequency time.Duration) {
474488

475489
// note: `c.List()` return a shadow copy of `[]*ringShard`.
476490
for _, shard := range c.List() {
477-
err := shard.Client.Ping(ctx).Err()
478-
isUp := err == nil || err == pool.ErrPoolTimeout
491+
isUp := c.opt.HeartbeatFn(ctx, shard.Client)
479492
if shard.Vote(isUp) {
480493
internal.Logger.Printf(ctx, "ring shard state changed: %s", shard)
481494
rebalance = true

0 commit comments

Comments
 (0)