Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@

import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand All @@ -49,8 +52,10 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.LongSupplier;

/**
* This is a cache for {@link BitSet} instances that are used with the {@link DocumentSubsetReader}.
Expand Down Expand Up @@ -120,18 +125,27 @@ public final class DocumentSubsetBitsetCache implements IndexReader.ClosedListen
private final Cache<BitsetCacheKey, BitSet> bitsetCache;
private final Map<IndexReader.CacheKey, Set<BitsetCacheKey>> keysByIndex;
private final AtomicLong cacheFullWarningTime;
private final AtomicLong hitsTimeTakenNanos;
private final AtomicLong missesTimeTakenNanos;
Copy link
Contributor

@joegallo joegallo Aug 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://en.wikipedia.org/wiki/Rectification_of_names -- do not express individuality, make your thing like the other things.

joegallo@simulacron:~/Code/elastic/elasticsearch $ git grep -E '(hitsTime|missesTime)' | grep Nanos | grep 'private final'
modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpCache.java:    private final AtomicLong hitsTimeInNanos = new AtomicLong(0);
modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpCache.java:    private final AtomicLong missesTimeInNanos = new AtomicLong(0);
x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichCache.java:    private final AtomicLong hitsTimeInNanos = new AtomicLong(0);
x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichCache.java:    private final AtomicLong missesTimeInNanos = new AtomicLong(0);

private final LongSupplier relativeNanoTimeProvider;

/**
 * Creates the bitset cache for production use: cleanup tasks run on the generic
 * thread pool, and hit/miss timings are measured with {@link System#nanoTime()}.
 *
 * @param settings   the global settings object for this node
 * @param threadPool pool supplying the generic executor used for cache cleanup
 */
public DocumentSubsetBitsetCache(Settings settings, ThreadPool threadPool) {
    this(settings, threadPool.executor(ThreadPool.Names.GENERIC), System::nanoTime);
}

/**
 * Test-visible constructor that uses the real {@link System#nanoTime()} clock
 * as the relative-time source; delegates to the fully-parameterized constructor.
 */
// visible for testing
DocumentSubsetBitsetCache(Settings settings, ExecutorService cleanupExecutor) {
this(settings, cleanupExecutor, System::nanoTime);
}

