Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
4ee90b9
Changes
Tim-Brooks Feb 12, 2025
0927aab
Merge remote-tracking branch 'origin/main' into start_split_shard_task
Tim-Brooks Feb 18, 2025
33d43a0
WIP
Tim-Brooks Feb 19, 2025
9c8ee34
WIP
Tim-Brooks Feb 20, 2025
b074858
Merge commit '88b59008375' into start_split_shard_task
Tim-Brooks Mar 7, 2025
2adadff
Change
Tim-Brooks Mar 7, 2025
68a6ea3
Merge remote-tracking branch 'origin/main' into start_split_shard_task
Tim-Brooks Mar 7, 2025
eff3f69
Change
Tim-Brooks Mar 7, 2025
f8c0ffd
WIP
Tim-Brooks Mar 8, 2025
739ee52
Changes
Tim-Brooks Mar 11, 2025
b3eb70e
Merge remote-tracking branch 'origin/main' into start_split_shard_task
Tim-Brooks Mar 11, 2025
19514be
Change
Tim-Brooks Mar 12, 2025
e5300c0
Changes
Tim-Brooks Mar 12, 2025
f9ed927
Merge remote-tracking branch 'origin/main' into start_split_shard_task
Tim-Brooks Mar 12, 2025
96cec40
Change'
Tim-Brooks Mar 13, 2025
bba78b0
Change
Tim-Brooks Mar 13, 2025
89f5b43
Changes
Tim-Brooks Mar 13, 2025
ea3b3c8
Merge remote-tracking branch 'origin/main' into start_split_shard_task
Tim-Brooks Mar 13, 2025
403f146
Change
Tim-Brooks Mar 14, 2025
e72afa5
Change
Tim-Brooks Mar 14, 2025
ecced4f
Fixes
Tim-Brooks Mar 14, 2025
8103712
Fix
Tim-Brooks Mar 14, 2025
28353a4
Merge remote-tracking branch 'origin/main' into start_split_shard_task
Tim-Brooks Mar 14, 2025
2008217
Merge remote-tracking branch 'origin/main' into start_split_shard_task
Tim-Brooks Mar 14, 2025
9578b16
WIP
Tim-Brooks Mar 14, 2025
f157bfd
Merge remote-tracking branch 'origin/main' into start_split_shard_task
Tim-Brooks Mar 14, 2025
c68c056
Merge remote-tracking branch 'origin/main' into start_split_shard_task
Tim-Brooks Mar 19, 2025
e09baf4
Change
Tim-Brooks Mar 20, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ static TransportVersion def(int id) {
public static final TransportVersion INTRODUCE_LIFECYCLE_TEMPLATE = def(9_033_0_00);
public static final TransportVersion INDEXING_STATS_INCLUDES_RECENT_WRITE_LOAD = def(9_034_0_00);
public static final TransportVersion ESQL_AGGREGATE_METRIC_DOUBLE_LITERAL = def(9_035_0_00);
public static final TransportVersion SOURCE_PRIMARY_TERM_IN_START_SHARD = def(9_036_0_00);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexReshardingMetadata;
import org.elasticsearch.cluster.metadata.IndexReshardingState;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
Expand All @@ -41,6 +43,7 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
Expand Down Expand Up @@ -580,6 +583,31 @@ public void shardStarted(
);
}

/**
 * Notifies the master that a resharding-split target shard has started, carrying the primary
 * term of the source shard observed when the split began so the master can validate it.
 *
 * @param shardRouting      routing entry of the target (split) shard
 * @param primaryTerm       primary term of the target shard
 * @param message           human-readable reason, included in the shard-started task
 * @param timestampRange    @timestamp field range of the shard
 * @param eventIngestedRange event.ingested field range of the shard
 * @param sourcePrimaryTerm primary term of the source shard at split time
 * @param listener          resolved once the shard-started request has been handled
 */
