@@ -18,6 +18,7 @@ public class BlobCachePlugin extends Plugin implements ExtensiblePlugin {
@Override
public List<Setting<?>> getSettings() {
return List.of(
SharedBlobCacheService.SHARED_CACHE_TYPE,
SharedBlobCacheService.SHARED_CACHE_SIZE_SETTING,
SharedBlobCacheService.SHARED_CACHE_SIZE_MAX_HEADROOM_SETTING,
SharedBlobCacheService.SHARED_CACHE_REGION_SIZE_SETTING,
@@ -72,6 +72,12 @@ public class SharedBlobCacheService<KeyType> implements Releasable {

private static final String SHARED_CACHE_SETTINGS_PREFIX = "xpack.searchable.snapshot.shared_cache.";

public static final Setting<String> SHARED_CACHE_TYPE = Setting.simpleString(
SHARED_CACHE_SETTINGS_PREFIX + "cache_type",
"lru",
Setting.Property.NodeScope
);
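
For illustration, a minimal usage sketch of the new setting (a hypothetical snippet, not part of the diff; it assumes Elasticsearch's Settings builder API and the key derived from SHARED_CACHE_SETTINGS_PREFIX above):

    // Hypothetical usage sketch: the setting defaults to "lru" and is read
    // once per node when the SharedBlobCacheService is constructed.
    Settings nodeSettings = Settings.builder()
        .put("xpack.searchable.snapshot.shared_cache.cache_type", "lfu") // override the "lru" default
        .build();
    String cacheType = SharedBlobCacheService.SHARED_CACHE_TYPE.get(nodeSettings); // -> "lfu"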

public static final Setting<ByteSizeValue> SHARED_CACHE_RANGE_SIZE_SETTING = new Setting<>(
SHARED_CACHE_SETTINGS_PREFIX + "range_size",
ByteSizeValue.ofMb(16).getStringRep(),
@@ -259,22 +265,21 @@ public void validate(ByteSizeValue value, Map<Setting<?>, Object> settings, bool
Setting.Property.NodeScope
);

- // used in tests
+ // used only in tests
void computeDecay() {
- if (cache instanceof LFUCache lfuCache) {
- lfuCache.computeDecay();
- }
+ assert cache instanceof LFUCache;
+ ((LFUCache) cache).computeDecay();
}

- // used in tests
+ // used only in tests
void maybeScheduleDecayAndNewEpoch() {
- if (cache instanceof LFUCache lfuCache) {
- lfuCache.maybeScheduleDecayAndNewEpoch(lfuCache.epoch.get());
- }
+ assert cache instanceof LFUCache;
+ ((LFUCache) cache).maybeScheduleDecayAndNewEpoch(epoch());
}

- // used in tests
+ // used only in tests
long epoch() {
assert cache instanceof LFUCache;
return ((LFUCache) cache).epoch.get();
}

@@ -364,7 +369,15 @@ public SharedBlobCacheService(
}
this.regionSize = regionSize;
assert regionSize > 0L;
- this.cache = new LFUCache(settings);
+ if ("lru".equals(SHARED_CACHE_TYPE.get(settings))) {
+ this.cache = new LRUCache();
+ } else if ("lfu".equals(SHARED_CACHE_TYPE.get(settings))) {
+ this.cache = new LFUCache(settings);
+ } else {
+ throw new IllegalArgumentException(
+ "setting [" + SHARED_CACHE_TYPE.getKey() + "] has unknown cache type [" + SHARED_CACHE_TYPE.get(settings) + "]"
+ );
+ }
try {
sharedBytes = new SharedBytes(
numRegions,
@@ -566,7 +579,7 @@ public void maybeFetchRegion(
final Executor fetchExecutor,
final ActionListener<Boolean> listener
) {
- if (freeRegionCount() < 1 && maybeEvictLeastUsed() == false) {
+ if (freeRegionCount() < 1 && maybeEvict() == false) {
// no free page available and no old enough unused region to be evicted
logger.info("No free regions, skipping loading region [{}]", region);
listener.onResponse(false);
@@ -614,7 +627,7 @@ public void maybeFetchRange(
final Executor fetchExecutor,
final ActionListener<Boolean> listener
) {
- if (freeRegionCount() < 1 && maybeEvictLeastUsed() == false) {
+ if (freeRegionCount() < 1 && maybeEvict() == false) {
// no free page available and no old enough unused region to be evicted
logger.info("No free regions, skipping loading region [{}]", region);
listener.onResponse(false);
@@ -653,8 +666,10 @@ private RangeMissingHandler writerWithOffset(RangeMissingHandler writer, int wri
}

// used by tests
- boolean maybeEvictLeastUsed() {
- if (cache instanceof LFUCache lfuCache) {
+ boolean maybeEvict() {
+ if (cache instanceof LRUCache lruCache) {
+ return lruCache.maybeEvictLeastRecent();
+ } else if (cache instanceof LFUCache lfuCache) {
return lfuCache.maybeEvictLeastUsed();
}
return false;
@@ -1267,7 +1282,274 @@ public record Stats(
public static final Stats EMPTY = new Stats(0, 0L, 0L, 0L, 0L, 0L, 0L, 0L);
}

- private class LFUCache implements Cache<KeyType, CacheFileRegion> {
+ private class LRUCache implements Cache<KeyType, CacheFileRegion> {

class LRUCacheList {
volatile LRUCacheEntry head;
LRUCacheEntry tail;
int size;
}

class LRUCacheEntry extends CacheEntry<CacheFileRegion> {
LRUCacheList list;
LRUCacheEntry prev;
LRUCacheEntry next;

LRUCacheEntry(CacheFileRegion chunk) {
super(chunk);
}

void touch() {
if (this != front.head) {
maybePromote(this);
}
}
}

private final ConcurrentHashMap<RegionKey<KeyType>, LRUCacheEntry> keyMapping = new ConcurrentHashMap<>();
private final LRUCacheList front = new LRUCacheList();

@Override
public void close() {
// nothing to close
}

@Override
public LRUCacheEntry get(KeyType cacheKey, long fileLength, int region) {
final RegionKey<KeyType> regionKey = new RegionKey<>(cacheKey, region);
// try to just get from the map on the fast-path to save instantiating the capturing lambda needed on the slow path
// if we did not find an entry
var entry = keyMapping.get(regionKey);
if (entry == null) {
final int effectiveRegionSize = computeCacheFileRegionSize(fileLength, region);
entry = keyMapping.computeIfAbsent(regionKey, key -> new LRUCacheEntry(new CacheFileRegion(key, effectiveRegionSize)));
}
// io is volatile, so double-checked locking is fine, as long as we assign it last.
if (entry.chunk.io == null) {
synchronized (entry.chunk) {
if (entry.chunk.io == null && entry.chunk.isEvicted() == false) {
return initChunk(entry);
}
}
}
assert assertChunkActiveOrEvicted(entry);

// existing item, check if we need to promote item
if (entry != front.head) {
maybePromote(entry);
}

return entry;
}
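
The null check outside the synchronized block above is the classic double-checked-locking idiom. A generic, stand-alone sketch of the pattern (not code from this PR; it assumes a volatile field, just as chunk.io is volatile here):

    // Generic double-checked-locking sketch: `resource` must be volatile so
    // the unsynchronized first read observes a fully initialized object.
    final class Lazy<T> {
        private volatile T resource;
        private final java.util.function.Supplier<T> init;

        Lazy(java.util.function.Supplier<T> init) {
            this.init = init;
        }

        T get() {
            T r = resource;            // first (cheap, unsynchronized) check
            if (r == null) {
                synchronized (this) {
                    r = resource;      // second check, under the lock
                    if (r == null) {
                        resource = r = init.get(); // assign last, after full construction
                    }
                }
            }
            return r;
        }
    }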

@Override
public int forceEvict(Predicate<KeyType> cacheKeyPredicate) {
final List<LRUCacheEntry> matchingEntries = new ArrayList<>();
keyMapping.forEach((key, value) -> {
if (cacheKeyPredicate.test(key.file)) {
matchingEntries.add(value);
}
});
var evictedCount = 0;
if (matchingEntries.isEmpty() == false) {
synchronized (SharedBlobCacheService.this) {
for (LRUCacheEntry entry : matchingEntries) {
boolean evicted = entry.chunk.forceEvict();
if (evicted && entry.chunk.io != null) {
unlink(entry);
keyMapping.remove(entry.chunk.regionKey, entry);
evictedCount++;
}
}
}
}
return evictedCount;
}

private LRUCacheEntry initChunk(LRUCacheEntry entry) {
assert Thread.holdsLock(entry.chunk);
RegionKey<KeyType> regionKey = entry.chunk.regionKey;
if (keyMapping.get(regionKey) != entry) {
throwAlreadyClosed("no free region found (contender)");
}
// new item
assert entry.prev == null;
assert entry.next == null;
final SharedBytes.IO freeSlot = freeRegions.poll();
if (freeSlot != null) {
// no need to evict an item, just add
assignToSlot(entry, freeSlot);
} else {
// need to evict something
SharedBytes.IO io;
synchronized (SharedBlobCacheService.this) {
io = maybeEvictAndTake();
}
if (io == null) {
io = freeRegions.poll();
}
if (io != null) {
assignToSlot(entry, io);
} else {
boolean removed = keyMapping.remove(regionKey, entry);
assert removed;
throwAlreadyClosed("no free region found");
}
}

return entry;
}

private void assignToSlot(LRUCacheEntry entry, SharedBytes.IO freeSlot) {
assert regionOwners.put(freeSlot, entry.chunk) == null;
synchronized (SharedBlobCacheService.this) {
if (entry.chunk.isEvicted()) {
assert regionOwners.remove(freeSlot) == entry.chunk;
freeRegions.add(freeSlot);
keyMapping.remove(entry.chunk.regionKey, entry);
throwAlreadyClosed("evicted during free region allocation");
}
pushEntryToFront(entry);
// assign io only when chunk is ready for use. Under lock to avoid concurrent tryEvict.
entry.chunk.io = freeSlot;
}
}

private void pushEntryToFront(final LRUCacheEntry entry) {
Contributor

Along with pushEntryToMiddle/Back(), this seems like the kind of thing that would be good to test explicitly. (Maybe this is already satisfactorily tested through transitive method calls; I confess I didn't read the entirety of the test files.)

I was thinking something along the lines of starting with an empty list, adding up to two entries, and for each added entry asserting that the pointers are correct.

We'd also want a test that sets the size to a low value and checks that the entries are pushed to the middle list.

WDYT? Is that overkill? Is this behaviour already tested? (See the test sketch after unlink() below.)

Contributor Author

AFAIK this should be tested transitively. I think it's preferable to keep the LRUCache classes private if possible. They also happen to be non-static, so they are tied to an instance of SharedBlobCacheService.

Contributor

I think there should be a way for us to not need this kind of lock any longer. Can't we insert without it? It's basically: point the new node at the current head, then set head to the new node, but only if the current head is still the one we just pointed at (CAS)? No need for locking, is there? And then precede that whole operation by evicting the last element in the cache when we insert a new page rather than just cycling an existing page to the front.

Contributor Author

Let's chat about this again to see if it's still relevant once we start talking about using time.

Contributor

Thinking back on past discussions, it seems lock-freedom was a key motivation for going to an LRU, so it would be good to prove out that this works as part of this (POC?). (A sketch of the CAS insertion follows this method.)

assert Thread.holdsLock(SharedBlobCacheService.this);
assert invariant(entry, false);
assert entry.prev == null;
assert entry.next == null;
if (front.head == null) {
front.head = entry;
front.tail = entry;
} else {
entry.next = front.head;
front.head.prev = entry;
front.head = entry;
}
entry.list = front;
++front.size;
assert invariant(entry, true);
}
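
To make the reviewer's CAS suggestion concrete, here is a hedged, stand-alone sketch of lock-free head insertion (hypothetical, Treiber-stack-style push; it deliberately ignores concurrent unlinking of interior entries, which a fully lock-free LRU list would still have to solve):

    import java.util.concurrent.atomic.AtomicReference;

    // Hypothetical lock-free head push, as suggested in the review thread above.
    // Only insertion at the head is covered; concurrent unlink() of interior
    // nodes is the hard part that this sketch does not address.
    final class CasList<T> {
        static final class Node<T> {
            final T value;
            Node<T> next; // written before the CAS publishes the node
            Node(T value) { this.value = value; }
        }

        private final AtomicReference<Node<T>> head = new AtomicReference<>();

        void pushFront(T value) {
            final Node<T> node = new Node<>(value);
            Node<T> current;
            do {
                current = head.get();   // read the current head
                node.next = current;    // point the new node at it
            } while (head.compareAndSet(current, node) == false); // retry if head moved
        }
    }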

private synchronized boolean invariant(final LRUCacheEntry e, boolean present) {
boolean found = false;
for (LRUCacheEntry entry = front.head; entry != null; entry = entry.next) {
assert entry.next == null || entry.next.prev == entry;
assert entry.prev == null || entry.prev.next == entry;
if (entry == e) {
found = true;
}
}
for (LRUCacheEntry entry = front.tail; entry != null; entry = entry.prev) {
assert entry.next == null || entry.next.prev == entry;
assert entry.prev == null || entry.prev.next == entry;
if (entry == e) {
found = true;
}
}
assert found == present;
return true;
}

private boolean assertChunkActiveOrEvicted(LRUCacheEntry entry) {
synchronized (SharedBlobCacheService.this) {
// assert linked (or evicted)
assert entry.list != null || entry.chunk.isEvicted();
}
SharedBytes.IO io = entry.chunk.io;
assert io != null || entry.chunk.isEvicted();
assert io == null || regionOwners.get(io) == entry.chunk || entry.chunk.isEvicted();
return true;
}

private void maybePromote(LRUCacheEntry entry) {
synchronized (SharedBlobCacheService.this) {
if (entry != front.head && entry.chunk.isEvicted() == false) {
unlink(entry);
pushEntryToFront(entry);
}
}
}

private void unlink(final LRUCacheEntry entry) {
Contributor

WDYT about testing this explicitly under the following conditions:

  • entry.list has one entry, which we unlink()
  • entry.list has 2 entries, and we unlink() the first
  • entry.list has 2 entries, and we unlink() the last
  • entry.list has 3 entries, and we unlink() the middle

Maybe this is already done via the other tests in a transitive manner, as I didn't read through all the tests in detail. However, I noticed that neither this function, nor any of the methods that call it, is under test. (I'm a bit paranoid, but trust me, I've earned it...) A stand-alone sketch of such a test follows this method.

Contributor Author

Replied below.

assert Thread.holdsLock(SharedBlobCacheService.this);
assert invariant(entry, true);
if (entry == entry.list.head) {
entry.list.head = entry.next;
}
if (entry == entry.list.tail) {
entry.list.tail = entry.prev;
}
if (entry.prev != null) {
entry.prev.next = entry.next;
}
if (entry.next != null) {
entry.next.prev = entry.prev;
}
--entry.list.size;
entry.list = null;
entry.next = null;
entry.prev = null;
assert invariant(entry, false);
}
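
As a concrete form of the tests proposed in the review threads, here is a hedged, self-contained sketch (hypothetical: since LRUCache is private and non-static, it re-implements the same head/tail/prev/next bookkeeping on a stand-alone list rather than exercising the real class):

    import static org.junit.Assert.assertEquals;
    import static org.junit.Assert.assertNull;
    import static org.junit.Assert.assertSame;

    import org.junit.Test;

    // Hypothetical model of the pointer bookkeeping in pushEntryToFront()/unlink().
    public class LruListPointerTests {

        static final class Entry {
            Entry prev, next;
        }

        static final class LruList {
            Entry head, tail;
            int size;

            void pushFront(Entry e) {
                if (head == null) {
                    head = tail = e;
                } else {
                    e.next = head;
                    head.prev = e;
                    head = e;
                }
                size++;
            }

            void unlink(Entry e) {
                if (e == head) head = e.next;
                if (e == tail) tail = e.prev;
                if (e.prev != null) e.prev.next = e.next;
                if (e.next != null) e.next.prev = e.prev;
                e.prev = e.next = null;
                size--;
            }
        }

        @Test
        public void testUnlinkOnlyEntry() {
            LruList list = new LruList();
            Entry a = new Entry();
            list.pushFront(a);
            list.unlink(a);
            assertNull(list.head);
            assertNull(list.tail);
            assertEquals(0, list.size);
        }

        @Test
        public void testUnlinkMiddleOfThree() {
            LruList list = new LruList();
            Entry c = new Entry(), b = new Entry(), a = new Entry();
            list.pushFront(c);
            list.pushFront(b);
            list.pushFront(a); // list order: a -> b -> c
            list.unlink(b);
            assertSame(c, a.next);
            assertSame(a, c.prev);
            assertEquals(2, list.size);
        }
    }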

/**
* @return a now free IO region or null if none available.
*/
private SharedBytes.IO maybeEvictAndTake() {
assert Thread.holdsLock(SharedBlobCacheService.this);
LRUCacheEntry entry = front.tail;
if (entry != null) {
boolean evicted = entry.chunk.tryEvictNoDecRef();
if (evicted) {
try {
SharedBytes.IO ioRef = entry.chunk.io;
if (ioRef != null) {
try {
if (entry.chunk.refCount() == 1) {
// we own that one refcount (since we CAS'ed evicted to 1)
// grab io, rely on incref'ers also checking evicted field.
entry.chunk.io = null;
assert regionOwners.remove(ioRef) == entry.chunk;
return ioRef;
}
} finally {
unlink(entry);
keyMapping.remove(entry.chunk.regionKey, entry);
}
}
} finally {
entry.chunk.decRef();
}
}
}
return null;
}
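
The handoff above is subtle: the caller may reuse the IO slot only if, after winning the eviction, it also observes itself as the sole reference holder. A simplified, hypothetical sketch of that pattern (the PR's real implementation uses tryEvictNoDecRef()/decRef() on a ref-counted region; the names below are invented):

    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.atomic.AtomicInteger;

    // Hypothetical sketch of the "evict and steal the slot" handoff: win the
    // eviction CAS first, then reuse the IO slot only if we hold the sole
    // reference; otherwise the last decRef() is responsible for freeing it.
    // As in the PR, this relies on readers checking the evicted flag before
    // taking a new reference.
    final class Region {
        final AtomicBoolean evicted = new AtomicBoolean();
        final AtomicInteger refCount = new AtomicInteger(1); // the cache's own reference
        volatile Object io; // stands in for SharedBytes.IO

        Object tryEvictAndTake() {
            if (evicted.compareAndSet(false, true) == false) {
                return null; // lost the race: another thread already evicted this region
            }
            Object ioRef = io;
            if (ioRef != null && refCount.get() == 1) {
                io = null;   // no concurrent readers: safe to hand the slot out
                return ioRef;
            }
            return null;
        }
    }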

/**
* This method tries to evict the least recently used {@link LRUCacheEntry}.
*
* @return true if an entry was evicted, false otherwise.
*/
public boolean maybeEvictLeastRecent() {
synchronized (SharedBlobCacheService.this) {
LRUCacheEntry entry = front.tail;
if (entry != null) {
boolean evicted = entry.chunk.tryEvict();
if (evicted && entry.chunk.io != null) {
unlink(entry);
keyMapping.remove(entry.chunk.regionKey, entry);
return true;
}
}
}
return false;
}
}

class LFUCache implements Cache<KeyType, CacheFileRegion> {

class LFUCacheEntry extends CacheEntry<CacheFileRegion> {
LFUCacheEntry prev;