Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
4c2c7d3
add cache cancellation support
drempapis Feb 3, 2026
8c0fae1
light updates
drempapis Feb 3, 2026
9bf1a41
[CI] Auto commit changes from spotless
Feb 3, 2026
be5e2a4
Merge branch 'main' into fix/cache-cancellation-support
drempapis Feb 3, 2026
5acb50d
update for checkstyle
drempapis Feb 3, 2026
4e37fbe
Merge branch 'main' into fix/cache-cancellation-support
drempapis Feb 3, 2026
635cc8c
Merge branch 'main' into fix/cache-cancellation-support
drempapis Mar 13, 2026
5793981
Merge branch 'main' into fix/cache-cancellation-support
drempapis Mar 13, 2026
1ac60de
Merge branch 'main' into fix/cache-cancellation-support
drempapis Mar 13, 2026
121f39b
update code
drempapis Mar 13, 2026
a0e033f
[CI] Auto commit changes from spotless
Mar 13, 2026
16f92d8
update after review
drempapis Mar 13, 2026
b4ace63
Merge branch 'fix/cache-cancellation-support' of github.com:drempapis…
drempapis Mar 13, 2026
7ebc399
update after review
drempapis Mar 13, 2026
05de3e3
[CI] Auto commit changes from spotless
Mar 13, 2026
104060a
Merge branch 'main' into fix/cache-cancellation-support
drempapis Mar 13, 2026
8bf32ae
Merge branch 'main' into fix/cache-cancellation-support
drempapis Mar 16, 2026
914d73e
fix test
drempapis Mar 16, 2026
9eefc0a
Merge branch 'main' into fix/cache-cancellation-support
drempapis Mar 16, 2026
6e997e6
Merge branch 'main' into fix/cache-cancellation-support
drempapis Mar 19, 2026
90250dc
update after review
drempapis Mar 19, 2026
d34ad03
update after review
drempapis Mar 20, 2026
4dc4309
Merge branch 'main' into fix/cache-cancellation-support
drempapis Mar 20, 2026
728fbb6
add tests
drempapis Mar 20, 2026
2a85c37
Merge branch 'fix/cache-cancellation-support' of github.com:drempapis…
drempapis Mar 20, 2026
0ee7cad
Merge branch 'main' into fix/cache-cancellation-support
drempapis Mar 20, 2026
b4bd167
[CI] Auto commit changes from spotless
Mar 20, 2026
1530d72
Merge branch 'main' into fix/cache-cancellation-support
drempapis Mar 21, 2026
6f33bc9
Merge branch 'main' into fix/cache-cancellation-support
drempapis Mar 21, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 132 additions & 47 deletions server/src/main/java/org/elasticsearch/common/cache/Cache.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,26 @@
package org.elasticsearch.common.cache;

import org.elasticsearch.core.Tuple;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.tasks.TaskCancelledException;

import java.lang.reflect.Array;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.ToLongBiFunction;

