Skip to content

Commit b24bb35

Browse files
authored
Add new recovery source for reshard split target shards (#129159)
1 parent 4656a53 commit b24bb35

File tree

8 files changed

+91
-11
lines changed

8 files changed

+91
-11
lines changed

server/src/main/java/org/elasticsearch/cluster/routing/ExpectedShardSizeEstimator.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ public static boolean shouldReserveSpaceForInitializingShard(ShardRouting shard,
5858
// shrink/split/clone operation is going to clone existing locally placed shards using file system hard links
5959
// so no additional space is going to be used until future merges
6060
case LOCAL_SHARDS -> false;
61+
case RESHARD_SPLIT -> false;
6162
};
6263
}
6364

server/src/main/java/org/elasticsearch/cluster/routing/RecoverySource.java

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.elasticsearch.common.io.stream.StreamOutput;
1515
import org.elasticsearch.common.io.stream.Writeable;
1616
import org.elasticsearch.index.IndexVersion;
17+
import org.elasticsearch.index.shard.ShardId;
1718
import org.elasticsearch.repositories.IndexId;
1819
import org.elasticsearch.snapshots.Snapshot;
1920
import org.elasticsearch.xcontent.ToXContent;
@@ -31,6 +32,7 @@
3132
* - {@link PeerRecoverySource} recovery from a primary on another node
3233
* - {@link SnapshotRecoverySource} recovery from a snapshot
3334
* - {@link LocalShardsRecoverySource} recovery from other shards of another index on the same node
35+
* - {@link ReshardSplitRecoverySource} recovery of a shard that is created as a result of a resharding split
3436
*/
3537
public abstract class RecoverySource implements Writeable, ToXContentObject {
3638

@@ -57,6 +59,7 @@ public static RecoverySource readFrom(StreamInput in) throws IOException {
5759
case PEER -> PeerRecoverySource.INSTANCE;
5860
case SNAPSHOT -> new SnapshotRecoverySource(in);
5961
case LOCAL_SHARDS -> LocalShardsRecoverySource.INSTANCE;
62+
case RESHARD_SPLIT -> new ReshardSplitRecoverySource(in);
6063
};
6164
}
6265

@@ -78,7 +81,8 @@ public enum Type {
7881
EXISTING_STORE,
7982
PEER,
8083
SNAPSHOT,
81-
LOCAL_SHARDS
84+
LOCAL_SHARDS,
85+
RESHARD_SPLIT
8286
}
8387

