PR #133681 (merged): Remove `DocumentSubsetBitsetCache` locking
docs/changelog/133681.yaml (6 additions, 0 deletions)
@@ -0,0 +1,6 @@
pr: 133681
summary: Remove `DocumentSubsetBitsetCache` locking
area: Authorization
type: bug
issues:
- 132842
DocumentSubsetBitsetCache.java
@@ -31,26 +31,21 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.lucene.util.BitSets;
import org.elasticsearch.lucene.util.MatchAllBitSet;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
* This is a cache for {@link BitSet} instances that are used with the {@link DocumentSubsetReader}.
@@ -81,6 +76,8 @@
*/
public final class DocumentSubsetBitsetCache implements IndexReader.ClosedListener, Closeable, Accountable {

private static final Logger logger = LogManager.getLogger(DocumentSubsetBitsetCache.class);

/**
* The TTL defaults to 2 hours. We default to a large cache size ({@link #CACHE_SIZE_SETTING}), and aggressively
* expire unused entries so that the cache does not hold on to memory unnecessarily.
@@ -102,40 +99,15 @@ public final class DocumentSubsetBitsetCache implements IndexReader.ClosedListener, Closeable, Accountable {

private static final BitSet NULL_MARKER = new FixedBitSet(0);

private static final Logger logger = LogManager.getLogger(DocumentSubsetBitsetCache.class);

/**
* When a {@link BitSet} is evicted from {@link #bitsetCache}, we need to also remove it from {@link #keysByIndex}.
* We use a {@link ReentrantReadWriteLock} to control atomicity here - the "read" side represents potential insertions to the
* {@link #bitsetCache}, the "write" side represents removals from {@link #keysByIndex}.
* The risk (that {@link Cache} does not provide protection for) is that an entry is removed from the cache, and then immediately
* re-populated, before we process the removal event. To protect against that we need to check the state of the {@link #bitsetCache}
* but we need exclusive ("write") access while performing that check and updating the values in {@link #keysByIndex}.
*/
private final ReleasableLock cacheEvictionLock;
private final ReleasableLock cacheModificationLock;
private final ExecutorService cleanupExecutor;

private final long maxWeightBytes;
private final Cache<BitsetCacheKey, BitSet> bitsetCache;
private final Map<IndexReader.CacheKey, Set<BitsetCacheKey>> keysByIndex;
private final AtomicLong cacheFullWarningTime;

public DocumentSubsetBitsetCache(Settings settings, ThreadPool threadPool) {
this(settings, threadPool.executor(ThreadPool.Names.GENERIC));
}

/**
* @param settings The global settings object for this node
* @param cleanupExecutor An executor on which the cache cleanup tasks can be run. Due to the way the cache is structured internally,
* it is sometimes necessary to run an asynchronous task to synchronize the internal state.
*/
protected DocumentSubsetBitsetCache(Settings settings, ExecutorService cleanupExecutor) {
final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
this.cacheEvictionLock = new ReleasableLock(readWriteLock.writeLock());
this.cacheModificationLock = new ReleasableLock(readWriteLock.readLock());
this.cleanupExecutor = cleanupExecutor;

public DocumentSubsetBitsetCache(Settings settings) {
final TimeValue ttl = CACHE_TTL_SETTING.get(settings);
this.maxWeightBytes = CACHE_SIZE_SETTING.get(settings).getBytes();
this.bitsetCache = CacheBuilder.<BitsetCacheKey, BitSet>builder()
@@ -150,8 +122,8 @@ protected DocumentSubsetBitsetCache(Settings settings, ExecutorService cleanupExecutor) {
}
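
With the locking removed, the cache is constructed from node settings alone; the ThreadPool and cleanup-executor plumbing is gone. A minimal usage sketch (Settings.EMPTY stands in for real node settings):

```java
import org.elasticsearch.common.settings.Settings;

// Hypothetical caller: production code would pass the node's actual Settings
// rather than Settings.EMPTY, which simply yields the default TTL and size.
DocumentSubsetBitsetCache cache = new DocumentSubsetBitsetCache(Settings.EMPTY);
```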

@Override
public void onClose(IndexReader.CacheKey ownerCoreCacheKey) {
final Set<BitsetCacheKey> keys = keysByIndex.remove(ownerCoreCacheKey);
public void onClose(IndexReader.CacheKey indexKey) {
final Set<BitsetCacheKey> keys = keysByIndex.remove(indexKey);
if (keys != null) {
// Because this Set has been removed from the map, and the only update to the set is performed in a
// Map#compute call, it should not be possible to get a concurrent modification here.
@@ -163,24 +135,17 @@ public void onClose(IndexReader.CacheKey ownerCoreCacheKey) {
* Cleanup (synchronize) the internal state when an object is removed from the primary cache
*/
private void onCacheEviction(RemovalNotification<BitsetCacheKey, BitSet> notification) {
final BitsetCacheKey bitsetKey = notification.getKey();
final IndexReader.CacheKey indexKey = bitsetKey.index;
if (keysByIndex.getOrDefault(indexKey, Set.of()).contains(bitsetKey) == false) {
// If the bitsetKey isn't in the lookup map, then there's nothing to synchronize
return;
}
// We push this to a background thread, so that it reduces the risk of blocking searches, but also so that the lock management is
// simpler - this callback is likely to take place on a thread that is actively adding something to the cache, and is therefore
// holding the read ("update") side of the lock. It is not possible to upgrade a read lock to a write ("eviction") lock, but we
// need to acquire that lock here.
cleanupExecutor.submit(() -> {
try (ReleasableLock ignored = cacheEvictionLock.acquire()) {
// it's possible for the key to be back in the cache if it was immediately repopulated after it was evicted, so check
if (bitsetCache.get(bitsetKey) == null) {
// key is no longer in the cache, make sure it is no longer in the lookup map either.
Optional.ofNullable(keysByIndex.get(indexKey)).ifPresent(set -> set.remove(bitsetKey));
}
}
final BitsetCacheKey cacheKey = notification.getKey();
final IndexReader.CacheKey indexKey = cacheKey.indexKey;
// the key is *probably* no longer in the cache, so make sure it is no longer in the lookup map.
// note: rather than locking (which destroys our throughput), we're erring on the side of tidying the keysByIndex
// structure even if some other racing thread has already added a new bitset into the cache for this same key.
// the keysByIndex structure is used in onClose (our notification from lucene that a segment has become inaccessible),
// so we might end up failing to *eagerly* invalidate a bitset -- the consequence of that would be temporarily higher
// memory use (the bitset will not be accessed, and it will still be invalidated eventually for size or ttl reasons).
keysByIndex.computeIfPresent(indexKey, (ignored, keys) -> {
keys.remove(cacheKey);
return keys.isEmpty() ? null : keys;
});
}
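
For reference, the cleanup above is a standard lock-free multimap-trim idiom. A self-contained sketch (class and field names here are illustrative, not from this codebase) that relies on ConcurrentHashMap's per-entry atomicity instead of the removed read/write lock:

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the eviction cleanup pattern: remove one cache key from an
// "index -> set of cache keys" lookup map, dropping the set once it is empty.
final class LookupCleanupSketch {
    private final Map<String, Set<String>> keysByIndex = new ConcurrentHashMap<>();

    void onEviction(String indexKey, String cacheKey) {
        // computeIfPresent runs atomically per map entry, so no external lock
        // is needed; returning null removes the mapping when the set is empty.
        keysByIndex.computeIfPresent(indexKey, (ignored, keys) -> {
            keys.remove(cacheKey);
            return keys.isEmpty() ? null : keys;
        });
    }
}
```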

@@ -231,41 +196,39 @@ public BitSet getBitSet(final Query query, final LeafReaderContext context) throws ExecutionException {
final IndexReader.CacheKey indexKey = coreCacheHelper.getKey();
final BitsetCacheKey cacheKey = new BitsetCacheKey(indexKey, query);

try (ReleasableLock ignored = cacheModificationLock.acquire()) {
final BitSet bitSet = bitsetCache.computeIfAbsent(cacheKey, ignore1 -> {
// This ensures all insertions into the set are guarded by ConcurrentHashMap's atomicity guarantees.
keysByIndex.compute(indexKey, (ignore2, set) -> {
if (set == null) {
set = ConcurrentCollections.newConcurrentSet();
}
set.add(cacheKey);
return set;
});
final BitSet result = computeBitSet(query, context);
if (result == null) {
// A cache loader is not allowed to return null, return a marker object instead.
return NULL_MARKER;
}
final long bitSetBytes = result.ramBytesUsed();
if (bitSetBytes > this.maxWeightBytes) {
logger.warn(
"built a DLS BitSet that uses [{}] bytes; the DLS BitSet cache has a maximum size of [{}] bytes;"
+ " this object cannot be cached and will need to be rebuilt for each use;"
+ " consider increasing the value of [{}]",
bitSetBytes,
maxWeightBytes,
CACHE_SIZE_SETTING.getKey()
);
} else if (bitSetBytes + bitsetCache.weight() > maxWeightBytes) {
maybeLogCacheFullWarning();
final BitSet bitSet = bitsetCache.computeIfAbsent(cacheKey, ignore1 -> {
// This ensures all insertions into the set are guarded by ConcurrentHashMap's atomicity guarantees.
keysByIndex.compute(indexKey, (ignore2, keys) -> {
if (keys == null) {
keys = ConcurrentCollections.newConcurrentSet();
}
return result;
keys.add(cacheKey);
return keys;
});
if (bitSet == NULL_MARKER) {
return null;
} else {
return bitSet;
final BitSet result = computeBitSet(query, context);
if (result == null) {
// A cache loader is not allowed to return null, return a marker object instead.
return NULL_MARKER;
}
final long bitSetBytes = result.ramBytesUsed();
if (bitSetBytes > this.maxWeightBytes) {
logger.warn(
"built a DLS BitSet that uses [{}] bytes; the DLS BitSet cache has a maximum size of [{}] bytes;"
+ " this object cannot be cached and will need to be rebuilt for each use;"
+ " consider increasing the value of [{}]",
bitSetBytes,
maxWeightBytes,
CACHE_SIZE_SETTING.getKey()
);
} else if (bitSetBytes + bitsetCache.weight() > maxWeightBytes) {
maybeLogCacheFullWarning();
}
return result;
});
if (bitSet == NULL_MARKER) {
return null;
} else {
return bitSet;
}
}
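
The NULL_MARKER handling exists because the cache loader, like ConcurrentHashMap, may not return null. A generic sketch of the idiom (hypothetical class, assuming the sentinel is compared by identity):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Sketch of the null-marker idiom: a loader that legitimately produces "no
// result" stores a shared sentinel, which is unwrapped back to null on read.
final class NullMarkerCacheSketch<K, V> {
    private final V nullMarker; // shared sentinel, compared by identity
    private final Map<K, V> cache = new ConcurrentHashMap<>();

    NullMarkerCacheSketch(V nullMarker) {
        this.nullMarker = nullMarker;
    }

    V get(K key, Function<K, V> loader) {
        final V value = cache.computeIfAbsent(key, k -> {
            final V loaded = loader.apply(k);
            return loaded == null ? nullMarker : loaded; // never store null
        });
        return value == nullMarker ? null : value;
    }
}
```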

@@ -323,11 +286,11 @@ public Map<String, Object> usageStats() {
}

private static class BitsetCacheKey {
final IndexReader.CacheKey index;
final IndexReader.CacheKey indexKey;
final Query query;

private BitsetCacheKey(IndexReader.CacheKey index, Query query) {
this.index = index;
private BitsetCacheKey(IndexReader.CacheKey indexKey, Query query) {
this.indexKey = indexKey;
this.query = query;
}

@@ -340,41 +303,59 @@ public boolean equals(Object other) {
return false;
}
final BitsetCacheKey that = (BitsetCacheKey) other;
return Objects.equals(this.index, that.index) && Objects.equals(this.query, that.query);
return Objects.equals(this.indexKey, that.indexKey) && Objects.equals(this.query, that.query);
}

@Override
public int hashCode() {
return Objects.hash(index, query);
return Objects.hash(indexKey, query);
}

@Override
public String toString() {
return getClass().getSimpleName() + "(" + index + "," + query + ")";
return getClass().getSimpleName() + "(" + indexKey + "," + query + ")";
}
}
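
The rename from index to indexKey is cosmetic; the important property is that equality covers both fields, so each (segment, role query) pair maps to its own bitset. An illustrative analogue (not the actual class) with the same equals/hashCode semantics, written as a record:

```java
// Illustrative only: a record generates equals/hashCode over both components,
// matching what BitsetCacheKey implements by hand above.
record BitsetCacheKeySketch(Object indexKey, Object query) {}
```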

/**
* This method verifies that the two internal data structures ({@link #bitsetCache} and {@link #keysByIndex}) are consistent with one
* another. This method is only called by tests.
* This test-only method verifies that the two internal data structures ({@link #bitsetCache} and {@link #keysByIndex}) are consistent
* with one another.
*/
// visible for testing
void verifyInternalConsistency() {
this.bitsetCache.keys().forEach(bck -> {
final Set<BitsetCacheKey> set = this.keysByIndex.get(bck.index);
if (set == null) {
throw new IllegalStateException(
"Key [" + bck + "] is in the cache, but there is no entry for [" + bck.index + "] in the lookup map"
);
}
if (set.contains(bck) == false) {
verifyInternalConsistencyCacheToKeys();
verifyInternalConsistencyKeysToCache();
}

/**
* This test-only method iterates over the {@link #bitsetCache} and checks that {@link #keysByIndex} is consistent with it.
*/
// visible for testing
void verifyInternalConsistencyCacheToKeys() {
bitsetCache.keys().forEach(cacheKey -> {
final Set<BitsetCacheKey> keys = keysByIndex.get(cacheKey.indexKey);
if (keys == null || keys.contains(cacheKey) == false) {
throw new IllegalStateException(
"Key [" + bck + "] is in the cache, but the lookup entry for [" + bck.index + "] does not contain that key"
"Key [" + cacheKey + "] is in the cache, but the lookup entry for [" + cacheKey.indexKey + "] does not contain that key"
);
}
});
this.keysByIndex.values().stream().flatMap(Set::stream).forEach(bck -> {
if (this.bitsetCache.get(bck) == null) {
throw new IllegalStateException("Key [" + bck + "] is in the lookup map, but is not in the cache");
}

/**
* This test-only method iterates over the {@link #keysByIndex} and checks that {@link #bitsetCache} is consistent with it.
*/
// visible for testing
void verifyInternalConsistencyKeysToCache() {
keysByIndex.forEach((indexKey, keys) -> {
if (keys == null || keys.isEmpty()) {
throw new IllegalStateException("The lookup entry for [" + indexKey + "] is null or empty");
} else {
keys.forEach(cacheKey -> {
if (bitsetCache.get(cacheKey) == null) {
throw new IllegalStateException("Key [" + cacheKey + "] is in the lookup map, but is not in the cache");
}
});
}
});
}