Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,9 @@ public static TargetShardState readFrom(StreamInput in) throws IOException {
private final TargetShardState[] targetShards;

Split(SourceShardState[] sourceShards, TargetShardState[] targetShards) {
// The resharding metadata is deleted when the last source shard transitions to done
assert Arrays.stream(sourceShards).allMatch((state) -> state == SourceShardState.DONE) == false;

this.sourceShards = sourceShards;
this.targetShards = targetShards;

Expand Down Expand Up @@ -375,20 +378,6 @@ public boolean targetStateAtLeast(int shardNum, TargetShardState targetShardStat
return getTargetShardState(shardNum).ordinal() >= targetShardState.ordinal();
}

/**
* Check whether this metadata represents an incomplete split
* @return true if the split is incomplete (not all source shards are DONE)
*/
public boolean inProgress() {
for (int i = 0; i < oldShardCount; i++) {
if (sourceShards[i] == SourceShardState.SOURCE) {
return true;
}
}

return false;
}

public Stream<TargetShardState> targetStates() {
return Arrays.stream(targetShards);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import org.elasticsearch.xcontent.XContentParser;

import java.io.IOException;
import java.util.Arrays;
import java.util.stream.IntStream;

public class IndexReshardingMetadataSerializationTests extends AbstractXContentSerializingTestCase<IndexReshardingMetadata> {
@Override
Expand Down Expand Up @@ -46,6 +48,10 @@ private IndexReshardingState.Split createTestSplit() {
for (int i = 0; i < oldShards; i++) {
sourceShardStates[i] = randomFrom(IndexReshardingState.Split.SourceShardState.values());
}
// All sources done is an invalid state, will fail assert in Split constructor
if (Arrays.stream(sourceShardStates).allMatch(state -> state == IndexReshardingState.Split.SourceShardState.DONE)) {
sourceShardStates[0] = IndexReshardingState.Split.SourceShardState.SOURCE;
}
for (int i = 0; i < targetShardStates.length; i++) {
targetShardStates[i] = randomFrom(IndexReshardingState.Split.TargetShardState.values());
}
Expand Down Expand Up @@ -74,16 +80,18 @@ enum Mutation {
var sourceShardStates = split.sourceShards().clone();
var targetShardStates = split.targetShards().clone();

switch (randomFrom(Mutation.values())) {
case SOURCE_SHARD_STATES:
var is = randomInt(sourceShardStates.length - 1);
sourceShardStates[is] = IndexReshardingState.Split.SourceShardState.values()[(sourceShardStates[is].ordinal() + 1)
% IndexReshardingState.Split.SourceShardState.values().length];
break;
case TARGET_SHARD_STATES:
var it = randomInt(targetShardStates.length - 1);
targetShardStates[it] = IndexReshardingState.Split.TargetShardState.values()[(targetShardStates[it].ordinal() + 1)
% IndexReshardingState.Split.TargetShardState.values().length];
var is = randomInt(sourceShardStates.length - 1);
boolean allOtherSourcesDone = IntStream.range(0, sourceShardStates.length)
.filter(i -> i != is)
.allMatch(i -> sourceShardStates[i] == IndexReshardingState.Split.SourceShardState.DONE);
// All sources done is an invalid state, will fail assert in Split constructor
if (allOtherSourcesDone || randomFrom(Mutation.values()) == Mutation.TARGET_SHARD_STATES) {
var it = randomInt(targetShardStates.length - 1);
targetShardStates[it] = IndexReshardingState.Split.TargetShardState.values()[(targetShardStates[it].ordinal() + 1)
% IndexReshardingState.Split.TargetShardState.values().length];
} else {
sourceShardStates[is] = IndexReshardingState.Split.SourceShardState.values()[(sourceShardStates[is].ordinal() + 1)
% IndexReshardingState.Split.SourceShardState.values().length];
}

return new IndexReshardingState.Split(sourceShardStates, targetShardStates);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public void testSplit() {
}

// advance split state randomly and expect to terminate
while (split.inProgress()) {
while (true) {
var splitBuilder = split.builder();
// pick a shard at random and see if we can advance it
int idx = randomIntBetween(0, numShards * multiple - 1);
Expand All @@ -43,6 +43,11 @@ public void testSplit() {
var nextState = randomFrom(IndexReshardingState.Split.SourceShardState.values());
if (nextState.ordinal() == sourceState.ordinal() + 1) {
if (split.targetsDone(idx)) {
long ongoingSourceShards = split.sourceStates()
.filter(state -> state != IndexReshardingState.Split.SourceShardState.DONE)
.count();
// all source shards done is an invalid state, terminate
if (ongoingSourceShards == 1) break;
splitBuilder.setSourceShardState(idx, nextState);
} else {
assertThrows(AssertionError.class, () -> splitBuilder.setSourceShardState(idx, nextState));
Expand All @@ -63,10 +68,10 @@ public void testSplit() {
split = splitBuilder.build();
}

for (int i = 0; i < numShards; i++) {
assertSame(IndexReshardingState.Split.SourceShardState.DONE, split.getSourceShardState(i));
assertFalse(split.isTargetShard(i));
}
assertEquals(
split.sourceStates().filter(state -> state == IndexReshardingState.Split.SourceShardState.DONE).count(),
numShards - 1
);
for (int i = numShards; i < numShards * multiple; i++) {
assertSame(IndexReshardingState.Split.TargetShardState.DONE, split.getTargetShardState(i));
assertTrue(split.isTargetShard(i));
Expand Down