Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/133681.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 133681
summary: Remove `DocumentSubsetBitsetCache` locking
area: Authorization
type: bug
issues:
- 132842
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,10 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.lucene.util.BitSets;
import org.elasticsearch.lucene.util.MatchAllBitSet;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.Closeable;
import java.io.IOException;
Expand All @@ -45,15 +43,12 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.LongSupplier;

/**
Expand Down Expand Up @@ -108,18 +103,6 @@ public final class DocumentSubsetBitsetCache implements IndexReader.ClosedListen

private static final BitSet NULL_MARKER = new FixedBitSet(0);

/**
* When a {@link BitSet} is evicted from {@link #bitsetCache}, we need to also remove it from {@link #keysByIndex}.
* We use a {@link ReentrantReadWriteLock} to control atomicity here - the "read" side represents potential insertions to the
* {@link #bitsetCache}, the "write" side represents removals from {@link #keysByIndex}.
* The risk (that {@link Cache} does not provide protection for) is that an entry is removed from the cache, and then immediately
* re-populated, before we process the removal event. To protect against that we need to check the state of the {@link #bitsetCache}
* but we need exclusive ("write") access while performing that check and updating the values in {@link #keysByIndex}.
*/
private final ReleasableLock cacheEvictionLock;
private final ReleasableLock cacheModificationLock;
private final ExecutorService cleanupExecutor;

private final long maxWeightBytes;
private final Cache<BitsetCacheKey, BitSet> bitsetCache;
private final Map<IndexReader.CacheKey, Set<BitsetCacheKey>> keysByIndex;
Expand All @@ -128,28 +111,16 @@ public final class DocumentSubsetBitsetCache implements IndexReader.ClosedListen
private final LongAdder hitsTimeInNanos = new LongAdder();
private final LongAdder missesTimeInNanos = new LongAdder();

public DocumentSubsetBitsetCache(Settings settings, ThreadPool threadPool) {
this(settings, threadPool.executor(ThreadPool.Names.GENERIC));
}

