diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java index fddf9267cdbb1..2976d98d52e07 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java @@ -32,8 +32,6 @@ public record AllocationStats(long unassignedShards, long totalAllocations, long public record NodeWeightStats(long shardCount, double diskUsageInBytes, double writeLoad, double nodeWeight) {} - public static final DesiredBalanceMetrics NOOP = new DesiredBalanceMetrics(MeterRegistry.NOOP); - public static final String UNASSIGNED_SHARDS_METRIC_NAME = "es.allocator.desired_balance.shards.unassigned.current"; public static final String TOTAL_SHARDS_METRIC_NAME = "es.allocator.desired_balance.shards.current"; public static final String UNDESIRED_ALLOCATION_COUNT_METRIC_NAME = "es.allocator.desired_balance.allocations.undesired.current"; diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java index 909a7a7a99a61..d5decb1ea4258 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java @@ -20,10 +20,7 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.routing.UnassignedInfo.AllocationStatus; -import org.elasticsearch.cluster.routing.allocation.NodeAllocationStatsAndWeightsCalculator; -import org.elasticsearch.cluster.routing.allocation.NodeAllocationStatsAndWeightsCalculator.NodeAllocationStatsAndWeight; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; -import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceMetrics.AllocationStats; import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.ClusterSettings; @@ -36,9 +33,7 @@ import org.elasticsearch.threadpool.ThreadPool; import java.util.Comparator; -import java.util.HashMap; import java.util.Iterator; -import java.util.Map; import java.util.Set; import java.util.function.BiFunction; import java.util.stream.Collectors; @@ -82,16 +77,8 @@ public class DesiredBalanceReconciler { private double undesiredAllocationsLogThreshold; private final NodeAllocationOrdering allocationOrdering = new NodeAllocationOrdering(); private final NodeAllocationOrdering moveOrdering = new NodeAllocationOrdering(); - private final DesiredBalanceMetrics desiredBalanceMetrics; - private final NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator; - - public DesiredBalanceReconciler( - ClusterSettings clusterSettings, - ThreadPool threadPool, - DesiredBalanceMetrics desiredBalanceMetrics, - NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator - ) { - this.desiredBalanceMetrics = desiredBalanceMetrics; + + public DesiredBalanceReconciler(ClusterSettings clusterSettings, ThreadPool threadPool) { this.undesiredAllocationLogInterval = new FrequencyCappedAction( threadPool.relativeTimeInMillisSupplier(), TimeValue.timeValueMinutes(5) @@ -101,7 +88,6 @@ public DesiredBalanceReconciler( UNDESIRED_ALLOCATIONS_LOG_THRESHOLD_SETTING, value -> this.undesiredAllocationsLogThreshold = value ); - this.nodeAllocationStatsAndWeightsCalculator = nodeAllocationStatsAndWeightsCalculator; } /** @@ -110,12 +96,13 @@ public DesiredBalanceReconciler( * @param desiredBalance The new desired cluster shard allocation * @param allocation Cluster state information with which to make decisions, contains routing table metadata that will be modified to * reach the given desired balance. + * @return {@link DesiredBalanceMetrics.AllocationStats} for this round of reconciliation changes. */ - public void reconcile(DesiredBalance desiredBalance, RoutingAllocation allocation) { + public DesiredBalanceMetrics.AllocationStats reconcile(DesiredBalance desiredBalance, RoutingAllocation allocation) { var nodeIds = allocation.routingNodes().getAllNodeIds(); allocationOrdering.retainNodes(nodeIds); moveOrdering.retainNodes(nodeIds); - new Reconciliation(desiredBalance, allocation).run(); + return new Reconciliation(desiredBalance, allocation).run(); } public void clear() { @@ -135,7 +122,7 @@ private class Reconciliation { this.routingNodes = allocation.routingNodes(); } - void run() { + DesiredBalanceMetrics.AllocationStats run() { try (var ignored = allocation.withReconcilingFlag()) { logger.debug("Reconciling desired balance for [{}]", desiredBalance.lastConvergedIndex()); @@ -144,13 +131,13 @@ void run() { // no data nodes, so fail allocation to report red health failAllocationOfNewPrimaries(allocation); logger.trace("no nodes available, nothing to reconcile"); - return; + return DesiredBalanceMetrics.EMPTY_ALLOCATION_STATS; } if (desiredBalance.assignments().isEmpty()) { // no desired state yet but it is on its way and we'll reroute again when it is ready logger.trace("desired balance is empty, nothing to reconcile"); - return; + return DesiredBalanceMetrics.EMPTY_ALLOCATION_STATS; } // compute next moves towards current desired balance: @@ -165,31 +152,11 @@ void run() { moveShards(); // 3. move any other shards that are desired elsewhere logger.trace("Reconciler#balance"); - var allocationStats = balance(); + DesiredBalanceMetrics.AllocationStats allocationStats = balance(); logger.debug("Reconciliation is complete"); - - updateDesireBalanceMetrics(allocationStats); - } - } - - private void updateDesireBalanceMetrics(AllocationStats allocationStats) { - var nodesStatsAndWeights = nodeAllocationStatsAndWeightsCalculator.nodesAllocationStatsAndWeights( - allocation.metadata(), - allocation.routingNodes(), - allocation.clusterInfo(), - desiredBalance - ); - Map filteredNodeAllocationStatsAndWeights = new HashMap<>( - nodesStatsAndWeights.size() - ); - for (var nodeStatsAndWeight : nodesStatsAndWeights.entrySet()) { - var node = allocation.nodes().get(nodeStatsAndWeight.getKey()); - if (node != null) { - filteredNodeAllocationStatsAndWeights.put(node, nodeStatsAndWeight.getValue()); - } + return allocationStats; } - desiredBalanceMetrics.updateMetrics(allocationStats, desiredBalance.weightsPerNode(), filteredNodeAllocationStatsAndWeights); } private boolean allocateUnassignedInvariant() { @@ -512,7 +479,7 @@ private void moveShards() { } } - private AllocationStats balance() { + private DesiredBalanceMetrics.AllocationStats balance() { if (allocation.deciders().canRebalance(allocation).type() != Decision.Type.YES) { return DesiredBalanceMetrics.EMPTY_ALLOCATION_STATS; } @@ -581,8 +548,11 @@ private AllocationStats balance() { } maybeLogUndesiredAllocationsWarning(totalAllocations, undesiredAllocationsExcludingShuttingDownNodes, routingNodes.size()); - - return new AllocationStats(unassignedShards, totalAllocations, undesiredAllocationsExcludingShuttingDownNodes); + return new DesiredBalanceMetrics.AllocationStats( + unassignedShards, + totalAllocations, + undesiredAllocationsExcludingShuttingDownNodes + ); } private void maybeLogUndesiredAllocationsWarning(int totalAllocations, int undesiredAllocations, int nodeCount) { diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java index d9fba492fb9d0..cb3f3b306d806 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java @@ -16,6 +16,7 @@ import org.elasticsearch.cluster.ClusterStateTaskExecutor; import org.elasticsearch.cluster.ClusterStateTaskListener; import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.AllocationService.RerouteStrategy; @@ -39,6 +40,7 @@ import java.util.ArrayList; import java.util.Comparator; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -87,6 +89,7 @@ public class DesiredBalanceShardsAllocator implements ShardsAllocator { private final AtomicReference currentDesiredBalanceRef = new AtomicReference<>(DesiredBalance.NOT_MASTER); private volatile boolean resetCurrentDesiredBalance = false; private final Set processedNodeShutdowns = new HashSet<>(); + private final NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator; private final DesiredBalanceMetrics desiredBalanceMetrics; /** * Manages balancer round results in order to report on the balancer activity in a configurable manner. @@ -136,17 +139,13 @@ public DesiredBalanceShardsAllocator( NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator ) { this.desiredBalanceMetrics = new DesiredBalanceMetrics(telemetryProvider.getMeterRegistry()); + this.nodeAllocationStatsAndWeightsCalculator = nodeAllocationStatsAndWeightsCalculator; this.balancerRoundSummaryService = new AllocationBalancingRoundSummaryService(threadPool, clusterService.getClusterSettings()); this.delegateAllocator = delegateAllocator; this.threadPool = threadPool; this.reconciler = reconciler; this.desiredBalanceComputer = desiredBalanceComputer; - this.desiredBalanceReconciler = new DesiredBalanceReconciler( - clusterService.getClusterSettings(), - threadPool, - desiredBalanceMetrics, - nodeAllocationStatsAndWeightsCalculator - ); + this.desiredBalanceReconciler = new DesiredBalanceReconciler(clusterService.getClusterSettings(), threadPool); this.desiredBalanceComputation = new ContinuousComputation<>(threadPool.generic()) { @Override @@ -347,6 +346,10 @@ private BalancingRoundSummary calculateBalancingRoundSummary(DesiredBalance oldD return new BalancingRoundSummary(DesiredBalance.shardMovements(oldDesiredBalance, newDesiredBalance)); } + /** + * Submits the desired balance to be reconciled (applies the desired changes to the routing table) and creates and publishes a new + * cluster state. The data nodes will receive and apply the new cluster state to start/move/remove shards. + */ protected void submitReconcileTask(DesiredBalance desiredBalance) { masterServiceTaskQueue.submitTask("reconcile-desired-balance", new ReconcileDesiredBalanceTask(desiredBalance), null); } @@ -357,7 +360,11 @@ protected void reconcile(DesiredBalance desiredBalance, RoutingAllocation alloca } else { logger.debug("Reconciling desired balance for [{}]", desiredBalance.lastConvergedIndex()); } - recordTime(cumulativeReconciliationTime, () -> desiredBalanceReconciler.reconcile(desiredBalance, allocation)); + recordTime(cumulativeReconciliationTime, () -> { + DesiredBalanceMetrics.AllocationStats allocationStats = desiredBalanceReconciler.reconcile(desiredBalance, allocation); + updateDesireBalanceMetrics(desiredBalance, allocation, allocationStats); + }); + if (logger.isTraceEnabled()) { logger.trace("Reconciled desired balance: {}", desiredBalance); } else { @@ -391,6 +398,28 @@ public void resetDesiredBalance() { resetCurrentDesiredBalance = true; } + private void updateDesireBalanceMetrics( + DesiredBalance desiredBalance, + RoutingAllocation routingAllocation, + DesiredBalanceMetrics.AllocationStats allocationStats + ) { + var nodesStatsAndWeights = nodeAllocationStatsAndWeightsCalculator.nodesAllocationStatsAndWeights( + routingAllocation.metadata(), + routingAllocation.routingNodes(), + routingAllocation.clusterInfo(), + desiredBalance + ); + Map filteredNodeAllocationStatsAndWeights = + new HashMap<>(nodesStatsAndWeights.size()); + for (var nodeStatsAndWeight : nodesStatsAndWeights.entrySet()) { + var node = routingAllocation.nodes().get(nodeStatsAndWeight.getKey()); + if (node != null) { + filteredNodeAllocationStatsAndWeights.put(node, nodeStatsAndWeight.getValue()); + } + } + desiredBalanceMetrics.updateMetrics(allocationStats, desiredBalance.weightsPerNode(), filteredNodeAllocationStatsAndWeights); + } + public DesiredBalanceStats getStats() { return new DesiredBalanceStats( Math.max(currentDesiredBalanceRef.get().lastConvergedIndex(), 0L), diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java index cd94c87bb4b57..81aa1a60eb45e 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java @@ -1212,12 +1212,7 @@ public void testRebalanceDoesNotCauseHotSpots() { new ConcurrentRebalanceAllocationDecider(clusterSettings), new ThrottlingAllocationDecider(clusterSettings) }; - var reconciler = new DesiredBalanceReconciler( - clusterSettings, - new DeterministicTaskQueue().getThreadPool(), - DesiredBalanceMetrics.NOOP, - EMPTY_NODE_ALLOCATION_STATS - ); + var reconciler = new DesiredBalanceReconciler(clusterSettings, new DeterministicTaskQueue().getThreadPool()); var totalOutgoingMoves = new HashMap(); for (int i = 0; i < numberOfNodes; i++) { @@ -1299,12 +1294,7 @@ public void testShouldLogOnTooManyUndesiredAllocations() { final var timeInMillisSupplier = new AtomicLong(); when(threadPool.relativeTimeInMillisSupplier()).thenReturn(timeInMillisSupplier::incrementAndGet); - var reconciler = new DesiredBalanceReconciler( - createBuiltInClusterSettings(), - threadPool, - DesiredBalanceMetrics.NOOP, - EMPTY_NODE_ALLOCATION_STATS - ); + var reconciler = new DesiredBalanceReconciler(createBuiltInClusterSettings(), threadPool); final long initialDelayInMillis = TimeValue.timeValueMinutes(5).getMillis(); timeInMillisSupplier.addAndGet(randomLongBetween(initialDelayInMillis, 2 * initialDelayInMillis)); @@ -1356,8 +1346,7 @@ public void testShouldLogOnTooManyUndesiredAllocations() { private static void reconcile(RoutingAllocation routingAllocation, DesiredBalance desiredBalance) { final var threadPool = mock(ThreadPool.class); when(threadPool.relativeTimeInMillisSupplier()).thenReturn(new AtomicLong()::incrementAndGet); - new DesiredBalanceReconciler(createBuiltInClusterSettings(), threadPool, DesiredBalanceMetrics.NOOP, EMPTY_NODE_ALLOCATION_STATS) - .reconcile(desiredBalance, routingAllocation); + new DesiredBalanceReconciler(createBuiltInClusterSettings(), threadPool).reconcile(desiredBalance, routingAllocation); } private static boolean isReconciled(RoutingNode node, DesiredBalance balance) {