Skip to content

Commit 70673eb

Browse files
lktsbcully
andauthored
Add reshardSplitShardCountSummary field to ShardSearchRequest and friends (#135804)
Co-authored-by: Brendan Cully <[email protected]>
1 parent 5a18127 commit 70673eb

29 files changed

+548
-169
lines changed

server/src/main/java/org/elasticsearch/action/admin/cluster/shards/TransportClusterSearchShardsAction.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import org.elasticsearch.cluster.metadata.ProjectMetadata;
2323
import org.elasticsearch.cluster.node.DiscoveryNode;
2424
import org.elasticsearch.cluster.project.ProjectResolver;
25-
import org.elasticsearch.cluster.routing.ShardIterator;
25+
import org.elasticsearch.cluster.routing.SearchShardRouting;
2626
import org.elasticsearch.cluster.routing.ShardRouting;
2727
import org.elasticsearch.cluster.service.ClusterService;
2828
import org.elasticsearch.index.shard.ShardId;
@@ -109,12 +109,12 @@ protected void masterOperation(
109109
}
110110

111111
Set<String> nodeIds = new HashSet<>();
112-
List<ShardIterator> groupShardsIterator = clusterService.operationRouting()
112+
List<SearchShardRouting> groupShardsIterator = clusterService.operationRouting()
113113
.searchShards(project, concreteIndices, routingMap, request.preference());
114114
ShardRouting shard;
115115
ClusterSearchShardsGroup[] groupResponses = new ClusterSearchShardsGroup[groupShardsIterator.size()];
116116
int currentGroup = 0;
117-
for (ShardIterator shardIt : groupShardsIterator) {
117+
for (SearchShardRouting shardIt : groupShardsIterator) {
118118
ShardId shardId = shardIt.shardId();
119119
ShardRouting[] shardRoutings = new ShardRouting[shardIt.size()];
120120
int currentShard = 0;

server/src/main/java/org/elasticsearch/action/admin/indices/diskusage/TransportAnalyzeIndexDiskUsageAction.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
2424
import org.elasticsearch.cluster.node.DiscoveryNode;
2525
import org.elasticsearch.cluster.project.ProjectResolver;
26+
import org.elasticsearch.cluster.routing.SearchShardRouting;
2627
import org.elasticsearch.cluster.routing.ShardIterator;
2728
import org.elasticsearch.cluster.routing.ShardRouting;
2829
import org.elasticsearch.cluster.service.ClusterService;
@@ -218,9 +219,13 @@ protected AnalyzeIndexDiskUsageResponse newResponse(
218219
}
219220

220221
@Override
221-
protected List<ShardIterator> shards(ClusterState clusterState, AnalyzeIndexDiskUsageRequest request, String[] concreteIndices) {
222+
protected List<? extends ShardIterator> shards(
223+
ClusterState clusterState,
224+
AnalyzeIndexDiskUsageRequest request,
225+
String[] concreteIndices
226+
) {
222227
ProjectState project = projectResolver.getProjectState(clusterState);
223-
final List<ShardIterator> groups = clusterService.operationRouting().searchShards(project, concreteIndices, null, null);
228+
final List<SearchShardRouting> groups = clusterService.operationRouting().searchShards(project, concreteIndices, null, null);
224229

225230
for (ShardIterator group : groups) {
226231
// fails fast if any non-active groups

server/src/main/java/org/elasticsearch/action/admin/indices/validate/query/TransportValidateQueryAction.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
2525
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.ResolvedExpression;
2626
import org.elasticsearch.cluster.project.ProjectResolver;
27-
import org.elasticsearch.cluster.routing.ShardIterator;
27+
import org.elasticsearch.cluster.routing.SearchShardRouting;
2828
import org.elasticsearch.cluster.routing.ShardRouting;
2929
import org.elasticsearch.cluster.service.ClusterService;
3030
import org.elasticsearch.common.ParsingException;
@@ -166,7 +166,7 @@ protected ShardValidateQueryResponse readShardResponse(StreamInput in) throws IO
166166
}
167167

168168
@Override
169-
protected List<ShardIterator> shards(ClusterState clusterState, ValidateQueryRequest request, String[] concreteIndices) {
169+
protected List<SearchShardRouting> shards(ClusterState clusterState, ValidateQueryRequest request, String[] concreteIndices) {
170170
final String routing;
171171
if (request.allShards()) {
172172
routing = null;
@@ -181,6 +181,7 @@ protected List<ShardIterator> shards(ClusterState clusterState, ValidateQueryReq
181181
request.indices()
182182
);
183183
return clusterService.operationRouting().searchShards(project, concreteIndices, routingMap, "_local");
184+
184185
}
185186

186187
@Override

server/src/main/java/org/elasticsearch/action/fieldcaps/RequestDispatcher.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.elasticsearch.cluster.ProjectState;
2121
import org.elasticsearch.cluster.node.DiscoveryNode;
2222
import org.elasticsearch.cluster.project.ProjectResolver;
23+
import org.elasticsearch.cluster.routing.SearchShardRouting;
2324
import org.elasticsearch.cluster.routing.ShardIterator;
2425
import org.elasticsearch.cluster.routing.ShardRouting;
2526
import org.elasticsearch.cluster.service.ClusterService;
@@ -105,7 +106,7 @@ public RequestDispatcher(
105106
ProjectState project = projectResolver.getProjectState(clusterState);
106107

107108
for (String index : indices) {
108-
final List<ShardIterator> shardIts;
109+
final List<SearchShardRouting> shardIts;
109110
try {
110111
shardIts = clusterService.operationRouting().searchShards(project, new String[] { index }, null, null);
111112
} catch (Exception e) {
@@ -270,7 +271,7 @@ private static class IndexSelector {
270271

271272
IndexSelector(
272273
String clusterAlias,
273-
List<ShardIterator> shardIts,
274+
List<SearchShardRouting> shardIts,
274275
QueryBuilder indexFilter,
275276
long nowInMillis,
276277
CoordinatorRewriteContextProvider coordinatorRewriteContextProvider

server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -724,7 +724,8 @@ protected final ShardSearchRequest buildShardSearchRequest(SearchShardIterator s
724724
timeProvider.absoluteStartMillis(),
725725
shardIt.getClusterAlias(),
726726
shardIt.getSearchContextId(),
727-
shardIt.getSearchContextKeepAlive()
727+
shardIt.getSearchContextKeepAlive(),
728+
shardIt.getReshardSplitShardCountSummary()
728729
);
729730
// if we already received a search result we can inform the shard that it
730731
// can return a null response if the request rewrites to match none rather

server/src/main/java/org/elasticsearch/action/search/CanMatchNodeRequest.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.action.IndicesRequest;
1313
import org.elasticsearch.action.OriginalIndices;
1414
import org.elasticsearch.action.support.IndicesOptions;
15+
import org.elasticsearch.cluster.routing.SplitShardCountSummary;
1516
import org.elasticsearch.common.io.stream.StreamInput;
1617
import org.elasticsearch.common.io.stream.StreamOutput;
1718
import org.elasticsearch.common.io.stream.Writeable;
@@ -65,6 +66,7 @@ public static class Shard implements Writeable {
6566
private final ShardSearchContextId readerId;
6667
private final TimeValue keepAlive;
6768
private final long waitForCheckpoint;
69+
private final SplitShardCountSummary reshardSplitShardCountSummary;
6870

6971
public Shard(
7072
String[] indices,
@@ -74,7 +76,8 @@ public Shard(
7476
float indexBoost,
7577
ShardSearchContextId readerId,
7678
TimeValue keepAlive,
77-
long waitForCheckpoint
79+
long waitForCheckpoint,
80+
SplitShardCountSummary reshardSplitShardCountSummary
7881
) {
7982
this.indices = indices;
8083
this.shardId = shardId;
@@ -85,6 +88,7 @@ public Shard(
8588
this.keepAlive = keepAlive;
8689
this.waitForCheckpoint = waitForCheckpoint;
8790
assert keepAlive == null || readerId != null : "readerId: " + readerId + " keepAlive: " + keepAlive;
91+
this.reshardSplitShardCountSummary = reshardSplitShardCountSummary;
8892
}
8993

9094
public Shard(StreamInput in) throws IOException {
@@ -97,6 +101,11 @@ public Shard(StreamInput in) throws IOException {
97101
keepAlive = in.readOptionalTimeValue();
98102
waitForCheckpoint = in.readLong();
99103
assert keepAlive == null || readerId != null : "readerId: " + readerId + " keepAlive: " + keepAlive;
104+
if (in.getTransportVersion().supports(ShardSearchRequest.SHARD_SEARCH_REQUEST_RESHARD_SHARD_COUNT_SUMMARY)) {
105+
reshardSplitShardCountSummary = SplitShardCountSummary.fromInt(in.readVInt());
106+
} else {
107+
reshardSplitShardCountSummary = SplitShardCountSummary.UNSET;
108+
}
100109
}
101110

102111
@Override
@@ -109,6 +118,9 @@ public void writeTo(StreamOutput out) throws IOException {
109118
out.writeOptionalWriteable(readerId);
110119
out.writeOptionalTimeValue(keepAlive);
111120
out.writeLong(waitForCheckpoint);
121+
if (out.getTransportVersion().supports(ShardSearchRequest.SHARD_SEARCH_REQUEST_RESHARD_SHARD_COUNT_SUMMARY)) {
122+
out.writeVInt(reshardSplitShardCountSummary.asInt());
123+
}
112124
}
113125

114126
public int getShardRequestIndex() {
@@ -234,7 +246,8 @@ public ShardSearchRequest createShardSearchRequest(Shard r) {
234246
r.keepAlive,
235247
r.waitForCheckpoint,
236248
waitForCheckpointsTimeout,
237-
false
249+
false,
250+
r.reshardSplitShardCountSummary
238251
);
239252
shardSearchRequest.setParentTask(getParentTask());
240253
return shardSearchRequest;

server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -425,7 +425,8 @@ private CanMatchNodeRequest.Shard buildShardLevelRequest(SearchShardIterator sha
425425
indexBoost,
426426
shardIt.getSearchContextId(),
427427
shardIt.getSearchContextKeepAlive(),
428-
ShardSearchRequest.computeWaitForCheckpoint(request.getWaitForCheckpoints(), shardIt.shardId(), shardRequestIndex)
428+
ShardSearchRequest.computeWaitForCheckpoint(request.getWaitForCheckpoints(), shardIt.shardId(), shardRequestIndex),
429+
shardIt.getReshardSplitShardCountSummary()
429430
);
430431
}
431432

server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.action.support.IndicesOptions;
2424
import org.elasticsearch.client.internal.Client;
2525
import org.elasticsearch.cluster.ClusterState;
26+
import org.elasticsearch.cluster.routing.SplitShardCountSummary;
2627
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
2728
import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput;
2829
import org.elasticsearch.common.io.stream.StreamInput;
@@ -358,17 +359,25 @@ public IndicesOptions indicesOptions() {
358359
}
359360
}
360361

361-
private record ShardToQuery(float boost, String[] originalIndices, int shardIndex, ShardId shardId, ShardSearchContextId contextId)
362-
implements
363-
Writeable {
362+
private record ShardToQuery(
363+
float boost,
364+
String[] originalIndices,
365+
int shardIndex,
366+
ShardId shardId,
367+
ShardSearchContextId contextId,
368+
SplitShardCountSummary reshardSplitShardCountSummary
369+
) implements Writeable {
364370

365371
static ShardToQuery readFrom(StreamInput in) throws IOException {
366372
return new ShardToQuery(
367373
in.readFloat(),
368374
in.readStringArray(),
369375
in.readVInt(),
370376
new ShardId(in),
371-
in.readOptionalWriteable(ShardSearchContextId::new)
377+
in.readOptionalWriteable(ShardSearchContextId::new),
378+
in.getTransportVersion().supports(ShardSearchRequest.SHARD_SEARCH_REQUEST_RESHARD_SHARD_COUNT_SUMMARY)
379+
? SplitShardCountSummary.fromInt(in.readVInt())
380+
: SplitShardCountSummary.UNSET
372381
);
373382
}
374383

@@ -379,6 +388,9 @@ public void writeTo(StreamOutput out) throws IOException {
379388
out.writeVInt(shardIndex);
380389
shardId.writeTo(out);
381390
out.writeOptionalWriteable(contextId);
391+
if (out.getTransportVersion().supports(ShardSearchRequest.SHARD_SEARCH_REQUEST_RESHARD_SHARD_COUNT_SUMMARY)) {
392+
out.writeVInt(reshardSplitShardCountSummary.asInt());
393+
}
382394
}
383395
}
384396

@@ -458,7 +470,8 @@ protected void doRun(Map<SearchShardIterator, Integer> shardIndexMap) {
458470
getOriginalIndices(shardIndex).indices(),
459471
shardIndex,
460472
routing.getShardId(),
461-
shardRoutings.getSearchContextId()
473+
shardRoutings.getSearchContextId(),
474+
shardRoutings.getReshardSplitShardCountSummary()
462475
)
463476
);
464477
var filterForAlias = aliasFilter.getOrDefault(indexUUID, AliasFilter.EMPTY);
@@ -655,7 +668,8 @@ private static ShardSearchRequest buildShardSearchRequest(
655668
SearchRequest searchRequest,
656669
int totalShardCount,
657670
long absoluteStartMillis,
658-
boolean hasResponse
671+
boolean hasResponse,
672+
SplitShardCountSummary reshardSplitShardCountSummary
659673
) {
660674
ShardSearchRequest shardRequest = new ShardSearchRequest(
661675
originalIndices,
@@ -668,7 +682,8 @@ private static ShardSearchRequest buildShardSearchRequest(
668682
absoluteStartMillis,
669683
clusterAlias,
670684
searchContextId,
671-
searchContextKeepAlive
685+
searchContextKeepAlive,
686+
reshardSplitShardCountSummary
672687
);
673688
// if we already received a search result we can inform the shard that it
674689
// can return a null response if the request rewrites to match none rather
@@ -706,7 +721,8 @@ private static void executeShardTasks(QueryPerNodeState state) {
706721
searchRequest,
707722
nodeQueryRequest.totalShards,
708723
nodeQueryRequest.absoluteStartMillis,
709-
state.hasResponse.getAcquire()
724+
state.hasResponse.getAcquire(),
725+
shardToQuery.reshardSplitShardCountSummary
710726
)
711727
),
712728
state.task,

server/src/main/java/org/elasticsearch/action/search/SearchShardIterator.java

Lines changed: 42 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
import org.elasticsearch.action.OriginalIndices;
1313
import org.elasticsearch.cluster.routing.ShardRouting;
14+
import org.elasticsearch.cluster.routing.SplitShardCountSummary;
1415
import org.elasticsearch.common.util.PlainIterator;
1516
import org.elasticsearch.core.Nullable;
1617
import org.elasticsearch.core.TimeValue;
@@ -38,31 +39,52 @@ public final class SearchShardIterator implements Comparable<SearchShardIterator
3839
private final ShardSearchContextId searchContextId;
3940
private final TimeValue searchContextKeepAlive;
4041
private final PlainIterator<String> targetNodesIterator;
42+
/**
43+
* Additional metadata specific to the resharding feature. See {@link org.elasticsearch.cluster.routing.SplitShardCountSummary}.
44+
*/
45+
private final SplitShardCountSummary reshardSplitShardCountSummary;
4146

4247
/**
43-
* Creates a {@link SearchShardIterator} instance that iterates over a subset of the given shards
44-
* this the a given <code>shardId</code>.
48+
* Creates a {@link SearchShardIterator} instance that iterates over a set of replicas of a shard with provided <code>shardId</code>.
4549
*
4650
* @param clusterAlias the alias of the cluster where the shard is located
4751
* @param shardId shard id of the group
4852
* @param shards shards to iterate
4953
* @param originalIndices the indices that the search request originally related to (before any rewriting happened)
5054
*/
51-
public SearchShardIterator(@Nullable String clusterAlias, ShardId shardId, List<ShardRouting> shards, OriginalIndices originalIndices) {
52-
this(clusterAlias, shardId, shards.stream().map(ShardRouting::currentNodeId).toList(), originalIndices, null, null, false, false);
55+
public SearchShardIterator(
56+
@Nullable String clusterAlias,
57+
ShardId shardId,
58+
List<ShardRouting> shards,
59+
OriginalIndices originalIndices,
60+
SplitShardCountSummary reshardSplitShardCountSummary
61+
) {
62+
this(
63+
clusterAlias,
64+
shardId,
65+
shards.stream().map(ShardRouting::currentNodeId).toList(),
66+
originalIndices,
67+
null,
68+
null,
69+
false,
70+
false,
71+
reshardSplitShardCountSummary
72+
);
5373
}
5474

5575
/**
56-
* Creates a {@link SearchShardIterator} instance that iterates over a subset of the given shards
76+
* Creates a {@link SearchShardIterator} instance that iterates over a set of nodes that are known to contain replicas of a shard
77+
* with provided <code>shardId</code>.
5778
*
58-
* @param clusterAlias the alias of the cluster where the shard is located
59-
* @param shardId shard id of the group
60-
* @param targetNodeIds the list of nodes hosting shard copies
61-
* @param originalIndices the indices that the search request originally related to (before any rewriting happened)
62-
* @param searchContextId the point-in-time specified for this group if exists
63-
* @param searchContextKeepAlive the time interval that data nodes should extend the keep alive of the point-in-time
64-
* @param prefiltered if true, then this group already executed the can_match phase
65-
* @param skip if true, then this group won't have matches, and it can be safely skipped from the search
79+
* @param clusterAlias the alias of the cluster where the shard is located
80+
* @param shardId shard id of the group
81+
* @param targetNodeIds the list of nodes hosting shard copies
82+
* @param originalIndices the indices that the search request originally related to (before any rewriting happened)
83+
* @param searchContextId the point-in-time specified for this group if exists
84+
* @param searchContextKeepAlive the time interval that data nodes should extend the keep alive of the point-in-time
85+
* @param prefiltered if true, then this group already executed the can_match phase
86+
* @param skip if true, then this group won't have matches, and it can be safely skipped from the search
87+
* @param reshardSplitShardCountSummary see {@link org.elasticsearch.search.internal.ShardSearchRequest#reshardSplitShardCountSummary}
6688
*/
6789
public SearchShardIterator(
6890
@Nullable String clusterAlias,
@@ -72,7 +94,8 @@ public SearchShardIterator(
7294
ShardSearchContextId searchContextId,
7395
TimeValue searchContextKeepAlive,
7496
boolean prefiltered,
75-
boolean skip
97+
boolean skip,
98+
SplitShardCountSummary reshardSplitShardCountSummary
7699
) {
77100
this.shardId = shardId;
78101
this.targetNodesIterator = new PlainIterator<>(targetNodeIds);
@@ -84,6 +107,7 @@ public SearchShardIterator(
84107
this.prefiltered = prefiltered;
85108
this.skip = skip;
86109
assert skip == false || prefiltered : "only prefiltered shards are skip-able";
110+
this.reshardSplitShardCountSummary = reshardSplitShardCountSummary;
87111
}
88112

89113
/**
@@ -171,6 +195,10 @@ ShardId shardId() {
171195
return shardId;
172196
}
173197

198+
public SplitShardCountSummary getReshardSplitShardCountSummary() {
199+
return reshardSplitShardCountSummary;
200+
}
201+
174202
@Override
175203
public boolean equals(Object o) {
176204
if (this == o) return true;

0 commit comments

Comments
 (0)