Expand Down Expand Up @@ -58,6 +63,7 @@
* @param <V> The type of the values
*/
public class Cache<K, V> {
private static final Logger logger = LogManager.getLogger(Cache.class);

private final LongAdder hits = new LongAdder();

Expand Down Expand Up @@ -187,15 +193,17 @@ private final class CacheSegment {
Map<K, CompletableFuture<Entry<K, V>>> map;

/**
* get an entry from the segment; expired entries will be returned as null but not removed from the cache until the LRU list is
* pruned or a manual {@link Cache#refresh()} is performed however a caller can take action using the provided callback
* get an entry from the segment with cancellation support; expired entries will be returned as null but not removed from the
* cache until the LRU list is pruned or a manual {@link Cache#refresh()} is performed however a caller can take action using
* the provided callback
*
* @param key the key of the entry to get from the cache
* @param now the access time of this entry
* @param eagerEvict whether entries should be eagerly evicted on expiration
* @param key the key of the entry to get from the cache
* @param now the access time of this entry
* @param eagerEvict whether entries should be eagerly evicted on expiration
* @param cancellationRegistrar if non-null, accepts a Runnable to be called on cancellation
* @return the entry if there was one, otherwise null
*/
Entry<K, V> get(K key, long now, boolean eagerEvict) {
Entry<K, V> get(K key, long now, boolean eagerEvict, Consumer<Runnable> cancellationRegistrar) {
CompletableFuture<Entry<K, V>> future;
readLock.lock();
try {
Expand All @@ -206,12 +214,12 @@ Entry<K, V> get(K key, long now, boolean eagerEvict) {
if (future != null) {
Entry<K, V> entry;
try {
entry = future.get();
entry = blockOnFuture(future, cancellationRegistrar);
} catch (ExecutionException e) {
assert future.isCompletedExceptionally();
misses.increment();
return null;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException(e);
}
if (isExpired(entry, now)) {
Expand Down Expand Up @@ -335,6 +343,57 @@ void remove(K key, V value, boolean notify) {

}

/**
* Block on a CompletableFuture with cancellation support.
*
* @param future the future to wait on
* @param cancellationRegistrar if non-null, accepts a Runnable to be called on cancellation
* @return the result of the future
*
* @throws ExecutionException if the future completed exceptionally
* @throws InterruptedException if the thread was interrupted
*/
private static <T> T blockOnFuture(CompletableFuture<T> future, Consumer<Runnable> cancellationRegistrar) throws ExecutionException,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because this is a static method, and has locking, I think it would be nice to test it in CacheTests.java.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, added more tests

InterruptedException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Java doc mentions TaskCancelledException being thrown, but the definition doesn't.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's true, removed

if (future.isDone()) {
return future.get();
}

CountDownLatch latch = new CountDownLatch(1);
AtomicReference<T> result = new AtomicReference<>();
AtomicReference<Throwable> error = new AtomicReference<>();
AtomicBoolean cancelled = new AtomicBoolean(false);

future.whenComplete((value, throwable) -> {
if (throwable != null) {
error.set(throwable);
} else {
result.set(value);
}
latch.countDown();
});

if (cancellationRegistrar != null) {
cancellationRegistrar.accept(() -> {
cancelled.set(true);
latch.countDown();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a bit worried about this if. This assumes that a cancellationRegistrar != null is passed if a task can be cancelled, but doesn't enforce it. If a task is cancelled for a future when cancellationRegistrar is null, this with be a deadlock.
A safer way to do this will be to add the latch.countDown(); regardless of the value of cancellationRegistrar.

Copy link
Contributor Author

@drempapis drempapis Mar 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code is structured as follows

 future.whenComplete((value, throwable) -> {
            if (throwable != null) {
                error.set(throwable);
            } else {
                result.set(value);
            }
            latch.countDown();
        });

        if (cancellationRegistrar != null) {
            cancellationRegistrar.accept(() -> {
                cancelled.set(true);
                latch.countDown();
            });
        }

The whenComplete callback, which always calls latch.countDown(), is registered before the cancellationRegistrar check. The latch is always released when the future completes, regardless of whether a registrar is provided.

When cancellationRegistrar == null, the waiting thread cannot exit early if its task is cancelled. It will remain blocked until the computation finishes. This is by design, not a deadlock.

Add latch.countDown() regardless of cancellationRegistrar would immediately count down the latch causing latch.await() to return instantly with no result.

I've added a test to prove that when cancellationRegistrar is null, the thread is not deadlocked, it simply cannot exit early and must wait for the future to complete.

});
}

latch.await();

if (future.isDone()) {
return future.get();
}
if (cancelled.get()) {
throw new TaskCancelledException("Cache wait cancelled");
}
if (error.get() != null) {
throw new ExecutionException(error.get());
}
return result.get();
}

public static final int NUMBER_OF_SEGMENTS = 256;
@SuppressWarnings("unchecked")
private final CacheSegment[] segments = (CacheSegment[]) Array.newInstance(CacheSegment.class, NUMBER_OF_SEGMENTS);
Expand Down Expand Up @@ -362,8 +421,12 @@ public V get(K key) {
}

private V get(K key, long now, boolean eagerEvict) {
return get(key, now, eagerEvict, null);
}

private V get(K key, long now, boolean eagerEvict, Consumer<Runnable> cancellationRegistrar) {
CacheSegment segment = getCacheSegment(key);
Entry<K, V> entry = segment.get(key, now, eagerEvict);
Entry<K, V> entry = segment.get(key, now, eagerEvict, cancellationRegistrar);
if (entry == null) {
return null;
} else {
Expand All @@ -387,9 +450,27 @@ private V get(K key, long now, boolean eagerEvict) {
* @throws ExecutionException thrown if loader throws an exception or returns a null value
*/
public V computeIfAbsent(K key, CacheLoader<K, V> loader) throws ExecutionException {
return computeIfAbsent(key, loader, null);
}

/**
* This variant supports cancellation - if a cancellation callback is provided and triggered while waiting for
* another thread to compute the value, a TaskCancelledException will be thrown.
* <p>
* Waiting can happen at multiple points:
* <ul>
* <li>during the initial eager lookup when another thread already has an in-flight computation for the key, and</li>
* <li>after this thread loses the put-if-absent race and must wait on the winner's computation.</li>
* </ul>
*
* @param cancellationRegistrar if non-null, accepts a Runnable to be called when this wait should be cancelled
* @throws TaskCancelledException thrown if the operation is cancelled at any cache wait point
*/
public V computeIfAbsent(K key, CacheLoader<K, V> loader, Consumer<Runnable> cancellationRegistrar) throws ExecutionException {
long now = now();
// we have to eagerly evict expired entries or our putIfAbsent call below will fail
V value = get(key, now, true);
// this can block on an existing in-flight computation and may throw TaskCancelledException
V value = get(key, now, true, cancellationRegistrar);
if (value == null) {
// we need to synchronize loading of a value for a given key; however, holding the segment lock while
// invoking load can lead to deadlock against another thread due to dependent key loading; therefore, we
Expand All @@ -410,61 +491,65 @@ public V computeIfAbsent(K key, CacheLoader<K, V> loader) throws ExecutionExcept
segment.writeLock.unlock();
}

BiFunction<? super Entry<K, V>, Throwable, ? extends V> handler = (ok, ex) -> {
if (ok != null) {
promote(ok, now);
return ok.value;
} else {
segment.writeLock.lock();
try {
CompletableFuture<Entry<K, V>> sanity = segment.map == null ? null : segment.map.get(key);
if (sanity != null && sanity.isCompletedExceptionally()) {
segment.map.remove(key);
if (segment.map.isEmpty()) {
segment.map = null;
}
}
} finally {
segment.writeLock.unlock();
}
return null;
}
};

CompletableFuture<V> completableValue;
if (future == null) {
final boolean isComputing = (future == null);
if (isComputing) {
future = completableFuture;
completableValue = future.handle(handler);
V loaded;
try {
loaded = loader.load(key);
} catch (Exception e) {
future.completeExceptionally(e);
cleanupFailedFuture(segment, key, future);
throw new ExecutionException(e);
}
if (loaded == null) {
NullPointerException npe = new NullPointerException("loader returned a null value");
future.completeExceptionally(npe);
cleanupFailedFuture(segment, key, future);
throw new ExecutionException(npe);
} else {
future.complete(new Entry<>(key, loaded, now));
}
Entry<K, V> entry = new Entry<>(key, loaded, now);
future.complete(entry);
promote(entry, now);
return loaded;
} else {
completableValue = future.handle(handler);
try {
Entry<K, V> entry = blockOnFuture(future, cancellationRegistrar);
if (entry == null) {
future.get();
throw new IllegalStateException("future completed exceptionally but no exception thrown");
}
promote(entry, now);
return entry.value;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new ExecutionException(e);
}
}
}
return value;
}

try {
value = completableValue.get();
// check to ensure the future hasn't been completed with an exception
if (future.isCompletedExceptionally()) {
future.get(); // call get to force the exception to be thrown for other concurrent callers
throw new IllegalStateException("the future was completed exceptionally but no exception was thrown");
/**
* Clean up a failed future from the segment map.
*/
private void cleanupFailedFuture(CacheSegment segment, K key, CompletableFuture<Entry<K, V>> future) {
segment.writeLock.lock();
try {
if (segment.map != null) {
CompletableFuture<Entry<K, V>> current = segment.map.get(key);
if (current == future) {
segment.map.remove(key);
if (segment.map.isEmpty()) {
segment.map = null;
}
} else if (current != null) {
logger.debug("Skipped cleanup for key [{}] because the future was replaced", key);
}
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
} finally {
segment.writeLock.unlock();
}
return value;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;

/**
* The indices request cache allows to cache a shard level request stage responses, helping with improving
Expand Down Expand Up @@ -97,18 +98,46 @@ void clear(CacheEntity entity) {
cleanCache();
}

/**
* Get or compute a cache entry without cancellation support (backward compatible).
*/
BytesReference getOrCompute(
CacheEntity cacheEntity,
CheckedSupplier<BytesReference, IOException> loader,
MappingLookup.CacheKey mappingCacheKey,
DirectoryReader reader,
BytesReference cacheKey
) throws Exception {
return getOrCompute(cacheEntity, loader, mappingCacheKey, reader, cacheKey, null);
}

/**
* Get or compute a cache entry with cancellation support.
*
* @param cacheEntity the cache entity
* @param loader the loader to compute the value if not cached
* @param mappingCacheKey the mapping cache key
* @param reader the directory reader
* @param cacheKey the cache key
* @param cancellationRegistrar if non-null, accepts a Runnable to be called when the operation should be cancelled.
* This allows waiting threads to be notified instantly when their task is cancelled,
* rather than polling.
* @return the cached or computed value
* @throws Exception if the computation fails or the operation is cancelled
*/
BytesReference getOrCompute(
CacheEntity cacheEntity,
CheckedSupplier<BytesReference, IOException> loader,
MappingLookup.CacheKey mappingCacheKey,
DirectoryReader reader,
BytesReference cacheKey,
Consumer<Runnable> cancellationRegistrar
) throws Exception {
final ESCacheHelper cacheHelper = ElasticsearchDirectoryReader.getESReaderCacheHelper(reader);
assert cacheHelper != null;
final Key key = new Key(cacheEntity, mappingCacheKey, cacheHelper.getKey(), cacheKey);
Loader cacheLoader = new Loader(cacheEntity, loader);
BytesReference value = cache.computeIfAbsent(key, cacheLoader);
BytesReference value = cache.computeIfAbsent(key, cacheLoader, cancellationRegistrar);
if (cacheLoader.isLoaded()) {
key.entity.onMiss();
// see if its the first time we see this reader, and make sure to register a cleanup key
Expand Down
Loading
Loading