@@ -310,11 +310,10 @@ func (c *ringShards) Random() (*ringShard, error) {
310
310
}
311
311
312
312
// Heartbeat monitors state of each shard in the ring.
313
- func (c * ringShards ) Heartbeat (frequency time. Duration , closeCh chan struct {} ) {
313
+ func (c * ringShards ) Heartbeat (ctx context. Context , frequency time. Duration ) {
314
314
ticker := time .NewTicker (frequency )
315
315
defer ticker .Stop ()
316
316
317
- ctx := context .Background ()
318
317
for {
319
318
select {
320
319
case <- ticker .C :
@@ -332,7 +331,7 @@ func (c *ringShards) Heartbeat(frequency time.Duration, closeCh chan struct{}) {
332
331
if rebalance {
333
332
c .rebalance ()
334
333
}
335
- case <- closeCh :
334
+ case <- ctx . Done () :
336
335
return
337
336
}
338
337
}
@@ -392,10 +391,10 @@ func (c *ringShards) Close() error {
392
391
//------------------------------------------------------------------------------
393
392
394
393
type ring struct {
395
- opt * RingOptions
396
- shards * ringShards
397
- cmdsInfoCache * cmdsInfoCache //nolint:structcheck
398
- hearbeatCloseSignal chan struct {}
394
+ opt * RingOptions
395
+ shards * ringShards
396
+ cmdsInfoCache * cmdsInfoCache //nolint:structcheck
397
+ heartbeatCancelFn context. CancelFunc
399
398
}
400
399
401
400
// Ring is a Redis client that uses consistent hashing to distribute
@@ -421,20 +420,20 @@ type Ring struct {
421
420
func NewRing (opt * RingOptions ) * Ring {
422
421
opt .init ()
423
422
424
- hearbeatCloseSignal := make ( chan struct {} )
423
+ hbCtx , hbCancel := context . WithCancel ( context . Background () )
425
424
426
425
ring := Ring {
427
426
ring : & ring {
428
- opt : opt ,
429
- shards : newRingShards (opt ),
430
- hearbeatCloseSignal : hearbeatCloseSignal ,
427
+ opt : opt ,
428
+ shards : newRingShards (opt ),
429
+ heartbeatCancelFn : hbCancel ,
431
430
},
432
431
}
433
432
434
433
ring .cmdsInfoCache = newCmdsInfoCache (ring .cmdsInfo )
435
434
ring .cmdable = ring .Process
436
435
437
- go ring .shards .Heartbeat (opt .HeartbeatFrequency , hearbeatCloseSignal )
436
+ go ring .shards .Heartbeat (hbCtx , opt .HeartbeatFrequency )
438
437
439
438
return & ring
440
439
}
@@ -722,6 +721,7 @@ func (c *Ring) Watch(ctx context.Context, fn func(*Tx) error, keys ...string) er
722
721
// It is rare to Close a Ring, as the Ring is meant to be long-lived
723
722
// and shared between many goroutines.
724
723
func (c * Ring ) Close () error {
725
- close (c .hearbeatCloseSignal )
724
+ c .heartbeatCancelFn ()
725
+
726
726
return c .shards .Close ()
727
727
}
0 commit comments