Skip to content

Commit 20a7df7

Browse files
committed
consume new type
1 parent 61c51c5 commit 20a7df7

16 files changed

+88
-76
lines changed

server/src/main/java/org/elasticsearch/action/search/CanMatchNodeRequest.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.action.IndicesRequest;
1313
import org.elasticsearch.action.OriginalIndices;
1414
import org.elasticsearch.action.support.IndicesOptions;
15+
import org.elasticsearch.cluster.routing.SplitShardCountSummary;
1516
import org.elasticsearch.common.io.stream.StreamInput;
1617
import org.elasticsearch.common.io.stream.StreamOutput;
1718
import org.elasticsearch.common.io.stream.Writeable;
@@ -65,7 +66,7 @@ public static class Shard implements Writeable {
6566
private final ShardSearchContextId readerId;
6667
private final TimeValue keepAlive;
6768
private final long waitForCheckpoint;
68-
private final int reshardSplitShardCountSummary;
69+
private final SplitShardCountSummary reshardSplitShardCountSummary;
6970

7071
public Shard(
7172
String[] indices,
@@ -76,7 +77,7 @@ public Shard(
7677
ShardSearchContextId readerId,
7778
TimeValue keepAlive,
7879
long waitForCheckpoint,
79-
int reshardSplitShardCountSummary
80+
SplitShardCountSummary reshardSplitShardCountSummary
8081
) {
8182
this.indices = indices;
8283
this.shardId = shardId;
@@ -101,9 +102,9 @@ public Shard(StreamInput in) throws IOException {
101102
waitForCheckpoint = in.readLong();
102103
assert keepAlive == null || readerId != null : "readerId: " + readerId + " keepAlive: " + keepAlive;
103104
if (in.getTransportVersion().supports(ShardSearchRequest.SHARD_SEARCH_REQUEST_RESHARD_SHARD_COUNT_SUMMARY)) {
104-
reshardSplitShardCountSummary = in.readVInt();
105+
reshardSplitShardCountSummary = SplitShardCountSummary.fromInt(in.readVInt());
105106
} else {
106-
reshardSplitShardCountSummary = 0;
107+
reshardSplitShardCountSummary = SplitShardCountSummary.UNSET;
107108
}
108109
}
109110

@@ -118,7 +119,7 @@ public void writeTo(StreamOutput out) throws IOException {
118119
out.writeOptionalTimeValue(keepAlive);
119120
out.writeLong(waitForCheckpoint);
120121
if (out.getTransportVersion().supports(ShardSearchRequest.SHARD_SEARCH_REQUEST_RESHARD_SHARD_COUNT_SUMMARY)) {
121-
out.writeVInt(reshardSplitShardCountSummary);
122+
out.writeVInt(reshardSplitShardCountSummary.asInt());
122123
}
123124
}
124125

server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.action.support.IndicesOptions;
2424
import org.elasticsearch.client.internal.Client;
2525
import org.elasticsearch.cluster.ClusterState;
26+
import org.elasticsearch.cluster.routing.SplitShardCountSummary;
2627
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
2728
import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput;
2829
import org.elasticsearch.common.io.stream.StreamInput;
@@ -360,7 +361,7 @@ private record ShardToQuery(
360361
int shardIndex,
361362
ShardId shardId,
362363
ShardSearchContextId contextId,
363-
int reshardSplitShardCountSummary
364+
SplitShardCountSummary reshardSplitShardCountSummary
364365
) implements Writeable {
365366

366367
static ShardToQuery readFrom(StreamInput in) throws IOException {
@@ -370,7 +371,9 @@ static ShardToQuery readFrom(StreamInput in) throws IOException {
370371
in.readVInt(),
371372
new ShardId(in),
372373
in.readOptionalWriteable(ShardSearchContextId::new),
373-
in.getTransportVersion().supports(ShardSearchRequest.SHARD_SEARCH_REQUEST_RESHARD_SHARD_COUNT_SUMMARY) ? in.readVInt() : 0
374+
in.getTransportVersion().supports(ShardSearchRequest.SHARD_SEARCH_REQUEST_RESHARD_SHARD_COUNT_SUMMARY)
375+
? SplitShardCountSummary.fromInt(in.readVInt())
376+
: SplitShardCountSummary.UNSET
374377
);
375378
}
376379

@@ -382,7 +385,7 @@ public void writeTo(StreamOutput out) throws IOException {
382385
shardId.writeTo(out);
383386
out.writeOptionalWriteable(contextId);
384387
if (out.getTransportVersion().supports(ShardSearchRequest.SHARD_SEARCH_REQUEST_RESHARD_SHARD_COUNT_SUMMARY)) {
385-
out.writeVInt(reshardSplitShardCountSummary);
388+
out.writeVInt(reshardSplitShardCountSummary.asInt());
386389
}
387390
}
388391
}
@@ -662,7 +665,7 @@ private static ShardSearchRequest buildShardSearchRequest(
662665
int totalShardCount,
663666
long absoluteStartMillis,
664667
boolean hasResponse,
665-
int reshardSplitShardCountSummary
668+
SplitShardCountSummary reshardSplitShardCountSummary
666669
) {
667670
ShardSearchRequest shardRequest = new ShardSearchRequest(
668671
originalIndices,

server/src/main/java/org/elasticsearch/action/search/SearchShardIterator.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
import org.elasticsearch.action.OriginalIndices;
1313
import org.elasticsearch.cluster.routing.ShardRouting;
14+
import org.elasticsearch.cluster.routing.SplitShardCountSummary;
1415
import org.elasticsearch.common.util.PlainIterator;
1516
import org.elasticsearch.core.Nullable;
1617
import org.elasticsearch.core.TimeValue;
@@ -39,7 +40,7 @@ public final class SearchShardIterator implements Comparable<SearchShardIterator
3940
private final TimeValue searchContextKeepAlive;
4041
private final PlainIterator<String> targetNodesIterator;
4142
/** See {@link org.elasticsearch.search.internal.ShardSearchRequest#reshardSplitShardCountSummary}. */
42-
private final int reshardSplitShardCountSummary;
43+
private final SplitShardCountSummary reshardSplitShardCountSummary;
4344

4445
/**
4546
* Creates a {@link SearchShardIterator} instance that iterates over a set of replicas of a shard with provided <code>shardId</code>.
@@ -54,7 +55,7 @@ public SearchShardIterator(
5455
ShardId shardId,
5556
List<ShardRouting> shards,
5657
OriginalIndices originalIndices,
57-
int reshardSplitShardCountSummary
58+
SplitShardCountSummary reshardSplitShardCountSummary
5859
) {
5960
this(
6061
clusterAlias,
@@ -92,7 +93,7 @@ public SearchShardIterator(
9293
TimeValue searchContextKeepAlive,
9394
boolean prefiltered,
9495
boolean skip,
95-
int reshardSplitShardCountSummary
96+
SplitShardCountSummary reshardSplitShardCountSummary
9697
) {
9798
this.shardId = shardId;
9899
this.targetNodesIterator = new PlainIterator<>(targetNodeIds);
@@ -192,7 +193,7 @@ ShardId shardId() {
192193
return shardId;
193194
}
194195

195-
public int getReshardSplitShardCountSummary() {
196+
public SplitShardCountSummary getReshardSplitShardCountSummary() {
196197
return reshardSplitShardCountSummary;
197198
}
198199

server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@
4141
import org.elasticsearch.cluster.metadata.IndexMetadata;
4242
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
4343
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.ResolvedExpression;
44-
import org.elasticsearch.cluster.metadata.IndexReshardingMetadata;
4544
import org.elasticsearch.cluster.metadata.ProjectMetadata;
4645
import org.elasticsearch.cluster.node.DiscoveryNode;
4746
import org.elasticsearch.cluster.node.DiscoveryNodes;
@@ -50,6 +49,7 @@
5049
import org.elasticsearch.cluster.routing.SearchShardRouting;
5150
import org.elasticsearch.cluster.routing.ShardIterator;
5251
import org.elasticsearch.cluster.routing.ShardRouting;
52+
import org.elasticsearch.cluster.routing.SplitShardCountSummary;
5353
import org.elasticsearch.cluster.service.ClusterService;
5454
import org.elasticsearch.common.Strings;
5555
import org.elasticsearch.common.breaker.CircuitBreaker;
@@ -1252,7 +1252,7 @@ static List<SearchShardIterator> getRemoteShardsIterator(
12521252
searchShardsGroup.skipped(),
12531253
// This parameter is specific to the resharding feature.
12541254
// Resharding is currently not supported with CCS.
1255-
IndexReshardingMetadata.NOOP_RESHARD_SPLIT_SHARD_COUNT_SUMMARY
1255+
SplitShardCountSummary.UNSET
12561256
);
12571257
remoteShardIterators.add(shardIterator);
12581258
}
@@ -1308,7 +1308,7 @@ static List<SearchShardIterator> getRemoteShardsIteratorFromPointInTime(
13081308
false,
13091309
// This parameter is specific to the resharding feature.
13101310
// Resharding is currently not supported with CCS.
1311-
IndexReshardingMetadata.NOOP_RESHARD_SPLIT_SHARD_COUNT_SUMMARY
1311+
SplitShardCountSummary.UNSET
13121312
);
13131313
remoteShardIterators.add(shardIterator);
13141314
}
@@ -1999,7 +1999,7 @@ static List<SearchShardIterator> getLocalShardsIteratorFromPointInTime(
19991999
// (it was calculated and used when PIT was created).
20002000
// In read-only scenarios (e.g. searchable snapshots) we don't expect resharding to happen
20012001
// so the value doesn't matter.
2002-
IndexReshardingMetadata.NOOP_RESHARD_SPLIT_SHARD_COUNT_SUMMARY
2002+
SplitShardCountSummary.UNSET
20032003
)
20042004
);
20052005
}

server/src/main/java/org/elasticsearch/cluster/metadata/IndexReshardingMetadata.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -89,12 +89,6 @@
8989
* to beginning another resharding operation.
9090
*/
9191
public class IndexReshardingMetadata implements ToXContentFragment, Writeable {
92-
/**
93-
* A value of `reshardSplitShardCountSummary` ({@link IndexMetadata#getReshardSplitShardCountSummaryForSearch(int)})
94-
* that is used when it is known that this value won't affect the correctness of the operation.
95-
*/
96-
public static int NOOP_RESHARD_SPLIT_SHARD_COUNT_SUMMARY = 0;
97-
9892
private static final String SPLIT_FIELD_NAME = "split";
9993
private static final ParseField SPLIT_FIELD = new ParseField(SPLIT_FIELD_NAME);
10094
// This exists only so that tests can verify that IndexReshardingMetadata supports more than one kind of operation.

server/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ public static ShardIterator getShards(RoutingTable routingTable, ShardId shardId
124124
return shard.activeInitializingShardsRandomIt();
125125
}
126126

127-
private record SearchTargetShard(IndexShardRoutingTable shardRoutingTable, int reshardSplitShardCountSummary) {}
127+
private record SearchTargetShard(IndexShardRoutingTable shardRoutingTable, SplitShardCountSummary reshardSplitShardCountSummary) {}
128128

129129
private static Set<SearchTargetShard> computeTargetedShards(
130130
ProjectState projectState,
@@ -156,7 +156,7 @@ private static Set<SearchTargetShard> collectTargetShardsWithRouting(
156156
shardId -> result.add(
157157
new SearchTargetShard(
158158
RoutingTable.shardRoutingTable(indexRoutingTable, shardId),
159-
indexMetadata.getReshardSplitShardCountSummaryForSearch(shardId)
159+
SplitShardCountSummary.forSearch(indexMetadata, shardId)
160160
)
161161
)
162162
);
@@ -186,18 +186,22 @@ private static Iterator<SearchTargetShard> allSearchAddressableShards(ProjectSta
186186
final IndexRoutingTable indexRoutingTable = indexRoutingTable(projectState.routingTable(), index);
187187
final IndexMetadata indexMetadata = indexMetadata(projectState.metadata(), index);
188188
if (indexMetadata.getReshardingMetadata() == null) {
189-
return indexRoutingTable.allShards().map(srt -> new SearchTargetShard(srt, indexMetadata.getNumberOfShards())).iterator();
189+
return indexRoutingTable.allShards()
190+
.map(srt -> new SearchTargetShard(srt, SplitShardCountSummary.forSearch(indexMetadata, srt.shardId.id())))
191+
.iterator();
190192
}
191193

192194
final IndexReshardingMetadata indexReshardingMetadata = indexMetadata.getReshardingMetadata();
193195
assert indexReshardingMetadata.isSplit();
194196
final IndexReshardingState.Split splitState = indexReshardingMetadata.getSplit();
195197

196198
var shards = new ArrayList<SearchTargetShard>();
197-
for (int i = 0; i < indexRoutingTable.size(); i++) {
198-
if (splitState.isTargetShard(i) == false
199-
|| splitState.targetStateAtLeast(i, IndexReshardingState.Split.TargetShardState.SPLIT)) {
200-
shards.add(new SearchTargetShard(indexRoutingTable.shard(i), indexMetadata.getReshardSplitShardCountSummaryForSearch(i)));
199+
for (int shardId = 0; shardId < indexRoutingTable.size(); shardId++) {
200+
if (splitState.isTargetShard(shardId) == false
201+
|| splitState.targetStateAtLeast(shardId, IndexReshardingState.Split.TargetShardState.SPLIT)) {
202+
shards.add(
203+
new SearchTargetShard(indexRoutingTable.shard(shardId), SplitShardCountSummary.forSearch(indexMetadata, shardId))
204+
);
201205
}
202206
}
203207
return shards.iterator();

server/src/main/java/org/elasticsearch/cluster/routing/SearchShardRouting.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@
99

1010
package org.elasticsearch.cluster.routing;
1111

12-
public record SearchShardRouting(ShardIterator iterator, int reshardSplitShardCountSummary) implements Comparable<SearchShardRouting> {
12+
public record SearchShardRouting(ShardIterator iterator, SplitShardCountSummary reshardSplitShardCountSummary)
13+
implements
14+
Comparable<SearchShardRouting> {
1315
@Override
1416
public int compareTo(SearchShardRouting o) {
1517
return iterator.compareTo(o.iterator);

server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java

Lines changed: 13 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import org.elasticsearch.action.support.IndicesOptions;
2020
import org.elasticsearch.cluster.metadata.AliasMetadata;
2121
import org.elasticsearch.cluster.metadata.IndexMetadata;
22-
import org.elasticsearch.cluster.metadata.IndexReshardingMetadata;
22+
import org.elasticsearch.cluster.routing.SplitShardCountSummary;
2323
import org.elasticsearch.common.CheckedBiConsumer;
2424
import org.elasticsearch.common.Strings;
2525
import org.elasticsearch.common.bytes.BytesArray;
@@ -102,24 +102,10 @@ public class ShardSearchRequest extends AbstractTransportRequest implements Indi
102102
*/
103103
private final boolean forceSyntheticSource;
104104

105-
/// The `reshardSplitShardCountSummary` has been added to accommodate the Resharding feature.
106-
///
107-
/// Resharding adds new shards to the index. A search coordinator only sends shard-level search request to the new shard
108-
/// (called a target shard) when coordinator has received a cluster state update with specific state of resharding metadata.
109-
/// See [org.elasticsearch.cluster.routing.OperationRouting#allSearchAddressableShards].
110-
///
111-
/// As such the original shard needs to be able to handle two types of search requests:
112-
/// a search request where target shard is not included in routing,
113-
/// and a search request where target is included.
114-
/// If this is a first type then the source shard performs a search using all data stored,
115-
/// if this is a second type then the source shard performs a search using only a subset of data that is not present
116-
/// on the target shard (since otherwise it would be duplicated between the two).
117-
///
118-
/// This field reflects the state of resharding metadata that the coordinator uses when creating this request
119-
/// and is an indicator of which of the two types of requests this is.
120-
///
121-
/// See also [org.elasticsearch.action.support.replication.ReplicationRequest#reshardSplitShardCountSummary].
122-
private final int reshardSplitShardCountSummary;
105+
/**
106+
* Additional metadata specific to the resharding feature. See {@link org.elasticsearch.cluster.routing.SplitShardCountSummary}.
107+
*/
108+
private final SplitShardCountSummary reshardSplitShardCountSummary;
123109

124110
public static final TransportVersion SHARD_SEARCH_REQUEST_RESHARD_SHARD_COUNT_SUMMARY = TransportVersion.fromName(
125111
"shard_search_request_reshard_shard_count_summary"
@@ -149,7 +135,7 @@ public ShardSearchRequest(
149135
clusterAlias,
150136
null,
151137
null,
152-
0
138+
SplitShardCountSummary.UNSET
153139
);
154140
}
155141

@@ -165,7 +151,7 @@ public ShardSearchRequest(
165151
@Nullable String clusterAlias,
166152
ShardSearchContextId readerId,
167153
TimeValue keepAlive,
168-
int reshardSplitShardCountSummary
154+
SplitShardCountSummary reshardSplitShardCountSummary
169155
) {
170156
this(
171157
originalIndices,
@@ -237,7 +223,7 @@ public ShardSearchRequest(ShardId shardId, long nowInMillis, AliasFilter aliasFi
237223
// This parameter is specific to the resharding feature.
238224
// TODO
239225
// It is currently only supported in _search API and is stubbed here as a result.
240-
IndexReshardingMetadata.NOOP_RESHARD_SPLIT_SHARD_COUNT_SUMMARY
226+
SplitShardCountSummary.UNSET
241227
);
242228
}
243229

@@ -261,7 +247,7 @@ public ShardSearchRequest(
261247
long waitForCheckpoint,
262248
TimeValue waitForCheckpointsTimeout,
263249
boolean forceSyntheticSource,
264-
int reshardSplitShardCountSummary
250+
SplitShardCountSummary reshardSplitShardCountSummary
265251
) {
266252
this.shardId = shardId;
267253
this.shardRequestIndex = shardRequestIndex;
@@ -377,10 +363,11 @@ public ShardSearchRequest(StreamInput in) throws IOException {
377363
forceSyntheticSource = false;
378364
}
379365
if (in.getTransportVersion().supports(SHARD_SEARCH_REQUEST_RESHARD_SHARD_COUNT_SUMMARY)) {
380-
reshardSplitShardCountSummary = in.readVInt();
366+
reshardSplitShardCountSummary = SplitShardCountSummary.fromInt(in.readVInt());
381367
} else {
382-
reshardSplitShardCountSummary = 0;
368+
reshardSplitShardCountSummary = SplitShardCountSummary.UNSET;
383369
}
370+
384371
originalIndices = OriginalIndices.readOriginalIndices(in);
385372
}
386373

@@ -442,7 +429,7 @@ protected final void innerWriteTo(StreamOutput out, boolean asKey) throws IOExce
442429
}
443430
}
444431
if (out.getTransportVersion().supports(SHARD_SEARCH_REQUEST_RESHARD_SHARD_COUNT_SUMMARY)) {
445-
out.writeVInt(reshardSplitShardCountSummary);
432+
out.writeVInt(reshardSplitShardCountSummary.asInt());
446433
}
447434
}
448435

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
9185000
1+
9190000
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
esql_resolve_fields_response_created,9189000
1+
shard_search_request_reshard_shard_count_summary,9190000

0 commit comments

Comments
 (0)