Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public static boolean shouldReserveSpaceForInitializingShard(ShardRouting shard,
// shrink/split/clone operation is going to clone existing locally placed shards using file system hard links
// so no additional space is going to be used until future merges
case LOCAL_SHARDS -> false;
case RESHARD_SPLIT -> false;
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.xcontent.ToXContent;
Expand All @@ -31,6 +32,7 @@
* - {@link PeerRecoverySource} recovery from a primary on another node
* - {@link SnapshotRecoverySource} recovery from a snapshot
* - {@link LocalShardsRecoverySource} recovery from other shards of another index on the same node
* - {@link ReshardSplitRecoverySource} recovery of a shard that is created as a result of a resharding split
*/
public abstract class RecoverySource implements Writeable, ToXContentObject {

Expand All @@ -57,6 +59,7 @@ public static RecoverySource readFrom(StreamInput in) throws IOException {
case PEER -> PeerRecoverySource.INSTANCE;
case SNAPSHOT -> new SnapshotRecoverySource(in);
case LOCAL_SHARDS -> LocalShardsRecoverySource.INSTANCE;
case RESHARD_SPLIT -> new ReshardSplitRecoverySource(in);
};
}

Expand All @@ -78,7 +81,8 @@ public enum Type {
EXISTING_STORE,
PEER,
SNAPSHOT,
LOCAL_SHARDS
LOCAL_SHARDS,
RESHARD_SPLIT
}

public abstract Type getType();
Expand Down Expand Up @@ -319,4 +323,39 @@ public boolean expectEmptyRetentionLeases() {
return false;
}
}

/**
 * Recovery of a shard that is created as a result of a resharding split.
 * Not to be confused with the {@code _split} API.
 * <p>
 * The only state carried by this source is the {@link ShardId} of the shard being split. It is written by
 * {@link #writeAdditionalFields(StreamOutput)} and read back by the {@link StreamInput} constructor.
 */
public static class ReshardSplitRecoverySource extends RecoverySource {
    private final ShardId sourceShardId;

    /**
     * @param sourceShardId id of the shard that the recovering shard is split from
     */
    public ReshardSplitRecoverySource(ShardId sourceShardId) {
        this.sourceShardId = sourceShardId;
    }

    /**
     * Reads the source shard id from the stream, mirroring {@link #writeAdditionalFields(StreamOutput)}.
     */
    ReshardSplitRecoverySource(StreamInput in) throws IOException {
        sourceShardId = new ShardId(in);
    }

    @Override
    public Type getType() {
        return Type.RESHARD_SPLIT;
    }

    /**
     * @return id of the shard that the recovering shard is split from
     */
    public ShardId getSourceShardId() {
        return sourceShardId;
    }

    @Override
    protected void writeAdditionalFields(StreamOutput out) throws IOException {
        sourceShardId.writeTo(out);
    }

    @Override
    public void addAdditionalFields(XContentBuilder builder, Params params) throws IOException {
        // NOTE(review): this delegates to ShardId#toXContent and the state of the builder at this point is not
        // visible here. If the caller has already opened an object, confirm that ShardId emits a named field
        // rather than a bare value.
        sourceShardId.toXContent(builder, params);
    }

    /**
     * Two instances are equal when they are of the same class and refer to the same source shard.
     */
    @Override
    public boolean equals(Object o) {
        // NOTE(review): the superclass is not visible from here. If its equals/hashCode only compare the
        // recovery type, sources for different source shards would otherwise compare equal - confirm.
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        ReshardSplitRecoverySource that = (ReshardSplitRecoverySource) o;
        return java.util.Objects.equals(sourceShardId, that.sourceShardId);
    }

    @Override
    public int hashCode() {
        return java.util.Objects.hashCode(sourceShardId);
    }

