Skip to content

Commit 5a0ec2b

Browse files
authored
wait for goroutines to finish (#423)
Fixes #422
1 parent fced5e3 commit 5a0ec2b

File tree

4 files changed

+14
-0
lines changed

4 files changed

+14
-0
lines changed

cache.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ type Cache[K Key, V any] struct {
7272
keyToHash func(K) (uint64, uint64)
7373
// stop is used to stop the processItems goroutine.
7474
stop chan struct{}
75+
done chan struct{}
7576
// indicates whether cache is closed.
7677
isClosed atomic.Bool
7778
// cost calculates cost from a value.
@@ -227,6 +228,7 @@ func NewCache[K Key, V any](config *Config[K, V]) (*Cache[K, V], error) {
227228
setBuf: make(chan *Item[V], setBufSize),
228229
keyToHash: config.KeyToHash,
229230
stop: make(chan struct{}),
231+
done: make(chan struct{}),
230232
cost: config.Cost,
231233
ignoreInternalCost: config.IgnoreInternalCost,
232234
cleanupTicker: time.NewTicker(time.Duration(config.TtlTickerDurationInSec) * time.Second / 2),
@@ -422,7 +424,9 @@ func (c *Cache[K, V]) Close() {
422424

423425
// Block until processItems goroutine is returned.
424426
c.stop <- struct{}{}
427+
<-c.done
425428
close(c.stop)
429+
close(c.done)
426430
close(c.setBuf)
427431
c.cachePolicy.Close()
428432
c.cleanupTicker.Stop()
@@ -438,6 +442,7 @@ func (c *Cache[K, V]) Clear() {
438442
}
439443
// Block until processItems goroutine is returned.
440444
c.stop <- struct{}{}
445+
<-c.done
441446

442447
// Clear out the setBuf channel.
443448
loop:
@@ -556,6 +561,7 @@ func (c *Cache[K, V]) processItems() {
556561
case <-c.cleanupTicker.C:
557562
c.storedItems.Cleanup(c.cachePolicy, onEvict)
558563
case <-c.stop:
564+
c.done <- struct{}{}
559565
return
560566
}
561567
}

cache_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -385,6 +385,7 @@ func TestCacheSet(t *testing.T) {
385385
require.Equal(t, 2, val)
386386

387387
c.stop <- struct{}{}
388+
<-c.done
388389
for i := 0; i < setBufSize; i++ {
389390
key, conflict := z.KeyToHash(1)
390391
c.setBuf <- &Item[int]{
@@ -399,6 +400,7 @@ func TestCacheSet(t *testing.T) {
399400
require.Equal(t, uint64(1), c.Metrics.SetsDropped())
400401
close(c.setBuf)
401402
close(c.stop)
403+
close(c.done)
402404

403405
c = nil
404406
require.False(t, c.Set(1, 1, 1))

policy.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ type defaultPolicy[V any] struct {
4040
evict *sampledLFU
4141
itemsCh chan []uint64
4242
stop chan struct{}
43+
done chan struct{}
4344
isClosed bool
4445
metrics *Metrics
4546
}
@@ -50,6 +51,7 @@ func newDefaultPolicy[V any](numCounters, maxCost int64) *defaultPolicy[V] {
5051
evict: newSampledLFU(maxCost),
5152
itemsCh: make(chan []uint64, 3),
5253
stop: make(chan struct{}),
54+
done: make(chan struct{}),
5355
}
5456
go p.processItems()
5557
return p
@@ -73,6 +75,7 @@ func (p *defaultPolicy[V]) processItems() {
7375
p.admit.Push(items)
7476
p.Unlock()
7577
case <-p.stop:
78+
p.done <- struct{}{}
7679
return
7780
}
7881
}
@@ -226,7 +229,9 @@ func (p *defaultPolicy[V]) Close() {
226229

227230
// Block until the p.processItems goroutine returns.
228231
p.stop <- struct{}{}
232+
<-p.done
229233
close(p.stop)
234+
close(p.done)
230235
close(p.itemsCh)
231236
p.isClosed = true
232237
}

policy_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ func TestPolicyProcessItems(t *testing.T) {
3131
p.Unlock()
3232

3333
p.stop <- struct{}{}
34+
<-p.done
3435
p.itemsCh <- []uint64{3, 3, 3}
3536
time.Sleep(wait)
3637
p.Lock()

0 commit comments

Comments
 (0)