Add cancellation support to IndicesRequestCache #141708

drempapis wants to merge 29 commits into elastic:main
Conversation
Pinging @elastic/es-search-foundations (Team:Search Foundations)
```java
 * @throws TaskCancelledException if the operation was cancelled
 */
private static <T> T blockOnFuture(CompletableFuture<T> future, Consumer<Runnable> cancellationRegistrar) throws ExecutionException,
    InterruptedException {
```
The Javadoc mentions `TaskCancelledException` being thrown, but the method signature doesn't declare it.
That's true, removed.
```java
private void cleanupFailedFuture(CacheSegment segment, K key, CompletableFuture<Entry<K, V>> future) {
    segment.writeLock.lock();
    try {
        if (segment.map != null && segment.map.get(key) == future) {
```
I think that the `segment.map.get(key) != future` case deserves some handling. It's not expected, but maybe at least a log.
I added a debug log for when the key maps to a different future: `Skipped cleanup for key [] because the future was replaced`.
Do you think that is enough, or do you suggest doing something else here?
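As a rough illustration of the behavior discussed above, here is a hypothetical, simplified version of the cleanup. This is not the PR's code: `FailedFutureCleanup` is an invented class, it replaces the segment's write lock with `ConcurrentMap`'s atomic `remove(key, value)`, and it prints instead of using a debug logger.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch: remove a failed future from the map only if the key
// still maps to that exact future, and report when it was already replaced.
public class FailedFutureCleanup<K, V> {
    final ConcurrentMap<K, CompletableFuture<V>> map = new ConcurrentHashMap<>();

    /** Returns true if the failed future was removed, false if it had been replaced. */
    public boolean cleanupFailedFuture(K key, CompletableFuture<V> future) {
        // remove(key, value) is atomic: it only removes if key still maps to future
        boolean removed = map.remove(key, future);
        if (removed == false) {
            // stand-in for a debug log statement
            System.out.println("Skipped cleanup for key [" + key + "] because the future was replaced");
        }
        return removed;
    }
}
```

The atomic compare-and-remove plays the same role as checking `segment.map.get(key) == future` under the write lock: a concurrent writer that replaced the entry is never clobbered by a stale cleanup.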
```java
if (cancellationRegistrar != null) {
    cancellationRegistrar.accept(() -> {
        cancelled.set(true);
        latch.countDown();
```
I'm a bit worried about this if. It assumes that a non-null cancellationRegistrar is passed whenever a task can be cancelled, but doesn't enforce it. If a task is cancelled for a future when cancellationRegistrar is null, this will be a deadlock.
A safer way to do this would be to call latch.countDown() regardless of the value of cancellationRegistrar.
The code is structured as follows:

```java
future.whenComplete((value, throwable) -> {
    if (throwable != null) {
        error.set(throwable);
    } else {
        result.set(value);
    }
    latch.countDown();
});
if (cancellationRegistrar != null) {
    cancellationRegistrar.accept(() -> {
        cancelled.set(true);
        latch.countDown();
    });
}
```
The whenComplete callback, which always calls latch.countDown(), is registered before the cancellationRegistrar check, so the latch is always released when the future completes, regardless of whether a registrar is provided.
When cancellationRegistrar == null, the waiting thread cannot exit early if its task is cancelled; it remains blocked until the computation finishes. This is by design, not a deadlock. Adding latch.countDown() unconditionally would count the latch down immediately, causing latch.await() to return instantly with no result.
I've added a test to prove that when cancellationRegistrar is null, the thread is not deadlocked; it simply cannot exit early and must wait for the future to complete.
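For reference, the pattern above can be sketched as a standalone method. This is an illustrative reconstruction, not the PR's actual implementation: `BlockOnFutureSketch` is a hypothetical class, and `CancellationException` stands in for Elasticsearch's `TaskCancelledException`.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

public class BlockOnFutureSketch {

    /** Blocks until the future completes, or until the registered cancellation runnable fires. */
    public static <T> T blockOnFuture(CompletableFuture<T> future, Consumer<Runnable> cancellationRegistrar)
        throws ExecutionException, InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        AtomicBoolean cancelled = new AtomicBoolean(false);
        AtomicReference<T> result = new AtomicReference<>();
        AtomicReference<Throwable> error = new AtomicReference<>();
        // Registered unconditionally: the latch is always released when the future completes.
        future.whenComplete((value, throwable) -> {
            if (throwable != null) {
                error.set(throwable);
            } else {
                result.set(value);
            }
            latch.countDown();
        });
        // Optional early exit: only wired up when a registrar is provided.
        if (cancellationRegistrar != null) {
            cancellationRegistrar.accept(() -> {
                cancelled.set(true);
                latch.countDown();
            });
        }
        latch.await();
        if (cancelled.get() && future.isDone() == false) {
            // stand-in for TaskCancelledException in this sketch
            throw new CancellationException("task cancelled while waiting for cached result");
        }
        if (error.get() != null) {
            throw new ExecutionException(error.get());
        }
        return result.get();
    }
}
```

Calling it with a null registrar on a pending future simply blocks until the future completes; passing a registrar lets an external cancellation hook release the latch early.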
```java
 * @throws InterruptedException if the thread was interrupted
 * @throws TaskCancelledException if the operation was cancelled
 */
private static <T> T blockOnFuture(CompletableFuture<T> future, Consumer<Runnable> cancellationRegistrar) throws ExecutionException,
```
Because this is a static method and has locking, I think it would be nice to test it in CacheTests.java.
+1, added more tests
Related GitHub issue: #108703

The problem

When expensive queries fill up the search thread pool, threads can become blocked in IndicesRequestCache.getOrCompute waiting for other threads to compute cached results. If those queries are cancelled, the waiting threads don't react to the cancellation and continue blocking indefinitely. This can exhaust the search thread pool, requiring node restarts to recover.

This PR adds cancellation support to the cache's blocking operations, allowing waiting threads to be notified when their task is cancelled. However, it does not prevent the search pool from filling with blocking tasks. A follow-up PR will change the cache to use SubscribableListener for a complete async solution; I haven't worked on that here, to keep the code updates small and discrete.
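To make the failure mode concrete, here is a small standalone demo. It is an assumption-laden simplification, not Elasticsearch code: task cancellation is modeled as a plain flag the task is expected to poll, so a thread parked in `future.get()` never observes it and stays blocked until the shared computation completes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

public class CancellationFlagDemo {

    /** Cancels the waiter's task via the flag, then reports the waiter's thread state. */
    public static Thread.State parkedStateAfterCancel() throws Exception {
        AtomicBoolean taskCancelled = new AtomicBoolean(false); // flag-style cancellation model
        CompletableFuture<String> shared = new CompletableFuture<>();

        Thread waiter = new Thread(() -> {
            try {
                shared.get(); // parked here; never polls taskCancelled
            } catch (Exception e) {
                // ignored for the demo
            }
        });
        waiter.start();
        taskCancelled.set(true); // "cancel" the task: only a flag is flipped
        Thread.sleep(200);       // give the waiter time to park
        Thread.State state = waiter.getState(); // still WAITING despite cancellation
        shared.complete("done"); // only the computation finishing frees the waiter
        waiter.join();
        return state;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(parkedStateAfterCancel());
    }
}
```

With the PR's cancellation hook, the equivalent waiter would be handed a runnable that releases its latch when the task is cancelled, letting it exit early instead of occupying a search thread.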