Skip to content

Commit 70bc143

Browse files
authored
Advance split target primary term to match source (#125738)
Whenever a split target that has not been handed off is being assigned, its primary term is advanced to at least match the source shard's primary term.
1 parent 71e74bd commit 70bc143

File tree

4 files changed

+59
-4
lines changed

4 files changed

+59
-4
lines changed

server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -893,15 +893,25 @@ public IndexMetadata withInSyncAllocationIds(int shardId, Set<String> inSyncSet)
893893
* @return updated instance with incremented primary term
894894
*/
895895
public IndexMetadata withIncrementedPrimaryTerm(int shardId) {
896-
final long[] incremented = this.primaryTerms.clone();
897-
incremented[shardId]++;
896+
return withSetPrimaryTerm(shardId, this.primaryTerms[shardId] + 1);
897+
}
898+
899+
/**
900+
* Creates a copy of this instance that has the primary term for the given shard id set to the value provided.
901+
* @param shardId shard id to set primary term for
902+
* @param primaryTerm primary term to set
903+
* @return updated instance with set primary term
904+
*/
905+
public IndexMetadata withSetPrimaryTerm(int shardId, long primaryTerm) {
906+
final long[] newPrimaryTerms = this.primaryTerms.clone();
907+
newPrimaryTerms[shardId] = primaryTerm;
898908
return new IndexMetadata(
899909
this.index,
900910
this.version,
901911
this.mappingVersion,
902912
this.settingsVersion,
903913
this.aliasesVersion,
904-
incremented,
914+
newPrimaryTerms,
905915
this.state,
906916
this.numberOfShards,
907917
this.numberOfReplicas,

server/src/main/java/org/elasticsearch/cluster/metadata/IndexReshardingMetadata.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,15 @@
1212
import org.elasticsearch.common.io.stream.StreamInput;
1313
import org.elasticsearch.common.io.stream.StreamOutput;
1414
import org.elasticsearch.common.io.stream.Writeable;
15+
import org.elasticsearch.index.shard.ShardId;
1516
import org.elasticsearch.xcontent.ConstructingObjectParser;
1617
import org.elasticsearch.xcontent.ParseField;
1718
import org.elasticsearch.xcontent.ToXContentFragment;
1819
import org.elasticsearch.xcontent.XContentBuilder;
1920
import org.elasticsearch.xcontent.XContentParser;
2021

2122
import java.io.IOException;
23+
import java.util.Arrays;
2224
import java.util.Objects;
2325

2426
/**
@@ -205,6 +207,18 @@ public static IndexReshardingMetadata newSplitByMultiple(int shardCount, int mul
205207
return new IndexReshardingMetadata(IndexReshardingState.Split.newSplitByMultiple(shardCount, multiple));
206208
}
207209

210+
public IndexReshardingMetadata transitionSplitTargetToHandoff(ShardId shardId) {
211+
assert state instanceof IndexReshardingState.Split;
212+
IndexReshardingState.Split splitState = (IndexReshardingState.Split) state;
213+
IndexReshardingState.Split.TargetShardState[] newTargets = Arrays.copyOf(
214+
splitState.targetShards(),
215+
splitState.targetShards().length
216+
);
217+
int i = shardId.getId() - state.shardCountBefore();
218+
newTargets[i] = IndexReshardingState.Split.TargetShardState.HANDOFF;
219+
return new IndexReshardingMetadata(new IndexReshardingState.Split(splitState.sourceShards(), newTargets));
220+
}
221+
208222
/**
209223
* @return the split state of this metadata block, or throw IllegalArgumentException if this metadata doesn't represent a split
210224
*/
@@ -215,6 +229,10 @@ public IndexReshardingState.Split getSplit() {
215229
};
216230
}
217231

232+
public boolean isSplit() {
233+
return state instanceof IndexReshardingState.Split;
234+
}
235+
218236
/**
219237
* @return the number of shards the index has at the start of this operation
220238
*/

server/src/main/java/org/elasticsearch/cluster/metadata/IndexReshardingState.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,10 @@ TargetShardState[] targetShards() {
250250
return targetShards.clone();
251251
}
252252

253+
public int sourceShard(int targetShard) {
254+
return targetShard % shardCountBefore();
255+
}
256+
253257
/**
254258
* Create resharding metadata representing a new split operation
255259
* Split only supports updating an index to a multiple of its current shard count
@@ -345,6 +349,10 @@ public SourceShardState getSourceShardState(int shardNum) {
345349
return sourceShards[shardNum];
346350
}
347351

352+
public boolean isTargetShard(int shardId) {
353+
return shardId >= shardCountBefore();
354+
}
355+
348356
/**
349357
* Get the current target state of a shard
350358
* @param shardNum an index into shards greater than or equal to the old shard count and less than the new shard count

server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetadataUpdater.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.apache.logging.log4j.Logger;
1313
import org.elasticsearch.cluster.ClusterState;
1414
import org.elasticsearch.cluster.metadata.IndexMetadata;
15+
import org.elasticsearch.cluster.metadata.IndexReshardingMetadata;
1516
import org.elasticsearch.cluster.metadata.Metadata;
1617
import org.elasticsearch.cluster.metadata.ProjectMetadata;
1718
import org.elasticsearch.cluster.routing.GlobalRoutingTable;
@@ -134,8 +135,17 @@ public Metadata applyChanges(Metadata oldMetadata, GlobalRoutingTable newRouting
134135
shardId,
135136
updates
136137
);
138+
IndexReshardingMetadata reshardingMetadata = updatedIndexMetadata.getReshardingMetadata();
139+
boolean splitTarget = reshardingMetadata != null
140+
&& reshardingMetadata.isSplit()
141+
&& reshardingMetadata.getSplit().isTargetShard(shardId.id());
137142
updatedIndexMetadata = updates.increaseTerm
138-
? updatedIndexMetadata.withIncrementedPrimaryTerm(shardId.id())
143+
? splitTarget
144+
? updatedIndexMetadata.withSetPrimaryTerm(
145+
shardId.id(),
146+
splitPrimaryTerm(updatedIndexMetadata, reshardingMetadata, shardId)
147+
)
148+
: updatedIndexMetadata.withIncrementedPrimaryTerm(shardId.id())
139149
: updatedIndexMetadata;
140150
}
141151
if (updatedIndexMetadata != oldIndexMetadata) {
@@ -147,6 +157,15 @@ public Metadata applyChanges(Metadata oldMetadata, GlobalRoutingTable newRouting
147157
return updatedMetadata.build();
148158
}
149159

160+
private static long splitPrimaryTerm(IndexMetadata updatedIndexMetadata, IndexReshardingMetadata reshardingMetadata, ShardId shardId) {
161+
// We take the max of the source primary term and the target primary term plus one. This guarantees that the target primary term always advances and stays
162+
// greater than or equal to the source.
163+
return Math.max(
164+
updatedIndexMetadata.primaryTerm(reshardingMetadata.getSplit().sourceShard(shardId.id())),
165+
updatedIndexMetadata.primaryTerm(shardId.id()) + 1
166+
);
167+
}
168+
150169
/**
151170
* Updates in-sync allocations with routing changes that were made to the routing table.
152171
*/

0 commit comments

Comments
 (0)