From 73a174f00d2fd8239267dbf5dfa5e4b7f0d5a9b5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20B=C3=BCscher?= Date: Mon, 22 Sep 2025 20:59:30 +0200 Subject: [PATCH 1/7] Improve PIT context relocation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This change improves point-in-time (PIT) context relocations in cases where we lose a PIT due to e.g. node shutdowns or restarts. We are already able to retry shards whose open PIT contexts have been lost in cases where we can re-create identical reader contexts on either the same or a different node for read-only and frozen indices (via FrozenEngine / ReadOnlyEngine). However, currently we re-create these contexts with every subsequent search because we don’t store the re-created context in the same way as when we open a new PIT. This PR avoids this extra work. --- .../search/scroll/SearchScrollIT.java | 18 ++- .../search/AbstractSearchAsyncAction.java | 75 ++++++++++- .../action/search/SearchContextId.java | 51 +++++++- .../action/search/SearchContextIdForNode.java | 15 +++ .../action/search/SearchTransportService.java | 2 +- .../action/search/TransportSearchAction.java | 6 + .../search/SearchContextMissingException.java | 2 +- .../search/SearchPhaseResult.java | 2 +- .../elasticsearch/search/SearchService.java | 62 ++++++--- .../ElasticsearchExceptionTests.java | 11 +- .../AbstractSearchAsyncActionTests.java | 123 ++++++++++++++++++ .../search/MockSearchService.java | 10 +- .../RetrySearchIntegTests.java | 57 +++++++- 13 files changed, 382 insertions(+), 52 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/scroll/SearchScrollIT.java b/server/src/internalClusterTest/java/org/elasticsearch/search/scroll/SearchScrollIT.java index a54e19b839ad3..fe994abc8764b 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/search/scroll/SearchScrollIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/search/scroll/SearchScrollIT.java @@ -11,11 +11,12 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.search.ClearScrollResponse; +import org.elasticsearch.action.search.ParsedScrollId; import org.elasticsearch.action.search.SearchPhaseExecutionException; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchScrollRequestBuilder; import org.elasticsearch.action.search.SearchType; -import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.Priority; import org.elasticsearch.common.bytes.BytesReference; @@ -28,6 +29,7 @@ import org.elasticsearch.index.query.RangeQueryBuilder; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.internal.ShardSearchContextId; import org.elasticsearch.search.sort.FieldSortBuilder; import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.test.ESIntegTestCase; @@ -703,13 +705,15 @@ public void testRestartDataNodesDuringScrollSearch() throws Exception { } finally { respFromProdIndex.decRef(); } - SearchPhaseExecutionException error = expectThrows( - SearchPhaseExecutionException.class, - client().prepareSearchScroll(respFromDemoIndexScrollId) + SearchScrollRequestBuilder searchScrollRequestBuilder = client().prepareSearchScroll(respFromDemoIndexScrollId); + SearchPhaseExecutionException error = 
expectThrows(SearchPhaseExecutionException.class, searchScrollRequestBuilder); + assertEquals(1, error.shardFailures().length); + ParsedScrollId parsedScrollId = searchScrollRequestBuilder.request().parseScrollId(); + ShardSearchContextId shardSearchContextId = parsedScrollId.getContext()[0].getSearchContextId(); + assertThat( + error.shardFailures()[0].getCause().getMessage(), + containsString("No search context found for id [" + shardSearchContextId + "]") ); - for (ShardSearchFailure shardSearchFailure : error.shardFailures()) { - assertThat(shardSearchFailure.getCause().getMessage(), containsString("No search context found for id [1]")); - } client().prepareSearchScroll(respFromProdIndexScrollId).get().decRef(); } diff --git a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index e058a3c83d41c..6036bec3b8bac 100644 --- a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -13,6 +13,7 @@ import org.apache.lucene.util.SetOnce; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.TransportVersion; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.NoShardAvailableActionException; import org.elasticsearch.action.OriginalIndices; @@ -31,6 +32,7 @@ import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.builder.PointInTimeBuilder; +import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.internal.ShardSearchContextId; @@ -39,8 +41,10 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; @@ -93,6 +97,7 @@ abstract class AbstractSearchAsyncAction exten private final Map pendingExecutionsPerNode; private final AtomicBoolean requestCancelled = new AtomicBoolean(); private final int skippedCount; + private final TransportVersion mintransportVersion; // protected for tests protected final SubscribableListener doneFuture = new SubscribableListener<>(); @@ -149,6 +154,7 @@ abstract class AbstractSearchAsyncAction exten this.nodeIdToConnection = nodeIdToConnection; this.concreteIndexBoosts = concreteIndexBoosts; this.clusterStateVersion = clusterState.version(); + this.mintransportVersion = clusterState.getMinTransportVersion(); this.aliasFilter = aliasFilter; this.results = resultConsumer; // register the release of the query consumer to free up the circuit breaker memory @@ -416,6 +422,7 @@ protected final void onShardFailure(final int shardIndex, SearchShardTarget shar onShardGroupFailure(shardIndex, shard, e); } if (lastShard == false) { + logger.debug("Retrying shard [{}] with target [{}]", shard.getShardId(), nextShard); performPhaseOnShard(shardIndex, shardIt, nextShard); } else { // count down outstanding shards, we're done with this shard as there's no more copies to try @@ -607,10 +614,70 @@ public void sendSearchResponse(SearchResponseSections internalSearchResponse, At } protected BytesReference 
buildSearchContextId(ShardSearchFailure[] failures) { - var source = request.source(); - return source != null && source.pointInTimeBuilder() != null && source.pointInTimeBuilder().singleSession() == false - ? source.pointInTimeBuilder().getEncodedId() - : null; + SearchSourceBuilder source = request.source(); + // only (re-)build a search context id if we have a point in time + if (source != null && source.pointInTimeBuilder() != null && source.pointInTimeBuilder().singleSession() == false) { + // we want to change node ids in the PIT id if any shards and its PIT context have moved + return maybeReEncodeNodeIds( + source.pointInTimeBuilder(), + results.getAtomicArray().asList(), + failures, + namedWriteableRegistry, + mintransportVersion + ); + } else { + return null; + } + } + + static BytesReference maybeReEncodeNodeIds( + PointInTimeBuilder originalPit, + List results, + ShardSearchFailure[] failures, + NamedWriteableRegistry namedWriteableRegistry, + TransportVersion mintransportVersion + ) { + SearchContextId original = originalPit.getSearchContextId(namedWriteableRegistry); + boolean idChanged = false; + Map updatedShardMap = null; // only create this if we detect a change + for (Result result : results) { + SearchShardTarget searchShardTarget = result.getSearchShardTarget(); + ShardId shardId = searchShardTarget.getShardId(); + SearchContextIdForNode originalShard = original.shards().get(shardId); + if (originalShard != null + && Objects.equals(originalShard.getClusterAlias(), searchShardTarget.getClusterAlias()) + && Objects.equals(originalShard.getSearchContextId(), result.getContextId())) { + // result shard and context id match the originalShard one, check if the node is different and replace if so + String originalNode = originalShard.getNode(); + if (originalNode != null && originalNode.equals(searchShardTarget.getNodeId()) == false) { + // the target node for this shard entry in the PIT has changed, we need to update it + idChanged = true; + if (updatedShardMap == null) { + updatedShardMap = new HashMap<>(original.shards().size()); + } + updatedShardMap.put( + shardId, + new SearchContextIdForNode( + originalShard.getClusterAlias(), + searchShardTarget.getNodeId(), + originalShard.getSearchContextId() + ) + ); + } + } + } + if (idChanged) { + // we also need to add shard that are not in the results for some reason (e.g. 
query rewrote to match none) but that + // were part of the original PIT + for (ShardId shardId : original.shards().keySet()) { + if (updatedShardMap.containsKey(shardId) == false) { + updatedShardMap.put(shardId, original.shards().get(shardId)); + } + } + return SearchContextId.encode(updatedShardMap, original.aliasFilter(), mintransportVersion, failures); + } else { + return originalPit.getEncodedId(); + } } /** diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchContextId.java b/server/src/main/java/org/elasticsearch/action/search/SearchContextId.java index c2f1510341fb0..6173798660c9c 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchContextId.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchContextId.java @@ -21,7 +21,6 @@ import org.elasticsearch.common.util.Maps; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.search.SearchPhaseResult; -import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.search.internal.ShardSearchContextId; import org.elasticsearch.transport.RemoteClusterAware; @@ -30,6 +29,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.TreeSet; import java.util.stream.Collectors; @@ -62,6 +62,26 @@ public static BytesReference encode( Map aliasFilter, TransportVersion version, ShardSearchFailure[] shardFailures + ) { + Map shards = searchPhaseResults.stream() + .collect( + Collectors.toMap( + r -> r.getSearchShardTarget().getShardId(), + r -> new SearchContextIdForNode( + r.getSearchShardTarget().getClusterAlias(), + r.getSearchShardTarget().getNodeId(), + r.getContextId() + ) + ) + ); + return encode(shards, aliasFilter, version, shardFailures); + } + + static BytesReference encode( + Map shards, + Map aliasFilter, + TransportVersion version, + ShardSearchFailure[] shardFailures ) { assert shardFailures.length == 0 || version.onOrAfter(TransportVersions.V_8_16_0) : "[allow_partial_search_results] cannot be enabled on a cluster that has not been fully upgraded to version [" @@ -71,12 +91,12 @@ public static BytesReference encode( out.setTransportVersion(version); TransportVersion.writeVersion(version, out); boolean allowNullContextId = out.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0); - int shardSize = searchPhaseResults.size() + (allowNullContextId ? shardFailures.length : 0); + int shardSize = shards.size() + (allowNullContextId ? 
shardFailures.length : 0); out.writeVInt(shardSize); - for (var searchResult : searchPhaseResults) { - final SearchShardTarget target = searchResult.getSearchShardTarget(); - target.getShardId().writeTo(out); - new SearchContextIdForNode(target.getClusterAlias(), target.getNodeId(), searchResult.getContextId()).writeTo(out); + for (ShardId shardId : shards.keySet()) { + shardId.writeTo(out); + SearchContextIdForNode searchContextIdForNode = shards.get(shardId); + searchContextIdForNode.writeTo(out); } if (allowNullContextId) { for (var failure : shardFailures) { @@ -142,4 +162,23 @@ public String[] getActualIndices() { } return indices.toArray(String[]::new); } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) return false; + SearchContextId that = (SearchContextId) o; + return Objects.equals(shards, that.shards) + && Objects.equals(aliasFilter, that.aliasFilter) + && Objects.equals(contextIds, that.contextIds); + } + + @Override + public int hashCode() { + return Objects.hash(shards, aliasFilter, contextIds); + } + + @Override + public String toString() { + return "SearchContextId{" + "shards=" + shards + ", aliasFilter=" + aliasFilter + '}'; + } } diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchContextIdForNode.java b/server/src/main/java/org/elasticsearch/action/search/SearchContextIdForNode.java index f91a9d09f4bb4..f68bd1ea52cb8 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchContextIdForNode.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchContextIdForNode.java @@ -17,6 +17,7 @@ import org.elasticsearch.search.internal.ShardSearchContextId; import java.io.IOException; +import java.util.Objects; public final class SearchContextIdForNode implements Writeable { private final String node; @@ -103,4 +104,18 @@ public String toString() { + '\'' + '}'; } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) return false; + SearchContextIdForNode that = (SearchContextIdForNode) o; + return Objects.equals(node, that.node) + && Objects.equals(searchContextId, that.searchContextId) + && Objects.equals(clusterAlias, that.clusterAlias); + } + + @Override + public int hashCode() { + return Objects.hash(node, searchContextId, clusterAlias); + } } diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java index 1e0fa28889c97..fb2235cb48664 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java @@ -386,8 +386,8 @@ public void writeTo(StreamOutput out) throws IOException { public static void registerRequestHandler(TransportService transportService, SearchService searchService) { final TransportRequestHandler freeContextHandler = (request, channel, task) -> { - logger.trace("releasing search context [{}]", request.id()); boolean freed = searchService.freeReaderContext(request.id()); + logger.trace("releasing search context [{}], [{}]", request.id(), freed); channel.sendResponse(SearchFreeContextResponse.of(freed)); }; final Executor freeContextExecutor = buildFreeContextExecutor(transportService); diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index 6366916c0a64f..b4ac2ffd50218 
100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -1946,6 +1946,12 @@ static List getLocalShardsIteratorFromPointInTime( // Prefer executing shard requests on nodes that are part of PIT first. if (projectState.cluster().nodes().nodeExists(perNode.getNode())) { targetNodes.add(perNode.getNode()); + } else { + logger.debug( + "Node [{}] referenced in PIT context id [{}] no longer exists.", + perNode.getNode(), + perNode.getSearchContextId() + ); } if (perNode.getSearchContextId().getSearcherId() != null) { for (ShardRouting shard : shards) { diff --git a/server/src/main/java/org/elasticsearch/search/SearchContextMissingException.java b/server/src/main/java/org/elasticsearch/search/SearchContextMissingException.java index 3c75cf0e87ff1..6c1edc1b221be 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchContextMissingException.java +++ b/server/src/main/java/org/elasticsearch/search/SearchContextMissingException.java @@ -22,7 +22,7 @@ public class SearchContextMissingException extends ElasticsearchException { private final ShardSearchContextId contextId; public SearchContextMissingException(ShardSearchContextId contextId) { - super("No search context found for id [" + contextId.getId() + "]"); + super("No search context found for id [" + contextId + "]"); this.contextId = contextId; } diff --git a/server/src/main/java/org/elasticsearch/search/SearchPhaseResult.java b/server/src/main/java/org/elasticsearch/search/SearchPhaseResult.java index eed80bff2e9bb..9576e65aab28e 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchPhaseResult.java +++ b/server/src/main/java/org/elasticsearch/search/SearchPhaseResult.java @@ -30,7 +30,7 @@ */ public abstract class SearchPhaseResult extends TransportResponse { - private SearchShardTarget searchShardTarget; + protected SearchShardTarget searchShardTarget; private int shardIndex = -1; protected ShardSearchContextId contextId; private ShardSearchRequest shardSearchRequest; diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index 0ce8c20fff943..d4065ad1b461a 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -361,7 +361,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv private final AtomicLong idGenerator = new AtomicLong(); - private final Map activeReaders = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency(); + private final Map activeReaders = ConcurrentCollections + .newConcurrentMapWithAggressiveConcurrency(); private final MultiBucketConsumerService multiBucketConsumerService; @@ -540,7 +541,7 @@ public void beforeIndexShardCreated(ShardRouting routing, Settings indexSettings } protected void putReaderContext(ReaderContext context) { - final long id = context.id().getId(); + final ShardSearchContextId id = context.id(); final ReaderContext previous = activeReaders.put(id, context); assert previous == null; // ensure that if we race against afterIndexRemoved, we remove the context from the active list. 
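The SearchService changes in this file key active reader contexts by their full ShardSearchContextId and, further down, re-create a missing context for read-only and frozen shards when the searcher id still matches; the AbstractSearchAsyncAction change then folds the node that actually answered into the PIT id returned with the search response. From a caller's perspective the only requirement is to keep passing the most recently returned PIT id, as the reworked RetrySearchIntegTests does. A rough sketch of that flow in the same internal test-client style (the index name and keep-alive values are illustrative, error handling omitted):

BytesReference pitId = client().execute(
    TransportOpenPointInTimeAction.TYPE,
    new OpenPointInTimeRequest("my-index").keepAlive(TimeValue.timeValueMinutes(2))
).actionGet().getPointInTimeId();
try {
    SearchResponse resp = prepareSearch().setPointInTime(new PointInTimeBuilder(pitId).setKeepAlive(TimeValue.timeValueMinutes(2))).get();
    try {
        // after a node restart this search may have re-created contexts on other nodes; the
        // response then carries a re-encoded PIT id that already points at those nodes
        pitId = resp.pointInTimeId();
    } finally {
        resp.decRef();
    }
    // later searches pass the updated id and reuse the re-created contexts directly
} finally {
    client().execute(TransportClosePointInTimeAction.TYPE, new ClosePointInTimeRequest(pitId)).actionGet();
}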
@@ -552,7 +553,7 @@ protected void putReaderContext(ReaderContext context) { } } - protected ReaderContext removeReaderContext(long id) { + protected ReaderContext removeReaderContext(ShardSearchContextId id) { if (logger.isTraceEnabled()) { logger.trace("removing reader context [{}]", id); } @@ -869,7 +870,7 @@ static boolean isExecutorQueuedBeyondPrewarmingFactor(Executor searchOperationsE private IndexShard getShard(ShardSearchRequest request) { final ShardSearchContextId contextId = request.readerId(); if (contextId != null && sessionId.equals(contextId.getSessionId())) { - final ReaderContext readerContext = activeReaders.get(contextId.getId()); + final ReaderContext readerContext = activeReaders.get(contextId); if (readerContext != null) { return readerContext.indexShard(); } @@ -1247,10 +1248,7 @@ private ReaderContext findReaderContext(ShardSearchContextId id, TransportReques if (id.getSessionId().isEmpty()) { throw new IllegalArgumentException("Session id must be specified"); } - if (sessionId.equals(id.getSessionId()) == false) { - throw new SearchContextMissingException(id); - } - final ReaderContext reader = activeReaders.get(id.getId()); + final ReaderContext reader = activeReaders.get(id); if (reader == null) { throw new SearchContextMissingException(id); } @@ -1264,14 +1262,19 @@ private ReaderContext findReaderContext(ShardSearchContextId id, TransportReques } final ReaderContext createOrGetReaderContext(ShardSearchRequest request) { - if (request.readerId() != null) { + ShardSearchContextId contextId = request.readerId(); + final long keepAliveInMillis = getKeepAlive(request); + if (contextId != null) { try { - return findReaderContext(request.readerId(), request); + return findReaderContext(contextId, request); } catch (SearchContextMissingException e) { - final String searcherId = request.readerId().getSearcherId(); + final String searcherId = contextId.getSearcherId(); if (searcherId == null) { throw e; } + // We retry creating a ReaderContext on this node if we can get a searcher with same searcher id as in the original + // PIT context. This is currently possible for e.g. 
ReadOnlyEngine and its child FrozenEngine where a commitId is + // calculated from the ids of the underlying segments of an index commit final IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex()); final IndexShard shard = indexService.getShard(request.shardId().id()); final Engine.SearcherSupplier searcherSupplier = shard.acquireSearcherSupplier(); @@ -1279,10 +1282,20 @@ final ReaderContext createOrGetReaderContext(ShardSearchRequest request) { searcherSupplier.close(); throw e; } - return createAndPutReaderContext(request, indexService, shard, searcherSupplier, defaultKeepAlive); + // we are using a PIT here so set singleSession to false to prevent clearing after the search finishes + ReaderContext readerContext = createAndPutReaderContext( + contextId, + request, + indexService, + shard, + searcherSupplier, + false, + keepAliveInMillis + ); + logger.debug("Recreated reader context [{}]", readerContext.id()); + return readerContext; } } - final long keepAliveInMillis = getKeepAlive(request); final IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex()); final IndexShard shard = indexService.getShard(request.shardId().id()); return createAndPutReaderContext(request, indexService, shard, shard.acquireSearcherSupplier(), keepAliveInMillis); @@ -1294,11 +1307,23 @@ final ReaderContext createAndPutReaderContext( IndexShard shard, Engine.SearcherSupplier reader, long keepAliveInMillis + ) { + final ShardSearchContextId id = new ShardSearchContextId(sessionId, idGenerator.incrementAndGet()); + return createAndPutReaderContext(id, request, indexService, shard, reader, true, keepAliveInMillis); + } + + private ReaderContext createAndPutReaderContext( + ShardSearchContextId id, + ShardSearchRequest request, + IndexService indexService, + IndexShard shard, + Engine.SearcherSupplier reader, + boolean singleSession, + long keepAliveInMillis ) { ReaderContext readerContext = null; Releasable decreaseScrollContexts = null; try { - final ShardSearchContextId id = new ShardSearchContextId(sessionId, idGenerator.incrementAndGet()); if (request.scroll() != null) { decreaseScrollContexts = openScrollContexts::decrementAndGet; if (openScrollContexts.incrementAndGet() > maxOpenScrollContext) { @@ -1308,7 +1333,7 @@ final ReaderContext createAndPutReaderContext( readerContext.addOnClose(decreaseScrollContexts); decreaseScrollContexts = null; } else { - readerContext = new ReaderContext(id, indexService, shard, reader, keepAliveInMillis, true); + readerContext = new ReaderContext(id, indexService, shard, reader, keepAliveInMillis, singleSession); } reader = null; final ReaderContext finalReaderContext = readerContext; @@ -1474,12 +1499,9 @@ private void freeAllContextsForShard(ShardId shardId) { public boolean freeReaderContext(ShardSearchContextId contextId) { logger.trace("freeing reader context [{}]", contextId); - if (sessionId.equals(contextId.getSessionId())) { - try (ReaderContext context = removeReaderContext(contextId.getId())) { - return context != null; - } + try (ReaderContext context = removeReaderContext(contextId)) { + return context != null; } - return false; } public void freeAllScrollContexts() { diff --git a/server/src/test/java/org/elasticsearch/ElasticsearchExceptionTests.java b/server/src/test/java/org/elasticsearch/ElasticsearchExceptionTests.java index 2741ac37ce519..dc239a7526bc5 100644 --- a/server/src/test/java/org/elasticsearch/ElasticsearchExceptionTests.java +++ 
b/server/src/test/java/org/elasticsearch/ElasticsearchExceptionTests.java @@ -1258,6 +1258,7 @@ public void testFailureToAndFromXContentWithDetails() throws IOException { DiscoveryNode node = DiscoveryNodeUtils.create("node_g"); failureCause = new NodeClosedException(node); failureCause = new NoShardAvailableActionException(new ShardId("_index_g", "_uuid_g", 6), "node_g", failureCause); + ShardSearchContextId shardSearchContextId = new ShardSearchContextId(UUIDs.randomBase64UUID(), 0, null); ShardSearchFailure[] shardFailures = new ShardSearchFailure[] { new ShardSearchFailure( new ParsingException(0, 0, "Parsing g", null), @@ -1267,10 +1268,7 @@ public void testFailureToAndFromXContentWithDetails() throws IOException { new RepositoryException("repository_g", "Repo"), new SearchShardTarget("node_g", new ShardId(new Index("_index_g", "_uuid_g"), 62), null) ), - new ShardSearchFailure( - new SearchContextMissingException(new ShardSearchContextId(UUIDs.randomBase64UUID(), 0L)), - null - ) }; + new ShardSearchFailure(new SearchContextMissingException(shardSearchContextId), null) }; failure = new SearchPhaseExecutionException("phase_g", "G", failureCause, shardFailures); expectedCause = new ElasticsearchException( "Elasticsearch exception [type=node_closed_exception, " + "reason=node closed " + node + "]" @@ -1293,7 +1291,10 @@ public void testFailureToAndFromXContentWithDetails() throws IOException { ); expected.addSuppressed( new ElasticsearchException( - "Elasticsearch exception [type=search_context_missing_exception, " + "reason=No search context found for id [0]]" + "Elasticsearch exception [type=search_context_missing_exception, " + + "reason=No search context found for id [" + + shardSearchContextId + + "]]" ) ); } diff --git a/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java b/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java index abe7e893977f4..26612e17d08fb 100644 --- a/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java @@ -9,30 +9,38 @@ package org.elasticsearch.action.search; +import org.elasticsearch.TransportVersion; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.cluster.ClusterModule; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.core.Tuple; import org.elasticsearch.index.Index; import org.elasticsearch.index.query.MatchAllQueryBuilder; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; +import org.elasticsearch.search.builder.PointInTimeBuilder; import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.search.internal.ShardSearchContextId; import org.elasticsearch.search.internal.ShardSearchRequest; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.TransportVersionUtils; import org.elasticsearch.transport.Transport; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; 
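The testMaybeReEncode test added below exercises the core decision in AbstractSearchAsyncAction#maybeReEncodeNodeIds: compare the node that actually served each shard result with the node recorded in the original PIT id, and only build and re-encode a new shard map when at least one context has moved. A self-contained sketch of just that decision, using hypothetical record types instead of the real ShardId and SearchContextIdForNode classes and skipping the retryability and null-node checks the production method performs:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// hypothetical stand-ins for ShardId, SearchContextIdForNode and a per-shard search result
record Shard(String index, int id) {}
record ContextForNode(String node, long contextId) {}
record ShardResult(Shard shard, String node, long contextId) {}

public class ReencodeSketch {

    // returns an updated shard-to-context map if any shard was served by a different node
    // than the one recorded in the original PIT, or null if nothing moved
    static Map<Shard, ContextForNode> maybeReEncode(Map<Shard, ContextForNode> original, List<ShardResult> results) {
        Map<Shard, ContextForNode> updated = null; // only allocated once a change is detected
        for (ShardResult result : results) {
            ContextForNode originalEntry = original.get(result.shard());
            if (originalEntry != null && Objects.equals(originalEntry.node(), result.node()) == false) {
                if (updated == null) {
                    updated = new HashMap<>();
                }
                // the context was re-created elsewhere: point the PIT entry at the new node
                updated.put(result.shard(), new ContextForNode(result.node(), result.contextId()));
            }
        }
        if (updated == null) {
            return null; // caller keeps the original encoded PIT id
        }
        // carry over shards that did not show up in the results (e.g. the query rewrote to match none)
        for (Map.Entry<Shard, ContextForNode> entry : original.entrySet()) {
            updated.putIfAbsent(entry.getKey(), entry.getValue());
        }
        return updated; // caller encodes this map into a new PIT id
    }

    public static void main(String[] args) {
        Shard s0 = new Shard("index", 0);
        Shard s1 = new Shard("index", 1);
        Map<Shard, ContextForNode> original = Map.of(s0, new ContextForNode("node1", 11), s1, new ContextForNode("node2", 22));
        // shard 1 was answered by node3 after a restart, shard 0 stayed put
        List<ShardResult> results = List.of(new ShardResult(s0, "node1", 11), new ShardResult(s1, "node3", 33));
        System.out.println(maybeReEncode(original, results)); // only the entry for shard 1 changes node and context id
    }
}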
import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; @@ -242,6 +250,116 @@ public SearchShardTarget getSearchShardTarget() { assertEquals(0, searchPhaseExecutionException.getSuppressed().length); } + public void testMaybeReEncode() { + String[] nodeIds = new String[] { "node1", "node2", "node3" }; + AtomicInteger idGenerator = new AtomicInteger(0); + Map originalShardIdMap = randomMap( + 10, + 20, + () -> new Tuple<>( + new ShardId("index", "index-uuid", idGenerator.getAndIncrement()), + new SearchContextIdForNode(null, randomFrom(nodeIds), randomSearchShardId()) + ) + ); + PointInTimeBuilder pointInTimeBuilder = new PointInTimeBuilder( + SearchContextId.encode(originalShardIdMap, Collections.emptyMap(), TransportVersion.current(), ShardSearchFailure.EMPTY_ARRAY) + ); + { + // case 1, result shard ids are identical to original PIT ids + ArrayList results = new ArrayList<>(); + for (ShardId shardId : originalShardIdMap.keySet()) { + SearchContextIdForNode searchContextIdForNode = originalShardIdMap.get(shardId); + results.add( + new PhaseResult( + searchContextIdForNode.getSearchContextId(), + new SearchShardTarget(searchContextIdForNode.getNode(), shardId, null) + ) + ); + } + BytesReference reEncodedId = AbstractSearchAsyncAction.maybeReEncodeNodeIds( + pointInTimeBuilder, + results, + ShardSearchFailure.EMPTY_ARRAY, + new NamedWriteableRegistry(ClusterModule.getNamedWriteables()), + TransportVersionUtils.randomCompatibleVersion(random()) + ); + assertSame(reEncodedId, pointInTimeBuilder.getEncodedId()); + } + { + // case 2, some results are from different nodes but have same context Ids + ArrayList results = new ArrayList<>(); + Set shardsWithSwappedNodes = new HashSet<>(); + for (ShardId shardId : originalShardIdMap.keySet()) { + SearchContextIdForNode searchContextIdForNode = originalShardIdMap.get(shardId); + if (randomBoolean()) { + // swap to a different node + results.add( + new PhaseResult(searchContextIdForNode.getSearchContextId(), new SearchShardTarget("otherNode", shardId, null)) + ); + shardsWithSwappedNodes.add(shardId); + } else { + results.add( + new PhaseResult( + searchContextIdForNode.getSearchContextId(), + new SearchShardTarget(searchContextIdForNode.getNode(), shardId, null) + ) + ); + } + } + BytesReference reEncodedId = AbstractSearchAsyncAction.maybeReEncodeNodeIds( + pointInTimeBuilder, + results, + ShardSearchFailure.EMPTY_ARRAY, + new NamedWriteableRegistry(ClusterModule.getNamedWriteables()), + TransportVersionUtils.randomCompatibleVersion(random()) + ); + assertNotSame(reEncodedId, pointInTimeBuilder.getEncodedId()); + SearchContextId reEncodedPit = SearchContextId.decode( + new NamedWriteableRegistry(ClusterModule.getNamedWriteables()), + reEncodedId + ); + assertEquals(originalShardIdMap.size(), reEncodedPit.shards().size()); + for (ShardId shardId : originalShardIdMap.keySet()) { + SearchContextIdForNode original = originalShardIdMap.get(shardId); + SearchContextIdForNode reEncoded = reEncodedPit.shards().get(shardId); + assertNotNull(reEncoded); + if (shardsWithSwappedNodes.contains(shardId)) { + assertNotEquals(original.getNode(), reEncoded.getNode()); + } else { + assertEquals(original.getNode(), reEncoded.getNode()); + } + assertEquals(original.getSearchContextId(), reEncoded.getSearchContextId()); + } + } + { + // case 3, result shard ids are identical to original PIT id but some 
are missing. Stay with original PIT id in this case + ArrayList results = new ArrayList<>(); + for (ShardId shardId : originalShardIdMap.keySet()) { + SearchContextIdForNode searchContextIdForNode = originalShardIdMap.get(shardId); + if (randomBoolean()) { + results.add( + new PhaseResult( + searchContextIdForNode.getSearchContextId(), + new SearchShardTarget(searchContextIdForNode.getNode(), shardId, null) + ) + ); + } + } + BytesReference reEncodedId = AbstractSearchAsyncAction.maybeReEncodeNodeIds( + pointInTimeBuilder, + results, + ShardSearchFailure.EMPTY_ARRAY, + new NamedWriteableRegistry(ClusterModule.getNamedWriteables()), + TransportVersionUtils.randomCompatibleVersion(random()) + ); + assertSame(reEncodedId, pointInTimeBuilder.getEncodedId()); + } + } + + private ShardSearchContextId randomSearchShardId() { + return new ShardSearchContextId(UUIDs.randomBase64UUID(), randomNonNegativeLong()); + } + private static ArraySearchPhaseResults phaseResults( Set contextIds, List> nodeLookups, @@ -269,5 +387,10 @@ private static final class PhaseResult extends SearchPhaseResult { PhaseResult(ShardSearchContextId contextId) { this.contextId = contextId; } + + PhaseResult(ShardSearchContextId contextId, SearchShardTarget searchShardTarget) { + this.contextId = contextId; + this.searchShardTarget = searchShardTarget; + } } } diff --git a/test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java b/test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java index 07ffb3ab9a4eb..af43553c5b9ed 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java +++ b/test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java @@ -9,6 +9,8 @@ package org.elasticsearch.search; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.action.search.OnlinePrewarmingService; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.util.BigArrays; @@ -22,6 +24,7 @@ import org.elasticsearch.search.fetch.FetchPhase; import org.elasticsearch.search.internal.ReaderContext; import org.elasticsearch.search.internal.SearchContext; +import org.elasticsearch.search.internal.ShardSearchContextId; import org.elasticsearch.search.internal.ShardSearchRequest; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.telemetry.tracing.Tracer; @@ -29,6 +32,7 @@ import java.io.IOException; import java.util.HashMap; +import java.util.Locale; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Consumer; @@ -40,6 +44,8 @@ public class MockSearchService extends SearchService { */ public static class TestPlugin extends Plugin {} + private static final Logger logger = LogManager.getLogger(MockSearchService.class); + private static final Map ACTIVE_SEARCH_CONTEXTS = new ConcurrentHashMap<>(); private Consumer onPutContext = context -> {}; @@ -66,7 +72,7 @@ public static void assertNoInFlightContext() { * Add an active search context to the list of tracked contexts. Package private for testing. 
*/ static void addActiveContext(ReaderContext context) { - ACTIVE_SEARCH_CONTEXTS.put(context, new RuntimeException(context.toString())); + ACTIVE_SEARCH_CONTEXTS.put(context, new RuntimeException(String.format(Locale.ROOT, "%s : %s", context.toString(), context.id()))); } /** @@ -110,7 +116,7 @@ protected void putReaderContext(ReaderContext context) { } @Override - protected ReaderContext removeReaderContext(long id) { + protected ReaderContext removeReaderContext(ShardSearchContextId id) { final ReaderContext removed = super.removeReaderContext(id); if (removed != null) { onRemoveContext.accept(removed); diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/RetrySearchIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/RetrySearchIntegTests.java index ef7fd2c6b065d..25d8f66761ba0 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/RetrySearchIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/RetrySearchIntegTests.java @@ -6,9 +6,11 @@ */ package org.elasticsearch.xpack.searchablesnapshots; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.search.ClosePointInTimeRequest; import org.elasticsearch.action.search.OpenPointInTimeRequest; +import org.elasticsearch.action.search.SearchContextId; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.action.search.TransportClosePointInTimeAction; import org.elasticsearch.action.search.TransportOpenPointInTimeAction; @@ -22,13 +24,18 @@ import org.elasticsearch.index.query.RangeQueryBuilder; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.search.MockSearchService; +import org.elasticsearch.search.SearchService; import org.elasticsearch.search.builder.PointInTimeBuilder; import org.elasticsearch.snapshots.SnapshotId; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Locale; import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; @@ -37,6 +44,13 @@ public class RetrySearchIntegTests extends BaseSearchableSnapshotsIntegTestCase { + @Override + protected Collection> nodePlugins() { + final List> plugins = new ArrayList<>(super.nodePlugins()); + plugins.add(MockSearchService.TestPlugin.class); + return plugins; + } + public void testSearcherId() throws Exception { final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); final int numberOfShards = between(1, 5); @@ -109,9 +123,10 @@ public void testSearcherId() throws Exception { public void testRetryPointInTime() throws Exception { final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + int numberOfShards = between(1, 5); assertAcked( indicesAdmin().prepareCreate(indexName) - .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 5)).build()) + .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numberOfShards).build()) .setMapping(""" {"properties":{"created_date":{"type": "date", 
"format": "yyyy-MM-dd"}}}""") ); @@ -146,6 +161,12 @@ public void testRetryPointInTime() throws Exception { IndicesOptions.STRICT_EXPAND_OPEN_FORBID_CLOSED ).keepAlive(TimeValue.timeValueMinutes(2)); final BytesReference pitId = client().execute(TransportOpenPointInTimeAction.TYPE, openRequest).actionGet().getPointInTimeId(); + assertEquals(numberOfShards, SearchContextId.decode(writableRegistry(), pitId).shards().size()); + logger.info( + "---> Original PIT id: " + + new PointInTimeBuilder(pitId).getSearchContextId(this.writableRegistry()).toString().replace("},", "\n") + ); + SetOnce updatedPit = new SetOnce<>(); try { assertNoFailuresAndResponse(prepareSearch().setPointInTime(new PointInTimeBuilder(pitId)), resp -> { assertThat(resp.pointInTimeId(), equalTo(pitId)); @@ -157,19 +178,45 @@ public void testRetryPointInTime() throws Exception { ensureGreen(indexName); } ensureGreen(indexName); + + // we run a search after the restart to ensure that all shards from the PIT have re-created their search contexts + assertNoFailuresAndResponse( + prepareSearch().setSearchType(SearchType.QUERY_THEN_FETCH) + .setAllowPartialSearchResults(randomBoolean()) // partial results should not matter here + .setPointInTime(new PointInTimeBuilder(pitId)), + resp -> { + assertHitCount(resp, docCount); + updatedPit.set(resp.pointInTimeId()); + } + ); + logger.info("--> first search after node restart finished"); + + // At this point we should have re-created all contexts, running a second search + // should not re-trigger creation of new contexts. Lets check this. + final AtomicLong newContexts = new AtomicLong(0); + for (String allocatedNode : allocatedNodes) { + MockSearchService searchService = (MockSearchService) internalCluster().getInstance(SearchService.class, allocatedNode); + searchService.setOnPutContext(context -> { newContexts.incrementAndGet(); }); + } + assertNoFailuresAndResponse( prepareSearch().setQuery(new RangeQueryBuilder("created_date").gte("2011-01-01").lte("2011-12-12")) .setSearchType(SearchType.QUERY_THEN_FETCH) .setPreFilterShardSize(between(1, 10)) - .setAllowPartialSearchResults(true) - .setPointInTime(new PointInTimeBuilder(pitId)), + .setAllowPartialSearchResults(randomBoolean()) // partial results should not matter here + .setPointInTime(new PointInTimeBuilder(updatedPit.get())), resp -> { - assertThat(resp.pointInTimeId(), equalTo(pitId)); + assertThat(resp.pointInTimeId(), equalTo(updatedPit.get())); assertHitCount(resp, docCount); } ); + logger.info("--> second search after node restart finished"); + assertThat("Search should not create new contexts", newContexts.get(), equalTo(0L)); + } catch (Exception e) { + logger.error("---> unexpected exception", e); + throw e; } finally { - client().execute(TransportClosePointInTimeAction.TYPE, new ClosePointInTimeRequest(pitId)).actionGet(); + client().execute(TransportClosePointInTimeAction.TYPE, new ClosePointInTimeRequest(updatedPit.get())).actionGet(); } } } From 730efd41416b4477fc8685babea3ab63c5ff959e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20B=C3=BCscher?= Date: Mon, 22 Sep 2025 23:03:25 +0200 Subject: [PATCH 2/7] Update docs/changelog/135231.yaml --- docs/changelog/135231.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/135231.yaml diff --git a/docs/changelog/135231.yaml b/docs/changelog/135231.yaml new file mode 100644 index 0000000000000..c204375119e09 --- /dev/null +++ b/docs/changelog/135231.yaml @@ -0,0 +1,5 @@ +pr: 135231 +summary: Improve PIT context relocation +area: Search 
+type: enhancement +issues: [] From ad4219c9f84114e837dc6d65b358f14618e45716 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20B=C3=BCscher?= Date: Thu, 25 Sep 2025 14:24:39 +0200 Subject: [PATCH 3/7] Add keepalive to second and third query --- .../xpack/searchablesnapshots/RetrySearchIntegTests.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/RetrySearchIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/RetrySearchIntegTests.java index 25d8f66761ba0..f70460d4f0a50 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/RetrySearchIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/RetrySearchIntegTests.java @@ -183,7 +183,7 @@ public void testRetryPointInTime() throws Exception { assertNoFailuresAndResponse( prepareSearch().setSearchType(SearchType.QUERY_THEN_FETCH) .setAllowPartialSearchResults(randomBoolean()) // partial results should not matter here - .setPointInTime(new PointInTimeBuilder(pitId)), + .setPointInTime(new PointInTimeBuilder(pitId).setKeepAlive(TimeValue.timeValueMinutes(2))), resp -> { assertHitCount(resp, docCount); updatedPit.set(resp.pointInTimeId()); @@ -204,7 +204,7 @@ public void testRetryPointInTime() throws Exception { .setSearchType(SearchType.QUERY_THEN_FETCH) .setPreFilterShardSize(between(1, 10)) .setAllowPartialSearchResults(randomBoolean()) // partial results should not matter here - .setPointInTime(new PointInTimeBuilder(updatedPit.get())), + .setPointInTime(new PointInTimeBuilder(updatedPit.get()).setKeepAlive(TimeValue.timeValueMinutes(2))), resp -> { assertThat(resp.pointInTimeId(), equalTo(updatedPit.get())); assertHitCount(resp, docCount); From 6bcb45f2c55a73a2908b178e5d5b66fb394d08ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20B=C3=BCscher?= Date: Fri, 26 Sep 2025 22:08:56 +0200 Subject: [PATCH 4/7] Adressing review comments --- server/src/main/java/module-info.java | 1 + .../search/AbstractSearchAsyncAction.java | 43 ++++++++++--- .../action/search/ClearScrollController.java | 55 ----------------- .../TransportClosePointInTimeAction.java | 60 ++++++++++++++++++- .../AbstractSearchAsyncActionTests.java | 53 ++++++++++++++-- 5 files changed, 143 insertions(+), 69 deletions(-) diff --git a/server/src/main/java/module-info.java b/server/src/main/java/module-info.java index cfe6345d7e590..2aeeb4cd6c700 100644 --- a/server/src/main/java/module-info.java +++ b/server/src/main/java/module-info.java @@ -55,6 +55,7 @@ requires org.apache.lucene.queryparser; requires org.apache.lucene.sandbox; requires org.apache.lucene.suggest; + requires jdk.jdi; exports org.elasticsearch; exports org.elasticsearch.action; diff --git a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index 6036bec3b8bac..babdf0ae85986 100644 --- a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -22,6 +22,7 @@ import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.action.support.TransportActions; import org.elasticsearch.cluster.ClusterState; +import 
org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.util.Maps; @@ -41,10 +42,10 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; @@ -101,6 +102,7 @@ abstract class AbstractSearchAsyncAction exten // protected for tests protected final SubscribableListener doneFuture = new SubscribableListener<>(); + private final DiscoveryNodes discoveryNodes; AbstractSearchAsyncAction( String name, @@ -155,6 +157,7 @@ abstract class AbstractSearchAsyncAction exten this.concreteIndexBoosts = concreteIndexBoosts; this.clusterStateVersion = clusterState.version(); this.mintransportVersion = clusterState.getMinTransportVersion(); + this.discoveryNodes = clusterState.nodes(); this.aliasFilter = aliasFilter; this.results = resultConsumer; // register the release of the query consumer to free up the circuit breaker memory @@ -623,7 +626,10 @@ protected BytesReference buildSearchContextId(ShardSearchFailure[] failures) { results.getAtomicArray().asList(), failures, namedWriteableRegistry, - mintransportVersion + mintransportVersion, + searchTransportService, + discoveryNodes, + logger ); } else { return null; @@ -635,7 +641,10 @@ static BytesReference maybeReEncodeNodeIds( List results, ShardSearchFailure[] failures, NamedWriteableRegistry namedWriteableRegistry, - TransportVersion mintransportVersion + TransportVersion mintransportVersion, + SearchTransportService searchTransportService, + DiscoveryNodes nodes, + Logger logger ) { SearchContextId original = originalPit.getSearchContextId(namedWriteableRegistry); boolean idChanged = false; @@ -644,10 +653,8 @@ static BytesReference maybeReEncodeNodeIds( SearchShardTarget searchShardTarget = result.getSearchShardTarget(); ShardId shardId = searchShardTarget.getShardId(); SearchContextIdForNode originalShard = original.shards().get(shardId); - if (originalShard != null - && Objects.equals(originalShard.getClusterAlias(), searchShardTarget.getClusterAlias()) - && Objects.equals(originalShard.getSearchContextId(), result.getContextId())) { - // result shard and context id match the originalShard one, check if the node is different and replace if so + if (originalShard != null && originalShard.getSearchContextId() != null && originalShard.getSearchContextId().isRetryable()) { + // check if the node is different, if so we need to re-encode the PIT String originalNode = originalShard.getNode(); if (originalNode != null && originalNode.equals(searchShardTarget.getNodeId()) == false) { // the target node for this shard entry in the PIT has changed, we need to update it @@ -658,15 +665,32 @@ static BytesReference maybeReEncodeNodeIds( updatedShardMap.put( shardId, new SearchContextIdForNode( - originalShard.getClusterAlias(), + searchShardTarget.getClusterAlias(), searchShardTarget.getNodeId(), - originalShard.getSearchContextId() + result.getContextId() ) ); } } } if (idChanged) { + // we free all old contexts that have moved, just in case we have re-tried them elsewhere but they still exist in the old + // location + Collection contextsToClose = updatedShardMap.keySet() + .stream() + .map(shardId -> original.shards().get(shardId)) + 
.collect(Collectors.toCollection(ArrayList::new)); + TransportClosePointInTimeAction.closeContexts(nodes, searchTransportService, contextsToClose, new ActionListener() { + @Override + public void onResponse(Integer integer) { + // ignore + } + + @Override + public void onFailure(Exception e) { + logger.trace("Failure while freeing old point in time contexts", e); + } + }); // we also need to add shard that are not in the results for some reason (e.g. query rewrote to match none) but that // were part of the original PIT for (ShardId shardId : original.shards().keySet()) { @@ -674,6 +698,7 @@ static BytesReference maybeReEncodeNodeIds( updatedShardMap.put(shardId, original.shards().get(shardId)); } } + return SearchContextId.encode(updatedShardMap, original.aliasFilter(), mintransportVersion, failures); } else { return originalPit.getEncodedId(); diff --git a/server/src/main/java/org/elasticsearch/action/search/ClearScrollController.java b/server/src/main/java/org/elasticsearch/action/search/ClearScrollController.java index bbd290a06a7f0..9d2002a48f6a2 100644 --- a/server/src/main/java/org/elasticsearch/action/search/ClearScrollController.java +++ b/server/src/main/java/org/elasticsearch/action/search/ClearScrollController.java @@ -13,20 +13,14 @@ import org.elasticsearch.action.support.RefCountingRunnable; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.common.Strings; -import org.elasticsearch.common.util.concurrent.ListenableFuture; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportResponse; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.List; -import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.BiFunction; -import java.util.stream.Collectors; import static org.elasticsearch.action.search.TransportSearchHelper.parseScrollId; @@ -143,53 +137,4 @@ private void onFailedFreedContext(Throwable e, DiscoveryNode node) { private void finish() { listener.onResponse(new ClearScrollResponse(hasFailed.get() == false, freedSearchContexts.get())); } - - /** - * Closes the given context id and reports the number of freed contexts via the listener - */ - public static void closeContexts( - DiscoveryNodes nodes, - SearchTransportService searchTransportService, - Collection contextIds, - ActionListener listener - ) { - final Set clusters = contextIds.stream() - .map(SearchContextIdForNode::getClusterAlias) - .filter(clusterAlias -> Strings.isEmpty(clusterAlias) == false) - .collect(Collectors.toSet()); - final ListenableFuture> lookupListener = new ListenableFuture<>(); - if (clusters.isEmpty()) { - lookupListener.onResponse((cluster, nodeId) -> nodes.get(nodeId)); - } else { - searchTransportService.getRemoteClusterService().collectNodes(clusters, lookupListener); - } - lookupListener.addListener(listener.delegateFailure((l, nodeLookup) -> { - final var successes = new AtomicInteger(); - try (RefCountingRunnable refs = new RefCountingRunnable(() -> l.onResponse(successes.get()))) { - for (SearchContextIdForNode contextId : contextIds) { - if (contextId.getNode() == null) { - // the shard was missing when creating the PIT, ignore. 
- continue; - } - final DiscoveryNode node = nodeLookup.apply(contextId.getClusterAlias(), contextId.getNode()); - if (node != null) { - try { - searchTransportService.sendFreeContext( - searchTransportService.getConnection(contextId.getClusterAlias(), node), - contextId.getSearchContextId(), - refs.acquireListener().map(r -> { - if (r.isFreed()) { - successes.incrementAndGet(); - } - return null; - }) - ); - } catch (Exception e) { - // ignored - } - } - } - } - })); - } } diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportClosePointInTimeAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportClosePointInTimeAction.java index 8fc954ca81ebf..6affa5574b769 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportClosePointInTimeAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportClosePointInTimeAction.java @@ -13,14 +13,23 @@ import org.elasticsearch.action.ActionType; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.action.support.RefCountingRunnable; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.common.util.concurrent.ListenableFuture; import org.elasticsearch.injection.guice.Inject; import org.elasticsearch.tasks.Task; import org.elasticsearch.transport.TransportService; import java.util.Collection; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiFunction; +import java.util.stream.Collectors; public class TransportClosePointInTimeAction extends HandledTransportAction { @@ -47,11 +56,60 @@ public TransportClosePointInTimeAction( protected void doExecute(Task task, ClosePointInTimeRequest request, ActionListener listener) { final SearchContextId searchContextId = SearchContextId.decode(namedWriteableRegistry, request.getId()); final Collection contextIds = searchContextId.shards().values(); - ClearScrollController.closeContexts( + closeContexts( clusterService.state().nodes(), searchTransportService, contextIds, listener.map(freed -> new ClosePointInTimeResponse(freed == contextIds.size(), freed)) ); } + + /** + * Closes the given context id and reports the number of freed contexts via the listener + */ + public static void closeContexts( + DiscoveryNodes nodes, + SearchTransportService searchTransportService, + Collection contextIds, + ActionListener listener + ) { + final Set clusters = contextIds.stream() + .map(SearchContextIdForNode::getClusterAlias) + .filter(clusterAlias -> Strings.isEmpty(clusterAlias) == false) + .collect(Collectors.toSet()); + final ListenableFuture> lookupListener = new ListenableFuture<>(); + if (clusters.isEmpty()) { + lookupListener.onResponse((cluster, nodeId) -> nodes.get(nodeId)); + } else { + searchTransportService.getRemoteClusterService().collectNodes(clusters, lookupListener); + } + lookupListener.addListener(listener.delegateFailure((l, nodeLookup) -> { + final var successes = new AtomicInteger(); + try (RefCountingRunnable refs = new RefCountingRunnable(() -> l.onResponse(successes.get()))) { + for (SearchContextIdForNode contextId : contextIds) { + if (contextId.getNode() == null) { + // the shard was 
missing when creating the PIT, ignore. + continue; + } + final DiscoveryNode node = nodeLookup.apply(contextId.getClusterAlias(), contextId.getNode()); + if (node != null) { + try { + searchTransportService.sendFreeContext( + searchTransportService.getConnection(contextId.getClusterAlias(), node), + contextId.getSearchContextId(), + refs.acquireListener().map(r -> { + if (r.isFreed()) { + successes.incrementAndGet(); + } + return null; + }) + ); + } catch (Exception e) { + // ignored + } + } + } + } + })); + } } diff --git a/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java b/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java index 26612e17d08fb..595c82030b00a 100644 --- a/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java @@ -15,6 +15,9 @@ import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.ClusterModule; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodeUtils; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; @@ -33,6 +36,7 @@ import org.elasticsearch.transport.Transport; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -258,12 +262,38 @@ public void testMaybeReEncode() { 20, () -> new Tuple<>( new ShardId("index", "index-uuid", idGenerator.getAndIncrement()), - new SearchContextIdForNode(null, randomFrom(nodeIds), randomSearchShardId()) + new SearchContextIdForNode( + null, + randomFrom(nodeIds), + new ShardSearchContextId(UUIDs.randomBase64UUID(), randomNonNegativeLong(), randomUUID()) + ) ) ); PointInTimeBuilder pointInTimeBuilder = new PointInTimeBuilder( SearchContextId.encode(originalShardIdMap, Collections.emptyMap(), TransportVersion.current(), ShardSearchFailure.EMPTY_ARRAY) ); + + final DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(); + Arrays.stream(nodeIds).forEach(id -> nodesBuilder.add(DiscoveryNodeUtils.create(id))); + DiscoveryNodes nodes = nodesBuilder.build(); + final Set freedContexts = new CopyOnWriteArraySet<>(); + SearchTransportService searchTransportService = new SearchTransportService(null, null, null) { + + @Override + public void sendFreeContext( + Transport.Connection connection, + ShardSearchContextId contextId, + ActionListener listener + ) { + freedContexts.add(contextId); + } + + @Override + public Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) { + return new SearchAsyncActionTests.MockConnection(node); + } + }; + { // case 1, result shard ids are identical to original PIT ids ArrayList results = new ArrayList<>(); @@ -281,10 +311,15 @@ public void testMaybeReEncode() { results, ShardSearchFailure.EMPTY_ARRAY, new NamedWriteableRegistry(ClusterModule.getNamedWriteables()), - TransportVersionUtils.randomCompatibleVersion(random()) + TransportVersionUtils.randomCompatibleVersion(random()), + searchTransportService, + nodes, + logger ); + assertEquals(0, freedContexts.size()); assertSame(reEncodedId, pointInTimeBuilder.getEncodedId()); } + freedContexts.clear(); { // case 2, some results are from different nodes but have same 
context Ids ArrayList results = new ArrayList<>(); @@ -311,7 +346,10 @@ public void testMaybeReEncode() { results, ShardSearchFailure.EMPTY_ARRAY, new NamedWriteableRegistry(ClusterModule.getNamedWriteables()), - TransportVersionUtils.randomCompatibleVersion(random()) + TransportVersionUtils.randomCompatibleVersion(random()), + searchTransportService, + nodes, + logger ); assertNotSame(reEncodedId, pointInTimeBuilder.getEncodedId()); SearchContextId reEncodedPit = SearchContextId.decode( @@ -319,18 +357,22 @@ public void testMaybeReEncode() { reEncodedId ); assertEquals(originalShardIdMap.size(), reEncodedPit.shards().size()); + logger.info(freedContexts); for (ShardId shardId : originalShardIdMap.keySet()) { SearchContextIdForNode original = originalShardIdMap.get(shardId); SearchContextIdForNode reEncoded = reEncodedPit.shards().get(shardId); assertNotNull(reEncoded); if (shardsWithSwappedNodes.contains(shardId)) { assertNotEquals(original.getNode(), reEncoded.getNode()); + assertTrue(freedContexts.contains(reEncoded.getSearchContextId())); } else { assertEquals(original.getNode(), reEncoded.getNode()); + assertFalse(freedContexts.contains(reEncoded.getSearchContextId())); } assertEquals(original.getSearchContextId(), reEncoded.getSearchContextId()); } } + freedContexts.clear(); { // case 3, result shard ids are identical to original PIT id but some are missing. Stay with original PIT id in this case ArrayList results = new ArrayList<>(); @@ -350,7 +392,10 @@ public void testMaybeReEncode() { results, ShardSearchFailure.EMPTY_ARRAY, new NamedWriteableRegistry(ClusterModule.getNamedWriteables()), - TransportVersionUtils.randomCompatibleVersion(random()) + TransportVersionUtils.randomCompatibleVersion(random()), + searchTransportService, + nodes, + logger ); assertSame(reEncodedId, pointInTimeBuilder.getEncodedId()); } From 841c5b365cb884db910df3ca687d481799220626 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20B=C3=BCscher?= Date: Wed, 8 Oct 2025 23:20:20 +0200 Subject: [PATCH 5/7] Rework failure usage in updating PIT id --- .../search/AbstractSearchAsyncAction.java | 5 +--- .../AbstractSearchAsyncActionTests.java | 23 ++++++++++--------- 2 files changed, 13 insertions(+), 15 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index babdf0ae85986..96624aaaa7912 100644 --- a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -624,7 +624,6 @@ protected BytesReference buildSearchContextId(ShardSearchFailure[] failures) { return maybeReEncodeNodeIds( source.pointInTimeBuilder(), results.getAtomicArray().asList(), - failures, namedWriteableRegistry, mintransportVersion, searchTransportService, @@ -639,7 +638,6 @@ protected BytesReference buildSearchContextId(ShardSearchFailure[] failures) { static BytesReference maybeReEncodeNodeIds( PointInTimeBuilder originalPit, List results, - ShardSearchFailure[] failures, NamedWriteableRegistry namedWriteableRegistry, TransportVersion mintransportVersion, SearchTransportService searchTransportService, @@ -698,8 +696,7 @@ public void onFailure(Exception e) { updatedShardMap.put(shardId, original.shards().get(shardId)); } } - - return SearchContextId.encode(updatedShardMap, original.aliasFilter(), mintransportVersion, failures); + return SearchContextId.encode(updatedShardMap, 
original.aliasFilter(), mintransportVersion, ShardSearchFailure.EMPTY_ARRAY); } else { return originalPit.getEncodedId(); } diff --git a/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java b/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java index 595c82030b00a..3125e8a627028 100644 --- a/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java @@ -302,14 +302,13 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod results.add( new PhaseResult( searchContextIdForNode.getSearchContextId(), - new SearchShardTarget(searchContextIdForNode.getNode(), shardId, null) + new SearchShardTarget(searchContextIdForNode.getNode(), shardId, searchContextIdForNode.getClusterAlias()) ) ); } BytesReference reEncodedId = AbstractSearchAsyncAction.maybeReEncodeNodeIds( pointInTimeBuilder, results, - ShardSearchFailure.EMPTY_ARRAY, new NamedWriteableRegistry(ClusterModule.getNamedWriteables()), TransportVersionUtils.randomCompatibleVersion(random()), searchTransportService, @@ -326,25 +325,28 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod Set shardsWithSwappedNodes = new HashSet<>(); for (ShardId shardId : originalShardIdMap.keySet()) { SearchContextIdForNode searchContextIdForNode = originalShardIdMap.get(shardId); - if (randomBoolean()) { + // only swap node for ids there have a non-null node id, i.e. those that didn't fail when opening a PIT + if (randomBoolean() && searchContextIdForNode.getNode() != null) { // swap to a different node + PhaseResult otherNode = new PhaseResult(searchContextIdForNode.getSearchContextId(), + new SearchShardTarget("otherNode", shardId, searchContextIdForNode.getClusterAlias())); results.add( - new PhaseResult(searchContextIdForNode.getSearchContextId(), new SearchShardTarget("otherNode", shardId, null)) + otherNode ); shardsWithSwappedNodes.add(shardId); } else { results.add( - new PhaseResult( - searchContextIdForNode.getSearchContextId(), - new SearchShardTarget(searchContextIdForNode.getNode(), shardId, null) - ) + new PhaseResult( + searchContextIdForNode.getSearchContextId(), + new SearchShardTarget(searchContextIdForNode.getNode(), shardId, + searchContextIdForNode.getClusterAlias()) + ) ); } } BytesReference reEncodedId = AbstractSearchAsyncAction.maybeReEncodeNodeIds( pointInTimeBuilder, results, - ShardSearchFailure.EMPTY_ARRAY, new NamedWriteableRegistry(ClusterModule.getNamedWriteables()), TransportVersionUtils.randomCompatibleVersion(random()), searchTransportService, @@ -382,7 +384,7 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod results.add( new PhaseResult( searchContextIdForNode.getSearchContextId(), - new SearchShardTarget(searchContextIdForNode.getNode(), shardId, null) + new SearchShardTarget(searchContextIdForNode.getNode(), shardId, searchContextIdForNode.getClusterAlias()) ) ); } @@ -390,7 +392,6 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod BytesReference reEncodedId = AbstractSearchAsyncAction.maybeReEncodeNodeIds( pointInTimeBuilder, results, - ShardSearchFailure.EMPTY_ARRAY, new NamedWriteableRegistry(ClusterModule.getNamedWriteables()), TransportVersionUtils.randomCompatibleVersion(random()), searchTransportService, From b76a959916b132132ec7516d3f1e4cbbc932759d Mon Sep 17 00:00:00 2001 From: 
elasticsearchmachine Date: Wed, 8 Oct 2025 21:33:50 +0000 Subject: [PATCH 6/7] [CI] Auto commit changes from spotless --- .../search/AbstractSearchAsyncActionTests.java | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java b/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java index 3125e8a627028..83ce564fcae9c 100644 --- a/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java @@ -328,19 +328,18 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod // only swap node for ids there have a non-null node id, i.e. those that didn't fail when opening a PIT if (randomBoolean() && searchContextIdForNode.getNode() != null) { // swap to a different node - PhaseResult otherNode = new PhaseResult(searchContextIdForNode.getSearchContextId(), - new SearchShardTarget("otherNode", shardId, searchContextIdForNode.getClusterAlias())); - results.add( - otherNode + PhaseResult otherNode = new PhaseResult( + searchContextIdForNode.getSearchContextId(), + new SearchShardTarget("otherNode", shardId, searchContextIdForNode.getClusterAlias()) ); + results.add(otherNode); shardsWithSwappedNodes.add(shardId); } else { results.add( - new PhaseResult( - searchContextIdForNode.getSearchContextId(), - new SearchShardTarget(searchContextIdForNode.getNode(), shardId, - searchContextIdForNode.getClusterAlias()) - ) + new PhaseResult( + searchContextIdForNode.getSearchContextId(), + new SearchShardTarget(searchContextIdForNode.getNode(), shardId, searchContextIdForNode.getClusterAlias()) + ) ); } } From 9085194a7a44a15e27d2e853f7a6a30a19e24a95 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20B=C3=BCscher?= Date: Thu, 9 Oct 2025 16:13:49 +0200 Subject: [PATCH 7/7] Adding feature flag Only store retried contexts and rewrite PIT when the feature is enabled. 
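
Reviewer note (not part of the change itself): the sketch below condenses what the series does once this flag is in place. When PIT_RELOCATION_FEATURE_FLAG is enabled, maybeReEncodeNodeIds rewrites the PIT id so each shard entry points at the node that actually served the last phase; if every shard is unchanged, or if any shard is missing from the results, the original encoded id is kept; when the flag is disabled the id is never rewritten. The sketch is a simplified model using plain strings and maps instead of ShardId / SearchContextIdForNode / BytesReference, and it omits freeing the superseded contexts; all names in it are invented for illustration.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Objects;

    // Illustrative sketch only: models the re-encoding decision, not the real implementation.
    final class PitRelocationSketch {

        // pitNodeByShard: node recorded per shard in the original PIT id
        // resultNodeByShard: node that actually answered for that shard in the last search phase
        static Map<String, String> maybeReEncode(boolean pitRelocationEnabled,
                                                 Map<String, String> pitNodeByShard,
                                                 Map<String, String> resultNodeByShard) {
            if (pitRelocationEnabled == false) {
                return pitNodeByShard; // feature disabled: keep the original PIT id untouched
            }
            boolean changed = false;
            Map<String, String> updated = new HashMap<>(pitNodeByShard);
            for (Map.Entry<String, String> entry : pitNodeByShard.entrySet()) {
                String shard = entry.getKey();
                String resultNode = resultNodeByShard.get(shard);
                if (resultNode == null) {
                    return pitNodeByShard; // a shard is missing from the results: stay with the original id
                }
                if (Objects.equals(entry.getValue(), resultNode) == false) {
                    updated.put(shard, resultNode); // context was re-created on a different node
                    changed = true;
                }
            }
            return changed ? updated : pitNodeByShard;
        }

        public static void main(String[] args) {
            Map<String, String> pit = Map.of("shard-0", "node-a", "shard-1", "node-b");
            Map<String, String> results = Map.of("shard-0", "node-a", "shard-1", "node-c");
            System.out.println(maybeReEncode(true, pit, results));  // shard-1 now points at node-c
            System.out.println(maybeReEncode(false, pit, results)); // unchanged while the flag is off
        }
    }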
--- .../search/AbstractSearchAsyncAction.java | 25 +++++++++------- .../elasticsearch/search/SearchService.java | 30 ++++++++++++------- .../RetrySearchIntegTests.java | 4 ++- 3 files changed, 37 insertions(+), 22 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index 96624aaaa7912..eb0006c84719a 100644 --- a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -31,6 +31,7 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.search.SearchContextMissingException; import org.elasticsearch.search.SearchPhaseResult; +import org.elasticsearch.search.SearchService; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.builder.PointInTimeBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder; @@ -620,16 +621,20 @@ protected BytesReference buildSearchContextId(ShardSearchFailure[] failures) { SearchSourceBuilder source = request.source(); // only (re-)build a search context id if we have a point in time if (source != null && source.pointInTimeBuilder() != null && source.pointInTimeBuilder().singleSession() == false) { - // we want to change node ids in the PIT id if any shards and its PIT context have moved - return maybeReEncodeNodeIds( - source.pointInTimeBuilder(), - results.getAtomicArray().asList(), - namedWriteableRegistry, - mintransportVersion, - searchTransportService, - discoveryNodes, - logger - ); + if (SearchService.PIT_RELOCATION_FEATURE_FLAG.isEnabled()) { + // we want to change node ids in the PIT id if any shards and its PIT context have moved + return maybeReEncodeNodeIds( + source.pointInTimeBuilder(), + results.getAtomicArray().asList(), + namedWriteableRegistry, + mintransportVersion, + searchTransportService, + discoveryNodes, + logger + ); + } else { + return source.pointInTimeBuilder().getEncodedId(); + } } else { return null; } diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index 7c637c158145d..fe5698d4fdbbf 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -299,6 +299,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv public static final FeatureFlag BATCHED_QUERY_PHASE_FEATURE_FLAG = new FeatureFlag("batched_query_phase"); + public static final FeatureFlag PIT_RELOCATION_FEATURE_FLAG = new FeatureFlag("pit_relocation_feature"); + /** * The size of the buffer used for memory accounting. 
* This buffer is used to locally track the memory accummulated during the execution of @@ -1280,17 +1282,23 @@ final ReaderContext createOrGetReaderContext(ShardSearchRequest request) { searcherSupplier.close(); throw e; } - // we are using a PIT here so set singleSession to false to prevent clearing after the search finishes - ReaderContext readerContext = createAndPutReaderContext( - contextId, - request, - indexService, - shard, - searcherSupplier, - false, - keepAliveInMillis - ); - logger.debug("Recreated reader context [{}]", readerContext.id()); + ReaderContext readerContext = null; + if (PIT_RELOCATION_FEATURE_FLAG.isEnabled()) { + // we are using a PIT here so set singleSession to false to prevent clearing after the search finishes + readerContext = createAndPutReaderContext( + contextId, + request, + indexService, + shard, + searcherSupplier, + false, + keepAliveInMillis + ); + logger.debug("Recreated reader context [{}]", readerContext.id()); + } else { + // when feature is disabled, stay with the old way of just adding a temporary context + readerContext = createAndPutReaderContext(request, indexService, shard, searcherSupplier, defaultKeepAlive); + } return readerContext; } } diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/RetrySearchIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/RetrySearchIntegTests.java index f70460d4f0a50..8b0910d5e1e9c 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/RetrySearchIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/RetrySearchIntegTests.java @@ -211,7 +211,9 @@ public void testRetryPointInTime() throws Exception { } ); logger.info("--> second search after node restart finished"); - assertThat("Search should not create new contexts", newContexts.get(), equalTo(0L)); + if (SearchService.PIT_RELOCATION_FEATURE_FLAG.isEnabled()) { + assertThat("Search should not create new contexts", newContexts.get(), equalTo(0L)); + } } catch (Exception e) { logger.error("---> unexpected exception", e); throw e;