Convert LFU to LRU cache for blob storage #107130
Changes from 11 commits
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -69,6 +69,12 @@ public class SharedBlobCacheService<KeyType> implements Releasable { | |
|
|
||
| private static final String SHARED_CACHE_SETTINGS_PREFIX = "xpack.searchable.snapshot.shared_cache."; | ||
|
|
||
| public static final Setting<String> SHARED_CACHE_TYPE = Setting.simpleString( | ||
| SHARED_CACHE_SETTINGS_PREFIX + "cache_type", | ||
| "lru", | ||
| Setting.Property.NodeScope | ||
| ); | ||
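
The new `cache_type` setting is a node-scoped string defaulting to `"lru"`. As a hedged illustration (not part of this diff), a node might override it programmatically as sketched below; it assumes the standard `org.elasticsearch.common.settings.Settings` builder API, and the class name is hypothetical.

```java
// Hypothetical usage sketch, not part of this PR: overriding the new
// cache_type setting when building node settings.
import org.elasticsearch.common.settings.Settings;

public class CacheTypeSettingExample {
    public static void main(String[] args) {
        Settings nodeSettings = Settings.builder()
            // "lru" is the default; "lfu" keeps the existing implementation
            .put("xpack.searchable.snapshot.shared_cache.cache_type", "lfu")
            .build();
        System.out.println(nodeSettings.get("xpack.searchable.snapshot.shared_cache.cache_type"));
    }
}
```
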
|
|
||
| public static final Setting<ByteSizeValue> SHARED_CACHE_RANGE_SIZE_SETTING = new Setting<>( | ||
| SHARED_CACHE_SETTINGS_PREFIX + "range_size", | ||
| ByteSizeValue.ofMb(16).getStringRep(), | ||
|
|
@@ -256,22 +262,21 @@ public void validate(ByteSizeValue value, Map<Setting<?>, Object> settings, bool | |
| Setting.Property.NodeScope | ||
| ); | ||
|
|
||
| // used in tests | ||
| // used only in tests | ||
| void computeDecay() { | ||
| if (cache instanceof LFUCache lfuCache) { | ||
| lfuCache.computeDecay(); | ||
| } | ||
| assert cache instanceof LFUCache; | ||
| ((LFUCache) cache).computeDecay(); | ||
| } | ||
|
|
||
| // used in tests | ||
| // used only in tests | ||
| void maybeScheduleDecayAndNewEpoch() { | ||
| if (cache instanceof LFUCache lfuCache) { | ||
| lfuCache.maybeScheduleDecayAndNewEpoch(lfuCache.epoch.get()); | ||
| } | ||
| assert cache instanceof LFUCache; | ||
| ((LFUCache) cache).maybeScheduleDecayAndNewEpoch(epoch()); | ||
| } | ||
|
|
||
| // used in tests | ||
| // used only in tests | ||
| long epoch() { | ||
| assert cache instanceof LFUCache; | ||
| return ((LFUCache) cache).epoch.get(); | ||
| } | ||
|
|
||
|
|
@@ -363,7 +368,15 @@ public SharedBlobCacheService( | |
| } | ||
| this.regionSize = regionSize; | ||
| assert regionSize > 0L; | ||
| this.cache = new LFUCache(settings); | ||
| if ("lru".equals(SHARED_CACHE_TYPE.get(settings))) { | ||
| this.cache = new LRUCache(); | ||
| } else if ("lfu".equals(SHARED_CACHE_TYPE.get(settings))) { | ||
| this.cache = new LFUCache(settings); | ||
| } else { | ||
| throw new IllegalArgumentException( | ||
| "setting [" + SHARED_CACHE_TYPE.getKey() + " has unknown cache type [" + SHARED_CACHE_TYPE.get(settings) + "]" | ||
| ); | ||
| } | ||
| try { | ||
| sharedBytes = new SharedBytes( | ||
| numRegions, | ||
|
|
@@ -556,7 +569,7 @@ public void maybeFetchRegion( | |
| final RangeMissingHandler writer, | ||
| final ActionListener<Boolean> listener | ||
| ) { | ||
| if (freeRegionCount() < 1 && maybeEvictLeastUsed() == false) { | ||
| if (freeRegionCount() < 1 && maybeEvict() == false) { | ||
| // no free page available and no old enough unused region to be evicted | ||
| listener.onResponse(false); | ||
| return; | ||
|
|
@@ -575,8 +588,10 @@ public void maybeFetchRegion( | |
| } | ||
|
|
||
| // used by tests | ||
| boolean maybeEvictLeastUsed() { | ||
| if (cache instanceof LFUCache lfuCache) { | ||
| boolean maybeEvict() { | ||
| if (cache instanceof LRUCache lruCache) { | ||
| return lruCache.maybeEvictLeastRecent(); | ||
| } else if (cache instanceof LFUCache lfuCache) { | ||
| return lfuCache.maybeEvictLeastUsed(); | ||
| } | ||
| return false; | ||
|
|
@@ -1179,7 +1194,307 @@ public record Stats( | |
| public static final Stats EMPTY = new Stats(0, 0L, 0L, 0L, 0L, 0L, 0L, 0L); | ||
| } | ||
|
|
||
| private class LFUCache implements Cache<KeyType, CacheFileRegion> { | ||
| private class LRUCache implements Cache<KeyType, CacheFileRegion> { | ||
|
|
||
| class LRUCacheList { | ||
| volatile LRUCacheEntry head; | ||
| LRUCacheEntry tail; | ||
| int size; | ||
| } | ||
|
|
||
| class LRUCacheEntry extends CacheEntry<CacheFileRegion> { | ||
| LRUCacheList list; | ||
| LRUCacheEntry prev; | ||
| LRUCacheEntry next; | ||
|
|
||
| LRUCacheEntry(CacheFileRegion chunk) { | ||
| super(chunk); | ||
| } | ||
|
|
||
| void touch() { | ||
| if (this != front.head) { | ||
| maybePromote(this); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| private final ConcurrentHashMap<RegionKey<KeyType>, LRUCacheEntry> keyMapping = new ConcurrentHashMap<>(); | ||
| private final int maxSize; | ||
| private final LRUCacheList front = new LRUCacheList(); | ||
| private final LRUCacheList middle = new LRUCacheList(); | ||
|
|
||
| LRUCache() { | ||
| // we split the front and middle lists in half, but require a minimum of 2 | ||
| // for front to avoid strange head/tail semantics | ||
| maxSize = Math.max(2, numRegions / 2); | ||
| } | ||
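
For readers unfamiliar with the segmented layout, here is a minimal standalone sketch of the idea: a front segment capped at `maxSize` whose overflow is demoted to a middle segment, with eviction preferring the middle tail. The names (`SegmentedLruSketch`, `touch`, `evictCandidate`) are illustrative only, and unlike the PR (where new entries start in the middle list) this simplification promotes directly to the front on access.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Illustrative sketch only, not the PR's implementation.
class SegmentedLruSketch<T> {
    private final Deque<T> front = new ArrayDeque<>();
    private final Deque<T> middle = new ArrayDeque<>();
    private final int maxFrontSize;

    SegmentedLruSketch(int numRegions) {
        // half the regions, but at least 2, mirroring the comment above
        this.maxFrontSize = Math.max(2, numRegions / 2);
    }

    void touch(T entry) {
        // promote to the head of the front segment
        front.remove(entry);
        middle.remove(entry);
        front.addFirst(entry);
        if (front.size() > maxFrontSize) {
            // demote the least-recently-touched front entry to the middle
            middle.addFirst(front.removeLast());
        }
    }

    T evictCandidate() {
        // evict from the middle tail first, falling back to the front tail
        return middle.isEmpty() ? front.pollLast() : middle.pollLast();
    }
}
```
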
|
|
||
| @Override | ||
| public void close() { | ||
| // nothing to close | ||
| } | ||
|
|
||
| @Override | ||
| public LRUCacheEntry get(KeyType cacheKey, long fileLength, int region) { | ||
| final RegionKey<KeyType> regionKey = new RegionKey<>(cacheKey, region); | ||
| // try to just get from the map on the fast-path to save instantiating the capturing lambda needed on the slow path | ||
| // if we did not find an entry | ||
| var entry = keyMapping.get(regionKey); | ||
| if (entry == null) { | ||
| final int effectiveRegionSize = computeCacheFileRegionSize(fileLength, region); | ||
| entry = keyMapping.computeIfAbsent(regionKey, key -> new LRUCacheEntry(new CacheFileRegion(key, effectiveRegionSize))); | ||
| } | ||
| // io is volatile, so double-checked locking is fine, as long as we assign it last. | ||
| if (entry.chunk.io == null) { | ||
| synchronized (entry.chunk) { | ||
| if (entry.chunk.io == null && entry.chunk.isEvicted() == false) { | ||
| return initChunk(entry); | ||
| } | ||
| } | ||
| } | ||
| assert assertChunkActiveOrEvicted(entry); | ||
|
|
||
| // existing item, check if we need to promote item | ||
| if (entry != front.head) { | ||
| maybePromote(entry); | ||
| } | ||
|
|
||
| return entry; | ||
| } | ||
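
The fast path above relies on double-checked locking over the volatile `io` field, with the assignment happening last under the lock. As a general illustration of that pattern (independent of the PR's `CacheFileRegion` and `SharedBytes.IO` types, for which the placeholder names below are assumptions), a minimal sketch:

```java
// Standalone sketch of double-checked initialization over a volatile field:
// readers either see null and take the locked slow path, or see a fully
// initialized value, because the field is assigned only after allocation.
class LazySlot {
    private volatile Object io; // stands in for SharedBytes.IO

    Object getOrInit() {
        Object local = io;
        if (local == null) {
            synchronized (this) {
                local = io;
                if (local == null) {
                    local = allocate();
                    io = local; // assign last, after allocation completes
                }
            }
        }
        return local;
    }

    private Object allocate() {
        return new Object(); // placeholder for taking a free region
    }
}
```
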
|
|
||
| @Override | ||
| public int forceEvict(Predicate<KeyType> cacheKeyPredicate) { | ||
| final List<LRUCacheEntry> matchingEntries = new ArrayList<>(); | ||
| keyMapping.forEach((key, value) -> { | ||
| if (cacheKeyPredicate.test(key.file)) { | ||
| matchingEntries.add(value); | ||
| } | ||
| }); | ||
| var evictedCount = 0; | ||
| if (matchingEntries.isEmpty() == false) { | ||
| synchronized (SharedBlobCacheService.this) { | ||
| for (LRUCacheEntry entry : matchingEntries) { | ||
| boolean evicted = entry.chunk.forceEvict(); | ||
| if (evicted && entry.chunk.io != null) { | ||
| unlink(entry); | ||
| keyMapping.remove(entry.chunk.regionKey, entry); | ||
| evictedCount++; | ||
| } | ||
| } | ||
| } | ||
| } | ||
| return evictedCount; | ||
| } | ||
|
|
||
| private LRUCacheEntry initChunk(LRUCacheEntry entry) { | ||
| assert Thread.holdsLock(entry.chunk); | ||
| RegionKey<KeyType> regionKey = entry.chunk.regionKey; | ||
| if (keyMapping.get(regionKey) != entry) { | ||
| throwAlreadyClosed("no free region found (contender)"); | ||
| } | ||
| // new item | ||
| assert entry.prev == null; | ||
| assert entry.next == null; | ||
| final SharedBytes.IO freeSlot = freeRegions.poll(); | ||
| if (freeSlot != null) { | ||
| // no need to evict an item, just add | ||
| assignToSlot(entry, freeSlot); | ||
| } else { | ||
| // need to evict something | ||
| SharedBytes.IO io; | ||
| synchronized (SharedBlobCacheService.this) { | ||
| io = maybeEvictAndTake(); | ||
| } | ||
| if (io == null) { | ||
| io = freeRegions.poll(); | ||
| } | ||
| if (io != null) { | ||
| assignToSlot(entry, io); | ||
| } else { | ||
| boolean removed = keyMapping.remove(regionKey, entry); | ||
| assert removed; | ||
| throwAlreadyClosed("no free region found"); | ||
| } | ||
| } | ||
|
|
||
| return entry; | ||
| } | ||
|
|
||
| private void assignToSlot(LRUCacheEntry entry, SharedBytes.IO freeSlot) { | ||
| assert regionOwners.put(freeSlot, entry.chunk) == null; | ||
| synchronized (SharedBlobCacheService.this) { | ||
| if (entry.chunk.isEvicted()) { | ||
| assert regionOwners.remove(freeSlot) == entry.chunk; | ||
| freeRegions.add(freeSlot); | ||
| keyMapping.remove(entry.chunk.regionKey, entry); | ||
| throwAlreadyClosed("evicted during free region allocation"); | ||
| } | ||
| pushEntryToMiddle(entry); | ||
| // assign io only when chunk is ready for use. Under lock to avoid concurrent tryEvict. | ||
| entry.chunk.io = freeSlot; | ||
| } | ||
| } | ||
|
|
||
| private void pushEntryToMiddle(final LRUCacheEntry entry) { | ||
| assert Thread.holdsLock(SharedBlobCacheService.this); | ||
| assert invariant(entry, false); | ||
| assert entry.prev == null; | ||
| assert entry.next == null; | ||
| if (middle.head == null) { | ||
| middle.head = entry; | ||
| middle.tail = entry; | ||
| } else { | ||
| entry.next = middle.head; | ||
| middle.head.prev = entry; | ||
| middle.head = entry; | ||
| } | ||
| entry.list = middle; | ||
| ++middle.size; | ||
| assert invariant(entry, true); | ||
| } | ||
|
|
||
| private void pushEntryToFront(final LRUCacheEntry entry) { | ||
|
Review comment: I think there should be a way for us to not need this kind of lock any longer. Can't we insert

Reply: Let's chat about this again and if it's still relevant once we start talking about using time.

Reply: Thinking back at past discussions, it seems lock-freedom was a key motivation for going to an LRU, so it would be good to prove out that this works as part of this (POC?).
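
As a hedged illustration of the lock-freedom point raised here, and explicitly not what this PR implements, a CLOCK / second-chance style access bit is one common way to let `touch()` avoid list manipulation (and thus the service-wide lock) entirely; eviction then sweeps entries and clears the bit. The class and method names below are hypothetical.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative sketch of a lock-reduced alternative, not the PR's approach.
class ClockEntrySketch {
    final AtomicBoolean accessed = new AtomicBoolean(false);

    void touch() {
        // cheap and lock-free: just publish that this entry was recently used
        accessed.set(true);
    }

    /** @return true if this entry should be evicted now, false if it earns a second chance */
    boolean evictOrSecondChance() {
        // getAndSet clears the bit; an entry touched since the last sweep survives once
        return accessed.getAndSet(false) == false;
    }
}
```
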
||
| assert Thread.holdsLock(SharedBlobCacheService.this); | ||
| assert invariant(entry, false); | ||
| assert entry.prev == null; | ||
| assert entry.next == null; | ||
| if (front.head == null) { | ||
| front.head = entry; | ||
| front.tail = entry; | ||
| } else { | ||
| entry.next = front.head; | ||
| front.head.prev = entry; | ||
| front.head = entry; | ||
| } | ||
| entry.list = front; | ||
| ++front.size; | ||
| assert invariant(entry, true); | ||
| if (front.size > maxSize) { | ||
| LRUCacheEntry move = front.tail; | ||
| unlink(move); | ||
| pushEntryToMiddle(move); | ||
| } | ||
| } | ||
|
|
||
| private synchronized boolean invariant(final LRUCacheEntry e, boolean present) { | ||
| boolean found = false; | ||
| for (LRUCacheList list : List.of(front, middle)) { | ||
| for (LRUCacheEntry entry = list.head; entry != null; entry = entry.next) { | ||
| assert entry.next == null || entry.next.prev == entry; | ||
| assert entry.prev == null || entry.prev.next == entry; | ||
| if (entry == e) { | ||
| found = true; | ||
| } | ||
| } | ||
| for (LRUCacheEntry entry = list.tail; entry != null; entry = entry.prev) { | ||
| assert entry.next == null || entry.next.prev == entry; | ||
| assert entry.prev == null || entry.prev.next == entry; | ||
| if (entry == e) { | ||
| found = true; | ||
| } | ||
| } | ||
| } | ||
| assert found == present; | ||
| return true; | ||
| } | ||
|
|
||
| private boolean assertChunkActiveOrEvicted(LRUCacheEntry entry) { | ||
| synchronized (SharedBlobCacheService.this) { | ||
| // assert linked (or evicted) | ||
| assert entry.list != null || entry.chunk.isEvicted(); | ||
| } | ||
| SharedBytes.IO io = entry.chunk.io; | ||
| assert io != null || entry.chunk.isEvicted(); | ||
| assert io == null || regionOwners.get(io) == entry.chunk || entry.chunk.isEvicted(); | ||
| return true; | ||
| } | ||
|
|
||
| private void maybePromote(LRUCacheEntry entry) { | ||
| synchronized (SharedBlobCacheService.this) { | ||
| if (entry != front.head && entry.chunk.isEvicted() == false) { | ||
| unlink(entry); | ||
| pushEntryToFront(entry); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| private void unlink(final LRUCacheEntry entry) { | ||
|
Review comment: WDYT about testing this explicitly under the following conditions:

Maybe this is already done via the other tests in a transitive manner, as I didn't read through all the tests in detail. However, I noticed that neither this function, nor any of the methods that call this function, are under test. (I'm a bit paranoid, but trust me, I've earned it...)

Reply: Replied below.
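
A sketch of the kind of test the reviewer describes, written against a tiny standalone doubly linked list because the PR's `LRUCacheEntry`/`LRUCacheList` classes are private and non-static; all names here are hypothetical and the list logic mirrors, but is not, the PR's `unlink`. Run with assertions enabled (`-ea`).

```java
// Hypothetical test sketch: unlink of a single element, the head, the tail,
// and a middle element of a doubly linked list.
class UnlinkSketchTest {
    static final class Node {
        Node prev, next;
    }

    static final class DoublyLinkedList {
        Node head, tail;
        int size;

        void pushFront(Node n) {
            n.next = head;
            if (head != null) head.prev = n;
            head = n;
            if (tail == null) tail = n;
            size++;
        }

        void unlink(Node n) {
            if (n == head) head = n.next;
            if (n == tail) tail = n.prev;
            if (n.prev != null) n.prev.next = n.next;
            if (n.next != null) n.next.prev = n.prev;
            n.prev = n.next = null;
            size--;
        }
    }

    public static void main(String[] args) {
        // single element: list becomes empty
        DoublyLinkedList list = new DoublyLinkedList();
        Node only = new Node();
        list.pushFront(only);
        list.unlink(only);
        assert list.head == null && list.tail == null && list.size == 0;

        // three elements: unlink middle, then head, then tail
        Node a = new Node(), b = new Node(), c = new Node();
        list.pushFront(c);
        list.pushFront(b);
        list.pushFront(a); // order: a -> b -> c
        list.unlink(b);
        assert list.head == a && list.tail == c && a.next == c && c.prev == a;
        list.unlink(a);
        assert list.head == c && list.tail == c && c.prev == null;
        list.unlink(c);
        assert list.head == null && list.tail == null && list.size == 0;
    }
}
```
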
||
| assert Thread.holdsLock(SharedBlobCacheService.this); | ||
| assert invariant(entry, true); | ||
| if (entry == entry.list.head) { | ||
| entry.list.head = entry.next; | ||
| } | ||
| if (entry == entry.list.tail) { | ||
| entry.list.tail = entry.prev; | ||
| } | ||
| if (entry.prev != null) { | ||
| entry.prev.next = entry.next; | ||
| } | ||
| if (entry.next != null) { | ||
| entry.next.prev = entry.prev; | ||
| } | ||
| --entry.list.size; | ||
| entry.list = null; | ||
| entry.next = null; | ||
| entry.prev = null; | ||
| assert invariant(entry, false); | ||
| } | ||
|
|
||
| /** | ||
| * @return a now free IO region or null if none available. | ||
| */ | ||
| private SharedBytes.IO maybeEvictAndTake() { | ||
| assert Thread.holdsLock(SharedBlobCacheService.this); | ||
| LRUCacheEntry entry = middle.tail; | ||
| if (entry != null) { | ||
| boolean evicted = entry.chunk.tryEvictNoDecRef(); | ||
| if (evicted) { | ||
| try { | ||
| SharedBytes.IO ioRef = entry.chunk.io; | ||
| if (ioRef != null) { | ||
| try { | ||
| if (entry.chunk.refCount() == 1) { | ||
| // we own that one refcount (since we CAS'ed evicted to 1) | ||
| // grab io, rely on incref'ers also checking evicted field. | ||
| entry.chunk.io = null; | ||
| assert regionOwners.remove(ioRef) == entry.chunk; | ||
| return ioRef; | ||
| } | ||
| } finally { | ||
| unlink(entry); | ||
| keyMapping.remove(entry.chunk.regionKey, entry); | ||
| } | ||
| } | ||
| } finally { | ||
| entry.chunk.decRef(); | ||
| } | ||
| } | ||
| } | ||
| return null; | ||
| } | ||
|
|
||
| /** | ||
| * This method tries to evict the least recent {@link LRUCacheEntry}. | ||
| * | ||
| * @return true if an entry was evicted, false otherwise. | ||
| */ | ||
| public boolean maybeEvictLeastRecent() { | ||
| synchronized (SharedBlobCacheService.this) { | ||
| LRUCacheEntry entry = middle.tail; | ||
| if (entry != null) { | ||
| boolean evicted = entry.chunk.tryEvict(); | ||
| if (evicted && entry.chunk.io != null) { | ||
| unlink(entry); | ||
| keyMapping.remove(entry.chunk.regionKey, entry); | ||
| return true; | ||
| } | ||
| } | ||
| } | ||
| return false; | ||
| } | ||
| } | ||
|
|
||
| class LFUCache implements Cache<KeyType, CacheFileRegion> { | ||
|
|
||
| class LFUCacheEntry extends CacheEntry<CacheFileRegion> { | ||
| LFUCacheEntry prev; | ||
|
|
||
Review comment: Along with `pushEntryToMiddle/Back()`, this seems like the kind of thing that would be good to explicitly test. (Maybe this is already satisfactorily tested through transitive method calls; I confess I didn't read the entirety of the test files.) I was thinking something along the lines of starting with an empty list, adding up to two entries, and for each added entry asserting that the pointers are correct.

We'd also want a test that sets the size to a low value and checks that the entries are pushed to the middle list.

WDYT? Is that overkill? Is this behaviour already tested?

Reply: AFAIK this should be tested transitively. I think it's preferable to keep the `LRUCache` classes private if possible. They also happen to be non-static, so they are tied to an instance of `SharedBlobCacheService`.