diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/shards/TransportClusterSearchShardsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/shards/TransportClusterSearchShardsAction.java index 95936f3ee9caf..481d3a4d5b7dd 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/shards/TransportClusterSearchShardsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/shards/TransportClusterSearchShardsAction.java @@ -22,6 +22,7 @@ import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.project.ProjectResolver; +import org.elasticsearch.cluster.routing.SearchShardRouting; import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; @@ -109,17 +110,18 @@ protected void masterOperation( } Set nodeIds = new HashSet<>(); - List groupShardsIterator = clusterService.operationRouting() + List groupShardRouting = clusterService.operationRouting() .searchShards(project, concreteIndices, routingMap, request.preference()); ShardRouting shard; - ClusterSearchShardsGroup[] groupResponses = new ClusterSearchShardsGroup[groupShardsIterator.size()]; + ClusterSearchShardsGroup[] groupResponses = new ClusterSearchShardsGroup[groupShardRouting.size()]; int currentGroup = 0; - for (ShardIterator shardIt : groupShardsIterator) { - ShardId shardId = shardIt.shardId(); - ShardRouting[] shardRoutings = new ShardRouting[shardIt.size()]; + for (SearchShardRouting shardRouting : groupShardRouting) { + ShardIterator shardIterator = shardRouting.iterator(); + ShardId shardId = shardIterator.shardId(); + ShardRouting[] shardRoutings = new ShardRouting[shardIterator.size()]; int currentShard = 0; - shardIt.reset(); - while ((shard = shardIt.nextOrNull()) != null) { + shardIterator.reset(); + while ((shard = shardIterator.nextOrNull()) != null) { shardRoutings[currentShard++] = shard; nodeIds.add(shard.currentNodeId()); } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/diskusage/TransportAnalyzeIndexDiskUsageAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/diskusage/TransportAnalyzeIndexDiskUsageAction.java index d175379efb736..40df1faf312b4 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/diskusage/TransportAnalyzeIndexDiskUsageAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/diskusage/TransportAnalyzeIndexDiskUsageAction.java @@ -23,6 +23,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.project.ProjectResolver; +import org.elasticsearch.cluster.routing.SearchShardRouting; import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; @@ -220,15 +221,17 @@ protected AnalyzeIndexDiskUsageResponse newResponse( @Override protected List shards(ClusterState clusterState, AnalyzeIndexDiskUsageRequest request, String[] concreteIndices) { ProjectState project = projectResolver.getProjectState(clusterState); - final List groups = clusterService.operationRouting().searchShards(project, concreteIndices, null, null); + final List groups = clusterService.operationRouting().searchShards(project, concreteIndices, null, null); - for (ShardIterator group : groups) { + var shardIterators = new ArrayList(groups.size()); + for (SearchShardRouting group : groups) { // fails fast if any non-active groups - if (group.size() == 0) { - throw new NoShardAvailableActionException(group.shardId()); + if (group.iterator().size() == 0) { + throw new NoShardAvailableActionException(group.iterator().shardId()); } + shardIterators.add(group.iterator()); } - return groups; + return shardIterators; } @Override diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/validate/query/TransportValidateQueryAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/validate/query/TransportValidateQueryAction.java index afe48c092af09..8d1e6db26daca 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/validate/query/TransportValidateQueryAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/validate/query/TransportValidateQueryAction.java @@ -24,6 +24,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.ResolvedExpression; import org.elasticsearch.cluster.project.ProjectResolver; +import org.elasticsearch.cluster.routing.SearchShardRouting; import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; @@ -180,7 +181,11 @@ protected List shards(ClusterState clusterState, ValidateQueryReq routing, request.indices() ); - return clusterService.operationRouting().searchShards(project, concreteIndices, routingMap, "_local"); + return clusterService.operationRouting() + .searchShards(project, concreteIndices, routingMap, "_local") + .stream() + .map(SearchShardRouting::iterator) + .toList(); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/fieldcaps/RequestDispatcher.java b/server/src/main/java/org/elasticsearch/action/fieldcaps/RequestDispatcher.java index 3ad45526be217..172f2c34713a3 100644 --- a/server/src/main/java/org/elasticsearch/action/fieldcaps/RequestDispatcher.java +++ b/server/src/main/java/org/elasticsearch/action/fieldcaps/RequestDispatcher.java @@ -20,6 +20,7 @@ import org.elasticsearch.cluster.ProjectState; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.project.ProjectResolver; +import org.elasticsearch.cluster.routing.SearchShardRouting; import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; @@ -105,16 +106,16 @@ public RequestDispatcher( ProjectState project = projectResolver.getProjectState(clusterState); for (String index : indices) { - final List shardIts; + final List searchShards; try { - shardIts = clusterService.operationRouting().searchShards(project, new String[] { index }, null, null); + searchShards = clusterService.operationRouting().searchShards(project, new String[] { index }, null, null); } catch (Exception e) { onIndexFailure.accept(index, e); continue; } final IndexSelector indexResult = new IndexSelector( fieldCapsRequest.clusterAlias(), - shardIts, + searchShards, fieldCapsRequest.indexFilter(), nowInMillis, coordinatorRewriteContextProvider @@ -270,14 +271,15 @@ private static class IndexSelector { IndexSelector( String clusterAlias, - List shardIts, + List searchShards, QueryBuilder indexFilter, long nowInMillis, CoordinatorRewriteContextProvider coordinatorRewriteContextProvider ) { - for (ShardIterator shardIt : shardIts) { + for (SearchShardRouting routing : searchShards) { boolean canMatch = true; - final ShardId shardId = shardIt.shardId(); + ShardIterator iterator = routing.iterator(); + final ShardId shardId = iterator.shardId(); if (indexFilter != null && indexFilter instanceof MatchAllQueryBuilder == false) { var coordinatorRewriteContext = coordinatorRewriteContextProvider.getCoordinatorRewriteContext(shardId.getIndex()); if (coordinatorRewriteContext != null) { @@ -291,7 +293,7 @@ private static class IndexSelector { } } if (canMatch) { - for (ShardRouting shard : shardIt) { + for (ShardRouting shard : iterator) { nodeToShards.computeIfAbsent(shard.currentNodeId(), node -> new ArrayList<>()).add(shard); } } else { diff --git a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index e058a3c83d41c..f3e4151356881 100644 --- a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -710,7 +710,8 @@ protected final ShardSearchRequest buildShardSearchRequest(SearchShardIterator s timeProvider.absoluteStartMillis(), shardIt.getClusterAlias(), shardIt.getSearchContextId(), - shardIt.getSearchContextKeepAlive() + shardIt.getSearchContextKeepAlive(), + shardIt.getReshardSplitShardCountSummary() ); // if we already received a search result we can inform the shard that it // can return a null response if the request rewrites to match none rather diff --git a/server/src/main/java/org/elasticsearch/action/search/CanMatchNodeRequest.java b/server/src/main/java/org/elasticsearch/action/search/CanMatchNodeRequest.java index eaba7b9c762c6..2aa3c84ea3184 100644 --- a/server/src/main/java/org/elasticsearch/action/search/CanMatchNodeRequest.java +++ b/server/src/main/java/org/elasticsearch/action/search/CanMatchNodeRequest.java @@ -12,6 +12,7 @@ import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.cluster.routing.SplitShardCountSummary; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; @@ -65,6 +66,7 @@ public static class Shard implements Writeable { private final ShardSearchContextId readerId; private final TimeValue keepAlive; private final long waitForCheckpoint; + private final SplitShardCountSummary reshardSplitShardCountSummary; public Shard( String[] indices, @@ -74,7 +76,8 @@ public Shard( float indexBoost, ShardSearchContextId readerId, TimeValue keepAlive, - long waitForCheckpoint + long waitForCheckpoint, + SplitShardCountSummary reshardSplitShardCountSummary ) { this.indices = indices; this.shardId = shardId; @@ -85,6 +88,7 @@ public Shard( this.keepAlive = keepAlive; this.waitForCheckpoint = waitForCheckpoint; assert keepAlive == null || readerId != null : "readerId: " + readerId + " keepAlive: " + keepAlive; + this.reshardSplitShardCountSummary = reshardSplitShardCountSummary; } public Shard(StreamInput in) throws IOException { @@ -97,6 +101,11 @@ public Shard(StreamInput in) throws IOException { keepAlive = in.readOptionalTimeValue(); waitForCheckpoint = in.readLong(); assert keepAlive == null || readerId != null : "readerId: " + readerId + " keepAlive: " + keepAlive; + if (in.getTransportVersion().supports(ShardSearchRequest.SHARD_SEARCH_REQUEST_RESHARD_SHARD_COUNT_SUMMARY)) { + reshardSplitShardCountSummary = SplitShardCountSummary.fromInt(in.readVInt()); + } else { + reshardSplitShardCountSummary = SplitShardCountSummary.UNSET; + } } @Override @@ -109,6 +118,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeOptionalWriteable(readerId); out.writeOptionalTimeValue(keepAlive); out.writeLong(waitForCheckpoint); + if (out.getTransportVersion().supports(ShardSearchRequest.SHARD_SEARCH_REQUEST_RESHARD_SHARD_COUNT_SUMMARY)) { + out.writeVInt(reshardSplitShardCountSummary.asInt()); + } } public int getShardRequestIndex() { @@ -234,7 +246,8 @@ public ShardSearchRequest createShardSearchRequest(Shard r) { r.keepAlive, r.waitForCheckpoint, waitForCheckpointsTimeout, - false + false, + r.reshardSplitShardCountSummary ); shardSearchRequest.setParentTask(getParentTask()); return shardSearchRequest; diff --git a/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java b/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java index e42f8127c5e97..b5faf88dff125 100644 --- a/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java +++ b/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java @@ -425,7 +425,8 @@ private CanMatchNodeRequest.Shard buildShardLevelRequest(SearchShardIterator sha indexBoost, shardIt.getSearchContextId(), shardIt.getSearchContextKeepAlive(), - ShardSearchRequest.computeWaitForCheckpoint(request.getWaitForCheckpoints(), shardIt.shardId(), shardRequestIndex) + ShardSearchRequest.computeWaitForCheckpoint(request.getWaitForCheckpoints(), shardIt.shardId(), shardRequestIndex), + shardIt.getReshardSplitShardCountSummary() ); } diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index 1a7c0f26f4b21..c212be715e050 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -23,6 +23,7 @@ import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.routing.SplitShardCountSummary; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; @@ -354,9 +355,14 @@ public IndicesOptions indicesOptions() { } } - private record ShardToQuery(float boost, String[] originalIndices, int shardIndex, ShardId shardId, ShardSearchContextId contextId) - implements - Writeable { + private record ShardToQuery( + float boost, + String[] originalIndices, + int shardIndex, + ShardId shardId, + ShardSearchContextId contextId, + SplitShardCountSummary reshardSplitShardCountSummary + ) implements Writeable { static ShardToQuery readFrom(StreamInput in) throws IOException { return new ShardToQuery( @@ -364,7 +370,10 @@ static ShardToQuery readFrom(StreamInput in) throws IOException { in.readStringArray(), in.readVInt(), new ShardId(in), - in.readOptionalWriteable(ShardSearchContextId::new) + in.readOptionalWriteable(ShardSearchContextId::new), + in.getTransportVersion().supports(ShardSearchRequest.SHARD_SEARCH_REQUEST_RESHARD_SHARD_COUNT_SUMMARY) + ? SplitShardCountSummary.fromInt(in.readVInt()) + : SplitShardCountSummary.UNSET ); } @@ -375,6 +384,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeVInt(shardIndex); shardId.writeTo(out); out.writeOptionalWriteable(contextId); + if (out.getTransportVersion().supports(ShardSearchRequest.SHARD_SEARCH_REQUEST_RESHARD_SHARD_COUNT_SUMMARY)) { + out.writeVInt(reshardSplitShardCountSummary.asInt()); + } } } @@ -454,7 +466,8 @@ protected void doRun(Map shardIndexMap) { getOriginalIndices(shardIndex).indices(), shardIndex, routing.getShardId(), - shardRoutings.getSearchContextId() + shardRoutings.getSearchContextId(), + shardRoutings.getReshardSplitShardCountSummary() ) ); var filterForAlias = aliasFilter.getOrDefault(indexUUID, AliasFilter.EMPTY); @@ -651,7 +664,8 @@ private static ShardSearchRequest buildShardSearchRequest( SearchRequest searchRequest, int totalShardCount, long absoluteStartMillis, - boolean hasResponse + boolean hasResponse, + SplitShardCountSummary reshardSplitShardCountSummary ) { ShardSearchRequest shardRequest = new ShardSearchRequest( originalIndices, @@ -664,7 +678,8 @@ private static ShardSearchRequest buildShardSearchRequest( absoluteStartMillis, clusterAlias, searchContextId, - searchContextKeepAlive + searchContextKeepAlive, + reshardSplitShardCountSummary ); // if we already received a search result we can inform the shard that it // can return a null response if the request rewrites to match none rather @@ -702,7 +717,8 @@ private static void executeShardTasks(QueryPerNodeState state) { searchRequest, nodeQueryRequest.totalShards, nodeQueryRequest.absoluteStartMillis, - state.hasResponse.getAcquire() + state.hasResponse.getAcquire(), + shardToQuery.reshardSplitShardCountSummary ) ), state.task, diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchShardIterator.java b/server/src/main/java/org/elasticsearch/action/search/SearchShardIterator.java index 00ff8f33f5659..9eefd5b87c3db 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchShardIterator.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchShardIterator.java @@ -11,6 +11,7 @@ import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.SplitShardCountSummary; import org.elasticsearch.common.util.PlainIterator; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; @@ -38,31 +39,52 @@ public final class SearchShardIterator implements Comparable targetNodesIterator; + /** + * Additional metadata specific to the resharding feature. See {@link org.elasticsearch.cluster.routing.SplitShardCountSummary}. + */ + private final SplitShardCountSummary reshardSplitShardCountSummary; /** - * Creates a {@link SearchShardIterator} instance that iterates over a subset of the given shards - * this the a given shardId. + * Creates a {@link SearchShardIterator} instance that iterates over a set of replicas of a shard with provided shardId. * * @param clusterAlias the alias of the cluster where the shard is located * @param shardId shard id of the group * @param shards shards to iterate * @param originalIndices the indices that the search request originally related to (before any rewriting happened) */ - public SearchShardIterator(@Nullable String clusterAlias, ShardId shardId, List shards, OriginalIndices originalIndices) { - this(clusterAlias, shardId, shards.stream().map(ShardRouting::currentNodeId).toList(), originalIndices, null, null, false, false); + public SearchShardIterator( + @Nullable String clusterAlias, + ShardId shardId, + List shards, + OriginalIndices originalIndices, + SplitShardCountSummary reshardSplitShardCountSummary + ) { + this( + clusterAlias, + shardId, + shards.stream().map(ShardRouting::currentNodeId).toList(), + originalIndices, + null, + null, + false, + false, + reshardSplitShardCountSummary + ); } /** - * Creates a {@link SearchShardIterator} instance that iterates over a subset of the given shards + * Creates a {@link SearchShardIterator} instance that iterates over a set of nodes that are known to contain replicas of a shard + * with provided shardId. * - * @param clusterAlias the alias of the cluster where the shard is located - * @param shardId shard id of the group - * @param targetNodeIds the list of nodes hosting shard copies - * @param originalIndices the indices that the search request originally related to (before any rewriting happened) - * @param searchContextId the point-in-time specified for this group if exists - * @param searchContextKeepAlive the time interval that data nodes should extend the keep alive of the point-in-time - * @param prefiltered if true, then this group already executed the can_match phase - * @param skip if true, then this group won't have matches, and it can be safely skipped from the search + * @param clusterAlias the alias of the cluster where the shard is located + * @param shardId shard id of the group + * @param targetNodeIds the list of nodes hosting shard copies + * @param originalIndices the indices that the search request originally related to (before any rewriting happened) + * @param searchContextId the point-in-time specified for this group if exists + * @param searchContextKeepAlive the time interval that data nodes should extend the keep alive of the point-in-time + * @param prefiltered if true, then this group already executed the can_match phase + * @param skip if true, then this group won't have matches, and it can be safely skipped from the search + * @param reshardSplitShardCountSummary see {@link org.elasticsearch.search.internal.ShardSearchRequest#reshardSplitShardCountSummary} */ public SearchShardIterator( @Nullable String clusterAlias, @@ -72,7 +94,8 @@ public SearchShardIterator( ShardSearchContextId searchContextId, TimeValue searchContextKeepAlive, boolean prefiltered, - boolean skip + boolean skip, + SplitShardCountSummary reshardSplitShardCountSummary ) { this.shardId = shardId; this.targetNodesIterator = new PlainIterator<>(targetNodeIds); @@ -84,6 +107,7 @@ public SearchShardIterator( this.prefiltered = prefiltered; this.skip = skip; assert skip == false || prefiltered : "only prefiltered shards are skip-able"; + this.reshardSplitShardCountSummary = reshardSplitShardCountSummary; } /** @@ -171,6 +195,10 @@ ShardId shardId() { return shardId; } + public SplitShardCountSummary getReshardSplitShardCountSummary() { + return reshardSplitShardCountSummary; + } + @Override public boolean equals(Object o) { if (this == o) return true; diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index 16389f6137f81..6b5b682d152d5 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -46,8 +46,10 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.routing.OperationRouting; +import org.elasticsearch.cluster.routing.SearchShardRouting; import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.SplitShardCountSummary; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; import org.elasticsearch.common.breaker.CircuitBreaker; @@ -1247,7 +1249,10 @@ static List getRemoteShardsIterator( null, null, searchShardsGroup.preFiltered(), - searchShardsGroup.skipped() + searchShardsGroup.skipped(), + // This parameter is specific to the resharding feature. + // Resharding is currently not supported with CCS. + SplitShardCountSummary.UNSET ); remoteShardIterators.add(shardIterator); } @@ -1300,7 +1305,10 @@ static List getRemoteShardsIteratorFromPointInTime( perNode.getSearchContextId(), searchContextKeepAlive, false, - false + false, + // This parameter is specific to the resharding feature. + // Resharding is currently not supported with CCS. + SplitShardCountSummary.UNSET ); remoteShardIterators.add(shardIterator); } @@ -1982,7 +1990,16 @@ static List getLocalShardsIteratorFromPointInTime( perNode.getSearchContextId(), keepAlive, false, - false + false, + // This parameter is specific to the resharding feature. + // It is used when creating a searcher to apply filtering needed to have correct search results + // while resharding is in progress. + // In context of PIT the searcher is reused or can be recreated only in read-only scenarios. + // If a searcher is reused, this value won't be used + // (it was calculated and used when PIT was created). + // In read-only scenarios (e.g. searchable snapshots) we don't expect resharding to happen + // so the value doesn't matter. + SplitShardCountSummary.UNSET ) ); } @@ -2007,7 +2024,7 @@ List getLocalShardsIterator( searchRequest.routing(), searchRequest.indices() ); - List shardRoutings = clusterService.operationRouting() + List searchShards = clusterService.operationRouting() .searchShards( projectState, concreteIndices, @@ -2022,13 +2039,20 @@ List getLocalShardsIterator( concreteIndices, searchRequest.indicesOptions() ); - SearchShardIterator[] list = new SearchShardIterator[shardRoutings.size()]; + SearchShardIterator[] list = new SearchShardIterator[searchShards.size()]; int i = 0; - for (ShardIterator shardRouting : shardRoutings) { - final ShardId shardId = shardRouting.shardId(); + for (SearchShardRouting shardInfo : searchShards) { + ShardIterator iterator = shardInfo.iterator(); + final ShardId shardId = iterator.shardId(); OriginalIndices finalIndices = originalIndices.get(shardId.getIndex().getName()); assert finalIndices != null; - list[i++] = new SearchShardIterator(clusterAlias, shardId, shardRouting.getShardRoutings(), finalIndices); + list[i++] = new SearchShardIterator( + clusterAlias, + shardId, + iterator.getShardRoutings(), + finalIndices, + shardInfo.reshardSplitShardCountSummary() + ); } // the returned list must support in-place sorting, so this is the most memory efficient we can do here return Arrays.asList(list); diff --git a/server/src/main/java/org/elasticsearch/action/termvectors/TransportTermVectorsAction.java b/server/src/main/java/org/elasticsearch/action/termvectors/TransportTermVectorsAction.java index 299458ca728c0..d00f04a5d0060 100644 --- a/server/src/main/java/org/elasticsearch/action/termvectors/TransportTermVectorsAction.java +++ b/server/src/main/java/org/elasticsearch/action/termvectors/TransportTermVectorsAction.java @@ -76,7 +76,8 @@ protected ShardIterator shards(ProjectState project, InternalRequest request) { if (request.request().doc() != null && request.request().routing() == null) { // artificial document without routing specified, ignore its "id" and use either random shard or according to preference return operationRouting.searchShards(project, new String[] { request.concreteIndex() }, null, request.request().preference()) - .getFirst(); + .getFirst() + .iterator(); } ShardIterator iterator = clusterService.operationRouting() diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java b/server/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java index 39fc76de9f629..66a7b9b161101 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java @@ -78,7 +78,7 @@ public ShardIterator getShards(ProjectState projectState, String index, int shar return preferenceActiveShardIterator(indexShard, nodes.getLocalNodeId(), nodes, preference, null, null); } - public List searchShards( + public List searchShards( ProjectState projectState, String[] concreteIndices, @Nullable Map> routing, @@ -87,7 +87,7 @@ public List searchShards( return searchShards(projectState, concreteIndices, routing, preference, null, null); } - public List searchShards( + public List searchShards( ProjectState projectState, String[] concreteIndices, @Nullable Map> routing, @@ -95,12 +95,12 @@ public List searchShards( @Nullable ResponseCollectorService collectorService, @Nullable Map nodeCounts ) { - final Set shards = computeTargetedShards(projectState, concreteIndices, routing); + final Set shards = computeTargetedShards(projectState, concreteIndices, routing); DiscoveryNodes nodes = projectState.cluster().nodes(); - List res = new ArrayList<>(shards.size()); - for (IndexShardRoutingTable shard : shards) { + var res = new ArrayList(shards.size()); + for (SearchTargetShard targetShard : shards) { ShardIterator iterator = preferenceActiveShardIterator( - shard, + targetShard.shardRoutingTable(), nodes.getLocalNodeId(), nodes, preference, @@ -108,10 +108,10 @@ public List searchShards( nodeCounts ); if (iterator != null) { - res.add(ShardIterator.allSearchableShards(iterator)); + res.add(new SearchShardRouting(ShardIterator.allSearchableShards(iterator), targetShard.reshardSplitShardCountSummary())); } } - res.sort(ShardIterator::compareTo); + res.sort(SearchShardRouting::compareTo); return res; } @@ -124,59 +124,87 @@ public static ShardIterator getShards(RoutingTable routingTable, ShardId shardId return shard.activeInitializingShardsRandomIt(); } - private static Set computeTargetedShards( + private record SearchTargetShard(IndexShardRoutingTable shardRoutingTable, SplitShardCountSummary reshardSplitShardCountSummary) {} + + private static Set computeTargetedShards( ProjectState projectState, String[] concreteIndices, @Nullable Map> routing ) { // we use set here and not list since we might get duplicates - final Set set = new HashSet<>(); if (routing == null || routing.isEmpty()) { - collectTargetShardsNoRouting(projectState, concreteIndices, set); - } else { - collectTargetShardsWithRouting(projectState, concreteIndices, routing, set); + return collectTargetShardsNoRouting(projectState, concreteIndices); } - return set; + return collectTargetShardsWithRouting(projectState, concreteIndices, routing); } - private static void collectTargetShardsWithRouting( + private static Set collectTargetShardsWithRouting( ProjectState projectState, String[] concreteIndices, - Map> routing, - Set set + Map> routing ) { + var result = new HashSet(); for (String index : concreteIndices) { final IndexRoutingTable indexRoutingTable = indexRoutingTable(projectState.routingTable(), index); final Set indexSearchRouting = routing.get(index); if (indexSearchRouting != null) { - IndexRouting indexRouting = IndexRouting.fromIndexMetadata(indexMetadata(projectState.metadata(), index)); + IndexMetadata indexMetadata = indexMetadata(projectState.metadata(), index); + IndexRouting indexRouting = IndexRouting.fromIndexMetadata(indexMetadata); for (String r : indexSearchRouting) { - indexRouting.collectSearchShards(r, s -> set.add(RoutingTable.shardRoutingTable(indexRoutingTable, s))); + indexRouting.collectSearchShards( + r, + shardId -> result.add( + new SearchTargetShard( + RoutingTable.shardRoutingTable(indexRoutingTable, shardId), + SplitShardCountSummary.forSearch(indexMetadata, shardId) + ) + ) + ); } } else { - Iterator iterator = allSearchAddressableShards(projectState, index); - while (iterator.hasNext()) { - set.add(iterator.next()); - } + Iterator iterator = allSearchAddressableShards(projectState, index); + iterator.forEachRemaining(result::add); } } + return result; } - private static void collectTargetShardsNoRouting(ProjectState projectState, String[] concreteIndices, Set set) { + private static Set collectTargetShardsNoRouting(ProjectState projectState, String[] concreteIndices) { + var result = new HashSet(); for (String index : concreteIndices) { - Iterator iterator = allSearchAddressableShards(projectState, index); - while (iterator.hasNext()) { - set.add(iterator.next()); - } + Iterator iterator = allSearchAddressableShards(projectState, index); + iterator.forEachRemaining(result::add); } + return result; } /** * Returns an iterator of shards that can possibly serve searches. A shard may not be addressable during processes like resharding. * This logic is not related to shard state or a recovery process. A shard returned here may f.e. be unassigned. */ - private static Iterator allSearchAddressableShards(ProjectState projectState, String index) { - return allShardsExceptSplitTargetsInStateBefore(projectState, index, IndexReshardingState.Split.TargetShardState.SPLIT); + private static Iterator allSearchAddressableShards(ProjectState projectState, String index) { + final IndexRoutingTable indexRoutingTable = indexRoutingTable(projectState.routingTable(), index); + final IndexMetadata indexMetadata = indexMetadata(projectState.metadata(), index); + if (indexMetadata.getReshardingMetadata() == null) { + return indexRoutingTable.allShards() + .map(srt -> new SearchTargetShard(srt, SplitShardCountSummary.forSearch(indexMetadata, srt.shardId.id()))) + .iterator(); + } + + final IndexReshardingMetadata indexReshardingMetadata = indexMetadata.getReshardingMetadata(); + assert indexReshardingMetadata.isSplit(); + final IndexReshardingState.Split splitState = indexReshardingMetadata.getSplit(); + + var shards = new ArrayList(); + for (int shardId = 0; shardId < indexRoutingTable.size(); shardId++) { + if (splitState.isTargetShard(shardId) == false + || splitState.targetStateAtLeast(shardId, IndexReshardingState.Split.TargetShardState.SPLIT)) { + shards.add( + new SearchTargetShard(indexRoutingTable.shard(shardId), SplitShardCountSummary.forSearch(indexMetadata, shardId)) + ); + } + } + return shards.iterator(); } /** @@ -184,17 +212,6 @@ private static Iterator allSearchAddressableShards(Proje * This logic is not related to shard state or a recovery process. A shard returned here may f.e. be unassigned. */ private static Iterator allWriteAddressableShards(ProjectState projectState, String index) { - return allShardsExceptSplitTargetsInStateBefore(projectState, index, IndexReshardingState.Split.TargetShardState.HANDOFF); - } - - /** - * Filters shards based on their state in resharding metadata. If resharing metadata is not present returns all shards. - */ - private static Iterator allShardsExceptSplitTargetsInStateBefore( - ProjectState projectState, - String index, - IndexReshardingState.Split.TargetShardState targetShardState - ) { final IndexRoutingTable indexRoutingTable = indexRoutingTable(projectState.routingTable(), index); final IndexMetadata indexMetadata = indexMetadata(projectState.metadata(), index); if (indexMetadata.getReshardingMetadata() == null) { @@ -207,7 +224,8 @@ private static Iterator allShardsExceptSplitTargetsInSta var shards = new ArrayList(); for (int i = 0; i < indexRoutingTable.size(); i++) { - if (splitState.isTargetShard(i) == false || splitState.targetStateAtLeast(i, targetShardState)) { + if (splitState.isTargetShard(i) == false + || splitState.targetStateAtLeast(i, IndexReshardingState.Split.TargetShardState.HANDOFF)) { shards.add(indexRoutingTable.shard(i)); } } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/SearchShardRouting.java b/server/src/main/java/org/elasticsearch/cluster/routing/SearchShardRouting.java new file mode 100644 index 0000000000000..3cc533b7542df --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/routing/SearchShardRouting.java @@ -0,0 +1,19 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.cluster.routing; + +public record SearchShardRouting(ShardIterator iterator, SplitShardCountSummary reshardSplitShardCountSummary) + implements + Comparable { + @Override + public int compareTo(SearchShardRouting o) { + return iterator.compareTo(o.iterator); + } +} diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/SplitShardCountSummary.java b/server/src/main/java/org/elasticsearch/cluster/routing/SplitShardCountSummary.java index 70596cd3b926f..8b7b143829bbb 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/SplitShardCountSummary.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/SplitShardCountSummary.java @@ -47,6 +47,21 @@ * Note that in this case no shard-bulk-request is sent to shards 5, 6, 7 and the requests that were meant for these target shards * are bundled together with and sent to their source shards. * + * Example 3: + * Suppose we are resharding an index from 4 -> 8 shards. While handling a search request, the coordinator observes + * that target shard 5 is in SPLIT state but target shards 4, 6, 7 are in CLONE/HANDOFF state. + * The coordinator will send shard search requests to all source shards (0, 1, 2, 3) and to all target shards + * that are at least in SPLIT state (5). + * Shard search request sent to source shards 0, 2, 3 has the "reshardSplitShardCountSummary" of 4 + * since corresponding target shards (4, 6, 7) have not advanced to SPLIT state. + * Shard search request sent to source shard 1 has the "reshardSplitShardCountSummary" of 8 + * since the corresponding target shard 5 is in SPLIT state. + * When a shard search request is executed on the source shard 1, "reshardSplitShardCountSummary" value + * is checked and documents that will be returned by target shard 5 are excluded + * (they are still present in the source shard because the resharding process is not complete). + * All other source shard search requests (0, 2, 3) return all available documents since corresponding target shards + * are not yet available to do that. + * * A value of 0 indicates an INVALID reshardSplitShardCountSummary. Hence, a request with INVALID reshardSplitShardCountSummary * will be treated as a Summary mismatch on the source shard node. */ diff --git a/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java b/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java index 10d2fb0e23b3b..e10e95e955c6f 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java +++ b/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java @@ -19,6 +19,7 @@ import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.metadata.AliasMetadata; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.routing.SplitShardCountSummary; import org.elasticsearch.common.CheckedBiConsumer; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesArray; @@ -101,6 +102,16 @@ public class ShardSearchRequest extends AbstractTransportRequest implements Indi */ private final boolean forceSyntheticSource; + /** + * Additional metadata specific to the resharding feature. See {@link org.elasticsearch.cluster.routing.SplitShardCountSummary}. + */ + private final SplitShardCountSummary reshardSplitShardCountSummary; + + public static final TransportVersion SHARD_SEARCH_REQUEST_RESHARD_SHARD_COUNT_SUMMARY = TransportVersion.fromName( + "shard_search_request_reshard_shard_count_summary" + ); + + // Test only constructor. public ShardSearchRequest( OriginalIndices originalIndices, SearchRequest searchRequest, @@ -123,7 +134,8 @@ public ShardSearchRequest( nowInMillis, clusterAlias, null, - null + null, + SplitShardCountSummary.UNSET ); } @@ -138,7 +150,8 @@ public ShardSearchRequest( long nowInMillis, @Nullable String clusterAlias, ShardSearchContextId readerId, - TimeValue keepAlive + TimeValue keepAlive, + SplitShardCountSummary reshardSplitShardCountSummary ) { this( originalIndices, @@ -158,7 +171,8 @@ public ShardSearchRequest( keepAlive, computeWaitForCheckpoint(searchRequest.getWaitForCheckpoints(), shardId, shardRequestIndex), searchRequest.getWaitForCheckpointsTimeout(), - searchRequest.isForceSyntheticSource() + searchRequest.isForceSyntheticSource(), + reshardSplitShardCountSummary ); // If allowPartialSearchResults is unset (ie null), the cluster-level default should have been substituted // at this stage. Any NPEs in the above are therefore an error in request preparation logic. @@ -180,10 +194,12 @@ public static long computeWaitForCheckpoint(Map indexToWaitForCh return waitForCheckpoint; } + // Used by ValidateQueryAction, ExplainAction, FieldCaps, TermsEnumAction, lookup join in ESQL public ShardSearchRequest(ShardId shardId, long nowInMillis, AliasFilter aliasFilter) { this(shardId, nowInMillis, aliasFilter, null); } + // Used by ESQL and field_caps API public ShardSearchRequest(ShardId shardId, long nowInMillis, AliasFilter aliasFilter, String clusterAlias) { this( OriginalIndices.NONE, @@ -203,7 +219,11 @@ public ShardSearchRequest(ShardId shardId, long nowInMillis, AliasFilter aliasFi null, SequenceNumbers.UNASSIGNED_SEQ_NO, SearchService.NO_TIMEOUT, - false + false, + // This parameter is specific to the resharding feature. + // TODO + // It is currently only supported in _search API and is stubbed here as a result. + SplitShardCountSummary.UNSET ); } @@ -226,7 +246,8 @@ public ShardSearchRequest( TimeValue keepAlive, long waitForCheckpoint, TimeValue waitForCheckpointsTimeout, - boolean forceSyntheticSource + boolean forceSyntheticSource, + SplitShardCountSummary reshardSplitShardCountSummary ) { this.shardId = shardId; this.shardRequestIndex = shardRequestIndex; @@ -248,6 +269,7 @@ public ShardSearchRequest( this.waitForCheckpoint = waitForCheckpoint; this.waitForCheckpointsTimeout = waitForCheckpointsTimeout; this.forceSyntheticSource = forceSyntheticSource; + this.reshardSplitShardCountSummary = reshardSplitShardCountSummary; } @SuppressWarnings("this-escape") @@ -273,6 +295,7 @@ public ShardSearchRequest(ShardSearchRequest clone) { this.waitForCheckpoint = clone.waitForCheckpoint; this.waitForCheckpointsTimeout = clone.waitForCheckpointsTimeout; this.forceSyntheticSource = clone.forceSyntheticSource; + this.reshardSplitShardCountSummary = clone.reshardSplitShardCountSummary; } public ShardSearchRequest(StreamInput in) throws IOException { @@ -339,6 +362,12 @@ public ShardSearchRequest(StreamInput in) throws IOException { */ forceSyntheticSource = false; } + if (in.getTransportVersion().supports(SHARD_SEARCH_REQUEST_RESHARD_SHARD_COUNT_SUMMARY)) { + reshardSplitShardCountSummary = SplitShardCountSummary.fromInt(in.readVInt()); + } else { + reshardSplitShardCountSummary = SplitShardCountSummary.UNSET; + } + originalIndices = OriginalIndices.readOriginalIndices(in); } @@ -399,6 +428,9 @@ protected final void innerWriteTo(StreamOutput out, boolean asKey) throws IOExce throw new IllegalArgumentException("force_synthetic_source is not supported before 8.4.0"); } } + if (out.getTransportVersion().supports(SHARD_SEARCH_REQUEST_RESHARD_SHARD_COUNT_SUMMARY)) { + out.writeVInt(reshardSplitShardCountSummary.asInt()); + } } @Override diff --git a/server/src/main/resources/transport/definitions/referable/shard_search_request_reshard_shard_count_summary.csv b/server/src/main/resources/transport/definitions/referable/shard_search_request_reshard_shard_count_summary.csv new file mode 100644 index 0000000000000..ce3efd6a04801 --- /dev/null +++ b/server/src/main/resources/transport/definitions/referable/shard_search_request_reshard_shard_count_summary.csv @@ -0,0 +1 @@ +9190000 diff --git a/server/src/main/resources/transport/upper_bounds/9.3.csv b/server/src/main/resources/transport/upper_bounds/9.3.csv index b947ec1f1d1ce..d4b5932a5c401 100644 --- a/server/src/main/resources/transport/upper_bounds/9.3.csv +++ b/server/src/main/resources/transport/upper_bounds/9.3.csv @@ -1 +1 @@ -esql_resolve_fields_response_created,9189000 +shard_search_request_reshard_shard_count_summary,9190000 diff --git a/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java b/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java index abe7e893977f4..34b0d6ba4cde6 100644 --- a/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java @@ -13,6 +13,7 @@ import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.routing.SplitShardCountSummary; import org.elasticsearch.common.UUIDs; import org.elasticsearch.core.Tuple; import org.elasticsearch.index.Index; @@ -82,7 +83,9 @@ private AbstractSearchAsyncAction createAction( null, request, listener, - Collections.singletonList(new SearchShardIterator(null, new ShardId("index", "_na", 0), Collections.emptyList(), null)), + Collections.singletonList( + new SearchShardIterator(null, new ShardId("index", "_na", 0), Collections.emptyList(), null, SplitShardCountSummary.UNSET) + ), timeProvider, ClusterState.EMPTY_STATE, null, @@ -153,7 +156,8 @@ public void testBuildShardSearchTransportRequest() { clusterAlias, new ShardId(new Index("name", "foo"), 1), Collections.emptyList(), - new OriginalIndices(new String[] { "name", "name1" }, IndicesOptions.strictExpand()) + new OriginalIndices(new String[] { "name", "name1" }, IndicesOptions.strictExpand()), + SplitShardCountSummary.UNSET ); ShardSearchRequest shardSearchTransportRequest = action.buildShardSearchRequest(iterator, 10); assertEquals(IndicesOptions.strictExpand(), shardSearchTransportRequest.indicesOptions()); diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java index afd3bee4c4ab8..4506c9f094d3c 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java @@ -17,6 +17,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodeUtils; import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.SplitShardCountSummary; import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.common.UUIDs; import org.elasticsearch.index.Index; @@ -640,7 +641,8 @@ public void testSkipUnavailableSearchShards() throws InterruptedException { null, new ShardId(index, 0), Collections.emptyList(), - originalIndices + originalIndices, + SplitShardCountSummary.UNSET ); // Skip all the shards searchShardIterator.skip(true); @@ -760,7 +762,7 @@ static List getShardsIter( } Collections.shuffle(started, random()); started.addAll(initializing); - list.add(new SearchShardIterator(null, new ShardId(index, i), started, originalIndices)); + list.add(new SearchShardIterator(null, new ShardId(index, i), started, originalIndices, SplitShardCountSummary.UNSET)); } return list; } diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchShardIteratorTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchShardIteratorTests.java index 79736427f634d..9fe31aeb8c5d6 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchShardIteratorTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchShardIteratorTests.java @@ -13,6 +13,7 @@ import org.elasticsearch.action.OriginalIndicesTests; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.SplitShardCountSummary; import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.search.SearchShardTarget; @@ -45,7 +46,13 @@ private static List randomShardRoutings(ShardId shardId, int numRe public void testShardId() { ShardId shardId = new ShardId(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLength(10), randomInt()); - SearchShardIterator searchShardIterator = new SearchShardIterator(null, shardId, Collections.emptyList(), OriginalIndices.NONE); + SearchShardIterator searchShardIterator = new SearchShardIterator( + null, + shardId, + Collections.emptyList(), + OriginalIndices.NONE, + SplitShardCountSummary.UNSET + ); assertSame(shardId, searchShardIterator.shardId()); } @@ -55,7 +62,13 @@ public void testGetOriginalIndices() { new String[] { randomAlphaOfLengthBetween(3, 10) }, IndicesOptions.fromOptions(randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean()) ); - SearchShardIterator searchShardIterator = new SearchShardIterator(null, shardId, Collections.emptyList(), originalIndices); + SearchShardIterator searchShardIterator = new SearchShardIterator( + null, + shardId, + Collections.emptyList(), + originalIndices, + SplitShardCountSummary.UNSET + ); assertSame(originalIndices, searchShardIterator.getOriginalIndices()); } @@ -66,7 +79,8 @@ public void testGetClusterAlias() { clusterAlias, shardId, Collections.emptyList(), - OriginalIndices.NONE + OriginalIndices.NONE, + SplitShardCountSummary.UNSET ); assertEquals(clusterAlias, searchShardIterator.getClusterAlias()); } @@ -88,7 +102,8 @@ public void testNewSearchShardTarget() { null, null, false, - false + false, + SplitShardCountSummary.UNSET ); final SearchShardTarget searchShardTarget = searchShardIterator.nextOrNull(); assertNotNull(searchShardTarget); @@ -109,7 +124,8 @@ public void testEqualsAndHashcode() { s.getSearchContextId(), s.getSearchContextKeepAlive(), s.prefiltered(), - s.skip() + s.skip(), + s.getReshardSplitShardCountSummary() ), s -> { if (randomBoolean()) { @@ -127,7 +143,8 @@ public void testEqualsAndHashcode() { s.getSearchContextId(), s.getSearchContextKeepAlive(), s.prefiltered(), - s.skip() + s.skip(), + s.getReshardSplitShardCountSummary() ); } else { ShardId shardId = new ShardId( @@ -143,7 +160,8 @@ public void testEqualsAndHashcode() { s.getSearchContextId(), s.getSearchContextKeepAlive(), s.prefiltered(), - s.skip() + s.skip(), + s.getReshardSplitShardCountSummary() ); } } @@ -164,7 +182,13 @@ public void testCompareTo() { for (String uuid : uuids) { ShardId shardId = new ShardId(index, uuid, i); shardIterators.add( - new SearchShardIterator(null, shardId, randomShardRoutings(shardId), OriginalIndicesTests.randomOriginalIndices()) + new SearchShardIterator( + null, + shardId, + randomShardRoutings(shardId), + OriginalIndicesTests.randomOriginalIndices(), + SplitShardCountSummary.fromInt(randomIntBetween(0, 1024)) + ) ); for (String cluster : clusters) { shardIterators.add( @@ -172,7 +196,8 @@ public void testCompareTo() { cluster, shardId, randomShardRoutings(shardId), - OriginalIndicesTests.randomOriginalIndices() + OriginalIndicesTests.randomOriginalIndices(), + SplitShardCountSummary.fromInt(randomIntBetween(0, 1024)) ) ); } @@ -207,7 +232,8 @@ public void testCompareToEqualItems() { shardIterator1.getSearchContextId(), shardIterator1.getSearchContextKeepAlive(), shardIterator1.prefiltered(), - shardIterator1.skip() + shardIterator1.skip(), + shardIterator1.getReshardSplitShardCountSummary() ); assertEquals(shardIterator1, shardIterator2); assertEquals(0, shardIterator1.compareTo(shardIterator2)); @@ -217,6 +243,12 @@ public void testCompareToEqualItems() { private static SearchShardIterator randomSearchShardIterator() { String clusterAlias = randomBoolean() ? null : randomAlphaOfLengthBetween(5, 10); ShardId shardId = new ShardId(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLength(10), randomIntBetween(0, Integer.MAX_VALUE)); - return new SearchShardIterator(clusterAlias, shardId, randomShardRoutings(shardId), OriginalIndicesTests.randomOriginalIndices()); + return new SearchShardIterator( + clusterAlias, + shardId, + randomShardRoutings(shardId), + OriginalIndicesTests.randomOriginalIndices(), + SplitShardCountSummary.fromInt(randomIntBetween(0, 1024)) + ); } } diff --git a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java index 61aa05f703018..a99b724e06936 100644 --- a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java @@ -42,6 +42,7 @@ import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.SplitShardCountSummary; import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; @@ -153,7 +154,7 @@ private static SearchShardIterator createSearchShardIterator( ) { ShardId shardId = new ShardId(index, id); List shardRoutings = SearchShardIteratorTests.randomShardRoutings(shardId); - return new SearchShardIterator(clusterAlias, shardId, shardRoutings, originalIndices); + return new SearchShardIterator(clusterAlias, shardId, shardRoutings, originalIndices, SplitShardCountSummary.UNSET); } private static ResolvedIndices createMockResolvedIndices( diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/OperationRoutingTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/OperationRoutingTests.java index 679fd3f0e9fb2..4b55136e4e561 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/OperationRoutingTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/OperationRoutingTests.java @@ -180,13 +180,18 @@ public void testFairSessionIdPreferences() throws InterruptedException, IOExcept for (int i = 0; i < numRepeatedSearches; i++) { List searchedShards = new ArrayList<>(numShards); Set selectedNodes = Sets.newHashSetWithExpectedSize(numShards); - final List groupIterator = opRouting.searchShards(state.projectState(projectId), indexNames, null, sessionKey); + final List groupIterator = opRouting.searchShards( + state.projectState(projectId), + indexNames, + null, + sessionKey + ); assertThat("One group per index shard", groupIterator.size(), equalTo(numIndices * numShards)); - for (ShardIterator shardIterator : groupIterator) { - assertThat(shardIterator.size(), equalTo(numReplicas + 1)); + for (SearchShardRouting routing : groupIterator) { + assertThat(routing.iterator().size(), equalTo(numReplicas + 1)); - ShardRouting firstChoice = shardIterator.nextOrNull(); + ShardRouting firstChoice = routing.iterator().nextOrNull(); assertNotNull(firstChoice); ShardRouting duelFirst = duelGetShards(state, firstChoice.shardId(), sessionKey).nextOrNull(); assertThat("Regression test failure", duelFirst, equalTo(firstChoice)); @@ -307,27 +312,27 @@ public void testARSRanking() throws Exception { TestThreadPool threadPool = new TestThreadPool("test"); ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool); ResponseCollectorService collector = new ResponseCollectorService(clusterService); - List groupIterator = opRouting.searchShards(project, indexNames, null, null, collector, new HashMap<>()); + List groupIterator = opRouting.searchShards(project, indexNames, null, null, collector, new HashMap<>()); assertThat("One group per index shard", groupIterator.size(), equalTo(numIndices * numShards)); // Test that the shards use a round-robin pattern when there are no stats - assertThat(groupIterator.get(0).size(), equalTo(numReplicas + 1)); - ShardRouting firstChoice = groupIterator.get(0).nextOrNull(); + assertThat(groupIterator.get(0).iterator().size(), equalTo(numReplicas + 1)); + ShardRouting firstChoice = groupIterator.get(0).iterator().nextOrNull(); assertNotNull(firstChoice); searchedShards.add(firstChoice); groupIterator = opRouting.searchShards(project, indexNames, null, null, collector, new HashMap<>()); assertThat(groupIterator.size(), equalTo(numIndices * numShards)); - ShardRouting secondChoice = groupIterator.get(0).nextOrNull(); + ShardRouting secondChoice = groupIterator.get(0).iterator().nextOrNull(); assertNotNull(secondChoice); searchedShards.add(secondChoice); groupIterator = opRouting.searchShards(project, indexNames, null, null, collector, new HashMap<>()); assertThat(groupIterator.size(), equalTo(numIndices * numShards)); - ShardRouting thirdChoice = groupIterator.get(0).nextOrNull(); + ShardRouting thirdChoice = groupIterator.get(0).iterator().nextOrNull(); assertNotNull(thirdChoice); searchedShards.add(thirdChoice); @@ -340,26 +345,26 @@ public void testARSRanking() throws Exception { collector.addNodeStatistics("node_2", 1, TimeValue.timeValueMillis(200).nanos(), TimeValue.timeValueMillis(200).nanos()); groupIterator = opRouting.searchShards(project, indexNames, null, null, collector, new HashMap<>()); - ShardRouting shardChoice = groupIterator.get(0).nextOrNull(); + ShardRouting shardChoice = groupIterator.get(0).iterator().nextOrNull(); // node 1 should be the lowest ranked node to start assertThat(shardChoice.currentNodeId(), equalTo("node_1")); // node 1 starts getting more loaded... collector.addNodeStatistics("node_1", 1, TimeValue.timeValueMillis(200).nanos(), TimeValue.timeValueMillis(100).nanos()); groupIterator = opRouting.searchShards(project, indexNames, null, null, collector, new HashMap<>()); - shardChoice = groupIterator.get(0).nextOrNull(); + shardChoice = groupIterator.get(0).iterator().nextOrNull(); assertThat(shardChoice.currentNodeId(), equalTo("node_1")); // and more loaded... collector.addNodeStatistics("node_1", 2, TimeValue.timeValueMillis(220).nanos(), TimeValue.timeValueMillis(120).nanos()); groupIterator = opRouting.searchShards(project, indexNames, null, null, collector, new HashMap<>()); - shardChoice = groupIterator.get(0).nextOrNull(); + shardChoice = groupIterator.get(0).iterator().nextOrNull(); assertThat(shardChoice.currentNodeId(), equalTo("node_1")); // and even more collector.addNodeStatistics("node_1", 3, TimeValue.timeValueMillis(250).nanos(), TimeValue.timeValueMillis(150).nanos()); groupIterator = opRouting.searchShards(project, indexNames, null, null, collector, new HashMap<>()); - shardChoice = groupIterator.get(0).nextOrNull(); + shardChoice = groupIterator.get(0).iterator().nextOrNull(); // finally, node 0 is chosen instead assertThat(shardChoice.currentNodeId(), equalTo("node_0")); @@ -388,7 +393,7 @@ public void testARSStatsAdjustment() throws Exception { ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool); ResponseCollectorService collector = new ResponseCollectorService(clusterService); - List groupIterator = opRouting.searchShards(project, indexNames, null, null, collector, new HashMap<>()); + List groupIterator = opRouting.searchShards(project, indexNames, null, null, collector, new HashMap<>()); assertThat("One group per index shard", groupIterator.size(), equalTo(numIndices * numShards)); // We have two nodes, where the second has more load @@ -398,7 +403,7 @@ public void testARSStatsAdjustment() throws Exception { // Check the first node is usually selected, if it's stats don't change much for (int i = 0; i < 10; i++) { groupIterator = opRouting.searchShards(project, indexNames, null, null, collector, new HashMap<>()); - ShardRouting shardChoice = groupIterator.get(0).nextOrNull(); + ShardRouting shardChoice = groupIterator.get(0).iterator().nextOrNull(); assertThat(shardChoice.currentNodeId(), equalTo("node_0")); int responseTime = 50 + randomInt(5); @@ -414,7 +419,7 @@ public void testARSStatsAdjustment() throws Exception { // Check that we try the second when the first node slows down more collector.addNodeStatistics("node_0", 2, TimeValue.timeValueMillis(60).nanos(), TimeValue.timeValueMillis(50).nanos()); groupIterator = opRouting.searchShards(project, indexNames, null, null, collector, new HashMap<>()); - ShardRouting shardChoice = groupIterator.get(0).nextOrNull(); + ShardRouting shardChoice = groupIterator.get(0).iterator().nextOrNull(); assertThat(shardChoice.currentNodeId(), equalTo("node_1")); IOUtils.close(clusterService); @@ -449,11 +454,11 @@ public void testARSOutstandingRequestTracking() throws Exception { Map outstandingRequests = new HashMap<>(); // Check that we choose to search over both nodes - List groupIterator = opRouting.searchShards(project, indexNames, null, null, collector, outstandingRequests); + List groupIterator = opRouting.searchShards(project, indexNames, null, null, collector, outstandingRequests); Set nodeIds = new HashSet<>(); - nodeIds.add(groupIterator.get(0).nextOrNull().currentNodeId()); - nodeIds.add(groupIterator.get(1).nextOrNull().currentNodeId()); + nodeIds.add(groupIterator.get(0).iterator().nextOrNull().currentNodeId()); + nodeIds.add(groupIterator.get(1).iterator().nextOrNull().currentNodeId()); assertThat(nodeIds, equalTo(Set.of("node_0", "node_1"))); assertThat(outstandingRequests.get("node_0"), equalTo(1L)); assertThat(outstandingRequests.get("node_1"), equalTo(1L)); @@ -466,8 +471,8 @@ public void testARSOutstandingRequestTracking() throws Exception { groupIterator = opRouting.searchShards(project, indexNames, null, null, collector, outstandingRequests); nodeIds = new HashSet<>(); - nodeIds.add(groupIterator.get(0).nextOrNull().currentNodeId()); - nodeIds.add(groupIterator.get(1).nextOrNull().currentNodeId()); + nodeIds.add(groupIterator.get(0).iterator().nextOrNull().currentNodeId()); + nodeIds.add(groupIterator.get(1).iterator().nextOrNull().currentNodeId()); assertThat(nodeIds, equalTo(Set.of("node_1"))); assertThat(outstandingRequests.get("node_1"), equalTo(2L)); @@ -481,14 +486,14 @@ public void testOperationRoutingWithResharding() throws IOException { final ProjectId projectId = randomProjectIdOrDefault(); final String indexName = "test"; - final int shardCount = 1; - final int newShardCount = randomIntBetween(2, 5); + final int shardCount = randomIntBetween(1, 4); + final int newShardCount = shardCount * 2; var indexMetadata = IndexMetadata.builder(indexName) .settings(indexSettings(IndexVersion.current(), newShardCount, 1)) .numberOfShards(newShardCount) .numberOfReplicas(1) - .reshardingMetadata(IndexReshardingMetadata.newSplitByMultiple(shardCount, newShardCount)) + .reshardingMetadata(IndexReshardingMetadata.newSplitByMultiple(shardCount, 2)) .build(); ClusterState.Builder initialStateBuilder = ClusterState.builder(new ClusterName("test")); @@ -517,23 +522,31 @@ public void testOperationRoutingWithResharding() throws IOException { var initialSearchShards = clusterService.operationRouting() .searchShards(clusterService.state().projectState(projectId), new String[] { indexName }, null, null); assertEquals(shardCount, initialSearchShards.size()); - assertEquals(0, initialSearchShards.get(0).shardId().id()); + for (int i = 0; i < shardCount; i++) { + assertEquals(i, initialSearchShards.get(i).iterator().shardId().id()); + assertEquals(SplitShardCountSummary.fromInt(shardCount), initialSearchShards.get(i).reshardSplitShardCountSummary()); + } // We are testing a case when there is routing configuration but not for the index in question. // Actual routing behavior is tested in IndexRoutingTests. var initialSearchShardsWithRouting = clusterService.operationRouting() .searchShards(clusterService.state().projectState(projectId), new String[] { indexName }, Map.of("other", Set.of("1")), null); assertEquals(shardCount, initialSearchShardsWithRouting.size()); - assertEquals(0, initialSearchShardsWithRouting.get(0).shardId().id()); + for (int i = 0; i < shardCount; i++) { + assertEquals(i, initialSearchShardsWithRouting.get(i).iterator().shardId().id()); + assertEquals(SplitShardCountSummary.fromInt(shardCount), initialSearchShardsWithRouting.get(i).reshardSplitShardCountSummary()); + } var initialWriteableShards = clusterService.operationRouting() .allWritableShards(clusterService.state().projectState(projectId), indexName); - assertEquals(0, initialWriteableShards.next().shardId().id()); + for (int i = 0; i < shardCount; i++) { + assertEquals(i, initialWriteableShards.next().shardId().id()); + } assertFalse(initialWriteableShards.hasNext()); final Index index = clusterService.state().metadata().getProject(projectId).index(indexName).getIndex(); - var shardChangingSplitTargetState = randomIntBetween(1, newShardCount - 1); + var shardChangingSplitTargetState = randomIntBetween(shardCount, newShardCount - 1); var currentIndexMetadata = clusterService.state().projectState(projectId).metadata().index(indexName); var updatedReshardingMetadataOneShardInHandoff = IndexMetadata.builder(currentIndexMetadata) @@ -556,16 +569,30 @@ public void testOperationRoutingWithResharding() throws IOException { var searchShardsWithOneShardHandoff = clusterService.operationRouting() .searchShards(clusterService.state().projectState(projectId), new String[] { indexName }, null, null); assertEquals(shardCount, searchShardsWithOneShardHandoff.size()); - assertEquals(0, searchShardsWithOneShardHandoff.get(0).shardId().id()); + for (int i = 0; i < shardCount; i++) { + assertEquals(i, searchShardsWithOneShardHandoff.get(i).iterator().shardId().id()); + assertEquals( + SplitShardCountSummary.fromInt(shardCount), + searchShardsWithOneShardHandoff.get(i).reshardSplitShardCountSummary() + ); + } var searchShardsWithOneShardHandoffAndRouting = clusterService.operationRouting() .searchShards(clusterService.state().projectState(projectId), new String[] { indexName }, Map.of("other", Set.of("1")), null); assertEquals(shardCount, searchShardsWithOneShardHandoffAndRouting.size()); - assertEquals(0, searchShardsWithOneShardHandoffAndRouting.get(0).shardId().id()); + for (int i = 0; i < shardCount; i++) { + assertEquals(i, searchShardsWithOneShardHandoffAndRouting.get(i).iterator().shardId().id()); + assertEquals( + SplitShardCountSummary.fromInt(shardCount), + searchShardsWithOneShardHandoffAndRouting.get(i).reshardSplitShardCountSummary() + ); + } var writeableShardsWithOneShardHandoff = clusterService.operationRouting() .allWritableShards(clusterService.state().projectState(projectId), indexName); - assertEquals(0, writeableShardsWithOneShardHandoff.next().shardId().id()); + for (int i = 0; i < shardCount; i++) { + assertEquals(i, writeableShardsWithOneShardHandoff.next().shardId().id()); + } assertEquals(shardChangingSplitTargetState, writeableShardsWithOneShardHandoff.next().shardId().id()); assertFalse(writeableShardsWithOneShardHandoff.hasNext()); @@ -587,21 +614,72 @@ public void testOperationRoutingWithResharding() throws IOException { ); ClusterServiceUtils.setState(clusterService, newState); + // This shard is always last since target shards all have ids larger than the previous shards. + int indexOfShardWithNewState = shardCount; + int sourceShard = updatedReshardingMetadataOneShardInSplit.getReshardingMetadata() + .getSplit() + .sourceShard(shardChangingSplitTargetState); + var searchShardsWithOneShardSplit = clusterService.operationRouting() .searchShards(clusterService.state().projectState(projectId), new String[] { indexName }, null, null); assertEquals(shardCount + 1, searchShardsWithOneShardSplit.size()); - assertEquals(0, searchShardsWithOneShardSplit.get(0).shardId().id()); - assertEquals(shardChangingSplitTargetState, searchShardsWithOneShardSplit.get(1).shardId().id()); + for (int i = 0; i < shardCount; i++) { + assertEquals(i, searchShardsWithOneShardSplit.get(i).iterator().shardId().id()); + } + assertEquals(shardChangingSplitTargetState, searchShardsWithOneShardSplit.get(indexOfShardWithNewState).iterator().shardId().id()); + // Since the target shard is in SPLIT state, reshardSplitShardCountSummary is updated for it and the corresponding source shard. + assertEquals( + SplitShardCountSummary.fromInt(newShardCount), + searchShardsWithOneShardSplit.get(indexOfShardWithNewState).reshardSplitShardCountSummary() + ); + for (int i = 0; i < shardCount; i++) { + if (i == sourceShard) { + assertEquals( + SplitShardCountSummary.fromInt(newShardCount), + searchShardsWithOneShardSplit.get(i).reshardSplitShardCountSummary() + ); + } else { + assertEquals( + SplitShardCountSummary.fromInt(shardCount), + searchShardsWithOneShardSplit.get(i).reshardSplitShardCountSummary() + ); + } + } var searchShardsWithOneShardSplitAndRouting = clusterService.operationRouting() .searchShards(clusterService.state().projectState(projectId), new String[] { indexName }, Map.of("other", Set.of("1")), null); assertEquals(shardCount + 1, searchShardsWithOneShardSplitAndRouting.size()); - assertEquals(0, searchShardsWithOneShardSplitAndRouting.get(0).shardId().id()); - assertEquals(shardChangingSplitTargetState, searchShardsWithOneShardSplitAndRouting.get(1).shardId().id()); + for (int i = 0; i < shardCount; i++) { + assertEquals(i, searchShardsWithOneShardSplitAndRouting.get(i).iterator().shardId().id()); + } + assertEquals( + shardChangingSplitTargetState, + searchShardsWithOneShardSplitAndRouting.get(indexOfShardWithNewState).iterator().shardId().id() + ); + // Since the target shard is in SPLIT state, reshardSplitShardCountSummary is updated for it and the corresponding source shard. + assertEquals( + SplitShardCountSummary.fromInt(newShardCount), + searchShardsWithOneShardSplitAndRouting.get(indexOfShardWithNewState).reshardSplitShardCountSummary() + ); + for (int i = 0; i < shardCount; i++) { + if (i == sourceShard) { + assertEquals( + SplitShardCountSummary.fromInt(newShardCount), + searchShardsWithOneShardSplitAndRouting.get(i).reshardSplitShardCountSummary() + ); + } else { + assertEquals( + SplitShardCountSummary.fromInt(shardCount), + searchShardsWithOneShardSplitAndRouting.get(i).reshardSplitShardCountSummary() + ); + } + } var writeableShardsWithOneShardSplit = clusterService.operationRouting() .allWritableShards(clusterService.state().projectState(projectId), indexName); - assertEquals(0, writeableShardsWithOneShardSplit.next().shardId().id()); + for (int i = 0; i < shardCount; i++) { + assertEquals(i, writeableShardsWithOneShardSplit.next().shardId().id()); + } assertEquals(shardChangingSplitTargetState, writeableShardsWithOneShardSplit.next().shardId().id()); assertFalse(writeableShardsWithOneShardSplit.hasNext()); @@ -626,18 +704,63 @@ public void testOperationRoutingWithResharding() throws IOException { var searchShardsWithOneShardDone = clusterService.operationRouting() .searchShards(clusterService.state().projectState(projectId), new String[] { indexName }, null, null); assertEquals(shardCount + 1, searchShardsWithOneShardDone.size()); - assertEquals(0, searchShardsWithOneShardDone.get(0).shardId().id()); - assertEquals(shardChangingSplitTargetState, searchShardsWithOneShardDone.get(1).shardId().id()); + for (int i = 0; i < shardCount; i++) { + assertEquals(i, searchShardsWithOneShardDone.get(i).iterator().shardId().id()); + } + assertEquals(shardChangingSplitTargetState, searchShardsWithOneShardDone.get(indexOfShardWithNewState).iterator().shardId().id()); + // Since the target shard is past SPLIT state, reshardSplitShardCountSummary is updated for it and the corresponding source shard. + assertEquals( + SplitShardCountSummary.fromInt(newShardCount), + searchShardsWithOneShardDone.get(indexOfShardWithNewState).reshardSplitShardCountSummary() + ); + for (int i = 0; i < shardCount; i++) { + if (i == sourceShard) { + assertEquals( + SplitShardCountSummary.fromInt(newShardCount), + searchShardsWithOneShardDone.get(i).reshardSplitShardCountSummary() + ); + } else { + assertEquals( + SplitShardCountSummary.fromInt(shardCount), + searchShardsWithOneShardDone.get(i).reshardSplitShardCountSummary() + ); + } + } var searchShardsWithOneShardDoneAndRouting = clusterService.operationRouting() .searchShards(clusterService.state().projectState(projectId), new String[] { indexName }, Map.of("other", Set.of("1")), null); assertEquals(shardCount + 1, searchShardsWithOneShardDoneAndRouting.size()); - assertEquals(0, searchShardsWithOneShardDoneAndRouting.get(0).shardId().id()); - assertEquals(shardChangingSplitTargetState, searchShardsWithOneShardDoneAndRouting.get(1).shardId().id()); + for (int i = 0; i < shardCount; i++) { + assertEquals(i, searchShardsWithOneShardDoneAndRouting.get(i).iterator().shardId().id()); + } + assertEquals( + shardChangingSplitTargetState, + searchShardsWithOneShardDoneAndRouting.get(indexOfShardWithNewState).iterator().shardId().id() + ); + // Since the target shard is past SPLIT state, reshardSplitShardCountSummary is updated for it and the corresponding source shard. + assertEquals( + SplitShardCountSummary.fromInt(newShardCount), + searchShardsWithOneShardDoneAndRouting.get(indexOfShardWithNewState).reshardSplitShardCountSummary() + ); + for (int i = 0; i < shardCount; i++) { + if (i == sourceShard) { + assertEquals( + SplitShardCountSummary.fromInt(newShardCount), + searchShardsWithOneShardDoneAndRouting.get(i).reshardSplitShardCountSummary() + ); + } else { + assertEquals( + SplitShardCountSummary.fromInt(shardCount), + searchShardsWithOneShardDoneAndRouting.get(i).reshardSplitShardCountSummary() + ); + } + } var writeableShardsWithOneShardDone = clusterService.operationRouting() .allWritableShards(clusterService.state().projectState(projectId), indexName); - assertEquals(0, writeableShardsWithOneShardDone.next().shardId().id()); + for (int i = 0; i < shardCount; i++) { + assertEquals(i, writeableShardsWithOneShardDone.next().shardId().id()); + } assertEquals(shardChangingSplitTargetState, writeableShardsWithOneShardDone.next().shardId().id()); assertFalse(writeableShardsWithOneShardDone.hasNext()); diff --git a/server/src/test/java/org/elasticsearch/cluster/structure/RoutingIteratorTests.java b/server/src/test/java/org/elasticsearch/cluster/structure/RoutingIteratorTests.java index 87230f9f8a9b0..07ebac7bc3a40 100644 --- a/server/src/test/java/org/elasticsearch/cluster/structure/RoutingIteratorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/structure/RoutingIteratorTests.java @@ -25,6 +25,7 @@ import org.elasticsearch.cluster.routing.OperationRouting; import org.elasticsearch.cluster.routing.RotationShardShuffler; import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.SearchShardRouting; import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardShuffler; @@ -385,34 +386,34 @@ public void testShardsAndPreferNodeRouting() { new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) ); - List shardIterators = operationRouting.searchShards(project, new String[] { "test" }, null, "_shards:0"); - assertThat(shardIterators.size(), equalTo(1)); - assertThat(shardIterators.iterator().next().shardId().id(), equalTo(0)); + List searchShards = operationRouting.searchShards(project, new String[] { "test" }, null, "_shards:0"); + assertThat(searchShards.size(), equalTo(1)); + assertThat(searchShards.iterator().next().iterator().shardId().id(), equalTo(0)); - shardIterators = operationRouting.searchShards(project, new String[] { "test" }, null, "_shards:1"); - assertThat(shardIterators.size(), equalTo(1)); - assertThat(shardIterators.iterator().next().shardId().id(), equalTo(1)); + searchShards = operationRouting.searchShards(project, new String[] { "test" }, null, "_shards:1"); + assertThat(searchShards.size(), equalTo(1)); + assertThat(searchShards.iterator().next().iterator().shardId().id(), equalTo(1)); // check node preference, first without preference to see they switch - shardIterators = operationRouting.searchShards(project, new String[] { "test" }, null, "_shards:0|"); - assertThat(shardIterators.size(), equalTo(1)); - assertThat(shardIterators.iterator().next().shardId().id(), equalTo(0)); - String firstRoundNodeId = shardIterators.iterator().next().nextOrNull().currentNodeId(); - - shardIterators = operationRouting.searchShards(project, new String[] { "test" }, null, "_shards:0"); - assertThat(shardIterators.size(), equalTo(1)); - assertThat(shardIterators.iterator().next().shardId().id(), equalTo(0)); - assertThat(shardIterators.iterator().next().nextOrNull().currentNodeId(), not(equalTo(firstRoundNodeId))); - - shardIterators = operationRouting.searchShards(project, new String[] { "test" }, null, "_shards:0|_prefer_nodes:node1"); - assertThat(shardIterators.size(), equalTo(1)); - assertThat(shardIterators.iterator().next().shardId().id(), equalTo(0)); - assertThat(shardIterators.iterator().next().nextOrNull().currentNodeId(), equalTo("node1")); - - shardIterators = operationRouting.searchShards(project, new String[] { "test" }, null, "_shards:0|_prefer_nodes:node1,node2"); - assertThat(shardIterators.size(), equalTo(1)); - Iterator iterator = shardIterators.iterator(); - final ShardIterator it = iterator.next(); + searchShards = operationRouting.searchShards(project, new String[] { "test" }, null, "_shards:0|"); + assertThat(searchShards.size(), equalTo(1)); + assertThat(searchShards.iterator().next().iterator().shardId().id(), equalTo(0)); + String firstRoundNodeId = searchShards.iterator().next().iterator().nextOrNull().currentNodeId(); + + searchShards = operationRouting.searchShards(project, new String[] { "test" }, null, "_shards:0"); + assertThat(searchShards.size(), equalTo(1)); + assertThat(searchShards.iterator().next().iterator().shardId().id(), equalTo(0)); + assertThat(searchShards.iterator().next().iterator().nextOrNull().currentNodeId(), not(equalTo(firstRoundNodeId))); + + searchShards = operationRouting.searchShards(project, new String[] { "test" }, null, "_shards:0|_prefer_nodes:node1"); + assertThat(searchShards.size(), equalTo(1)); + assertThat(searchShards.iterator().next().iterator().shardId().id(), equalTo(0)); + assertThat(searchShards.iterator().next().iterator().nextOrNull().currentNodeId(), equalTo("node1")); + + searchShards = operationRouting.searchShards(project, new String[] { "test" }, null, "_shards:0|_prefer_nodes:node1,node2"); + assertThat(searchShards.size(), equalTo(1)); + Iterator iterator = searchShards.iterator(); + final ShardIterator it = iterator.next().iterator(); assertThat(it.shardId().id(), equalTo(0)); final String firstNodeId = it.nextOrNull().currentNodeId(); assertThat(firstNodeId, anyOf(equalTo("node1"), equalTo("node2"))); diff --git a/server/src/test/java/org/elasticsearch/search/SearchServiceSingleNodeTests.java b/server/src/test/java/org/elasticsearch/search/SearchServiceSingleNodeTests.java index 0cd60823a22cc..fb964ed1c0387 100644 --- a/server/src/test/java/org/elasticsearch/search/SearchServiceSingleNodeTests.java +++ b/server/src/test/java/org/elasticsearch/search/SearchServiceSingleNodeTests.java @@ -46,6 +46,7 @@ import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.SplitShardCountSummary; import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.UUIDs; @@ -2459,7 +2460,8 @@ public void testWaitOnRefresh() throws ExecutionException, InterruptedException -1, null, null, - null + null, + SplitShardCountSummary.UNSET ); PlainActionFuture future = new PlainActionFuture<>(); service.executeQueryPhase(request, task, future.delegateFailure((l, r) -> { @@ -2495,7 +2497,8 @@ public void testWaitOnRefreshFailsWithRefreshesDisabled() { -1, null, null, - null + null, + SplitShardCountSummary.UNSET ); service.executeQueryPhase(request, task, future); IllegalArgumentException illegalArgumentException = expectThrows(IllegalArgumentException.class, future::actionGet); @@ -2533,7 +2536,8 @@ public void testWaitOnRefreshFailsIfCheckpointNotIndexed() { -1, null, null, - null + null, + SplitShardCountSummary.UNSET ); service.executeQueryPhase(request, task, future); @@ -2570,7 +2574,8 @@ public void testWaitOnRefreshTimeout() { -1, null, null, - null + null, + SplitShardCountSummary.UNSET ); service.executeQueryPhase(request, task, future); diff --git a/server/src/test/java/org/elasticsearch/search/internal/ShardSearchRequestTests.java b/server/src/test/java/org/elasticsearch/search/internal/ShardSearchRequestTests.java index 302ce085ea317..e2207a2d388a3 100644 --- a/server/src/test/java/org/elasticsearch/search/internal/ShardSearchRequestTests.java +++ b/server/src/test/java/org/elasticsearch/search/internal/ShardSearchRequestTests.java @@ -15,6 +15,7 @@ import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.cluster.metadata.AliasMetadata; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.routing.SplitShardCountSummary; import org.elasticsearch.common.CheckedBiConsumer; import org.elasticsearch.common.Strings; import org.elasticsearch.common.UUIDs; @@ -82,10 +83,10 @@ public void testClone() throws Exception { } private ShardSearchRequest createShardSearchRequest() throws IOException { - return createShardSearchReqest(createSearchRequest()); + return createShardSearchRequest(createSearchRequest()); } - private ShardSearchRequest createShardSearchReqest(SearchRequest searchRequest) { + private ShardSearchRequest createShardSearchRequest(SearchRequest searchRequest) { ShardId shardId = new ShardId(randomAlphaOfLengthBetween(2, 10), randomAlphaOfLengthBetween(2, 10), randomInt()); final AliasFilter filteringAliases; if (randomBoolean()) { @@ -114,7 +115,8 @@ private ShardSearchRequest createShardSearchReqest(SearchRequest searchRequest) Math.abs(randomLong()), randomAlphaOfLengthBetween(3, 10), shardSearchContextId, - keepAlive + keepAlive, + SplitShardCountSummary.fromInt(randomIntBetween(0, numberOfShards)) ); req.canReturnNullResponseIfMatchNoDocs(randomBoolean()); if (randomBoolean()) { @@ -270,7 +272,7 @@ public void testForceSyntheticUnsupported() throws IOException { } } request.setForceSyntheticSource(true); - ShardSearchRequest shardRequest = createShardSearchReqest(request); + ShardSearchRequest shardRequest = createShardSearchRequest(request); StreamOutput out = new BytesStreamOutput(); out.setTransportVersion(TransportVersions.V_8_3_0); Exception e = expectThrows(IllegalArgumentException.class, () -> shardRequest.writeTo(out)); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/termsenum/action/TransportTermsEnumAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/termsenum/action/TransportTermsEnumAction.java index 51970f1d6eddb..4c38e78ae02e9 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/termsenum/action/TransportTermsEnumAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/termsenum/action/TransportTermsEnumAction.java @@ -26,7 +26,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.project.ProjectResolver; -import org.elasticsearch.cluster.routing.ShardIterator; +import org.elasticsearch.cluster.routing.SearchShardRouting; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.bytes.BytesReference; @@ -208,11 +208,11 @@ protected Map> getNodeBundles(ProjectState project, String[ String[] singleIndex = { indexName }; - List shards = clusterService.operationRouting().searchShards(project, singleIndex, null, null); + List shards = clusterService.operationRouting().searchShards(project, singleIndex, null, null); - for (ShardIterator copiesOfShard : shards) { + for (SearchShardRouting copiesOfShard : shards) { ShardRouting selectedCopyOfShard = null; - for (ShardRouting copy : copiesOfShard) { + for (ShardRouting copy : copiesOfShard.iterator()) { // Pick the first active node with a copy of the shard if (copy.active() && copy.assignedToNode()) { selectedCopyOfShard = copy; diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichShardMultiSearchAction.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichShardMultiSearchAction.java index c273c39d216fc..bda1f3f1fcf2e 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichShardMultiSearchAction.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichShardMultiSearchAction.java @@ -219,7 +219,8 @@ protected ShardsIterator shards(ProjectState project, InternalRequest request) { } return clusterService.operationRouting() .searchShards(project, new String[] { index }, null, Preference.LOCAL.type()) - .getFirst(); + .getFirst() + .iterator(); } @Override diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java index c834b538c5e86..5079c4f9f0570 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java @@ -17,6 +17,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.project.ProjectResolver; +import org.elasticsearch.cluster.routing.SearchShardRouting; import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; @@ -231,13 +232,13 @@ protected static QueryList termQueryList( public final void lookupAsync(R request, CancellableTask parentTask, ActionListener> outListener) { ClusterState clusterState = clusterService.state(); var projectState = projectResolver.getProjectState(clusterState); - List shardIterators = clusterService.operationRouting() + List shards = clusterService.operationRouting() .searchShards(projectState, new String[] { request.index }, Map.of(), "_local"); - if (shardIterators.size() != 1) { + if (shards.size() != 1) { outListener.onFailure(new EsqlIllegalArgumentException("target index {} has more than one shard", request.index)); return; } - ShardIterator shardIt = shardIterators.get(0); + ShardIterator shardIt = shards.get(0).iterator(); ShardRouting shardRouting = shardIt.nextOrNull(); ShardId shardId = shardIt.shardId(); if (shardRouting == null) {