/**
* @param settings The global settings object for this node
* @param cleanupExecutor An executor on which the cache cleanup tasks can be run. Due to the way the cache is structured internally,
* it is sometimes necessary to run an asynchronous task to synchronize the internal state.
* @param relativeNanoTimeProvider Provider of nanos for code that needs to measure relative time.
*/
// visible for testing
DocumentSubsetBitsetCache(Settings settings, ExecutorService cleanupExecutor) {
DocumentSubsetBitsetCache(Settings settings, ExecutorService cleanupExecutor, LongSupplier relativeNanoTimeProvider) {
final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
this.cacheEvictionLock = new ReleasableLock(readWriteLock.writeLock());
this.cacheModificationLock = new ReleasableLock(readWriteLock.readLock());
Expand All @@ -148,6 +162,9 @@ public DocumentSubsetBitsetCache(Settings settings, ThreadPool threadPool) {

this.keysByIndex = new ConcurrentHashMap<>();
this.cacheFullWarningTime = new AtomicLong(0);
this.hitsTimeTakenNanos = new AtomicLong();
this.missesTimeTakenNanos = new AtomicLong();
this.relativeNanoTimeProvider = Objects.requireNonNull(relativeNanoTimeProvider);
}

@Override
Expand Down Expand Up @@ -220,6 +237,8 @@ public long ramBytesUsed() {
*/
@Nullable
public BitSet getBitSet(final Query query, final LeafReaderContext context) throws ExecutionException {
final long startTime = relativeNanoTimeProvider.getAsLong();

final IndexReader.CacheHelper coreCacheHelper = context.reader().getCoreCacheHelper();
if (coreCacheHelper == null) {
try {
Expand All @@ -233,7 +252,9 @@ public BitSet getBitSet(final Query query, final LeafReaderContext context) thro
final BitsetCacheKey cacheKey = new BitsetCacheKey(indexKey, query);

try (ReleasableLock ignored = cacheModificationLock.acquire()) {
final AtomicBoolean cacheKeyWasPresent = new AtomicBoolean(true);
final BitSet bitSet = bitsetCache.computeIfAbsent(cacheKey, ignore1 -> {
cacheKeyWasPresent.set(false);
// This ensures all insertions into the set are guarded by ConcurrentHashMap's atomicity guarantees.
keysByIndex.compute(indexKey, (ignore2, set) -> {
if (set == null) {
Expand Down Expand Up @@ -262,6 +283,11 @@ public BitSet getBitSet(final Query query, final LeafReaderContext context) thro
}
return result;
});
if (cacheKeyWasPresent.get()) {
hitsTimeTakenNanos.addAndGet(relativeNanoTimeProvider.getAsLong() - startTime);
} else {
missesTimeTakenNanos.addAndGet(relativeNanoTimeProvider.getAsLong() - startTime);
}
if (bitSet == NULL_MARKER) {
return null;
} else {
Expand Down Expand Up @@ -320,7 +346,18 @@ public static List<Setting<?>> getSettings() {

public Map<String, Object> usageStats() {
final ByteSizeValue ram = ByteSizeValue.ofBytes(ramBytesUsed());
return Map.of("count", entryCount(), "memory", ram.toString(), "memory_in_bytes", ram.getBytes());
final Cache.Stats bitsetCacheStats = bitsetCache.stats();

final HashMap<String, Object> stats = new LinkedHashMap<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Use Map (the interface) as the type here on the LHS, rather than HashMap (the concrete superclass of LinkedHashMap).

stats.put("count", entryCount());
stats.put("memory", ram.toString());
stats.put("memory_in_bytes", ram.getBytes());
stats.put("hits", bitsetCacheStats.getHits());
stats.put("misses", bitsetCacheStats.getMisses());
stats.put("evictions", bitsetCacheStats.getEvictions());
stats.put("hits_time_in_millis", TimeValue.nsecToMSec(hitsTimeTakenNanos.get()));
stats.put("misses_time_in_millis", TimeValue.nsecToMSec(missesTimeTakenNanos.get()));
return Collections.unmodifiableMap(stats);
}

private static final class BitsetCacheKey {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -61,9 +62,12 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
Expand Down Expand Up @@ -396,9 +400,9 @@ public void testCacheUnderConcurrentAccess() throws Exception {
cache.verifyInternalConsistency();

// Due to cache evictions, we must get more bitsets than fields
assertThat(uniqueBitSets.size(), Matchers.greaterThan(FIELD_COUNT));
assertThat(uniqueBitSets.size(), greaterThan(FIELD_COUNT));
// Due to cache evictions, we must have seen more bitsets than the cache currently holds
assertThat(uniqueBitSets.size(), Matchers.greaterThan(cache.entryCount()));
assertThat(uniqueBitSets.size(), greaterThan(cache.entryCount()));
// Even under concurrent pressure, the cache should hit the expected size
assertThat(cache.entryCount(), is(maxCacheCount));
assertThat(cache.ramBytesUsed(), is(maxCacheBytes));
Expand Down Expand Up @@ -517,6 +521,58 @@ public void testEquivalentMatchAllDocsQuery() {
assertFalse(DocumentSubsetBitsetCache.isEffectiveMatchAllDocsQuery(new TermQuery(new Term("term"))));
}

public void testHitsMissesAndEvictionsStats() throws Exception {
    // A cache sized for 1.5 bitsets holds exactly one entry, so a second distinct
    // lookup must evict the first and bump the eviction counter.
    final long maxCacheBytes = EXPECTED_BYTES_PER_BIT_SET + (EXPECTED_BYTES_PER_BIT_SET / 2);
    final Settings settings = Settings.builder()
        .put(DocumentSubsetBitsetCache.CACHE_SIZE_SETTING.getKey(), maxCacheBytes + "b")
        .build();
    final DocumentSubsetBitsetCache cache = newCache(settings);

    // Before any lookups, every counter must be zero.
    final Map<String, Object> expected = new LinkedHashMap<>();
    expected.put("count", 0);
    expected.put("memory", "0b");
    expected.put("memory_in_bytes", 0L);
    expected.put("hits", 0L);
    expected.put("misses", 0L);
    expected.put("evictions", 0L);
    expected.put("hits_time_in_millis", 0L);
    expected.put("misses_time_in_millis", 0L);
    assertThat(cache.usageStats(), equalTo(expected));

    runTestOnIndex((searchExecutionContext, leafContext) -> {
        // First lookup populates the cache: counts as one miss.
        final Query firstQuery = QueryBuilders.termQuery("field-1", "value-1").toQuery(searchExecutionContext);
        final BitSet firstBitSet = cache.getBitSet(firstQuery, leafContext);
        assertThat(firstBitSet, notNullValue());

        // Repeating the identical lookup returns the cached instance: one hit.
        final BitSet cachedBitSet = cache.getBitSet(firstQuery, leafContext);
        assertThat(cachedBitSet, sameInstance(firstBitSet));

        expected.put("hits", 1L);
        expected.put("misses", 1L);
        expected.put("count", 1);
        expected.put("memory", EXPECTED_BYTES_PER_BIT_SET + "b");
        expected.put("memory_in_bytes", EXPECTED_BYTES_PER_BIT_SET);
        expected.put("hits_time_in_millis", 1L);
        expected.put("misses_time_in_millis", 1L);
        assertThat(cache.usageStats(), equalTo(expected));

        // A different query does not fit alongside the first: miss + eviction.
        final Query secondQuery = QueryBuilders.termQuery("field-2", "value-2").toQuery(searchExecutionContext);
        final BitSet secondBitSet = cache.getBitSet(secondQuery, leafContext);
        assertThat(secondBitSet, notNullValue());

        // NOTE(review): misses advances by 2 (not 1) here — possibly a bug where
        // invalidation performs a get on an invalidation thread; confirm upstream.
        expected.put("misses", 3L);
        expected.put("evictions", 1L);
        expected.put("misses_time_in_millis", 2L);
        assertBusy(() -> assertThat(cache.usageStats(), equalTo(expected)), 200, TimeUnit.MILLISECONDS);
    });
}

private void runTestOnIndex(CheckedBiConsumer<SearchExecutionContext, LeafReaderContext, Exception> body) throws Exception {
runTestOnIndices(1, ctx -> {
final TestIndexContext indexContext = ctx.get(0);
Expand Down Expand Up @@ -636,7 +692,8 @@ private void runTestOnIndices(int numberIndices, CheckedConsumer<List<TestIndexC
}

/**
 * Builds a cache instance for tests, backed by a deterministic relative-nanos
 * clock that advances by exactly one millisecond on every invocation — this makes
 * the hit/miss timing stats in {@code usageStats()} predictable.
 */
private DocumentSubsetBitsetCache newCache(Settings settings) {
    final AtomicLong tickMillis = new AtomicLong();
    final LongSupplier fakeRelativeNanos = () -> TimeUnit.MILLISECONDS.toNanos(tickMillis.getAndIncrement());
    return new DocumentSubsetBitsetCache(settings, singleThreadExecutor, fakeRelativeNanos);
}

}