From 170e7348ecc5f1ff1c56946b9597472b743cc3db Mon Sep 17 00:00:00 2001 From: Brendan Cully Date: Mon, 6 Oct 2025 10:43:37 -0700 Subject: [PATCH] Split SplitShardCountSummary into index and search types This was mostly motivated by the fact that they depend on different TransportVersions for serialization, but it also seems like it might be good to protect against using a search summary as an index summary when making staleness decisions, and vice versa. --- .../action/bulk/BulkOperation.java | 6 +- .../action/bulk/BulkShardRequest.java | 10 +-- .../replication/ReplicatedWriteRequest.java | 4 +- .../replication/ReplicationRequest.java | 51 +++----------- .../TransportReplicationAction.java | 6 +- .../routing/IndexSplitShardCountSummary.java | 68 +++++++++++++++++++ .../routing/SearchSplitShardCountSummary.java | 61 +++++++++++++++++ .../routing/SplitShardCountSummary.java | 67 ++---------------- .../search_reshard_shardcount_summary.csv | 1 + .../resources/transport/upper_bounds/9.3.csv | 2 +- .../routing/SplitShardCountSummaryTests.java | 22 +++--- 11 files changed, 172 insertions(+), 126 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/cluster/routing/IndexSplitShardCountSummary.java create mode 100644 server/src/main/java/org/elasticsearch/cluster/routing/SearchSplitShardCountSummary.java create mode 100644 server/src/main/resources/transport/definitions/referable/search_reshard_shardcount_summary.csv diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java index 69492c67f6ba1..ed858f3f433b9 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java @@ -41,7 +41,7 @@ import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.routing.IndexRouting; -import org.elasticsearch.cluster.routing.SplitShardCountSummary; +import org.elasticsearch.cluster.routing.IndexSplitShardCountSummary; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.util.concurrent.AtomicArray; @@ -404,9 +404,9 @@ private void executeBulkRequestsByShard( // Get effective shardCount for shardId and pass it on as parameter to new BulkShardRequest var indexMetadata = project.index(shardId.getIndexName()); - SplitShardCountSummary reshardSplitShardCountSummary = SplitShardCountSummary.UNSET; + IndexSplitShardCountSummary reshardSplitShardCountSummary = IndexSplitShardCountSummary.UNSET; if (indexMetadata != null) { - reshardSplitShardCountSummary = SplitShardCountSummary.forIndexing(indexMetadata, shardId.getId()); + reshardSplitShardCountSummary = IndexSplitShardCountSummary.fromMetadata(indexMetadata, shardId.getId()); } BulkShardRequest bulkShardRequest = new BulkShardRequest( shardId, diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java index 1ab570d7ffed6..1474e81b145d0 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java @@ -18,7 +18,7 @@ import org.elasticsearch.action.support.replication.ReplicationRequest; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.cluster.metadata.InferenceFieldMetadata; -import org.elasticsearch.cluster.routing.SplitShardCountSummary; +import org.elasticsearch.cluster.routing.IndexSplitShardCountSummary; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.util.set.Sets; @@ -53,7 +53,7 @@ public BulkShardRequest(StreamInput in) throws IOException { public BulkShardRequest( ShardId shardId, - SplitShardCountSummary reshardSplitShardCountSummary, + IndexSplitShardCountSummary reshardSplitShardCountSummary, RefreshPolicy refreshPolicy, BulkItemRequest[] items ) { @@ -61,16 +61,16 @@ public BulkShardRequest( } public BulkShardRequest(ShardId shardId, RefreshPolicy refreshPolicy, BulkItemRequest[] items) { - this(shardId, SplitShardCountSummary.UNSET, refreshPolicy, items, false); + this(shardId, IndexSplitShardCountSummary.UNSET, refreshPolicy, items, false); } public BulkShardRequest(ShardId shardId, RefreshPolicy refreshPolicy, BulkItemRequest[] items, boolean isSimulated) { - this(shardId, SplitShardCountSummary.UNSET, refreshPolicy, items, isSimulated); + this(shardId, IndexSplitShardCountSummary.UNSET, refreshPolicy, items, isSimulated); } public BulkShardRequest( ShardId shardId, - SplitShardCountSummary reshardSplitShardCountSummary, + IndexSplitShardCountSummary reshardSplitShardCountSummary, RefreshPolicy refreshPolicy, BulkItemRequest[] items, boolean isSimulated diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicatedWriteRequest.java b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicatedWriteRequest.java index 38dc9a4515322..365fef015b11b 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicatedWriteRequest.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicatedWriteRequest.java @@ -13,7 +13,7 @@ import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.WriteRequest; -import org.elasticsearch.cluster.routing.SplitShardCountSummary; +import org.elasticsearch.cluster.routing.IndexSplitShardCountSummary; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.core.Nullable; @@ -48,7 +48,7 @@ public ReplicatedWriteRequest(@Nullable ShardId shardId) { super(shardId); } - public ReplicatedWriteRequest(@Nullable ShardId shardId, SplitShardCountSummary reshardSplitShardCountSummary) { + public ReplicatedWriteRequest(@Nullable ShardId shardId, IndexSplitShardCountSummary reshardSplitShardCountSummary) { super(shardId, reshardSplitShardCountSummary); } diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java index 7e7a5e0fcd8b1..017a8fcfa9840 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java @@ -16,7 +16,7 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.IndicesOptions; -import org.elasticsearch.cluster.routing.SplitShardCountSummary; +import org.elasticsearch.cluster.routing.IndexSplitShardCountSummary; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.core.Nullable; @@ -52,42 +52,9 @@ public abstract class ReplicationRequest 4 shards. While splitting a bulk request, the coordinator observes - * that target shards are not ready for indexing. So requests that are meant for shard 0 and 2 are bundled together, - * sent to shard 0 with “reshardSplitShardCountSummary” 2 in the request. - * Requests that are meant for shard 1 and 3 are bundled together, - * sent to shard 1 with “reshardSplitShardCountSummary” 2 in the request. - * - * Example 2: - * Suppose we are resharding an index from 4 -> 8 shards. While splitting a bulk request, the coordinator observes - * that source shard 0 has completed HANDOFF but source shards 1, 2, 3 have not completed handoff. - * So, the shard-bulk-request it sends to shard 0 and 4 has the "reshardSplitShardCountSummary" 8, - * while the shard-bulk-request it sends to shard 1,2,3 has the "reshardSplitShardCountSummary" 4. - * Note that in this case no shard-bulk-request is sent to shards 5, 6, 7 and the requests that were meant for these target shards - * are bundled together with and sent to their source shards. - * - * A value of 0 indicates an INVALID reshardSplitShardCountSummary. Hence, a request with INVALID reshardSplitShardCountSummary - * will be treated as a Summary mismatch on the source shard node. + * See {@link IndexSplitShardCountSummary} for details. */ - protected final SplitShardCountSummary reshardSplitShardCountSummary; + protected final IndexSplitShardCountSummary reshardSplitShardCountSummary; /** * The number of shard copies that must be active before proceeding with the replication action. @@ -101,10 +68,10 @@ public ReplicationRequest(StreamInput in) throws IOException { } public ReplicationRequest(@Nullable ShardId shardId, StreamInput in) throws IOException { - this(shardId, SplitShardCountSummary.UNSET, in); + this(shardId, IndexSplitShardCountSummary.UNSET, in); } - public ReplicationRequest(@Nullable ShardId shardId, SplitShardCountSummary reshardSplitShardCountSummary, StreamInput in) + public ReplicationRequest(@Nullable ShardId shardId, IndexSplitShardCountSummary reshardSplitShardCountSummary, StreamInput in) throws IOException { super(in); final boolean thinRead = shardId != null; @@ -128,7 +95,7 @@ public ReplicationRequest(@Nullable ShardId shardId, SplitShardCountSummary resh if (thinRead) { this.reshardSplitShardCountSummary = reshardSplitShardCountSummary; } else { - this.reshardSplitShardCountSummary = new SplitShardCountSummary(in); + this.reshardSplitShardCountSummary = new IndexSplitShardCountSummary(in); } } @@ -136,13 +103,13 @@ public ReplicationRequest(@Nullable ShardId shardId, SplitShardCountSummary resh * Creates a new request with resolved shard id */ public ReplicationRequest(@Nullable ShardId shardId) { - this(shardId, SplitShardCountSummary.UNSET); + this(shardId, IndexSplitShardCountSummary.UNSET); } /** * Creates a new request with resolved shard id and reshardSplitShardCountSummary */ - public ReplicationRequest(@Nullable ShardId shardId, SplitShardCountSummary reshardSplitShardCountSummary) { + public ReplicationRequest(@Nullable ShardId shardId, IndexSplitShardCountSummary reshardSplitShardCountSummary) { this.index = shardId == null ? null : shardId.getIndexName(); this.shardId = shardId; this.timeout = DEFAULT_TIMEOUT; @@ -199,7 +166,7 @@ public ShardId shardId() { * @return The effective shard count as seen by the coordinator when creating this request. * can be 0 if this has not yet been resolved. */ - public SplitShardCountSummary reshardSplitShardCountSummary() { + public IndexSplitShardCountSummary reshardSplitShardCountSummary() { return reshardSplitShardCountSummary; } diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index eabfaf673bffb..b9230cf60aef3 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -32,8 +32,8 @@ import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.AllocationId; +import org.elasticsearch.cluster.routing.IndexSplitShardCountSummary; import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.cluster.routing.SplitShardCountSummary; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -470,10 +470,10 @@ void runWithPrimaryShardReference(final PrimaryShardReference primaryShardRefere throw blockException; } - SplitShardCountSummary reshardSplitShardCountSummary = primaryRequest.getRequest().reshardSplitShardCountSummary(); + IndexSplitShardCountSummary reshardSplitShardCountSummary = primaryRequest.getRequest().reshardSplitShardCountSummary(); assert reshardSplitShardCountSummary.isUnset() || reshardSplitShardCountSummary.equals( - SplitShardCountSummary.forIndexing(indexMetadata, primaryRequest.getRequest().shardId().getId()) + IndexSplitShardCountSummary.fromMetadata(indexMetadata, primaryRequest.getRequest().shardId().getId()) ); if (primaryShardReference.isRelocated()) { primaryShardReference.close(); // release shard operation lock as soon as possible diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/IndexSplitShardCountSummary.java b/server/src/main/java/org/elasticsearch/cluster/routing/IndexSplitShardCountSummary.java new file mode 100644 index 0000000000000..177670d4b7d30 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/routing/IndexSplitShardCountSummary.java @@ -0,0 +1,68 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.cluster.routing; + +import org.elasticsearch.TransportVersion; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.IndexReshardingState; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +public class IndexSplitShardCountSummary extends SplitShardCountSummary { + // superseded + private static final TransportVersion INDEX_RESHARD_SHARDCOUNT_SUMMARY = TransportVersion.fromName("index_reshard_shardcount_summary"); + // bumped to use VInt instead of Int + private static final TransportVersion INDEX_RESHARD_SHARDCOUNT_SMALL = TransportVersion.fromName("index_reshard_shardcount_small"); + + public static final IndexSplitShardCountSummary UNSET = new IndexSplitShardCountSummary(UNSET_VALUE); + + public IndexSplitShardCountSummary(StreamInput in) throws IOException { + super(readShardCountSummary(in)); + } + + /** + * Given {@code IndexMetadata} and a {@code shardId}, this method returns the "effective" shard count + * as seen by this IndexMetadata, for indexing operations. + * + * See {@code getReshardSplitShardCountSummary} for more details. + * @param indexMetadata IndexMetadata of the shard for which we want to calculate the effective shard count + * @param shardId Input shardId for which we want to calculate the effective shard count + */ + public static IndexSplitShardCountSummary fromMetadata(IndexMetadata indexMetadata, int shardId) { + return new IndexSplitShardCountSummary( + getReshardSplitShardCountSummary(indexMetadata, shardId, IndexReshardingState.Split.TargetShardState.HANDOFF) + ); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + if (out.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SMALL)) { + out.writeVInt(shardCountSummary); + } else if (out.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SUMMARY)) { + out.writeInt(shardCountSummary); + } + } + + private static int readShardCountSummary(StreamInput in) throws IOException { + if (in.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SMALL)) { + return in.readVInt(); + } else if (in.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SUMMARY)) { + return in.readInt(); + } else { + return UNSET_VALUE; + } + } + + IndexSplitShardCountSummary(int shardCountSummary) { + super(shardCountSummary); + } +} diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/SearchSplitShardCountSummary.java b/server/src/main/java/org/elasticsearch/cluster/routing/SearchSplitShardCountSummary.java new file mode 100644 index 0000000000000..5369337ae0ccf --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/routing/SearchSplitShardCountSummary.java @@ -0,0 +1,61 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.cluster.routing; + +import org.elasticsearch.TransportVersion; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.IndexReshardingState; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +public class SearchSplitShardCountSummary extends SplitShardCountSummary { + private static final TransportVersion SEARCH_RESHARD_SHARDCOUNT_SUMMARY = TransportVersion.fromName( + "search_reshard_shardcount_summary" + ); + + public SearchSplitShardCountSummary(StreamInput in) throws IOException { + super(readShardCountSummary(in)); + } + + /** + * Given {@code IndexMetadata} and a {@code shardId}, this method returns the "effective" shard count + * as seen by this IndexMetadata, for indexing operations. + * + * See {@code getReshardSplitShardCountSummary} for more details. + * @param indexMetadata IndexMetadata of the shard for which we want to calculate the effective shard count + * @param shardId Input shardId for which we want to calculate the effective shard count + */ + public static SearchSplitShardCountSummary fromMetadata(IndexMetadata indexMetadata, int shardId) { + return new SearchSplitShardCountSummary( + getReshardSplitShardCountSummary(indexMetadata, shardId, IndexReshardingState.Split.TargetShardState.SPLIT) + ); + } + + private static int readShardCountSummary(StreamInput in) throws IOException { + if (in.getTransportVersion().supports(SEARCH_RESHARD_SHARDCOUNT_SUMMARY)) { + return in.readVInt(); + } else { + return UNSET_VALUE; + } + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + if (out.getTransportVersion().supports(SEARCH_RESHARD_SHARDCOUNT_SUMMARY)) { + out.writeVInt(shardCountSummary); + } + } + + SearchSplitShardCountSummary(int shardCountSummary) { + super(shardCountSummary); + } +} diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/SplitShardCountSummary.java b/server/src/main/java/org/elasticsearch/cluster/routing/SplitShardCountSummary.java index 1d68bba8f338f..bbbbc40a248f8 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/SplitShardCountSummary.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/SplitShardCountSummary.java @@ -9,16 +9,11 @@ package org.elasticsearch.cluster.routing; -import org.elasticsearch.TransportVersion; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexReshardingMetadata; import org.elasticsearch.cluster.metadata.IndexReshardingState; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; -import java.io.IOException; - /** * The SplitShardCountSummary has been added to accommodate in-place index resharding. * This is populated when the coordinator is deciding which shards a request applies to. @@ -57,38 +52,11 @@ * will be treated as a Summary mismatch on the source shard node. */ -public class SplitShardCountSummary implements Writeable { - // superseded - private static final TransportVersion INDEX_RESHARD_SHARDCOUNT_SUMMARY = TransportVersion.fromName("index_reshard_shardcount_summary"); - // bumped to use VInt instead of Int - private static final TransportVersion INDEX_RESHARD_SHARDCOUNT_SMALL = TransportVersion.fromName("index_reshard_shardcount_small"); - - public static final SplitShardCountSummary UNSET = new SplitShardCountSummary(0); - +public abstract class SplitShardCountSummary implements Writeable { /** - * Given {@code IndexMetadata} and a {@code shardId}, this method returns the "effective" shard count - * as seen by this IndexMetadata, for indexing operations. - * - * See {@code getReshardSplitShardCountSummary} for more details. - * @param indexMetadata IndexMetadata of the shard for which we want to calculate the effective shard count - * @param shardId Input shardId for which we want to calculate the effective shard count + * This value indicates that the shard count summary has not been provided. */ - public static SplitShardCountSummary forIndexing(IndexMetadata indexMetadata, int shardId) { - return getReshardSplitShardCountSummary(indexMetadata, shardId, IndexReshardingState.Split.TargetShardState.HANDOFF); - } - - /** - * This method is used in the context of the resharding feature. - * Given a {@code shardId}, this method returns the "effective" shard count - * as seen by this IndexMetadata, for search operations. - * - * See {@code getReshardSplitShardCount} for more details. - * @param indexMetadata IndexMetadata of the shard for which we want to calculate the effective shard count - * @param shardId Input shardId for which we want to calculate the effective shard count - */ - public static SplitShardCountSummary forSearch(IndexMetadata indexMetadata, int shardId) { - return getReshardSplitShardCountSummary(indexMetadata, shardId, IndexReshardingState.Split.TargetShardState.SPLIT); - } + protected static final int UNSET_VALUE = 0; /** * This method is used in the context of the resharding feature. @@ -104,14 +72,12 @@ public static SplitShardCountSummary forSearch(IndexMetadata indexMetadata, int * undergoing a resharding operation. This method is used to populate a field in the shard level requests sent to * source and target shards, as a proxy for the cluster state version. The same calculation is then done at the source shard * to verify if the coordinator and source node's view of the resharding state have a mismatch. - * See {@link org.elasticsearch.action.support.replication.ReplicationRequest#reshardSplitShardCountSummary} - * for a detailed description of how this value is used. * * @param shardId Input shardId for which we want to calculate the effective shard count * @param minShardState Minimum target shard state required for the target to be considered ready * @return Effective shard count as seen by an operation using this IndexMetadata */ - private static SplitShardCountSummary getReshardSplitShardCountSummary( + protected static int getReshardSplitShardCountSummary( IndexMetadata indexMetadata, int shardId, IndexReshardingState.Split.TargetShardState minShardState @@ -134,26 +100,16 @@ private static SplitShardCountSummary getReshardSplitShardCountSummary( } } } - return new SplitShardCountSummary(shardCount); + return shardCount; } - private final int shardCountSummary; - - public SplitShardCountSummary(StreamInput in) throws IOException { - if (in.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SMALL)) { - this.shardCountSummary = in.readVInt(); - } else if (in.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SUMMARY)) { - this.shardCountSummary = in.readInt(); - } else { - this.shardCountSummary = UNSET.shardCountSummary; - } - } + protected final int shardCountSummary; /** * Returns whether this shard count summary is carrying an actual value or is UNSET */ public boolean isUnset() { - return this.shardCountSummary == UNSET.shardCountSummary; + return this.shardCountSummary == UNSET_VALUE; } // visible for testing @@ -161,15 +117,6 @@ public boolean isUnset() { this.shardCountSummary = shardCountSummary; } - @Override - public void writeTo(StreamOutput out) throws IOException { - if (out.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SMALL)) { - out.writeVInt(shardCountSummary); - } else if (out.getTransportVersion().supports(INDEX_RESHARD_SHARDCOUNT_SUMMARY)) { - out.writeInt(shardCountSummary); - } - } - @Override public boolean equals(Object other) { if (this == other) { diff --git a/server/src/main/resources/transport/definitions/referable/search_reshard_shardcount_summary.csv b/server/src/main/resources/transport/definitions/referable/search_reshard_shardcount_summary.csv new file mode 100644 index 0000000000000..c6a65950bd0ab --- /dev/null +++ b/server/src/main/resources/transport/definitions/referable/search_reshard_shardcount_summary.csv @@ -0,0 +1 @@ +9188000 diff --git a/server/src/main/resources/transport/upper_bounds/9.3.csv b/server/src/main/resources/transport/upper_bounds/9.3.csv index a1daf1f9747d4..b3a934063bc53 100644 --- a/server/src/main/resources/transport/upper_bounds/9.3.csv +++ b/server/src/main/resources/transport/upper_bounds/9.3.csv @@ -1 +1 @@ -resolved_index_expressions,9187000 +search_reshard_shardcount_summary,9188000 diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/SplitShardCountSummaryTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/SplitShardCountSummaryTests.java index 59d9b5008613d..06095ace2d4f5 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/SplitShardCountSummaryTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/SplitShardCountSummaryTests.java @@ -38,11 +38,12 @@ public void testReshardShardCountCalculation() { assertNull(reshardingMetadata); - final var preSplitSummary = new SplitShardCountSummary(numSourceShards); + final var preSplitIndexSummary = new IndexSplitShardCountSummary(numSourceShards); + final var preSplitSearchSummary = new SearchSplitShardCountSummary(numSourceShards); // When there is no resharding going on, the ReshardSplitShardCount is same as number of shards in the index for (int i = 0; i < numSourceShards; i++) { - assertThat(SplitShardCountSummary.forIndexing(indexMetadata, i), equalTo(preSplitSummary)); - assertThat(SplitShardCountSummary.forSearch(indexMetadata, i), equalTo(preSplitSummary)); + assertThat(IndexSplitShardCountSummary.fromMetadata(indexMetadata, i), equalTo(preSplitIndexSummary)); + assertThat(SearchSplitShardCountSummary.fromMetadata(indexMetadata, i), equalTo(preSplitSearchSummary)); } // Now reshard-split from 2 shards to 4 shards @@ -58,14 +59,15 @@ public void testReshardShardCountCalculation() { assertEquals(numSourceShards, reshardingMetadata.shardCountBefore()); assertEquals(numSourceShards * multiple, reshardingMetadata.shardCountAfter()); final int numShardsAfterReshard = reshardingMetadata.shardCountAfter(); - final var postSplitSummary = new SplitShardCountSummary(numShardsAfterReshard); + final var postSplitIndexSummary = new IndexSplitShardCountSummary(numShardsAfterReshard); + final var postSplitSearchSummary = new SearchSplitShardCountSummary(numShardsAfterReshard); // All target shards in CLONE state for (int i = 0; i < numSourceShards; i++) { assertTrue(reshardingMetadata.getSplit().allTargetStatesAtLeast(i, IndexReshardingState.Split.TargetShardState.CLONE)); - assertThat(SplitShardCountSummary.forIndexing(indexMetadataAfterReshard, i), equalTo(preSplitSummary)); - assertThat(SplitShardCountSummary.forSearch(indexMetadataAfterReshard, i), equalTo(preSplitSummary)); + assertThat(IndexSplitShardCountSummary.fromMetadata(indexMetadataAfterReshard, i), equalTo(preSplitIndexSummary)); + assertThat(SearchSplitShardCountSummary.fromMetadata(indexMetadataAfterReshard, i), equalTo(preSplitSearchSummary)); } var indexReshardingMetadataHandoff = reshardingMetadata; @@ -85,8 +87,8 @@ public void testReshardShardCountCalculation() { indexReshardingMetadataHandoff.getSplit().allTargetStatesAtLeast(i, IndexReshardingState.Split.TargetShardState.HANDOFF) ); - assertThat(SplitShardCountSummary.forIndexing(indexMetadataHandoff, i), equalTo(postSplitSummary)); - assertThat(SplitShardCountSummary.forSearch(indexMetadataHandoff, i), equalTo(preSplitSummary)); + assertThat(IndexSplitShardCountSummary.fromMetadata(indexMetadataHandoff, i), equalTo(postSplitIndexSummary)); + assertThat(SearchSplitShardCountSummary.fromMetadata(indexMetadataHandoff, i), equalTo(preSplitSearchSummary)); } var indexReshardingMetadataSplit = indexReshardingMetadataHandoff; @@ -104,8 +106,8 @@ public void testReshardShardCountCalculation() { indexReshardingMetadataSplit.getSplit().allTargetStatesAtLeast(i, IndexReshardingState.Split.TargetShardState.SPLIT) ); - assertThat(SplitShardCountSummary.forIndexing(indexMetadataSplit, i), equalTo(postSplitSummary)); - assertThat(SplitShardCountSummary.forSearch(indexMetadataSplit, i), equalTo(postSplitSummary)); + assertThat(IndexSplitShardCountSummary.fromMetadata(indexMetadataSplit, i), equalTo(postSplitIndexSummary)); + assertThat(SearchSplitShardCountSummary.fromMetadata(indexMetadataSplit, i), equalTo(postSplitSearchSummary)); } } }