diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java index 38be34c0ff610..a4edc9fe28b74 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java @@ -30,12 +30,12 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.time.TimeProvider; import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.Tuple; import org.elasticsearch.gateway.PriorityComparator; import org.elasticsearch.index.IndexVersions; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.threadpool.ThreadPool; import java.util.Comparator; import java.util.Iterator; @@ -83,17 +83,16 @@ public class DesiredBalanceReconciler { private double undesiredAllocationsLogThreshold; private final NodeAllocationOrdering allocationOrdering = new NodeAllocationOrdering(); private final NodeAllocationOrdering moveOrdering = new NodeAllocationOrdering(); + private final UndesiredAllocationsTracker undesiredAllocationsTracker; - public DesiredBalanceReconciler(ClusterSettings clusterSettings, ThreadPool threadPool) { - this.undesiredAllocationLogInterval = new FrequencyCappedAction( - threadPool.relativeTimeInMillisSupplier(), - TimeValue.timeValueMinutes(5) - ); + public DesiredBalanceReconciler(ClusterSettings clusterSettings, TimeProvider timeProvider) { + this.undesiredAllocationLogInterval = new FrequencyCappedAction(timeProvider::relativeTimeInMillis, TimeValue.timeValueMinutes(5)); clusterSettings.initializeAndWatch(UNDESIRED_ALLOCATIONS_LOG_INTERVAL_SETTING, this.undesiredAllocationLogInterval::setMinInterval); clusterSettings.initializeAndWatch( UNDESIRED_ALLOCATIONS_LOG_THRESHOLD_SETTING, value -> this.undesiredAllocationsLogThreshold = value ); + this.undesiredAllocationsTracker = new UndesiredAllocationsTracker(clusterSettings, timeProvider); } /** @@ -114,6 +113,7 @@ public DesiredBalanceMetrics.AllocationStats reconcile(DesiredBalance desiredBal public void clear() { allocationOrdering.clear(); moveOrdering.clear(); + undesiredAllocationsTracker.clear(); } /** @@ -134,6 +134,7 @@ private class Reconciliation { } DesiredBalanceMetrics.AllocationStats run() { + undesiredAllocationsTracker.cleanup(routingNodes); try (var ignored = allocation.withReconcilingFlag()) { logger.debug("Reconciling desired balance for [{}]", desiredBalance.lastConvergedIndex()); @@ -493,35 +494,51 @@ private void moveShards() { if (assignment.nodeIds().contains(shardRouting.currentNodeId())) { // shard is already on a desired node + undesiredAllocationsTracker.removeTracking(shardRouting); continue; } - if (allocation.deciders().canAllocate(shardRouting, allocation).type() != Decision.Type.YES) { - // cannot allocate anywhere, no point in looking for a target node - continue; - } + boolean movedUndesiredShard = false; + try { + if (allocation.deciders().canAllocate(shardRouting, allocation).type() != Decision.Type.YES) { + // cannot allocate anywhere, no point in looking for a target node + continue; + } - final var routingNode = routingNodes.node(shardRouting.currentNodeId()); - final var canRemainDecision = allocation.deciders().canRemain(shardRouting, routingNode, allocation); - if (canRemainDecision.type() != Decision.Type.NO && canRemainDecision.type() != Decision.Type.NOT_PREFERRED) { - // If movement is throttled, a future reconciliation round will see a resolution. For now, leave it alone. - // Reconciliation treats canRemain NOT_PREFERRED answers as YES because the DesiredBalance computation already decided - // how to handle the situation. - continue; - } + final var routingNode = routingNodes.node(shardRouting.currentNodeId()); + final var canRemainDecision = allocation.deciders().canRemain(shardRouting, routingNode, allocation); + if (canRemainDecision.type() != Decision.Type.NO && canRemainDecision.type() != Decision.Type.NOT_PREFERRED) { + // If movement is throttled, a future reconciliation round will see a resolution. For now, leave it alone. + // Reconciliation treats canRemain NOT_PREFERRED answers as YES because the DesiredBalance computation already + // decided how to handle the situation. + continue; + } - final var moveTarget = findRelocationTarget(shardRouting, assignment.nodeIds()); - if (moveTarget != null) { - logger.debug("Moving shard {} from {} to {}", shardRouting.shardId(), shardRouting.currentNodeId(), moveTarget.getId()); - routingNodes.relocateShard( - shardRouting, - moveTarget.getId(), - allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE), - "move", - allocation.changes() - ); - iterator.dePrioritizeNode(shardRouting.currentNodeId()); - moveOrdering.recordAllocation(shardRouting.currentNodeId()); + final var moveTarget = findRelocationTarget(shardRouting, assignment.nodeIds()); + if (moveTarget != null) { + logger.debug( + "Moving shard {} from {} to {}", + shardRouting.shardId(), + shardRouting.currentNodeId(), + moveTarget.getId() + ); + routingNodes.relocateShard( + shardRouting, + moveTarget.getId(), + allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE), + "move", + allocation.changes() + ); + iterator.dePrioritizeNode(shardRouting.currentNodeId()); + moveOrdering.recordAllocation(shardRouting.currentNodeId()); + movedUndesiredShard = true; + } + } finally { + if (movedUndesiredShard) { + undesiredAllocationsTracker.removeTracking(shardRouting); + } else { + undesiredAllocationsTracker.trackUndesiredAllocation(shardRouting); + } } } } @@ -555,51 +572,63 @@ private DesiredBalanceMetrics.AllocationStats balance() { if (assignment.nodeIds().contains(shardRouting.currentNodeId())) { // shard is already on a desired node + undesiredAllocationsTracker.removeTracking(shardRouting); continue; } - if (allocation.metadata().nodeShutdowns().contains(shardRouting.currentNodeId()) == false) { - // shard is not on a shutting down node, nor is it on a desired node per the previous check. - undesiredAllocationsExcludingShuttingDownNodes++; - undesiredAllocationsExcludingShuttingDownNodesByRole.addTo(shardRouting.role(), 1); - } - - if (allocation.deciders().canRebalance(allocation).type() != Decision.Type.YES) { - // Rebalancing is disabled, we're just here to collect the AllocationStats to return. - continue; - } + boolean movedUndesiredShard = false; + try { + if (allocation.metadata().nodeShutdowns().contains(shardRouting.currentNodeId()) == false) { + // shard is not on a shutting down node, nor is it on a desired node per the previous check. + undesiredAllocationsExcludingShuttingDownNodes++; + undesiredAllocationsExcludingShuttingDownNodesByRole.addTo(shardRouting.role(), 1); + } - if (allocation.deciders().canRebalance(shardRouting, allocation).type() != Decision.Type.YES) { - // rebalancing disabled for this shard - continue; - } + if (allocation.deciders().canRebalance(allocation).type() != Decision.Type.YES) { + // Rebalancing is disabled, we're just here to collect the AllocationStats to return. + continue; + } - if (allocation.deciders().canAllocate(shardRouting, allocation).type() != Decision.Type.YES) { - // cannot allocate anywhere, no point in looking for a target node - continue; - } + if (allocation.deciders().canRebalance(shardRouting, allocation).type() != Decision.Type.YES) { + // rebalancing disabled for this shard + continue; + } - final var rebalanceTarget = findRelocationTarget(shardRouting, assignment.nodeIds(), this::decideCanAllocate); - if (rebalanceTarget != null) { - logger.debug( - "Rebalancing shard {} from {} to {}", - shardRouting.shardId(), - shardRouting.currentNodeId(), - rebalanceTarget.getId() - ); + if (allocation.deciders().canAllocate(shardRouting, allocation).type() != Decision.Type.YES) { + // cannot allocate anywhere, no point in looking for a target node + continue; + } - routingNodes.relocateShard( - shardRouting, - rebalanceTarget.getId(), - allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE), - "rebalance", - allocation.changes() - ); - iterator.dePrioritizeNode(shardRouting.currentNodeId()); - moveOrdering.recordAllocation(shardRouting.currentNodeId()); + final var rebalanceTarget = findRelocationTarget(shardRouting, assignment.nodeIds(), this::decideCanAllocate); + if (rebalanceTarget != null) { + logger.debug( + "Rebalancing shard {} from {} to {}", + shardRouting.shardId(), + shardRouting.currentNodeId(), + rebalanceTarget.getId() + ); + + routingNodes.relocateShard( + shardRouting, + rebalanceTarget.getId(), + allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE), + "rebalance", + allocation.changes() + ); + iterator.dePrioritizeNode(shardRouting.currentNodeId()); + moveOrdering.recordAllocation(shardRouting.currentNodeId()); + movedUndesiredShard = true; + } + } finally { + if (movedUndesiredShard) { + undesiredAllocationsTracker.removeTracking(shardRouting); + } else { + undesiredAllocationsTracker.trackUndesiredAllocation(shardRouting); + } } } + undesiredAllocationsTracker.maybeLogUndesiredShardsWarning(routingNodes, allocation, desiredBalance); maybeLogUndesiredAllocationsWarning(totalAllocations, undesiredAllocationsExcludingShuttingDownNodes, routingNodes.size()); return new DesiredBalanceMetrics.AllocationStats( unassignedShards, diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/UndesiredAllocationsTracker.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/UndesiredAllocationsTracker.java new file mode 100644 index 0000000000000..804f580a22ede --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/UndesiredAllocationsTracker.java @@ -0,0 +1,240 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.cluster.routing.allocation.allocator; + +import org.elasticsearch.cluster.routing.RoutingNodes; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; +import org.elasticsearch.common.FrequencyCappedAction; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.time.TimeProvider; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.logging.LogManager; +import org.elasticsearch.logging.Logger; + +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.elasticsearch.core.TimeValue.ONE_MINUTE; +import static org.elasticsearch.core.TimeValue.ZERO; +import static org.elasticsearch.core.TimeValue.timeValueMillis; +import static org.elasticsearch.core.TimeValue.timeValueMinutes; + +/** + * Keeps track of a limited number of shards that are currently in undesired allocations. If the + * shards remain in undesired allocations for longer than a configurable threshold, it will log + * why those shards can't be allocated to desired nodes. + */ +public class UndesiredAllocationsTracker { + + private static final Logger logger = LogManager.getLogger(UndesiredAllocationsTracker.class); + + private static final TimeValue FIVE_MINUTES = timeValueMinutes(5); + + /** + * Warning logs will be periodically written if we see a shard that's been in an undesired allocation for this long + */ + public static final Setting UNDESIRED_ALLOCATION_DURATION_LOG_THRESHOLD_SETTING = Setting.timeSetting( + "cluster.routing.allocation.desired_balance.undesired_duration_logging.threshold", + FIVE_MINUTES, + ONE_MINUTE, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * The minimum amount of time between warnings about persistent undesired allocations + */ + public static final Setting UNDESIRED_ALLOCATION_DURATION_LOG_INTERVAL_SETTING = Setting.timeSetting( + "cluster.routing.allocation.desired_balance.undesired_duration_logging.interval", + FIVE_MINUTES, + ONE_MINUTE, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * The maximum number of undesired allocations to track. We expect this to be relatively small. + */ + public static final Setting MAX_UNDESIRED_ALLOCATIONS_TO_TRACK = Setting.intSetting( + "cluster.routing.allocation.desired_balance.undesired_duration_logging.max_to_track", + 0, + 0, + 100, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + private final TimeProvider timeProvider; + private final LinkedHashMap undesiredAllocations = new LinkedHashMap<>(); + private final FrequencyCappedAction undesiredAllocationDurationLogInterval; + private volatile TimeValue undesiredAllocationDurationLoggingThreshold; + private volatile int maxUndesiredAllocationsToTrack; + + UndesiredAllocationsTracker(ClusterSettings clusterSettings, TimeProvider timeProvider) { + this.timeProvider = timeProvider; + this.undesiredAllocationDurationLogInterval = new FrequencyCappedAction(timeProvider::relativeTimeInMillis, ZERO); + clusterSettings.initializeAndWatch( + UNDESIRED_ALLOCATION_DURATION_LOG_INTERVAL_SETTING, + undesiredAllocationDurationLogInterval::setMinInterval + ); + clusterSettings.initializeAndWatch( + UNDESIRED_ALLOCATION_DURATION_LOG_THRESHOLD_SETTING, + value -> undesiredAllocationDurationLoggingThreshold = value + ); + clusterSettings.initializeAndWatch(MAX_UNDESIRED_ALLOCATIONS_TO_TRACK, value -> this.maxUndesiredAllocationsToTrack = value); + } + + /** + * Track an allocation as being undesired + */ + public void trackUndesiredAllocation(ShardRouting shardRouting) { + assert shardRouting.unassigned() == false : "Shouldn't record unassigned shards as undesired allocations"; + if (undesiredAllocations.size() < maxUndesiredAllocationsToTrack) { + final var allocationId = shardRouting.allocationId().getId(); + if (undesiredAllocations.containsKey(allocationId) == false) { + undesiredAllocations.put( + allocationId, + new UndesiredAllocation(shardRouting.shardId(), timeProvider.relativeTimeInMillis()) + ); + } + } + } + + /** + * Remove any tracking of the specified allocation (a no-op if the allocation isn't being tracked) + */ + public void removeTracking(ShardRouting shardRouting) { + assert shardRouting.unassigned() == false : "Shouldn't remove tracking of unassigned shards"; + undesiredAllocations.remove(shardRouting.allocationId().getId()); + } + + /** + * Clear any {@link ShardRouting} that are no longer present in the routing nodes + */ + public void cleanup(RoutingNodes routingNodes) { + undesiredAllocations.entrySet().removeIf(e -> { + final var undesiredAllocation = e.getValue(); + final var allocationId = e.getKey(); + return routingNodes.getByAllocationId(undesiredAllocation.shardId(), allocationId) == null; + }); + shrinkIfOversized(); + } + + /** + * Clear all tracked allocations + */ + public void clear() { + undesiredAllocations.clear(); + } + + /** + * If there are shards that have been in undesired allocations for longer than the configured + * threshold, log a warning + */ + public void maybeLogUndesiredShardsWarning( + RoutingNodes routingNodes, + RoutingAllocation routingAllocation, + DesiredBalance desiredBalance + ) { + final long currentTimeMillis = timeProvider.relativeTimeInMillis(); + if (undesiredAllocations.isEmpty() == false) { + final long earliestUndesiredTimestamp = undesiredAllocations.firstEntry().getValue().undesiredSince(); + if (earliestUndesiredTimestamp < currentTimeMillis + && currentTimeMillis - earliestUndesiredTimestamp > undesiredAllocationDurationLoggingThreshold.millis()) { + undesiredAllocationDurationLogInterval.maybeExecute( + () -> logDecisionsForUndesiredShardsOverThreshold(routingNodes, routingAllocation, desiredBalance) + ); + } + } + } + + private void logDecisionsForUndesiredShardsOverThreshold( + RoutingNodes routingNodes, + RoutingAllocation routingAllocation, + DesiredBalance desiredBalance + ) { + final long currentTimeMillis = timeProvider.relativeTimeInMillis(); + final long loggingThresholdTimestamp = currentTimeMillis - undesiredAllocationDurationLoggingThreshold.millis(); + for (var allocation : undesiredAllocations.entrySet()) { + final var undesiredAllocation = allocation.getValue(); + final var allocationId = allocation.getKey(); + if (undesiredAllocation.undesiredSince() < loggingThresholdTimestamp) { + final var shardRouting = routingNodes.getByAllocationId(undesiredAllocation.shardId(), allocationId); + if (shardRouting != null) { + logUndesiredShardDetails( + shardRouting, + timeValueMillis(currentTimeMillis - undesiredAllocation.undesiredSince()), + routingNodes, + routingAllocation, + desiredBalance + ); + } else { + assert false : undesiredAllocation + " for allocationID " + allocationId + " was not cleaned up"; + } + } + } + } + + private void logUndesiredShardDetails( + ShardRouting shardRouting, + TimeValue undesiredDuration, + RoutingNodes routingNodes, + RoutingAllocation allocation, + DesiredBalance desiredBalance + ) { + final RoutingAllocation.DebugMode originalDebugMode = allocation.getDebugMode(); + allocation.setDebugMode(RoutingAllocation.DebugMode.EXCLUDE_YES_DECISIONS); + try { + final var assignment = desiredBalance.getAssignment(shardRouting.shardId()); + logger.warn("Shard {} has been in an undesired allocation for {}", shardRouting.shardId(), undesiredDuration); + for (final var nodeId : assignment.nodeIds()) { + final var decision = allocation.deciders().canAllocate(shardRouting, routingNodes.node(nodeId), allocation); + logger.warn("Shard {} allocation decision for node [{}]: {}", shardRouting.shardId(), nodeId, decision); + } + } finally { + allocation.setDebugMode(originalDebugMode); + } + } + + /** + * If the maximum to track was reduced, and we are tracking more than the new maximum, purge the most recent entries + * to bring us under the new limit + */ + private void shrinkIfOversized() { + if (undesiredAllocations.size() > maxUndesiredAllocationsToTrack) { + final var newestExcessAllocationIds = undesiredAllocations.entrySet() + .stream() + .sorted((a, b) -> Long.compare(b.getValue().undesiredSince(), a.getValue().undesiredSince())) + .limit(undesiredAllocations.size() - maxUndesiredAllocationsToTrack) + .map(Map.Entry::getKey) + .collect(Collectors.toSet()); + undesiredAllocations.keySet().removeAll(newestExcessAllocationIds); + } + } + + // visible for testing + Map getUndesiredAllocations() { + return Map.copyOf(undesiredAllocations); + } + + /** + * Rather than storing the {@link ShardRouting}, we store a map of allocationId -> {@link UndesiredAllocation} + * this is because the allocation ID will persist as long as a shard stays on the same node, but the + * {@link ShardRouting} changes for a variety of reasons even when the shard doesn't move. + * + * @param shardId The shard ID + * @param undesiredSince The timestamp when the shard was first observed in an undesired allocation + */ + record UndesiredAllocation(ShardId shardId, long undesiredSince) {} +} diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 9eac060abf90f..5ad7c2af94c81 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -51,6 +51,7 @@ import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceComputer; import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceReconciler; +import org.elasticsearch.cluster.routing.allocation.allocator.UndesiredAllocationsTracker; import org.elasticsearch.cluster.routing.allocation.decider.AwarenessAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.ConcurrentRebalanceAllocationDecider; @@ -236,6 +237,9 @@ public void apply(Settings value, Settings current, Settings previous) { DesiredBalanceComputer.MAX_BALANCE_COMPUTATION_TIME_DURING_INDEX_CREATION_SETTING, DesiredBalanceReconciler.UNDESIRED_ALLOCATIONS_LOG_INTERVAL_SETTING, DesiredBalanceReconciler.UNDESIRED_ALLOCATIONS_LOG_THRESHOLD_SETTING, + UndesiredAllocationsTracker.UNDESIRED_ALLOCATION_DURATION_LOG_THRESHOLD_SETTING, + UndesiredAllocationsTracker.UNDESIRED_ALLOCATION_DURATION_LOG_INTERVAL_SETTING, + UndesiredAllocationsTracker.MAX_UNDESIRED_ALLOCATIONS_TO_TRACK, BreakerSettings.CIRCUIT_BREAKER_LIMIT_SETTING, BreakerSettings.CIRCUIT_BREAKER_OVERHEAD_SETTING, BreakerSettings.CIRCUIT_BREAKER_TYPE, diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java index e0eb54df37fd6..fb404ecdde8be 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java @@ -57,6 +57,7 @@ import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.time.AdvancingTimeProvider; import org.elasticsearch.common.util.Maps; import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue; import org.elasticsearch.core.TimeValue; @@ -71,7 +72,6 @@ import org.elasticsearch.snapshots.SnapshotsInfoService; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.MockLog; -import org.elasticsearch.threadpool.ThreadPool; import org.junit.BeforeClass; import java.util.Comparator; @@ -84,7 +84,6 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiPredicate; import java.util.function.Consumer; @@ -107,8 +106,6 @@ import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.oneOf; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; public class DesiredBalanceReconcilerTests extends ESAllocationTestCase { @@ -1359,13 +1356,10 @@ public void testShouldLogOnTooManyUndesiredAllocations() { .routingTable(routingTableBuilder) .build(); - var threadPool = mock(ThreadPool.class); - final var timeInMillisSupplier = new AtomicLong(); - when(threadPool.relativeTimeInMillisSupplier()).thenReturn(timeInMillisSupplier::incrementAndGet); - - var reconciler = new DesiredBalanceReconciler(createBuiltInClusterSettings(), threadPool); + var timeProvider = new AdvancingTimeProvider(); + var reconciler = new DesiredBalanceReconciler(createBuiltInClusterSettings(), timeProvider); final long initialDelayInMillis = TimeValue.timeValueMinutes(5).getMillis(); - timeInMillisSupplier.addAndGet(randomLongBetween(initialDelayInMillis, 2 * initialDelayInMillis)); + timeProvider.advanceByMillis(randomLongBetween(initialDelayInMillis, 2 * initialDelayInMillis)); var expectedWarningMessage = "[100%] of assigned shards (" + shardCount @@ -1412,6 +1406,218 @@ public void testShouldLogOnTooManyUndesiredAllocations() { ); } + public void testShouldLogOnPersistentUndesiredAllocations() { + + final int shardCount = randomIntBetween(5, 8); + + final var allShardsDesiredOnDataNode1 = Maps.newMapWithExpectedSize(shardCount); + final var allShardsDesiredOnDataNode2 = Maps.newMapWithExpectedSize(shardCount); + + final var metadataBuilder = Metadata.builder(); + final var routingTableBuilder = RoutingTable.builder(); + for (int i = 0; i < shardCount; i++) { + final var indexMetadata = IndexMetadata.builder("index-" + i).settings(indexSettings(IndexVersion.current(), 1, 0)).build(); + final var index = indexMetadata.getIndex(); + final var shardId = new ShardId(index, 0); + metadataBuilder.put(indexMetadata, false); + routingTableBuilder.add(IndexRoutingTable.builder(index).addShard(newShardRouting(shardId, "data-node-1", true, STARTED))); + + allShardsDesiredOnDataNode1.put(shardId, new ShardAssignment(Set.of("data-node-1"), 1, 0, 0)); + allShardsDesiredOnDataNode2.put(shardId, new ShardAssignment(Set.of("data-node-2"), 1, 0, 0)); + } + + final var shardToPreventMovement = "index-" + randomIntBetween(0, shardCount - 1); + + // Prevent allocation of a specific shard node 2 + final var preventAllocationOnNode2Decider = new AllocationDecider() { + @Override + public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { + return node.nodeId().equals("data-node-2") && shardToPreventMovement.equals(shardRouting.index().getName()) + ? Decision.single(Decision.Type.NO, "no_decider", "Blocks allocation on node 2") + : Decision.YES; + } + }; + // Just to illustrate that yes decisions are excluded from the summary + final var yesDecider = new AllocationDecider() { + @Override + public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { + return Decision.single(Decision.Type.YES, "yes_decider", "This should not be included in the summary"); + } + }; + + final var initialClusterState = ClusterState.builder(ClusterName.DEFAULT) + .nodes(DiscoveryNodes.builder().add(newNode("data-node-1")).add(newNode("data-node-2"))) + .metadata(metadataBuilder) + .routingTable(routingTableBuilder) + .build(); + + final var undesiredAllocationThreshold = TimeValue.timeValueMinutes(randomIntBetween(10, 50)); + final var clusterSettings = createBuiltInClusterSettings( + Settings.builder() + .put(UndesiredAllocationsTracker.UNDESIRED_ALLOCATION_DURATION_LOG_THRESHOLD_SETTING.getKey(), undesiredAllocationThreshold) + .put(UndesiredAllocationsTracker.MAX_UNDESIRED_ALLOCATIONS_TO_TRACK.getKey(), 10) + .build() + ); + final var timeProvider = new AdvancingTimeProvider(); + final var reconciler = new DesiredBalanceReconciler(clusterSettings, timeProvider); + + final var currentStateHolder = new AtomicReference(); + + final var shardInUndesiredAllocationMessage = "Shard [" + shardToPreventMovement + "][0] has been in an undesired allocation for *"; + + // Desired balance not yet computed, should not log + assertThatLogger( + () -> currentStateHolder.set( + reconcileAndBuildNewState( + reconciler, + initialClusterState, + DesiredBalance.BECOME_MASTER_INITIAL, + preventAllocationOnNode2Decider, + yesDecider + ) + ), + UndesiredAllocationsTracker.class, + new MockLog.UnseenEventExpectation( + "Should not log if desired balance is not yet computed", + UndesiredAllocationsTracker.class.getCanonicalName(), + Level.WARN, + shardInUndesiredAllocationMessage + ) + ); + + // Desired assignment matches current routing table, should not log + assertThatLogger( + () -> currentStateHolder.set( + reconcileAndBuildNewState( + reconciler, + currentStateHolder.get(), + new DesiredBalance(1, allShardsDesiredOnDataNode1), + preventAllocationOnNode2Decider, + yesDecider + ) + ), + UndesiredAllocationsTracker.class, + new MockLog.UnseenEventExpectation( + "Should not log if all shards on desired location", + UndesiredAllocationsTracker.class.getCanonicalName(), + Level.WARN, + shardInUndesiredAllocationMessage + ) + ); + + // Shards are first identified as being in undesired allocations + assertThatLogger( + () -> currentStateHolder.set( + reconcileAndBuildNewState( + reconciler, + currentStateHolder.get(), + new DesiredBalance(1, allShardsDesiredOnDataNode2), + preventAllocationOnNode2Decider, + yesDecider + ) + ), + UndesiredAllocationsTracker.class, + new MockLog.UnseenEventExpectation( + "Should not log because we haven't passed the threshold yet", + UndesiredAllocationsTracker.class.getCanonicalName(), + Level.WARN, + shardInUndesiredAllocationMessage + ) + ); + + // Advance past the logging threshold + timeProvider.advanceByMillis(randomLongBetween(undesiredAllocationThreshold.millis(), undesiredAllocationThreshold.millis() * 2)); + + // If the desired balance is missing for some reason, we shouldn't log, and we shouldn't reset the became-undesired time + assertThatLogger( + () -> currentStateHolder.set( + reconcileAndBuildNewState( + reconciler, + currentStateHolder.get(), + new DesiredBalance(1, Map.of()), + preventAllocationOnNode2Decider, + yesDecider + ) + ), + UndesiredAllocationsTracker.class, + new MockLog.UnseenEventExpectation( + "Should not log because there is no desired allocations", + UndesiredAllocationsTracker.class.getCanonicalName(), + Level.WARN, + shardInUndesiredAllocationMessage + ) + ); + + // Now it should log + assertThatLogger( + () -> currentStateHolder.set( + reconcileAndBuildNewState( + reconciler, + currentStateHolder.get(), + new DesiredBalance(1, allShardsDesiredOnDataNode2), + preventAllocationOnNode2Decider, + yesDecider + ) + ), + UndesiredAllocationsTracker.class, + new MockLog.SeenEventExpectation( + "Should log because this is the first reconciliation after the threshold is exceeded", + UndesiredAllocationsTracker.class.getCanonicalName(), + Level.WARN, + shardInUndesiredAllocationMessage + ), + new MockLog.SeenEventExpectation( + "Should log the NO decisions", + UndesiredAllocationsTracker.class.getCanonicalName(), + Level.WARN, + "[" + shardToPreventMovement + "][0] allocation decision for node [data-node-2]: [NO(Blocks allocation on node 2)]" + ) + ); + + // The rate limiter should prevent it logging again + assertThatLogger( + () -> currentStateHolder.set( + reconcileAndBuildNewState( + reconciler, + currentStateHolder.get(), + new DesiredBalance(1, allShardsDesiredOnDataNode2), + preventAllocationOnNode2Decider, + yesDecider + ) + ), + UndesiredAllocationsTracker.class, + new MockLog.UnseenEventExpectation( + "Should not log because the rate limiter should prevent it", + UndesiredAllocationsTracker.class.getCanonicalName(), + Level.WARN, + shardInUndesiredAllocationMessage + ) + ); + } + + /** + * Run reconciler, complete any shard movements, then return the resulting cluster state + */ + private ClusterState reconcileAndBuildNewState( + DesiredBalanceReconciler desiredBalanceReconciler, + ClusterState clusterState, + DesiredBalance balance, + AllocationDecider... allocationDeciders + ) { + final RoutingAllocation routingAllocation = createRoutingAllocationFrom(clusterState, allocationDeciders); + desiredBalanceReconciler.reconcile(balance, routingAllocation); + // start all initializing shards + routingAllocation.routingNodes().forEach(routingNode -> routingNode.forEach(shardRouting -> { + if (shardRouting.initializing()) { + routingAllocation.routingNodes().startShard(shardRouting, routingAllocation.changes(), 0L); + } + })); + return ClusterState.builder(clusterState) + .routingTable(clusterState.globalRoutingTable().rebuild(routingAllocation.routingNodes(), routingAllocation.metadata())) + .incrementVersion() + .build(); + } + private static void reconcile(RoutingAllocation routingAllocation, DesiredBalance desiredBalance) { reconcile(routingAllocation, desiredBalance, ALLOCATION_STATS_PLACEHOLDER); } @@ -1421,10 +1627,11 @@ private static void reconcile( DesiredBalance desiredBalance, AtomicReference allocationStatsAtomicReference ) { - final var threadPool = mock(ThreadPool.class); - when(threadPool.relativeTimeInMillisSupplier()).thenReturn(new AtomicLong()::incrementAndGet); allocationStatsAtomicReference.set( - new DesiredBalanceReconciler(createBuiltInClusterSettings(), threadPool).reconcile(desiredBalance, routingAllocation) + new DesiredBalanceReconciler(createBuiltInClusterSettings(), new AdvancingTimeProvider()).reconcile( + desiredBalance, + routingAllocation + ) ); } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/UndesiredAllocationsTrackerTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/UndesiredAllocationsTrackerTests.java new file mode 100644 index 0000000000000..b40844436b741 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/UndesiredAllocationsTrackerTests.java @@ -0,0 +1,211 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.cluster.routing.allocation.allocator; + +import org.elasticsearch.action.support.replication.ClusterStateCreationUtils; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.node.DiscoveryNodeUtils; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.GlobalRoutingTable; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.RecoverySource; +import org.elasticsearch.cluster.routing.RoutingNodes; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.time.AdvancingTimeProvider; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.test.ESTestCase; + +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +public class UndesiredAllocationsTrackerTests extends ESTestCase { + + public void testShardsArePrunedWhenRemovedFromRoutingTable() { + final int primaryShards = randomIntBetween(2, 5); + final int numberOfIndices = randomIntBetween(2, 5); + final int numberOfNodes = randomIntBetween(2, 5); + + final var clusterSettings = ClusterSettings.createBuiltInClusterSettings( + Settings.builder() + .put(UndesiredAllocationsTracker.MAX_UNDESIRED_ALLOCATIONS_TO_TRACK.getKey(), numberOfIndices * primaryShards) + .build() + ); + final var advancingTimeProvider = new AdvancingTimeProvider(); + final var undesiredAllocationsTracker = new UndesiredAllocationsTracker(clusterSettings, advancingTimeProvider); + + final var indexNames = IntStream.range(0, numberOfIndices).mapToObj(i -> "index-" + i).toArray(String[]::new); + final var state = ClusterStateCreationUtils.state(numberOfNodes, indexNames, primaryShards); + final var routingNodes = RoutingNodes.immutable(state.globalRoutingTable(), state.nodes()); + + // Mark all primary shards as undesired + routingNodes.forEach(routingNode -> { + routingNode.forEach(shardRouting -> { + if (shardRouting.primary()) { + undesiredAllocationsTracker.trackUndesiredAllocation(shardRouting); + } + }); + }); + assertEquals(numberOfIndices * primaryShards, undesiredAllocationsTracker.getUndesiredAllocations().size()); + + // Simulate an index being deleted + ClusterState stateWithIndexRemoved = removeRandomIndex(state); + + // Run cleanup with new RoutingNodes + undesiredAllocationsTracker.cleanup( + RoutingNodes.immutable(stateWithIndexRemoved.globalRoutingTable(), stateWithIndexRemoved.nodes()) + ); + assertEquals((numberOfIndices - 1) * primaryShards, undesiredAllocationsTracker.getUndesiredAllocations().size()); + assertTrue( + undesiredAllocationsTracker.getUndesiredAllocations() + .values() + .stream() + .allMatch( + allocation -> stateWithIndexRemoved.routingTable(ProjectId.DEFAULT).index(allocation.shardId().getIndex()) != null + ) + ); + } + + public void testNewestRecordsArePurgedWhenLimitIsDecreased() { + final var initialMaximum = randomIntBetween(10, 20); + final var clusterSettings = ClusterSettings.createBuiltInClusterSettings( + Settings.builder().put(UndesiredAllocationsTracker.MAX_UNDESIRED_ALLOCATIONS_TO_TRACK.getKey(), initialMaximum).build() + ); + final var advancingTimeProvider = new AdvancingTimeProvider(); + final var undesiredAllocationsTracker = new UndesiredAllocationsTracker(clusterSettings, advancingTimeProvider); + final var indexName = randomIdentifier(); + final var index = new Index(indexName, indexName); + final var indexRoutingTableBuilder = IndexRoutingTable.builder(index); + final var discoveryNodes = randomDiscoveryNodes(randomIntBetween(initialMaximum / 2, initialMaximum)); + + // The shards with the lowest IDs will have the earliest timestamps + for (int i = 0; i < initialMaximum; i++) { + final var routing = createAssignedRouting(index, i, discoveryNodes); + indexRoutingTableBuilder.addShard(routing); + undesiredAllocationsTracker.trackUndesiredAllocation(routing); + } + final var routingNodes = RoutingNodes.immutable( + GlobalRoutingTable.builder().put(ProjectId.DEFAULT, RoutingTable.builder().add(indexRoutingTableBuilder).build()).build(), + discoveryNodes + ); + + // Reduce the maximum + final var reducedMaximum = randomIntBetween(1, initialMaximum); + clusterSettings.applySettings( + Settings.builder().put(UndesiredAllocationsTracker.MAX_UNDESIRED_ALLOCATIONS_TO_TRACK.getKey(), reducedMaximum).build() + ); + + // We shouldn't purge the entries from the setting updater thread + assertEquals(initialMaximum, undesiredAllocationsTracker.getUndesiredAllocations().size()); + + // We should purge the most recent entries in #cleanup + undesiredAllocationsTracker.cleanup(routingNodes); + assertEquals(reducedMaximum, undesiredAllocationsTracker.getUndesiredAllocations().size()); + final var remainingShardIds = undesiredAllocationsTracker.getUndesiredAllocations() + .values() + .stream() + .map(allocation -> allocation.shardId().id()) + .collect(Collectors.toSet()); + assertEquals(IntStream.range(0, reducedMaximum).boxed().collect(Collectors.toSet()), remainingShardIds); + } + + public void testCannotTrackMoreShardsThanTheLimit() { + final int maxToTrack = randomIntBetween(1, 10); + final var clusterSettings = ClusterSettings.createBuiltInClusterSettings( + Settings.builder().put(UndesiredAllocationsTracker.MAX_UNDESIRED_ALLOCATIONS_TO_TRACK.getKey(), maxToTrack).build() + ); + final var advancingTimeProvider = new AdvancingTimeProvider(); + final var undesiredAllocationsTracker = new UndesiredAllocationsTracker(clusterSettings, advancingTimeProvider); + final var index = new Index(randomIdentifier(), randomIdentifier()); + + final int shardsToAdd = randomIntBetween(maxToTrack + 1, maxToTrack * 2); + for (int i = 0; i < shardsToAdd; i++) { + final var routing = createAssignedRouting(index, i); + undesiredAllocationsTracker.trackUndesiredAllocation(routing); + } + + // Only the first {maxToTrack} shards should be tracked + assertEquals(maxToTrack, undesiredAllocationsTracker.getUndesiredAllocations().size()); + final var trackedShardIds = undesiredAllocationsTracker.getUndesiredAllocations() + .values() + .stream() + .map(allocation -> allocation.shardId().id()) + .collect(Collectors.toSet()); + assertEquals(IntStream.range(0, maxToTrack).boxed().collect(Collectors.toSet()), trackedShardIds); + } + + public void testUndesiredAllocationsAreIdentifiableDespiteMetadataChanges() { + final var clusterSettings = ClusterSettings.createBuiltInClusterSettings( + Settings.builder().put(UndesiredAllocationsTracker.MAX_UNDESIRED_ALLOCATIONS_TO_TRACK.getKey(), randomIntBetween(1, 10)).build() + ); + final var advancingTimeProvider = new AdvancingTimeProvider(); + final var undesiredAllocationsTracker = new UndesiredAllocationsTracker(clusterSettings, advancingTimeProvider); + final var index = new Index(randomIdentifier(), randomIdentifier()); + + ShardRouting shardRouting = createAssignedRouting(index, 0); + + undesiredAllocationsTracker.trackUndesiredAllocation(shardRouting); + assertEquals(1, undesiredAllocationsTracker.getUndesiredAllocations().size()); + + // move to started + shardRouting = shardRouting.moveToStarted(randomNonNegativeLong()); + undesiredAllocationsTracker.trackUndesiredAllocation(shardRouting); + assertEquals(1, undesiredAllocationsTracker.getUndesiredAllocations().size()); + + // start a relocation + shardRouting = shardRouting.relocate(randomIdentifier(), randomNonNegativeLong()); + undesiredAllocationsTracker.trackUndesiredAllocation(shardRouting); + assertEquals(1, undesiredAllocationsTracker.getUndesiredAllocations().size()); + + // cancel that relocation + shardRouting = shardRouting.cancelRelocation(); + undesiredAllocationsTracker.removeTracking(shardRouting); + assertEquals(0, undesiredAllocationsTracker.getUndesiredAllocations().size()); + } + + private ClusterState removeRandomIndex(ClusterState state) { + RoutingTable originalRoutingTable = state.routingTable(ProjectId.DEFAULT); + RoutingTable updatedRoutingTable = RoutingTable.builder(originalRoutingTable) + .remove(randomFrom(originalRoutingTable.indicesRouting().keySet())) + .build(); + return ClusterState.builder(state) + .routingTable(GlobalRoutingTable.builder().put(ProjectId.DEFAULT, updatedRoutingTable).build()) + .build(); + } + + private ShardRouting createAssignedRouting(Index index, int shardId) { + return createAssignedRouting(index, shardId, null); + } + + private ShardRouting createAssignedRouting(Index index, int shardId, @Nullable DiscoveryNodes discoveryNodes) { + final var nodeId = discoveryNodes == null ? randomAlphaOfLength(10) : randomFrom(discoveryNodes.getNodes().keySet()); + return ShardRouting.newUnassigned( + new ShardId(index, shardId), + true, + RecoverySource.EmptyStoreRecoverySource.INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, randomIdentifier()), + randomFrom(ShardRouting.Role.DEFAULT, ShardRouting.Role.INDEX_ONLY) + ).initialize(nodeId, null, randomNonNegativeLong()); + } + + private DiscoveryNodes randomDiscoveryNodes(int numberOfNodes) { + final var nodes = DiscoveryNodes.builder(); + for (int i = 0; i < numberOfNodes; i++) { + nodes.add(DiscoveryNodeUtils.create("node-" + i)); + } + return nodes.build(); + } +} diff --git a/test/framework/src/main/java/org/elasticsearch/common/time/AdvancingTimeProvider.java b/test/framework/src/main/java/org/elasticsearch/common/time/AdvancingTimeProvider.java new file mode 100644 index 0000000000000..d96a125cf741c --- /dev/null +++ b/test/framework/src/main/java/org/elasticsearch/common/time/AdvancingTimeProvider.java @@ -0,0 +1,46 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.common.time; + +import java.util.concurrent.atomic.AtomicLong; + +import static java.util.concurrent.TimeUnit.NANOSECONDS; + +/** + * A time-provider that advances each time it's asked the time + */ +public class AdvancingTimeProvider implements TimeProvider { + + private final AtomicLong currentTimeMillis = new AtomicLong(System.currentTimeMillis()); + + public void advanceByMillis(long milliseconds) { + currentTimeMillis.addAndGet(milliseconds); + } + + @Override + public long relativeTimeInMillis() { + return currentTimeMillis.incrementAndGet(); + } + + @Override + public long relativeTimeInNanos() { + return NANOSECONDS.toNanos(relativeTimeInMillis()); + } + + @Override + public long rawRelativeTimeInMillis() { + return relativeTimeInMillis(); + } + + @Override + public long absoluteTimeInMillis() { + throw new UnsupportedOperationException("not supported"); + } +}