Skip to content

Commit 1492628

Browse files
authored
Merge pull request #2157 from kavu/fix/ring_heartbeat_leak
fix: provide a signal channel to end heartbeat goroutine
2 parents 6343311 + 20d0ca2 commit 1492628

File tree

1 file changed

+30
-20
lines changed

1 file changed

+30
-20
lines changed

ring.go

Lines changed: 30 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -310,25 +310,29 @@ func (c *ringShards) Random() (*ringShard, error) {
310310
}
311311

312312
// Heartbeat monitors state of each shard in the ring.
313-
func (c *ringShards) Heartbeat(frequency time.Duration) {
313+
func (c *ringShards) Heartbeat(ctx context.Context, frequency time.Duration) {
314314
ticker := time.NewTicker(frequency)
315315
defer ticker.Stop()
316316

317-
ctx := context.Background()
318-
for range ticker.C {
319-
var rebalance bool
320-
321-
for _, shard := range c.List() {
322-
err := shard.Client.Ping(ctx).Err()
323-
isUp := err == nil || err == pool.ErrPoolTimeout
324-
if shard.Vote(isUp) {
325-
internal.Logger.Printf(context.Background(), "ring shard state changed: %s", shard)
326-
rebalance = true
317+
for {
318+
select {
319+
case <-ticker.C:
320+
var rebalance bool
321+
322+
for _, shard := range c.List() {
323+
err := shard.Client.Ping(ctx).Err()
324+
isUp := err == nil || err == pool.ErrPoolTimeout
325+
if shard.Vote(isUp) {
326+
internal.Logger.Printf(ctx, "ring shard state changed: %s", shard)
327+
rebalance = true
328+
}
327329
}
328-
}
329330

330-
if rebalance {
331-
c.rebalance()
331+
if rebalance {
332+
c.rebalance()
333+
}
334+
case <-ctx.Done():
335+
return
332336
}
333337
}
334338
}
@@ -387,9 +391,10 @@ func (c *ringShards) Close() error {
387391
//------------------------------------------------------------------------------
388392

389393
type ring struct {
390-
opt *RingOptions
391-
shards *ringShards
392-
cmdsInfoCache *cmdsInfoCache //nolint:structcheck
394+
opt *RingOptions
395+
shards *ringShards
396+
cmdsInfoCache *cmdsInfoCache //nolint:structcheck
397+
heartbeatCancelFn context.CancelFunc
393398
}
394399

395400
// Ring is a Redis client that uses consistent hashing to distribute
@@ -415,17 +420,20 @@ type Ring struct {
415420
func NewRing(opt *RingOptions) *Ring {
416421
opt.init()
417422

423+
hbCtx, hbCancel := context.WithCancel(context.Background())
424+
418425
ring := Ring{
419426
ring: &ring{
420-
opt: opt,
421-
shards: newRingShards(opt),
427+
opt: opt,
428+
shards: newRingShards(opt),
429+
heartbeatCancelFn: hbCancel,
422430
},
423431
}
424432

425433
ring.cmdsInfoCache = newCmdsInfoCache(ring.cmdsInfo)
426434
ring.cmdable = ring.Process
427435

428-
go ring.shards.Heartbeat(opt.HeartbeatFrequency)
436+
go ring.shards.Heartbeat(hbCtx, opt.HeartbeatFrequency)
429437

430438
return &ring
431439
}
@@ -713,5 +721,7 @@ func (c *Ring) Watch(ctx context.Context, fn func(*Tx) error, keys ...string) er
713721
// It is rare to Close a Ring, as the Ring is meant to be long-lived
714722
// and shared between many goroutines.
715723
func (c *Ring) Close() error {
724+
c.heartbeatCancelFn()
725+
716726
return c.shards.Close()
717727
}

0 commit comments

Comments
 (0)