    @Override
    public String toString() {
        return "reshard split recovery [" + sourceShardId + "]";
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3504,7 +3504,12 @@ public void startRecovery(
// }
assert recoveryState.getRecoverySource().equals(shardRouting.recoverySource());
switch (recoveryState.getRecoverySource().getType()) {
case EMPTY_STORE, EXISTING_STORE -> executeRecovery("from store", recoveryState, recoveryListener, this::recoverFromStore);
case EMPTY_STORE, EXISTING_STORE, RESHARD_SPLIT -> executeRecovery(
"from store",
recoveryState,
recoveryListener,
this::recoverFromStore
);
case PEER -> {
try {
markAsRecovering("from " + recoveryState.getSourceNode(), recoveryState);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,9 @@ public final class StoreRecovery {
void recoverFromStore(final IndexShard indexShard, ActionListener<Boolean> listener) {
if (canRecover(indexShard)) {
RecoverySource.Type recoveryType = indexShard.recoveryState().getRecoverySource().getType();
assert recoveryType == RecoverySource.Type.EMPTY_STORE || recoveryType == RecoverySource.Type.EXISTING_STORE
: "expected store recovery type but was: " + recoveryType;
assert recoveryType == RecoverySource.Type.EMPTY_STORE
|| recoveryType == RecoverySource.Type.EXISTING_STORE
|| recoveryType == RecoverySource.Type.RESHARD_SPLIT : "expected one of store recovery types but was: " + recoveryType;
logger.debug("starting recovery from store ...");
final var recoveryListener = recoveryListener(indexShard, listener);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.RecoverySource.Type;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingTable;
Expand Down Expand Up @@ -701,6 +702,13 @@ private void createShard(ShardRouting shardRouting, ClusterState state) {
logger.trace("ignoring initializing shard {} - no source node can be found.", shardId);
return;
}
} else if (shardRouting.recoverySource() instanceof RecoverySource.ReshardSplitRecoverySource reshardSplitRecoverySource) {
ShardId sourceShardId = reshardSplitRecoverySource.getSourceShardId();
sourceNode = findSourceNodeForReshardSplitRecovery(state.routingTable(project.id()), state.nodes(), sourceShardId);
if (sourceNode == null) {
logger.trace("ignoring initializing reshard target shard {} - no source node can be found.", shardId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we assert false here? I think this situation should be invalid: the shardRouting is taken from the state further out, and we should assume that. I think we could add the same above, but I would not want to complicate the work here with figuring that out.

Copy link
Contributor Author

@lkts lkts Jun 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want to make sure I understand - are you saying that if routing was updated for the target shard then the same cluster state should contain routing for the source shard as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added asserts inside findSourceNodeForReshardSplitRecovery.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is mostly that having a target shard recovery without an active source shard is not sound. If we fail the source shard we should probably also fail the target shard. At least that is how peer recovery works, if the primary fails, we fail the replicas too in the same cluster state update.

I am also not fond of the way we just ignore recovery here. If we accept this case we need a test case to verify that we resume it later.

Hence the asserts to clarify the situation. We may refine later.

return;
}
} else {
sourceNode = null;
}
Expand Down Expand Up @@ -988,6 +996,29 @@ private static DiscoveryNode findSourceNodeForPeerRecovery(RoutingTable routingT
return sourceNode;
}

/**
 * Resolves the node that currently holds the primary of the source shard of a reshard split.
 * <p>
 * The primary routing of {@code sourceShardId} is looked up in {@code routingTable}. If that primary is not
 * active, or its current node id has no entry in {@code nodes}, the reason is logged at trace level and
 * {@code null} is returned.
 *
 * @param routingTable  routing table used to look up the source shard's primary
 * @param nodes         nodes used to resolve the primary's current node id
 * @param sourceShardId id of the shard being split
 * @return the node the source primary is assigned to, or {@code null} if it cannot be determined
 */
private static DiscoveryNode findSourceNodeForReshardSplitRecovery(
RoutingTable routingTable,
DiscoveryNodes nodes,
ShardId sourceShardId
) {
ShardRouting sourceShardRouting = routingTable.shardRoutingTable(sourceShardId).primaryShard();

if (sourceShardRouting.active() == false) {
logger.trace("can't find reshard split source node because source shard {} is not active.", sourceShardRouting);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should assert false here and below as well.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it fails we can leave it out for now, but then we should add a jira to ensure we have these properties in the routing table.

return null;
}

DiscoveryNode sourceNode = nodes.get(sourceShardRouting.currentNodeId());
if (sourceNode == null) {
logger.trace(
"can't find reshard split source node because source shard {} is assigned to an unknown node.",
sourceShardRouting
);
return null;
}
return sourceNode;
}

// Pairs a cluster state UUID with a start time in milliseconds.
// NOTE(review): presumably tracks a shard creation that is still in flight - confirm against the usages.
private record PendingShardCreation(String clusterStateUUID, long startTimeMillis) {}

private class RecoveryListener implements PeerRecoveryTargetService.RecoveryListener {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,10 @@ public RecoveryState(ShardRouting shardRouting, DiscoveryNode targetNode, @Nulla
public RecoveryState(ShardRouting shardRouting, DiscoveryNode targetNode, @Nullable DiscoveryNode sourceNode, Index index) {
    // Delegate with the shard id, primary flag and recovery source taken from the routing entry, and a fresh timer.
    this(shardRouting.shardId(), shardRouting.primary(), shardRouting.recoverySource(), sourceNode, targetNode, index, new Timer());
    assert shardRouting.initializing() : "only allow initializing shard routing to be recovered: " + shardRouting;
    // A source node is mandatory for PEER and RESHARD_SPLIT recoveries. The previous biconditional check
    // ((type == PEER) == (sourceNode != null)) is intentionally gone: it demanded a null source node for every
    // non-PEER type, which contradicts the RESHARD_SPLIT requirement below.
    assert shardRouting.recoverySource().getType() != RecoverySource.Type.PEER || sourceNode != null
        : "peer recovery requires source node but it is null";
    assert shardRouting.recoverySource().getType() != RecoverySource.Type.RESHARD_SPLIT || sourceNode != null
        : "reshard split target recovery requires source node but it is null";
    timer.start();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,11 @@ public void testRecoverySourceTypeOrder() {
assertEquals(RecoverySource.Type.PEER.ordinal(), 2);
assertEquals(RecoverySource.Type.SNAPSHOT.ordinal(), 3);
assertEquals(RecoverySource.Type.LOCAL_SHARDS.ordinal(), 4);
assertEquals(RecoverySource.Type.RESHARD_SPLIT.ordinal(), 5);
// check exhaustiveness
for (RecoverySource.Type type : RecoverySource.Type.values()) {
assertThat(type.ordinal(), greaterThanOrEqualTo(0));
assertThat(type.ordinal(), lessThanOrEqualTo(4));
assertThat(type.ordinal(), lessThanOrEqualTo(5));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,8 @@ public static RecoverySource buildRecoverySource() {
new Snapshot("repo", new SnapshotId(randomIdentifier(), randomUUID())),
IndexVersion.current(),
new IndexId("some_index", randomUUID())
)
),
new RecoverySource.ReshardSplitRecoverySource(new ShardId("some_index", randomUUID(), randomIntBetween(0, 1000)))
);
}
}