Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
f4a1a81
AsyncSearch: make MutableSearchResponse ref-counted
drempapis Sep 9, 2025
025e9e3
Update docs/changelog/134359.yaml
drempapis Sep 9, 2025
47f554e
[CI] Auto commit changes from spotless
Sep 9, 2025
4a2cdc0
apply spot
drempapis Sep 9, 2025
e3b7c64
Merge branch 'fix/refcounting-async-response' of github.com:drempapis…
drempapis Sep 9, 2025
f7db807
Merge branch 'main' into fix/refcounting-async-response
drempapis Sep 9, 2025
beb3f23
Merge branch 'main' into fix/refcounting-async-response
drempapis Oct 14, 2025
0c7e61d
update code
drempapis Oct 14, 2025
b02ce52
update code
drempapis Oct 14, 2025
966ad7b
Merge branch 'fix/refcounting-async-response' of github.com:drempapis…
drempapis Oct 14, 2025
987434e
[CI] Auto commit changes from spotless
Oct 14, 2025
91eac66
update AsyncSearchTask to fallback from the store if entry available
drempapis Oct 14, 2025
f23f78f
Merge branch 'fix/refcounting-async-response' of github.com:drempapis…
drempapis Oct 14, 2025
dd9567d
remove files
drempapis Oct 14, 2025
c643cda
update code
drempapis Oct 14, 2025
dd09251
Delete System.out from debugging
drempapis Oct 14, 2025
497a736
Add forgotten imports
drempapis Oct 14, 2025
029c0d1
Merge branch 'main' into fix/refcounting-async-response
drempapis Oct 14, 2025
4f8e55b
Merge branch 'main' into fix/refcounting-async-response
drempapis Oct 14, 2025
b504e2a
Merge branch 'main' into fix/refcounting-async-response
drempapis Oct 14, 2025
85ec60f
make the store getResponse async
drempapis Oct 15, 2025
746caa3
Merge branch 'fix/refcounting-async-response' of github.com:drempapis…
drempapis Oct 15, 2025
8758981
[CI] Auto commit changes from spotless
Oct 15, 2025
81a6a55
Merge branch 'main' into fix/refcounting-async-response
drempapis Oct 15, 2025
e1d33dc
remove test
drempapis Oct 15, 2025
99610b2
Merge branch 'main' into fix/refcounting-async-response
drempapis Oct 15, 2025
2cb90cd
Merge branch 'main' into fix/refcounting-async-response
drempapis Oct 15, 2025
a2f7f7a
[CI] Auto commit changes from spotless
Oct 15, 2025
ab343f3
Merge branch 'main' into fix/refcounting-async-response
drempapis Oct 16, 2025
aed9d1f
Integration test that validates the behavior of async search status p…
drempapis Oct 16, 2025
b82dbe8
[CI] Auto commit changes from spotless
Oct 16, 2025
a6a45cb
Merge branch 'main' into fix/refcounting-async-response
drempapis Oct 16, 2025
07825fd
update after review
drempapis Oct 16, 2025
3412045
Merge branch 'fix/refcounting-async-response' of github.com:drempapis…
drempapis Oct 16, 2025
e4fedfc
update after review
drempapis Oct 16, 2025
c26b15d
apply spotless
drempapis Oct 16, 2025
770a902
[CI] Auto commit changes from spotless
Oct 16, 2025
fc178a9
update
drempapis Oct 16, 2025
2c9d1c7
Merge branch 'fix/refcounting-async-response' of github.com:drempapis…
drempapis Oct 16, 2025
0359f76
Merge branch 'main' into fix/refcounting-async-response
drempapis Oct 20, 2025
9fa88b9
update after review
drempapis Oct 21, 2025
286b6a9
[CI] Auto commit changes from spotless
Oct 21, 2025
9b57b99
update after review
drempapis Oct 21, 2025
dab85e4
Merge branch 'fix/refcounting-async-response' of github.com:drempapis…
drempapis Oct 21, 2025
a452322
Merge branch 'main' into fix/refcounting-async-response
drempapis Oct 21, 2025
89b3f3f
update after review
drempapis Oct 21, 2025
ee8fbca
Merge branch 'main' into fix/refcounting-async-response
drempapis Oct 21, 2025
30fe790
Remove Repeat annotation
drempapis Oct 21, 2025
26f105c
Merge branch 'fix/refcounting-async-response' of github.com:drempapis…
drempapis Oct 21, 2025
bfcd31e
Merge branch 'main' into fix/refcounting-async-response
drempapis Oct 22, 2025
547a050
code udpated after review
drempapis Oct 22, 2025
adada6a
[CI] Auto commit changes from spotless
Oct 22, 2025
9ce4cc7
Merge branch 'main' into fix/refcounting-async-response
drempapis Nov 3, 2025
634e698
update after review|
drempapis Nov 3, 2025
ce9f54d
Merge branch 'main' into fix/refcounting-async-response
drempapis Nov 3, 2025
e552d16
Merge branch 'main' into fix/refcounting-async-response
drempapis Nov 4, 2025
2ac0077
Merge branch 'main' into fix/refcounting-async-response
drempapis Nov 4, 2025
063b5a8
Add comments for the given choice
drempapis Nov 4, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/134359.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 134359
summary: Make `MutableSearchResponse` ref-counted to prevent use-after-close in async
search
area: Search
type: bug
issues: []
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.search;

