Skip to content

Commit a1a5f14

Browse files
committed
Add reshardSplitShardCountSummary field to ShardSearchRequest and friends
1 parent b7dbb26 commit a1a5f14

File tree

15 files changed

+253
-105
lines changed

15 files changed

+253
-105
lines changed

server/src/main/java/org/elasticsearch/action/admin/cluster/shards/TransportClusterSearchShardsAction.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.elasticsearch.cluster.metadata.ProjectMetadata;
2323
import org.elasticsearch.cluster.node.DiscoveryNode;
2424
import org.elasticsearch.cluster.project.ProjectResolver;
25+
import org.elasticsearch.cluster.routing.SearchShardRouting;
2526
import org.elasticsearch.cluster.routing.ShardIterator;
2627
import org.elasticsearch.cluster.routing.ShardRouting;
2728
import org.elasticsearch.cluster.service.ClusterService;
@@ -109,17 +110,18 @@ protected void masterOperation(
109110
}
110111

111112
Set<String> nodeIds = new HashSet<>();
112-
List<ShardIterator> groupShardsIterator = clusterService.operationRouting()
113+
List<SearchShardRouting> groupShardRouting = clusterService.operationRouting()
113114
.searchShards(project, concreteIndices, routingMap, request.preference());
114115
ShardRouting shard;
115-
ClusterSearchShardsGroup[] groupResponses = new ClusterSearchShardsGroup[groupShardsIterator.size()];
116+
ClusterSearchShardsGroup[] groupResponses = new ClusterSearchShardsGroup[groupShardRouting.size()];
116117
int currentGroup = 0;
117-
for (ShardIterator shardIt : groupShardsIterator) {
118-
ShardId shardId = shardIt.shardId();
119-
ShardRouting[] shardRoutings = new ShardRouting[shardIt.size()];
118+
for (SearchShardRouting shardRouting : groupShardRouting) {
119+
ShardIterator shardIterator = shardRouting.iterator();
120+
ShardId shardId = shardIterator.shardId();
121+
ShardRouting[] shardRoutings = new ShardRouting[shardIterator.size()];
120122
int currentShard = 0;
121-
shardIt.reset();
122-
while ((shard = shardIt.nextOrNull()) != null) {
123+
shardIterator.reset();
124+
while ((shard = shardIterator.nextOrNull()) != null) {
123125
shardRoutings[currentShard++] = shard;
124126
nodeIds.add(shard.currentNodeId());
125127
}

server/src/main/java/org/elasticsearch/action/admin/indices/diskusage/TransportAnalyzeIndexDiskUsageAction.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
2424
import org.elasticsearch.cluster.node.DiscoveryNode;
2525
import org.elasticsearch.cluster.project.ProjectResolver;
26+
import org.elasticsearch.cluster.routing.SearchShardRouting;
2627
import org.elasticsearch.cluster.routing.ShardIterator;
2728
import org.elasticsearch.cluster.routing.ShardRouting;
2829
import org.elasticsearch.cluster.service.ClusterService;
@@ -220,15 +221,17 @@ protected AnalyzeIndexDiskUsageResponse newResponse(
220221
@Override
221222
protected List<ShardIterator> shards(ClusterState clusterState, AnalyzeIndexDiskUsageRequest request, String[] concreteIndices) {
222223
ProjectState project = projectResolver.getProjectState(clusterState);
223-
final List<ShardIterator> groups = clusterService.operationRouting().searchShards(project, concreteIndices, null, null);
224+
final List<SearchShardRouting> groups = clusterService.operationRouting().searchShards(project, concreteIndices, null, null);
224225

225-
for (ShardIterator group : groups) {
226+
var shardIterators = new ArrayList<ShardIterator>(groups.size());
227+
for (SearchShardRouting group : groups) {
226228
// fails fast if any non-active groups
227-
if (group.size() == 0) {
228-
throw new NoShardAvailableActionException(group.shardId());
229+
if (group.iterator().size() == 0) {
230+
throw new NoShardAvailableActionException(group.iterator().shardId());
229231
}
232+
shardIterators.add(group.iterator());
230233
}
231-
return groups;
234+
return shardIterators;
232235
}
233236

234237
@Override

server/src/main/java/org/elasticsearch/action/admin/indices/validate/query/TransportValidateQueryAction.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
2525
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.ResolvedExpression;
2626
import org.elasticsearch.cluster.project.ProjectResolver;
27+
import org.elasticsearch.cluster.routing.SearchShardRouting;
2728
import org.elasticsearch.cluster.routing.ShardIterator;
2829
import org.elasticsearch.cluster.routing.ShardRouting;
2930
import org.elasticsearch.cluster.service.ClusterService;
@@ -180,7 +181,11 @@ protected List<ShardIterator> shards(ClusterState clusterState, ValidateQueryReq
180181
routing,
181182
request.indices()
182183
);
183-
return clusterService.operationRouting().searchShards(project, concreteIndices, routingMap, "_local");
184+
return clusterService.operationRouting()
185+
.searchShards(project, concreteIndices, routingMap, "_local")
186+
.stream()
187+
.map(SearchShardRouting::iterator)
188+
.toList();
184189
}
185190

186191
@Override

server/src/main/java/org/elasticsearch/action/fieldcaps/RequestDispatcher.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.elasticsearch.cluster.ProjectState;
2121
import org.elasticsearch.cluster.node.DiscoveryNode;
2222
import org.elasticsearch.cluster.project.ProjectResolver;
23+
import org.elasticsearch.cluster.routing.SearchShardRouting;
2324
import org.elasticsearch.cluster.routing.ShardIterator;
2425
import org.elasticsearch.cluster.routing.ShardRouting;
2526
import org.elasticsearch.cluster.service.ClusterService;
@@ -105,16 +106,16 @@ final class RequestDispatcher {
105106
ProjectState project = projectResolver.getProjectState(clusterState);
106107

107108
for (String index : indices) {
108-
final List<ShardIterator> shardIts;
109+
final List<SearchShardRouting> searchShards;
109110
try {
110-
shardIts = clusterService.operationRouting().searchShards(project, new String[] { index }, null, null);
111+
searchShards = clusterService.operationRouting().searchShards(project, new String[] { index }, null, null);
111112
} catch (Exception e) {
112113
onIndexFailure.accept(index, e);
113114
continue;
114115
}
115116
final IndexSelector indexResult = new IndexSelector(
116117
fieldCapsRequest.clusterAlias(),
117-
shardIts,
118+
searchShards,
118119
fieldCapsRequest.indexFilter(),
119120
nowInMillis,
120121
coordinatorRewriteContextProvider
@@ -270,14 +271,15 @@ private static class IndexSelector {
270271

271272
IndexSelector(
272273
String clusterAlias,
273-
List<ShardIterator> shardIts,
274+
List<SearchShardRouting> searchShards,
274275
QueryBuilder indexFilter,
275276
long nowInMillis,
276277
CoordinatorRewriteContextProvider coordinatorRewriteContextProvider
277278
) {
278-
for (ShardIterator shardIt : shardIts) {
279+
for (SearchShardRouting routing : searchShards) {
279280
boolean canMatch = true;
280-
final ShardId shardId = shardIt.shardId();
281+
ShardIterator iterator = routing.iterator();
282+
final ShardId shardId = iterator.shardId();
281283
if (indexFilter != null && indexFilter instanceof MatchAllQueryBuilder == false) {
282284
var coordinatorRewriteContext = coordinatorRewriteContextProvider.getCoordinatorRewriteContext(shardId.getIndex());
283285
if (coordinatorRewriteContext != null) {
@@ -291,7 +293,7 @@ private static class IndexSelector {
291293
}
292294
}
293295
if (canMatch) {
294-
for (ShardRouting shard : shardIt) {
296+
for (ShardRouting shard : iterator) {
295297
nodeToShards.computeIfAbsent(shard.currentNodeId(), node -> new ArrayList<>()).add(shard);
296298
}
297299
} else {

server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -710,7 +710,8 @@ protected final ShardSearchRequest buildShardSearchRequest(SearchShardIterator s
710710
timeProvider.absoluteStartMillis(),
711711
shardIt.getClusterAlias(),
712712
shardIt.getSearchContextId(),
713-
shardIt.getSearchContextKeepAlive()
713+
shardIt.getSearchContextKeepAlive(),
714+
shardIt.getReshardSplitShardCountSummary()
714715
);
715716
// if we already received a search result we can inform the shard that it
716717
// can return a null response if the request rewrites to match none rather

server/src/main/java/org/elasticsearch/action/search/CanMatchNodeRequest.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ public static class Shard implements Writeable {
6767
private final ShardSearchContextId readerId;
6868
private final TimeValue keepAlive;
6969
private final long waitForCheckpoint;
70+
private final int reshardSplitShardCountSummary;
7071

7172
public Shard(
7273
String[] indices,
@@ -76,7 +77,8 @@ public Shard(
7677
float indexBoost,
7778
ShardSearchContextId readerId,
7879
TimeValue keepAlive,
79-
long waitForCheckpoint
80+
long waitForCheckpoint,
81+
int reshardSplitShardCountSummary
8082
) {
8183
this.indices = indices;
8284
this.shardId = shardId;
@@ -87,6 +89,7 @@ public Shard(
8789
this.keepAlive = keepAlive;
8890
this.waitForCheckpoint = waitForCheckpoint;
8991
assert keepAlive == null || readerId != null : "readerId: " + readerId + " keepAlive: " + keepAlive;
92+
this.reshardSplitShardCountSummary = reshardSplitShardCountSummary;
9093
}
9194

9295
public Shard(StreamInput in) throws IOException {
@@ -99,6 +102,11 @@ public Shard(StreamInput in) throws IOException {
99102
keepAlive = in.readOptionalTimeValue();
100103
waitForCheckpoint = in.readLong();
101104
assert keepAlive == null || readerId != null : "readerId: " + readerId + " keepAlive: " + keepAlive;
105+
if (in.getTransportVersion().supports(ShardSearchRequest.SHARD_SEARCH_REQUEST_RESHARD_SHARD_COUNT_SUMMARY)) {
106+
reshardSplitShardCountSummary = in.readVInt();
107+
} else {
108+
reshardSplitShardCountSummary = 0;
109+
}
102110
}
103111

104112
@Override
@@ -111,6 +119,9 @@ public void writeTo(StreamOutput out) throws IOException {
111119
out.writeOptionalWriteable(readerId);
112120
out.writeOptionalTimeValue(keepAlive);
113121
out.writeLong(waitForCheckpoint);
122+
if (out.getTransportVersion().supports(ShardSearchRequest.SHARD_SEARCH_REQUEST_RESHARD_SHARD_COUNT_SUMMARY)) {
123+
out.writeVInt(reshardSplitShardCountSummary);
124+
}
114125
}
115126

116127
public int getShardRequestIndex() {
@@ -249,7 +260,8 @@ public ShardSearchRequest createShardSearchRequest(Shard r) {
249260
r.keepAlive,
250261
r.waitForCheckpoint,
251262
waitForCheckpointsTimeout,
252-
false
263+
false,
264+
r.reshardSplitShardCountSummary
253265
);
254266
shardSearchRequest.setParentTask(getParentTask());
255267
return shardSearchRequest;

server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -425,7 +425,8 @@ private CanMatchNodeRequest.Shard buildShardLevelRequest(SearchShardIterator sha
425425
indexBoost,
426426
shardIt.getSearchContextId(),
427427
shardIt.getSearchContextKeepAlive(),
428-
ShardSearchRequest.computeWaitForCheckpoint(request.getWaitForCheckpoints(), shardIt.shardId(), shardRequestIndex)
428+
ShardSearchRequest.computeWaitForCheckpoint(request.getWaitForCheckpoints(), shardIt.shardId(), shardRequestIndex),
429+
shardIt.getReshardSplitShardCountSummary()
429430
);
430431
}
431432

server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -352,17 +352,23 @@ public IndicesOptions indicesOptions() {
352352
}
353353
}
354354

355-
private record ShardToQuery(float boost, String[] originalIndices, int shardIndex, ShardId shardId, ShardSearchContextId contextId)
356-
implements
357-
Writeable {
355+
private record ShardToQuery(
356+
float boost,
357+
String[] originalIndices,
358+
int shardIndex,
359+
ShardId shardId,
360+
ShardSearchContextId contextId,
361+
int reshardSplitShardCountSummary
362+
) implements Writeable {
358363

359364
static ShardToQuery readFrom(StreamInput in) throws IOException {
360365
return new ShardToQuery(
361366
in.readFloat(),
362367
in.readStringArray(),
363368
in.readVInt(),
364369
new ShardId(in),
365-
in.readOptionalWriteable(ShardSearchContextId::new)
370+
in.readOptionalWriteable(ShardSearchContextId::new),
371+
in.getTransportVersion().supports(ShardSearchRequest.SHARD_SEARCH_REQUEST_RESHARD_SHARD_COUNT_SUMMARY) ? in.readVInt() : 0
366372
);
367373
}
368374

@@ -373,6 +379,9 @@ public void writeTo(StreamOutput out) throws IOException {
373379
out.writeVInt(shardIndex);
374380
shardId.writeTo(out);
375381
out.writeOptionalWriteable(contextId);
382+
if (out.getTransportVersion().supports(ShardSearchRequest.SHARD_SEARCH_REQUEST_RESHARD_SHARD_COUNT_SUMMARY)) {
383+
out.writeVInt(reshardSplitShardCountSummary);
384+
}
376385
}
377386
}
378387

@@ -452,7 +461,8 @@ protected void doRun(Map<SearchShardIterator, Integer> shardIndexMap) {
452461
getOriginalIndices(shardIndex).indices(),
453462
shardIndex,
454463
routing.getShardId(),
455-
shardRoutings.getSearchContextId()
464+
shardRoutings.getSearchContextId(),
465+
shardRoutings.getReshardSplitShardCountSummary()
456466
)
457467
);
458468
var filterForAlias = aliasFilter.getOrDefault(indexUUID, AliasFilter.EMPTY);
@@ -643,7 +653,8 @@ private static ShardSearchRequest buildShardSearchRequest(
643653
SearchRequest searchRequest,
644654
int totalShardCount,
645655
long absoluteStartMillis,
646-
boolean hasResponse
656+
boolean hasResponse,
657+
int reshardSplitShardCountSummary
647658
) {
648659
ShardSearchRequest shardRequest = new ShardSearchRequest(
649660
originalIndices,
@@ -656,7 +667,8 @@ private static ShardSearchRequest buildShardSearchRequest(
656667
absoluteStartMillis,
657668
clusterAlias,
658669
searchContextId,
659-
searchContextKeepAlive
670+
searchContextKeepAlive,
671+
reshardSplitShardCountSummary
660672
);
661673
// if we already received a search result we can inform the shard that it
662674
// can return a null response if the request rewrites to match none rather
@@ -694,7 +706,8 @@ private static void executeShardTasks(QueryPerNodeState state) {
694706
searchRequest,
695707
nodeQueryRequest.totalShards,
696708
nodeQueryRequest.absoluteStartMillis,
697-
state.hasResponse.getAcquire()
709+
state.hasResponse.getAcquire(),
710+
shardToQuery.reshardSplitShardCountSummary
698711
)
699712
),
700713
state.task,

server/src/main/java/org/elasticsearch/action/search/SearchShardIterator.java

Lines changed: 39 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -38,31 +38,50 @@ public final class SearchShardIterator implements Comparable<SearchShardIterator
3838
private final ShardSearchContextId searchContextId;
3939
private final TimeValue searchContextKeepAlive;
4040
private final PlainIterator<String> targetNodesIterator;
41+
/** See {@link org.elasticsearch.search.internal.ShardSearchRequest#reshardSplitShardCountSummary}. */
42+
private final int reshardSplitShardCountSummary;
4143

4244
/**
43-
* Creates a {@link SearchShardIterator} instance that iterates over a subset of the given shards
44-
* this the a given <code>shardId</code>.
45+
* Creates a {@link SearchShardIterator} instance that iterates over a set of replicas of a shard with provided <code>shardId</code>.
4546
*
4647
* @param clusterAlias the alias of the cluster where the shard is located
4748
* @param shardId shard id of the group
4849
* @param shards shards to iterate
4950
* @param originalIndices the indices that the search request originally related to (before any rewriting happened)
5051
*/
51-
public SearchShardIterator(@Nullable String clusterAlias, ShardId shardId, List<ShardRouting> shards, OriginalIndices originalIndices) {
52-
this(clusterAlias, shardId, shards.stream().map(ShardRouting::currentNodeId).toList(), originalIndices, null, null, false, false);
52+
public SearchShardIterator(
53+
@Nullable String clusterAlias,
54+
ShardId shardId,
55+
List<ShardRouting> shards,
56+
OriginalIndices originalIndices,
57+
int reshardSplitShardCountSummary
58+
) {
59+
this(
60+
clusterAlias,
61+
shardId,
62+
shards.stream().map(ShardRouting::currentNodeId).toList(),
63+
originalIndices,
64+
null,
65+
null,
66+
false,
67+
false,
68+
reshardSplitShardCountSummary
69+
);
5370
}
5471

5572
/**
56-
* Creates a {@link SearchShardIterator} instance that iterates over a subset of the given shards
73+
* Creates a {@link SearchShardIterator} instance that iterates over a set of nodes that are known to contain replicas of a shard
74+
* with provided <code>shardId</code>.
5775
*
58-
* @param clusterAlias the alias of the cluster where the shard is located
59-
* @param shardId shard id of the group
60-
* @param targetNodeIds the list of nodes hosting shard copies
61-
* @param originalIndices the indices that the search request originally related to (before any rewriting happened)
62-
* @param searchContextId the point-in-time specified for this group if exists
63-
* @param searchContextKeepAlive the time interval that data nodes should extend the keep alive of the point-in-time
64-
* @param prefiltered if true, then this group already executed the can_match phase
65-
* @param skip if true, then this group won't have matches, and it can be safely skipped from the search
76+
* @param clusterAlias the alias of the cluster where the shard is located
77+
* @param shardId shard id of the group
78+
* @param targetNodeIds the list of nodes hosting shard copies
79+
* @param originalIndices the indices that the search request originally related to (before any rewriting happened)
80+
* @param searchContextId the point-in-time specified for this group if exists
81+
* @param searchContextKeepAlive the time interval that data nodes should extend the keep alive of the point-in-time
82+
* @param prefiltered if true, then this group already executed the can_match phase
83+
* @param skip if true, then this group won't have matches, and it can be safely skipped from the search
84+
* @param reshardSplitShardCountSummary see {@link org.elasticsearch.search.internal.ShardSearchRequest#reshardSplitShardCountSummary}
6685
*/
6786
public SearchShardIterator(
6887
@Nullable String clusterAlias,
@@ -72,7 +91,8 @@ public SearchShardIterator(
7291
ShardSearchContextId searchContextId,
7392
TimeValue searchContextKeepAlive,
7493
boolean prefiltered,
75-
boolean skip
94+
boolean skip,
95+
int reshardSplitShardCountSummary
7696
) {
7797
this.shardId = shardId;
7898
this.targetNodesIterator = new PlainIterator<>(targetNodeIds);
@@ -84,6 +104,7 @@ public SearchShardIterator(
84104
this.prefiltered = prefiltered;
85105
this.skip = skip;
86106
assert skip == false || prefiltered : "only prefiltered shards are skip-able";
107+
this.reshardSplitShardCountSummary = reshardSplitShardCountSummary;
87108
}
88109

89110
/**
@@ -171,6 +192,10 @@ ShardId shardId() {
171192
return shardId;
172193
}
173194

195+
public int getReshardSplitShardCountSummary() {
196+
return reshardSplitShardCountSummary;
197+
}
198+
174199
@Override
175200
public boolean equals(Object o) {
176201
if (this == o) return true;

0 commit comments

Comments
 (0)