Skip to content

Commit f032c12

Browse files
committed
fix: provide a signal channel to end heartbeat goroutine
1 parent e061db8 commit f032c12

File tree

1 file changed

+29
-19
lines changed

1 file changed

+29
-19
lines changed

ring.go

Lines changed: 29 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -310,25 +310,30 @@ func (c *ringShards) Random() (*ringShard, error) {
310310
}
311311

312312
// Heartbeat monitors state of each shard in the ring.
313-
func (c *ringShards) Heartbeat(frequency time.Duration) {
313+
func (c *ringShards) Heartbeat(frequency time.Duration, closeCh chan struct{}) {
314314
ticker := time.NewTicker(frequency)
315315
defer ticker.Stop()
316316

317317
ctx := context.Background()
318-
for range ticker.C {
319-
var rebalance bool
320-
321-
for _, shard := range c.List() {
322-
err := shard.Client.Ping(ctx).Err()
323-
isUp := err == nil || err == pool.ErrPoolTimeout
324-
if shard.Vote(isUp) {
325-
internal.Logger.Printf(context.Background(), "ring shard state changed: %s", shard)
326-
rebalance = true
318+
for {
319+
select {
320+
case <-ticker.C:
321+
var rebalance bool
322+
323+
for _, shard := range c.List() {
324+
err := shard.Client.Ping(ctx).Err()
325+
isUp := err == nil || err == pool.ErrPoolTimeout
326+
if shard.Vote(isUp) {
327+
internal.Logger.Printf(ctx, "ring shard state changed: %s", shard)
328+
rebalance = true
329+
}
327330
}
328-
}
329331

330-
if rebalance {
331-
c.rebalance()
332+
if rebalance {
333+
c.rebalance()
334+
}
335+
case <-closeCh:
336+
return
332337
}
333338
}
334339
}
@@ -387,9 +392,10 @@ func (c *ringShards) Close() error {
387392
//------------------------------------------------------------------------------
388393

389394
type ring struct {
390-
opt *RingOptions
391-
shards *ringShards
392-
cmdsInfoCache *cmdsInfoCache //nolint:structcheck
395+
opt *RingOptions
396+
shards *ringShards
397+
cmdsInfoCache *cmdsInfoCache //nolint:structcheck
398+
hearbeatCloseSignal chan struct{}
393399
}
394400

395401
// Ring is a Redis client that uses consistent hashing to distribute
@@ -415,17 +421,20 @@ type Ring struct {
415421
func NewRing(opt *RingOptions) *Ring {
416422
opt.init()
417423

424+
hearbeatCloseSignal := make(chan struct{})
425+
418426
ring := Ring{
419427
ring: &ring{
420-
opt: opt,
421-
shards: newRingShards(opt),
428+
opt: opt,
429+
shards: newRingShards(opt),
430+
hearbeatCloseSignal: hearbeatCloseSignal,
422431
},
423432
}
424433

425434
ring.cmdsInfoCache = newCmdsInfoCache(ring.cmdsInfo)
426435
ring.cmdable = ring.Process
427436

428-
go ring.shards.Heartbeat(opt.HeartbeatFrequency)
437+
go ring.shards.Heartbeat(opt.HeartbeatFrequency, hearbeatCloseSignal)
429438

430439
return &ring
431440
}
@@ -713,5 +722,6 @@ func (c *Ring) Watch(ctx context.Context, fn func(*Tx) error, keys ...string) er
713722
// It is rare to Close a Ring, as the Ring is meant to be long-lived
714723
// and shared between many goroutines.
715724
func (c *Ring) Close() error {
725+
close(c.hearbeatCloseSignal)
716726
return c.shards.Close()
717727
}

0 commit comments

Comments
 (0)