Merged

38 commits
73a174f
Improve PIT context relocation
cbuescher Sep 22, 2025
730efd4
Update docs/changelog/135231.yaml
cbuescher Sep 22, 2025
ad4219c
Add keepalive to second and third query
cbuescher Sep 25, 2025
fb526d4
Merge branch 'main' into improve-pit-relocation
cbuescher Sep 25, 2025
6bcb45f
Adressing review comments
cbuescher Sep 26, 2025
841c5b3
Rework failure usage in updating PIT id
cbuescher Oct 8, 2025
b76a959
[CI] Auto commit changes from spotless
Oct 8, 2025
237ea4c
Merge branch 'main' into improve-pit-relocation
cbuescher Oct 9, 2025
9085194
Adding feature flag
cbuescher Oct 9, 2025
d7166cd
Merge branch 'main' into improve-pit-relocation
cbuescher Oct 13, 2025
cbab660
Merge branch 'main' into improve-pit-relocation
cbuescher Oct 16, 2025
df9d875
Addressing review comments
cbuescher Oct 17, 2025
613cf73
Fix AbstractSearchAsyncActionTests.testMaybeReEncode
cbuescher Oct 19, 2025
0528e1d
Add new ActiveReaders class encapsulating active readers and relocati…
cbuescher Oct 19, 2025
07fad27
Merge branch 'main' into improve-pit-relocation
cbuescher Oct 21, 2025
76340ca
Minor change in node lookup
cbuescher Oct 21, 2025
bb0f265
Update PIT ids only when we saw SearchContextMissingException as reas…
cbuescher Oct 21, 2025
c3d1b65
Merge branch 'main' into improve-pit-relocation
cbuescher Oct 21, 2025
033e445
Merge branch 'main' into improve-pit-relocation
cbuescher Oct 22, 2025
5bf1853
minor logging change
cbuescher Oct 22, 2025
cc3c6c5
Modify updating PIT id conditions
cbuescher Oct 22, 2025
67872f6
Adressing review comments
cbuescher Oct 23, 2025
308c366
Merge branch 'main' into improve-pit-relocation
cbuescher Oct 23, 2025
5011068
Correct condition for updating pit
cbuescher Oct 23, 2025
b64a366
Merge branch 'main' into improve-pit-relocation
cbuescher Oct 27, 2025
8ea193d
Going back to always close contexts if nodeId changed
cbuescher Oct 27, 2025
df1b1c7
Close context when concurrently retrying same id twice
cbuescher Oct 27, 2025
9f5a1ad
small cleanup
cbuescher Oct 28, 2025
bee8400
more logger cleanups
cbuescher Oct 28, 2025
9fe9542
Merge branch 'main' into improve-pit-relocation
cbuescher Oct 28, 2025
3dca6e0
Separate normal and relocated putContext code paths
cbuescher Oct 29, 2025
0a06acf
minor cleanups
cbuescher Oct 29, 2025
1a302ac
Exemt context from current session from relocation handling
cbuescher Oct 30, 2025
b9f6f3d
Add RetrySearchIntegTests test for removed PIT context
cbuescher Oct 30, 2025
7a7aeef
Adressing review comments
cbuescher Oct 31, 2025
0edfa87
Merge branch 'main' into improve-pit-relocation
cbuescher Oct 31, 2025
65747c4
Adapt changelog
cbuescher Nov 1, 2025
d2618d8
Merge branch 'main' into improve-pit-relocation
cbuescher Nov 1, 2025
5 changes: 5 additions & 0 deletions docs/changelog/135231.yaml
@@ -0,0 +1,5 @@
pr: 135231
summary: Improve PIT context relocation
area: Search
type: enhancement
issues: []
@@ -11,11 +11,12 @@

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.search.ClearScrollResponse;
import org.elasticsearch.action.search.ParsedScrollId;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequestBuilder;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.bytes.BytesReference;
@@ -28,6 +29,7 @@
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.internal.ShardSearchContextId;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.test.ESIntegTestCase;
@@ -703,13 +705,15 @@ public void testRestartDataNodesDuringScrollSearch() throws Exception {
} finally {
respFromProdIndex.decRef();
}
SearchPhaseExecutionException error = expectThrows(
SearchPhaseExecutionException.class,
client().prepareSearchScroll(respFromDemoIndexScrollId)
SearchScrollRequestBuilder searchScrollRequestBuilder = client().prepareSearchScroll(respFromDemoIndexScrollId);
SearchPhaseExecutionException error = expectThrows(SearchPhaseExecutionException.class, searchScrollRequestBuilder);
assertEquals(1, error.shardFailures().length);
ParsedScrollId parsedScrollId = searchScrollRequestBuilder.request().parseScrollId();
ShardSearchContextId shardSearchContextId = parsedScrollId.getContext()[0].getSearchContextId();
assertThat(
error.shardFailures()[0].getCause().getMessage(),
containsString("No search context found for id [" + shardSearchContextId + "]")
);
for (ShardSearchFailure shardSearchFailure : error.shardFailures()) {
assertThat(shardSearchFailure.getCause().getMessage(), containsString("No search context found for id [1]"));
}
client().prepareSearchScroll(respFromProdIndexScrollId).get().decRef();
}

@@ -13,6 +13,7 @@
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.OriginalIndices;
@@ -31,6 +32,7 @@
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.builder.PointInTimeBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.internal.ShardSearchContextId;
@@ -39,8 +41,10 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
@@ -93,6 +97,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
private final Map<String, PendingExecutions> pendingExecutionsPerNode;
private final AtomicBoolean requestCancelled = new AtomicBoolean();
private final int skippedCount;
    private final TransportVersion minTransportVersion;

// protected for tests
protected final SubscribableListener<Void> doneFuture = new SubscribableListener<>();
@@ -149,6 +154,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
this.nodeIdToConnection = nodeIdToConnection;
this.concreteIndexBoosts = concreteIndexBoosts;
this.clusterStateVersion = clusterState.version();
        this.minTransportVersion = clusterState.getMinTransportVersion();
this.aliasFilter = aliasFilter;
this.results = resultConsumer;
// register the release of the query consumer to free up the circuit breaker memory
@@ -416,6 +422,7 @@ protected final void onShardFailure(final int shardIndex, SearchShardTarget shar
onShardGroupFailure(shardIndex, shard, e);
}
if (lastShard == false) {
logger.debug("Retrying shard [{}] with target [{}]", shard.getShardId(), nextShard);
performPhaseOnShard(shardIndex, shardIt, nextShard);
} else {
// count down outstanding shards, we're done with this shard as there's no more copies to try
@@ -607,10 +614,70 @@ public void sendSearchResponse(SearchResponseSections internalSearchResponse, At
}

protected BytesReference buildSearchContextId(ShardSearchFailure[] failures) {
Member:

This should be async so we wait for old contexts to close before responding. We've seen ML not wait for PITs to close and overload the cluster. But we can do this in a follow-up since this change is behind a feature flag.

Member Author:

I considered waiting here, but that would potentially slow down the search that triggers the id update, if I understand correctly. I assumed the fire-and-forget approach should be okay. I'm also still not 100% clear under which circumstances we'd need to close a PIT context that is now on a new node, because the reason we are opening part of the PIT on a new node is likely that the context went missing in its original location. So nothing to clean up? Or what am I missing here?

> We've seen ML not wait for PITs to close and overload the cluster.

Do you remember the details for that scenario? Both for checking how this relates to the situation here and for my own learning to avoid such scenarios in future work.

Member (@dnhatn), Oct 17, 2025:

A shard-level request can fail for various reasons, such as a circuit breaking exception on a data node. In such cases, we try another node, and if successful, we replace the old context with the new one here. Alternatively, we could avoid replacing the search context, which means we do not need to close it. We can maintain a list of failure types (SearchContextMissingException or ...) that indicate the search contexts are no longer available and should be replaced. What do you think?
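
A minimal sketch of that failure-type gating, assuming the failure from the previous attempt is at hand; the helper name is hypothetical, while ExceptionsHelper and SearchContextMissingException are existing classes:

```java
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.search.SearchContextMissingException;

// Hypothetical helper: treat a result from a new node as a real context
// relocation only when the previous attempt failed because the reader context
// is gone, not e.g. because of a circuit breaker trip on the data node.
static boolean contextNoLongerAvailable(Exception shardFailure) {
    Throwable cause = ExceptionsHelper.unwrapCause(shardFailure);
    return cause instanceof SearchContextMissingException;
}
```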

Member Author:

Thanks for the explanation. I like the idea of conditioning the id rewrite on failure types; however, when looking at how to do this here in AbstractSearchAsyncAction, I noticed that we clear earlier shard failures when another shard responds with a result. I'm not sure yet how to make this possible here, or whether there are better locations in the code to do so. I need to do some digging.

Member Author:

I added a different solution than the one we talked about, directly in AbstractSearchAsyncAction, without having to modify anything in SearchPhaseResult. Let me know if this looks okay to you; otherwise I'll change it.

var source = request.source();
return source != null && source.pointInTimeBuilder() != null && source.pointInTimeBuilder().singleSession() == false
? source.pointInTimeBuilder().getEncodedId()
: null;
SearchSourceBuilder source = request.source();
// only (re-)build a search context id if we have a point in time
if (source != null && source.pointInTimeBuilder() != null && source.pointInTimeBuilder().singleSession() == false) {
            // we want to change node ids in the PIT id if any shard and its PIT context have moved
return maybeReEncodeNodeIds(
source.pointInTimeBuilder(),
results.getAtomicArray().asList(),
failures,
namedWriteableRegistry,
                minTransportVersion
);
} else {
return null;
}
}

static <Result extends SearchPhaseResult> BytesReference maybeReEncodeNodeIds(
Member:

Question: I haven't looked into how we handle partial results with re-encoding. Have you considered this?

Member Author:

I assume you are referring to the functionality added with #111516? If I understand that PR correctly, we will have SearchContextIdForNode entries in the PIT with a "null" node entry. I think in that case we won't add that shard to the shard iterators of any subsequent search, so we won't get a Result for that shard. That is one reason why I copy PIT id entries for everything that has no Result from the old encoded id to the new one unchanged.
Is that what you mean?

Member Author:

I looked again at how we handle partial results. The way I see it, when opening the PIT we can tolerate partial results, i.e. failures from certain shards when they are not available. The result is an entry in the PIT id that has a ShardId but a SearchContextIdForNode with an empty node/contextId.
The way I see it, we should not change any of these entries, which is what already happens in this method.
In addition, the re-encoding step here doesn't change any entries for shards that failed in the last search: they shouldn't be included in the "results" list and therefore don't update the related part of the updated PIT id. In cases where these failures are temporary, subsequent searches with the updated id will try to hit the "old" shard context locations, and if any of them can be retried we will update that part of the PIT in a later call.
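
For illustration, a hedged sketch of the invariant described above (values invented): entries recorded for unavailable shards carry null node and context ids, and re-encoding must copy them through unchanged.

```java
// Sketch: a shard that was unavailable when the PIT was opened (see #111516)
// is recorded with a null node and null context id; since no Result is ever
// produced for it, maybeReEncodeNodeIds leaves the entry untouched.
SearchContextIdForNode unavailable = new SearchContextIdForNode(null, null, null);
assert unavailable.getNode() == null && unavailable.getSearchContextId() == null;
```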

PointInTimeBuilder originalPit,
List<Result> results,
ShardSearchFailure[] failures,
NamedWriteableRegistry namedWriteableRegistry,
        TransportVersion minTransportVersion
) {
SearchContextId original = originalPit.getSearchContextId(namedWriteableRegistry);
boolean idChanged = false;
Map<ShardId, SearchContextIdForNode> updatedShardMap = null; // only create this if we detect a change
for (Result result : results) {
SearchShardTarget searchShardTarget = result.getSearchShardTarget();
ShardId shardId = searchShardTarget.getShardId();
SearchContextIdForNode originalShard = original.shards().get(shardId);
if (originalShard != null
&& Objects.equals(originalShard.getClusterAlias(), searchShardTarget.getClusterAlias())
&& Objects.equals(originalShard.getSearchContextId(), result.getContextId())) {
                // the result's shard and context id match the original entry; check if the node differs and replace it if so
String originalNode = originalShard.getNode();
if (originalNode != null && originalNode.equals(searchShardTarget.getNodeId()) == false) {
// the target node for this shard entry in the PIT has changed, we need to update it
idChanged = true;
if (updatedShardMap == null) {
updatedShardMap = new HashMap<>(original.shards().size());
}
updatedShardMap.put(
shardId,
new SearchContextIdForNode(
originalShard.getClusterAlias(),
searchShardTarget.getNodeId(),
originalShard.getSearchContextId()
)
);
}
}
}
if (idChanged) {
            // we also need to add shards that are not in the results for some reason (e.g. the query rewrote to match none) but that
// were part of the original PIT
for (ShardId shardId : original.shards().keySet()) {
if (updatedShardMap.containsKey(shardId) == false) {
updatedShardMap.put(shardId, original.shards().get(shardId));
}
}
            return SearchContextId.encode(updatedShardMap, original.aliasFilter(), minTransportVersion, failures);
Member:

Another question is how to clean up search contexts from old PITs. For example, when a shard-level request fails, we try to execute it on another copy. In these cases, we re-encode the PIT. If users close the new PIT, the old search context won't be closed. This is not an issue with stateful, but it can be a problem with serverless. Should we close the old search contexts here?

Member Author:

You mean shard-level requests that are not related to a missing search context? I didn't consider this yet; good point, I think. I was assuming that getting a result from a node other than the original one would always mean the old context is gone, but you are right, we might retry elsewhere for other reasons.
Would you suggest something like the fire-and-forget approach we use e.g. here in TransportSearchAction from this location? I assume a close request is relatively cheap even if the old context no longer exists, and we can treat this as a best-effort attempt. If this fails at some point, the context reaper process should clean up anything that's over the keepalive limit, no?

Member Author:

I just saw that TransportSearchAction goes to all shards; this would probably need to be something more selective that targets only the shards whose location changed, and use ClearScrollController#closeContexts directly.
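
A rough sketch of that best-effort cleanup, assuming the caller has collected only the entries whose node changed (the method and its wiring are hypothetical; ClearScrollController.closeContexts and ActionListener.noop() exist today):

```java
import java.util.Collection;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.ClearScrollController;
import org.elasticsearch.action.search.SearchContextIdForNode;
import org.elasticsearch.action.search.SearchTransportService;
import org.elasticsearch.cluster.node.DiscoveryNodes;

// Fire-and-forget close of contexts whose PIT entry moved to another node; if
// a request fails, the periodic reaper removes contexts past their keepalive.
static void closeRelocatedContexts(
    DiscoveryNodes nodes,
    SearchTransportService searchTransportService,
    Collection<SearchContextIdForNode> relocated
) {
    ClearScrollController.closeContexts(nodes, searchTransportService, relocated, ActionListener.noop());
}
```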

} else {
return originalPit.getEncodedId();
}
}

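To make the re-encoding concrete, a hypothetical before/after of a single PIT entry (node names and ids invented): the shard's context survived on a new node, so only the node id is rewritten while the cluster alias and context id are preserved.

```java
// Before the retry, the PIT pinned this shard to "node-1"; the query result
// then arrived from "node-2" carrying the same context id, so only the node
// part of the entry changes in the re-encoded PIT id.
ShardSearchContextId contextId = new ShardSearchContextId("session-uuid", 42L);
SearchContextIdForNode before = new SearchContextIdForNode(null, "node-1", contextId);
SearchContextIdForNode after = new SearchContextIdForNode(null, "node-2", contextId);
assert before.getSearchContextId().equals(after.getSearchContextId());
```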
@@ -21,7 +21,6 @@
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.internal.ShardSearchContextId;
import org.elasticsearch.transport.RemoteClusterAware;
@@ -30,6 +29,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
@@ -62,6 +62,26 @@ public static BytesReference encode(
Map<String, AliasFilter> aliasFilter,
TransportVersion version,
ShardSearchFailure[] shardFailures
) {
Map<ShardId, SearchContextIdForNode> shards = searchPhaseResults.stream()
.collect(
Collectors.toMap(
r -> r.getSearchShardTarget().getShardId(),
r -> new SearchContextIdForNode(
r.getSearchShardTarget().getClusterAlias(),
r.getSearchShardTarget().getNodeId(),
r.getContextId()
)
)
);
return encode(shards, aliasFilter, version, shardFailures);
}

static BytesReference encode(
Map<ShardId, SearchContextIdForNode> shards,
Map<String, AliasFilter> aliasFilter,
TransportVersion version,
ShardSearchFailure[] shardFailures
) {
assert shardFailures.length == 0 || version.onOrAfter(TransportVersions.V_8_16_0)
: "[allow_partial_search_results] cannot be enabled on a cluster that has not been fully upgraded to version ["
@@ -71,12 +91,12 @@
out.setTransportVersion(version);
TransportVersion.writeVersion(version, out);
boolean allowNullContextId = out.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0);
int shardSize = searchPhaseResults.size() + (allowNullContextId ? shardFailures.length : 0);
int shardSize = shards.size() + (allowNullContextId ? shardFailures.length : 0);
out.writeVInt(shardSize);
for (var searchResult : searchPhaseResults) {
final SearchShardTarget target = searchResult.getSearchShardTarget();
target.getShardId().writeTo(out);
new SearchContextIdForNode(target.getClusterAlias(), target.getNodeId(), searchResult.getContextId()).writeTo(out);
for (ShardId shardId : shards.keySet()) {
shardId.writeTo(out);
SearchContextIdForNode searchContextIdForNode = shards.get(shardId);
searchContextIdForNode.writeTo(out);
}
if (allowNullContextId) {
for (var failure : shardFailures) {
@@ -142,4 +162,23 @@ public String[] getActualIndices() {
}
return indices.toArray(String[]::new);
}

@Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) return false;
SearchContextId that = (SearchContextId) o;
return Objects.equals(shards, that.shards)
&& Objects.equals(aliasFilter, that.aliasFilter)
&& Objects.equals(contextIds, that.contextIds);
}

@Override
public int hashCode() {
return Objects.hash(shards, aliasFilter, contextIds);
}

@Override
public String toString() {
return "SearchContextId{" + "shards=" + shards + ", aliasFilter=" + aliasFilter + '}';
}
}
@@ -17,6 +17,7 @@
import org.elasticsearch.search.internal.ShardSearchContextId;

import java.io.IOException;
import java.util.Objects;

public final class SearchContextIdForNode implements Writeable {
private final String node;
@@ -103,4 +104,18 @@ public String toString() {
+ '\''
+ '}';
}

@Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) return false;
SearchContextIdForNode that = (SearchContextIdForNode) o;
return Objects.equals(node, that.node)
&& Objects.equals(searchContextId, that.searchContextId)
&& Objects.equals(clusterAlias, that.clusterAlias);
}

@Override
public int hashCode() {
return Objects.hash(node, searchContextId, clusterAlias);
}
}
@@ -386,8 +386,8 @@ public void writeTo(StreamOutput out) throws IOException {

public static void registerRequestHandler(TransportService transportService, SearchService searchService) {
final TransportRequestHandler<ScrollFreeContextRequest> freeContextHandler = (request, channel, task) -> {
logger.trace("releasing search context [{}]", request.id());
boolean freed = searchService.freeReaderContext(request.id());
logger.trace("releasing search context [{}], [{}]", request.id(), freed);
channel.sendResponse(SearchFreeContextResponse.of(freed));
};
final Executor freeContextExecutor = buildFreeContextExecutor(transportService);
@@ -1948,6 +1948,12 @@ static List<SearchShardIterator> getLocalShardsIteratorFromPointInTime(
// Prefer executing shard requests on nodes that are part of PIT first.
if (projectState.cluster().nodes().nodeExists(perNode.getNode())) {
targetNodes.add(perNode.getNode());
} else {
logger.debug(
"Node [{}] referenced in PIT context id [{}] no longer exists.",
perNode.getNode(),
perNode.getSearchContextId()
);
}
ShardSearchContextId shardSearchContextId = perNode.getSearchContextId();
if (shardSearchContextId.isRetryable()) {
@@ -22,7 +22,7 @@ public class SearchContextMissingException extends ElasticsearchException {
private final ShardSearchContextId contextId;

public SearchContextMissingException(ShardSearchContextId contextId) {
super("No search context found for id [" + contextId.getId() + "]");
super("No search context found for id [" + contextId + "]");
this.contextId = contextId;
}

@@ -30,7 +30,7 @@
*/
public abstract class SearchPhaseResult extends TransportResponse {

private SearchShardTarget searchShardTarget;
protected SearchShardTarget searchShardTarget;
private int shardIndex = -1;
protected ShardSearchContextId contextId;
private ShardSearchRequest shardSearchRequest;