Skip to content

Commit a826416

Browse files
committed
Update datastructures
1 parent 4922fc2 commit a826416

File tree

12 files changed

+965
-195
lines changed

12 files changed

+965
-195
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ require (
3737
github.com/pierrec/lz4/v4 v4.1.22
3838
github.com/prometheus/client_golang v1.23.2
3939
github.com/prometheus/client_model v0.6.2
40+
github.com/puzpuzpuz/xsync/v3 v3.5.1
4041
github.com/redis/go-redis/v9 v9.17.2
4142
github.com/rs/zerolog v1.34.0
4243
github.com/spf13/cobra v1.10.2

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -369,6 +369,8 @@ github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4O
369369
github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
370370
github.com/prometheus/procfs v0.19.2 h1:zUMhqEW66Ex7OXIiDkll3tl9a1ZdilUOd/F6ZXw4Vws=
371371
github.com/prometheus/procfs v0.19.2/go.mod h1:M0aotyiemPhBCM0z5w87kL22CxfcH05ZpYlu+b4J7mw=
372+
github.com/puzpuzpuz/xsync/v3 v3.5.1 h1:GJYJZwO6IdxN/IKbneznS6yPkVC+c3zyY/j19c++5Fg=
373+
github.com/puzpuzpuz/xsync/v3 v3.5.1/go.mod h1:VjzYrABPabuM4KyBh1Ftq6u8nhwY5tBPKP9jpmh0nnA=
372374
github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 h1:bsUq1dX0N8AOIL7EB/X911+m4EHsnWEHeJ0c+3TTBrg=
373375
github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
374376
github.com/redis/go-redis/v9 v9.17.2 h1:P2EGsA4qVIM3Pp+aPocCJ7DguDHhqrXNhVcEp4ViluI=

pkg/cache/cache.go

Lines changed: 62 additions & 147 deletions
Original file line numberDiff line numberDiff line change
@@ -5,29 +5,23 @@ package cache
55

66
import (
77
"context"
8-
"fmt"
9-
"hash/maphash"
108
"iter"
119
"sync"
1210
"sync/atomic"
1311
"time"
12+
13+
"github.com/LeeDigitalWorks/zapfs/pkg/utils"
1414
)
1515

1616
// Default number of shards for lock striping
17-
const defaultShardCount = 64
17+
const defaultShardCount = 256
1818

1919
// entry wraps a value with access time for LRU eviction and TTL expiry
2020
type entry[V any] struct {
2121
value V
2222
lastAccess atomic.Int64 // Unix nano timestamp
2323
}
2424

25-
// shard is a single partition of the cache with its own lock
26-
type shard[K comparable, V any] struct {
27-
mu sync.RWMutex
28-
data map[K]*entry[V]
29-
}
30-
3125
// Cache is a high-performance concurrent cache with lock striping.
3226
//
3327
// Features:
@@ -52,17 +46,20 @@ type shard[K comparable, V any] struct {
5246
type Cache[K comparable, V any] struct {
5347
ctx context.Context
5448

55-
shards []*shard[K, V]
56-
numShards uint64
57-
seed maphash.Seed
49+
// Use ShardedMap for the underlying storage
50+
store *utils.ShardedMap[K, *entry[V]]
51+
numShards int
52+
53+
// Shard-level locks for LRU eviction (need write access to evict)
54+
shardLocks []sync.Mutex
5855

5956
// Optional load function for cache misses
6057
loadFunc func(ctx context.Context, key K) (V, error)
6158

6259
hasLoaded atomic.Bool
6360

64-
// Max size per shard (0 = unlimited)
65-
maxSizePerShard int
61+
// Max size (0 = unlimited)
62+
maxSize int
6663

6764
// TTL expiry (0 = no expiry)
6865
expiry time.Duration
@@ -76,14 +73,10 @@ type Cache[K comparable, V any] struct {
7673
type Option[K comparable, V any] func(*Cache[K, V])
7774

7875
// WithMaxSize sets the maximum total number of entries in the cache.
79-
// The limit is distributed across shards. When a shard exceeds its limit,
80-
// the least recently accessed entry in that shard is evicted.
76+
// When capacity is reached, the least recently accessed entry is evicted.
8177
func WithMaxSize[K comparable, V any](maxSize int) Option[K, V] {
8278
return func(c *Cache[K, V]) {
83-
c.maxSizePerShard = maxSize / int(c.numShards)
84-
if c.maxSizePerShard < 1 && maxSize > 0 {
85-
c.maxSizePerShard = 1
86-
}
79+
c.maxSize = maxSize
8780
}
8881
}
8982

@@ -98,19 +91,17 @@ func WithExpiry[K comparable, V any](expiry time.Duration) Option[K, V] {
9891

9992
// WithNumShards sets the number of shards for lock striping.
10093
// More shards = less contention but slightly more memory overhead.
101-
// Default is 32 shards.
94+
// Default is 64 shards.
10295
func WithNumShards[K comparable, V any](numShards int) Option[K, V] {
10396
return func(c *Cache[K, V]) {
10497
if numShards < 1 {
10598
numShards = 1
10699
}
107-
c.numShards = uint64(numShards)
108-
c.shards = make([]*shard[K, V], numShards)
109-
for i := range c.shards {
110-
c.shards[i] = &shard[K, V]{
111-
data: make(map[K]*entry[V]),
112-
}
113-
}
100+
c.numShards = numShards
101+
c.shardLocks = make([]sync.Mutex, numShards)
102+
c.store = utils.NewShardedMap[K, *entry[V]](
103+
utils.WithShardCount[K, *entry[V]](numShards),
104+
)
114105
}
115106
}
116107

@@ -128,17 +119,14 @@ func New[K comparable, V any](ctx context.Context, opts ...Option[K, V]) *Cache[
128119
c := &Cache[K, V]{
129120
ctx: ctx,
130121
numShards: defaultShardCount,
131-
seed: maphash.MakeSeed(),
132122
cleanupStop: make(chan struct{}),
133123
}
134124

135-
// Initialize default shards
136-
c.shards = make([]*shard[K, V], c.numShards)
137-
for i := range c.shards {
138-
c.shards[i] = &shard[K, V]{
139-
data: make(map[K]*entry[V]),
140-
}
141-
}
125+
// Initialize default store
126+
c.shardLocks = make([]sync.Mutex, c.numShards)
127+
c.store = utils.NewShardedMap[K, *entry[V]](
128+
utils.WithShardCount[K, *entry[V]](c.numShards),
129+
)
142130

143131
// Apply options (may override numShards)
144132
for _, opt := range opts {
@@ -176,15 +164,9 @@ func (c *Cache[K, V]) cleanup() {
176164
now := time.Now().UnixNano()
177165
expiryNanos := c.expiry.Nanoseconds()
178166

179-
for _, s := range c.shards {
180-
s.mu.Lock()
181-
for key, e := range s.data {
182-
if now-e.lastAccess.Load() > expiryNanos {
183-
delete(s.data, key)
184-
}
185-
}
186-
s.mu.Unlock()
187-
}
167+
c.store.DeleteIf(func(_ K, e *entry[V]) bool {
168+
return now-e.lastAccess.Load() > expiryNanos
169+
})
188170
}
189171

190172
// Stop stops the cleanup goroutine. Call this when the cache is no longer needed.
@@ -195,32 +177,10 @@ func (c *Cache[K, V]) Stop() {
195177
}
196178
}
197179

198-
// getShard returns the shard for a given key using consistent hashing
199-
func (c *Cache[K, V]) getShard(key K) *shard[K, V] {
200-
var h maphash.Hash
201-
h.SetSeed(c.seed)
202-
203-
switch k := any(key).(type) {
204-
case string:
205-
h.WriteString(k)
206-
case []byte:
207-
h.Write(k)
208-
default:
209-
h.WriteString(fmt.Sprint(key))
210-
}
211-
212-
return c.shards[h.Sum64()%c.numShards]
213-
}
214-
215180
// Get retrieves a value from the cache.
216181
// Returns the value and true if found (and not expired), or zero value and false otherwise.
217182
func (c *Cache[K, V]) Get(key K) (V, bool) {
218-
s := c.getShard(key)
219-
220-
s.mu.RLock()
221-
e, exists := s.data[key]
222-
s.mu.RUnlock()
223-
183+
e, exists := c.store.Load(key)
224184
if !exists {
225185
var zero V
226186
return zero, false
@@ -267,71 +227,55 @@ func (c *Cache[K, V]) GetOrLoad(ctx context.Context, key K) (V, error) {
267227

268228
// Set adds or updates a value in the cache.
269229
func (c *Cache[K, V]) Set(key K, value V) {
270-
s := c.getShard(key)
271-
272-
s.mu.Lock()
273-
defer s.mu.Unlock()
230+
e := &entry[V]{value: value}
231+
e.lastAccess.Store(time.Now().UnixNano())
274232

275-
// Check if we need to evict before adding (per-shard limit)
276-
if c.maxSizePerShard > 0 && len(s.data) >= c.maxSizePerShard {
277-
if _, exists := s.data[key]; !exists {
278-
c.evictOldestInShard(s)
233+
// Check if we need to evict before storing (avoid deadlock by checking size outside lock)
234+
if c.maxSize > 0 {
235+
// Check size before acquiring lock to avoid deadlock
236+
currentSize := c.store.Len()
237+
if currentSize >= c.maxSize {
238+
c.evictOldest()
279239
}
280240
}
281241

282-
e := &entry[V]{value: value}
283-
e.lastAccess.Store(time.Now().UnixNano())
284-
s.data[key] = e
242+
c.store.Store(key, e)
285243
}
286244

287-
// evictOldestInShard removes the least recently accessed entry from a shard.
288-
// Caller must hold the shard's write lock.
289-
func (c *Cache[K, V]) evictOldestInShard(s *shard[K, V]) {
245+
// evictOldest removes the least recently accessed entry from the cache.
246+
func (c *Cache[K, V]) evictOldest() {
290247
var oldestKey K
291248
var oldestTime int64 = 0
292249
first := true
293250

294-
for k, e := range s.data {
251+
c.store.Range(func(k K, e *entry[V]) bool {
295252
accessTime := e.lastAccess.Load()
296253
if first || accessTime < oldestTime {
297254
oldestKey = k
298255
oldestTime = accessTime
299256
first = false
300257
}
301-
}
258+
return true
259+
})
302260

303261
if !first {
304-
delete(s.data, oldestKey)
262+
c.store.Delete(oldestKey)
305263
}
306264
}
307265

308266
// Delete removes a key from the cache.
309267
func (c *Cache[K, V]) Delete(key K) {
310-
s := c.getShard(key)
311-
312-
s.mu.Lock()
313-
delete(s.data, key)
314-
s.mu.Unlock()
268+
c.store.Delete(key)
315269
}
316270

317271
// Size returns the current number of entries across all shards.
318272
func (c *Cache[K, V]) Size() int {
319-
total := 0
320-
for _, s := range c.shards {
321-
s.mu.RLock()
322-
total += len(s.data)
323-
s.mu.RUnlock()
324-
}
325-
return total
273+
return c.store.Len()
326274
}
327275

328276
// Clear removes all entries from the cache.
329277
func (c *Cache[K, V]) Clear() {
330-
for _, s := range c.shards {
331-
s.mu.Lock()
332-
s.data = make(map[K]*entry[V])
333-
s.mu.Unlock()
334-
}
278+
c.store.Clear()
335279
}
336280

337281
// Entity represents a cache entry for bulk loading
@@ -346,54 +290,36 @@ type Entity[K, V any] struct {
346290
func (c *Cache[K, V]) Load(seq iter.Seq2[Entity[K, V], error]) error {
347291
const batchSize = 1000
348292

293+
now := time.Now().UnixNano()
349294
var batch []Entity[K, V]
350295
for entity, err := range seq {
351296
if err != nil {
352297
return err
353298
}
354299
batch = append(batch, entity)
355300
if len(batch) >= batchSize {
356-
c.applyBatch(batch)
301+
c.applyBatch(batch, now)
357302
batch = batch[:0]
358303
}
359304
}
360305

361306
if len(batch) > 0 {
362-
c.applyBatch(batch)
307+
c.applyBatch(batch, now)
363308
}
364309

365310
c.hasLoaded.Store(true)
366311
return nil
367312
}
368313

369-
func (c *Cache[K, V]) applyBatch(batch []Entity[K, V]) {
370-
// Group entities by shard to minimize lock acquisitions
371-
shardBatches := make(map[*shard[K, V]][]Entity[K, V])
372-
314+
func (c *Cache[K, V]) applyBatch(batch []Entity[K, V], now int64) {
373315
for _, entity := range batch {
374-
s := c.getShard(entity.Key)
375-
shardBatches[s] = append(shardBatches[s], entity)
376-
}
377-
378-
// Apply each shard's batch with a single lock acquisition
379-
now := time.Now().UnixNano()
380-
for s, entities := range shardBatches {
381-
s.mu.Lock()
382-
for _, entity := range entities {
383-
if entity.IsDeleted {
384-
delete(s.data, entity.Key)
385-
} else {
386-
if c.maxSizePerShard > 0 && len(s.data) >= c.maxSizePerShard {
387-
if _, exists := s.data[entity.Key]; !exists {
388-
c.evictOldestInShard(s)
389-
}
390-
}
391-
e := &entry[V]{value: entity.Value}
392-
e.lastAccess.Store(now)
393-
s.data[entity.Key] = e
394-
}
316+
if entity.IsDeleted {
317+
c.store.Delete(entity.Key)
318+
} else {
319+
e := &entry[V]{value: entity.Value}
320+
e.lastAccess.Store(now)
321+
c.store.Store(entity.Key, e)
395322
}
396-
s.mu.Unlock()
397323
}
398324
}
399325

@@ -416,23 +342,12 @@ func (c *Cache[K, V]) Iter() iter.Seq2[K, V] {
416342
now := time.Now().UnixNano()
417343
expiryNanos := c.expiry.Nanoseconds()
418344

419-
for _, s := range c.shards {
420-
s.mu.RLock()
421-
for key, e := range s.data {
422-
// Skip expired entries
423-
if c.expiry > 0 && now-e.lastAccess.Load() > expiryNanos {
424-
continue
425-
}
426-
// Must unlock before calling yield to avoid deadlock
427-
val := e.value
428-
s.mu.RUnlock()
429-
430-
if !yield(key, val) {
431-
return
432-
}
433-
s.mu.RLock()
345+
c.store.Range(func(key K, e *entry[V]) bool {
346+
// Skip expired entries
347+
if c.expiry > 0 && now-e.lastAccess.Load() > expiryNanos {
348+
return true // continue
434349
}
435-
s.mu.RUnlock()
436-
}
350+
return yield(key, e.value)
351+
})
437352
}
438353
}

pkg/file/grpc_file.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,8 @@ func putStreamBuffer(buf *[]byte) {
5555

5656
// uuidCache caches objectID -> UUID mappings to avoid repeated SHA-1 hashing.
5757
// UUID v5 uses SHA-1 which is ~200ns per call; caching reduces this to ~35ns.
58-
var uuidCache = utils.NewShardedMap[uuid.UUID]()
58+
// Uses LockFreeMap with GrowOnly since this cache only grows (never shrinks).
59+
var uuidCache = utils.NewLockFreeMap[string, uuid.UUID](utils.WithLockFreeGrowOnly[string, uuid.UUID]())
5960

6061
// objectIDToUUID converts a string object ID to a deterministic UUID.
6162
// Uses UUID v5 (SHA-1 based) to ensure the same objectID always maps to the same UUID.

0 commit comments

Comments
 (0)