Skip to content
Merged
Show file tree
Hide file tree
Changes from 52 commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
f4a1a81
AsyncSearch: make MutableSearchResponse ref-counted
drempapis Sep 9, 2025
025e9e3
Update docs/changelog/134359.yaml
drempapis Sep 9, 2025
47f554e
[CI] Auto commit changes from spotless
Sep 9, 2025
4a2cdc0
apply spot
drempapis Sep 9, 2025
e3b7c64
Merge branch 'fix/refcounting-async-response' of github.com:drempapis…
drempapis Sep 9, 2025
f7db807
Merge branch 'main' into fix/refcounting-async-response
drempapis Sep 9, 2025
beb3f23
Merge branch 'main' into fix/refcounting-async-response
drempapis Oct 14, 2025
0c7e61d
update code
drempapis Oct 14, 2025
b02ce52
update code
drempapis Oct 14, 2025
966ad7b
Merge branch 'fix/refcounting-async-response' of github.com:drempapis…
drempapis Oct 14, 2025
987434e
[CI] Auto commit changes from spotless
Oct 14, 2025
91eac66
update AsyncSearchTask to fallback from the store if entry available
drempapis Oct 14, 2025
f23f78f
Merge branch 'fix/refcounting-async-response' of github.com:drempapis…
drempapis Oct 14, 2025
dd9567d
remove files
drempapis Oct 14, 2025
c643cda
update code
drempapis Oct 14, 2025
dd09251
Delete System.out from debugging
drempapis Oct 14, 2025
497a736
Add forgotten imports
drempapis Oct 14, 2025
029c0d1
Merge branch 'main' into fix/refcounting-async-response
drempapis Oct 14, 2025
4f8e55b
Merge branch 'main' into fix/refcounting-async-response
drempapis Oct 14, 2025
b504e2a
Merge branch 'main' into fix/refcounting-async-response
drempapis Oct 14, 2025
85ec60f
make the store getResponse async
drempapis Oct 15, 2025
746caa3
Merge branch 'fix/refcounting-async-response' of github.com:drempapis…
drempapis Oct 15, 2025
8758981
[CI] Auto commit changes from spotless
Oct 15, 2025
81a6a55
Merge branch 'main' into fix/refcounting-async-response
drempapis Oct 15, 2025
e1d33dc
remove test
drempapis Oct 15, 2025
99610b2
Merge branch 'main' into fix/refcounting-async-response
drempapis Oct 15, 2025
2cb90cd
Merge branch 'main' into fix/refcounting-async-response
drempapis Oct 15, 2025
a2f7f7a
[CI] Auto commit changes from spotless
Oct 15, 2025
ab343f3
Merge branch 'main' into fix/refcounting-async-response
drempapis Oct 16, 2025
aed9d1f
Integration test that validates the behavior of async search status p…
drempapis Oct 16, 2025
b82dbe8
[CI] Auto commit changes from spotless
Oct 16, 2025
a6a45cb
Merge branch 'main' into fix/refcounting-async-response
drempapis Oct 16, 2025
07825fd
update after review
drempapis Oct 16, 2025
3412045
Merge branch 'fix/refcounting-async-response' of github.com:drempapis…
drempapis Oct 16, 2025
e4fedfc
update after review
drempapis Oct 16, 2025
c26b15d
apply spotless
drempapis Oct 16, 2025
770a902
[CI] Auto commit changes from spotless
Oct 16, 2025
fc178a9
update
drempapis Oct 16, 2025
2c9d1c7
Merge branch 'fix/refcounting-async-response' of github.com:drempapis…
drempapis Oct 16, 2025
0359f76
Merge branch 'main' into fix/refcounting-async-response
drempapis Oct 20, 2025
9fa88b9
update after review
drempapis Oct 21, 2025
286b6a9
[CI] Auto commit changes from spotless
Oct 21, 2025
9b57b99
update after review
drempapis Oct 21, 2025
dab85e4
Merge branch 'fix/refcounting-async-response' of github.com:drempapis…
drempapis Oct 21, 2025
a452322
Merge branch 'main' into fix/refcounting-async-response
drempapis Oct 21, 2025
89b3f3f
update after review
drempapis Oct 21, 2025
ee8fbca
Merge branch 'main' into fix/refcounting-async-response
drempapis Oct 21, 2025
30fe790
Remove Repeat annotation
drempapis Oct 21, 2025
26f105c
Merge branch 'fix/refcounting-async-response' of github.com:drempapis…
drempapis Oct 21, 2025
bfcd31e
Merge branch 'main' into fix/refcounting-async-response
drempapis Oct 22, 2025
547a050
code udpated after review
drempapis Oct 22, 2025
adada6a
[CI] Auto commit changes from spotless
Oct 22, 2025
9ce4cc7
Merge branch 'main' into fix/refcounting-async-response
drempapis Nov 3, 2025
634e698
update after review|
drempapis Nov 3, 2025
ce9f54d
Merge branch 'main' into fix/refcounting-async-response
drempapis Nov 3, 2025
e552d16
Merge branch 'main' into fix/refcounting-async-response
drempapis Nov 4, 2025
2ac0077
Merge branch 'main' into fix/refcounting-async-response
drempapis Nov 4, 2025
063b5a8
Add comments for the given choice
drempapis Nov 4, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/134359.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 134359
summary: Make `MutableSearchResponse` ref-counted to prevent use-after-close in async
search
area: Search
type: bug
issues: []
Original file line number Diff line number Diff line change
@@ -0,0 +1,275 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.search;

