-
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathhypercache_expiration.go
More file actions
149 lines (121 loc) · 3.86 KB
/
hypercache_expiration.go
File metadata and controls
149 lines (121 loc) · 3.86 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package hypercache
import (
"context"
"time"
"github.com/hyp3rd/hypercache/internal/constants"
"github.com/hyp3rd/hypercache/pkg/backend"
cache "github.com/hyp3rd/hypercache/pkg/cache/v2"
)
// startExpirationRoutine spawns the background goroutine that drives expiration.
//
// A ticker is created only when expirationInterval is positive; otherwise the
// ticker stays nil and is passed on as such. The goroutine then calls
// handleExpirationSelect repeatedly, one event per call, until that method
// reports that the loop should exit.
func (hyperCache *HyperCache[T]) startExpirationRoutine(ctx context.Context) {
	go func() {
		var ticker *time.Ticker
		if interval := hyperCache.expirationInterval; interval > 0 {
			ticker = time.NewTicker(interval)
		}

		// Each call handles a single event; a true result means "stop".
		for !hyperCache.handleExpirationSelect(ctx, ticker) {
		}
	}()
}
// handleExpirationSelect waits for exactly one event and handles it. It returns
// true when the caller should exit its loop, false otherwise.
//
// Events handled:
//   - a tick from tick (scheduled expiration);
//   - a signal on expirationTriggerCh (manual/coalesced expiration);
//   - a signal on evictCh (manual eviction);
//   - cancellation of ctx or a signal on stop, both of which stop the ticker
//     (if any) and return true.
//
// tick may be nil. In that case the tick channel is nil, and receiving from a
// nil channel in a select never proceeds, so the scheduled case is never chosen.
func (hyperCache *HyperCache[T]) handleExpirationSelect(ctx context.Context, tick *time.Ticker) bool {
	var tickC <-chan time.Time
	if tick != nil {
		tickC = tick.C
	}

	select {
	case <-tickC:
		// scheduled expiration
		hyperCache.expirationLoop(ctx)
	case <-hyperCache.expirationTriggerCh:
		// manual/coalesced trigger
		hyperCache.expirationLoop(ctx)

		// Drain any queued triggers BEFORE clearing the pending flag. While the
		// flag is still set, execTriggerExpiration refuses to enqueue, so nothing
		// new can land in the channel during the drain. Clearing the flag first
		// would let a concurrent trigger set it again and enqueue a signal that
		// this drain then throws away, leaving the flag stuck at true with an
		// empty channel and silently disabling all future manual triggers.
		for draining := true; draining; {
			select {
			case <-hyperCache.expirationTriggerCh:
				// keep draining
			default:
				draining = false
			}
		}

		// From here on a new trigger is accepted and handled by the next call.
		hyperCache.expirationSignalPending.Store(false)
	case <-hyperCache.evictCh:
		// manual eviction trigger
		hyperCache.evictionLoop(ctx)
	case <-ctx.Done():
		if tick != nil {
			tick.Stop()
		}

		return true
	case <-hyperCache.stop:
		if tick != nil {
			tick.Stop()
		}

		return true
	}

	return false
}
// execTriggerExpiration requests an expiration run without blocking the caller.
//
// The request is dropped, and the constants.StatIncr counter incremented, when:
//   - a debounce interval is configured and less than that interval has passed
//     since the last recorded trigger;
//   - a signal is already marked as pending (coalescing); or
//   - the trigger channel cannot accept the signal right now.
//
// Otherwise the signal is sent and the current time is recorded as the last
// trigger.
func (hyperCache *HyperCache[T]) execTriggerExpiration() {
	// Debounce is optional: it only applies when the interval is positive.
	debounce := hyperCache.expirationDebounceInterval
	if debounce > 0 {
		lastTrigger := time.Unix(0, hyperCache.lastExpirationTrigger.Load())
		if elapsed := time.Since(lastTrigger); elapsed < debounce {
			hyperCache.StatsCollector.Incr(constants.StatIncr, 1)

			return
		}
	}

	// Swap returns the previous value: true means another caller already has a
	// signal pending, so this one is redundant.
	alreadyPending := hyperCache.expirationSignalPending.Swap(true)
	if alreadyPending {
		hyperCache.StatsCollector.Incr(constants.StatIncr, 1)

		return
	}

	// Non-blocking send so callers never stall on a busy expiration routine.
	select {
	case hyperCache.expirationTriggerCh <- true:
		hyperCache.lastExpirationTrigger.Store(time.Now().UnixNano())
	default:
		// The send did not go through; the pending flag is deliberately left set.
		// NOTE(review): the flag is only cleared by the consumer in
		// handleExpirationSelect after it receives a signal — confirm the channel
		// is buffered so a signal is guaranteed to be queued on this path.
		hyperCache.StatsCollector.Incr(constants.StatIncr, 1)
	}
}
// expirationLoop enqueues a job on the worker pool that removes expired items
// from the cache.
//
// The job:
//  1. increments "expiration_loop_count";
//  2. lists the items, sorted by expiration, whose Expiration is positive and
//     whose time since LastAccess exceeds it;
//  3. removes each of them by key, hands the item to the item pool manager and
//     increments "item_expired_count";
//  4. publishes the "item_count" and "expired_item_count" gauges.
//
// The job returns the first error from List or Remove and stops there; the
// gauges are not updated in that case. The elapsed time of the job is reported
// as "expiration_loop_duration" on every exit path.
func (hyperCache *HyperCache[T]) expirationLoop(ctx context.Context) {
	hyperCache.workerPool.Enqueue(func() error {
		hyperCache.StatsCollector.Incr("expiration_loop_count", 1)

		// Arguments of a deferred call are evaluated when the defer statement
		// executes, not when the call runs. Passing time.Now().UnixNano() directly
		// would therefore report the start timestamp instead of a duration, so the
		// elapsed time is computed inside a closure at exit.
		// NOTE(review): nanoseconds are used to match the scale of the previous
		// value — confirm the unit the stats collector expects.
		start := time.Now()
		defer func() {
			hyperCache.StatsCollector.Timing("expiration_loop_duration", time.Since(start).Nanoseconds())
		}()

		// get all expired items
		items, err := hyperCache.List(ctx,
			backend.WithSortBy(constants.SortByExpiration.String()),
			backend.WithFilterFunc(func(item *cache.Item) bool {
				return item.Expiration > 0 && time.Since(item.LastAccess) > item.Expiration
			}))
		if err != nil {
			return err
		}

		// iterate all expired items and remove them
		var expiredCount int64

		for _, item := range items {
			if err := hyperCache.Remove(ctx, item.Key); err != nil {
				return err
			}

			expiredCount++

			hyperCache.itemPoolManager.Put(item)
			hyperCache.StatsCollector.Incr("item_expired_count", 1)
		}

		hyperCache.StatsCollector.Gauge("item_count", int64(hyperCache.backend.Count(ctx)))
		hyperCache.StatsCollector.Gauge("expired_item_count", expiredCount)

		return nil
	})
}
// TriggerExpiration requests an expiration run on demand. It delegates to
// execTriggerExpiration, which debounces and coalesces repeated requests.
func (hyperCache *HyperCache[T]) TriggerExpiration() {
	hyperCache.execTriggerExpiration()
}