1212import org .apache .logging .log4j .Logger ;
1313import org .elasticsearch .cluster .ClusterState ;
1414import org .elasticsearch .cluster .metadata .IndexMetadata ;
15+ import org .elasticsearch .cluster .metadata .IndexReshardingMetadata ;
1516import org .elasticsearch .cluster .metadata .Metadata ;
1617import org .elasticsearch .cluster .metadata .ProjectMetadata ;
1718import org .elasticsearch .cluster .routing .GlobalRoutingTable ;
19+ import org .elasticsearch .cluster .routing .IndexRoutingTable ;
1820import org .elasticsearch .cluster .routing .IndexShardRoutingTable ;
1921import org .elasticsearch .cluster .routing .RecoverySource ;
2022import org .elasticsearch .cluster .routing .RoutingChangesObserver ;
@@ -127,15 +129,19 @@ public Metadata applyChanges(Metadata oldMetadata, GlobalRoutingTable newRouting
127129 for (Map .Entry <ShardId , Updates > shardEntry : indexChanges ) {
128130 ShardId shardId = shardEntry .getKey ();
129131 Updates updates = shardEntry .getValue ();
130- updatedIndexMetadata = updateInSyncAllocations (
131- newRoutingTable .routingTable (projectMetadata .id ()),
132- oldIndexMetadata ,
133- updatedIndexMetadata ,
134- shardId ,
135- updates
136- );
132+ RoutingTable routingTable = newRoutingTable .routingTable (projectMetadata .id ());
133+ updatedIndexMetadata = updateInSyncAllocations (routingTable , oldIndexMetadata , updatedIndexMetadata , shardId , updates );
134+ IndexRoutingTable indexRoutingTable = routingTable .index (shardEntry .getKey ().getIndex ());
135+ RecoverySource recoverySource = indexRoutingTable .shard (shardEntry .getKey ().id ()).primaryShard ().recoverySource ();
136+ IndexReshardingMetadata reshardingMetadata = updatedIndexMetadata .getReshardingMetadata ();
137+ boolean split = recoverySource != null && reshardingMetadata != null ;
137138 updatedIndexMetadata = updates .increaseTerm
138- ? updatedIndexMetadata .withIncrementedPrimaryTerm (shardId .id ())
139+ ? split
140+ ? updatedIndexMetadata .withSetPrimaryTerm (
141+ shardId .id (),
142+ splitPrimaryTerm (updatedIndexMetadata , reshardingMetadata , shardId )
143+ )
144+ : updatedIndexMetadata .withIncrementedPrimaryTerm (shardId .id ())
139145 : updatedIndexMetadata ;
140146 }
141147 if (updatedIndexMetadata != oldIndexMetadata ) {
@@ -147,6 +153,15 @@ public Metadata applyChanges(Metadata oldMetadata, GlobalRoutingTable newRouting
147153 return updatedMetadata .build ();
148154 }
149155
156+ private static long splitPrimaryTerm (IndexMetadata updatedIndexMetadata , IndexReshardingMetadata reshardingMetadata , ShardId shardId ) {
157+ // We take the max of the source and target primary terms. This guarantees that the target primary term stays
158+ // greater than or equal to the source.
159+ return Math .max (
160+ updatedIndexMetadata .primaryTerm (shardId .getId () % reshardingMetadata .shardCountBefore ()),
161+ updatedIndexMetadata .primaryTerm (shardId .id ()) + 1
162+ );
163+ }
164+
150165 /**
151166 * Updates in-sync allocations with routing changes that were made to the routing table.
152167 */
0 commit comments