Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ import (

var errRingShardsDown = errors.New("redis: all ring shards are down")

// defaultHeartbeatFn is the default function used to check the shard liveness
var defaultHeartbeatFn = func(ctx context.Context, client *Client) bool {
err := client.Ping(ctx).Err()
return err == nil || err == pool.ErrPoolTimeout
}

//------------------------------------------------------------------------------

type ConsistentHash interface {
Expand Down Expand Up @@ -56,10 +62,14 @@ type RingOptions struct {
// ClientName will execute the `CLIENT SETNAME ClientName` command for each conn.
ClientName string

// Frequency of PING commands sent to check shards availability.
// Frequency of executing HeartbeatFn to check shards availability.
// Shard is considered down after 3 subsequent failed checks.
HeartbeatFrequency time.Duration

// A function used to check the shard liveness
// if not set, defaults to defaultHeartbeatFn
HeartbeatFn func(ctx context.Context, client *Client) bool

// NewConsistentHash returns a consistent hash that is used
// to distribute keys across the shards.
//
Expand Down Expand Up @@ -157,6 +167,10 @@ func (opt *RingOptions) init() {
opt.HeartbeatFrequency = 500 * time.Millisecond
}

if opt.HeartbeatFn == nil {
opt.HeartbeatFn = defaultHeartbeatFn
}

if opt.NewConsistentHash == nil {
opt.NewConsistentHash = newRendezvous
}
Expand Down Expand Up @@ -474,8 +488,7 @@ func (c *ringSharding) Heartbeat(ctx context.Context, frequency time.Duration) {

// note: `c.List()` return a shadow copy of `[]*ringShard`.
for _, shard := range c.List() {
err := shard.Client.Ping(ctx).Err()
isUp := err == nil || err == pool.ErrPoolTimeout
isUp := c.opt.HeartbeatFn(ctx, shard.Client)
if shard.Vote(isUp) {
internal.Logger.Printf(ctx, "ring shard state changed: %s", shard)
rebalance = true
Expand Down
Loading