Skip to content

Commit 078e6c6

Browse files
committed
Add new recovery source for reshard split target shards
1 parent 8d0c9ce commit 078e6c6

File tree

5 files changed

+79
-3
lines changed

5 files changed

+79
-3
lines changed

server/src/main/java/org/elasticsearch/cluster/routing/ExpectedShardSizeEstimator.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ public static boolean shouldReserveSpaceForInitializingShard(ShardRouting shard,
5858
// shrink/split/clone operation is going to clone existing locally placed shards using file system hard links
5959
// so no additional space is going to be used until future merges
6060
case LOCAL_SHARDS -> false;
61+
case RESHARD_SPLIT_TARGET -> false;
6162
};
6263
}
6364

server/src/main/java/org/elasticsearch/cluster/routing/RecoverySource.java

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.elasticsearch.common.io.stream.StreamOutput;
1515
import org.elasticsearch.common.io.stream.Writeable;
1616
import org.elasticsearch.index.IndexVersion;
17+
import org.elasticsearch.index.shard.ShardId;
1718
import org.elasticsearch.repositories.IndexId;
1819
import org.elasticsearch.snapshots.Snapshot;
1920
import org.elasticsearch.xcontent.ToXContent;
@@ -31,6 +32,7 @@
3132
* - {@link PeerRecoverySource} recovery from a primary on another node
3233
* - {@link SnapshotRecoverySource} recovery from a snapshot
3334
* - {@link LocalShardsRecoverySource} recovery from other shards of another index on the same node
35+
* - {@link SplitTargetRecoverySource} recovery of a shard that is created as a result of a resharding split
3436
*/
3537
public abstract class RecoverySource implements Writeable, ToXContentObject {
3638

@@ -50,13 +52,16 @@ public void addAdditionalFields(XContentBuilder builder, ToXContent.Params param
5052
}
5153

5254
public static RecoverySource readFrom(StreamInput in) throws IOException {
55+
// TODO is transport version check needed?
56+
5357
Type type = Type.values()[in.readByte()];
5458
return switch (type) {
5559
case EMPTY_STORE -> EmptyStoreRecoverySource.INSTANCE;
5660
case EXISTING_STORE -> ExistingStoreRecoverySource.read(in);
5761
case PEER -> PeerRecoverySource.INSTANCE;
5862
case SNAPSHOT -> new SnapshotRecoverySource(in);
5963
case LOCAL_SHARDS -> LocalShardsRecoverySource.INSTANCE;
64+
case RESHARD_SPLIT_TARGET -> new SplitTargetRecoverySource(in);
6065
};
6166
}
6267

@@ -78,7 +83,8 @@ public enum Type {
7883
EXISTING_STORE,
7984
PEER,
8085
SNAPSHOT,
81-
LOCAL_SHARDS
86+
LOCAL_SHARDS,
87+
RESHARD_SPLIT_TARGET
8288
}
8389

8490
public abstract Type getType();
@@ -319,4 +325,39 @@ public boolean expectEmptyRetentionLeases() {
319325
return false;
320326
}
321327
}
328+
329+
/**
330+
* Recovery of a shard that is created as a result of a resharding split.
331+
* Not to be confused with _split API.
332+
*/
333+
public static class SplitTargetRecoverySource extends RecoverySource {
334+
private final ShardId sourceShardId;
335+
336+
public SplitTargetRecoverySource(ShardId sourceShardId) {
337+
this.sourceShardId = sourceShardId;
338+
}
339+
340+
SplitTargetRecoverySource(StreamInput in) throws IOException {
341+
sourceShardId = new ShardId(in);
342+
}
343+
344+
@Override
345+
public Type getType() {
346+
return Type.RESHARD_SPLIT_TARGET;
347+
}
348+
349+
public ShardId getSourceShardId() {
350+
return sourceShardId;
351+
}
352+
353+
@Override
354+
protected void writeAdditionalFields(StreamOutput out) throws IOException {
355+
sourceShardId.writeTo(out);
356+
}
357+
358+
@Override
359+
public void addAdditionalFields(XContentBuilder builder, Params params) throws IOException {
360+
sourceShardId.toXContent(builder, params);
361+
}
362+
}
322363
}

