
Commit 77f7f03

Add ttlcache package (dapr#80)
* Add `ttlcache` package

  This implements an in-memory cache with a TTL for automatically expiring records.

  Signed-off-by: ItalyPaleAle <[email protected]>

* Add delete method

  Signed-off-by: ItalyPaleAle <[email protected]>

---------

Signed-off-by: ItalyPaleAle <[email protected]>
1 parent abe711e commit 77f7f03

File tree

4 files changed: 279 additions, 0 deletions

4 files changed

+279
-0
lines changed

go.mod

Lines changed: 1 addition & 0 deletions

@@ -3,6 +3,7 @@ module github.com/dapr/kit
 go 1.20

 require (
+	github.com/alphadose/haxmap v1.3.1
 	github.com/cenkalti/backoff/v4 v4.2.1
 	github.com/fsnotify/fsnotify v1.7.0
 	github.com/lestrrat-go/httprc v1.0.4

go.sum

Lines changed: 2 additions & 0 deletions

@@ -1,3 +1,5 @@
+github.com/alphadose/haxmap v1.3.1 h1:KmZh75duO1tC8pt3LmUwoTYiZ9sh4K52FX8p7/yrlqU=
+github.com/alphadose/haxmap v1.3.1/go.mod h1:rjHw1IAqbxm0S3U5tD16GoKsiAd8FWx5BJ2IYqXwgmM=
 github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
 github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=

ttlcache/ttlcache.go

Lines changed: 175 additions & 0 deletions

@@ -0,0 +1,175 @@ (new file)

/*
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
    http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package ttlcache

import (
	"sync/atomic"
	"time"

	"github.com/alphadose/haxmap"
	kclock "k8s.io/utils/clock"
)

// Cache is an efficient cache with a TTL.
type Cache[V any] struct {
	m         *haxmap.Map[string, cacheEntry[V]]
	clock     kclock.WithTicker
	stopped   atomic.Bool
	runningCh chan struct{}
	stopCh    chan struct{}
	maxTTL    int64
}

// CacheOptions are options for NewCache.
type CacheOptions struct {
	// Initial size for the cache.
	// This is optional, and if empty will be left to the underlying library to decide.
	InitialSize int32

	// Interval to perform garbage collection.
	// This is optional, and defaults to 150s (2.5 minutes).
	CleanupInterval time.Duration

	// Maximum TTL value in seconds, if greater than 0.
	MaxTTL int64

	// Internal clock property, used for testing.
	clock kclock.WithTicker
}

// NewCache returns a new cache with a TTL.
func NewCache[V any](opts CacheOptions) *Cache[V] {
	var m *haxmap.Map[string, cacheEntry[V]]
	if opts.InitialSize > 0 {
		m = haxmap.New[string, cacheEntry[V]](uintptr(opts.InitialSize))
	} else {
		m = haxmap.New[string, cacheEntry[V]]()
	}

	if opts.CleanupInterval <= 0 {
		opts.CleanupInterval = 150 * time.Second
	}

	if opts.clock == nil {
		opts.clock = kclock.RealClock{}
	}

	c := &Cache[V]{
		m:      m,
		clock:  opts.clock,
		maxTTL: opts.MaxTTL,
		stopCh: make(chan struct{}),
	}
	c.startBackgroundCleanup(opts.CleanupInterval)
	return c
}

// Get returns an item from the cache.
// Items that have expired are not returned.
func (c *Cache[V]) Get(key string) (v V, ok bool) {
	val, ok := c.m.Get(key)
	if !ok || !val.exp.After(c.clock.Now()) {
		return v, false
	}
	return val.val, true
}

// Set an item in the cache.
func (c *Cache[V]) Set(key string, val V, ttl int64) {
	if ttl <= 0 {
		panic("invalid TTL: must be > 0")
	}

	if c.maxTTL > 0 && ttl > c.maxTTL {
		ttl = c.maxTTL
	}

	exp := c.clock.Now().Add(time.Duration(ttl) * time.Second)
	c.m.Set(key, cacheEntry[V]{
		val: val,
		exp: exp,
	})
}

// Delete an item from the cache.
func (c *Cache[V]) Delete(key string) {
	c.m.Del(key)
}

// Cleanup removes all expired entries from the cache.
func (c *Cache[V]) Cleanup() {
	now := c.clock.Now()

	// Look for all expired keys and then remove them in bulk.
	// This is more efficient than removing keys one-by-one.
	// However, this could lead to a race condition where keys that are updated after ForEach ends are deleted nevertheless.
	// This is considered acceptable in this case as this is just a cache.
	keys := make([]string, 0, c.m.Len())
	c.m.ForEach(func(k string, v cacheEntry[V]) bool {
		if v.exp.Before(now) {
			keys = append(keys, k)
		}
		return true
	})

	c.m.Del(keys...)
}

// Reset removes all entries from the cache.
func (c *Cache[V]) Reset() {
	// Look for all keys and then remove them in bulk.
	// This is more efficient than removing keys one-by-one.
	// However, this could lead to a race condition where keys that are updated after ForEach ends are deleted nevertheless.
	// This is considered acceptable in this case as this is just a cache.
	keys := make([]string, 0, c.m.Len())
	c.m.ForEach(func(k string, v cacheEntry[V]) bool {
		keys = append(keys, k)
		return true
	})

	c.m.Del(keys...)
}

func (c *Cache[V]) startBackgroundCleanup(d time.Duration) {
	c.runningCh = make(chan struct{})
	go func() {
		defer close(c.runningCh)

		t := c.clock.NewTicker(d)
		defer t.Stop()
		for {
			select {
			case <-c.stopCh:
				// Stop the background goroutine
				return
			case <-t.C():
				c.Cleanup()
			}
		}
	}()
}

// Stop the cache, stopping the background garbage collection process.
func (c *Cache[V]) Stop() {
	if c.stopped.CompareAndSwap(false, true) {
		close(c.stopCh)
	}
	<-c.runningCh
}

// Each item in the cache is stored in a cacheEntry, which includes the value as well as its expiration time.
type cacheEntry[V any] struct {
	val V
	exp time.Time
}

ttlcache/ttlcache_test.go

Lines changed: 101 additions & 0 deletions

@@ -0,0 +1,101 @@ (new file)

/*
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
    http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package ttlcache

import (
	"runtime"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	clocktesting "k8s.io/utils/clock/testing"
)

func TestCache(t *testing.T) {
	clock := &clocktesting.FakeClock{}
	clock.SetTime(time.Now())

	cache := NewCache[string](CacheOptions{
		InitialSize:     10,
		CleanupInterval: 20 * time.Second,
		MaxTTL:          15,
		clock:           clock,
	})
	defer cache.Stop()

	// Set values in the cache
	cache.Set("key1", "val1", 2)
	cache.Set("key2", "val2", 5)
	cache.Set("key3", "val3", 30) // Max TTL is 15s
	cache.Set("key4", "val4", 5)

	// Retrieve values
	for i := 0; i < 16; i++ {
		v, ok := cache.Get("key1")
		if i < 2 {
			require.True(t, ok)
			require.Equal(t, "val1", v)
		} else {
			require.False(t, ok)
		}

		v, ok = cache.Get("key2")
		if i < 5 {
			require.True(t, ok)
			require.Equal(t, "val2", v)
		} else {
			require.False(t, ok)
		}

		v, ok = cache.Get("key3")
		if i < 15 {
			require.True(t, ok)
			require.Equal(t, "val3", v)
		} else {
			require.False(t, ok)
		}

		v, ok = cache.Get("key4")
		if i < 1 {
			require.True(t, ok)
			require.Equal(t, "val4", v)

			// Delete from the cache
			cache.Delete("key4")
		} else {
			require.False(t, ok)
		}

		// Advance the clock
		clock.Step(time.Second)
		runtime.Gosched()
		time.Sleep(20 * time.Millisecond)
	}

	// Values should still be in the cache as they haven't been cleaned up yet
	require.EqualValues(t, 3, cache.m.Len())

	// Advance the clock a bit more to make sure the cleanup runs
	clock.Step(5 * time.Second)

	runtime.Gosched()
	time.Sleep(20 * time.Millisecond)

	require.EventuallyWithT(t, func(c *assert.CollectT) {
		if !assert.EqualValues(c, 0, cache.m.Len()) {
			runtime.Gosched()
		}
	}, time.Second, 50*time.Millisecond)
}
