diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8ef48cef..c477eef5 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -11,6 +11,8 @@ adhere to [Semantic Versioning](http://semver.org/spec/v2.0.0.html) starting v1.
 - Remove dependency: github.com/pkg/errors (#443)
 - Add public Cache.RemainingCost() method
+- Implement public Cache.Iter() method
+- Run tests in parallel
 
 **Fixed**
 
diff --git a/cache.go b/cache.go
index c224fe2a..44cd4825 100644
--- a/cache.go
+++ b/cache.go
@@ -25,8 +25,6 @@ var (
 	setBufSize = 32 * 1024
 )
 
-const itemSize = int64(unsafe.Sizeof(storeItem[any]{}))
-
 func zeroValue[T any]() T {
 	var zero T
 	return zero
@@ -40,19 +38,19 @@ type Key = z.Key
 // from as many goroutines as you want.
 type Cache[K Key, V any] struct {
 	// storedItems is the central concurrent hashmap where key-value items are stored.
-	storedItems store[V]
+	storedItems store[K, V]
 	// cachePolicy determines what gets let in to the cache and what gets kicked out.
-	cachePolicy *defaultPolicy[V]
+	cachePolicy *defaultPolicy[K, V]
 	// getBuf is a custom ring buffer implementation that gets pushed to when
 	// keys are read.
 	getBuf *ringBuffer
 	// setBuf is a buffer allowing us to batch/drop Sets during times of high
 	// contention.
-	setBuf chan *Item[V]
+	setBuf chan *Item[K, V]
 	// onEvict is called for item evictions.
-	onEvict func(*Item[V])
+	onEvict func(*Item[K, V])
 	// onReject is called when an item is rejected via admission policy.
-	onReject func(*Item[V])
+	onReject func(*Item[K, V])
 	// onExit is called whenever a value goes out of scope from the cache.
 	onExit (func(V))
 	// KeyToHash function is used to customize the key hashing algorithm.
@@ -74,6 +72,8 @@ type Cache[K Key, V any] struct {
 	// Metrics contains a running log of important statistics like hits, misses,
 	// and dropped items.
 	Metrics *Metrics
+	// itemSize is the size of each item in the store
+	itemSize int64
 }
 
 // Config is passed to NewCache for creating new Cache instances.
@@ -127,10 +127,10 @@ type Config[K Key, V any] struct {
 	Metrics bool
 
 	// OnEvict is called for every eviction with the evicted item.
-	OnEvict func(item *Item[V])
+	OnEvict func(item *Item[K, V])
 
 	// OnReject is called for every rejection done via the policy.
-	OnReject func(item *Item[V])
+	OnReject func(item *Item[K, V])
 
 	// OnExit is called whenever a value is removed from cache. This can be
 	// used to do manual memory deallocation. Would also be called on eviction
@@ -191,14 +191,15 @@ const (
 )
 
 // Item is a full representation of what's stored in the cache for each key-value pair.
-type Item[V any] struct {
-	flag       itemFlag
-	Key        uint64
-	Conflict   uint64
-	Value      V
-	Cost       int64
-	Expiration time.Time
-	wait       chan struct{}
+type Item[K Key, V any] struct {
+	flag        itemFlag
+	Key         uint64
+	OriginalKey K
+	Conflict    uint64
+	Value       V
+	Cost        int64
+	Expiration  time.Time
+	wait        chan struct{}
 }
 
 // NewCache returns a new Cache instance and any configuration errors, if any.
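Why the package-level constant had to go: now that `storeItem` carries `originalKey K`, the struct's size is no longer fixed across the package — it depends on the concrete key (and value) types, so each cache instance computes its own `itemSize` in `NewCache`. A standalone sketch of the effect (`comparable` stands in here for the package's `Key` constraint, and `sketchItem` merely mirrors `storeItem` from the diff):

```go
package main

import (
	"fmt"
	"time"
	"unsafe"
)

// sketchItem mirrors storeItem[K, V] from the diff; comparable is a
// stand-in for the package's Key constraint (z.Key).
type sketchItem[K comparable, V any] struct {
	key         uint64
	originalKey K
	conflict    uint64
	value       V
	expiration  time.Time
}

func main() {
	// The in-memory size varies with the instantiation: a string key adds
	// a two-word string header where a uint64 key adds a single word.
	fmt.Println(unsafe.Sizeof(sketchItem[uint64, int]{}))
	fmt.Println(unsafe.Sizeof(sketchItem[string, int]{}))
}
```

Hence the new `itemSize` field on `Cache`, initialized as `int64(unsafe.Sizeof(storeItem[K, V]{}))` in `NewCache` and charged per item in `processItems` below.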
@@ -219,18 +220,19 @@ func NewCache[K Key, V any](config *Config[K, V]) (*Cache[K, V], error) {
 	case config.TtlTickerDurationInSec == 0:
 		config.TtlTickerDurationInSec = bucketDurationSecs
 	}
-	policy := newPolicy[V](config.NumCounters, config.MaxCost)
+	policy := newPolicy[K, V](config.NumCounters, config.MaxCost)
 	cache := &Cache[K, V]{
-		storedItems: newStore[V](),
+		storedItems: newStore[K, V](),
 		cachePolicy: policy,
 		getBuf:      newRingBuffer(policy, config.BufferItems),
-		setBuf:      make(chan *Item[V], setBufSize),
+		setBuf:      make(chan *Item[K, V], setBufSize),
 		keyToHash:   config.KeyToHash,
 		stop:        make(chan struct{}),
 		done:        make(chan struct{}),
 		cost:        config.Cost,
 		ignoreInternalCost: config.IgnoreInternalCost,
 		cleanupTicker:      time.NewTicker(time.Duration(config.TtlTickerDurationInSec) * time.Second / 2),
+		itemSize:           int64(unsafe.Sizeof(storeItem[K, V]{})),
 	}
 	cache.storedItems.SetShouldUpdateFn(config.ShouldUpdate)
 	cache.onExit = func(val V) {
@@ -238,13 +240,13 @@ func NewCache[K Key, V any](config *Config[K, V]) (*Cache[K, V], error) {
 			config.OnExit(val)
 		}
 	}
-	cache.onEvict = func(item *Item[V]) {
+	cache.onEvict = func(item *Item[K, V]) {
 		if config.OnEvict != nil {
 			config.OnEvict(item)
 		}
 		cache.onExit(item.Value)
 	}
-	cache.onReject = func(item *Item[V]) {
+	cache.onReject = func(item *Item[K, V]) {
 		if config.OnReject != nil {
 			config.OnReject(item)
 		}
@@ -271,7 +273,7 @@ func (c *Cache[K, V]) Wait() {
 		return
 	}
 	wait := make(chan struct{})
-	c.setBuf <- &Item[V]{wait: wait}
+	c.setBuf <- &Item[K, V]{wait: wait}
 	<-wait
 }
 
@@ -337,13 +339,14 @@ func (c *Cache[K, V]) SetWithTTL(key K, value V, cost int64, ttl time.Duration)
 	}
 
 	keyHash, conflictHash := c.keyToHash(key)
-	i := &Item[V]{
-		flag:       itemNew,
-		Key:        keyHash,
-		Conflict:   conflictHash,
-		Value:      value,
-		Cost:       cost,
-		Expiration: expiration,
+	i := &Item[K, V]{
+		flag:        itemNew,
+		Key:         keyHash,
+		OriginalKey: key,
+		Conflict:    conflictHash,
+		Value:       value,
+		Cost:        cost,
+		Expiration:  expiration,
 	}
 	// cost is eventually updated. The expiration must also be immediately updated
 	// to prevent items from being prematurely removed from the map.
@@ -380,7 +383,7 @@ func (c *Cache[K, V]) Del(key K) {
 	// So we must push the same item to `setBuf` with the deletion flag.
 	// This ensures that if a set is followed by a delete, it will be
 	// applied in the correct order.
-	c.setBuf <- &Item[V]{
+	c.setBuf <- &Item[K, V]{
 		flag:     itemDelete,
 		Key:      keyHash,
 		Conflict: conflictHash,
@@ -414,6 +417,13 @@ func (c *Cache[K, V]) GetTTL(key K) (time.Duration, bool) {
 	return time.Until(expiration), true
 }
 
+// Iter iterates the elements of the cache, passing them to the callback.
+// It guarantees that any key in the cache will be visited only once.
+// The set of keys visited by Iter is non-deterministic.
+func (c *Cache[K, V]) Iter(cb func(k K, v V) (stop bool)) {
+	c.storedItems.Iter(cb)
+}
+
 // Close stops all goroutines and closes all channels.
 func (c *Cache[K, V]) Close() {
 	if c == nil || c.isClosed.Load() {
@@ -516,7 +526,7 @@ func (c *Cache[K, V]) processItems() {
 			}
 		}
 	}
-	onEvict := func(i *Item[V]) {
+	onEvict := func(i *Item[K, V]) {
 		if ts, has := startTs[i.Key]; has {
 			c.Metrics.trackEviction(int64(time.Since(ts) / time.Second))
 			delete(startTs, i.Key)
@@ -539,7 +549,7 @@ func (c *Cache[K, V]) processItems() {
 			}
 			if !c.ignoreInternalCost {
 				// Add the cost of internally storing the object.
- i.Cost += itemSize + i.Cost += c.itemSize } switch i.flag { diff --git a/cache_test.go b/cache_test.go index e2c9c789..c97be85c 100644 --- a/cache_test.go +++ b/cache_test.go @@ -22,6 +22,8 @@ import ( var wait = time.Millisecond * 10 func TestCacheKeyToHash(t *testing.T) { + t.Parallel() + keyToHashCount := 0 c, err := NewCache(&Config[int, int]{ NumCounters: 10, @@ -45,6 +47,8 @@ func TestCacheKeyToHash(t *testing.T) { } func TestCacheMaxCost(t *testing.T) { + t.Parallel() + charset := "abcdefghijklmnopqrstuvwxyz0123456789" key := func() []byte { k := make([]byte, 2) @@ -96,6 +100,8 @@ func TestCacheMaxCost(t *testing.T) { } func TestUpdateMaxCost(t *testing.T) { + t.Parallel() + c, err := NewCache(&Config[int, int]{ NumCounters: 10, MaxCost: 10, @@ -122,6 +128,8 @@ func TestUpdateMaxCost(t *testing.T) { } func TestRemainingCost(t *testing.T) { + t.Parallel() + c, err := NewCache(&Config[int, int]{ NumCounters: 10, MaxCost: 100, @@ -136,7 +144,7 @@ func TestRemainingCost(t *testing.T) { _, ok := c.Get(1) require.True(t, ok) - expectedUsed := 1 + itemSize + expectedUsed := 1 + c.itemSize require.Equal(t, c.MaxCost()-expectedUsed, c.RemainingCost()) // Set rejected due to exceeding capacity @@ -155,6 +163,8 @@ func TestRemainingCost(t *testing.T) { } func TestNewCache(t *testing.T) { + t.Parallel() + _, err := NewCache(&Config[int, int]{ NumCounters: 0, }) @@ -184,6 +194,8 @@ func TestNewCache(t *testing.T) { } func TestNilCache(t *testing.T) { + t.Parallel() + var c *Cache[int, int] val, ok := c.Get(1) require.False(t, ok) @@ -196,6 +208,8 @@ func TestNilCache(t *testing.T) { } func TestMultipleClose(t *testing.T) { + t.Parallel() + var c *Cache[int, int] c.Close() @@ -212,6 +226,8 @@ func TestMultipleClose(t *testing.T) { } func TestSetAfterClose(t *testing.T) { + t.Parallel() + c, err := newTestCache() require.NoError(t, err) require.NotNil(t, c) @@ -221,6 +237,8 @@ func TestSetAfterClose(t *testing.T) { } func TestClearAfterClose(t *testing.T) { + t.Parallel() + c, err := newTestCache() require.NoError(t, err) require.NotNil(t, c) @@ -230,6 +248,8 @@ func TestClearAfterClose(t *testing.T) { } func TestGetAfterClose(t *testing.T) { + t.Parallel() + c, err := newTestCache() require.NoError(t, err) require.NotNil(t, c) @@ -242,6 +262,8 @@ func TestGetAfterClose(t *testing.T) { } func TestDelAfterClose(t *testing.T) { + t.Parallel() + c, err := newTestCache() require.NoError(t, err) require.NotNil(t, c) @@ -253,6 +275,8 @@ func TestDelAfterClose(t *testing.T) { } func TestCacheProcessItems(t *testing.T) { + t.Parallel() + m := &sync.Mutex{} evicted := make(map[uint64]struct{}) c, err := NewCache(&Config[int, int]{ @@ -263,7 +287,7 @@ func TestCacheProcessItems(t *testing.T) { Cost: func(value int) int64 { return int64(value) }, - OnEvict: func(item *Item[int]) { + OnEvict: func(item *Item[int, int]) { m.Lock() defer m.Unlock() evicted[item.Key] = struct{}{} @@ -275,30 +299,32 @@ func TestCacheProcessItems(t *testing.T) { var conflict uint64 key, conflict = z.KeyToHash(1) - c.setBuf <- &Item[int]{ - flag: itemNew, - Key: key, - Conflict: conflict, - Value: 1, - Cost: 0, + c.setBuf <- &Item[int, int]{ + flag: itemNew, + Key: key, + OriginalKey: 1, + Conflict: conflict, + Value: 1, + Cost: 0, } time.Sleep(wait) require.True(t, c.cachePolicy.Has(1)) require.Equal(t, int64(1), c.cachePolicy.Cost(1)) key, conflict = z.KeyToHash(1) - c.setBuf <- &Item[int]{ - flag: itemUpdate, - Key: key, - Conflict: conflict, - Value: 2, - Cost: 0, + c.setBuf <- &Item[int, int]{ + flag: itemUpdate, + Key: key, 
+ OriginalKey: 1, + Conflict: conflict, + Value: 2, + Cost: 0, } time.Sleep(wait) require.Equal(t, int64(2), c.cachePolicy.Cost(1)) key, conflict = z.KeyToHash(1) - c.setBuf <- &Item[int]{ + c.setBuf <- &Item[int, int]{ flag: itemDelete, Key: key, Conflict: conflict, @@ -311,36 +337,40 @@ func TestCacheProcessItems(t *testing.T) { require.False(t, c.cachePolicy.Has(1)) key, conflict = z.KeyToHash(2) - c.setBuf <- &Item[int]{ - flag: itemNew, - Key: key, - Conflict: conflict, - Value: 2, - Cost: 3, + c.setBuf <- &Item[int, int]{ + flag: itemNew, + Key: key, + OriginalKey: 2, + Conflict: conflict, + Value: 2, + Cost: 3, } key, conflict = z.KeyToHash(3) - c.setBuf <- &Item[int]{ - flag: itemNew, - Key: key, - Conflict: conflict, - Value: 3, - Cost: 3, + c.setBuf <- &Item[int, int]{ + flag: itemNew, + Key: key, + OriginalKey: 3, + Conflict: conflict, + Value: 3, + Cost: 3, } key, conflict = z.KeyToHash(4) - c.setBuf <- &Item[int]{ - flag: itemNew, - Key: key, - Conflict: conflict, - Value: 3, - Cost: 3, + c.setBuf <- &Item[int, int]{ + flag: itemNew, + Key: key, + OriginalKey: 4, + Conflict: conflict, + Value: 3, + Cost: 3, } key, conflict = z.KeyToHash(5) - c.setBuf <- &Item[int]{ - flag: itemNew, - Key: key, - Conflict: conflict, - Value: 3, - Cost: 5, + c.setBuf <- &Item[int, int]{ + flag: itemNew, + Key: key, + OriginalKey: 5, + Conflict: conflict, + Value: 3, + Cost: 5, } time.Sleep(wait) m.Lock() @@ -351,10 +381,12 @@ func TestCacheProcessItems(t *testing.T) { require.NotNil(t, recover()) }() c.Close() - c.setBuf <- &Item[int]{flag: itemNew} + c.setBuf <- &Item[int, int]{flag: itemNew} } func TestCacheGet(t *testing.T) { + t.Parallel() + c, err := NewCache(&Config[int, int]{ NumCounters: 100, MaxCost: 10, @@ -365,10 +397,11 @@ func TestCacheGet(t *testing.T) { require.NoError(t, err) key, conflict := z.KeyToHash(1) - i := Item[int]{ - Key: key, - Conflict: conflict, - Value: 1, + i := Item[int, int]{ + Key: key, + OriginalKey: 1, + Conflict: conflict, + Value: 1, } c.storedItems.Set(&i) val, ok := c.Get(1) @@ -388,6 +421,44 @@ func TestCacheGet(t *testing.T) { require.Zero(t, val) } +func TestCacheSetIter(t *testing.T) { + t.Parallel() + + c, err := NewCache(&Config[string, int]{ + NumCounters: 100, + MaxCost: 10, + BufferItems: 64, + IgnoreInternalCost: true, + Metrics: true, + }) + require.NoError(t, err) + + expectedValues := map[string]int{ + "a": 1, + "b": 2, + "c": 3, + "d": 4, + } + for k, v := range expectedValues { + key, conflict := z.KeyToHash(k) + i := Item[string, int]{ + Key: key, + OriginalKey: k, + Conflict: conflict, + Value: v, + } + c.storedItems.Set(&i) + } + + resultMap := make(map[string]int) + c.Iter(func(k string, v int) (stop bool) { + resultMap[k] = v + return false + }) + + require.Equal(t, expectedValues, resultMap) +} + // retrySet calls SetWithTTL until the item is accepted by the cache. 
func retrySet(t *testing.T, c *Cache[int, int], key, value int, cost int64, ttl time.Duration) { for { @@ -406,6 +477,8 @@ func retrySet(t *testing.T, c *Cache[int, int], key, value int, cost int64, ttl } func TestCacheSet(t *testing.T) { + t.Parallel() + c, err := NewCache(&Config[int, int]{ NumCounters: 100, MaxCost: 10, @@ -426,12 +499,13 @@ func TestCacheSet(t *testing.T) { <-c.done for i := 0; i < setBufSize; i++ { key, conflict := z.KeyToHash(1) - c.setBuf <- &Item[int]{ - flag: itemUpdate, - Key: key, - Conflict: conflict, - Value: 1, - Cost: 1, + c.setBuf <- &Item[int, int]{ + flag: itemUpdate, + Key: key, + OriginalKey: 1, + Conflict: conflict, + Value: 1, + Cost: 1, } } require.False(t, c.Set(2, 2, 1)) @@ -445,6 +519,8 @@ func TestCacheSet(t *testing.T) { } func TestCacheInternalCost(t *testing.T) { + t.Parallel() + c, err := NewCache(&Config[int, int]{ NumCounters: 100, MaxCost: 10, @@ -462,6 +538,8 @@ func TestCacheInternalCost(t *testing.T) { } func TestRecacheWithTTL(t *testing.T) { + t.Parallel() + c, err := NewCache(&Config[int, int]{ NumCounters: 100, MaxCost: 10, @@ -504,6 +582,8 @@ func TestRecacheWithTTL(t *testing.T) { } func TestCacheSetWithTTL(t *testing.T) { + t.Parallel() + m := &sync.Mutex{} evicted := make(map[uint64]struct{}) c, err := NewCache(&Config[int, int]{ @@ -512,7 +592,7 @@ func TestCacheSetWithTTL(t *testing.T) { IgnoreInternalCost: true, BufferItems: 64, Metrics: true, - OnEvict: func(item *Item[int]) { + OnEvict: func(item *Item[int, int]) { m.Lock() defer m.Unlock() evicted[item.Key] = struct{}{} @@ -555,6 +635,8 @@ func TestCacheSetWithTTL(t *testing.T) { } func TestCacheDel(t *testing.T) { + t.Parallel() + c, err := NewCache(&Config[int, int]{ NumCounters: 100, MaxCost: 10, @@ -580,6 +662,8 @@ func TestCacheDel(t *testing.T) { } func TestCacheDelWithTTL(t *testing.T) { + t.Parallel() + c, err := NewCache(&Config[int, int]{ NumCounters: 100, MaxCost: 10, @@ -598,6 +682,8 @@ func TestCacheDelWithTTL(t *testing.T) { } func TestCacheGetTTL(t *testing.T) { + t.Parallel() + c, err := NewCache(&Config[int, int]{ NumCounters: 100, MaxCost: 10, @@ -663,6 +749,8 @@ func TestCacheGetTTL(t *testing.T) { } func TestCacheClear(t *testing.T) { + t.Parallel() + c, err := NewCache(&Config[int, int]{ NumCounters: 100, MaxCost: 10, @@ -689,6 +777,8 @@ func TestCacheClear(t *testing.T) { } func TestCacheMetrics(t *testing.T) { + t.Parallel() + c, err := NewCache(&Config[int, int]{ NumCounters: 100, MaxCost: 10, @@ -707,10 +797,14 @@ func TestCacheMetrics(t *testing.T) { } func TestMetrics(t *testing.T) { + t.Parallel() + newMetrics() } func TestNilMetrics(t *testing.T) { + t.Parallel() + var m *Metrics for _, f := range []func() uint64{ m.Hits, @@ -728,6 +822,8 @@ func TestNilMetrics(t *testing.T) { } func TestMetricsAddGet(t *testing.T) { + t.Parallel() + m := newMetrics() m.add(hit, 1, 1) m.add(hit, 2, 2) @@ -740,6 +836,8 @@ func TestMetricsAddGet(t *testing.T) { } func TestMetricsRatio(t *testing.T) { + t.Parallel() + m := newMetrics() require.Equal(t, float64(0), m.Ratio()) @@ -754,6 +852,8 @@ func TestMetricsRatio(t *testing.T) { } func TestMetricsString(t *testing.T) { + t.Parallel() + m := newMetrics() m.add(hit, 1, 1) m.add(miss, 1, 1) @@ -788,6 +888,8 @@ func TestMetricsString(t *testing.T) { } func TestCacheMetricsClear(t *testing.T) { + t.Parallel() + c, err := NewCache(&Config[int, int]{ NumCounters: 100, MaxCost: 10, @@ -821,6 +923,8 @@ func init() { } func TestBlockOnClear(t *testing.T) { + t.Parallel() + c, err := NewCache(&Config[int, int]{ 
NumCounters: 100, MaxCost: 10, @@ -853,6 +957,8 @@ func TestBlockOnClear(t *testing.T) { // Regression test for bug https://github.com/hypermodeinc/ristretto/issues/167 func TestDropUpdates(t *testing.T) { + t.Parallel() + originalSetBufSize := setBufSize defer func() { setBufSize = originalSetBufSize }() @@ -886,7 +992,7 @@ func TestDropUpdates(t *testing.T) { BufferItems: 64, IgnoreInternalCost: true, Metrics: true, - OnEvict: func(item *Item[string]) { + OnEvict: func(item *Item[int, string]) { handler(nil, item.Value) }, }) @@ -922,6 +1028,8 @@ func TestDropUpdates(t *testing.T) { } func TestRistrettoCalloc(t *testing.T) { + t.Parallel() + maxCacheSize := 1 << 20 config := &Config[int, []byte]{ // Use 5% of cache memory for storing counters. @@ -962,6 +1070,8 @@ func TestRistrettoCalloc(t *testing.T) { } func TestRistrettoCallocTTL(t *testing.T) { + t.Parallel() + maxCacheSize := 1 << 20 config := &Config[int, []byte]{ // Use 5% of cache memory for storing counters. @@ -1011,6 +1121,8 @@ func newTestCache() (*Cache[int, int], error) { } func TestCacheWithTTL(t *testing.T) { + t.Parallel() + // There may be a race condition, so run the test multiple times. const try = 10 diff --git a/policy.go b/policy.go index bff4ba89..6c18926f 100644 --- a/policy.go +++ b/policy.go @@ -19,11 +19,11 @@ const ( lfuSample = 5 ) -func newPolicy[V any](numCounters, maxCost int64) *defaultPolicy[V] { - return newDefaultPolicy[V](numCounters, maxCost) +func newPolicy[K Key, V any](numCounters, maxCost int64) *defaultPolicy[K, V] { + return newDefaultPolicy[K, V](numCounters, maxCost) } -type defaultPolicy[V any] struct { +type defaultPolicy[K Key, V any] struct { sync.Mutex admit *tinyLFU evict *sampledLFU @@ -34,8 +34,8 @@ type defaultPolicy[V any] struct { metrics *Metrics } -func newDefaultPolicy[V any](numCounters, maxCost int64) *defaultPolicy[V] { - p := &defaultPolicy[V]{ +func newDefaultPolicy[K Key, V any](numCounters, maxCost int64) *defaultPolicy[K, V] { + p := &defaultPolicy[K, V]{ admit: newTinyLFU(numCounters), evict: newSampledLFU(maxCost), itemsCh: make(chan []uint64, 3), @@ -46,7 +46,7 @@ func newDefaultPolicy[V any](numCounters, maxCost int64) *defaultPolicy[V] { return p } -func (p *defaultPolicy[V]) CollectMetrics(metrics *Metrics) { +func (p *defaultPolicy[K, V]) CollectMetrics(metrics *Metrics) { p.metrics = metrics p.evict.metrics = metrics } @@ -56,7 +56,7 @@ type policyPair struct { cost int64 } -func (p *defaultPolicy[V]) processItems() { +func (p *defaultPolicy[K, V]) processItems() { for { select { case items := <-p.itemsCh: @@ -70,7 +70,7 @@ func (p *defaultPolicy[V]) processItems() { } } -func (p *defaultPolicy[V]) Push(keys []uint64) bool { +func (p *defaultPolicy[K, V]) Push(keys []uint64) bool { if p.isClosed { return false } @@ -92,7 +92,7 @@ func (p *defaultPolicy[V]) Push(keys []uint64) bool { // Add decides whether the item with the given key and cost should be accepted by // the policy. It returns the list of victims that have been evicted and a boolean // indicating whether the incoming item should be accepted. -func (p *defaultPolicy[V]) Add(key uint64, cost int64) ([]*Item[V], bool) { +func (p *defaultPolicy[K, V]) Add(key uint64, cost int64) ([]*Item[K, V], bool) { p.Lock() defer p.Unlock() @@ -126,7 +126,7 @@ func (p *defaultPolicy[V]) Add(key uint64, cost int64) ([]*Item[V], bool) { // O(lg N). sample := make([]*policyPair, 0, lfuSample) // As items are evicted they will be appended to victims. 
- victims := make([]*Item[V], 0) + victims := make([]*Item[K, V], 0) // Delete victims until there's enough space or a minKey is found that has // more hits than incoming item. @@ -156,7 +156,7 @@ func (p *defaultPolicy[V]) Add(key uint64, cost int64) ([]*Item[V], bool) { sample[minId] = sample[len(sample)-1] sample = sample[:len(sample)-1] // Store victim in evicted victims slice. - victims = append(victims, &Item[V]{ + victims = append(victims, &Item[K, V]{ Key: minKey, Conflict: 0, Cost: minCost, @@ -168,33 +168,33 @@ func (p *defaultPolicy[V]) Add(key uint64, cost int64) ([]*Item[V], bool) { return victims, true } -func (p *defaultPolicy[V]) Has(key uint64) bool { +func (p *defaultPolicy[K, V]) Has(key uint64) bool { p.Lock() _, exists := p.evict.keyCosts[key] p.Unlock() return exists } -func (p *defaultPolicy[V]) Del(key uint64) { +func (p *defaultPolicy[K, V]) Del(key uint64) { p.Lock() p.evict.del(key) p.Unlock() } -func (p *defaultPolicy[V]) Cap() int64 { +func (p *defaultPolicy[K, V]) Cap() int64 { p.Lock() capacity := p.evict.getMaxCost() - p.evict.used p.Unlock() return capacity } -func (p *defaultPolicy[V]) Update(key uint64, cost int64) { +func (p *defaultPolicy[K, V]) Update(key uint64, cost int64) { p.Lock() p.evict.updateIfHas(key, cost) p.Unlock() } -func (p *defaultPolicy[V]) Cost(key uint64) int64 { +func (p *defaultPolicy[K, V]) Cost(key uint64) int64 { p.Lock() if cost, found := p.evict.keyCosts[key]; found { p.Unlock() @@ -204,14 +204,14 @@ func (p *defaultPolicy[V]) Cost(key uint64) int64 { return -1 } -func (p *defaultPolicy[V]) Clear() { +func (p *defaultPolicy[K, V]) Clear() { p.Lock() p.admit.clear() p.evict.clear() p.Unlock() } -func (p *defaultPolicy[V]) Close() { +func (p *defaultPolicy[K, V]) Close() { if p.isClosed { return } @@ -225,14 +225,14 @@ func (p *defaultPolicy[V]) Close() { p.isClosed = true } -func (p *defaultPolicy[V]) MaxCost() int64 { +func (p *defaultPolicy[K, V]) MaxCost() int64 { if p == nil || p.evict == nil { return 0 } return p.evict.getMaxCost() } -func (p *defaultPolicy[V]) UpdateMaxCost(maxCost int64) { +func (p *defaultPolicy[K, V]) UpdateMaxCost(maxCost int64) { if p == nil || p.evict == nil { return } diff --git a/policy_test.go b/policy_test.go index db6abdbb..8eab4b0c 100644 --- a/policy_test.go +++ b/policy_test.go @@ -13,21 +13,27 @@ import ( ) func TestPolicy(t *testing.T) { + t.Parallel() + defer func() { require.Nil(t, recover()) }() - newPolicy[int](100, 10) + newPolicy[int, int](100, 10) } func TestPolicyMetrics(t *testing.T) { - p := newDefaultPolicy[int](100, 10) + t.Parallel() + + p := newDefaultPolicy[int, int](100, 10) p.CollectMetrics(newMetrics()) require.NotNil(t, p.metrics) require.NotNil(t, p.evict.metrics) } func TestPolicyProcessItems(t *testing.T) { - p := newDefaultPolicy[int](100, 10) + t.Parallel() + + p := newDefaultPolicy[int, int](100, 10) p.itemsCh <- []uint64{1, 2, 2} time.Sleep(wait) p.Lock() @@ -45,7 +51,9 @@ func TestPolicyProcessItems(t *testing.T) { } func TestPolicyPush(t *testing.T) { - p := newDefaultPolicy[int](100, 10) + t.Parallel() + + p := newDefaultPolicy[int, int](100, 10) require.True(t, p.Push([]uint64{})) keepCount := 0 @@ -58,7 +66,9 @@ func TestPolicyPush(t *testing.T) { } func TestPolicyAdd(t *testing.T) { - p := newDefaultPolicy[int](1000, 100) + t.Parallel() + + p := newDefaultPolicy[int, int](1000, 100) if victims, added := p.Add(1, 101); victims != nil || added { t.Fatal("can't add an item bigger than entire cache") } @@ -87,14 +97,18 @@ func TestPolicyAdd(t *testing.T) { } func 
TestPolicyHas(t *testing.T) { - p := newDefaultPolicy[int](100, 10) + t.Parallel() + + p := newDefaultPolicy[int, int](100, 10) p.Add(1, 1) require.True(t, p.Has(1)) require.False(t, p.Has(2)) } func TestPolicyDel(t *testing.T) { - p := newDefaultPolicy[int](100, 10) + t.Parallel() + + p := newDefaultPolicy[int, int](100, 10) p.Add(1, 1) p.Del(1) p.Del(2) @@ -103,13 +117,17 @@ func TestPolicyDel(t *testing.T) { } func TestPolicyCap(t *testing.T) { - p := newDefaultPolicy[int](100, 10) + t.Parallel() + + p := newDefaultPolicy[int, int](100, 10) p.Add(1, 1) require.Equal(t, int64(9), p.Cap()) } func TestPolicyUpdate(t *testing.T) { - p := newDefaultPolicy[int](100, 10) + t.Parallel() + + p := newDefaultPolicy[int, int](100, 10) p.Add(1, 1) p.Update(1, 2) p.Lock() @@ -118,14 +136,18 @@ func TestPolicyUpdate(t *testing.T) { } func TestPolicyCost(t *testing.T) { - p := newDefaultPolicy[int](100, 10) + t.Parallel() + + p := newDefaultPolicy[int, int](100, 10) p.Add(1, 2) require.Equal(t, int64(2), p.Cost(1)) require.Equal(t, int64(-1), p.Cost(2)) } func TestPolicyClear(t *testing.T) { - p := newDefaultPolicy[int](100, 10) + t.Parallel() + + p := newDefaultPolicy[int, int](100, 10) p.Add(1, 1) p.Add(2, 2) p.Add(3, 3) @@ -137,29 +159,37 @@ func TestPolicyClear(t *testing.T) { } func TestPolicyClose(t *testing.T) { + t.Parallel() + defer func() { require.NotNil(t, recover()) }() - p := newDefaultPolicy[int](100, 10) + p := newDefaultPolicy[int, int](100, 10) p.Add(1, 1) p.Close() p.itemsCh <- []uint64{1} } func TestPushAfterClose(t *testing.T) { - p := newDefaultPolicy[int](100, 10) + t.Parallel() + + p := newDefaultPolicy[int, int](100, 10) p.Close() require.False(t, p.Push([]uint64{1, 2})) } func TestAddAfterClose(t *testing.T) { - p := newDefaultPolicy[int](100, 10) + t.Parallel() + + p := newDefaultPolicy[int, int](100, 10) p.Close() p.Add(1, 1) } func TestSampledLFUAdd(t *testing.T) { + t.Parallel() + e := newSampledLFU(4) e.add(1, 1) e.add(2, 2) @@ -169,6 +199,8 @@ func TestSampledLFUAdd(t *testing.T) { } func TestSampledLFUDel(t *testing.T) { + t.Parallel() + e := newSampledLFU(4) e.add(1, 1) e.add(2, 2) @@ -180,6 +212,8 @@ func TestSampledLFUDel(t *testing.T) { } func TestSampledLFUUpdate(t *testing.T) { + t.Parallel() + e := newSampledLFU(4) e.add(1, 1) require.True(t, e.updateIfHas(1, 2)) @@ -188,6 +222,8 @@ func TestSampledLFUUpdate(t *testing.T) { } func TestSampledLFUClear(t *testing.T) { + t.Parallel() + e := newSampledLFU(4) e.add(1, 1) e.add(2, 2) @@ -198,6 +234,8 @@ func TestSampledLFUClear(t *testing.T) { } func TestSampledLFURoom(t *testing.T) { + t.Parallel() + e := newSampledLFU(16) e.add(1, 1) e.add(2, 2) @@ -206,6 +244,8 @@ func TestSampledLFURoom(t *testing.T) { } func TestSampledLFUSample(t *testing.T) { + t.Parallel() + e := newSampledLFU(16) e.add(4, 4) e.add(5, 5) @@ -226,6 +266,8 @@ func TestSampledLFUSample(t *testing.T) { } func TestTinyLFUIncrement(t *testing.T) { + t.Parallel() + a := newTinyLFU(4) a.Increment(1) a.Increment(1) @@ -239,6 +281,8 @@ func TestTinyLFUIncrement(t *testing.T) { } func TestTinyLFUEstimate(t *testing.T) { + t.Parallel() + a := newTinyLFU(8) a.Increment(1) a.Increment(1) @@ -248,6 +292,8 @@ func TestTinyLFUEstimate(t *testing.T) { } func TestTinyLFUPush(t *testing.T) { + t.Parallel() + a := newTinyLFU(16) a.Push([]uint64{1, 2, 2, 3, 3, 3}) require.Equal(t, int64(1), a.Estimate(1)) @@ -257,6 +303,8 @@ func TestTinyLFUPush(t *testing.T) { } func TestTinyLFUClear(t *testing.T) { + t.Parallel() + a := newTinyLFU(16) a.Push([]uint64{1, 3, 3, 3}) 
a.clear() diff --git a/ring_test.go b/ring_test.go index ae236456..2271e0b8 100644 --- a/ring_test.go +++ b/ring_test.go @@ -26,6 +26,8 @@ func (c *testConsumer) Push(items []uint64) bool { } func TestRingDrain(t *testing.T) { + t.Parallel() + drains := 0 r := newRingBuffer(&testConsumer{ push: func(items []uint64) { @@ -40,6 +42,8 @@ func TestRingDrain(t *testing.T) { } func TestRingReset(t *testing.T) { + t.Parallel() + drains := 0 r := newRingBuffer(&testConsumer{ push: func(items []uint64) { @@ -54,6 +58,8 @@ func TestRingReset(t *testing.T) { } func TestRingConsumer(t *testing.T) { + t.Parallel() + mu := &sync.Mutex{} drainItems := make(map[uint64]struct{}) r := newRingBuffer(&testConsumer{ diff --git a/sketch_test.go b/sketch_test.go index 8e6a9477..f3875500 100644 --- a/sketch_test.go +++ b/sketch_test.go @@ -12,6 +12,8 @@ import ( ) func TestSketch(t *testing.T) { + t.Parallel() + defer func() { require.NotNil(t, recover()) }() @@ -22,6 +24,8 @@ func TestSketch(t *testing.T) { } func TestSketchIncrement(t *testing.T) { + t.Parallel() + s := newCmSketch(16) s.Increment(1) s.Increment(5) @@ -35,6 +39,8 @@ func TestSketchIncrement(t *testing.T) { } func TestSketchEstimate(t *testing.T) { + t.Parallel() + s := newCmSketch(16) s.Increment(1) s.Increment(1) @@ -43,6 +49,8 @@ func TestSketchEstimate(t *testing.T) { } func TestSketchReset(t *testing.T) { + t.Parallel() + s := newCmSketch(16) s.Increment(1) s.Increment(1) @@ -53,6 +61,8 @@ func TestSketchReset(t *testing.T) { } func TestSketchClear(t *testing.T) { + t.Parallel() + s := newCmSketch(16) for i := 0; i < 16; i++ { s.Increment(uint64(i)) @@ -64,6 +74,8 @@ func TestSketchClear(t *testing.T) { } func TestNext2Power(t *testing.T) { + t.Parallel() + sz := 12 << 30 szf := float64(sz) * 0.01 val := int64(szf) diff --git a/store.go b/store.go index 929dc86e..822012fd 100644 --- a/store.go +++ b/store.go @@ -13,11 +13,12 @@ import ( type updateFn[V any] func(cur, prev V) bool // TODO: Do we need this to be a separate struct from Item? -type storeItem[V any] struct { - key uint64 - conflict uint64 - value V - expiration time.Time +type storeItem[K Key, V any] struct { + key uint64 + originalKey K + conflict uint64 + value V + expiration time.Time } // store is the interface fulfilled by all hash map implementations in this @@ -26,7 +27,7 @@ type storeItem[V any] struct { // in Ristretto. // // Every store is safe for concurrent usage. -type store[V any] interface { +type store[K Key, V any] interface { // Get returns the value associated with the key parameter. Get(uint64, uint64) (V, bool) // Expiration returns the expiration time for this key. @@ -34,57 +35,84 @@ type store[V any] interface { // Set adds the key-value pair to the Map or updates the value if it's // already present. The key-value pair is passed as a pointer to an // item object. - Set(*Item[V]) + Set(*Item[K, V]) // Del deletes the key-value pair from the Map. Del(uint64, uint64) (uint64, V) // Update attempts to update the key with a new value and returns true if // successful. - Update(*Item[V]) (V, bool) + Update(*Item[K, V]) (V, bool) // Cleanup removes items that have an expired TTL. - Cleanup(policy *defaultPolicy[V], onEvict func(item *Item[V])) + Cleanup(policy *defaultPolicy[K, V], onEvict func(item *Item[K, V])) // Clear clears all contents of the store. - Clear(onEvict func(item *Item[V])) + Clear(onEvict func(item *Item[K, V])) SetShouldUpdateFn(f updateFn[V]) + // Iter iterates the elements of the Map, passing them to the callback. 
+ // It guarantees that any key in the Map will be visited only once. + // The set of keys visited by Iter is non-deterministic. + Iter(cb func(k K, v V) (stop bool)) } // newStore returns the default store implementation. -func newStore[V any]() store[V] { - return newShardedMap[V]() +func newStore[K Key, V any]() store[K, V] { + return newShardedMap[K, V]() } const numShards uint64 = 256 -type shardedMap[V any] struct { - shards []*lockedMap[V] - expiryMap *expirationMap[V] +type shardedMap[K Key, V any] struct { + shards []*lockedMap[K, V] + expiryMap *expirationMap[K, V] } -func newShardedMap[V any]() *shardedMap[V] { - sm := &shardedMap[V]{ - shards: make([]*lockedMap[V], int(numShards)), - expiryMap: newExpirationMap[V](), +func newShardedMap[K Key, V any]() *shardedMap[K, V] { + sm := &shardedMap[K, V]{ + shards: make([]*lockedMap[K, V], int(numShards)), + expiryMap: newExpirationMap[K, V](), } for i := range sm.shards { - sm.shards[i] = newLockedMap[V](sm.expiryMap) + sm.shards[i] = newLockedMap[K, V](sm.expiryMap) } return sm } -func (m *shardedMap[V]) SetShouldUpdateFn(f updateFn[V]) { +func (m *shardedMap[_, V]) SetShouldUpdateFn(f updateFn[V]) { for i := range m.shards { m.shards[i].setShouldUpdateFn(f) } } -func (sm *shardedMap[V]) Get(key, conflict uint64) (V, bool) { +// Iter iterates the elements of the Map, passing them to the callback. +// It guarantees that any key in the Map will be visited only once. +// The set of keys visited by Iter is non-deterministic. +func (sm *shardedMap[K, V]) Iter(cb func(k K, v V) (stop bool)) { + for _, shard := range sm.shards { + stopped := func() bool { + shard.RLock() + defer shard.RUnlock() + + for _, v := range shard.data { + if stop := cb(v.originalKey, v.value); stop { + return true + } + } + return false + }() + + if stopped { + break + } + } +} + +func (sm *shardedMap[K, V]) Get(key, conflict uint64) (V, bool) { return sm.shards[key%numShards].get(key, conflict) } -func (sm *shardedMap[V]) Expiration(key uint64) time.Time { +func (sm *shardedMap[K, V]) Expiration(key uint64) time.Time { return sm.shards[key%numShards].Expiration(key) } -func (sm *shardedMap[V]) Set(i *Item[V]) { +func (sm *shardedMap[K, V]) Set(i *Item[K, V]) { if i == nil { // If item is nil make this Set a no-op. 
return @@ -93,35 +121,35 @@ func (sm *shardedMap[V]) Set(i *Item[V]) { sm.shards[i.Key%numShards].Set(i) } -func (sm *shardedMap[V]) Del(key, conflict uint64) (uint64, V) { +func (sm *shardedMap[K, V]) Del(key, conflict uint64) (uint64, V) { return sm.shards[key%numShards].Del(key, conflict) } -func (sm *shardedMap[V]) Update(newItem *Item[V]) (V, bool) { +func (sm *shardedMap[K, V]) Update(newItem *Item[K, V]) (V, bool) { return sm.shards[newItem.Key%numShards].Update(newItem) } -func (sm *shardedMap[V]) Cleanup(policy *defaultPolicy[V], onEvict func(item *Item[V])) { +func (sm *shardedMap[K, V]) Cleanup(policy *defaultPolicy[K, V], onEvict func(item *Item[K, V])) { sm.expiryMap.cleanup(sm, policy, onEvict) } -func (sm *shardedMap[V]) Clear(onEvict func(item *Item[V])) { +func (sm *shardedMap[K, V]) Clear(onEvict func(item *Item[K, V])) { for i := uint64(0); i < numShards; i++ { sm.shards[i].Clear(onEvict) } sm.expiryMap.clear() } -type lockedMap[V any] struct { +type lockedMap[K Key, V any] struct { sync.RWMutex - data map[uint64]storeItem[V] - em *expirationMap[V] + data map[uint64]storeItem[K, V] + em *expirationMap[K, V] shouldUpdate updateFn[V] } -func newLockedMap[V any](em *expirationMap[V]) *lockedMap[V] { - return &lockedMap[V]{ - data: make(map[uint64]storeItem[V]), +func newLockedMap[K Key, V any](em *expirationMap[K, V]) *lockedMap[K, V] { + return &lockedMap[K, V]{ + data: make(map[uint64]storeItem[K, V]), em: em, shouldUpdate: func(cur, prev V) bool { return true @@ -129,11 +157,11 @@ func newLockedMap[V any](em *expirationMap[V]) *lockedMap[V] { } } -func (m *lockedMap[V]) setShouldUpdateFn(f updateFn[V]) { +func (m *lockedMap[K, V]) setShouldUpdateFn(f updateFn[V]) { m.shouldUpdate = f } -func (m *lockedMap[V]) get(key, conflict uint64) (V, bool) { +func (m *lockedMap[K, V]) get(key, conflict uint64) (V, bool) { m.RLock() item, ok := m.data[key] m.RUnlock() @@ -151,13 +179,13 @@ func (m *lockedMap[V]) get(key, conflict uint64) (V, bool) { return item.value, true } -func (m *lockedMap[V]) Expiration(key uint64) time.Time { +func (m *lockedMap[K, V]) Expiration(key uint64) time.Time { m.RLock() defer m.RUnlock() return m.data[key].expiration } -func (m *lockedMap[V]) Set(i *Item[V]) { +func (m *lockedMap[K, V]) Set(i *Item[K, V]) { if i == nil { // If the item is nil make this Set a no-op. 
return @@ -183,15 +211,16 @@ func (m *lockedMap[V]) Set(i *Item[V]) { m.em.add(i.Key, i.Conflict, i.Expiration) } - m.data[i.Key] = storeItem[V]{ - key: i.Key, - conflict: i.Conflict, - value: i.Value, - expiration: i.Expiration, + m.data[i.Key] = storeItem[K, V]{ + key: i.Key, + originalKey: i.OriginalKey, + conflict: i.Conflict, + value: i.Value, + expiration: i.Expiration, } } -func (m *lockedMap[V]) Del(key, conflict uint64) (uint64, V) { +func (m *lockedMap[K, V]) Del(key, conflict uint64) (uint64, V) { m.Lock() defer m.Unlock() item, ok := m.data[key] @@ -210,7 +239,7 @@ func (m *lockedMap[V]) Del(key, conflict uint64) (uint64, V) { return item.conflict, item.value } -func (m *lockedMap[V]) Update(newItem *Item[V]) (V, bool) { +func (m *lockedMap[K, V]) Update(newItem *Item[K, V]) (V, bool) { m.Lock() defer m.Unlock() item, ok := m.data[newItem.Key] @@ -225,27 +254,29 @@ func (m *lockedMap[V]) Update(newItem *Item[V]) (V, bool) { } m.em.update(newItem.Key, newItem.Conflict, item.expiration, newItem.Expiration) - m.data[newItem.Key] = storeItem[V]{ - key: newItem.Key, - conflict: newItem.Conflict, - value: newItem.Value, - expiration: newItem.Expiration, + m.data[newItem.Key] = storeItem[K, V]{ + key: newItem.Key, + originalKey: newItem.OriginalKey, + conflict: newItem.Conflict, + value: newItem.Value, + expiration: newItem.Expiration, } return item.value, true } -func (m *lockedMap[V]) Clear(onEvict func(item *Item[V])) { +func (m *lockedMap[K, V]) Clear(onEvict func(item *Item[K, V])) { m.Lock() defer m.Unlock() - i := &Item[V]{} + i := &Item[K, V]{} if onEvict != nil { for _, si := range m.data { i.Key = si.key i.Conflict = si.conflict i.Value = si.value + i.OriginalKey = si.originalKey onEvict(i) } } - m.data = make(map[uint64]storeItem[V]) + m.data = make(map[uint64]storeItem[K, V]) } diff --git a/store_test.go b/store_test.go index e7923cba..82a6d87d 100644 --- a/store_test.go +++ b/store_test.go @@ -14,12 +14,15 @@ import ( ) func TestStoreSetGet(t *testing.T) { - s := newStore[int]() + t.Parallel() + + s := newStore[int, int]() key, conflict := z.KeyToHash(1) - i := Item[int]{ - Key: key, - Conflict: conflict, - Value: 2, + i := Item[int, int]{ + Key: key, + OriginalKey: 1, + Conflict: conflict, + Value: 2, } s.Set(&i) val, ok := s.Get(key, conflict) @@ -33,10 +36,11 @@ func TestStoreSetGet(t *testing.T) { require.Equal(t, 3, val) key, conflict = z.KeyToHash(2) - i = Item[int]{ - Key: key, - Conflict: conflict, - Value: 2, + i = Item[int, int]{ + Key: key, + OriginalKey: 2, + Conflict: conflict, + Value: 2, } s.Set(&i) val, ok = s.Get(key, conflict) @@ -45,12 +49,15 @@ func TestStoreSetGet(t *testing.T) { } func TestStoreDel(t *testing.T) { - s := newStore[int]() + t.Parallel() + + s := newStore[int, int]() key, conflict := z.KeyToHash(1) - i := Item[int]{ - Key: key, - Conflict: conflict, - Value: 1, + i := Item[int, int]{ + Key: key, + OriginalKey: 1, + Conflict: conflict, + Value: 1, } s.Set(&i) s.Del(key, conflict) @@ -61,14 +68,47 @@ func TestStoreDel(t *testing.T) { s.Del(2, 0) } +func TestStoreSetIter(t *testing.T) { + t.Parallel() + + s := newStore[string, int]() + expectedValues := map[string]int{ + "a": 1, + "b": 2, + "c": 3, + "d": 4, + } + for k, v := range expectedValues { + key, conflict := z.KeyToHash(k) + i := Item[string, int]{ + Key: key, + OriginalKey: k, + Conflict: conflict, + Value: v, + } + s.Set(&i) + } + + resultMap := make(map[string]int) + s.Iter(func(k string, v int) (stop bool) { + resultMap[k] = v + return false + }) + + require.Equal(t, 
expectedValues, resultMap) +} + func TestStoreClear(t *testing.T) { - s := newStore[uint64]() + t.Parallel() + + s := newStore[uint64, uint64]() for i := uint64(0); i < 1000; i++ { key, conflict := z.KeyToHash(i) - it := Item[uint64]{ - Key: key, - Conflict: conflict, - Value: i, + it := Item[uint64, uint64]{ + Key: key, + OriginalKey: i, + Conflict: conflict, + Value: i, } s.Set(&it) } @@ -82,17 +122,20 @@ func TestStoreClear(t *testing.T) { } func TestShouldUpdate(t *testing.T) { + t.Parallel() + // Create a should update function where the value only increases. - s := newStore[int]() + s := newStore[int, int]() s.SetShouldUpdateFn(func(cur, prev int) bool { return cur > prev }) key, conflict := z.KeyToHash(1) - i := Item[int]{ - Key: key, - Conflict: conflict, - Value: 2, + i := Item[int, int]{ + Key: key, + OriginalKey: 1, + Conflict: conflict, + Value: 2, } s.Set(&i) i.Value = 1 @@ -105,12 +148,15 @@ func TestShouldUpdate(t *testing.T) { } func TestStoreUpdate(t *testing.T) { - s := newStore[int]() + t.Parallel() + + s := newStore[int, int]() key, conflict := z.KeyToHash(1) - i := Item[int]{ - Key: key, - Conflict: conflict, - Value: 1, + i := Item[int, int]{ + Key: key, + OriginalKey: 1, + Conflict: conflict, + Value: 1, } s.Set(&i) i.Value = 2 @@ -134,10 +180,11 @@ func TestStoreUpdate(t *testing.T) { require.Equal(t, 3, val) key, conflict = z.KeyToHash(2) - i = Item[int]{ - Key: key, - Conflict: conflict, - Value: 2, + i = Item[int, int]{ + Key: key, + OriginalKey: 2, + Conflict: conflict, + Value: 2, } _, ok = s.Update(&i) require.False(t, ok) @@ -147,22 +194,26 @@ func TestStoreUpdate(t *testing.T) { } func TestStoreCollision(t *testing.T) { - s := newShardedMap[int]() + t.Parallel() + + s := newShardedMap[int, int]() s.shards[1].Lock() - s.shards[1].data[1] = storeItem[int]{ - key: 1, - conflict: 0, - value: 1, + s.shards[1].data[1] = storeItem[int, int]{ + key: 1, + originalKey: 1, + conflict: 0, + value: 1, } s.shards[1].Unlock() val, ok := s.Get(1, 1) require.False(t, ok) require.Empty(t, val) - i := Item[int]{ - Key: 1, - Conflict: 1, - Value: 2, + i := Item[int, int]{ + Key: 1, + OriginalKey: 1, + Conflict: 1, + Value: 2, } s.Set(&i) val, ok = s.Get(1, 0) @@ -182,14 +233,17 @@ func TestStoreCollision(t *testing.T) { } func TestStoreExpiration(t *testing.T) { - s := newStore[int]() + t.Parallel() + + s := newStore[int, int]() key, conflict := z.KeyToHash(1) expiration := time.Now().Add(time.Second) - i := Item[int]{ - Key: key, - Conflict: conflict, - Value: 1, - Expiration: expiration, + i := Item[int, int]{ + Key: key, + OriginalKey: 1, + Conflict: conflict, + Value: 1, + Expiration: expiration, } s.Set(&i) val, ok := s.Get(key, conflict) @@ -212,12 +266,13 @@ func TestStoreExpiration(t *testing.T) { } func BenchmarkStoreGet(b *testing.B) { - s := newStore[int]() + s := newStore[int, int]() key, conflict := z.KeyToHash(1) - i := Item[int]{ - Key: key, - Conflict: conflict, - Value: 1, + i := Item[int, int]{ + Key: key, + OriginalKey: 1, + Conflict: conflict, + Value: 1, } s.Set(&i) b.SetBytes(1) @@ -229,15 +284,16 @@ func BenchmarkStoreGet(b *testing.B) { } func BenchmarkStoreSet(b *testing.B) { - s := newStore[int]() + s := newStore[int, int]() key, conflict := z.KeyToHash(1) b.SetBytes(1) b.RunParallel(func(pb *testing.PB) { for pb.Next() { - i := Item[int]{ - Key: key, - Conflict: conflict, - Value: 1, + i := Item[int, int]{ + Key: key, + OriginalKey: 1, + Conflict: conflict, + Value: 1, } s.Set(&i) } @@ -245,21 +301,23 @@ func BenchmarkStoreSet(b *testing.B) { } func 
BenchmarkStoreUpdate(b *testing.B) { - s := newStore[int]() + s := newStore[int, int]() key, conflict := z.KeyToHash(1) - i := Item[int]{ - Key: key, - Conflict: conflict, - Value: 1, + i := Item[int, int]{ + Key: key, + OriginalKey: 1, + Conflict: conflict, + Value: 1, } s.Set(&i) b.SetBytes(1) b.RunParallel(func(pb *testing.PB) { for pb.Next() { - s.Update(&Item[int]{ - Key: key, - Conflict: conflict, - Value: 2, + s.Update(&Item[int, int]{ + Key: key, + OriginalKey: 1, + Conflict: conflict, + Value: 2, }) } }) diff --git a/stress_test.go b/stress_test.go index a084b540..2ef7727f 100644 --- a/stress_test.go +++ b/stress_test.go @@ -19,6 +19,8 @@ import ( ) func TestStressSetGet(t *testing.T) { + t.Parallel() + c, err := NewCache(&Config[int, int]{ NumCounters: 1000, MaxCost: 100, @@ -56,6 +58,8 @@ func TestStressSetGet(t *testing.T) { } func TestStressHitRatio(t *testing.T) { + t.Parallel() + key := sim.NewZipfian(1.0001, 1, 1000) c, err := NewCache(&Config[uint64, uint64]{ NumCounters: 1000, diff --git a/ttl.go b/ttl.go index 87c70ca8..2218412b 100644 --- a/ttl.go +++ b/ttl.go @@ -29,20 +29,20 @@ func cleanupBucket(t time.Time) int64 { type bucket map[uint64]uint64 // expirationMap is a map of bucket number to the corresponding bucket. -type expirationMap[V any] struct { +type expirationMap[K Key, V any] struct { sync.RWMutex buckets map[int64]bucket lastCleanedBucketNum int64 } -func newExpirationMap[V any]() *expirationMap[V] { - return &expirationMap[V]{ +func newExpirationMap[K Key, V any]() *expirationMap[K, V] { + return &expirationMap[K, V]{ buckets: make(map[int64]bucket), lastCleanedBucketNum: cleanupBucket(time.Now()), } } -func (m *expirationMap[_]) add(key, conflict uint64, expiration time.Time) { +func (m *expirationMap[_, _]) add(key, conflict uint64, expiration time.Time) { if m == nil { return } @@ -64,7 +64,7 @@ func (m *expirationMap[_]) add(key, conflict uint64, expiration time.Time) { b[key] = conflict } -func (m *expirationMap[_]) update(key, conflict uint64, oldExpTime, newExpTime time.Time) { +func (m *expirationMap[_, _]) update(key, conflict uint64, oldExpTime, newExpTime time.Time) { if m == nil { return } @@ -92,7 +92,7 @@ func (m *expirationMap[_]) update(key, conflict uint64, oldExpTime, newExpTime t newBucket[key] = conflict } -func (m *expirationMap[_]) del(key uint64, expiration time.Time) { +func (m *expirationMap[_, _]) del(key uint64, expiration time.Time) { if m == nil { return } @@ -110,7 +110,7 @@ func (m *expirationMap[_]) del(key uint64, expiration time.Time) { // cleanup removes all the items in the bucket that was just completed. It deletes // those items from the store, and calls the onEvict function on those items. // This function is meant to be called periodically. 
-func (m *expirationMap[V]) cleanup(store store[V], policy *defaultPolicy[V], onEvict func(item *Item[V])) int { +func (m *expirationMap[K, V]) cleanup(store store[K, V], policy *defaultPolicy[K, V], onEvict func(item *Item[K, V])) int { if m == nil { return 0 } @@ -144,7 +144,7 @@ func (m *expirationMap[V]) cleanup(store store[V], policy *defaultPolicy[V], onE _, value := store.Del(key, conflict) if onEvict != nil { - onEvict(&Item[V]{Key: key, + onEvict(&Item[K, V]{Key: key, Conflict: conflict, Value: value, Cost: cost, @@ -161,7 +161,7 @@ func (m *expirationMap[V]) cleanup(store store[V], policy *defaultPolicy[V], onE // clear clears the expirationMap, the caller is responsible for properly // evicting the referenced items -func (m *expirationMap[V]) clear() { +func (m *expirationMap[K, V]) clear() { if m == nil { return } diff --git a/ttl_test.go b/ttl_test.go index 63c216d8..12ab671a 100644 --- a/ttl_test.go +++ b/ttl_test.go @@ -16,26 +16,28 @@ import ( // It verifies that expired items are correctly evicted from the store and that // non-expired items remain in the store. func TestExpirationMapCleanup(t *testing.T) { + t.Parallel() + // Create a new expiration map - em := newExpirationMap[int]() + em := newExpirationMap[int, int]() // Create a new store - s := newShardedMap[int]() + s := newShardedMap[int, int]() // Create a new policy - p := newDefaultPolicy[int](100, 10) + p := newDefaultPolicy[int, int](100, 10) // Add items to the store and expiration map now := time.Now() - i1 := &Item[int]{Key: 1, Conflict: 1, Value: 100, Expiration: now.Add(1 * time.Second)} + i1 := &Item[int, int]{Key: 1, Conflict: 1, Value: 100, Expiration: now.Add(1 * time.Second)} s.Set(i1) em.add(i1.Key, i1.Conflict, i1.Expiration) - i2 := &Item[int]{Key: 2, Conflict: 2, Value: 200, Expiration: now.Add(3 * time.Second)} + i2 := &Item[int, int]{Key: 2, Conflict: 2, Value: 200, Expiration: now.Add(3 * time.Second)} s.Set(i2) em.add(i2.Key, i2.Conflict, i2.Expiration) // Create a map to store evicted items evictedItems := make(map[uint64]int) - evictedItemsOnEvictFunc := func(item *Item[int]) { + evictedItemsOnEvictFunc := func(item *Item[int, int]) { evictedItems[item.Key] = item.Value }
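For review convenience, here is a minimal usage sketch of the new public method, written as an in-package test in the style of cache_test.go above. The test name is invented for illustration; it relies on `IgnoreInternalCost` and an otherwise-empty cache so both Sets are admitted — real workloads would use the `retrySet` pattern from the existing tests, since admission is not guaranteed:

```go
// TestIterUsageSketch is an illustrative sketch, not part of the diff: it
// exercises Cache.Iter through the public Set/Wait path instead of writing
// to storedItems directly as TestCacheSetIter does.
func TestIterUsageSketch(t *testing.T) {
	t.Parallel()

	c, err := NewCache(&Config[string, int]{
		NumCounters:        100,
		MaxCost:            100,
		BufferItems:        64,
		IgnoreInternalCost: true,
	})
	require.NoError(t, err)
	defer c.Close()

	require.True(t, c.Set("a", 1, 1))
	require.True(t, c.Set("b", 2, 1))
	c.Wait() // Sets are applied asynchronously; drain setBuf before iterating.

	got := make(map[string]int)
	c.Iter(func(k string, v int) (stop bool) {
		got[k] = v
		return false // return true to stop the iteration early
	})
	require.Equal(t, map[string]int{"a": 1, "b": 2}, got)
}
```

Returning `true` from the callback short-circuits the per-shard walk in `shardedMap.Iter`, which is the reason for the `stop bool` return in the signature.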