Skip to content

Commit 0e3b9a6

Browse files
committed
updates to avoid double counting values
1 parent 3a9d9c1 commit 0e3b9a6

File tree

2 files changed

+138
-59
lines changed

2 files changed

+138
-59
lines changed

db/revision_cache_lru.go

Lines changed: 51 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -133,10 +133,10 @@ type revCacheValue struct {
133133
lock sync.RWMutex
134134
deleted bool
135135
removed bool
136-
itemBytes atomic.Int64
136+
itemBytes atomic.Int64 // atomic count of number of bytes used by this item in the cache, this does not include delta bytes
137137
collectionID uint32
138-
valuePopulated atomic.Bool
139-
deltaLock sync.Mutex // synchronizes GetDelta across multiple clients for each fromRevision
138+
valuePopulated atomic.Bool // atonic boolean to indicate that this value has been loaded/stored
139+
deltaLock sync.Mutex // synchronizes GetDelta across multiple clients for each fromRevision
140140
}
141141

142142
// Creates a revision cache with the given capacity and an optional loader function.
@@ -181,17 +181,20 @@ func (rc *LRURevisionCache) Peek(ctx context.Context, docID, revID string, colle
181181
// Attempt to update the delta on a revision cache entry. If the entry is no longer resident in the cache,
182182
// fails silently
183183
func (rc *LRURevisionCache) UpdateDelta(ctx context.Context, docID, revID string, collectionID uint32, toDelta RevisionDelta) {
184-
value := rc.getValueForDeltaUpdate(ctx, docID, revID, collectionID, toDelta)
184+
value := rc.updateDeltaForValue(ctx, docID, revID, collectionID, toDelta)
185185
if value != nil {
186-
value.updateDelta(toDelta)
187186
// check for memory based eviction
188187
rc.revCacheMemoryBasedEviction(ctx)
189188
}
190189
}
191190

192-
func (rc *LRURevisionCache) getValueForDeltaUpdate(ctx context.Context, docID, revID string, collectionID uint32, newDelta RevisionDelta) (value *revCacheValue) {
191+
// updateDeltaForValue will update the delta for a given revision cache value if present in the cache. We need to hold onto the
192+
// rev cache lock until this process finishes so no other thead can remove/evict this value while updating it and the underlying memory stats.
193+
// We will use the valuePopulated populated boolean to check if we can change the delta on the value, we cannot use value
194+
// lock here given we already hold the rev cache lock in this process.
195+
func (rc *LRURevisionCache) updateDeltaForValue(ctx context.Context, docID, revID string, collectionID uint32, newDelta RevisionDelta) (value *revCacheValue) {
193196
if docID == "" || revID == "" {
194-
return
197+
return nil
195198
}
196199
key := IDAndRev{DocID: docID, RevID: revID, CollectionID: collectionID}
197200
rc.lock.Lock()
@@ -201,12 +204,15 @@ func (rc *LRURevisionCache) getValueForDeltaUpdate(ctx context.Context, docID, r
201204
value = elem.Value.(*revCacheValue)
202205
} else {
203206
// elem not found so return early
204-
return
207+
return nil
205208
}
206209

210+
// Check if the value has finished being loaded into yet. If it is still being loaded then this item is not
211+
// eligible for delta update as the value will be being written to in the loading process and its not safe to
212+
// read/write to it on this thread.
207213
if !value.valuePopulated.Load() {
208214
// item is not eligible for delta update when value has not been populated yet
209-
return
215+
return nil
210216
}
211217

212218
// alter overall memory here then release lock to call update delta
@@ -216,18 +222,21 @@ func (rc *LRURevisionCache) getValueForDeltaUpdate(ctx context.Context, docID, r
216222
previousDeltaBytes = value.delta.totalDeltaBytes
217223
}
218224
diffInBytes = newDelta.totalDeltaBytes - previousDeltaBytes
219-
if diffInBytes != 0 {
220-
value.itemBytes.Add(diffInBytes)
221-
}
222225
rc.currMemoryUsage.Add(diffInBytes)
223226
rc.cacheMemoryBytesStat.Add(diffInBytes)
227+
// update delta
228+
value.delta = &newDelta
224229

225230
return value
226231
}
227232

228-
func (rc *LRURevisionCache) getValueForDeltaUpdateCV(ctx context.Context, docID string, cv *Version, collectionID uint32, newDelta RevisionDelta) (value *revCacheValue) {
233+
// updateDeltaForValueCV will update the delta for a given revision cache value if present in the cache. We need to hold onto the
234+
// rev cache lock until this process finishes so no other thead can remove/evict this value while updating it and the underlying memory stats.
235+
// We will use the valuePopulated populated boolean to check if we can change the delta on the value, we cannot use value
236+
// lock here given we already hold the rev cache lock in this process.
237+
func (rc *LRURevisionCache) updateDeltaForValueCV(ctx context.Context, docID string, cv *Version, collectionID uint32, newDelta RevisionDelta) (value *revCacheValue) {
229238
if cv == nil {
230-
return
239+
return nil
231240
}
232241
key := IDandCV{DocID: docID, Source: cv.SourceID, Version: cv.Value, CollectionID: collectionID}
233242
rc.lock.Lock()
@@ -237,12 +246,15 @@ func (rc *LRURevisionCache) getValueForDeltaUpdateCV(ctx context.Context, docID
237246
value = elem.Value.(*revCacheValue)
238247
} else {
239248
// elem not found so return early
240-
return
249+
return nil
241250
}
242251

252+
// Check if the value has finished being loaded into yet. If it is still being loaded then this item is not
253+
// eligible for delta update as the value will be being written to in the loading thread and its not safe to
254+
// read/write to it on this thread.
243255
if !value.valuePopulated.Load() {
244256
// item is not eligible for delta update when value has not been populated yet
245-
return
257+
return nil
246258
}
247259

248260
// alter overall memory here then release lock to call update delta
@@ -252,19 +264,17 @@ func (rc *LRURevisionCache) getValueForDeltaUpdateCV(ctx context.Context, docID
252264
previousDeltaBytes = value.delta.totalDeltaBytes
253265
}
254266
diffInBytes = newDelta.totalDeltaBytes - previousDeltaBytes
255-
if diffInBytes != 0 {
256-
value.itemBytes.Add(diffInBytes)
257-
}
258267
rc.currMemoryUsage.Add(diffInBytes)
259268
rc.cacheMemoryBytesStat.Add(diffInBytes)
269+
// update delta
270+
value.delta = &newDelta
260271

261272
return value
262273
}
263274

264275
func (rc *LRURevisionCache) UpdateDeltaCV(ctx context.Context, docID string, cv *Version, collectionID uint32, toDelta RevisionDelta) {
265-
value := rc.getValueForDeltaUpdateCV(ctx, docID, cv, collectionID, toDelta)
276+
value := rc.updateDeltaForValueCV(ctx, docID, cv, collectionID, toDelta)
266277
if value != nil {
267-
value.updateDelta(toDelta)
268278
// check for memory based eviction
269279
rc.revCacheMemoryBasedEviction(ctx)
270280
}
@@ -435,7 +445,7 @@ func (rc *LRURevisionCache) upsertDocToCache(ctx context.Context, cvKey IDandCV,
435445
if found {
436446
revItem := existingElem.Value.(*revCacheValue)
437447
// decrement item bytes by the removed item
438-
rc._decrRevCacheMemoryUsage(ctx, -revItem.getItemBytes())
448+
rc._decrRevCacheMemoryUsage(ctx, -revItem._getItemAndDeltaBytes())
439449
rc.lruList.Remove(existingElem)
440450
newItem = false
441451
}
@@ -542,7 +552,7 @@ func (rc *LRURevisionCache) addToRevMapPostLoad(ctx context.Context, docID, revI
542552
}
543553
// if CV map and rev map are targeting different list elements, update to have both use the cv map element
544554
rc.cache[legacyKey] = cvElem
545-
rc._decrRevCacheMemoryUsage(ctx, -revElem.Value.(*revCacheValue).getItemBytes())
555+
rc._decrRevCacheMemoryUsage(ctx, -revElem.Value.(*revCacheValue)._getItemAndDeltaBytes())
546556
rc.cacheNumItems.Add(-1)
547557
rc.lruList.Remove(revElem)
548558
} else {
@@ -584,7 +594,7 @@ func (rc *LRURevisionCache) addToHLVMapPostLoad(ctx context.Context, docID, revI
584594
}
585595
// if CV map and rev map are targeting different list elements, update to have both use the cv map element
586596
rc.cache[legacyKey] = cvElem
587-
rc._decrRevCacheMemoryUsage(ctx, -revElem.Value.(*revCacheValue).getItemBytes())
597+
rc._decrRevCacheMemoryUsage(ctx, -revElem.Value.(*revCacheValue)._getItemAndDeltaBytes())
588598
rc.cacheNumItems.Add(-1)
589599
rc.lruList.Remove(revElem)
590600
} else {
@@ -634,7 +644,7 @@ func (rc *LRURevisionCache) removeFromCacheByCV(ctx context.Context, docID strin
634644
delete(rc.hlvCache, key)
635645
// remove from rev lookup map too
636646
delete(rc.cache, legacyKey)
637-
rc._decrRevCacheMemoryUsage(ctx, -elem.getItemBytes())
647+
rc._decrRevCacheMemoryUsage(ctx, -elem._getItemAndDeltaBytes())
638648
rc.cacheNumItems.Add(-1)
639649
}
640650

@@ -656,7 +666,7 @@ func (rc *LRURevisionCache) removeFromCacheByRev(ctx context.Context, docID, rev
656666
hlvKey := IDandCV{DocID: docID, Source: elem.cv.SourceID, Version: elem.cv.Value, CollectionID: collectionID}
657667
rc.lruList.Remove(element)
658668
// decrement the overall memory bytes count
659-
rc._decrRevCacheMemoryUsage(ctx, -elem.getItemBytes())
669+
rc._decrRevCacheMemoryUsage(ctx, -elem._getItemAndDeltaBytes())
660670
delete(rc.cache, key)
661671
// remove from CV lookup map too
662672
delete(rc.hlvCache, hlvKey)
@@ -739,7 +749,7 @@ func (rc *LRURevisionCache) _numberCapacityEviction() (numItemsEvicted int64, nu
739749
}
740750
}
741751
numItemsEvicted++
742-
numBytesEvicted += value.getItemBytes()
752+
numBytesEvicted += value._getItemAndDeltaBytes()
743753
}
744754
return numItemsEvicted, numBytesEvicted
745755
}
@@ -916,17 +926,22 @@ func (value *revCacheValue) store(docRev DocumentRevision) {
916926
value.valuePopulated.Store(true) // now we have stored the doc revision in the cache, we can allow eviction
917927
}
918928

919-
func (value *revCacheValue) updateDelta(toDelta RevisionDelta) {
920-
value.lock.Lock()
921-
defer value.lock.Unlock()
922-
value.delta = &toDelta
923-
}
924-
925929
// getItemBytes atomically retrieves the rev cache items overall memory footprint
926930
func (value *revCacheValue) getItemBytes() int64 {
927931
return value.itemBytes.Load()
928932
}
929933

934+
// _getItemAndDeltaBytes will retrieve the total bytes used by the rev cache value including delta if present. The rev
935+
// cache lock should be acquired to use this function.
936+
func (value *revCacheValue) _getItemAndDeltaBytes() int64 {
937+
var totalBytes int64
938+
totalBytes += value.getItemBytes()
939+
if value.delta != nil {
940+
totalBytes += value.delta.totalDeltaBytes
941+
}
942+
return totalBytes
943+
}
944+
930945
// CalculateBytes will calculate the bytes from revisions in the document, body and channels on the document
931946
func (rev *DocumentRevision) CalculateBytes() {
932947
var totalBytes int
@@ -1024,7 +1039,7 @@ func (rc *LRURevisionCache) evictBasedOffMemoryUsage(ctx context.Context) int64
10241039
}
10251040
}
10261041
numItemsRemoved++
1027-
valueBytes := value.getItemBytes()
1042+
valueBytes := value._getItemAndDeltaBytes()
10281043
numBytesRemoved += valueBytes
10291044
}
10301045
}
@@ -1049,7 +1064,8 @@ func (rc *LRURevisionCache) _decrRevCacheMemoryUsage(ctx context.Context, bytesC
10491064
}
10501065

