Skip to content

Commit 224320e

Browse files
[fix][ml] Fix memory leak due to duplicated RangeCache value retain operations (#23955)
Co-authored-by: Lari Hotari <[email protected]> (cherry picked from commit 20b3b22)
1 parent fab143b commit 224320e

File tree

2 files changed

+62
-67
lines changed

2 files changed

+62
-67
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java

Lines changed: 33 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -284,13 +284,19 @@ private Value getValueFromWrapper(Key key, EntryWrapper<Key, Value> valueWrapper
284284
}
285285
}
286286

287+
/**
288+
* @apiNote the returned value must be released if it's not null
289+
*/
287290
private Value getValueMatchingEntry(Map.Entry<Key, EntryWrapper<Key, Value>> entry) {
288291
Value valueMatchingEntry = EntryWrapper.getValueMatchingMapEntry(entry);
289292
return getRetainedValueMatchingKey(entry.getKey(), valueMatchingEntry);
290293
}
291294

292295
// validates that the value matches the key and that the value has not been recycled
293296
// (a key mismatch or a recycled value is possible due to the lack of exclusive locks in the cache and the use of reference-counted objects)
297+
/**
298+
* @apiNote the returned value must be released if it's not null
299+
*/
294300
private Value getRetainedValueMatchingKey(Key key, Value value) {
295301
if (value == null) {
296302
// the wrapper has been recycled and contains another key
@@ -350,7 +356,7 @@ public Pair<Integer, Long> removeRange(Key first, Key last, boolean lastInclusiv
350356
RemovalCounters counters = RemovalCounters.create();
351357
Map<Key, EntryWrapper<Key, Value>> subMap = entries.subMap(first, true, last, lastInclusive);
352358
for (Map.Entry<Key, EntryWrapper<Key, Value>> entry : subMap.entrySet()) {
353-
removeEntry(entry, counters, true);
359+
removeEntry(entry, counters);
354360
}
355361
return handleRemovalResult(counters);
356362
}
@@ -361,84 +367,48 @@ enum RemoveEntryResult {
361367
BREAK_LOOP;
362368
}
363369

364-
private RemoveEntryResult removeEntry(Map.Entry<Key, EntryWrapper<Key, Value>> entry, RemovalCounters counters,
365-
boolean skipInvalid) {
366-
return removeEntry(entry, counters, skipInvalid, x -> true);
370+
private RemoveEntryResult removeEntry(Map.Entry<Key, EntryWrapper<Key, Value>> entry, RemovalCounters counters) {
371+
return removeEntry(entry, counters, x -> true);
367372
}
368373

369374
private RemoveEntryResult removeEntry(Map.Entry<Key, EntryWrapper<Key, Value>> entry, RemovalCounters counters,
370-
boolean skipInvalid, Predicate<Value> removeCondition) {
375+
Predicate<Value> removeCondition) {
371376
Key key = entry.getKey();
372377
EntryWrapper<Key, Value> entryWrapper = entry.getValue();
373378
Value value = getValueMatchingEntry(entry);
374379
if (value == null) {
375-
// the wrapper has already been recycled and contains another key
376-
if (!skipInvalid) {
377-
EntryWrapper<Key, Value> removed = entries.remove(key);
378-
if (removed != null) {
379-
// log and remove the entry without releasing the value
380-
log.info("Key {} does not match the entry's value wrapper's key {}, removed entry by key without "
381-
+ "releasing the value", key, entryWrapper.getKey());
382-
counters.entryRemoved(removed.getSize());
383-
return RemoveEntryResult.ENTRY_REMOVED;
384-
}
385-
}
386-
return RemoveEntryResult.CONTINUE_LOOP;
387-
}
388-
try {
389-
// add extra retain to avoid value being released while we are removing it
390-
value.retain();
391-
} catch (IllegalReferenceCountException e) {
392-
// Value was already released
393-
if (!skipInvalid) {
394-
// remove the specific entry without releasing the value
395-
if (entries.remove(key, entryWrapper)) {
396-
log.info("Value was already released for key {}, removed entry without releasing the value", key);
397-
counters.entryRemoved(entryWrapper.getSize());
398-
return RemoveEntryResult.ENTRY_REMOVED;
399-
}
400-
}
380+
// the wrapper has already been recycled or contains another key
381+
entries.remove(key, entryWrapper);
401382
return RemoveEntryResult.CONTINUE_LOOP;
402383
}
403-
if (!value.matchesKey(key)) {
404-
// this is unexpected since the IdentityWrapper.getValue(key) already checked that the value matches the key
405-
log.warn("Unexpected race condition. Value {} does not match the key {}. Removing entry.", value, key);
406-
}
407384
try {
408385
if (!removeCondition.test(value)) {
409386
return RemoveEntryResult.BREAK_LOOP;
410387
}
411-
if (!skipInvalid) {
412-
// remove the specific entry
413-
boolean entryRemoved = entries.remove(key, entryWrapper);
414-
if (entryRemoved) {
415-
counters.entryRemoved(entryWrapper.getSize());
416-
// check that the value hasn't been recycled in between
417-
// there should be at least 2 references since this method adds one and the cache should have
418-
// one reference. it is valid that the value contains references even after the key has been
419-
// removed from the cache
420-
if (value.refCnt() > 1) {
421-
entryWrapper.recycle();
422-
// remove the cache reference
423-
value.release();
424-
} else {
425-
log.info("Unexpected refCnt {} for key {}, removed entry without releasing the value",
426-
value.refCnt(), key);
427-
}
428-
}
429-
} else if (skipInvalid && value.refCnt() > 1 && entries.remove(key, entryWrapper)) {
430-
// when skipInvalid is true, we don't remove the entry if it doesn't match the key
431-
// or the refCnt is invalid
388+
// remove the specific entry
389+
boolean entryRemoved = entries.remove(key, entryWrapper);
390+
if (entryRemoved) {
432391
counters.entryRemoved(entryWrapper.getSize());
433-
entryWrapper.recycle();
434-
// remove the cache reference
435-
value.release();
392+
// check that the value hasn't been recycled in between
393+
// there should be at least 2 references since this method adds one and the cache should have
394+
// one reference. it is valid that the value contains references even after the key has been
395+
// removed from the cache
396+
if (value.refCnt() > 1) {
397+
entryWrapper.recycle();
398+
// remove the cache reference
399+
value.release();
400+
} else {
401+
log.info("Unexpected refCnt {} for key {}, removed entry without releasing the value",
402+
value.refCnt(), key);
403+
}
404+
return RemoveEntryResult.ENTRY_REMOVED;
405+
} else {
406+
return RemoveEntryResult.CONTINUE_LOOP;
436407
}
437408
} finally {
438409
// remove the extra retain
439410
value.release();
440411
}
441-
return RemoveEntryResult.ENTRY_REMOVED;
442412
}
443413

444414
private Pair<Integer, Long> handleRemovalResult(RemovalCounters counters) {
@@ -464,7 +434,7 @@ public Pair<Integer, Long> evictLeastAccessedEntries(long minSize) {
464434
if (entry == null) {
465435
break;
466436
}
467-
removeEntry(entry, counters, false);
437+
removeEntry(entry, counters);
468438
}
469439
return handleRemovalResult(counters);
470440
}
@@ -484,7 +454,7 @@ public Pair<Integer, Long> evictLEntriesBeforeTimestamp(long maxTimestamp) {
484454
if (entry == null) {
485455
break;
486456
}
487-
if (removeEntry(entry, counters, false, value -> timestampExtractor.getTimestamp(value) <= maxTimestamp)
457+
if (removeEntry(entry, counters, value -> timestampExtractor.getTimestamp(value) <= maxTimestamp)
488458
== RemoveEntryResult.BREAK_LOOP) {
489459
break;
490460
}
@@ -518,7 +488,7 @@ public Pair<Integer, Long> clear() {
518488
if (entry == null) {
519489
break;
520490
}
521-
removeEntry(entry, counters, false);
491+
removeEntry(entry, counters);
522492
}
523493
return handleRemovalResult(counters);
524494
}

managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/RangeCacheTest.java

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,16 @@
2727
import com.google.common.collect.Lists;
2828
import io.netty.util.AbstractReferenceCounted;
2929
import io.netty.util.ReferenceCounted;
30+
import java.util.Map;
3031
import java.util.concurrent.Executors;
3132
import java.util.concurrent.ScheduledExecutorService;
3233
import java.util.concurrent.TimeUnit;
34+
import java.util.stream.Collectors;
3335
import lombok.Cleanup;
3436
import lombok.Data;
3537
import org.apache.commons.lang3.tuple.Pair;
3638
import org.awaitility.Awaitility;
39+
import org.testng.annotations.DataProvider;
3740
import org.testng.annotations.Test;
3841

3942
public class RangeCacheTest {
@@ -140,9 +143,14 @@ public void customWeighter() {
140143
assertEquals(cache.getNumberOfEntries(), 2);
141144
}
142145

146+
@DataProvider
147+
public static Object[][] retainBeforeEviction() {
148+
return new Object[][]{ { true }, { false } };
149+
}
143150

144-
@Test
145-
public void customTimeExtraction() {
151+
152+
@Test(dataProvider = "retainBeforeEviction")
153+
public void customTimeExtraction(boolean retain) {
146154
RangeCache<Integer, RefString> cache = new RangeCache<>(value -> value.s.length(), x -> x.s.length());
147155

148156
cache.put(1, new RefString("1"));
@@ -152,13 +160,30 @@ public void customTimeExtraction() {
152160

153161
assertEquals(cache.getSize(), 10);
154162
assertEquals(cache.getNumberOfEntries(), 4);
163+
final var retainedEntries = cache.getRange(1, 4444);
164+
for (final var entry : retainedEntries) {
165+
assertEquals(entry.refCnt(), 2);
166+
if (!retain) {
167+
entry.release();
168+
}
169+
}
155170

156171
Pair<Integer, Long> evictedSize = cache.evictLEntriesBeforeTimestamp(3);
157172
assertEquals(evictedSize.getRight().longValue(), 6);
158173
assertEquals(evictedSize.getLeft().longValue(), 3);
159-
160174
assertEquals(cache.getSize(), 4);
161175
assertEquals(cache.getNumberOfEntries(), 1);
176+
177+
if (retain) {
178+
final var valueToRefCnt = retainedEntries.stream().collect(Collectors.toMap(RefString::getS,
179+
AbstractReferenceCounted::refCnt));
180+
assertEquals(valueToRefCnt, Map.of("1", 1, "22", 1, "333", 1, "4444", 2));
181+
retainedEntries.forEach(AbstractReferenceCounted::release);
182+
} else {
183+
final var valueToRefCnt = retainedEntries.stream().filter(v -> v.refCnt() > 0).collect(Collectors.toMap(
184+
RefString::getS, AbstractReferenceCounted::refCnt));
185+
assertEquals(valueToRefCnt, Map.of("4444", 1));
186+
}
162187
}
163188

164189
@Test
@@ -355,4 +380,4 @@ public void testGetKeyWithDifferentInstance() {
355380
// the value should be found
356381
assertEquals(s.s, "129");
357382
}
358-
}
383+
}

0 commit comments

Comments
 (0)