diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderIT.java index 2548836b9e634..2021818db57b1 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderIT.java @@ -28,6 +28,7 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings; +import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceMetrics; import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator; import org.elasticsearch.cluster.service.ClusterService; @@ -54,14 +55,18 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.CountDownLatch; +import java.util.stream.StreamSupport; import static java.util.stream.IntStream.range; import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.everyItem; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasSize; @@ -130,7 +135,7 @@ public void testHighNodeWriteLoadPreventsNewShardAllocation() { setUpMockTransportIndicesStatsResponse( harness.firstDiscoveryNode, indexMetadata.getNumberOfShards(), - createShardStatsResponseForIndex(indexMetadata, harness.randomShardWriteLoad, harness.firstDataNodeId) + createShardStatsResponseForIndex(indexMetadata, harness.maxShardWriteLoad, harness.firstDataNodeId) ); setUpMockTransportIndicesStatsResponse(harness.secondDiscoveryNode, 0, List.of()); setUpMockTransportIndicesStatsResponse(harness.thirdDiscoveryNode, 0, List.of()); @@ -235,7 +240,7 @@ public void testShardsAreAssignedToNotPreferredWhenAlternativeIsNo() { setUpMockTransportIndicesStatsResponse( harness.firstDiscoveryNode, indexMetadata.getNumberOfShards(), - createShardStatsResponseForIndex(indexMetadata, harness.randomShardWriteLoad, harness.firstDataNodeId) + createShardStatsResponseForIndex(indexMetadata, harness.maxShardWriteLoad, harness.firstDataNodeId) ); setUpMockTransportIndicesStatsResponse(harness.secondDiscoveryNode, 0, List.of()); setUpMockTransportIndicesStatsResponse(harness.thirdDiscoveryNode, 0, List.of()); @@ -333,7 +338,7 @@ public void testCanRemainNotPreferredIsIgnoredWhenAllOtherNodesReturnNotPreferre setUpMockTransportIndicesStatsResponse( harness.firstDiscoveryNode, indexMetadata.getNumberOfShards(), - createShardStatsResponseForIndex(indexMetadata, harness.randomShardWriteLoad, harness.firstDataNodeId) + createShardStatsResponseForIndex(indexMetadata, harness.maxShardWriteLoad, harness.firstDataNodeId) ); setUpMockTransportIndicesStatsResponse(harness.secondDiscoveryNode, 0, List.of()); setUpMockTransportIndicesStatsResponse(harness.thirdDiscoveryNode, 0, List.of()); @@ -429,15 
+434,12 @@ public void testCanRemainRelocatesOneShardWhenAHotSpotOccurs() { * will show that all shards have non-empty write load stats (so that the WriteLoadDecider will evaluate assigning them to a node). */ - IndexMetadata indexMetadata = internalCluster().getCurrentMasterNodeInstance(ClusterService.class) - .state() - .getMetadata() - .getProject() - .index(harness.indexName); + final ClusterState originalClusterState = internalCluster().getCurrentMasterNodeInstance(ClusterService.class).state(); + final IndexMetadata indexMetadata = originalClusterState.getMetadata().getProject().index(harness.indexName); setUpMockTransportIndicesStatsResponse( harness.firstDiscoveryNode, indexMetadata.getNumberOfShards(), - createShardStatsResponseForIndex(indexMetadata, harness.randomShardWriteLoad, harness.firstDataNodeId) + createShardStatsResponseForIndex(indexMetadata, harness.maxShardWriteLoad, harness.firstDataNodeId) ); setUpMockTransportIndicesStatsResponse(harness.secondDiscoveryNode, 0, List.of()); setUpMockTransportIndicesStatsResponse(harness.thirdDiscoveryNode, 0, List.of()); @@ -483,6 +485,7 @@ public void testCanRemainRelocatesOneShardWhenAHotSpotOccurs() { harness.randomNumberOfShards, countShardsStillAssignedToFirstNode + 1 ); + assertThatTheBestShardWasMoved(harness, originalClusterState, desiredBalanceResponse); } catch (AssertionError error) { ClusterState state = client().admin() .cluster() @@ -498,6 +501,36 @@ public void testCanRemainRelocatesOneShardWhenAHotSpotOccurs() { } } + /** + * Determine which shard was moved and check that it's the "best" according to + * {@link org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.Balancer.PrioritiseByShardWriteLoadComparator} + */ + private void assertThatTheBestShardWasMoved( + TestHarness harness, + ClusterState originalClusterState, + DesiredBalanceResponse desiredBalanceResponse + ) { + int movedShardId = desiredBalanceResponse.getRoutingTable().get(harness.indexName).entrySet().stream().filter(e -> { + Set desiredNodeIds = e.getValue().desired().nodeIds(); + return desiredNodeIds.contains(harness.secondDiscoveryNode.getId()) + || desiredNodeIds.contains(harness.thirdDiscoveryNode.getId()); + }).findFirst().map(Map.Entry::getKey).orElseThrow(() -> new AssertionError("No shard was moved to a non-hot-spotting node")); + + final BalancedShardsAllocator.Balancer.PrioritiseByShardWriteLoadComparator comparator = + new BalancedShardsAllocator.Balancer.PrioritiseByShardWriteLoadComparator( + desiredBalanceResponse.getClusterInfo(), + originalClusterState.getRoutingNodes().node(harness.firstDataNodeId) + ); + + final List bestShardsToMove = StreamSupport.stream( + originalClusterState.getRoutingNodes().node(harness.firstDataNodeId).spliterator(), + false + ).sorted(comparator).toList(); + + // The moved shard should be at the head of the sorted list + assertThat(movedShardId, equalTo(bestShardsToMove.get(0).shardId().id())); + } + public void testMaxQueueLatencyMetricIsPublished() { final Settings settings = Settings.builder() .put( @@ -659,16 +692,35 @@ private NodeUsageStatsForThreadPools createNodeUsageStatsForThreadPools( } /** - * Helper to create a list of dummy {@link ShardStats} for the given index, each shard reporting a {@code peakShardWriteLoad} stat. + * Helper to create a list of dummy {@link ShardStats} for the given index, each shard being randomly allocated a peak write load + * between 0 and {@code maximumShardWriteLoad}. 
There will always be at least one shard reporting the specified + * {@code maximumShardWriteLoad}. */ private List createShardStatsResponseForIndex( IndexMetadata indexMetadata, - float peakShardWriteLoad, + float maximumShardWriteLoad, String assignedShardNodeId ) { - List shardStats = new ArrayList<>(indexMetadata.getNumberOfShards()); + // Randomly distribute shards' peak write-loads so that we can check later that shard movements are prioritized correctly + final double writeLoadThreshold = maximumShardWriteLoad + * BalancedShardsAllocator.Balancer.PrioritiseByShardWriteLoadComparator.THRESHOLD_RATIO; + final List shardPeakWriteLoads = new ArrayList<>(); + // Need at least one with the maximum write-load + shardPeakWriteLoads.add((double) maximumShardWriteLoad); + final int remainingShards = indexMetadata.getNumberOfShards() - 1; + // Some over-threshold, some under + for (int i = 0; i < remainingShards; ++i) { + if (randomBoolean()) { + shardPeakWriteLoads.add(randomDoubleBetween(writeLoadThreshold, maximumShardWriteLoad, true)); + } else { + shardPeakWriteLoads.add(randomDoubleBetween(0.0, writeLoadThreshold, true)); + } + } + assertThat(shardPeakWriteLoads, hasSize(indexMetadata.getNumberOfShards())); + Collections.shuffle(shardPeakWriteLoads, random()); + final List shardStats = new ArrayList<>(indexMetadata.getNumberOfShards()); for (int i = 0; i < indexMetadata.getNumberOfShards(); i++) { - shardStats.add(createShardStats(indexMetadata, i, peakShardWriteLoad, assignedShardNodeId)); + shardStats.add(createShardStats(indexMetadata, i, shardPeakWriteLoads.get(i), assignedShardNodeId)); } return shardStats; } @@ -719,7 +771,7 @@ private TestHarness setUpThreeTestNodesAndAllIndexShardsOnFirstNode() { int randomUtilizationThresholdPercent = randomIntBetween(50, 100); int randomNumberOfWritePoolThreads = randomIntBetween(2, 20); long randomQueueLatencyThresholdMillis = randomLongBetween(1, 20_000); - float randomShardWriteLoad = randomFloatBetween(0.0f, 0.01f, false); + float maximumShardWriteLoad = randomFloatBetween(0.0f, 0.01f, false); Settings settings = enabledWriteLoadDeciderSettings(randomUtilizationThresholdPercent, randomQueueLatencyThresholdMillis); internalCluster().startMasterOnlyNode(settings); @@ -756,8 +808,8 @@ private TestHarness setUpThreeTestNodesAndAllIndexShardsOnFirstNode() { + randomUtilizationThresholdPercent + ", write threads: " + randomNumberOfWritePoolThreads - + ", individual shard write loads: " - + randomShardWriteLoad + + ", maximum shard write load: " + + maximumShardWriteLoad ); /** @@ -775,7 +827,7 @@ private TestHarness setUpThreeTestNodesAndAllIndexShardsOnFirstNode() { // Calculate the maximum utilization a node can report while still being able to accept all relocating shards int shardWriteLoadOverhead = shardLoadUtilizationOverhead( - randomShardWriteLoad * randomNumberOfShards, + maximumShardWriteLoad * randomNumberOfShards, randomNumberOfWritePoolThreads ); int maxUtilBelowThresholdThatAllowsAllShardsToRelocate = randomUtilizationThresholdPercent - shardWriteLoadOverhead - 1; @@ -819,7 +871,7 @@ private TestHarness setUpThreeTestNodesAndAllIndexShardsOnFirstNode() { randomUtilizationThresholdPercent, randomNumberOfWritePoolThreads, randomQueueLatencyThresholdMillis, - randomShardWriteLoad, + maximumShardWriteLoad, indexName, randomNumberOfShards, maxUtilBelowThresholdThatAllowsAllShardsToRelocate @@ -842,7 +894,7 @@ record TestHarness( int randomUtilizationThresholdPercent, int randomNumberOfWritePoolThreads, long 
randomQueueLatencyThresholdMillis, - float randomShardWriteLoad, + float maxShardWriteLoad, String indexName, int randomNumberOfShards, int maxUtilBelowThresholdThatAllowsAllShardsToRelocate diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 4f4503207412e..fe42a3dfe7ecd 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -50,10 +50,12 @@ import java.util.Comparator; import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; import java.util.function.BiFunction; +import java.util.function.Predicate; import static org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata.Type.REPLACE; import static org.elasticsearch.cluster.routing.ExpectedShardSizeEstimator.getExpectedShardSize; @@ -794,7 +796,7 @@ protected int comparePivot(int j) { } /** - * Move started shards that can not be allocated to a node anymore + * Move started shards that cannot be allocated to a node anymore, or are in a non-preferred allocation * * For each shard to be moved this function executes a move operation * to the minimal eligible node with respect to the @@ -805,40 +807,88 @@ protected int comparePivot(int j) { */ public boolean moveShards() { boolean shardMoved = false; + final BestShardMovementsTracker bestNonPreferredShardMovementsTracker = new BestShardMovementsTracker(); // Iterate over the started shards interleaving between nodes, and check if they can remain. In the presence of throttling // shard movements, the goal of this iteration order is to achieve a fairer movement of shards from the nodes that are // offloading the shards. 
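+ // Added descriptive comment (summarising the two-pass flow implemented below): shards whose canRemain decision is NO are
+ // relocated immediately, while shards whose allocation is merely NOT_PREFERRED are recorded in the tracker so that, after
+ // all forced moves have been handled, at most one "best" not-preferred shard is relocated per invocation.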
for (Iterator it = allocation.routingNodes().nodeInterleavedShardIterator(); it.hasNext();) { - ShardRouting shardRouting = it.next(); - ProjectIndex index = projectIndex(shardRouting); - final MoveDecision moveDecision = decideMove(index, shardRouting); + final ShardRouting shardRouting = it.next(); + final ProjectIndex index = projectIndex(shardRouting); + final MoveDecision moveDecision = decideMove( + index, + shardRouting, + bestNonPreferredShardMovementsTracker::shardIsBetterThanCurrent + ); if (moveDecision.isDecisionTaken() && moveDecision.forceMove()) { - final ModelNode sourceNode = nodes.get(shardRouting.currentNodeId()); - final ModelNode targetNode = nodes.get(moveDecision.getTargetNode().getId()); - sourceNode.removeShard(index, shardRouting); - Tuple relocatingShards = routingNodes.relocateShard( - shardRouting, - targetNode.getNodeId(), - allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE), - "move", - allocation.changes() - ); - final ShardRouting shard = relocatingShards.v2(); - targetNode.addShard(projectIndex(shard), shard); - if (logger.isTraceEnabled()) { - logger.trace("Moved shard [{}] to node [{}]", shardRouting, targetNode.getRoutingNode()); - } - shardMoved = true; - if (completeEarlyOnShardAssignmentChange) { - return true; + // Defer moving of not-preferred until we've moved the NOs + if (moveDecision.getCanRemainDecision().type() == Type.NOT_PREFERRED) { + bestNonPreferredShardMovementsTracker.putBestMoveDecision(shardRouting, moveDecision); + } else { + executeMove(shardRouting, index, moveDecision, "move"); + if (completeEarlyOnShardAssignmentChange) { + return true; + } + shardMoved = true; } } else if (moveDecision.isDecisionTaken() && moveDecision.canRemain() == false) { logger.trace("[{}][{}] can't move", shardRouting.index(), shardRouting.id()); } } + + // If we get here, attempt to move one of the best not-preferred shards that we identified earlier + for (var storedShardMovement : bestNonPreferredShardMovementsTracker.getBestShardMovements()) { + final var shardRouting = storedShardMovement.shardRouting(); + final var index = projectIndex(shardRouting); + // If `shardMoved` is true, there may have been moves that have made our previous move decision + // invalid, so we must call `decideMove` again. If not, we know we haven't made any moves, and we + // can use the cached decision. + final var moveDecision = shardMoved ? decideMove(index, shardRouting) : storedShardMovement.moveDecision(); + if (moveDecision.isDecisionTaken() && moveDecision.forceMove()) { + executeMove(shardRouting, index, moveDecision, "move-non-preferred"); + // Return after a single move so that the change can be simulated before further moves are made. 
+ return true; + } else { + logger.trace("[{}][{}] can no longer move (not-preferred)", shardRouting.index(), shardRouting.id()); + } + } + return shardMoved; } + private void executeMove(ShardRouting shardRouting, ProjectIndex index, MoveDecision moveDecision, String reason) { + final ModelNode sourceNode = nodes.get(shardRouting.currentNodeId()); + final ModelNode targetNode = nodes.get(moveDecision.getTargetNode().getId()); + sourceNode.removeShard(index, shardRouting); + final Tuple relocatingShards = routingNodes.relocateShard( + shardRouting, + targetNode.getNodeId(), + allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE), + reason, + allocation.changes() + ); + final ShardRouting shard = relocatingShards.v2(); + targetNode.addShard(projectIndex(shard), shard); + if (logger.isTraceEnabled()) { + logger.trace("Moved shard [{}] to node [{}]", shardRouting, targetNode.getRoutingNode()); + } + } + + /** + * Makes a decision on whether to move a started shard to another node. + *

+ * This overload will always search for relocation targets for {@link Decision#NOT_PREFERRED} + * allocations. + * + * @see #decideMove(ProjectIndex, ShardRouting, Predicate) + * @param index The index that the shard being considered belongs to + * @param shardRouting The shard routing being considered for movement + * @return The {@link MoveDecision} for the shard + */ + public MoveDecision decideMove(final ProjectIndex index, final ShardRouting shardRouting) { + // Always assess options for non-preferred allocations + return decideMove(index, shardRouting, ignored -> true); + } + /** * Makes a decision on whether to move a started shard to another node. The following rules apply * to the {@link MoveDecision} return object: @@ -846,12 +896,21 @@ public boolean moveShards() { * 2. If the shard is allowed to remain on its current node, no attempt will be made to move the shard and * {@link MoveDecision#getCanRemainDecision} will have a decision type of YES. All other fields in the object will be null. * 3. If the shard is not allowed to remain on its current node, then {@link MoveDecision#getAllocationDecision()} will be - * populated with the decision of moving to another node. If {@link MoveDecision#forceMove()} ()} returns {@code true}, then + * populated with the decision of moving to another node. If {@link MoveDecision#forceMove()} returns {@code true}, then * {@link MoveDecision#getTargetNode} will return a non-null value, otherwise the assignedNodeId will be null. * 4. If the method is invoked in explain mode (e.g. from the cluster allocation explain APIs), then * {@link MoveDecision#getNodeDecisions} will have a non-null value. + * + * @param index The index that the shard being considered belongs to + * @param shardRouting The shard routing being considered for movement + * @param nonPreferredPredicate A predicate applied to assignments for which + * {@link AllocationDeciders#canRemain(ShardRouting, RoutingNode, RoutingAllocation)} returns + * {@link Type#NOT_PREFERRED}. If the predicate returns true, a search for relocation targets will be + * performed, if it returns false no search will be performed and {@link MoveDecision#NOT_TAKEN} will + * be returned. + * @return The {@link MoveDecision} for the shard */ - public MoveDecision decideMove(final ProjectIndex index, final ShardRouting shardRouting) { + private MoveDecision decideMove(ProjectIndex index, ShardRouting shardRouting, Predicate nonPreferredPredicate) { NodeSorter sorter = nodeSorters.sorterForShard(shardRouting); index.assertMatch(shardRouting); @@ -868,6 +927,11 @@ public MoveDecision decideMove(final ProjectIndex index, final ShardRouting shar return MoveDecision.remain(canRemain); } + // Check predicate to decide whether to assess movement options + if (canRemain.type() == Type.NOT_PREFERRED && nonPreferredPredicate.test(shardRouting) == false) { + return MoveDecision.NOT_TAKEN; + } + sorter.reset(index); /* * the sorter holds the minimum weight node first for the shards index. @@ -875,7 +939,7 @@ public MoveDecision decideMove(final ProjectIndex index, final ShardRouting shar * This is not guaranteed to be balanced after this operation we still try best effort to * allocate on the minimal eligible node. 
*/ - MoveDecision moveDecision = decideMove(sorter, shardRouting, sourceNode, canRemain, this::decideCanAllocate); + final MoveDecision moveDecision = decideMove(sorter, shardRouting, sourceNode, canRemain, this::decideCanAllocate); if (moveDecision.canRemain() == false && moveDecision.forceMove() == false) { final boolean shardsOnReplacedNode = allocation.metadata().nodeShutdowns().contains(shardRouting.currentNodeId(), REPLACE); if (shardsOnReplacedNode) { @@ -935,6 +999,146 @@ private MoveDecision decideMove( ); } + /** + * Keeps track of the single "best" shard movement we could make from each node, as scored by + * the {@link PrioritiseByShardWriteLoadComparator}. Provides a utility for checking if + * a proposed movement is "better" than the current best for that node. + */ + private class BestShardMovementsTracker { + + public record StoredShardMovement(ShardRouting shardRouting, MoveDecision moveDecision) {} + + // LinkedHashMap so we iterate in insertion order + private final Map bestShardMovementsByNode = new LinkedHashMap<>(); + private final Map comparatorCache = new HashMap<>(); + + /** + * Is the provided {@link ShardRouting} potentially a better shard to move than the one + * we currently have stored for this node? + * + * @param shardRouting The shard routing being considered for movement + * @return true if the shard is more desirable to move that the current one we stored for this node, false otherwise. + */ + public boolean shardIsBetterThanCurrent(ShardRouting shardRouting) { + final var currentShardForNode = bestShardMovementsByNode.get(shardRouting.currentNodeId()); + if (currentShardForNode == null) { + return true; + } + int comparison = comparatorCache.computeIfAbsent( + shardRouting.currentNodeId(), + nodeId -> new PrioritiseByShardWriteLoadComparator(allocation.clusterInfo(), allocation.routingNodes().node(nodeId)) + ).compare(shardRouting, currentShardForNode.shardRouting()); + // Ignore inferior non-preferred moves + return comparison < 0; + } + + public void putBestMoveDecision(ShardRouting shardRouting, MoveDecision moveDecision) { + bestShardMovementsByNode.put(shardRouting.currentNodeId(), new StoredShardMovement(shardRouting, moveDecision)); + } + + public Iterable getBestShardMovements() { + return bestShardMovementsByNode.values(); + } + } + + /** + * Sorts shards by desirability to move, sort order goes: + *

    + *
+ * <ol>
+ *     <li>Shards with write-load in {@link PrioritiseByShardWriteLoadComparator#threshold} →
+ *     {@link PrioritiseByShardWriteLoadComparator#maxWriteLoadOnNode} (exclusive)</li>
+ *     <li>Shards with write-load in {@link PrioritiseByShardWriteLoadComparator#threshold} → 0</li>
+ *     <li>Shards with write-load == {@link PrioritiseByShardWriteLoadComparator#maxWriteLoadOnNode}</li>
+ *     <li>Shards with missing write-load</li>
+ * </ol>
+ *
+ * e.g., for any two ShardRoutings, r1 and r2,
+ * <ul>
+ *     <li>compare(r1, r2) > 0 when r2 is more desirable to move</li>
+ *     <li>compare(r1, r2) == 0 when the two shards are equally desirable to move</li>
+ *     <li>compare(r1, r2) < 0 when r1 is more desirable to move</li>
+ * </ul>
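+ *
+ * For example, given hypothetical write-loads of {10.0, 7.0, 6.0, 3.0, 1.0, unknown} for the shards on a node
+ * (so maxWriteLoadOnNode = 10.0 and threshold = 5.0), the resulting order would be: 6.0, 7.0 (ascending between
+ * the threshold and the maximum), then 3.0, 1.0 (descending below the threshold), then 10.0 (the node's maximum
+ * write-load), and finally the shard with the unknown write-load.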
+ */ + // Visible for testing + public static class PrioritiseByShardWriteLoadComparator implements Comparator { + + /** + * This is the threshold over which we consider shards to have a "high" write load represented + * as a ratio of the maximum write-load present on the node. + *

+ * We prefer to move shards that have a write-load close to this value x {@link #maxWriteLoadOnNode}. + */ + public static final double THRESHOLD_RATIO = 0.5; + private static final double MISSING_WRITE_LOAD = -1; + private final Map shardWriteLoads; + private final double maxWriteLoadOnNode; + private final double threshold; + private final String nodeId; + + public PrioritiseByShardWriteLoadComparator(ClusterInfo clusterInfo, RoutingNode routingNode) { + shardWriteLoads = clusterInfo.getShardWriteLoads(); + double maxWriteLoadOnNode = MISSING_WRITE_LOAD; + for (ShardRouting shardRouting : routingNode) { + maxWriteLoadOnNode = Math.max( + maxWriteLoadOnNode, + shardWriteLoads.getOrDefault(shardRouting.shardId(), MISSING_WRITE_LOAD) + ); + } + this.maxWriteLoadOnNode = maxWriteLoadOnNode; + threshold = maxWriteLoadOnNode * THRESHOLD_RATIO; + nodeId = routingNode.nodeId(); + } + + @Override + public int compare(ShardRouting lhs, ShardRouting rhs) { + assert nodeId.equals(lhs.currentNodeId()) && nodeId.equals(rhs.currentNodeId()) + : this.getClass().getSimpleName() + + " is node-specific. comparator=" + + nodeId + + ", lhs=" + + lhs.currentNodeId() + + ", rhs=" + + rhs.currentNodeId(); + + // If we have no shard write-load data, shortcut + if (maxWriteLoadOnNode == MISSING_WRITE_LOAD) { + return 0; + } + + final double lhsWriteLoad = shardWriteLoads.getOrDefault(lhs.shardId(), MISSING_WRITE_LOAD); + final double rhsWriteLoad = shardWriteLoads.getOrDefault(rhs.shardId(), MISSING_WRITE_LOAD); + + // prefer any known write-load over any unknown write-load + final var rhsIsMissing = rhsWriteLoad == MISSING_WRITE_LOAD; + final var lhsIsMissing = lhsWriteLoad == MISSING_WRITE_LOAD; + if (rhsIsMissing && lhsIsMissing) { + return 0; + } + if (rhsIsMissing ^ lhsIsMissing) { + return lhsIsMissing ? 1 : -1; + } + + if (lhsWriteLoad < maxWriteLoadOnNode && rhsWriteLoad < maxWriteLoadOnNode) { + final var lhsOverThreshold = lhsWriteLoad >= threshold; + final var rhsOverThreshold = rhsWriteLoad >= threshold; + if (lhsOverThreshold && rhsOverThreshold) { + // Both values between threshold and maximum, prefer lowest + return Double.compare(lhsWriteLoad, rhsWriteLoad); + } else if (lhsOverThreshold) { + // lhs between threshold and maximum, rhs below threshold, prefer lhs + return -1; + } else if (rhsOverThreshold) { + // lhs below threshold, rhs between threshold and maximum, prefer rhs + return 1; + } + // Both values below the threshold, prefer highest + return Double.compare(rhsWriteLoad, lhsWriteLoad); + } + + // prefer the non-max write load if there is one + return Double.compare(lhsWriteLoad, rhsWriteLoad); + } + } + private Decision decideCanAllocate(ShardRouting shardRouting, RoutingNode target) { // don't use canRebalance as we want hard filtering rules to apply. 
See #17698 return allocation.deciders().canAllocate(shardRouting, target, allocation); diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java index 2bacd38325f41..38dba2acf3250 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java @@ -27,6 +27,7 @@ import org.elasticsearch.cluster.routing.AllocationId; import org.elasticsearch.cluster.routing.GlobalRoutingTable; import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.RoutingChangesObserver; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.RoutingNodesHelper; @@ -35,6 +36,7 @@ import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.allocation.AllocateUnassignedDecision; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; +import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.Balancer.PrioritiseByShardWriteLoadComparator; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; import org.elasticsearch.cluster.routing.allocation.decider.Decision; @@ -51,7 +53,9 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.snapshots.SnapshotShardSizeInfo; import org.elasticsearch.test.gateway.TestGatewayAllocator; +import org.hamcrest.Matchers; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -59,6 +63,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.DoubleSupplier; import java.util.function.Function; import java.util.stream.Collector; import java.util.stream.Collectors; @@ -71,6 +76,7 @@ import static java.util.stream.Collectors.toSet; import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING; import static org.elasticsearch.cluster.routing.TestShardRouting.shardRoutingBuilder; +import static org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.Balancer.PrioritiseByShardWriteLoadComparator.THRESHOLD_RATIO; import static org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.DISK_USAGE_BALANCE_FACTOR_SETTING; import static org.elasticsearch.cluster.routing.allocation.allocator.WeightFunction.getIndexDiskUsageInBytes; import static org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider.SETTING_IGNORE_DISK_WATERMARKS; @@ -78,14 +84,19 @@ import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.everyItem; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.sameInstance; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; public class BalancedShardsAllocatorTests extends ESAllocationTestCase { + 
private static final RoutingChangesObserver NOOP = new RoutingChangesObserver() { + }; private static final Settings WITH_DISK_BALANCING = Settings.builder().put(DISK_USAGE_BALANCE_FACTOR_SETTING.getKey(), "1e-9").build(); public void testDecideShardAllocation() { @@ -681,11 +692,12 @@ public void testPartitionedClusterWithSeparateWeights() { } public void testReturnEarlyOnShardAssignmentChanges() { + var shardWriteLoads = new HashMap(); var allocationService = new MockAllocationService( prefixAllocationDeciders(), new TestGatewayAllocator(), new BalancedShardsAllocator(BalancerSettings.DEFAULT, TEST_WRITE_LOAD_FORECASTER), - EmptyClusterInfoService.INSTANCE, + () -> ClusterInfo.builder().shardWriteLoads(shardWriteLoads).build(), SNAPSHOT_INFO_SERVICE_WITH_NO_SHARD_SIZES ); @@ -731,6 +743,45 @@ public void testReturnEarlyOnShardAssignmentChanges() { ) ); + // A started index with a non-preferred allocation (low write load) + final double notPreferredLowerWriteLoad = randomDoubleBetween(0.0, 5.0, true); + final IndexMetadata notPreferredAllocation = anIndex("large-not-preferred-allocation", indexSettings(IndexVersion.current(), 1, 0)) + .putInSyncAllocationIds(0, Set.of(UUIDs.randomBase64UUID())) + .build(); + projectMetadataBuilder.put(notPreferredAllocation, false); + routingTableBuilder.add( + IndexRoutingTable.builder(notPreferredAllocation.getIndex()) + .addShard( + shardRoutingBuilder(notPreferredAllocation.getIndex().getName(), 0, "large-1", true, ShardRoutingState.STARTED) + .withAllocationId(AllocationId.newInitializing(notPreferredAllocation.inSyncAllocationIds(0).iterator().next())) + .build() + ) + ); + shardWriteLoads.put(new ShardId(notPreferredAllocation.getIndex(), 0), notPreferredLowerWriteLoad); + + // A started index with a non-preferred allocation (max write load) + final double notPreferredMaxWriteLoad = randomDoubleBetween(notPreferredLowerWriteLoad, 15.0, true); + final IndexMetadata notPreferredAllocationMaxWriteLoad = anIndex( + "large-not-preferred-allocation-max-write-load", + indexSettings(IndexVersion.current(), 1, 0) + ).putInSyncAllocationIds(0, Set.of(UUIDs.randomBase64UUID())).build(); + projectMetadataBuilder.put(notPreferredAllocationMaxWriteLoad, false); + routingTableBuilder.add( + IndexRoutingTable.builder(notPreferredAllocationMaxWriteLoad.getIndex()) + .addShard( + shardRoutingBuilder( + notPreferredAllocationMaxWriteLoad.getIndex().getName(), + 0, + "large-1", + true, + ShardRoutingState.STARTED + ).withAllocationId( + AllocationId.newInitializing(notPreferredAllocationMaxWriteLoad.inSyncAllocationIds(0).iterator().next()) + ).build() + ) + ); + shardWriteLoads.put(new ShardId(notPreferredAllocationMaxWriteLoad.getIndex(), 0), notPreferredMaxWriteLoad); + // Indices with unbalanced weight of write loads final var numWriteLoadIndices = between(3, 5); for (int i = 0; i < numWriteLoadIndices; i++) { @@ -768,8 +819,22 @@ public void testReturnEarlyOnShardAssignmentChanges() { assertTrue("unexpected shard state: " + replicaShard, replicaShard.initializing()); // Undesired allocation is not moved because allocate call returns early - final var shard = routingTable.shardRoutingTable(undesiredAllocation.getIndex().getName(), 0).primaryShard(); - assertTrue("unexpected shard state: " + shard, shard.started()); + final var undesiredShard = routingTable.shardRoutingTable(undesiredAllocation.getIndex().getName(), 0).primaryShard(); + assertTrue("unexpected shard state: " + undesiredShard, undesiredShard.started()); + + // Not-preferred shard is not moved because 
allocate call returns early + final var notPreferredShard = routingTable.shardRoutingTable(notPreferredAllocation.getIndex().getName(), 0).primaryShard(); + assertFalse("unexpected shard state: " + notPreferredShard, notPreferredShard.relocating()); + + // Not-preferred (max-write-load) shard is not moved because allocate call returns early + final var notPreferredAllocationMaxWriteLoadShard = routingTable.shardRoutingTable( + notPreferredAllocationMaxWriteLoad.getIndex().getName(), + 0 + ).primaryShard(); + assertFalse( + "unexpected shard state: " + notPreferredAllocationMaxWriteLoadShard, + notPreferredAllocationMaxWriteLoadShard.relocating() + ); // Also no rebalancing for indices with unbalanced write loads due to returning early for (int i = 0; i < numWriteLoadIndices; i++) { @@ -783,8 +848,22 @@ public void testReturnEarlyOnShardAssignmentChanges() { { // Undesired allocation is now relocating final RoutingTable routingTable = clusterState.routingTable(ProjectId.DEFAULT); - final var shard = routingTable.shardRoutingTable(undesiredAllocation.getIndex().getName(), 0).primaryShard(); - assertTrue("unexpected shard state: " + shard, shard.relocating()); + final var undesiredShard = routingTable.shardRoutingTable(undesiredAllocation.getIndex().getName(), 0).primaryShard(); + assertTrue("unexpected shard state: " + undesiredShard, undesiredShard.relocating()); + + // Not-preferred shard is not moved because allocate call returns early + final var notPreferredShard = routingTable.shardRoutingTable(notPreferredAllocation.getIndex().getName(), 0).primaryShard(); + assertFalse("unexpected shard state: " + notPreferredShard, notPreferredShard.relocating()); + + // Not-preferred (max-write-load) shard is not moved because allocate call returns early + final var notPreferredAllocationMaxWriteLoadShard = routingTable.shardRoutingTable( + notPreferredAllocationMaxWriteLoad.getIndex().getName(), + 0 + ).primaryShard(); + assertFalse( + "unexpected shard state: " + notPreferredAllocationMaxWriteLoadShard, + notPreferredAllocationMaxWriteLoadShard.relocating() + ); // Still no rebalancing for indices with unbalanced write loads due to returning early for (int i = 0; i < numWriteLoadIndices; i++) { @@ -795,6 +874,47 @@ public void testReturnEarlyOnShardAssignmentChanges() { // Third reroute clusterState = startInitializingShardsAndReroute(allocationService, clusterState); + { + // Not-preferred allocation is now relocating + final RoutingTable routingTable = clusterState.routingTable(ProjectId.DEFAULT); + final var notPreferredShard = routingTable.shardRoutingTable(notPreferredAllocation.getIndex().getName(), 0).primaryShard(); + assertTrue("unexpected shard state: " + notPreferredShard, notPreferredShard.relocating()); + + // Not-preferred (max-write-load) shard is not moved because allocate call returns early + final var notPreferredAllocationMaxWriteLoadShard = routingTable.shardRoutingTable( + notPreferredAllocationMaxWriteLoad.getIndex().getName(), + 0 + ).primaryShard(); + assertFalse( + "unexpected shard state: " + notPreferredAllocationMaxWriteLoadShard, + notPreferredAllocationMaxWriteLoadShard.relocating() + ); + + // Still no rebalancing for indices with unbalanced write loads due to returning early + for (int i = 0; i < numWriteLoadIndices; i++) { + final var writeLoadShard = routingTable.shardRoutingTable("large-write-load-" + i, 0).primaryShard(); + assertTrue("unexpected shard state: " + writeLoadShard, writeLoadShard.started()); + } + } + + // Fourth reroute + clusterState = 
startInitializingShardsAndReroute(allocationService, clusterState); + { + // Not-preferred (max-write-load) allocation is now relocating + final RoutingTable routingTable = clusterState.routingTable(ProjectId.DEFAULT); + final var notPreferredShard = routingTable.shardRoutingTable(notPreferredAllocationMaxWriteLoad.getIndex().getName(), 0) + .primaryShard(); + assertTrue("unexpected shard state: " + notPreferredShard, notPreferredShard.relocating()); + + // Still no rebalancing for indices with unbalanced write loads due to returning early + for (int i = 0; i < numWriteLoadIndices; i++) { + final var writeLoadShard = routingTable.shardRoutingTable("large-write-load-" + i, 0).primaryShard(); + assertTrue("unexpected shard state: " + writeLoadShard, writeLoadShard.started()); + } + } + + // Fifth reroute + clusterState = startInitializingShardsAndReroute(allocationService, clusterState); { // Rebalance should happen for one and only one of the indices with unbalanced write loads due to returning early final RoutingTable routingTable = clusterState.routingTable(ProjectId.DEFAULT); @@ -809,6 +929,129 @@ public void testReturnEarlyOnShardAssignmentChanges() { applyStartedShardsUntilNoChange(clusterState, allocationService); } + /** + * Test for {@link PrioritiseByShardWriteLoadComparator}. See Comparator Javadoc for expected + * ordering. + */ + public void testShardMovementPriorityComparator() { + final double maxWriteLoad = randomDoubleBetween(0.0, 100.0, true); + final double writeLoadThreshold = maxWriteLoad * THRESHOLD_RATIO; + final int numberOfShardsWithMaxWriteLoad = between(1, 5); + final int numberOfShardsWithWriteLoadBetweenThresholdAndMax = between(0, 50); + final int numberOfShardsWithWriteLoadBelowThreshold = between(0, 50); + final int numberOfShardsWithNoWriteLoad = between(0, 50); + final int totalShards = numberOfShardsWithMaxWriteLoad + numberOfShardsWithWriteLoadBetweenThresholdAndMax + + numberOfShardsWithWriteLoadBelowThreshold + numberOfShardsWithNoWriteLoad; + + // We create single-shard indices for simplicity's sake and to make it clear the shards are independent of each other + final var indices = new ArrayList(); + for (int i = 0; i < totalShards; i++) { + indices.add(anIndex("index-" + i).numberOfShards(1).numberOfReplicas(0)); + } + + final var nodeId = randomIdentifier(); + final var clusterState = createStateWithIndices(List.of(nodeId), shardId -> nodeId, indices.toArray(IndexMetadata.Builder[]::new)); + + final var allShards = clusterState.routingTable(ProjectId.DEFAULT).allShards().collect(toSet()); + final var shardWriteLoads = new HashMap(); + addRandomWriteLoadAndRemoveShard(shardWriteLoads, allShards, numberOfShardsWithMaxWriteLoad, () -> maxWriteLoad); + addRandomWriteLoadAndRemoveShard( + shardWriteLoads, + allShards, + numberOfShardsWithWriteLoadBetweenThresholdAndMax, + () -> randomDoubleBetween(writeLoadThreshold, maxWriteLoad, true) + ); + addRandomWriteLoadAndRemoveShard( + shardWriteLoads, + allShards, + numberOfShardsWithWriteLoadBelowThreshold, + () -> randomDoubleBetween(0, writeLoadThreshold, true) + ); + assertThat(allShards, hasSize(numberOfShardsWithNoWriteLoad)); + + final var allocation = new RoutingAllocation( + new AllocationDeciders(List.of()), + clusterState, + ClusterInfo.builder().shardWriteLoads(shardWriteLoads).build(), + SNAPSHOT_INFO_SERVICE_WITH_NO_SHARD_SIZES.snapshotShardSizes(), + System.nanoTime() + ); + + // Assign all shards to node + final var allocatedRoutingNodes = allocation.routingNodes().mutableCopy(); + for 
(ShardRouting shardRouting : allocatedRoutingNodes.unassigned()) { + allocatedRoutingNodes.initializeShard(shardRouting, nodeId, null, randomNonNegativeLong(), NOOP); + } + + final var comparator = new PrioritiseByShardWriteLoadComparator(allocation.clusterInfo(), allocatedRoutingNodes.node(nodeId)); + + logger.info("--> testing shard movement priority comparator, maxValue={}, threshold={}", maxWriteLoad, writeLoadThreshold); + var sortedShards = allocatedRoutingNodes.getAssignedShards().values().stream().flatMap(List::stream).sorted(comparator).toList(); + + for (ShardRouting shardRouting : sortedShards) { + logger.info("--> {}: {}", shardRouting.shardId(), shardWriteLoads.getOrDefault(shardRouting.shardId(), -1.0)); + } + + double lastWriteLoad = 0.0; + int currentIndex = 0; + + logger.info("--> expecting {} between threshold and max in ascending order", numberOfShardsWithWriteLoadBetweenThresholdAndMax); + for (int i = 0; i < numberOfShardsWithWriteLoadBetweenThresholdAndMax; i++) { + final var currentShardId = sortedShards.get(currentIndex++).shardId(); + assertThat(shardWriteLoads, Matchers.hasKey(currentShardId)); + final double currentWriteLoad = shardWriteLoads.get(currentShardId); + if (i == 0) { + lastWriteLoad = currentWriteLoad; + } else { + assertThat(currentWriteLoad, greaterThanOrEqualTo(lastWriteLoad)); + } + } + logger.info("--> expecting {} below threshold in descending order", numberOfShardsWithWriteLoadBelowThreshold); + for (int i = 0; i < numberOfShardsWithWriteLoadBelowThreshold; i++) { + final var currentShardId = sortedShards.get(currentIndex++).shardId(); + assertThat(shardWriteLoads, Matchers.hasKey(currentShardId)); + final double currentWriteLoad = shardWriteLoads.get(currentShardId); + if (i == 0) { + lastWriteLoad = currentWriteLoad; + } else { + assertThat(currentWriteLoad, lessThanOrEqualTo(lastWriteLoad)); + } + } + logger.info("--> expecting {} at max", numberOfShardsWithMaxWriteLoad); + for (int i = 0; i < numberOfShardsWithMaxWriteLoad; i++) { + final var currentShardId = sortedShards.get(currentIndex++).shardId(); + assertThat(shardWriteLoads, Matchers.hasKey(currentShardId)); + final double currentWriteLoad = shardWriteLoads.get(currentShardId); + assertThat(currentWriteLoad, equalTo(maxWriteLoad)); + } + logger.info("--> expecting {} missing", numberOfShardsWithNoWriteLoad); + for (int i = 0; i < numberOfShardsWithNoWriteLoad; i++) { + final var currentShardId = sortedShards.get(currentIndex++); + assertThat(shardWriteLoads, Matchers.not(Matchers.hasKey(currentShardId.shardId()))); + } + } + + /** + * Randomly select a shard and add a random write-load for it + * + * @param shardWriteLoads The map of shards to write-loads, this will be added to + * @param shards The set of shards to select from, selected shards will be removed from this set + * @param count The number of shards to generate write loads for + * @param writeLoadSupplier The supplier of random write loads to use + */ + private void addRandomWriteLoadAndRemoveShard( + Map shardWriteLoads, + Set shards, + int count, + DoubleSupplier writeLoadSupplier + ) { + for (int i = 0; i < count; i++) { + final var shardRouting = randomFrom(shards); + shardWriteLoads.put(shardRouting.shardId(), writeLoadSupplier.getAsDouble()); + shards.remove(shardRouting); + } + } + private Map getTargetShardPerNodeCount(IndexRoutingTable indexRoutingTable) { var counts = new HashMap(); for (int shardId = 0; shardId < indexRoutingTable.size(); shardId++) { @@ -850,7 +1093,7 @@ private static ClusterState 
createStateWithIndices(IndexMetadata.Builder... inde private static ClusterState createStateWithIndices( List nodeNames, - Function unbalancedAllocator, + Function shardAllocator, IndexMetadata.Builder... indexMetadataBuilders ) { var metadataBuilder = Metadata.builder(); @@ -873,9 +1116,9 @@ private static ClusterState createStateWithIndices( routingTableBuilder.add( IndexRoutingTable.builder(indexMetadata.getIndex()) .addShard( - shardRoutingBuilder(shardId, unbalancedAllocator.apply(shardId), true, ShardRoutingState.STARTED) - .withAllocationId(AllocationId.newInitializing(inSyncId)) - .build() + shardRoutingBuilder(shardId, shardAllocator.apply(shardId), true, ShardRoutingState.STARTED).withAllocationId( + AllocationId.newInitializing(inSyncId) + ).build() ) ); } @@ -989,32 +1232,66 @@ public BalancedShardsAllocator.NodeSorter sorterForShard(ShardRouting shard) { } /** - * Allocation deciders that only allow shards to be allocated to nodes whose names share the same prefix + * Allocation deciders that trigger movements based on specific index names + * + * @see MoveNotPreferredOnceDecider + * @see PrefixAllocationDecider + */ + private static AllocationDeciders prefixAllocationDeciders() { + return new AllocationDeciders( + List.of( + new PrefixAllocationDecider(), + new MoveNotPreferredOnceDecider(), + new SameShardAllocationDecider(ClusterSettings.createBuiltInClusterSettings()) + ) + ); + } + + /** + * Allocation decider that only allow shards to be allocated to nodes whose names share the same prefix * as the index they're from */ - private AllocationDeciders prefixAllocationDeciders() { - return new AllocationDeciders(List.of(new AllocationDecider() { - @Override - public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { - return nodePrefixMatchesIndexPrefix(shardRouting, node); - } + private static class PrefixAllocationDecider extends AllocationDecider { + @Override + public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { + return nodePrefixMatchesIndexPrefix(shardRouting, node); + } - @Override - public Decision canRemain( - IndexMetadata indexMetadata, - ShardRouting shardRouting, - RoutingNode node, - RoutingAllocation allocation - ) { - return nodePrefixMatchesIndexPrefix(shardRouting, node); - } + @Override + public Decision canRemain(IndexMetadata indexMetadata, ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { + return nodePrefixMatchesIndexPrefix(shardRouting, node); + } + + private Decision nodePrefixMatchesIndexPrefix(ShardRouting shardRouting, RoutingNode node) { + var indexPrefix = prefix(shardRouting.index().getName()); + var nodePrefix = prefix(node.node().getId()); + return nodePrefix.equals(indexPrefix) ? Decision.YES : Decision.NO; + } + } + + /** + * Returns {@link Decision#NOT_PREFERRED} from {@link #canRemain(IndexMetadata, ShardRouting, RoutingNode, RoutingAllocation)} for + * any shard with 'not-preferred' in the index name. Once the shard has been moved to a different node, {@link Decision#YES} is + * returned. + */ + private static class MoveNotPreferredOnceDecider extends AllocationDecider { + + private Map originalNodes = new HashMap<>(); - private Decision nodePrefixMatchesIndexPrefix(ShardRouting shardRouting, RoutingNode node) { - var indexPrefix = prefix(shardRouting.index().getName()); - var nodePrefix = prefix(node.node().getId()); - return nodePrefix.equals(indexPrefix) ? 
Decision.YES : Decision.NO; + @Override + public Decision canRemain(IndexMetadata indexMetadata, ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { + if (shardRouting.currentNodeId() == null) { + return Decision.YES; } - }, new SameShardAllocationDecider(ClusterSettings.createBuiltInClusterSettings()))); + if (shardRouting.index().getName().contains("not-preferred") == false) { + return Decision.YES; + } + if (originalNodes.containsKey(shardRouting.shardId()) == false) { + // Remember where we first saw it + originalNodes.put(shardRouting.shardId(), shardRouting.currentNodeId()); + } + return shardRouting.currentNodeId().equals(originalNodes.get(shardRouting.shardId())) ? Decision.NOT_PREFERRED : Decision.YES; + } } private static String prefix(String value) {