diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/scroll/SearchScrollIT.java b/server/src/internalClusterTest/java/org/elasticsearch/search/scroll/SearchScrollIT.java index a54e19b839ad3..fe994abc8764b 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/search/scroll/SearchScrollIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/search/scroll/SearchScrollIT.java @@ -11,11 +11,12 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.search.ClearScrollResponse; +import org.elasticsearch.action.search.ParsedScrollId; import org.elasticsearch.action.search.SearchPhaseExecutionException; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchScrollRequestBuilder; import org.elasticsearch.action.search.SearchType; -import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.Priority; import org.elasticsearch.common.bytes.BytesReference; @@ -28,6 +29,7 @@ import org.elasticsearch.index.query.RangeQueryBuilder; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.internal.ShardSearchContextId; import org.elasticsearch.search.sort.FieldSortBuilder; import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.test.ESIntegTestCase; @@ -703,13 +705,15 @@ public void testRestartDataNodesDuringScrollSearch() throws Exception { } finally { respFromProdIndex.decRef(); } - SearchPhaseExecutionException error = expectThrows( - SearchPhaseExecutionException.class, - client().prepareSearchScroll(respFromDemoIndexScrollId) + SearchScrollRequestBuilder searchScrollRequestBuilder = client().prepareSearchScroll(respFromDemoIndexScrollId); + SearchPhaseExecutionException error = expectThrows(SearchPhaseExecutionException.class, searchScrollRequestBuilder); + assertEquals(1, error.shardFailures().length); + ParsedScrollId parsedScrollId = searchScrollRequestBuilder.request().parseScrollId(); + ShardSearchContextId shardSearchContextId = parsedScrollId.getContext()[0].getSearchContextId(); + assertThat( + error.shardFailures()[0].getCause().getMessage(), + containsString("No search context found for id [" + shardSearchContextId + "]") ); - for (ShardSearchFailure shardSearchFailure : error.shardFailures()) { - assertThat(shardSearchFailure.getCause().getMessage(), containsString("No search context found for id [1]")); - } client().prepareSearchScroll(respFromProdIndexScrollId).get().decRef(); } diff --git a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index e058a3c83d41c..c53f52bcccf08 100644 --- a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -13,6 +13,7 @@ import org.apache.lucene.util.SetOnce; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.TransportVersion; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.NoShardAvailableActionException; import org.elasticsearch.action.OriginalIndices; @@ -31,6 +32,7 @@ import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.builder.PointInTimeBuilder; +import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.internal.ShardSearchContextId; @@ -39,8 +41,10 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; @@ -93,6 +97,7 @@ abstract class AbstractSearchAsyncAction exten private final Map pendingExecutionsPerNode; private final AtomicBoolean requestCancelled = new AtomicBoolean(); private final int skippedCount; + private final TransportVersion mintransportVersion; // protected for tests protected final SubscribableListener doneFuture = new SubscribableListener<>(); @@ -149,6 +154,7 @@ abstract class AbstractSearchAsyncAction exten this.nodeIdToConnection = nodeIdToConnection; this.concreteIndexBoosts = concreteIndexBoosts; this.clusterStateVersion = clusterState.version(); + this.mintransportVersion = clusterState.getMinTransportVersion(); this.aliasFilter = aliasFilter; this.results = resultConsumer; // register the release of the query consumer to free up the circuit breaker memory @@ -416,6 +422,7 @@ protected final void onShardFailure(final int shardIndex, SearchShardTarget shar onShardGroupFailure(shardIndex, shard, e); } if (lastShard == false) { + logger.debug("Retrying shard [{}] with target [{}]", shard.getShardId(), nextShard); performPhaseOnShard(shardIndex, shardIt, nextShard); } else { // count down outstanding shards, we're done with this shard as there's no more copies to try @@ -607,10 +614,77 @@ public void sendSearchResponse(SearchResponseSections internalSearchResponse, At } protected BytesReference buildSearchContextId(ShardSearchFailure[] failures) { - var source = request.source(); - return source != null && source.pointInTimeBuilder() != null && source.pointInTimeBuilder().singleSession() == false - ? source.pointInTimeBuilder().getEncodedId() - : null; + SearchSourceBuilder source = request.source(); + // only (re-)build a search context id if we have a point in time + if (source != null && source.pointInTimeBuilder() != null && source.pointInTimeBuilder().singleSession() == false) { + // we want to change node ids in the PIT id if any shards and its PIT context have moved + BytesReference bytesReference = maybeReEncodeNodeIds( + source.pointInTimeBuilder(), + results.getAtomicArray().asList(), + failures, + namedWriteableRegistry, + mintransportVersion + ); + if ((bytesReference == source.pointInTimeBuilder().getEncodedId()) == false) { + logger.info( + "Changing PIT to: [{}]", + new PointInTimeBuilder(bytesReference).getSearchContextId(namedWriteableRegistry).toString().replace("},", "\n") + ); + } + return bytesReference; + } else { + return null; + } + } + + static BytesReference maybeReEncodeNodeIds( + PointInTimeBuilder originalPit, + List results, + ShardSearchFailure[] failures, + NamedWriteableRegistry namedWriteableRegistry, + TransportVersion mintransportVersion + ) { + SearchContextId original = originalPit.getSearchContextId(namedWriteableRegistry); + boolean idChanged = false; + Map updatedShardMap = null; // only create this if we detect a change + for (Result result : results) { + SearchShardTarget searchShardTarget = result.getSearchShardTarget(); + ShardId shardId = searchShardTarget.getShardId(); + SearchContextIdForNode originalShard = original.shards().get(shardId); + if (originalShard != null + && Objects.equals(originalShard.getClusterAlias(), searchShardTarget.getClusterAlias()) + && Objects.equals(originalShard.getSearchContextId(), result.getContextId())) { + // result shard and context id match the originalShard one, check if the node is different and replace if so + String originalNode = originalShard.getNode(); + if (originalNode != null && originalNode.equals(searchShardTarget.getNodeId()) == false) { + // the target node for this shard entry in the PIT has changed, we need to update it + idChanged = true; + if (updatedShardMap == null) { + updatedShardMap = new HashMap<>(original.shards().size()); + } + updatedShardMap.put( + shardId, + new SearchContextIdForNode( + originalShard.getClusterAlias(), + searchShardTarget.getNodeId(), + originalShard.getSearchContextId() + ) + ); + } + } + } + if (idChanged) { + // we also need to add shard that are not in the results for some reason (e.g. query rewrote to match none) but that + // were part of the original PIT + for (ShardId shardId : original.shards().keySet()) { + if (updatedShardMap.containsKey(shardId) == false) { + updatedShardMap.put(shardId, original.shards().get(shardId)); + } + } + return SearchContextId.encode(updatedShardMap, original.aliasFilter(), mintransportVersion, failures); + } else { + return originalPit.getEncodedId(); + } } /** diff --git a/server/src/main/java/org/elasticsearch/action/search/ClearScrollController.java b/server/src/main/java/org/elasticsearch/action/search/ClearScrollController.java index bbd290a06a7f0..292f630b267fa 100644 --- a/server/src/main/java/org/elasticsearch/action/search/ClearScrollController.java +++ b/server/src/main/java/org/elasticsearch/action/search/ClearScrollController.java @@ -8,20 +8,27 @@ */ package org.elasticsearch.action.search; +import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.RefCountingRunnable; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.project.ProjectResolver; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; import org.elasticsearch.common.util.concurrent.ListenableFuture; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportResponse; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -38,6 +45,7 @@ public final class ClearScrollController implements Runnable { private final AtomicBoolean hasFailed = new AtomicBoolean(false); private final AtomicInteger freedSearchContexts = new AtomicInteger(0); private final Logger logger; + private static final Logger staticLogger = LogManager.getLogger(ClearScrollController.class); private final Runnable runner; ClearScrollController( @@ -148,12 +156,15 @@ private void finish() { * Closes the given context id and reports the number of freed contexts via the listener */ public static void closeContexts( - DiscoveryNodes nodes, + ClusterService clusterService, + ProjectResolver projectResolver, SearchTransportService searchTransportService, - Collection contextIds, + Map shards, ActionListener listener ) { - final Set clusters = contextIds.stream() + DiscoveryNodes nodes = clusterService.state().nodes(); + final Set clusters = shards.values() + .stream() .map(SearchContextIdForNode::getClusterAlias) .filter(clusterAlias -> Strings.isEmpty(clusterAlias) == false) .collect(Collectors.toSet()); @@ -166,16 +177,34 @@ public static void closeContexts( lookupListener.addListener(listener.delegateFailure((l, nodeLookup) -> { final var successes = new AtomicInteger(); try (RefCountingRunnable refs = new RefCountingRunnable(() -> l.onResponse(successes.get()))) { - for (SearchContextIdForNode contextId : contextIds) { + for (Entry entry : shards.entrySet()) { + var contextId = entry.getValue(); if (contextId.getNode() == null) { // the shard was missing when creating the PIT, ignore. continue; } final DiscoveryNode node = nodeLookup.apply(contextId.getClusterAlias(), contextId.getNode()); + + Set targetNodes; if (node != null) { + targetNodes = Collections.singleton(node); + } else { + staticLogger.info("---> missing node when closing context: " + contextId.getNode()); + // TODO we won't be able to use this with remote clusters + IndexShardRoutingTable indexShardRoutingTable = clusterService.state() + .routingTable(projectResolver.getProjectId()) + .shardRoutingTable(entry.getKey()); + targetNodes = indexShardRoutingTable.assignedUnpromotableShards() + .stream() + .map(ShardRouting::currentNodeId) + .map(nodeId -> nodeLookup.apply(contextId.getClusterAlias(), nodeId)) + .collect(Collectors.toSet()); + staticLogger.info("---> trying alternative nodes to close context: " + targetNodes); + } + for (DiscoveryNode targetNode : targetNodes) { try { searchTransportService.sendFreeContext( - searchTransportService.getConnection(contextId.getClusterAlias(), node), + searchTransportService.getConnection(contextId.getClusterAlias(), targetNode), contextId.getSearchContextId(), refs.acquireListener().map(r -> { if (r.isFreed()) { diff --git a/server/src/main/java/org/elasticsearch/action/search/PITHelper.java b/server/src/main/java/org/elasticsearch/action/search/PITHelper.java new file mode 100644 index 0000000000000..29fbabb6073c3 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/search/PITHelper.java @@ -0,0 +1,42 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.action.search; + +import org.elasticsearch.TransportVersion; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.util.Maps; +import org.elasticsearch.index.shard.ShardId; + +import java.io.IOException; +import java.util.Base64; +import java.util.Collections; +import java.util.Map; + +public class PITHelper { + + public static SearchContextId decodePITId(String id) { + return decodePITId(new BytesArray(Base64.getUrlDecoder().decode(id))); + } + + public static SearchContextId decodePITId(BytesReference id) { + try (var in = id.streamInput()) { + final TransportVersion version = TransportVersion.readVersion(in); + in.setTransportVersion(version); + final Map shards = Collections.unmodifiableMap( + in.readCollection(Maps::newHashMapWithExpectedSize, (i, map) -> map.put(new ShardId(in), new SearchContextIdForNode(in))) + ); + return new SearchContextId(shards, Collections.emptyMap()); + } catch (IOException e) { + assert false : e; + throw new IllegalArgumentException(e); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchContextId.java b/server/src/main/java/org/elasticsearch/action/search/SearchContextId.java index c2f1510341fb0..6173798660c9c 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchContextId.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchContextId.java @@ -21,7 +21,6 @@ import org.elasticsearch.common.util.Maps; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.search.SearchPhaseResult; -import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.search.internal.ShardSearchContextId; import org.elasticsearch.transport.RemoteClusterAware; @@ -30,6 +29,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.TreeSet; import java.util.stream.Collectors; @@ -62,6 +62,26 @@ public static BytesReference encode( Map aliasFilter, TransportVersion version, ShardSearchFailure[] shardFailures + ) { + Map shards = searchPhaseResults.stream() + .collect( + Collectors.toMap( + r -> r.getSearchShardTarget().getShardId(), + r -> new SearchContextIdForNode( + r.getSearchShardTarget().getClusterAlias(), + r.getSearchShardTarget().getNodeId(), + r.getContextId() + ) + ) + ); + return encode(shards, aliasFilter, version, shardFailures); + } + + static BytesReference encode( + Map shards, + Map aliasFilter, + TransportVersion version, + ShardSearchFailure[] shardFailures ) { assert shardFailures.length == 0 || version.onOrAfter(TransportVersions.V_8_16_0) : "[allow_partial_search_results] cannot be enabled on a cluster that has not been fully upgraded to version [" @@ -71,12 +91,12 @@ public static BytesReference encode( out.setTransportVersion(version); TransportVersion.writeVersion(version, out); boolean allowNullContextId = out.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0); - int shardSize = searchPhaseResults.size() + (allowNullContextId ? shardFailures.length : 0); + int shardSize = shards.size() + (allowNullContextId ? shardFailures.length : 0); out.writeVInt(shardSize); - for (var searchResult : searchPhaseResults) { - final SearchShardTarget target = searchResult.getSearchShardTarget(); - target.getShardId().writeTo(out); - new SearchContextIdForNode(target.getClusterAlias(), target.getNodeId(), searchResult.getContextId()).writeTo(out); + for (ShardId shardId : shards.keySet()) { + shardId.writeTo(out); + SearchContextIdForNode searchContextIdForNode = shards.get(shardId); + searchContextIdForNode.writeTo(out); } if (allowNullContextId) { for (var failure : shardFailures) { @@ -142,4 +162,23 @@ public String[] getActualIndices() { } return indices.toArray(String[]::new); } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) return false; + SearchContextId that = (SearchContextId) o; + return Objects.equals(shards, that.shards) + && Objects.equals(aliasFilter, that.aliasFilter) + && Objects.equals(contextIds, that.contextIds); + } + + @Override + public int hashCode() { + return Objects.hash(shards, aliasFilter, contextIds); + } + + @Override + public String toString() { + return "SearchContextId{" + "shards=" + shards + ", aliasFilter=" + aliasFilter + '}'; + } } diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchContextIdForNode.java b/server/src/main/java/org/elasticsearch/action/search/SearchContextIdForNode.java index f91a9d09f4bb4..f68bd1ea52cb8 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchContextIdForNode.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchContextIdForNode.java @@ -17,6 +17,7 @@ import org.elasticsearch.search.internal.ShardSearchContextId; import java.io.IOException; +import java.util.Objects; public final class SearchContextIdForNode implements Writeable { private final String node; @@ -103,4 +104,18 @@ public String toString() { + '\'' + '}'; } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) return false; + SearchContextIdForNode that = (SearchContextIdForNode) o; + return Objects.equals(node, that.node) + && Objects.equals(searchContextId, that.searchContextId) + && Objects.equals(clusterAlias, that.clusterAlias); + } + + @Override + public int hashCode() { + return Objects.hash(node, searchContextId, clusterAlias); + } } diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java index 1e0fa28889c97..fb2235cb48664 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java @@ -386,8 +386,8 @@ public void writeTo(StreamOutput out) throws IOException { public static void registerRequestHandler(TransportService transportService, SearchService searchService) { final TransportRequestHandler freeContextHandler = (request, channel, task) -> { - logger.trace("releasing search context [{}]", request.id()); boolean freed = searchService.freeReaderContext(request.id()); + logger.trace("releasing search context [{}], [{}]", request.id(), freed); channel.sendResponse(SearchFreeContextResponse.of(freed)); }; final Executor freeContextExecutor = buildFreeContextExecutor(transportService); diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportClosePointInTimeAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportClosePointInTimeAction.java index 8fc954ca81ebf..1009c6ca2312b 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportClosePointInTimeAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportClosePointInTimeAction.java @@ -13,14 +13,16 @@ import org.elasticsearch.action.ActionType; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.injection.guice.Inject; import org.elasticsearch.tasks.Task; import org.elasticsearch.transport.TransportService; -import java.util.Collection; +import java.util.Map; public class TransportClosePointInTimeAction extends HandledTransportAction { @@ -28,6 +30,7 @@ public class TransportClosePointInTimeAction extends HandledTransportAction listener) { final SearchContextId searchContextId = SearchContextId.decode(namedWriteableRegistry, request.getId()); - final Collection contextIds = searchContextId.shards().values(); + Map shards = searchContextId.shards(); ClearScrollController.closeContexts( - clusterService.state().nodes(), + clusterService, + projectResolver, searchTransportService, - contextIds, - listener.map(freed -> new ClosePointInTimeResponse(freed == contextIds.size(), freed)) + shards, + listener.map(freed -> new ClosePointInTimeResponse(freed == shards.size(), freed)) ); } } diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index 6366916c0a64f..ca43e70a159c9 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -381,6 +381,12 @@ private void executeRequest( ); frozenIndexCheck(resolvedIndices); } + logger.info( + "Executing search request on node [{}] with local indices [{}] and remotes [{}]", + clusterService.getNodeName(), + resolvedIndices.getLocalIndices() == null ? "" : resolvedIndices.getLocalIndices().indices(), + resolvedIndices.getRemoteClusterIndices().keySet() + ); final SearchSourceBuilder source = original.source(); if (shouldOpenPIT(source)) { @@ -1271,6 +1277,8 @@ static List getRemoteShardsIteratorFromPointInTime( // Otherwise, we add the shard iterator without a target node, allowing a partial search failure to // be thrown when a search phase attempts to access it. targetNodes.add(perNode.getNode()); + // TODO this looks like its on the cross-cluster search path, we will need to adapt the retry mechanism here as well I + // think if (perNode.getSearchContextId().getSearcherId() != null) { for (String node : group.allocatedNodes()) { if (node.equals(perNode.getNode()) == false) { @@ -1355,6 +1363,7 @@ private void executeSearch( SearchResponse.Clusters clusters, SearchPhaseProvider searchPhaseProvider ) { + logger.info("Executing search locally."); if (searchRequest.allowPartialSearchResults() == null) { // No user preference defined in search request - apply cluster service default searchRequest.allowPartialSearchResults(searchService.defaultAllowPartialSearchResults()); @@ -1944,10 +1953,17 @@ static List getLocalShardsIteratorFromPointInTime( try { final ShardIterator shards = OperationRouting.getShards(projectState.routingTable(), shardId); // Prefer executing shard requests on nodes that are part of PIT first. - if (projectState.cluster().nodes().nodeExists(perNode.getNode())) { + boolean nodeExists = projectState.cluster().nodes().nodeExists(perNode.getNode()); + if (nodeExists) { targetNodes.add(perNode.getNode()); + } else { + logger.debug( + "Node [{}] referenced in PIT context id [{}] no longer exists.", + perNode.getNode(), + perNode.getSearchContextId() + ); } - if (perNode.getSearchContextId().getSearcherId() != null) { + if (perNode.getSearchContextId().getSearcherId() != null || nodeExists == false) { for (ShardRouting shard : shards) { if (shard.currentNodeId().equals(perNode.getNode()) == false) { targetNodes.add(shard.currentNodeId()); diff --git a/server/src/main/java/org/elasticsearch/search/SearchContextMissingException.java b/server/src/main/java/org/elasticsearch/search/SearchContextMissingException.java index 3c75cf0e87ff1..6c1edc1b221be 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchContextMissingException.java +++ b/server/src/main/java/org/elasticsearch/search/SearchContextMissingException.java @@ -22,7 +22,7 @@ public class SearchContextMissingException extends ElasticsearchException { private final ShardSearchContextId contextId; public SearchContextMissingException(ShardSearchContextId contextId) { - super("No search context found for id [" + contextId.getId() + "]"); + super("No search context found for id [" + contextId + "]"); this.contextId = contextId; } diff --git a/server/src/main/java/org/elasticsearch/search/SearchPhaseResult.java b/server/src/main/java/org/elasticsearch/search/SearchPhaseResult.java index eed80bff2e9bb..9576e65aab28e 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchPhaseResult.java +++ b/server/src/main/java/org/elasticsearch/search/SearchPhaseResult.java @@ -30,7 +30,7 @@ */ public abstract class SearchPhaseResult extends TransportResponse { - private SearchShardTarget searchShardTarget; + protected SearchShardTarget searchShardTarget; private int shardIndex = -1; protected ShardSearchContextId contextId; private ShardSearchRequest shardSearchRequest; diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index 0ce8c20fff943..d29bc76c69868 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -160,6 +160,7 @@ import java.util.function.Function; import java.util.function.LongSupplier; import java.util.function.Supplier; +import java.util.stream.Collectors; import static org.elasticsearch.TransportVersions.ERROR_TRACE_IN_TRANSPORT_HEADER; import static org.elasticsearch.common.Strings.format; @@ -361,7 +362,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv private final AtomicLong idGenerator = new AtomicLong(); - private final Map activeReaders = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency(); + private final Map activeReaders = ConcurrentCollections + .newConcurrentMapWithAggressiveConcurrency(); private final MultiBucketConsumerService multiBucketConsumerService; @@ -540,7 +542,7 @@ public void beforeIndexShardCreated(ShardRouting routing, Settings indexSettings } protected void putReaderContext(ReaderContext context) { - final long id = context.id().getId(); + final ShardSearchContextId id = context.id(); final ReaderContext previous = activeReaders.put(id, context); assert previous == null; // ensure that if we race against afterIndexRemoved, we remove the context from the active list. @@ -552,7 +554,7 @@ protected void putReaderContext(ReaderContext context) { } } - protected ReaderContext removeReaderContext(long id) { + protected ReaderContext removeReaderContext(ShardSearchContextId id) { if (logger.isTraceEnabled()) { logger.trace("removing reader context [{}]", id); } @@ -869,7 +871,7 @@ static boolean isExecutorQueuedBeyondPrewarmingFactor(Executor searchOperationsE private IndexShard getShard(ShardSearchRequest request) { final ShardSearchContextId contextId = request.readerId(); if (contextId != null && sessionId.equals(contextId.getSessionId())) { - final ReaderContext readerContext = activeReaders.get(contextId.getId()); + final ReaderContext readerContext = activeReaders.get(contextId); if (readerContext != null) { return readerContext.indexShard(); } @@ -1247,10 +1249,7 @@ private ReaderContext findReaderContext(ShardSearchContextId id, TransportReques if (id.getSessionId().isEmpty()) { throw new IllegalArgumentException("Session id must be specified"); } - if (sessionId.equals(id.getSessionId()) == false) { - throw new SearchContextMissingException(id); - } - final ReaderContext reader = activeReaders.get(id.getId()); + final ReaderContext reader = activeReaders.get(id); if (reader == null) { throw new SearchContextMissingException(id); } @@ -1264,14 +1263,19 @@ private ReaderContext findReaderContext(ShardSearchContextId id, TransportReques } final ReaderContext createOrGetReaderContext(ShardSearchRequest request) { - if (request.readerId() != null) { + ShardSearchContextId contextId = request.readerId(); + final long keepAliveInMillis = getKeepAlive(request); + if (contextId != null) { try { - return findReaderContext(request.readerId(), request); + return findReaderContext(contextId, request); } catch (SearchContextMissingException e) { - final String searcherId = request.readerId().getSearcherId(); + final String searcherId = contextId.getSearcherId(); if (searcherId == null) { throw e; } + // We retry creating a ReaderContext on this node if we can get a searcher with same searcher id as in the original + // PIT context. This is currently possible for e.g. ReadOnlyEngine and its child FrozenEngine where a commitId is + // calculated from the ids of the underlying segments of an index commit final IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex()); final IndexShard shard = indexService.getShard(request.shardId().id()); final Engine.SearcherSupplier searcherSupplier = shard.acquireSearcherSupplier(); @@ -1279,10 +1283,24 @@ final ReaderContext createOrGetReaderContext(ShardSearchRequest request) { searcherSupplier.close(); throw e; } - return createAndPutReaderContext(request, indexService, shard, searcherSupplier, defaultKeepAlive); + // we are using a PIT here so set singleSession to false to prevent clearing after the search finishes + ReaderContext readerContext = createAndPutReaderContext( + contextId, + request, + indexService, + shard, + searcherSupplier, + false, + keepAliveInMillis + ); + logger.debug( + "Recreated reader context [{}] on node [{}]", + readerContext.id(), + clusterService.state().nodes().getLocalNode() + ); + return readerContext; } } - final long keepAliveInMillis = getKeepAlive(request); final IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex()); final IndexShard shard = indexService.getShard(request.shardId().id()); return createAndPutReaderContext(request, indexService, shard, shard.acquireSearcherSupplier(), keepAliveInMillis); @@ -1294,12 +1312,24 @@ final ReaderContext createAndPutReaderContext( IndexShard shard, Engine.SearcherSupplier reader, long keepAliveInMillis + ) { + final ShardSearchContextId id = new ShardSearchContextId(sessionId, idGenerator.incrementAndGet()); + return createAndPutReaderContext(id, request, indexService, shard, reader, true, keepAliveInMillis); + } + + public ReaderContext createAndPutReaderContext( + ShardSearchContextId id, + ShardSearchRequest request, + IndexService indexService, + IndexShard shard, + Engine.SearcherSupplier reader, + boolean singleSession, + long keepAliveInMillis ) { ReaderContext readerContext = null; Releasable decreaseScrollContexts = null; try { - final ShardSearchContextId id = new ShardSearchContextId(sessionId, idGenerator.incrementAndGet()); - if (request.scroll() != null) { + if (request != null && request.scroll() != null) { decreaseScrollContexts = openScrollContexts::decrementAndGet; if (openScrollContexts.incrementAndGet() > maxOpenScrollContext) { throw new TooManyScrollContextsException(maxOpenScrollContext, MAX_OPEN_SCROLL_CONTEXT.getKey()); @@ -1308,7 +1338,7 @@ final ReaderContext createAndPutReaderContext( readerContext.addOnClose(decreaseScrollContexts); decreaseScrollContexts = null; } else { - readerContext = new ReaderContext(id, indexService, shard, reader, keepAliveInMillis, true); + readerContext = new ReaderContext(id, indexService, shard, reader, keepAliveInMillis, singleSession); } reader = null; final ReaderContext finalReaderContext = readerContext; @@ -1351,6 +1381,11 @@ public void openReaderContext(ShardId shardId, TimeValue keepAlive, ActionListen searcherSupplier = null; // transfer ownership to reader context searchOperationListener.onNewReaderContext(readerContext); readerContext.addOnClose(() -> searchOperationListener.onFreeReaderContext(finalReaderContext)); + logger.debug( + "Opening new reader context [{}] on node [{}]", + readerContext.id(), + clusterService.state().nodes().getLocalNode() + ); putReaderContext(readerContext); readerContext = null; listener.onResponse(finalReaderContext.id()); @@ -1474,12 +1509,9 @@ private void freeAllContextsForShard(ShardId shardId) { public boolean freeReaderContext(ShardSearchContextId contextId) { logger.trace("freeing reader context [{}]", contextId); - if (sessionId.equals(contextId.getSessionId())) { - try (ReaderContext context = removeReaderContext(contextId.getId())) { - return context != null; - } + try (ReaderContext context = removeReaderContext(contextId)) { + return context != null; } - return false; } public void freeAllScrollContexts() { @@ -1836,6 +1868,15 @@ public int getActiveContexts() { return this.activeReaders.size(); } + public List getActiveContexts(ShardId shardId) { + return this.activeReaders.values() + .stream() + .filter(c -> c.singleSession() == false) + .filter(c -> c.scrollContext() == null) + .filter(c -> c.indexShard().shardId().equals(shardId)) + .collect(Collectors.toList()); + } + /** * Returns the number of scroll contexts opened on the node */ diff --git a/server/src/main/java/org/elasticsearch/search/internal/ReaderContext.java b/server/src/main/java/org/elasticsearch/search/internal/ReaderContext.java index c15b604b5b5fc..4e6ee9f5e85ba 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/ReaderContext.java +++ b/server/src/main/java/org/elasticsearch/search/internal/ReaderContext.java @@ -116,6 +116,10 @@ private void tryUpdateKeepAlive(long keepAlive) { this.keepAlive.accumulateAndGet(keepAlive, Math::max); } + public long keepAlive() { + return keepAlive.longValue(); + } + /** * Returns a releasable to indicate that the caller has stopped using this reader. * The time to live of the reader after usage can be extended using the provided diff --git a/server/src/test/java/org/elasticsearch/ElasticsearchExceptionTests.java b/server/src/test/java/org/elasticsearch/ElasticsearchExceptionTests.java index 2741ac37ce519..dc239a7526bc5 100644 --- a/server/src/test/java/org/elasticsearch/ElasticsearchExceptionTests.java +++ b/server/src/test/java/org/elasticsearch/ElasticsearchExceptionTests.java @@ -1258,6 +1258,7 @@ public void testFailureToAndFromXContentWithDetails() throws IOException { DiscoveryNode node = DiscoveryNodeUtils.create("node_g"); failureCause = new NodeClosedException(node); failureCause = new NoShardAvailableActionException(new ShardId("_index_g", "_uuid_g", 6), "node_g", failureCause); + ShardSearchContextId shardSearchContextId = new ShardSearchContextId(UUIDs.randomBase64UUID(), 0, null); ShardSearchFailure[] shardFailures = new ShardSearchFailure[] { new ShardSearchFailure( new ParsingException(0, 0, "Parsing g", null), @@ -1267,10 +1268,7 @@ public void testFailureToAndFromXContentWithDetails() throws IOException { new RepositoryException("repository_g", "Repo"), new SearchShardTarget("node_g", new ShardId(new Index("_index_g", "_uuid_g"), 62), null) ), - new ShardSearchFailure( - new SearchContextMissingException(new ShardSearchContextId(UUIDs.randomBase64UUID(), 0L)), - null - ) }; + new ShardSearchFailure(new SearchContextMissingException(shardSearchContextId), null) }; failure = new SearchPhaseExecutionException("phase_g", "G", failureCause, shardFailures); expectedCause = new ElasticsearchException( "Elasticsearch exception [type=node_closed_exception, " + "reason=node closed " + node + "]" @@ -1293,7 +1291,10 @@ public void testFailureToAndFromXContentWithDetails() throws IOException { ); expected.addSuppressed( new ElasticsearchException( - "Elasticsearch exception [type=search_context_missing_exception, " + "reason=No search context found for id [0]]" + "Elasticsearch exception [type=search_context_missing_exception, " + + "reason=No search context found for id [" + + shardSearchContextId + + "]]" ) ); } diff --git a/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java b/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java index abe7e893977f4..26612e17d08fb 100644 --- a/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java @@ -9,30 +9,38 @@ package org.elasticsearch.action.search; +import org.elasticsearch.TransportVersion; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.cluster.ClusterModule; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.core.Tuple; import org.elasticsearch.index.Index; import org.elasticsearch.index.query.MatchAllQueryBuilder; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; +import org.elasticsearch.search.builder.PointInTimeBuilder; import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.search.internal.ShardSearchContextId; import org.elasticsearch.search.internal.ShardSearchRequest; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.TransportVersionUtils; import org.elasticsearch.transport.Transport; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; @@ -242,6 +250,116 @@ public SearchShardTarget getSearchShardTarget() { assertEquals(0, searchPhaseExecutionException.getSuppressed().length); } + public void testMaybeReEncode() { + String[] nodeIds = new String[] { "node1", "node2", "node3" }; + AtomicInteger idGenerator = new AtomicInteger(0); + Map originalShardIdMap = randomMap( + 10, + 20, + () -> new Tuple<>( + new ShardId("index", "index-uuid", idGenerator.getAndIncrement()), + new SearchContextIdForNode(null, randomFrom(nodeIds), randomSearchShardId()) + ) + ); + PointInTimeBuilder pointInTimeBuilder = new PointInTimeBuilder( + SearchContextId.encode(originalShardIdMap, Collections.emptyMap(), TransportVersion.current(), ShardSearchFailure.EMPTY_ARRAY) + ); + { + // case 1, result shard ids are identical to original PIT ids + ArrayList results = new ArrayList<>(); + for (ShardId shardId : originalShardIdMap.keySet()) { + SearchContextIdForNode searchContextIdForNode = originalShardIdMap.get(shardId); + results.add( + new PhaseResult( + searchContextIdForNode.getSearchContextId(), + new SearchShardTarget(searchContextIdForNode.getNode(), shardId, null) + ) + ); + } + BytesReference reEncodedId = AbstractSearchAsyncAction.maybeReEncodeNodeIds( + pointInTimeBuilder, + results, + ShardSearchFailure.EMPTY_ARRAY, + new NamedWriteableRegistry(ClusterModule.getNamedWriteables()), + TransportVersionUtils.randomCompatibleVersion(random()) + ); + assertSame(reEncodedId, pointInTimeBuilder.getEncodedId()); + } + { + // case 2, some results are from different nodes but have same context Ids + ArrayList results = new ArrayList<>(); + Set shardsWithSwappedNodes = new HashSet<>(); + for (ShardId shardId : originalShardIdMap.keySet()) { + SearchContextIdForNode searchContextIdForNode = originalShardIdMap.get(shardId); + if (randomBoolean()) { + // swap to a different node + results.add( + new PhaseResult(searchContextIdForNode.getSearchContextId(), new SearchShardTarget("otherNode", shardId, null)) + ); + shardsWithSwappedNodes.add(shardId); + } else { + results.add( + new PhaseResult( + searchContextIdForNode.getSearchContextId(), + new SearchShardTarget(searchContextIdForNode.getNode(), shardId, null) + ) + ); + } + } + BytesReference reEncodedId = AbstractSearchAsyncAction.maybeReEncodeNodeIds( + pointInTimeBuilder, + results, + ShardSearchFailure.EMPTY_ARRAY, + new NamedWriteableRegistry(ClusterModule.getNamedWriteables()), + TransportVersionUtils.randomCompatibleVersion(random()) + ); + assertNotSame(reEncodedId, pointInTimeBuilder.getEncodedId()); + SearchContextId reEncodedPit = SearchContextId.decode( + new NamedWriteableRegistry(ClusterModule.getNamedWriteables()), + reEncodedId + ); + assertEquals(originalShardIdMap.size(), reEncodedPit.shards().size()); + for (ShardId shardId : originalShardIdMap.keySet()) { + SearchContextIdForNode original = originalShardIdMap.get(shardId); + SearchContextIdForNode reEncoded = reEncodedPit.shards().get(shardId); + assertNotNull(reEncoded); + if (shardsWithSwappedNodes.contains(shardId)) { + assertNotEquals(original.getNode(), reEncoded.getNode()); + } else { + assertEquals(original.getNode(), reEncoded.getNode()); + } + assertEquals(original.getSearchContextId(), reEncoded.getSearchContextId()); + } + } + { + // case 3, result shard ids are identical to original PIT id but some are missing. Stay with original PIT id in this case + ArrayList results = new ArrayList<>(); + for (ShardId shardId : originalShardIdMap.keySet()) { + SearchContextIdForNode searchContextIdForNode = originalShardIdMap.get(shardId); + if (randomBoolean()) { + results.add( + new PhaseResult( + searchContextIdForNode.getSearchContextId(), + new SearchShardTarget(searchContextIdForNode.getNode(), shardId, null) + ) + ); + } + } + BytesReference reEncodedId = AbstractSearchAsyncAction.maybeReEncodeNodeIds( + pointInTimeBuilder, + results, + ShardSearchFailure.EMPTY_ARRAY, + new NamedWriteableRegistry(ClusterModule.getNamedWriteables()), + TransportVersionUtils.randomCompatibleVersion(random()) + ); + assertSame(reEncodedId, pointInTimeBuilder.getEncodedId()); + } + } + + private ShardSearchContextId randomSearchShardId() { + return new ShardSearchContextId(UUIDs.randomBase64UUID(), randomNonNegativeLong()); + } + private static ArraySearchPhaseResults phaseResults( Set contextIds, List> nodeLookups, @@ -269,5 +387,10 @@ private static final class PhaseResult extends SearchPhaseResult { PhaseResult(ShardSearchContextId contextId) { this.contextId = contextId; } + + PhaseResult(ShardSearchContextId contextId, SearchShardTarget searchShardTarget) { + this.contextId = contextId; + this.searchShardTarget = searchShardTarget; + } } } diff --git a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java index d3cd2a62ad35d..bf920c3540c3b 100644 --- a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java @@ -1705,6 +1705,7 @@ public void testLocalShardIteratorFromPointInTime() { if (context.getSearchContextId().getSearcherId() == null) { assertThat(shardIterator.getTargetNodeIds(), hasSize(1)); } else { + // TODO this branch seems never executed by this test. Needs investigation. final List targetNodes = clusterState.routingTable(project) .index(indexMetadata.getIndex()) .shard(id) diff --git a/test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java b/test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java index 07ffb3ab9a4eb..af43553c5b9ed 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java +++ b/test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java @@ -9,6 +9,8 @@ package org.elasticsearch.search; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.action.search.OnlinePrewarmingService; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.util.BigArrays; @@ -22,6 +24,7 @@ import org.elasticsearch.search.fetch.FetchPhase; import org.elasticsearch.search.internal.ReaderContext; import org.elasticsearch.search.internal.SearchContext; +import org.elasticsearch.search.internal.ShardSearchContextId; import org.elasticsearch.search.internal.ShardSearchRequest; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.telemetry.tracing.Tracer; @@ -29,6 +32,7 @@ import java.io.IOException; import java.util.HashMap; +import java.util.Locale; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Consumer; @@ -40,6 +44,8 @@ public class MockSearchService extends SearchService { */ public static class TestPlugin extends Plugin {} + private static final Logger logger = LogManager.getLogger(MockSearchService.class); + private static final Map ACTIVE_SEARCH_CONTEXTS = new ConcurrentHashMap<>(); private Consumer onPutContext = context -> {}; @@ -66,7 +72,7 @@ public static void assertNoInFlightContext() { * Add an active search context to the list of tracked contexts. Package private for testing. */ static void addActiveContext(ReaderContext context) { - ACTIVE_SEARCH_CONTEXTS.put(context, new RuntimeException(context.toString())); + ACTIVE_SEARCH_CONTEXTS.put(context, new RuntimeException(String.format(Locale.ROOT, "%s : %s", context.toString(), context.id()))); } /** @@ -110,7 +116,7 @@ protected void putReaderContext(ReaderContext context) { } @Override - protected ReaderContext removeReaderContext(long id) { + protected ReaderContext removeReaderContext(ShardSearchContextId id) { final ReaderContext removed = super.removeReaderContext(id); if (removed != null) { onRemoveContext.accept(removed); diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/RetrySearchIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/RetrySearchIntegTests.java index ef7fd2c6b065d..e9486afd44988 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/RetrySearchIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/RetrySearchIntegTests.java @@ -6,13 +6,19 @@ */ package org.elasticsearch.xpack.searchablesnapshots; +import org.apache.http.util.EntityUtils; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.search.ClosePointInTimeRequest; import org.elasticsearch.action.search.OpenPointInTimeRequest; +import org.elasticsearch.action.search.SearchContextId; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.action.search.TransportClosePointInTimeAction; import org.elasticsearch.action.search.TransportOpenPointInTimeAction; import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.RestClient; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; @@ -22,21 +28,42 @@ import org.elasticsearch.index.query.RangeQueryBuilder; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.search.MockSearchService; +import org.elasticsearch.search.SearchService; import org.elasticsearch.search.builder.PointInTimeBuilder; import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.test.junit.annotations.TestIssueLogging; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Locale; import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailuresAndResponse; import static org.hamcrest.Matchers.equalTo; +@TestIssueLogging( + issueUrl = "https://github.com/elastic/elasticsearch/issues/129445", + value = "org.elasticsearch.action.search:DEBUG," + "org.elasticsearch.search:TRACE" +) public class RetrySearchIntegTests extends BaseSearchableSnapshotsIntegTestCase { + @Override + protected Collection> nodePlugins() { + final List> plugins = new ArrayList<>(super.nodePlugins()); + plugins.add(MockSearchService.TestPlugin.class); + return plugins; + } + + protected boolean addMockHttpTransport() { + return false; + } + public void testSearcherId() throws Exception { final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); final int numberOfShards = between(1, 5); @@ -109,14 +136,16 @@ public void testSearcherId() throws Exception { public void testRetryPointInTime() throws Exception { final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + int numberOfShards = between(1, 5); assertAcked( indicesAdmin().prepareCreate(indexName) - .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 5)).build()) + .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numberOfShards).build()) .setMapping(""" {"properties":{"created_date":{"type": "date", "format": "yyyy-MM-dd"}}}""") ); final List indexRequestBuilders = new ArrayList<>(); final int docCount = between(0, 100); + logger.info("---> number of docs: " + docCount); for (int i = 0; i < docCount; i++) { indexRequestBuilders.add(prepareIndex(indexName).setSource("created_date", "2011-02-02")); } @@ -136,6 +165,7 @@ public void testRetryPointInTime() throws Exception { assertAcked(indicesAdmin().prepareDelete(indexName)); final int numberOfReplicas = between(0, 2); + logger.info("---> number of replicas: " + numberOfReplicas); final Settings indexSettings = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas).build(); internalCluster().ensureAtLeastNumDataNodes(numberOfReplicas + 1); @@ -146,6 +176,21 @@ public void testRetryPointInTime() throws Exception { IndicesOptions.STRICT_EXPAND_OPEN_FORBID_CLOSED ).keepAlive(TimeValue.timeValueMinutes(2)); final BytesReference pitId = client().execute(TransportOpenPointInTimeAction.TYPE, openRequest).actionGet().getPointInTimeId(); + assertEquals(numberOfShards, SearchContextId.decode(writableRegistry(), pitId).shards().size()); + logger.info( + "---> Original PIT id: " + + new PointInTimeBuilder(pitId).getSearchContextId(this.writableRegistry()).toString().replace("},", "\n") + ); + // for debugging, we try to see where the documents are located + try (RestClient restClient = createRestClient()) { + Request checkShardsRequest = new Request( + "GET", + "/_cat/shards/" + indexName + "?format=json&h=index,node,shard,prirep,state,docs,index" + ); + Response response = restClient.performRequest(checkShardsRequest); + logger.info("---> document distribution: " + EntityUtils.toString(response.getEntity())); + } + SetOnce updatedPit = new SetOnce<>(); try { assertNoFailuresAndResponse(prepareSearch().setPointInTime(new PointInTimeBuilder(pitId)), resp -> { assertThat(resp.pointInTimeId(), equalTo(pitId)); @@ -157,19 +202,45 @@ public void testRetryPointInTime() throws Exception { ensureGreen(indexName); } ensureGreen(indexName); + + // we run a search after the restart to ensure that all shards from the PIT have re-created their search contexts + assertNoFailuresAndResponse( + prepareSearch().setSearchType(SearchType.QUERY_THEN_FETCH) + .setAllowPartialSearchResults(randomBoolean()) // partial results should not matter here + .setPointInTime(new PointInTimeBuilder(pitId).setKeepAlive(TimeValue.timeValueMinutes(2))), + resp -> { + assertHitCount(resp, docCount); + updatedPit.set(resp.pointInTimeId()); + } + ); + logger.info("--> first search after node restart finished"); + + // At this point we should have re-created all contexts, running a second search + // should not re-trigger creation of new contexts. Lets check this. + final AtomicLong newContexts = new AtomicLong(0); + for (String allocatedNode : allocatedNodes) { + MockSearchService searchService = (MockSearchService) internalCluster().getInstance(SearchService.class, allocatedNode); + searchService.setOnPutContext(context -> { newContexts.incrementAndGet(); }); + } + assertNoFailuresAndResponse( prepareSearch().setQuery(new RangeQueryBuilder("created_date").gte("2011-01-01").lte("2011-12-12")) .setSearchType(SearchType.QUERY_THEN_FETCH) .setPreFilterShardSize(between(1, 10)) - .setAllowPartialSearchResults(true) - .setPointInTime(new PointInTimeBuilder(pitId)), + .setAllowPartialSearchResults(randomBoolean()) // partial results should not matter here + .setPointInTime(new PointInTimeBuilder(updatedPit.get()).setKeepAlive(TimeValue.timeValueMinutes(2))), resp -> { - assertThat(resp.pointInTimeId(), equalTo(pitId)); + assertThat(resp.pointInTimeId(), equalTo(updatedPit.get())); assertHitCount(resp, docCount); } ); + logger.info("--> second search after node restart finished"); + assertThat("Search should not create new contexts", newContexts.get(), equalTo(0L)); + } catch (Exception e) { + logger.error("---> unexpected exception", e); + throw e; } finally { - client().execute(TransportClosePointInTimeAction.TYPE, new ClosePointInTimeRequest(pitId)).actionGet(); + client().execute(TransportClosePointInTimeAction.TYPE, new ClosePointInTimeRequest(updatedPit.get())).actionGet(); } } }