Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public static boolean shouldReserveSpaceForInitializingShard(ShardRouting shard,
// shrink/split/clone operation is going to clone existing locally placed shards using file system hard links
// so no additional space is going to be used until future merges
case LOCAL_SHARDS -> false;
case RESHARD_SPLIT_TARGET -> false;
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.xcontent.ToXContent;
Expand All @@ -31,6 +32,7 @@
* - {@link PeerRecoverySource} recovery from a primary on another node
* - {@link SnapshotRecoverySource} recovery from a snapshot
* - {@link LocalShardsRecoverySource} recovery from other shards of another index on the same node
* - {@link SplitTargetRecoverySource} recovery of a shard that is created as a result of a resharding split
*/
public abstract class RecoverySource implements Writeable, ToXContentObject {

Expand All @@ -50,13 +52,16 @@ public void addAdditionalFields(XContentBuilder builder, ToXContent.Params param
}

public static RecoverySource readFrom(StreamInput in) throws IOException {
// TODO is transport version check needed?
Copy link
Contributor Author

@lkts lkts Jun 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAIK master nodes are upgraded last meaning that data nodes will already be able to read the new value. Let me know if i am wrong.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't really understand how this would work in general. Couldn't you have a cluster where all the indexing nodes are also master-eligible?

I do think we intend to fail resharding actions on a cluster until all nodes support it though. I thought we had a ticket for that but I don't see it now. I've stubbed in ES-12048 for this.

At any rate I don't think we have a problem on read, since by definition we know about the new source, and the other node either also does or simply won't send this case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you, i agree. If this logic existed it should have been on the writer but i think that's addressed by the check when enabling resharding that you mentioned.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should not rely on the master being upgraded last, but we can rely on the feature not being available. I suppose we have no rolling upgrade tests yet with the feature enabled, hence not having the version check should work just fine. So no change needed, just wanted to ensure we would not rely on the master being upgraded last.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the consensus (from my perspective) is that there will be a feature flag that enables resharding as a whole.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not a feature flag, but a guard on all nodes in the cluster having a transport version that speaks resharding. This covers a case where a cluster is being upgraded from pre-resharding to post after a resharding feature flag has been set.


Type type = Type.values()[in.readByte()];
return switch (type) {
case EMPTY_STORE -> EmptyStoreRecoverySource.INSTANCE;
case EXISTING_STORE -> ExistingStoreRecoverySource.read(in);
case PEER -> PeerRecoverySource.INSTANCE;
case SNAPSHOT -> new SnapshotRecoverySource(in);
case LOCAL_SHARDS -> LocalShardsRecoverySource.INSTANCE;
case RESHARD_SPLIT_TARGET -> new SplitTargetRecoverySource(in);
};
}

Expand All @@ -78,7 +83,8 @@ public enum Type {
EXISTING_STORE,
PEER,
SNAPSHOT,
LOCAL_SHARDS
LOCAL_SHARDS,
RESHARD_SPLIT_TARGET
}

public abstract Type getType();
Expand Down Expand Up @@ -319,4 +325,39 @@ public boolean expectEmptyRetentionLeases() {
return false;
}
}

/**
* Recovery of a shard that is created as a result of a resharding split.
* Not to be confused with _split API.
*/
public static class SplitTargetRecoverySource extends RecoverySource {
private final ShardId sourceShardId;

public SplitTargetRecoverySource(ShardId sourceShardId) {
this.sourceShardId = sourceShardId;
}

SplitTargetRecoverySource(StreamInput in) throws IOException {
sourceShardId = new ShardId(in);
}

@Override
public Type getType() {
return Type.RESHARD_SPLIT_TARGET;
}

public ShardId getSourceShardId() {
return sourceShardId;
}

@Override
protected void writeAdditionalFields(StreamOutput out) throws IOException {
sourceShardId.writeTo(out);
}

@Override
public void addAdditionalFields(XContentBuilder builder, Params params) throws IOException {
sourceShardId.toXContent(builder, params);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3501,7 +3501,12 @@ public void startRecovery(
// }
assert recoveryState.getRecoverySource().equals(shardRouting.recoverySource());
switch (recoveryState.getRecoverySource().getType()) {
case EMPTY_STORE, EXISTING_STORE -> executeRecovery("from store", recoveryState, recoveryListener, this::recoverFromStore);
case EMPTY_STORE, EXISTING_STORE, RESHARD_SPLIT_TARGET -> executeRecovery(
"from store",
recoveryState,
recoveryListener,
this::recoverFromStore
);
case PEER -> {
try {
markAsRecovering("from " + recoveryState.getSourceNode(), recoveryState);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,10 @@ public final class StoreRecovery {
void recoverFromStore(final IndexShard indexShard, ActionListener<Boolean> listener) {
if (canRecover(indexShard)) {
RecoverySource.Type recoveryType = indexShard.recoveryState().getRecoverySource().getType();
assert recoveryType == RecoverySource.Type.EMPTY_STORE || recoveryType == RecoverySource.Type.EXISTING_STORE
: "expected store recovery type but was: " + recoveryType;
assert recoveryType == RecoverySource.Type.EMPTY_STORE
|| recoveryType == RecoverySource.Type.EXISTING_STORE
|| recoveryType == RecoverySource.Type.RESHARD_SPLIT_TARGET
: "expected one of store recovery types but was: " + recoveryType;
logger.debug("starting recovery from store ...");
final var recoveryListener = recoveryListener(indexShard, listener);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.RecoverySource.Type;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingTable;
Expand Down Expand Up @@ -701,6 +702,13 @@ private void createShard(ShardRouting shardRouting, ClusterState state) {
logger.trace("ignoring initializing shard {} - no source node can be found.", shardId);
return;
}
} else if (shardRouting.recoverySource().getType() == Type.RESHARD_SPLIT_TARGET) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not a request for change, this isn't your API, but it would be nice if the ShardRouting API didn't make us get type and then cast in two steps. I could imagine something like a shardRouting.recoverySource().asReshardSplit() maybe that did the check and returned the cast object if it passed or null.

I'm just griping though, it's probably not worth doing now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could instanceof the shardRouting.recoverySource() instead. What do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, that seems nicer

ShardId sourceShardId = ((RecoverySource.SplitTargetRecoverySource) shardRouting.recoverySource()).getSourceShardId();
sourceNode = findSourceNodeForSplitTargetRecovery(state.routingTable(project.id()), state.nodes(), sourceShardId);
if (sourceNode == null) {
logger.trace("ignoring initializing reshard target shard {} - no source node can be found.", shardId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we assert false here? I think this situation should be invalid, the shardRouting is taken from state further out - and we should assume that. I think we could add the same above, but would not want to complicate the work here with figuring that out.

Copy link
Contributor Author

@lkts lkts Jun 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Want to make sure i understand - are you saying that if routing was updated for the target shard then the same cluster state should contain routing for the source shard as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added asserts inside findSourceNodeForReshardSplitRecovery.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is mostly that having a target shard recovery without an active source shard is not sound. If we fail the source shard we should probably also fail the target shard. At least that is how peer recovery works, if the primary fails, we fail the replicas too in the same cluster state update.

I am also not fond of the way we just ignore recovery here. If we accept this case we need a test case to verify that we resume it later.

Hence the asserts to clarify the situation. We may refine later.

return;
}
} else {
sourceNode = null;
}
Expand Down Expand Up @@ -988,6 +996,29 @@ private static DiscoveryNode findSourceNodeForPeerRecovery(RoutingTable routingT
return sourceNode;
}

private static DiscoveryNode findSourceNodeForSplitTargetRecovery(
RoutingTable routingTable,
DiscoveryNodes nodes,
ShardId sourceShardId
) {
ShardRouting sourceShardRouting = routingTable.shardRoutingTable(sourceShardId).primaryShard();

if (sourceShardRouting.active() == false) {
logger.trace("can't find reshard split source node because source shard {} is not active.", sourceShardRouting);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should assert false here and below as well.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it fails we can leave it out for now, but then we should add a jira to ensure we have these properties in the routing table.

return null;
}

DiscoveryNode sourceNode = nodes.get(sourceShardRouting.currentNodeId());
if (sourceNode == null) {
logger.trace(
"can't find reshard split source node because source shard {} is assigned to an unknown node.",
sourceShardRouting
);
return null;
}
return sourceNode;
}

private record PendingShardCreation(String clusterStateUUID, long startTimeMillis) {}

private class RecoveryListener implements PeerRecoveryTargetService.RecoveryListener {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,10 @@ public RecoveryState(ShardRouting shardRouting, DiscoveryNode targetNode, @Nulla
public RecoveryState(ShardRouting shardRouting, DiscoveryNode targetNode, @Nullable DiscoveryNode sourceNode, Index index) {
this(shardRouting.shardId(), shardRouting.primary(), shardRouting.recoverySource(), sourceNode, targetNode, index, new Timer());
assert shardRouting.initializing() : "only allow initializing shard routing to be recovered: " + shardRouting;
assert (shardRouting.recoverySource().getType() == RecoverySource.Type.PEER) == (sourceNode != null)
: "peer recovery requires source node, recovery type: "
+ shardRouting.recoverySource().getType()
+ " source node: "
+ sourceNode;
assert shardRouting.recoverySource().getType() != RecoverySource.Type.PEER || sourceNode != null
: "peer recovery requires source node but it is null";
assert shardRouting.recoverySource().getType() != RecoverySource.Type.RESHARD_SPLIT_TARGET || sourceNode != null
: "reshard split target recovery requires source node but it is null";
timer.start();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,11 @@ public void testRecoverySourceTypeOrder() {
assertEquals(RecoverySource.Type.PEER.ordinal(), 2);
assertEquals(RecoverySource.Type.SNAPSHOT.ordinal(), 3);
assertEquals(RecoverySource.Type.LOCAL_SHARDS.ordinal(), 4);
assertEquals(RecoverySource.Type.RESHARD_SPLIT_TARGET.ordinal(), 5);
// check exhaustiveness
for (RecoverySource.Type type : RecoverySource.Type.values()) {
assertThat(type.ordinal(), greaterThanOrEqualTo(0));
assertThat(type.ordinal(), lessThanOrEqualTo(4));
assertThat(type.ordinal(), lessThanOrEqualTo(5));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,8 @@ public static RecoverySource buildRecoverySource() {
new Snapshot("repo", new SnapshotId(randomIdentifier(), randomUUID())),
IndexVersion.current(),
new IndexId("some_index", randomUUID())
)
),
new RecoverySource.SplitTargetRecoverySource(new ShardId("some_index", randomUUID(), randomIntBetween(0, 1000)))
);
}
}