// visible for testing
DocumentSubsetBitsetCache(Settings settings, ExecutorService cleanupExecutor) {
this(settings, cleanupExecutor, System::nanoTime);
public DocumentSubsetBitsetCache(Settings settings) {
this(settings, System::nanoTime);
}

/**
* @param settings The global settings object for this node
* @param cleanupExecutor An executor on which the cache cleanup tasks can be run. Due to the way the cache is structured internally,
* it is sometimes necessary to run an asynchronous task to synchronize the internal state.
* @param relativeNanoTimeProvider Provider of nanos for code that needs to measure relative time.
*/
// visible for testing
DocumentSubsetBitsetCache(Settings settings, ExecutorService cleanupExecutor, LongSupplier relativeNanoTimeProvider) {
final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
this.cacheEvictionLock = new ReleasableLock(readWriteLock.writeLock());
this.cacheModificationLock = new ReleasableLock(readWriteLock.readLock());
this.cleanupExecutor = cleanupExecutor;

DocumentSubsetBitsetCache(Settings settings, LongSupplier relativeNanoTimeProvider) {
final TimeValue ttl = CACHE_TTL_SETTING.get(settings);
this.maxWeightBytes = CACHE_SIZE_SETTING.get(settings).getBytes();
this.bitsetCache = CacheBuilder.<BitsetCacheKey, BitSet>builder()
Expand Down Expand Up @@ -180,22 +151,15 @@ public void onClose(IndexReader.CacheKey indexKey) {
private void onCacheEviction(RemovalNotification<BitsetCacheKey, BitSet> notification) {
final BitsetCacheKey cacheKey = notification.getKey();
final IndexReader.CacheKey indexKey = cacheKey.indexKey;
if (keysByIndex.getOrDefault(indexKey, Set.of()).contains(cacheKey) == false) {
// If the cacheKey isn't in the lookup map, then there's nothing to synchronize
return;
}
// We push this to a background thread, so that it reduces the risk of blocking searches, but also so that the lock management is
// simpler - this callback is likely to take place on a thread that is actively adding something to the cache, and is therefore
// holding the read ("update") side of the lock. It is not possible to upgrade a read lock to a write lock ("eviction"), but we
// need to acquire that lock here.
cleanupExecutor.submit(() -> {
try (ReleasableLock ignored = cacheEvictionLock.acquire()) {
// it's possible for the key to be back in the cache if it was immediately repopulated after it was evicted, so check
if (bitsetCache.get(cacheKey) == null) {
// key is no longer in the cache, make sure it is no longer in the lookup map either.
Optional.ofNullable(keysByIndex.get(indexKey)).ifPresent(set -> set.remove(cacheKey));
}
}
// the key is *probably* no longer in the cache, so make sure it is no longer in the lookup map.
// note: rather than locking (which destroys our throughput), we're erring on the side of tidying the keysByIndex
// structure even if some other racing thread has already added a new bitset into the cache for this same key.
// the keysByIndex structure is used in onClose (our notification from lucene that a segment has become inaccessible),
// so we might end up failing to *eagerly* invalidate a bitset -- the consequence of that would be temporarily higher
// memory use (the bitset will not be accessed, and it will still be invalidated eventually for size or ttl reasons).
keysByIndex.computeIfPresent(indexKey, (ignored, keys) -> {
keys.remove(cacheKey);
return keys.isEmpty() ? null : keys;
});
}

Expand Down Expand Up @@ -248,48 +212,46 @@ public BitSet getBitSet(final Query query, final LeafReaderContext context) thro
final IndexReader.CacheKey indexKey = coreCacheHelper.getKey();
final BitsetCacheKey cacheKey = new BitsetCacheKey(indexKey, query);

try (ReleasableLock ignored = cacheModificationLock.acquire()) {
final boolean[] cacheKeyWasPresent = new boolean[] { true };
final BitSet bitSet = bitsetCache.computeIfAbsent(cacheKey, ignore1 -> {
cacheKeyWasPresent[0] = false;
// This ensures all insertions into the set are guarded by ConcurrentHashMap's atomicity guarantees.
keysByIndex.compute(indexKey, (ignore2, set) -> {
if (set == null) {
set = ConcurrentCollections.newConcurrentSet();
}
set.add(cacheKey);
return set;
});
final BitSet result = computeBitSet(query, context);
if (result == null) {
// A cache loader is not allowed to return null, return a marker object instead.
return NULL_MARKER;
final boolean[] cacheKeyWasPresent = new boolean[] { true };
final BitSet bitSet = bitsetCache.computeIfAbsent(cacheKey, ignore1 -> {
cacheKeyWasPresent[0] = false;
// This ensures all insertions into the set are guarded by ConcurrentHashMap's atomicity guarantees.
keysByIndex.compute(indexKey, (ignore2, keys) -> {
if (keys == null) {
keys = ConcurrentCollections.newConcurrentSet();
}
final long bitSetBytes = result.ramBytesUsed();
if (bitSetBytes > this.maxWeightBytes) {
logger.warn(
"built a DLS BitSet that uses [{}] bytes; the DLS BitSet cache has a maximum size of [{}] bytes;"
+ " this object cannot be cached and will need to be rebuilt for each use;"
+ " consider increasing the value of [{}]",
bitSetBytes,
maxWeightBytes,
CACHE_SIZE_SETTING.getKey()
);
} else if (bitSetBytes + bitsetCache.weight() > maxWeightBytes) {
maybeLogCacheFullWarning();
}
return result;
keys.add(cacheKey);
return keys;
});
if (cacheKeyWasPresent[0]) {
hitsTimeInNanos.add(relativeNanoTimeProvider.getAsLong() - cacheStart);
} else {
missesTimeInNanos.add(relativeNanoTimeProvider.getAsLong() - cacheStart);
final BitSet result = computeBitSet(query, context);
if (result == null) {
// A cache loader is not allowed to return null, return a marker object instead.
return NULL_MARKER;
}
if (bitSet == NULL_MARKER) {
return null;
} else {
return bitSet;
final long bitSetBytes = result.ramBytesUsed();
if (bitSetBytes > this.maxWeightBytes) {
logger.warn(
"built a DLS BitSet that uses [{}] bytes; the DLS BitSet cache has a maximum size of [{}] bytes;"
+ " this object cannot be cached and will need to be rebuilt for each use;"
+ " consider increasing the value of [{}]",
bitSetBytes,
maxWeightBytes,
CACHE_SIZE_SETTING.getKey()
);
} else if (bitSetBytes + bitsetCache.weight() > maxWeightBytes) {
maybeLogCacheFullWarning();
}
return result;
});
if (cacheKeyWasPresent[0]) {
hitsTimeInNanos.add(relativeNanoTimeProvider.getAsLong() - cacheStart);
} else {
missesTimeInNanos.add(relativeNanoTimeProvider.getAsLong() - cacheStart);
}
if (bitSet == NULL_MARKER) {
return null;
} else {
return bitSet;
}
}

Expand Down Expand Up @@ -402,26 +364,44 @@ public String toString() {
}

/**
* This method verifies that the two internal data structures ({@link #bitsetCache} and {@link #keysByIndex}) are consistent with one
* another. This method is only called by tests.
* This test-only method verifies that the two internal data structures ({@link #bitsetCache} and {@link #keysByIndex}) are consistent
* with one another.
*/
// visible for testing
void verifyInternalConsistency() {
this.bitsetCache.keys().forEach(bck -> {
final Set<BitsetCacheKey> set = this.keysByIndex.get(bck.indexKey);
if (set == null) {
throw new IllegalStateException(
"Key [" + bck + "] is in the cache, but there is no entry for [" + bck.indexKey + "] in the lookup map"
);
}
if (set.contains(bck) == false) {
verifyInternalConsistencyCacheToKeys();
verifyInternalConsistencyKeysToCache();
}

/**
* This test-only method iterates over the {@link #bitsetCache} and checks that {@link #keysByIndex} is consistent with it.
*/
// visible for testing
void verifyInternalConsistencyCacheToKeys() {
bitsetCache.keys().forEach(cacheKey -> {
final Set<BitsetCacheKey> keys = keysByIndex.get(cacheKey.indexKey);
if (keys == null || keys.contains(cacheKey) == false) {
throw new IllegalStateException(
"Key [" + bck + "] is in the cache, but the lookup entry for [" + bck.indexKey + "] does not contain that key"
"Key [" + cacheKey + "] is in the cache, but the lookup entry for [" + cacheKey.indexKey + "] does not contain that key"
);
}
});
this.keysByIndex.values().stream().flatMap(Set::stream).forEach(bck -> {
if (this.bitsetCache.get(bck) == null) {
throw new IllegalStateException("Key [" + bck + "] is in the lookup map, but is not in the cache");
}

/**
 * Test-only consistency check in the {@link #keysByIndex} to {@link #bitsetCache} direction: every key recorded in the lookup map
 * must still be retrievable from the cache, and no lookup entry may be null or empty.
 *
 * @throws IllegalStateException if a lookup entry is null or empty, or if a key listed in the lookup map is not found in the cache
 */
// visible for testing
void verifyInternalConsistencyKeysToCache() {
    keysByIndex.forEach((readerKey, trackedKeys) -> {
        // A lookup entry that holds no keys is itself an inconsistency, so reject it before looking at the cache.
        final boolean nullOrEmpty = trackedKeys == null || trackedKeys.isEmpty();
        if (nullOrEmpty) {
            throw new IllegalStateException("The lookup entry for [" + readerKey + "] is null or empty");
        }
        trackedKeys.forEach(trackedKey -> {
            final boolean presentInCache = bitsetCache.get(trackedKey) != null;
            if (presentInCache == false) {
                throw new IllegalStateException("Key [" + trackedKey + "] is in the lookup map, but is not in the cache");
            }
        });
    });
}
Expand Down
Loading