10511066
// incrRevCacheMemoryUsage atomically increases overall memory usage for cache and the actual rev cache objects usage.
1052-
// You do not need to hold rev cache lock when using this function
1067+
// You do not need to hold rev cache lock when using this function. NOTE: caller should only pass in the value bytes
1068+
// excluding any delta size as UpdateDelta functions will update memory stats for us.
10531069
func (rc *LRURevisionCache) incrRevCacheMemoryUsage(ctx context.Context, bytesCount int64) {
10541070
// We need to keep track of the current LRURevisionCache memory usage AND the overall usage of the cache. We need
10551071
// overall memory usage for the stat added to show rev cache usage plus we need the current rev cache capacity of the

db/revision_cache_test.go

Lines changed: 87 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -2466,20 +2466,30 @@ func TestUpdateDeltaRevCacheMemoryStatPanicSingleEntry(t *testing.T) {
24662466
revCacheDelta2 := newRevCacheDelta(firstDelta, "1-abc", docRev, false, nil)
24672467

24682468
// Thread 1: UpdateDelta - start
2469-
value := cache.getValue(ctx, "doc1", "1-abc", testCollectionID, false)
2470-
if value != nil {
2471-
// Thread 2: Remove - start - drop value underneath UpdateDelta thread
2472-
cache.RemoveWithRev(ctx, "doc1", "1-abc", testCollectionID)
2473-
// Thread 2: Remove - end
2474-
outGoingBytes := value.updateDelta(revCacheDelta2)
2475-
if outGoingBytes != 0 {
2476-
cache.currMemoryUsage.Add(outGoingBytes)
2477-
cache.cacheMemoryBytesStat.Add(outGoingBytes)
2469+
var wg sync.WaitGroup
2470+
wg.Add(2)
2471+
go func() {
2472+
for i := 0; i < 1000; i++ {
2473+
cache.UpdateDelta(ctx, "doc1", "1-abc", testCollectionID, revCacheDelta2)
24782474
}
2479-
// check for memory based eviction
2480-
cache.revCacheMemoryBasedEviction(ctx)
2481-
}
2475+
wg.Done()
2476+
}()
24822477
// Thread 1: UpdateDelta - end
2478+
// Thread 2: Remove - start
2479+
go func() {
2480+
for i := 0; i < 1000; i++ {
2481+
cache.RemoveWithRev(ctx, "doc1", "1-abc", testCollectionID)
2482+
if i == 999 {
2483+
break
2484+
}
2485+
_, err = cache.GetWithRev(ctx, "doc1", "1-abc", testCollectionID, RevCacheIncludeDelta)
2486+
require.NoError(t, err, "Error adding to cache")
2487+
}
2488+
wg.Done()
2489+
}()
2490+
// Thread 2: Remove - end
2491+
2492+
wg.Wait()
24832493

24842494
assert.Equal(t, 0, cache.lruList.Len())
24852495
assert.Equal(t, int64(0), memoryBytesCounted.Value())
@@ -2511,21 +2521,74 @@ func TestUpdateDeltaRevCacheMemoryStatPanicMultipleEntries(t *testing.T) {
25112521

25122522
revCacheDelta2 := newRevCacheDelta(firstDelta, "1-abc", docRev2, false, nil)
25132523

2524+
var wg sync.WaitGroup
2525+
wg.Add(2)
25142526
// Thread 1: UpdateDelta - start
2515-
value := cache.getValue(ctx, "doc2", "1-abc", testCollectionID, false)
2516-
if value != nil {
2517-
// Thread 2: Remove - start - drop value underneath UpdateDelta thread
2518-
cache.RemoveWithRev(ctx, "doc2", "1-abc", testCollectionID)
2519-
// Thread 2: Remove - end
2520-
outGoingBytes := value.updateDelta(revCacheDelta2)
2521-
if outGoingBytes != 0 {
2522-
cache.currMemoryUsage.Add(outGoingBytes)
2523-
cache.cacheMemoryBytesStat.Add(outGoingBytes)
2527+
go func() {
2528+
for i := 0; i < 1000; i++ {
2529+
cache.UpdateDelta(ctx, "doc1", "1-abc", testCollectionID, revCacheDelta2)
25242530
}
2525-
// check for memory based eviction
2526-
cache.revCacheMemoryBasedEviction(ctx)
2527-
}
2531+
wg.Done()
2532+
}()
25282533
// Thread 1: UpdateDelta - end
2534+
go func() {
2535+
for i := 0; i < 1000; i++ {
2536+
cache.RemoveWithRev(ctx, "doc1", "1-abc", testCollectionID)
2537+
if i == 999 {
2538+
break
2539+
}
2540+
_, err = cache.GetWithRev(ctx, "doc1", "1-abc", testCollectionID, RevCacheIncludeDelta)
2541+
require.NoError(t, err, "Error adding to cache")
2542+
}
2543+
wg.Done()
2544+
}()
2545+
2546+
wg.Wait()
2547+
2548+
assert.Equal(t, 0, cache.lruList.Len())
2549+
assert.Equal(t, int64(0), memoryBytesCounted.Value())
2550+
}
2551+
2552+
func TestInconsistentMemoryForUpdateDelta(t *testing.T) {
2553+
2554+
cacheHitCounter, cacheMissCounter, getDocumentCounter, getRevisionCounter, cacheNumItems, memoryBytesCounted := base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{}
2555+
backingStoreMap := CreateTestSingleBackingStoreMap(&testBackingStore{nil, &getDocumentCounter, &getRevisionCounter}, testCollectionID)
2556+
cacheOptions := &RevisionCacheOptions{
2557+
MaxItemCount: 5000,
2558+
MaxBytes: 0,
2559+
}
2560+
cache := NewLRURevisionCache(cacheOptions, backingStoreMap, &cacheHitCounter, &cacheMissCounter, &cacheNumItems, &memoryBytesCounted)
2561+
2562+
firstDelta := bytes.Repeat([]byte("a"), 1000)
2563+
ctx := base.TestCtx(t)
2564+
2565+
// Trigger load into cache
2566+
docRev, err := cache.GetWithRev(ctx, "doc1", "1-abc", testCollectionID, RevCacheIncludeDelta)
2567+
require.NoError(t, err, "Error adding to cache")
2568+
2569+
revCacheDelta := newRevCacheDelta(firstDelta, "1-abc", docRev, false, nil)
2570+
2571+
var wg sync.WaitGroup
2572+
wg.Add(2)
2573+
go func() {
2574+
for i := 0; i < 1000; i++ {
2575+
cache.UpdateDelta(ctx, "doc1", "1-abc", testCollectionID, revCacheDelta)
2576+
}
2577+
wg.Done()
2578+
}()
2579+
go func() {
2580+
for i := 0; i < 1000; i++ {
2581+
cache.RemoveWithRev(ctx, "doc1", "1-abc", testCollectionID)
2582+
if i == 999 {
2583+
break
2584+
}
2585+
_, err = cache.GetWithRev(ctx, "doc1", "1-abc", testCollectionID, RevCacheIncludeDelta)
2586+
require.NoError(t, err, "Error adding to cache")
2587+
}
2588+
wg.Done()
2589+
}()
2590+
2591+
wg.Wait()
25292592

25302593
assert.Equal(t, 0, cache.lruList.Len())
25312594
assert.Equal(t, int64(0), memoryBytesCounted.Value())

0 commit comments

Comments
 (0)