diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java index 2a6391ad8665b..2b4e48e935c93 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java @@ -561,6 +561,8 @@ public Iterator> settings() { public static final String KEY_INFERENCE_FIELDS = "field_inference"; + public static final String KEY_RESHARDING = "resharding"; + public static final String INDEX_STATE_FILE_PREFIX = "state-"; static final TransportVersion STATS_AND_FORECAST_ADDED = TransportVersions.V_8_6_0; @@ -654,6 +656,8 @@ public Iterator> settings() { private final Double writeLoadForecast; @Nullable private final Long shardSizeInBytesForecast; + @Nullable + private final IndexReshardingMetadata reshardingMetadata; private IndexMetadata( final Index index, @@ -702,7 +706,8 @@ private IndexMetadata( final IndexVersion indexCompatibilityVersion, @Nullable final IndexMetadataStats stats, @Nullable final Double writeLoadForecast, - @Nullable Long shardSizeInBytesForecast + @Nullable Long shardSizeInBytesForecast, + @Nullable IndexReshardingMetadata reshardingMetadata ) { this.index = index; this.version = version; @@ -761,6 +766,7 @@ private IndexMetadata( this.writeLoadForecast = writeLoadForecast; this.shardSizeInBytesForecast = shardSizeInBytesForecast; assert numberOfShards * routingFactor == routingNumShards : routingNumShards + " must be a multiple of " + numberOfShards; + this.reshardingMetadata = reshardingMetadata; } IndexMetadata withMappingMetadata(MappingMetadata mapping) { @@ -814,7 +820,8 @@ IndexMetadata withMappingMetadata(MappingMetadata mapping) { this.indexCompatibilityVersion, this.stats, this.writeLoadForecast, - this.shardSizeInBytesForecast + this.shardSizeInBytesForecast, + this.reshardingMetadata ); } @@ -875,7 +882,8 @@ public IndexMetadata 
withInSyncAllocationIds(int shardId, Set inSyncSet) this.indexCompatibilityVersion, this.stats, this.writeLoadForecast, - this.shardSizeInBytesForecast + this.shardSizeInBytesForecast, + this.reshardingMetadata ); } @@ -934,7 +942,8 @@ public IndexMetadata withIncrementedPrimaryTerm(int shardId) { this.indexCompatibilityVersion, this.stats, this.writeLoadForecast, - this.shardSizeInBytesForecast + this.shardSizeInBytesForecast, + this.reshardingMetadata ); } @@ -994,7 +1003,8 @@ public IndexMetadata withTimestampRanges(IndexLongFieldRange timestampRange, Ind this.indexCompatibilityVersion, this.stats, this.writeLoadForecast, - this.shardSizeInBytesForecast + this.shardSizeInBytesForecast, + this.reshardingMetadata ); } @@ -1049,7 +1059,8 @@ public IndexMetadata withIncrementedVersion() { this.indexCompatibilityVersion, this.stats, this.writeLoadForecast, - this.shardSizeInBytesForecast + this.shardSizeInBytesForecast, + this.reshardingMetadata ); } @@ -1900,6 +1911,7 @@ public static class Builder { private IndexMetadataStats stats = null; private Double indexWriteLoadForecast = null; private Long shardSizeInBytesForecast = null; + private IndexReshardingMetadata reshardingMetadata = null; public Builder(String index) { this.index = index; @@ -1935,6 +1947,7 @@ public Builder(IndexMetadata indexMetadata) { this.stats = indexMetadata.stats; this.indexWriteLoadForecast = indexMetadata.writeLoadForecast; this.shardSizeInBytesForecast = indexMetadata.shardSizeInBytesForecast; + this.reshardingMetadata = indexMetadata.reshardingMetadata; } public Builder index(String index) { @@ -2224,6 +2237,11 @@ public Builder putInferenceFields(Map values) { return this; } + public Builder reshardingMetadata(IndexReshardingMetadata reshardingMetadata) { + this.reshardingMetadata = reshardingMetadata; + return this; + } + public IndexMetadata build() { return build(false); } @@ -2423,7 +2441,8 @@ IndexMetadata build(boolean repair) { SETTING_INDEX_VERSION_COMPATIBILITY.get(settings), 
stats, indexWriteLoadForecast, - shardSizeInBytesForecast + shardSizeInBytesForecast, + reshardingMetadata ); } @@ -2563,6 +2582,12 @@ public static void toXContent(IndexMetadata indexMetadata, XContentBuilder build builder.endObject(); } + if (indexMetadata.reshardingMetadata != null) { + builder.startObject(KEY_RESHARDING); + indexMetadata.reshardingMetadata.toXContent(builder, params); + builder.endObject(); + } + builder.endObject(); } @@ -2649,6 +2674,9 @@ public static IndexMetadata fromXContent(XContentParser parser, Map PARSER = new ConstructingObjectParser<>( + "index_resharding_metadata", + args -> { + // the parser ensures exactly one argument will not be null + if (args[0] != null) { + return new IndexReshardingMetadata((IndexReshardingState) args[0]); + } else { + return new IndexReshardingMetadata((IndexReshardingState) args[1]); + } + } + ); + + static { + PARSER.declareObjectOrNull( + ConstructingObjectParser.optionalConstructorArg(), + (parser, c) -> IndexReshardingState.Split.fromXContent(parser), + null, + SPLIT_FIELD + ); + PARSER.declareObjectOrNull( + ConstructingObjectParser.optionalConstructorArg(), + (parser, c) -> IndexReshardingState.Noop.fromXContent(parser), + null, + NOOP_FIELD + ); + PARSER.declareExclusiveFieldSet(SPLIT_FIELD.getPreferredName(), NOOP_FIELD.getPreferredName()); + PARSER.declareRequiredFieldSet(SPLIT_FIELD.getPreferredName(), NOOP_FIELD.getPreferredName()); + } + + private final IndexReshardingState state; + + // visible for testing + IndexReshardingMetadata(IndexReshardingState state) { + this.state = state; + } + + public IndexReshardingMetadata(StreamInput in) throws IOException { + var stateName = in.readString(); + + state = switch (stateName) { + case NOOP_FIELD_NAME -> new IndexReshardingState.Noop(in); + case SPLIT_FIELD_NAME -> new IndexReshardingState.Split(in); + default -> throw new IllegalStateException("unknown operation [" + stateName + "]"); + }; + } + + // for testing + IndexReshardingState getState() 
{ + return state; + } + + static IndexReshardingMetadata fromXContent(XContentParser parser) throws IOException { + return PARSER.parse(parser, null); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + String name = switch (state) { + case IndexReshardingState.Noop ignored -> NOOP_FIELD.getPreferredName(); + case IndexReshardingState.Split ignored -> SPLIT_FIELD.getPreferredName(); + }; + builder.startObject(name); + state.toXContent(builder, params); + builder.endObject(); + + return builder; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + String name = switch (state) { + case IndexReshardingState.Noop ignored -> NOOP_FIELD.getPreferredName(); + case IndexReshardingState.Split ignored -> SPLIT_FIELD.getPreferredName(); + }; + out.writeString(name); + state.writeTo(out); + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + if (other == null || getClass() != other.getClass()) { + return false; + } + IndexReshardingMetadata otherMetadata = (IndexReshardingMetadata) other; + + return Objects.equals(state, otherMetadata.state); + } + + @Override + public int hashCode() { + return Objects.hash(state); + } + + public String toString() { + return "IndexReshardingMetadata [state=" + state + "]"; + } + + /** + * Create resharding metadata representing a new split operation + * Split only supports updating an index to a multiple of its current shard count + * @param shardCount the number of shards in the index at the start of the operation + * @param multiple the new shard count is shardCount * multiple + * @return resharding metadata representing the start of the requested split + */ + public static IndexReshardingMetadata newSplitByMultiple(int shardCount, int multiple) { + return new IndexReshardingMetadata(IndexReshardingState.Split.newSplitByMultiple(shardCount, multiple)); + } + + public IndexReshardingState.Split 
getSplit() { + return switch (state) { + case IndexReshardingState.Noop ignored -> throw new IllegalArgumentException("resharding metadata is not a split"); + case IndexReshardingState.Split s -> s; + }; + } +} diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexReshardingState.java b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexReshardingState.java new file mode 100644 index 0000000000000..c89417ccdcc0f --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexReshardingState.java @@ -0,0 +1,384 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.cluster.metadata; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.xcontent.ConstructingObjectParser; +import org.elasticsearch.xcontent.ObjectParser; +import org.elasticsearch.xcontent.ParseField; +import org.elasticsearch.xcontent.ToXContentFragment; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentParser; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; + +/** + * IndexReshardingState is an abstract class holding the persistent state of a generic resharding operation. It contains + * concrete subclasses for the operations that are currently defined (which is only split for now). 
+ */ +public abstract sealed class IndexReshardingState implements Writeable, ToXContentFragment { + // This class exists only so that tests can check that IndexReshardingMetadata can support more than one kind of operation. + // When we have another real operation such as Shrink this can be removed. + public static final class Noop extends IndexReshardingState { + private static final ObjectParser NOOP_PARSER = new ObjectParser<>("noop", Noop::new); + + Noop() {} + + Noop(StreamInput in) throws IOException { + this(); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + return builder; + } + + static IndexReshardingState fromXContent(XContentParser parser) throws IOException { + return NOOP_PARSER.parse(parser, null); + } + + @Override + public void writeTo(StreamOutput out) throws IOException {} + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + return other != null && getClass() == other.getClass(); + } + + @Override + public int hashCode() { + return 0; + } + } + + public static final class Split extends IndexReshardingState { + public enum SourceShardState implements Writeable { + /** + * The argument is for serialization because using the ordinal breaks if + * any values in the enum are reordered. It must not be changed once defined. 
+ */ + SOURCE((byte) 0), + DONE((byte) 1); + + private final byte code; + + SourceShardState(byte code) { + this.code = code; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeByte(this.code); + } + + public static SourceShardState readFrom(StreamInput in) throws IOException { + var code = in.readByte(); + return switch (code) { + case 0 -> SOURCE; + case 1 -> DONE; + default -> throw new IllegalStateException("unknown source shard state [" + code + "]"); + }; + } + } + + public enum TargetShardState implements Writeable { + /** + * The argument is for serialization because using the ordinal breaks if + * any values in the enum are reordered. It must not be changed once defined. + */ + CLONE((byte) 0), + HANDOFF((byte) 1), + SPLIT((byte) 2), + DONE((byte) 3); + + private final byte code; + + TargetShardState(byte code) { + this.code = code; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeByte(this.code); + } + + public static TargetShardState readFrom(StreamInput in) throws IOException { + var code = in.readByte(); + return switch (code) { + case 0 -> CLONE; + case 1 -> HANDOFF; + case 2 -> SPLIT; + case 3 -> DONE; + default -> throw new IllegalStateException("unknown target shard state [" + code + "]"); + }; + } + } + + private static final ParseField SOURCE_SHARDS_FIELD = new ParseField("source_shards"); + private static final ParseField TARGET_SHARDS_FIELD = new ParseField("target_shards"); + + @SuppressWarnings("unchecked") + private static final ConstructingObjectParser SPLIT_PARSER = new ConstructingObjectParser<>( + "split", + args -> new Split( + ((List) args[0]).toArray(new SourceShardState[0]), + ((List) args[1]).toArray(new TargetShardState[0]) + ) + ); + + static { + SPLIT_PARSER.declareObjectArray( + ConstructingObjectParser.constructorArg(), + (parser, c) -> SourceShardState.valueOf(parser.text()), + SOURCE_SHARDS_FIELD + ); + SPLIT_PARSER.declareObjectArray( + 
ConstructingObjectParser.constructorArg(), + (parser, c) -> TargetShardState.valueOf(parser.text()), + TARGET_SHARDS_FIELD + ); + } + + private final int oldShardCount; + private final int newShardCount; + private final SourceShardState[] sourceShards; + private final TargetShardState[] targetShards; + + Split(SourceShardState[] sourceShards, TargetShardState[] targetShards) { + this.sourceShards = sourceShards; + this.targetShards = targetShards; + + oldShardCount = sourceShards.length; + newShardCount = oldShardCount + targetShards.length; + } + + Split(StreamInput in) throws IOException { + this( + in.readArray(SourceShardState::readFrom, SourceShardState[]::new), + in.readArray(TargetShardState::readFrom, TargetShardState[]::new) + ); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeArray(sourceShards); + out.writeArray(targetShards); + } + + static Split fromXContent(XContentParser parser) throws IOException { + return SPLIT_PARSER.parse(parser, null); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.field(SOURCE_SHARDS_FIELD.getPreferredName(), sourceShards); + builder.field(TARGET_SHARDS_FIELD.getPreferredName(), targetShards); + + return builder; + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + if (other == null || getClass() != other.getClass()) { + return false; + } + Split otherState = (Split) other; + // we can ignore oldShardCount and newShardCount since they are derived + return Arrays.equals(sourceShards, otherState.sourceShards) && Arrays.equals(targetShards, otherState.targetShards); + } + + @Override + public int hashCode() { + return Objects.hash(Arrays.hashCode(sourceShards), Arrays.hashCode(targetShards)); + } + + // visible for testing + int oldShardCount() { + return oldShardCount; + } + + // visible for testing + int newShardCount() { + return newShardCount; + } + + // 
visible for testing + SourceShardState[] sourceShards() { + return sourceShards.clone(); + } + + // visible for testing + TargetShardState[] targetShards() { + return targetShards.clone(); + } + + /** + * Create resharding metadata representing a new split operation + * Split only supports updating an index to a multiple of its current shard count + * @param shardCount the number of shards in the index at the start of the operation + * @param multiple the new shard count is shardCount * multiple + * @return Split representing the start of the requested split + */ + public static Split newSplitByMultiple(int shardCount, int multiple) { + assert multiple > 1 : "multiple must be greater than 1"; + + final int newShardCount = shardCount * multiple; + final var sourceShards = new SourceShardState[shardCount]; + final var targetShards = new TargetShardState[newShardCount - shardCount]; + Arrays.fill(sourceShards, SourceShardState.SOURCE); + Arrays.fill(targetShards, TargetShardState.CLONE); + + return new Split(sourceShards, targetShards); + } + + public static class Builder { + private final SourceShardState[] sourceShards; + private final TargetShardState[] targetShards; + + public Builder(IndexReshardingState.Split split) { + this.sourceShards = split.sourceShards(); + this.targetShards = split.targetShards(); + } + + /** + * Set the shard state of a source shard + * Currently the only legal transition is from SOURCE to DONE and any other transition will assert. + * This could be expressed through a markSourceDone API but this form is the same shape as {@link #setTargetShardState} + * and leaves the door open for additional source states. 
+ * @param shardNum an index into the shards which must be less than the number of shards before split + * @param sourceShardState the state to which the shard should be set + */ + public void setSourceShardState(int shardNum, SourceShardState sourceShardState) { + assert shardNum >= 0 && shardNum < sourceShards.length : "source shardNum is out of bounds"; + assert sourceShards[shardNum].ordinal() + 1 == sourceShardState.ordinal() : "invalid source shard state transition"; + assert sourceShardState == SourceShardState.DONE : "can only move source shard state to DONE"; + var split = new Split(sourceShards, targetShards); + for (var target : split.getTargetStatesFor(shardNum)) { + assert target == TargetShardState.DONE : "can only move source shard to DONE when all targets are DONE"; + } + + sourceShards[shardNum] = sourceShardState; + } + + /** + * Set the target state of a shard + * The only legal state in the split state machine is the one following the shard's current state. + * The reason for this API rather than an advanceState API is to confirm that the caller knows + * what the current state is when setting it. 
+ * @param shardNum an index into shards greater than or equal to the old shard count and less than the new shard count + * @param targetShardState the state to which the shard should be set + */ + public void setTargetShardState(int shardNum, TargetShardState targetShardState) { + var targetShardNum = shardNum - sourceShards.length; + + assert targetShardNum >= 0 && targetShardNum < targetShards.length : "target shardNum is out of bounds"; + assert targetShards[targetShardNum].ordinal() + 1 == targetShardState.ordinal() : "invalid target shard state transition"; + + targetShards[targetShardNum] = targetShardState; + } + + /** + * Build a new Split + * @return Split reflecting the current state of the builder + */ + public Split build() { + return new Split(sourceShards, targetShards); + } + } + + /** + * Create a Builder from the state of this Split + * The Split itself is immutable. Modifications can be applied to the builder, + * which can be used to replace the current Split in IndexMetadata when it in turn is built. 
+ * @return a Builder reflecting the state of this split + */ + public Builder builder() { + return new Builder(this); + } + + /** + * Get the current shard state of a source shard + * @param shardNum an index into the shards which must be less than the number of shards before split + * @return the source shard state of the shard identified by shardNum + */ + public SourceShardState getSourceShardState(int shardNum) { + assert shardNum >= 0 && shardNum < sourceShards.length : "source shardNum is out of bounds"; + + return sourceShards[shardNum]; + } + + /** + * Get the current target state of a shard + * @param shardNum an index into shards greater than or equal to the old shard count and less than the new shard count + * @return the target shard state for the shard identified by shardNum + */ + public TargetShardState getTargetShardState(int shardNum) { + var targetShardNum = shardNum - oldShardCount; + + assert targetShardNum >= 0 && targetShardNum < targetShards.length : "target shardNum is out of bounds"; + + return targetShards[targetShardNum]; + } + + /** + * Check whether this metadata represents an incomplete split + * @return true if the split is incomplete (not all source shards are DONE) + */ + public boolean inProgress() { + for (int i = 0; i < oldShardCount; i++) { + if (sourceShards[i] == SourceShardState.SOURCE) { + return true; + } + } + + return false; + } + + /** + * Check whether all target shards for the given source shard are done. + * @param shardNum a source shard index greater than or equal to 0 and less than the original shard count + * @return true if all target shards for the given source shard are done. 
+ */ + public boolean targetsDone(int shardNum) { + var targets = getTargetStatesFor(shardNum); + return Arrays.stream(targets).allMatch(target -> target == IndexReshardingState.Split.TargetShardState.DONE); + } + + /** + * Get all the target shard states related to the given source shard + * @param shardNum a source shard index greater than or equal to 0 and less than the original shard count + * @return an array of target shard states in order for the given shard + */ + private TargetShardState[] getTargetStatesFor(int shardNum) { + final int numTargets = newShardCount / oldShardCount - 1; + // it might be useful to return the target shard's index as well as state, can iterate on this + TargetShardState[] targets = new TargetShardState[numTargets]; + + int cur = shardNum + oldShardCount; + for (int i = 0; i < numTargets; i++) { + targets[i] = getTargetShardState(cur); + cur += oldShardCount; + } + + return targets; + } + } +} diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/IndexMetadataTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/IndexMetadataTests.java index 4abd0c4a9d469..6549feda6dc6e 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/IndexMetadataTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/IndexMetadataTests.java @@ -60,7 +60,6 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasKey; -import static org.hamcrest.Matchers.in; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; @@ -94,6 +93,7 @@ public void testIndexMetadataSerialization() throws IOException { Double indexWriteLoadForecast = randomBoolean() ? randomDoubleBetween(0.0, 128, true) : null; Long shardSizeInBytesForecast = randomBoolean() ? randomLongBetween(1024, 10240) : null; Map inferenceFields = randomInferenceFields(); + IndexReshardingMetadata reshardingMetadata = randomBoolean() ? 
randomIndexReshardingMetadata(numShard) : null; IndexMetadata metadata = IndexMetadata.builder("foo") .settings(indexSettings(numShard, numberOfReplicas).put("index.version.created", 1)) @@ -129,6 +129,7 @@ public void testIndexMetadataSerialization() throws IOException { IndexLongFieldRange.NO_SHARDS.extendWithShardRange(0, 1, ShardLongFieldRange.of(5000000, 5500000)) ) ) + .reshardingMetadata(reshardingMetadata) .build(); assertEquals(system, metadata.isSystem()); @@ -706,4 +707,8 @@ private IndexMetadataStats randomIndexStats(int numberOfShards) { } return new IndexMetadataStats(indexWriteLoadBuilder.build(), randomLongBetween(100, 1024), randomIntBetween(1, 2)); } + + private IndexReshardingMetadata randomIndexReshardingMetadata(int oldShards) { + return IndexReshardingMetadata.newSplitByMultiple(oldShards, randomIntBetween(2, 5)); + } } diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/IndexReshardingMetadataSerializationTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/IndexReshardingMetadataSerializationTests.java new file mode 100644 index 0000000000000..1decbe64554d9 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/IndexReshardingMetadataSerializationTests.java @@ -0,0 +1,91 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". 
+ */ + +package org.elasticsearch.cluster.metadata; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.test.AbstractXContentSerializingTestCase; +import org.elasticsearch.xcontent.XContentParser; + +import java.io.IOException; + +public class IndexReshardingMetadataSerializationTests extends AbstractXContentSerializingTestCase { + @Override + protected IndexReshardingMetadata doParseInstance(XContentParser parser) throws IOException { + return IndexReshardingMetadata.fromXContent(parser); + } + + @Override + protected Writeable.Reader instanceReader() { + return IndexReshardingMetadata::new; + } + + @Override + protected IndexReshardingMetadata createTestInstance() { + if (randomBoolean()) { + return new IndexReshardingMetadata(createTestSplit()); + } else { + return new IndexReshardingMetadata(new IndexReshardingState.Noop()); + } + } + + private IndexReshardingState.Split createTestSplit() { + final int oldShards = randomIntBetween(1, 100); + final int newShards = randomIntBetween(2, 5) * oldShards; + final var sourceShardStates = new IndexReshardingState.Split.SourceShardState[oldShards]; + final var targetShardStates = new IndexReshardingState.Split.TargetShardState[newShards - oldShards]; + // Semantically it is illegal for SourceShardState to be DONE before all corresponding target shards are + // DONE but these tests are exercising equals/hashcode not semantic integrity. Letting the source shard + // state vary randomly gives better coverage in fewer instances even though it is wrong semantically. 
+ for (int i = 0; i < oldShards; i++) { + sourceShardStates[i] = randomFrom(IndexReshardingState.Split.SourceShardState.values()); + } + for (int i = 0; i < targetShardStates.length; i++) { + targetShardStates[i] = randomFrom(IndexReshardingState.Split.TargetShardState.values()); + } + + return new IndexReshardingState.Split(sourceShardStates, targetShardStates); + } + + @Override + protected IndexReshardingMetadata mutateInstance(IndexReshardingMetadata instance) throws IOException { + var state = switch (instance.getState()) { + case IndexReshardingState.Noop ignored -> createTestSplit(); + case IndexReshardingState.Split split -> mutateSplit(split); + }; + return new IndexReshardingMetadata(state); + } + + // To exercise equals() we want to mutate exactly one thing randomly so we know that each component + // is contributing to equality testing. Some of these mutations are semantically illegal but we ignore + // that here. + private IndexReshardingState.Split mutateSplit(IndexReshardingState.Split split) { + enum Mutation { + SOURCE_SHARD_STATES, + TARGET_SHARD_STATES + } + + var sourceShardStates = split.sourceShards().clone(); + var targetShardStates = split.targetShards().clone(); + + switch (randomFrom(Mutation.values())) { + case SOURCE_SHARD_STATES: + var is = randomInt(sourceShardStates.length - 1); + sourceShardStates[is] = IndexReshardingState.Split.SourceShardState.values()[(sourceShardStates[is].ordinal() + 1) + % IndexReshardingState.Split.SourceShardState.values().length]; + break; + case TARGET_SHARD_STATES: + var it = randomInt(targetShardStates.length - 1); + targetShardStates[it] = IndexReshardingState.Split.TargetShardState.values()[(targetShardStates[it].ordinal() + 1) + % IndexReshardingState.Split.TargetShardState.values().length]; + } + + return new IndexReshardingState.Split(sourceShardStates, targetShardStates); + } +} diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/IndexReshardingMetadataTests.java 
b/server/src/test/java/org/elasticsearch/cluster/metadata/IndexReshardingMetadataTests.java new file mode 100644 index 0000000000000..3f3acb0b9aa2c --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/IndexReshardingMetadataTests.java @@ -0,0 +1,71 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.cluster.metadata; + +import org.elasticsearch.test.ESTestCase; + +public class IndexReshardingMetadataTests extends ESTestCase { + // test that we can drive a split through all valid state transitions in random order and terminate + public void testSplit() { + final var numShards = randomIntBetween(1, 10); + final var multiple = randomIntBetween(2, 5); + + final var metadata = IndexReshardingMetadata.newSplitByMultiple(numShards, multiple); + var split = metadata.getSplit(); + + // starting state is as expected + assert split.oldShardCount() == numShards; + assert split.newShardCount() == numShards * multiple; + for (int i = 0; i < numShards; i++) { + assert split.getSourceShardState(i) == IndexReshardingState.Split.SourceShardState.SOURCE; + } + for (int i = numShards; i < numShards * multiple; i++) { + assert split.getTargetShardState(i) == IndexReshardingState.Split.TargetShardState.CLONE; + } + + // advance split state randomly and expect to terminate + while (split.inProgress()) { + var splitBuilder = split.builder(); + // pick a shard at random and see if we can advance it + int idx = randomIntBetween(0, numShards * multiple - 1); + if (idx < numShards) { + // can we advance source? 
+ var sourceState = split.getSourceShardState(idx); + var nextState = randomFrom(IndexReshardingState.Split.SourceShardState.values()); + if (nextState.ordinal() == sourceState.ordinal() + 1) { + if (split.targetsDone(idx)) { + splitBuilder.setSourceShardState(idx, nextState); + } else { + assertThrows(AssertionError.class, () -> splitBuilder.setSourceShardState(idx, nextState)); + } + } else { + assertThrows(AssertionError.class, () -> splitBuilder.setSourceShardState(idx, nextState)); + } + } else { + // can we advance target? + var targetState = split.getTargetShardState(idx); + var nextState = randomFrom(IndexReshardingState.Split.TargetShardState.values()); + if (nextState.ordinal() == targetState.ordinal() + 1) { + splitBuilder.setTargetShardState(idx, nextState); + } else { + assertThrows(AssertionError.class, () -> splitBuilder.setTargetShardState(idx, nextState)); + } + } + split = splitBuilder.build(); + } + + for (int i = 0; i < numShards; i++) { + assert split.getSourceShardState(i) == IndexReshardingState.Split.SourceShardState.DONE; + } + for (int i = numShards; i < numShards * multiple; i++) { + assert split.getTargetShardState(i) == IndexReshardingState.Split.TargetShardState.DONE; + } + } +}