import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.test.ESIntegTestCase.SuiteScopeTestCase;
import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;

@SuiteScopeTestCase
public class AsyncSearchConcurrentStatusIT extends AsyncSearchIntegTestCase {
private static String indexName;
private static int numShards;

private static int numKeywords;

@Override
public void setupSuiteScopeCluster() {
indexName = "test-async";
numShards = randomIntBetween(1, 20);
int numDocs = randomIntBetween(100, 1000);
createIndex(indexName, Settings.builder().put("index.number_of_shards", numShards).build());
numKeywords = randomIntBetween(50, 100);
Set<String> keywordSet = new HashSet<>();
for (int i = 0; i < numKeywords; i++) {
keywordSet.add(randomAlphaOfLengthBetween(10, 20));
}
numKeywords = keywordSet.size();
String[] keywords = keywordSet.toArray(String[]::new);
List<IndexRequestBuilder> reqs = new ArrayList<>();
for (int i = 0; i < numDocs; i++) {
float metric = randomFloat();
String keyword = keywords[randomIntBetween(0, numKeywords - 1)];
reqs.add(prepareIndex(indexName).setSource("terms", keyword, "metric", metric));
}
indexRandom(true, true, reqs);
}

/**
* This test spins up a set of poller threads that repeatedly call
* {@code _async_search/{id}}. Each poller starts immediately, and once enough
* requests have been issued they signal a latch to indicate the group is "warmed up".
* The test waits on this latch to deterministically ensure pollers are active.
* In parallel, a consumer thread drives the async search to completion using the
* blocking iterator. This coordinated overlap exercises the window where the task
* is closing and some status calls may return {@code 410 GONE}.
*/
public void testConcurrentStatusFetchWhileTaskCloses() throws Exception {
final TimeValue timeout = TimeValue.timeValueSeconds(3);
final String aggName = "terms";
final SearchSourceBuilder source = new SearchSourceBuilder().aggregation(
AggregationBuilders.terms(aggName).field("terms.keyword").size(Math.max(1, numKeywords))
);

final int progressStep = (numShards > 2) ? randomIntBetween(2, numShards) : 2;
try (SearchResponseIterator it = assertBlockingIterator(indexName, numShards, source, 0, progressStep)) {
String id = getAsyncId(it);

PollStats stats = new PollStats();

// Pick a random number of status-poller threads, at least 1, up to (4×numShards)
int pollerThreads = randomIntBetween(1, 4 * numShards);

// Wait for pollers to be active
CountDownLatch warmed = new CountDownLatch(1);

PollerGroup pollers = createPollers(id, pollerThreads, aggName, stats, warmed);

// Wait until pollers are issuing requests (warming period)
assertTrue("pollers did not warm up in time", warmed.await(timeout.millis(), TimeUnit.MILLISECONDS));

// Start consumer on a separate thread and capture errors
var consumerExec = Executors.newSingleThreadExecutor();
AtomicReference<Throwable> consumerError = new AtomicReference<>();
Future<?> consumer = consumerExec.submit(() -> {
try {
consumeAllResponses(it, aggName);
} catch (Throwable t) {
consumerError.set(t);
}
});

// Join consumer & surface errors
try {
consumer.get(timeout.millis(), TimeUnit.MILLISECONDS);

if (consumerError.get() != null) {
fail("consumeAllResponses failed: " + consumerError.get());
}
} catch (TimeoutException e) {
consumer.cancel(true);
fail(e, "Consumer thread did not finish within timeout");
} catch (Exception ignored) {} finally {

try {
pollers.stopAndAwait(timeout);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
fail("Interrupted while stopping pollers: " + e.getMessage());
}

consumerExec.shutdown();
try {
consumerExec.awaitTermination(timeout.millis(), TimeUnit.MILLISECONDS);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}

assertNoWorkerFailures(pollers);
assertStats(stats);
}
}

private void assertNoWorkerFailures(PollerGroup pollers) {
List<Throwable> failures = pollers.getFailures();
assertTrue(
"Unexpected worker failures:\n" + failures.stream().map(ExceptionsHelper::stackTrace).reduce("", (a, b) -> a + "\n---\n" + b),
failures.isEmpty()
);
}

private void assertStats(PollStats stats) {
assertEquals(stats.totalCalls.sum(), stats.runningResponses.sum() + stats.completedResponses.sum());
assertEquals("There should be no exceptions other than GONE", 0, stats.exceptions.sum());
}

private String getAsyncId(SearchResponseIterator it) {
AsyncSearchResponse response = it.next();
try {
assertNotNull(response.getId());
return response.getId();
} finally {
response.decRef();
}
}

private void consumeAllResponses(SearchResponseIterator it, String aggName) throws Exception {
while (it.hasNext()) {
AsyncSearchResponse response = it.next();
try {
if (response.getSearchResponse() != null && response.getSearchResponse().getAggregations() != null) {
assertNotNull(response.getSearchResponse().getAggregations().get(aggName));
}
} finally {
response.decRef();
}
}
}

private PollerGroup createPollers(String id, int threads, String aggName, PollStats stats, CountDownLatch warmed) {
final ExecutorService exec = Executors.newFixedThreadPool(threads);
final List<Future<?>> futures = new ArrayList<>(threads);
final AtomicBoolean running = new AtomicBoolean(true);

for (int i = 0; i < threads; i++) {
futures.add(exec.submit(() -> {
while (running.get()) {
AsyncSearchResponse resp = null;
try {
resp = getAsyncSearch(id);
stats.totalCalls.increment();

// Once enough requests have been sent, consider pollers "warmed".
if (stats.totalCalls.sum() >= threads) {
warmed.countDown();
}

if (resp.isRunning()) {
stats.runningResponses.increment();
} else {
// Success-only assertions: if reported completed, we must have a proper search response
assertNull("Async search reported completed with failure", resp.getFailure());
assertNotNull("Completed async search must carry a SearchResponse", resp.getSearchResponse());
assertNotNull("Completed async search must have aggregations", resp.getSearchResponse().getAggregations());
assertNotNull(
"Completed async search must contain the expected aggregation",
resp.getSearchResponse().getAggregations().get(aggName)
);
stats.completedResponses.increment();
}
} catch (Exception e) {
Throwable cause = ExceptionsHelper.unwrapCause(e);
if (cause instanceof ElasticsearchStatusException) {
RestStatus status = ExceptionsHelper.status(cause);
if (status == RestStatus.GONE) {
stats.gone410.increment();
}
} else {
stats.exceptions.increment();
}
} finally {
if (resp != null) {
resp.decRef();
}
}
}
return null;
}));
}
return new PollerGroup(exec, futures, running);
}

static final class PollStats {
final LongAdder totalCalls = new LongAdder();
final LongAdder runningResponses = new LongAdder();
final LongAdder completedResponses = new LongAdder();
final LongAdder exceptions = new LongAdder();
final LongAdder gone410 = new LongAdder();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we use keep_alive values that prevent us from ever seeing gone? and perhaps add a separate test where the keep_alive is very short and we are expecting gone ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test itself completes quickly, so the default keep_alive is sufficient to cover its execution. Although the minimum allowed keep_alive is 1s, this makes it impractical to simulate expiry within the test; we would need to add significant context and construct a heavy query, which would not be suitable for CI. Instead, a more practical approach is to simulate GONE by explicitly deleting the async search while pollers are hammering its ID. Even then, making this fully deterministic in CI is challenging because the window where GONE can be observed is very narrow.

The class AsyncSearchRefcountAndFallbackTests simulates such scenarios in a deterministic way

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we randomise the keep_alive value to sometimes be:

  1. a very high value so gone410 is never possible in that test run (i.e. the test fails if 410 is seen)
  2. a lower value 1-2 seconds so gone410 is possible (i.e. the test does not fail if 410 is visible)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The minimum keep_alive we can configure is 1 second (default), which is already long enough that, under the current test timing, we don’t realistically observe any 410 GONE responses. To simulate expiration behavior, we can extend the lifetime of the pollers after the consumer has complete, for example:

try {
   consumer.get(timeout.millis(), TimeUnit.MILLISECONDS);

    if (consumerError.get() != null) {
       fail("consumeAllResponses failed: " + consumerError.get());
   }

   Tread.sleep(keepAlive.millis() + 500L); // allow the async_search to expire
}

This ensures the async search document is deleted before pollers stop. In practice, though, we’ll usually see ResourceNotFoundException (transport path) rather than 410 GONE (REST path), because the race window where the document still exists yet is no longer retrievable is tiny.

Practically both 410 GONE and ResourceNotFoundException represent the same expired state through different APIs (REST vs transport). Handling both wouldn’t test new logic, it would only duplicate coverage of the same lifecycle behavior, adding complexity without improving the test’s value.

Copy link
Contributor

@andreidan andreidan Nov 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My point was mostly around the test being prescriptive around what it is indeed expecting. It's currently, I believe, mixing success and some 410 as "everything is cool". I was suggesting we separate the cases where 410s are not even an option.

I appreciate you don't consider this valuable, and I'm happy to go forward as this is now (hence why I approved the PR)

}

static class PollerGroup {
private final ExecutorService exec;
private final List<Future<?>> futures;
// The threads are created and running right away
private final AtomicBoolean running;

private PollerGroup(ExecutorService exec, List<Future<?>> futures, AtomicBoolean running) {
this.exec = exec;
this.futures = futures;
this.running = running;
}

void stopAndAwait(TimeValue timeout) throws InterruptedException {
running.set(false);
exec.shutdown();
if (exec.awaitTermination(timeout.millis(), TimeUnit.MILLISECONDS) == false) {
exec.shutdownNow();
exec.awaitTermination(timeout.millis(), TimeUnit.MILLISECONDS);
}
}

List<Throwable> getFailures() {
List<Throwable> failures = new ArrayList<>();
for (Future<?> f : futures) {
try {
f.get();
} catch (CancellationException ignored) {} catch (ExecutionException ee) {
failures.add(ExceptionsHelper.unwrapCause(ee.getCause()));
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
if (failures.isEmpty()) failures.add(ie);
}
}
return failures;
}
}
}
Loading