8488
public abstract Type getType();
@@ -319,4 +323,39 @@ public boolean expectEmptyRetentionLeases() {
319323
return false;
320324
}
321325
}
326+
327+
/**
328+
* Recovery of a shard that is created as a result of a resharding split.
329+
* Not to be confused with _split API.
330+
*/
331+
public static class ReshardSplitRecoverySource extends RecoverySource {
332+
private final ShardId sourceShardId;
333+
334+
public ReshardSplitRecoverySource(ShardId sourceShardId) {
335+
this.sourceShardId = sourceShardId;
336+
}
337+
338+
ReshardSplitRecoverySource(StreamInput in) throws IOException {
339+
sourceShardId = new ShardId(in);
340+
}
341+
342+
@Override
343+
public Type getType() {
344+
return Type.RESHARD_SPLIT;
345+
}
346+
347+
public ShardId getSourceShardId() {
348+
return sourceShardId;
349+
}
350+
351+
@Override
352+
protected void writeAdditionalFields(StreamOutput out) throws IOException {
353+
sourceShardId.writeTo(out);
354+
}
355+
356+
@Override
357+
public void addAdditionalFields(XContentBuilder builder, Params params) throws IOException {
358+
sourceShardId.toXContent(builder, params);
359+
}
360+
}
322361
}

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3507,7 +3507,12 @@ public void startRecovery(
35073507
// }
35083508
assert recoveryState.getRecoverySource().equals(shardRouting.recoverySource());
35093509
switch (recoveryState.getRecoverySource().getType()) {
3510-
case EMPTY_STORE, EXISTING_STORE -> executeRecovery("from store", recoveryState, recoveryListener, this::recoverFromStore);
3510+
case EMPTY_STORE, EXISTING_STORE, RESHARD_SPLIT -> executeRecovery(
3511+
"from store",
3512+
recoveryState,
3513+
recoveryListener,
3514+
this::recoverFromStore
3515+
);
35113516
case PEER -> {
35123517
try {
35133518
markAsRecovering("from " + recoveryState.getSourceNode(), recoveryState);

server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,8 +89,9 @@ public final class StoreRecovery {
8989
void recoverFromStore(final IndexShard indexShard, ActionListener<Boolean> listener) {
9090
if (canRecover(indexShard)) {
9191
RecoverySource.Type recoveryType = indexShard.recoveryState().getRecoverySource().getType();
92-
assert recoveryType == RecoverySource.Type.EMPTY_STORE || recoveryType == RecoverySource.Type.EXISTING_STORE
93-
: "expected store recovery type but was: " + recoveryType;
92+
assert recoveryType == RecoverySource.Type.EMPTY_STORE
93+
|| recoveryType == RecoverySource.Type.EXISTING_STORE
94+
|| recoveryType == RecoverySource.Type.RESHARD_SPLIT : "expected one of store recovery types but was: " + recoveryType;
9495
logger.debug("starting recovery from store ...");
9596
final var recoveryListener = recoveryListener(indexShard, listener);
9697
try {

server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.elasticsearch.cluster.node.DiscoveryNode;
3131
import org.elasticsearch.cluster.node.DiscoveryNodes;
3232
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
33+
import org.elasticsearch.cluster.routing.RecoverySource;
3334
import org.elasticsearch.cluster.routing.RecoverySource.Type;
3435
import org.elasticsearch.cluster.routing.RoutingNode;
3536
import org.elasticsearch.cluster.routing.RoutingTable;
@@ -701,6 +702,13 @@ private void createShard(ShardRouting shardRouting, ClusterState state) {
701702
logger.trace("ignoring initializing shard {} - no source node can be found.", shardId);
702703
return;
703704
}
705+
} else if (shardRouting.recoverySource() instanceof RecoverySource.ReshardSplitRecoverySource reshardSplitRecoverySource) {
706+
ShardId sourceShardId = reshardSplitRecoverySource.getSourceShardId();
707+
sourceNode = findSourceNodeForReshardSplitRecovery(state.routingTable(project.id()), state.nodes(), sourceShardId);
708+
if (sourceNode == null) {
709+
logger.trace("ignoring initializing reshard target shard {} - no source node can be found.", shardId);
710+
return;
711+
}
704712
} else {
705713
sourceNode = null;
706714
}
@@ -988,6 +996,31 @@ private static DiscoveryNode findSourceNodeForPeerRecovery(RoutingTable routingT
988996
return sourceNode;
989997
}
990998

999+
private static DiscoveryNode findSourceNodeForReshardSplitRecovery(
1000+
RoutingTable routingTable,
1001+
DiscoveryNodes nodes,
1002+
ShardId sourceShardId
1003+
) {
1004+
ShardRouting sourceShardRouting = routingTable.shardRoutingTable(sourceShardId).primaryShard();
1005+
1006+
if (sourceShardRouting.active() == false) {
1007+
assert false : sourceShardRouting;
1008+
logger.trace("can't find reshard split source node because source shard {} is not active.", sourceShardRouting);
1009+
return null;
1010+
}
1011+
1012+
DiscoveryNode sourceNode = nodes.get(sourceShardRouting.currentNodeId());
1013+
if (sourceNode == null) {
1014+
assert false : "Source node for reshard does not exist: " + sourceShardRouting.currentNodeId();
1015+
logger.trace(
1016+
"can't find reshard split source node because source shard {} is assigned to an unknown node.",
1017+
sourceShardRouting
1018+
);
1019+
return null;
1020+
}
1021+
return sourceNode;
1022+
}
1023+
9911024
private record PendingShardCreation(String clusterStateUUID, long startTimeMillis) {}
9921025

9931026
private class RecoveryListener implements PeerRecoveryTargetService.RecoveryListener {

server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -110,11 +110,10 @@ public RecoveryState(ShardRouting shardRouting, DiscoveryNode targetNode, @Nulla
110110
public RecoveryState(ShardRouting shardRouting, DiscoveryNode targetNode, @Nullable DiscoveryNode sourceNode, Index index) {
111111
this(shardRouting.shardId(), shardRouting.primary(), shardRouting.recoverySource(), sourceNode, targetNode, index, new Timer());
112112
assert shardRouting.initializing() : "only allow initializing shard routing to be recovered: " + shardRouting;
113-
assert (shardRouting.recoverySource().getType() == RecoverySource.Type.PEER) == (sourceNode != null)
114-
: "peer recovery requires source node, recovery type: "
115-
+ shardRouting.recoverySource().getType()
116-
+ " source node: "
117-
+ sourceNode;
113+
assert shardRouting.recoverySource().getType() != RecoverySource.Type.PEER || sourceNode != null
114+
: "peer recovery requires source node but it is null";
115+
assert shardRouting.recoverySource().getType() != RecoverySource.Type.RESHARD_SPLIT || sourceNode != null
116+
: "reshard split target recovery requires source node but it is null";
118117
timer.start();
119118
}
120119

server/src/test/java/org/elasticsearch/cluster/routing/RecoverySourceTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,11 @@ public void testRecoverySourceTypeOrder() {
3434
assertEquals(RecoverySource.Type.PEER.ordinal(), 2);
3535
assertEquals(RecoverySource.Type.SNAPSHOT.ordinal(), 3);
3636
assertEquals(RecoverySource.Type.LOCAL_SHARDS.ordinal(), 4);
37+
assertEquals(RecoverySource.Type.RESHARD_SPLIT.ordinal(), 5);
3738
// check exhaustiveness
3839
for (RecoverySource.Type type : RecoverySource.Type.values()) {
3940
assertThat(type.ordinal(), greaterThanOrEqualTo(0));
40-
assertThat(type.ordinal(), lessThanOrEqualTo(4));
41+
assertThat(type.ordinal(), lessThanOrEqualTo(5));
4142
}
4243
}
4344
}

test/framework/src/main/java/org/elasticsearch/cluster/routing/TestShardRouting.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,8 @@ public static RecoverySource buildRecoverySource() {
275275
new Snapshot("repo", new SnapshotId(randomIdentifier(), randomUUID())),
276276
IndexVersion.current(),
277277
new IndexId("some_index", randomUUID())
278-
)
278+
),
279+
new RecoverySource.ReshardSplitRecoverySource(new ShardId("some_index", randomUUID(), randomIntBetween(0, 1000)))
279280
);
280281
}
281282
}

0 commit comments

Comments
 (0)