import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.client.NoOpClient;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.xpack.core.async.AsyncExecutionId;
import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
import org.junit.After;
import org.junit.Before;

import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Map;

import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;

public class MutableSearchResponseRefCountingTests extends ESTestCase {

private TestThreadPool threadPool;
private NoOpClient client;

@Before
public void setup() {
this.threadPool = new TestThreadPool(getTestName());
this.client = new NoOpClient(threadPool);
}

@After
public void cleanup() throws Exception {
terminate(threadPool);
}

public void testBuildSucceedsIfAnotherThreadHoldsRef() {
final int totalShards = 1;
final int skippedShards = 0;

// Build a SearchResponse (sr refCount -> 1)
SearchResponse searchResponse = createSearchResponse(totalShards, totalShards, skippedShards);

// Take a ref - (msr refCount -> 1, sr refCount -> 2)
MutableSearchResponse msr = new MutableSearchResponse(threadPool.getThreadContext());
msr.updateShardsAndClusters(totalShards, skippedShards, null);
msr.updateFinalResponse(searchResponse, false);

searchResponse.decRef(); // sr refCount -> 1

// Simulate another thread : take a resource (msr refCount -> 2)
msr.incRef();
// close resource (msr refCount -> 1) -> closeInternal not called yet
msr.decRef();

// Build a response
AsyncSearchResponse resp = msr.toAsyncSearchResponse(
createAsyncSearchTask(),
System.currentTimeMillis() + 60_000, /*restoreResponseHeaders*/
false
);
try {
assertNotNull("Expect SearchResponse when a live ref prevents close", resp.getSearchResponse());
assertNull("No failure expected while ref is held", resp.getFailure());
assertFalse("Response should not be marked running", resp.isRunning());
} finally {
resp.decRef();
}

// Release msr (msr refCount -> 0, sr refCount -> 0) -> now calling closeInternal
msr.decRef();
}

public void testGetResponseAfterCloseReturnsGone() throws Exception {
final int totalShards = 1;
final int skippedShards = 0;

// Build a SearchResponse (sr refCount -> 1)
SearchResponse searchResponse = createSearchResponse(totalShards, totalShards, skippedShards);

// Create an AsyncSearchTask
AsyncSearchTask task = createAsyncSearchTask();

// Get response instance and method from task
Field f = AsyncSearchTask.class.getDeclaredField("searchResponse");
f.setAccessible(true);
Method m = AsyncSearchTask.class.getDeclaredMethod("getResponseWithHeaders");
m.setAccessible(true);

// Take a ref - (msr refCount -> 1, sr refCount -> 2)
MutableSearchResponse msr = (MutableSearchResponse) f.get(task);
msr.updateShardsAndClusters(totalShards, skippedShards, null);
msr.updateFinalResponse(searchResponse, false);

searchResponse.decRef(); // sr ref -> 1
msr.decRef(); // msr ref -> 0 -> closeInternal() -> sr ref -> 0

// Invoke getResponseWithHeaders and expect GONE exception
InvocationTargetException ite = expectThrows(InvocationTargetException.class, () -> {
AsyncSearchResponse resp = (AsyncSearchResponse) m.invoke(task);
if (resp != null) {
resp.decRef();
}
});

Throwable cause = ExceptionsHelper.unwrapCause(ite.getCause());
assertThat(cause, instanceOf(ElasticsearchStatusException.class));
assertThat(ExceptionsHelper.status(cause), is(RestStatus.GONE));
}

private AsyncSearchTask createAsyncSearchTask() {
return new AsyncSearchTask(
1L,
"search",
"indices:data/read/search",
TaskId.EMPTY_TASK_ID,
() -> "debug",
TimeValue.timeValueMinutes(1),
Map.of(),
Map.of(),
new AsyncExecutionId("debug", new TaskId("node", 1L)),
client,
threadPool,
isCancelled -> () -> new AggregationReduceContext.ForFinal(null, null, null, null, null, PipelineAggregator.PipelineTree.EMPTY)
);
}

private SearchResponse createSearchResponse(int totalShards, int successfulShards, int skippedShards) {
return new SearchResponse(
SearchHits.empty(Lucene.TOTAL_HITS_GREATER_OR_EQUAL_TO_ZERO, Float.NaN),
null,
null,
false,
false,
null,
0,
null,
totalShards,
successfulShards,
skippedShards,
1L,
ShardSearchFailure.EMPTY_ARRAY,
null
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import org.elasticsearch.action.search.TransportSearchAction;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregations;
Expand Down Expand Up @@ -344,6 +344,9 @@ private AsyncSearchResponse getResponse(boolean restoreResponseHeaders) {
assert mutableSearchResponse != null;
checkCancellation();
AsyncSearchResponse asyncSearchResponse;
if (mutableSearchResponse.tryIncRef() == false) {
throw new ElasticsearchStatusException("async-search result, no longer available", RestStatus.GONE);
Copy link
Contributor

@andreidan andreidan Sep 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC the request will still fail, just a different message? (i.e. results in Discover will still not be displayed right?)

In the race condition you describe we'd have the results on this. Should we not return them from disk instead of failing here?

Or with the new response, do clients know to retry automatically?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you, @andreidan, for reviewing this.

I'm not super sure the proposed solution fixes the problem just yet

The problem is a race: one thread (A) is building a response from MutableSearchResponse while another (B) closes the task and drops the last ref, so the reader (A) hits already closed, can’t increment ref count when calling the getResponse().

The idea here is to allow a thread that is already building a response to complete and return, while preventing subsequent calls from accessing the resource once it has been closed/released.

IIUC the request will still fail, just a different message?

With the code as written, the focus is on mitigating the race. If tryIncRef() fails, we throw GONE instead of hitting the assertion. The request still fails, but it does so in a controlled, user-facing way rather than with an assertion error.

In the race condition you describe, we'd have the results on this. Should we not return them from disk instead of failing here?

That's a good point. To guarantee safety, if tryIncRef() fails, we stop touching the in-memory finalResponse.

We may add a fallback. If the task is completed and the container is already closed, we should load and return the stored async-search result from disk, and only return GONE when the result is not found on disk.

  • tryIncRef succeeds -> build from in-memory (fast path) -> 200.

  • tryIncRef fails and stored doc exists -> load from .async-search -> 200.

  • tryIncRef fails and stored doc missing (expired via keep_alive or deleted), -> GONE.

}
try {
asyncSearchResponse = mutableSearchResponse.toAsyncSearchResponse(this, expirationTimeMillis, restoreResponseHeaders);
} catch (Exception e) {
Expand All @@ -353,6 +356,8 @@ private AsyncSearchResponse getResponse(boolean restoreResponseHeaders) {
e
);
asyncSearchResponse = mutableSearchResponse.toAsyncSearchResponse(this, expirationTimeMillis, exception);
} finally {
mutableSearchResponse.decRef();
}
return asyncSearchResponse;
}
Expand Down Expand Up @@ -381,7 +386,7 @@ public static AsyncStatusResponse getStatusResponse(AsyncSearchTask asyncTask) {

@Override
public void close() {
Releasables.close(searchResponse);
searchResponse.decRef();
}

class Listener extends SearchProgressActionListener {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.AbstractRefCounted;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.InternalAggregations;
Expand All @@ -38,7 +38,7 @@
* creating an async response concurrently. This limits the number of final reduction that can
* run concurrently to 1 and ensures that we pause the search progress when an {@link AsyncSearchResponse} is built.
*/
class MutableSearchResponse implements Releasable {
class MutableSearchResponse extends AbstractRefCounted {
private int totalShards;
private int skippedShards;
private Clusters clusters;
Expand Down Expand Up @@ -85,6 +85,7 @@ class MutableSearchResponse implements Releasable {
* @param threadContext The thread context to retrieve the final response headers.
*/
MutableSearchResponse(ThreadContext threadContext) {
super();
this.isPartial = true;
this.threadContext = threadContext;
this.totalHits = Lucene.TOTAL_HITS_GREATER_OR_EQUAL_TO_ZERO;
Expand Down Expand Up @@ -487,14 +488,17 @@ private String getShardsInResponseMismatchInfo(SearchResponse response, boolean
}

@Override
public synchronized void close() {
protected synchronized void closeInternal() {
if (finalResponse != null) {
finalResponse.decRef();
finalResponse = null;
}
if (clusterResponses != null) {
for (SearchResponse clusterResponse : clusterResponses) {
clusterResponse.decRef();
}
clusterResponses.clear();
clusterResponses = null;
}
}
}