server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.elasticsearch.cluster.node.DiscoveryNode;
3131
import org.elasticsearch.cluster.node.DiscoveryNodes;
3232
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
33+
import org.elasticsearch.cluster.routing.RecoverySource;
3334
import org.elasticsearch.cluster.routing.RecoverySource.Type;
3435
import org.elasticsearch.cluster.routing.RoutingNode;
3536
import org.elasticsearch.cluster.routing.RoutingTable;
@@ -701,6 +702,14 @@ private void createShard(ShardRouting shardRouting, ClusterState state) {
701702
logger.trace("ignoring initializing shard {} - no source node can be found.", shardId);
702703
return;
703704
}
705+
} else if (shardRouting.recoverySource().getType() == Type.RESHARD_SPLIT_TARGET) {
706+
ShardId sourceShardId = ((RecoverySource.SplitTargetRecoverySource) shardRouting.recoverySource()).getSourceShardId();
707+
sourceNode = findSourceNodeForSplitTargetRecovery(
708+
state.routingTable(project.id()),
709+
state.nodes(),
710+
sourceShardId
711+
712+
);
704713
} else {
705714
sourceNode = null;
706715
}
@@ -988,6 +997,29 @@ private static DiscoveryNode findSourceNodeForPeerRecovery(RoutingTable routingT
988997
return sourceNode;
989998
}
990999

1000+
private static DiscoveryNode findSourceNodeForSplitTargetRecovery(
1001+
RoutingTable routingTable,
1002+
DiscoveryNodes nodes,
1003+
ShardId sourceShardId
1004+
) {
1005+
ShardRouting sourceShardRouting = routingTable.shardRoutingTable(sourceShardId).primaryShard();
1006+
1007+
if (sourceShardRouting.active() == false) {
1008+
logger.trace("can't find reshard split source node because source shard {} is not active.", sourceShardRouting);
1009+
return null;
1010+
}
1011+
1012+
DiscoveryNode sourceNode = nodes.get(sourceShardRouting.currentNodeId());
1013+
if (sourceNode == null) {
1014+
logger.trace(
1015+
"can't find reshard split source node because source shard {} is assigned to an unknown node.",
1016+
sourceShardRouting
1017+
);
1018+
return null;
1019+
}
1020+
return sourceNode;
1021+
}
1022+
9911023
private record PendingShardCreation(String clusterStateUUID, long startTimeMillis) {}
9921024

9931025
private class RecoveryListener implements PeerRecoveryTargetService.RecoveryListener {

server/src/test/java/org/elasticsearch/cluster/routing/RecoverySourceTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,11 @@ public void testRecoverySourceTypeOrder() {
3434
assertEquals(RecoverySource.Type.PEER.ordinal(), 2);
3535
assertEquals(RecoverySource.Type.SNAPSHOT.ordinal(), 3);
3636
assertEquals(RecoverySource.Type.LOCAL_SHARDS.ordinal(), 4);
37+
assertEquals(RecoverySource.Type.RESHARD_SPLIT_TARGET.ordinal(), 5);
3738
// check exhaustiveness
3839
for (RecoverySource.Type type : RecoverySource.Type.values()) {
3940
assertThat(type.ordinal(), greaterThanOrEqualTo(0));
40-
assertThat(type.ordinal(), lessThanOrEqualTo(4));
41+
assertThat(type.ordinal(), lessThanOrEqualTo(5));
4142
}
4243
}
4344
}

test/framework/src/main/java/org/elasticsearch/cluster/routing/TestShardRouting.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,8 @@ public static RecoverySource buildRecoverySource() {
275275
new Snapshot("repo", new SnapshotId(randomIdentifier(), randomUUID())),
276276
IndexVersion.current(),
277277
new IndexId("some_index", randomUUID())
278-
)
278+
),
279+
new RecoverySource.SplitTargetRecoverySource(new ShardId("some_index", randomUUID(), randomIntBetween(0, 1000)))
279280
);
280281
}
281282
}

0 commit comments

Comments
 (0)