Skip to content

Commit efb503b

Browse files
committed
Delete unowned documents during split
1 parent b9360a4 commit efb503b

File tree

3 files changed

+28
-3
lines changed

3 files changed

+28
-3
lines changed

server/src/main/java/org/elasticsearch/cluster/metadata/IndexReshardingMetadata.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,10 @@ public static IndexReshardingMetadata newSplitByMultiple(int shardCount, int mul
207207
return new IndexReshardingMetadata(IndexReshardingState.Split.newSplitByMultiple(shardCount, multiple));
208208
}
209209

210+
public static boolean isSplitSource(ShardId shardId, @Nullable IndexReshardingMetadata reshardingMetadata) {
211+
return reshardingMetadata != null && reshardingMetadata.isSplit() && reshardingMetadata.getSplit().isSourceShard(shardId.id());
212+
}
213+
210214
public static boolean isSplitTarget(ShardId shardId, @Nullable IndexReshardingMetadata reshardingMetadata) {
211215
return reshardingMetadata != null && reshardingMetadata.isSplit() && reshardingMetadata.getSplit().isTargetShard(shardId.id());
212216
}
@@ -221,6 +225,16 @@ public IndexReshardingMetadata transitionSplitTargetToNewState(
221225
return new IndexReshardingMetadata(builder.build());
222226
}
223227

228+
public IndexReshardingMetadata transitionSplitSourceToNewState(
229+
ShardId shardId,
230+
IndexReshardingState.Split.SourceShardState newSourceState
231+
) {
232+
assert state instanceof IndexReshardingState.Split;
233+
IndexReshardingState.Split.Builder builder = new IndexReshardingState.Split.Builder((IndexReshardingState.Split) state);
234+
builder.setSourceShardState(shardId.getId(), newSourceState);
235+
return new IndexReshardingMetadata(builder.build());
236+
}
237+
224238
/**
225239
* @return the split state of this metadata block, or throw IllegalArgumentException if this metadata doesn't represent a split
226240
*/

server/src/main/java/org/elasticsearch/cluster/metadata/IndexReshardingState.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,11 @@
2121

2222
import java.io.IOException;
2323
import java.util.Arrays;
24+
import java.util.Collection;
25+
import java.util.Iterator;
2426
import java.util.List;
2527
import java.util.Objects;
28+
import java.util.stream.IntStream;
2629
import java.util.stream.Stream;
2730

2831
/**
@@ -350,8 +353,12 @@ public SourceShardState getSourceShardState(int shardNum) {
350353
return sourceShards[shardNum];
351354
}
352355

356+
public boolean isSourceShard(int shardId) {
357+
return shardId < shardCountBefore();
358+
}
359+
353360
public boolean isTargetShard(int shardId) {
354-
return shardId >= shardCountBefore();
361+
return isSourceShard(shardId) == false;
355362
}
356363

357364
/**
@@ -389,6 +396,10 @@ public Stream<TargetShardState> targetStates() {
389396
return Arrays.stream(targetShards);
390397
}
391398

399+
public Stream<SourceShardState> sourceStates() {
400+
return Arrays.stream(sourceShards);
401+
}
402+
392403
/**
393404
* Check whether all target shards for the given source shard are done.
394405
* @param shardNum a source shard index greater than or equal to 0 and less than the original shard count

server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1004,7 +1004,7 @@ private static DiscoveryNode findSourceNodeForReshardSplitRecovery(
10041004
ShardRouting sourceShardRouting = routingTable.shardRoutingTable(sourceShardId).primaryShard();
10051005

10061006
if (sourceShardRouting.active() == false) {
1007-
assert false : sourceShardRouting;
1007+
assert false : sourceShardRouting.shortSummary();
10081008
logger.trace("can't find reshard split source node because source shard {} is not active.", sourceShardRouting);
10091009
return null;
10101010
}
@@ -1014,7 +1014,7 @@ private static DiscoveryNode findSourceNodeForReshardSplitRecovery(
10141014
assert false : "Source node for reshard does not exist: " + sourceShardRouting.currentNodeId();
10151015
logger.trace(
10161016
"can't find reshard split source node because source shard {} is assigned to an unknown node.",
1017-
sourceShardRouting
1017+
sourceShardRouting.shortSummary()
10181018
);
10191019
return null;
10201020
}

0 commit comments

Comments
 (0)