public void shardSplit(
    final ShardRouting shardRouting,
    final long primaryTerm,
    final String message,
    final ShardLongFieldRange timestampRange,
    final ShardLongFieldRange eventIngestedRange,
    final long sourcePrimaryTerm,
    final ActionListener<Void> listener
) {
    // Capture the cluster state once so the deduplicated send uses a consistent view.
    final ClusterState observedState = clusterService.state();
    final StartedShardEntry entry = new StartedShardEntry(
        shardRouting.shardId(),
        shardRouting.allocationId().getId(),
        primaryTerm,
        message,
        timestampRange,
        eventIngestedRange,
        new ShardSplit(sourcePrimaryTerm)
    );
    remoteShardStateUpdateDeduplicator.executeOnce(
        entry,
        listener,
        (request, delegate) -> sendShardAction(SHARD_STARTED_ACTION_NAME, observedState, request, delegate)
    );
}

// TODO: Make this a TransportMasterNodeAction and remove duplication of master failover retrying from upstream code
private static class ShardStartedTransportHandler implements TransportRequestHandler<StartedShardEntry> {
private final MasterServiceTaskQueue<StartedShardUpdateTask> taskQueue;
Expand Down Expand Up @@ -691,6 +719,12 @@ public ClusterState execute(BatchExecutionContext<StartedShardUpdateTask> batchE
matched
);
tasksToBeApplied.add(taskContext);
} else if (invalidShardSplit(startedShardEntry, projectId, initialState)) {
logger.debug("{} failing shard started task because split validation failed", startedShardEntry.shardId);
// TODO: Currently an invalid shard split triggers if the primary term changes, if the source primary term changes
// or is >= the target primary term, or if the source is relocating. In the second and third scenarios the failure
// is currently swallowed. The split process will need to handle this.
taskContext.success(() -> task.onFailure(new IllegalStateException("Cannot start")));
} else {
logger.debug(
"{} starting shard {} (shard started task: [{}])",
Expand Down Expand Up @@ -789,6 +823,31 @@ public ClusterState execute(BatchExecutionContext<StartedShardUpdateTask> batchE
return maybeUpdatedState;
}

private static boolean invalidShardSplit(StartedShardEntry startedShardEntry, ProjectId projectId, ClusterState clusterState) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit, do we have a custom of invalid functions that return false if the state is valid? It seems like a double negative and using valid would be more natural to me, unless that is against custom.

ShardSplit shardSplit = startedShardEntry.shardSplit;
if (shardSplit == null) {
return false;
}
IndexRoutingTable routingTable = clusterState.routingTable(projectId).index(startedShardEntry.shardId.getIndex());
final IndexMetadata indexMetadata = clusterState.metadata().getProject(projectId).index(startedShardEntry.shardId.getIndex());
assert indexMetadata != null;
IndexReshardingMetadata reshardingMetadata = indexMetadata.getReshardingMetadata();
assert reshardingMetadata != null;
IndexReshardingState.Split split = reshardingMetadata.getSplit();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose we could have getSplit return null if reshardingMetadata is null to avoid having to do this two step check.

int sourceShardId = startedShardEntry.shardId.getId() % split.shardCountBefore();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we add a function to IndexReshardingState.Split to return the source shard number for a given target? I know it's only modulo but it might be nice to have this kind of logic in one place.

long currentSourcePrimaryTerm = indexMetadata.primaryTerm(sourceShardId);
long primaryTermDiff = startedShardEntry.primaryTerm - currentSourcePrimaryTerm;
// The source primary term must not have changed, the target primary term must at least be equal to or greater and the source
// cannot be relocating.
if (startedShardEntry.shardSplit.sourcePrimaryTerm() != currentSourcePrimaryTerm
|| primaryTermDiff < 0
|| routingTable.shard(sourceShardId).primaryShard().relocating()) {
return true;
} else {
return false;
}
}

private static boolean assertStartedIndicesHaveCompleteTimestampRanges(ClusterState clusterState) {
for (ProjectId projectId : clusterState.metadata().projects().keySet()) {
for (Map.Entry<String, IndexRoutingTable> cursor : clusterState.routingTable(projectId).getIndicesRouting().entrySet()) {
Expand Down Expand Up @@ -827,13 +886,26 @@ public void clusterStatePublished(ClusterState newClusterState) {
}
}

/**
 * Extra payload attached to a shard-started request when the started shard is the target of a
 * resharding split. Carries the primary term of the source shard as observed when the split
 * began so the master can detect a stale split.
 *
 * @param sourcePrimaryTerm primary term of the source shard at the time the split started
 */
record ShardSplit(long sourcePrimaryTerm) implements Writeable {

    // Deserializes in the same order writeTo serializes: a single vlong.
    ShardSplit(StreamInput in) throws IOException {
        this(in.readVLong());
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        out.writeVLong(sourcePrimaryTerm);
    }
}

public static class StartedShardEntry extends TransportRequest {
final ShardId shardId;
final String allocationId;
final long primaryTerm;
final String message;
final ShardLongFieldRange timestampRange;
final ShardLongFieldRange eventIngestedRange;
final ShardSplit shardSplit;

StartedShardEntry(StreamInput in) throws IOException {
super(in);
Expand All @@ -847,6 +919,11 @@ public static class StartedShardEntry extends TransportRequest {
} else {
this.eventIngestedRange = ShardLongFieldRange.UNKNOWN;
}
if (in.getTransportVersion().onOrAfter(TransportVersions.SOURCE_PRIMARY_TERM_IN_START_SHARD)) {
this.shardSplit = in.readOptionalWriteable(ShardSplit::new);
} else {
this.shardSplit = null;
}
}

public StartedShardEntry(
Expand All @@ -856,13 +933,26 @@ public StartedShardEntry(
final String message,
final ShardLongFieldRange timestampRange,
final ShardLongFieldRange eventIngestedRange
) {
this(shardId, allocationId, primaryTerm, message, timestampRange, eventIngestedRange, null);
}

/**
 * Creates a shard-started entry, optionally tagged as originating from a resharding split.
 *
 * @param shardId            id of the shard that started
 * @param allocationId       allocation id of the started shard copy
 * @param primaryTerm        primary term under which the shard started
 * @param message            human-readable reason for the shard-started request
 * @param timestampRange     @timestamp field range of the shard
 * @param eventIngestedRange event.ingested field range of the shard
 * @param shardSplit         split metadata, or {@code null} when this is not a split-started shard
 */
public StartedShardEntry(
    final ShardId shardId,
    final String allocationId,
    final long primaryTerm,
    final String message,
    final ShardLongFieldRange timestampRange,
    final ShardLongFieldRange eventIngestedRange,
    @Nullable final ShardSplit shardSplit
) {
    this.shardId = shardId;
    this.allocationId = allocationId;
    this.primaryTerm = primaryTerm;
    this.message = message;
    this.timestampRange = timestampRange;
    this.eventIngestedRange = eventIngestedRange;
    this.shardSplit = shardSplit;
}

@Override
Expand All @@ -876,6 +966,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_15_0)) {
eventIngestedRange.writeTo(out);
}
if (out.getTransportVersion().onOrAfter(TransportVersions.SOURCE_PRIMARY_TERM_IN_START_SHARD)) {
out.writeOptionalWriteable(shardSplit);
}
}

@Override
Expand All @@ -891,20 +984,20 @@ public String toString() {

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
StartedShardEntry that = (StartedShardEntry) o;
return primaryTerm == that.primaryTerm
&& shardId.equals(that.shardId)
&& allocationId.equals(that.allocationId)
&& message.equals(that.message)
&& timestampRange.equals(that.timestampRange)
&& eventIngestedRange.equals(that.eventIngestedRange);
&& Objects.equals(shardId, that.shardId)
&& Objects.equals(allocationId, that.allocationId)
&& Objects.equals(message, that.message)
&& Objects.equals(timestampRange, that.timestampRange)
&& Objects.equals(eventIngestedRange, that.eventIngestedRange)
&& Objects.equals(shardSplit, that.shardSplit);
}

@Override
public int hashCode() {
    // Must stay consistent with equals: hash every field compared there, including shardSplit.
    return Objects.hash(shardId, allocationId, primaryTerm, message, timestampRange, eventIngestedRange, shardSplit);
}
}

Expand Down Expand Up @@ -946,7 +1039,5 @@ public NoLongerPrimaryShardException(ShardId shardId, String msg) {
public NoLongerPrimaryShardException(StreamInput in) throws IOException {
super(in);
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -893,8 +893,18 @@ public IndexMetadata withInSyncAllocationIds(int shardId, Set<String> inSyncSet)
* @return updated instance with incremented primary term
*/
public IndexMetadata withIncrementedPrimaryTerm(int shardId) {
    // Delegates to withSetPrimaryTerm so the copy-with-new-term logic lives in one place.
    return withSetPrimaryTerm(shardId, this.primaryTerms[shardId] + 1);
}

/**
* Creates a copy of this instance that has the primary term for the given shard id set to the value provided.
* @param shardId shard id to set primary term for
* @param primaryTerm primary term to set
* @return updated instance with set primary term
*/
public IndexMetadata withSetPrimaryTerm(int shardId, long primaryTerm) {
final long[] incremented = this.primaryTerms.clone();
incremented[shardId]++;
incremented[shardId] = primaryTerm;
return new IndexMetadata(
this.index,
this.version,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ public static boolean shouldReserveSpaceForInitializingShard(ShardRouting shard,
// shrink/split/clone operation is going to clone existing locally placed shards using file system hard links
// so no additional space is going to be used until future merges
case LOCAL_SHARDS -> false;

// Split currently does not require space locally as it maps to existing store. Future implementations might require the space
// locally.
case SPLIT -> false;
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
* - {@link PeerRecoverySource} recovery from a primary on another node
* - {@link SnapshotRecoverySource} recovery from a snapshot
* - {@link LocalShardsRecoverySource} recovery from other shards of another index on the same node
* - {@link SplitRecoverySource} recovery that is split from a source shard
*/
public abstract class RecoverySource implements Writeable, ToXContentObject {

Expand All @@ -57,6 +58,7 @@ public static RecoverySource readFrom(StreamInput in) throws IOException {
case PEER -> PeerRecoverySource.INSTANCE;
case SNAPSHOT -> new SnapshotRecoverySource(in);
case LOCAL_SHARDS -> LocalShardsRecoverySource.INSTANCE;
case SPLIT -> SplitRecoverySource.INSTANCE;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add this recovery type to the top of this class where the other ones are also listed.

};
}

Expand All @@ -78,7 +80,8 @@ public enum Type {
EXISTING_STORE,
PEER,
SNAPSHOT,
LOCAL_SHARDS
LOCAL_SHARDS,
SPLIT
}

public abstract Type getType();
Expand Down Expand Up @@ -319,4 +322,36 @@ public boolean expectEmptyRetentionLeases() {
return false;
}
}

/**
 * Recovery of a target shard that was split from a source primary shard during in-place
 * resharding. Stateless singleton: a split carries no additional wire or x-content fields,
 * so the {@code writeAdditionalFields}/{@code addAdditionalFields} hooks are not overridden.
 */
public static class SplitRecoverySource extends RecoverySource {

    public static final SplitRecoverySource INSTANCE = new SplitRecoverySource();

    private SplitRecoverySource() {}

    @Override
    public Type getType() {
        return Type.SPLIT;
    }

    @Override
    public String toString() {
        return "split recovery";
    }

    // TODO: Expect empty retention leases?
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexReshardingMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.routing.GlobalRoutingTable;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.RoutingChangesObserver;
Expand Down Expand Up @@ -127,15 +129,15 @@ public Metadata applyChanges(Metadata oldMetadata, GlobalRoutingTable newRouting
for (Map.Entry<ShardId, Updates> shardEntry : indexChanges) {
ShardId shardId = shardEntry.getKey();
Updates updates = shardEntry.getValue();
updatedIndexMetadata = updateInSyncAllocations(
newRoutingTable.routingTable(projectMetadata.id()),
oldIndexMetadata,
updatedIndexMetadata,
shardId,
updates
);
RoutingTable routingTable = newRoutingTable.routingTable(projectMetadata.id());
updatedIndexMetadata = updateInSyncAllocations(routingTable, oldIndexMetadata, updatedIndexMetadata, shardId, updates);
IndexRoutingTable indexRoutingTable = routingTable.index(shardEntry.getKey().getIndex());
RecoverySource recoverySource = indexRoutingTable.shard(shardEntry.getKey().id()).primaryShard().recoverySource();
boolean split = recoverySource != null && recoverySource.getType() == RecoverySource.Type.SPLIT;
updatedIndexMetadata = updates.increaseTerm
? updatedIndexMetadata.withIncrementedPrimaryTerm(shardId.id())
? split
? updatedIndexMetadata.withSetPrimaryTerm(shardId.id(), splitPrimaryTerm(updatedIndexMetadata, shardId))
: updatedIndexMetadata.withIncrementedPrimaryTerm(shardId.id())
: updatedIndexMetadata;
}
if (updatedIndexMetadata != oldIndexMetadata) {
Expand All @@ -147,6 +149,18 @@ public Metadata applyChanges(Metadata oldMetadata, GlobalRoutingTable newRouting
return updatedMetadata.build();
}

/**
 * Computes the primary term for a resharding-split target shard so that it never falls
 * behind the primary term of its source shard.
 *
 * @param updatedIndexMetadata index metadata carrying the resharding state and primary terms
 * @param shardId              id of the target (split) shard
 * @return the new primary term for the target shard
 */
private static long splitPrimaryTerm(IndexMetadata updatedIndexMetadata, ShardId shardId) {
    IndexReshardingMetadata reshardingMetadata = updatedIndexMetadata.getReshardingMetadata();
    assert reshardingMetadata != null;

    // A target shard maps back to its source shard by modulo of the pre-split shard count.
    int sourceShardId = shardId.getId() % reshardingMetadata.shardCountBefore();
    long sourcePrimaryTerm = updatedIndexMetadata.primaryTerm(sourceShardId);
    long incrementedTargetTerm = updatedIndexMetadata.primaryTerm(shardId.id()) + 1;

    // Taking the max guarantees the target primary term stays >= the source primary term.
    return Math.max(sourcePrimaryTerm, incrementedTargetTerm);
}

/**
* Updates in-sync allocations with routing changes that were made to the routing table.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3339,7 +3339,12 @@ public void startRecovery(
// }
assert recoveryState.getRecoverySource().equals(shardRouting.recoverySource());
switch (recoveryState.getRecoverySource().getType()) {
case EMPTY_STORE, EXISTING_STORE -> executeRecovery("from store", recoveryState, recoveryListener, this::recoverFromStore);
case EMPTY_STORE, EXISTING_STORE, SPLIT -> executeRecovery(
"from store",
recoveryState,
recoveryListener,
this::recoverFromStore
);
case PEER -> {
try {
markAsRecovering("from " + recoveryState.getSourceNode(), recoveryState);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,9 @@ public final class StoreRecovery {
void recoverFromStore(final IndexShard indexShard, ActionListener<Boolean> listener) {
if (canRecover(indexShard)) {
RecoverySource.Type recoveryType = indexShard.recoveryState().getRecoverySource().getType();
assert recoveryType == RecoverySource.Type.EMPTY_STORE || recoveryType == RecoverySource.Type.EXISTING_STORE
: "expected store recovery type but was: " + recoveryType;
assert recoveryType == RecoverySource.Type.EMPTY_STORE
|| recoveryType == RecoverySource.Type.EXISTING_STORE
|| recoveryType == RecoverySource.Type.SPLIT : "expected store recovery type but was: " + recoveryType;
logger.debug("starting recovery from store ...");
final var recoveryListener = recoveryListener(indexShard, listener);
try {
Expand Down
Loading